Spaces:
Running
Running
File size: 1,655 Bytes
d9764fe 2bf48c4 d9764fe 2bf48c4 d9764fe |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
import json
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List

from kafka import KafkaConsumer

from get_gpt_answer import GetGPTAnswer
def get_gpt_responses(data: dict[str, Any], gpt_helper: "GetGPTAnswer"):
    """Attach GPT-3.5 and GPT-4 answers to a single record.

    The record is modified in place: the keys ``"gpt35_answer"`` and
    ``"gpt4_answer"`` are written into ``data``, and the same dict object
    is returned.

    Args:
        data: The record to annotate.
        gpt_helper: Helper intended to generate the answers. It is not
            used while the placeholder answers below are in effect.

    Returns:
        The same ``data`` dict, with the two answer keys set.
    """
    # NOTE: placeholder answers are written for now. The real calls are
    # disabled and kept here for reference:
    # data["gpt35_answer"] = gpt_helper.generate_gpt35_answer(data["question"])
    # data["gpt4_answer"] = gpt_helper.generate_gpt4_answer(data["question"])
    data["gpt35_answer"] = "This is gpt35 answer"
    data["gpt4_answer"] = "This is gpt4 answer"
    return data
def process_batch(batch: List[dict[str, Any]], batch_size: int):
    """Annotate every record in ``batch`` with GPT answers using a thread pool.

    One ``GetGPTAnswer`` helper is created and shared by all worker tasks.
    Each record is passed to ``get_gpt_responses``; the results are
    collected in the same order as ``batch`` and printed.

    Args:
        batch: Records to process. An empty list yields an empty result.
        batch_size: Requested number of worker threads. Values below 1 are
            treated as 1, because ``ThreadPoolExecutor`` raises
            ``ValueError`` for ``max_workers <= 0``.

    Returns:
        The list of processed records, in input order.

    Raises:
        Any exception raised while processing a record is re-raised here
        by ``future.result()``.
    """
    # Clamp so a zero or negative batch_size cannot crash pool creation.
    worker_count = max(1, batch_size)
    gpt_helper = GetGPTAnswer()
    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        futures = [
            executor.submit(get_gpt_responses, data, gpt_helper)
            for data in batch
        ]
        # Collect in submission order so results line up with the input.
        results = [future.result() for future in futures]
    print("Batch ready with gpt responses", results)
    return results
def consume_messages():
    """Consume the ``ai-detector`` Kafka topic and process each message in batches.

    The broker address is read from the ``KAFKA_IP`` environment variable.
    Each message value is decoded as UTF-8 and parsed as JSON, and is
    expected to be a JSON array of records. The array is split into slices
    of ``BATCH_SIZE`` records, and each slice is handed to
    ``process_batch``.

    Messages that are not valid UTF-8, not valid JSON, or not a JSON array
    are reported and skipped, so one bad message does not stop the
    consumer. The consumer is closed when the loop exits for any reason.
    """
    BATCH_SIZE = 5
    # NOTE(review): KAFKA_IP is passed through unchecked; if it is unset,
    # bootstrap_servers becomes [None]. Confirm whether that should fail
    # fast instead.
    kafka_address = os.environ.get("KAFKA_IP")
    consumer = KafkaConsumer(
        "ai-detector",
        bootstrap_servers=[kafka_address],
        auto_offset_reset='earliest',
        client_id="ai-detector-1",
        group_id=None,
    )
    print("Successfully connected to Kafka at", kafka_address)
    try:
        for message in consumer:
            try:
                full_batch = json.loads(message.value.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError):
                # Invalid UTF-8 is as unusable as invalid JSON: skip it.
                print("Failed to decode JSON from message:", message.value)
                print("Continuing...")
                continue
            if not isinstance(full_batch, list):
                # Slicing below only makes sense for a JSON array.
                print(
                    "Expected a JSON array in message but got",
                    type(full_batch).__name__,
                )
                print("Continuing...")
                continue
            for start in range(0, len(full_batch), BATCH_SIZE):
                process_batch(full_batch[start:start + BATCH_SIZE], BATCH_SIZE)
    finally:
        # Release the broker connection even if processing raises.
        consumer.close()
|