File size: 2,420 Bytes
d9764fe
 
 
 
 
 
bcd9850
 
d9764fe
 
 
bcd9850
 
d9764fe
 
 
06a4ed9
d9764fe
 
 
 
 
 
 
 
bcd9850
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d9764fe
 
 
 
 
 
 
 
 
 
2bf48c4
 
d9764fe
 
 
2bf48c4
06a4ed9
 
2bf48c4
 
 
 
d9764fe
 
 
06a4ed9
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import json
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from typing import List

from google.protobuf.json_format import MessageToDict
from kafka import KafkaConsumer

from get_gpt_answer import GetGPTAnswer
from predict_custom_model import predict_custom_trained_model


def get_gpt_responses(data: dict[str, Any], gpt_helper: GetGPTAnswer) -> dict[str, Any]:
    """Attach GPT-3.5 and GPT-4 answers for ``data["question"]`` to *data*.

    The dict is modified in place and the same object is returned.

    Args:
        data: Item to enrich; must contain a ``"question"`` key.
        gpt_helper: Object exposing ``generate_gpt35_answer`` and
            ``generate_gpt4_answer``.

    Returns:
        *data*, with ``"gpt35_answer"`` and ``"gpt4_answer"`` set.

    Raises:
        KeyError: If *data* has no ``"question"`` key.
    """
    question = data["question"]
    data["gpt35_answer"] = gpt_helper.generate_gpt35_answer(question)
    data["gpt4_answer"] = gpt_helper.generate_gpt4_answer(question)
    return data


def process_batch(
    batch: List[dict[str, Any]],
    batch_size: int,
    job_application_id: int,
) -> List[dict[str, Any]]:
    """Enrich a batch with GPT answers and score it with the custom model.

    Every item is passed to ``get_gpt_responses`` on a thread pool. The
    enriched items are then sent to ``predict_custom_trained_model``, and each
    returned prediction is converted to a plain dict via ``MessageToDict``.

    Args:
        batch: Items to process; each must contain a ``"question"`` key.
        batch_size: Upper bound on the number of worker threads.
        job_application_id: Accepted for the caller's benefit.
            NOTE(review): not used by this function; confirm whether it
            should be attached to the results.

    Returns:
        One dict per prediction, mapping each prediction key to its
        ``MessageToDict`` conversion. An empty batch yields ``[]`` without
        contacting the GPT helper or the prediction endpoint.
    """
    if not batch:
        return []

    # ThreadPoolExecutor raises ValueError for max_workers <= 0, and there is
    # never a reason to have more workers than items.
    workers = max(1, min(batch_size, len(batch)))
    with ThreadPoolExecutor(max_workers=workers) as executor:
        gpt_helper = GetGPTAnswer()
        futures = [
            executor.submit(get_gpt_responses, item, gpt_helper) for item in batch
        ]
        # Collect in submission order; re-raises the first task failure.
        enriched = [future.result() for future in futures]

    print("Batch ready with gpt responses", enriched)

    predictions = predict_custom_trained_model(
        instances=enriched,
        project=os.environ.get("PROJECT_ID"),
        endpoint_id=os.environ.get("ENDPOINT_ID"),
    )

    converted: List[dict[str, Any]] = []
    for prediction in predictions:
        result_dict = {}
        # NOTE(review): ``_pb`` is a private attribute of the prediction object
        # (presumably a proto-plus wrapper around a protobuf message) — confirm
        # against predict_custom_trained_model's return type.
        for key, value in prediction._pb.items():
            # MessageToDict only accepts protobuf messages (they carry DESCRIPTOR).
            if hasattr(value, 'DESCRIPTOR'):
                result_dict[key] = MessageToDict(value)
            else:
                print(f"Item {key} is not a convertible protobuf message.")
        converted.append(result_dict)

    print({"result": converted})
    return converted


def _parse_message(raw):
    """Decode a Kafka record value into ``(data, job_application_id)``, or None if malformed."""
    if raw is None:
        print("Received message with empty value, skipping.")
        return None
    try:
        incoming_message = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        print("Failed to decode JSON from message:", raw)
        print("Continuing...")
        return None

    # Validate the shape up front so a bad message is skipped before any
    # batch work, instead of raising KeyError/TypeError mid-way through.
    if (
        not isinstance(incoming_message, dict)
        or not isinstance(incoming_message.get("data"), list)
        or "job_application_id" not in incoming_message
    ):
        print("Message has unexpected format, skipping:", incoming_message)
        return None

    return incoming_message["data"], incoming_message["job_application_id"]


def consume_messages(batch_size: int = 5):
    """Consume the ``ai-detector`` topic forever, processing each message in batches.

    Each record must be UTF-8 JSON of the form
    ``{"data": [...], "job_application_id": ...}``. The ``data`` list is split
    into chunks of *batch_size*, and each chunk is handed to ``process_batch``.
    Malformed records are reported and skipped. The consumer is closed if the
    loop exits with an exception.

    Args:
        batch_size: Maximum number of items per ``process_batch`` call.

    Raises:
        ValueError: If *batch_size* is not positive.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    kafka_ip = os.environ.get("KAFKA_IP")
    # kafka-python: group_id=None disables offset commits. Together with
    # auto_offset_reset='earliest', every restart re-reads the topic from the
    # start, so a message that crashed the loop would crash it again. That is
    # why malformed messages are skipped rather than raised.
    consumer = KafkaConsumer(
        "ai-detector",
        bootstrap_servers=[kafka_ip],
        auto_offset_reset='earliest',
        client_id="ai-detector-1",
        group_id=None,
    )

    print("Successfully connected to Kafka at", kafka_ip)

    try:
        for message in consumer:
            parsed = _parse_message(message.value)
            if parsed is None:
                continue
            full_batch, job_application_id = parsed

            for start in range(0, len(full_batch), batch_size):
                process_batch(
                    full_batch[start:start + batch_size],
                    batch_size,
                    job_application_id,
                )
    finally:
        consumer.close()