bearking58 committed on
Commit
d9764fe
·
1 Parent(s): 782aa38

feat: kafka preparation and event loop

Browse files
public-prediction/get_gpt_answer.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from langchain_openai import ChatOpenAI
2
+ from langchain_core.messages import HumanMessage, SystemMessage
3
+
4
+
5
class GetGPTAnswer:
    """Ask two OpenAI chat models the same question.

    One ``ChatOpenAI`` client is held per model (``gpt-3.5-turbo`` and
    ``gpt-4-turbo``). Every request is prefixed with the same system prompt.
    """

    # Single source of truth for the system prompt; both models receive
    # exactly this text ahead of the user's question.
    SYSTEM_PROMPT = (
        "Please answer the following question based solely on your internal "
        "knowledge, without external references. Assume you are the human."
    )

    def __init__(self):
        """Create the chat clients for both models."""
        self.llm_gpt35 = ChatOpenAI(model="gpt-3.5-turbo")
        self.llm_gpt4 = ChatOpenAI(model="gpt-4-turbo")

    def _ask(self, llm, question: str):
        """Send the system prompt plus *question* to *llm*.

        Args:
            llm: A chat client exposing ``invoke(messages)``.
            question: The user question forwarded as the human message.

        Returns:
            The ``content`` attribute of the object returned by ``invoke``.
        """
        messages = [
            SystemMessage(content=self.SYSTEM_PROMPT),
            HumanMessage(content=question),
        ]
        return llm.invoke(messages).content

    def generate_gpt35_answer(self, question: str):
        """Return the ``gpt-3.5-turbo`` client's answer to *question*."""
        return self._ask(self.llm_gpt35, question)

    def generate_gpt4_answer(self, question: str):
        """Return the ``gpt-4-turbo`` client's answer to *question*."""
        return self._ask(self.llm_gpt4, question)
public-prediction/kafka_consumer.py ADDED
@@ -0,0 +1,44 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import json
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List

from kafka import KafkaConsumer

from get_gpt_answer import GetGPTAnswer
7
+
8
+
9
def get_gpt_responses(data: dict[str, Any], gpt_helper: "GetGPTAnswer"):
    """Attach a GPT-3.5 and a GPT-4 answer to *data* and return it.

    The dict is mutated in place: the keys ``gpt35_answer`` and
    ``gpt4_answer`` are set, and the same dict object is returned.

    NOTE: the answers are currently fixed placeholder strings and
    *gpt_helper* is not used yet.

    Args:
        data: The record to annotate.
        gpt_helper: Helper intended to produce the real model answers.

    Returns:
        The same ``data`` dict, with both answer keys set.
    """
    # TODO: swap the placeholders for the real model calls:
    #   gpt_helper.generate_gpt35_answer(data["question"])
    #   gpt_helper.generate_gpt4_answer(data["question"])
    data["gpt35_answer"] = "This is gpt35 answer"
    data["gpt4_answer"] = "This is gpt4 answer"
    return data
15
+
16
+
17
def process_batch(batch: List[dict[str, Any]], batch_size: int):
    """Annotate every record in *batch* with GPT answers, concurrently.

    Each record is handed to ``get_gpt_responses`` on a thread pool, the
    results are collected in submission order, and the annotated batch is
    printed.

    Args:
        batch: Records to annotate; each is passed to ``get_gpt_responses``.
        batch_size: Requested number of worker threads.
    """
    # One helper is shared by every task in this batch.
    gpt_helper = GetGPTAnswer()

    # ThreadPoolExecutor raises ValueError for max_workers <= 0, so clamp it.
    with ThreadPoolExecutor(max_workers=max(1, batch_size)) as executor:
        futures = [
            executor.submit(get_gpt_responses, data, gpt_helper)
            for data in batch
        ]
        # result() re-raises any exception raised inside a worker.
        results = [future.result() for future in futures]

    print("Batch ready with gpt responses", results)
25
+
26
+
27
def consume_messages():
    """Consume the ``ai-detector`` Kafka topic and process it in batches.

    The broker address is read from the ``KAFKA_IP`` environment variable.
    Every message value is JSON-decoded, split into slices of at most
    ``BATCH_SIZE`` items, and each slice is passed to ``process_batch``.
    The loop runs until the consumer iterator stops or an exception
    propagates.

    Raises:
        RuntimeError: If ``KAFKA_IP`` is unset or empty.
    """
    kafka_ip = os.environ.get("KAFKA_IP")
    if not kafka_ip:
        # Fail fast with a clear message instead of handing [None] to the
        # Kafka client as its bootstrap server list.
        raise RuntimeError("KAFKA_IP environment variable is not set")

    consumer = KafkaConsumer(
        "ai-detector",
        bootstrap_servers=[kafka_ip],
        auto_offset_reset='earliest',
        client_id="ai-detector-1",
        # NOTE(review): with no consumer group, offsets are presumably not
        # committed, so a restart re-reads the topic from 'earliest'.
        # Confirm this is intended.
        group_id=None,
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

    BATCH_SIZE = 5

    try:
        for message in consumer:
            # The value is sliced below, so it is expected to be a sequence
            # (presumably a JSON array of records; verify with the producer).
            full_batch = message.value

            for start in range(0, len(full_batch), BATCH_SIZE):
                process_batch(full_batch[start:start + BATCH_SIZE], BATCH_SIZE)
    finally:
        # Release the broker connections even if processing raises.
        consumer.close()
public-prediction/main.py ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ from kafka_consumer import consume_messages
2
+ from dotenv import load_dotenv
3
+
4
if __name__ == "__main__":
    # Load variables from a .env file first: consume_messages() reads
    # KAFKA_IP from the process environment.
    load_dotenv()

    # Start the Kafka consume loop.
    consume_messages()
public-prediction/requirements.txt ADDED
@@ -0,0 +1,5 @@
 
 
 
 
 
 
1
+ kafka-python
2
+ langchain
3
+ openai
4
+ langchain-openai
5
+ python-dotenv