File size: 1,655 Bytes
d9764fe
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2bf48c4
 
d9764fe
 
 
2bf48c4
 
 
 
 
 
d9764fe
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import json
import os
from kafka import KafkaConsumer
from get_gpt_answer import GetGPTAnswer
from typing import List
from concurrent.futures import ThreadPoolExecutor


def get_gpt_responses(data: dict[str, any], gpt_helper: GetGPTAnswer):
    # data["gpt35_answer"] = gpt_helper.generate_gpt35_answer(data["question"])
    # data["gpt4_answer"] = gpt_helper.generate_gpt4_answer(data["question"])
    data["gpt35_answer"] = "This is gpt35 answer"
    data["gpt4_answer"] = "This is gpt4 answer"
    return data


def process_batch(batch: List[dict[str, any]], batch_size: int):
    with ThreadPoolExecutor(max_workers=batch_size) as executor:
        gpt_helper = GetGPTAnswer()
        futures = [executor.submit(
            get_gpt_responses, data, gpt_helper) for data in batch]
        results = [future.result() for future in futures]

    print("Batch ready with gpt responses", results)


def consume_messages():
    consumer = KafkaConsumer(
        "ai-detector",
        bootstrap_servers=[os.environ.get("KAFKA_IP")],
        auto_offset_reset='earliest',
        client_id="ai-detector-1",
        group_id=None,
    )

    print("Successfully connected to Kafka at", os.environ.get("KAFKA_IP"))

    BATCH_SIZE = 5

    for message in consumer:
        try:
            full_batch = json.loads(message.value.decode("utf-8"))
        except json.JSONDecodeError:
            print("Failed to decode JSON from message:", message.value)
            print("Continuing...")
            continue

        for i in range(0, len(full_batch), BATCH_SIZE):
            batch = full_batch[i:i+BATCH_SIZE]
            process_batch(batch, BATCH_SIZE)