File size: 3,188 Bytes
d9764fe
 
84481d5
d9764fe
 
 
 
bcd9850
 
d9764fe
 
 
ec9fbf7
d9764fe
 
 
84481d5
d9764fe
 
 
 
 
bcd9850
 
 
 
 
 
 
 
 
 
 
 
 
 
84481d5
 
 
 
fc749f2
6582396
84481d5
 
 
 
 
 
 
4e584a6
84481d5
 
 
 
4e584a6
bcd9850
d9764fe
 
 
 
 
 
 
84481d5
d13041b
d9764fe
 
2bf48c4
 
d9764fe
84481d5
d9764fe
 
2bf48c4
06a4ed9
 
2bf48c4
 
 
 
d9764fe
57db935
 
fc749f2
84481d5
d9764fe
 
84481d5
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import json
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List

import requests
from google.protobuf.json_format import MessageToDict
from kafka import KafkaConsumer

from get_gpt_answer import GetGPTAnswer
from predict_custom_model import predict_custom_trained_model


def get_gpt_responses(data: dict[str, Any], gpt_helper: GetGPTAnswer) -> dict[str, Any]:
    """Attach a GPT-4o answer to a single question record.

    Args:
        data: Record holding at least a ``"question"`` entry. It is modified
            in place: the generated answer is stored under ``"gpt4o_answer"``.
        gpt_helper: Helper whose ``generate_gpt4o_answer`` method is called
            with the question.

    Returns:
        The very same ``data`` object that was passed in, now carrying the
        ``"gpt4o_answer"`` entry.

    Raises:
        KeyError: If ``data`` has no ``"question"`` entry.
    """
    data["gpt4o_answer"] = gpt_helper.generate_gpt4o_answer(data["question"])
    return data


def process_batch(batch: List[dict[str, Any]], batch_size: int, gpt_helper: GetGPTAnswer) -> List[dict[str, Any]]:
    """Enrich a batch with GPT answers and run it through the custom model.

    Each record is first passed to ``get_gpt_responses`` on a thread pool, then
    the enriched records are sent to ``predict_custom_trained_model`` using the
    ``PROJECT_ID`` and ``ENDPOINT_ID`` environment variables.

    Args:
        batch: Records to process; each one is handed to
            ``get_gpt_responses`` (which updates it in place).
        batch_size: Number of worker threads used for the GPT calls.
        gpt_helper: Helper forwarded to ``get_gpt_responses``.

    Returns:
        One dict per prediction, mapping each key of the prediction to the
        ``MessageToDict`` conversion of its value. An empty ``batch`` yields an
        empty list without contacting GPT or the prediction endpoint.

    Raises:
        ValueError: If ``batch`` is non-empty and ``batch_size`` is not
            positive (raised by ``ThreadPoolExecutor``).
    """
    # Nothing to score: skip the thread pool and the prediction call entirely.
    if not batch:
        return []

    # Run the GPT calls concurrently; collecting results in submission order
    # keeps the instances aligned with the incoming batch.
    with ThreadPoolExecutor(max_workers=batch_size) as executor:
        futures = [
            executor.submit(get_gpt_responses, data, gpt_helper)
            for data in batch
        ]
        instances = [future.result() for future in futures]

    predictions = predict_custom_trained_model(
        instances=instances,
        project=os.environ.get("PROJECT_ID"),
        endpoint_id=os.environ.get("ENDPOINT_ID"),
    )

    results = []
    for prediction in predictions:
        result_dict = {}
        # NOTE(review): relies on the private ``_pb`` attribute of each
        # prediction (presumably a proto-plus map wrapper) - confirm against
        # predict_custom_trained_model.
        for key, value in prediction._pb.items():
            # Only protobuf messages can be converted; anything else is
            # reported and left out of the result.
            if hasattr(value, 'DESCRIPTOR'):
                result_dict[key] = MessageToDict(value)
            else:
                print(f"Item {key} is not a convertible protobuf message.")
        results.append(result_dict)

    return results


def send_results_back(full_results: List[dict[str, Any]], job_application_id: str, timeout: float = 30.0) -> None:
    """PATCH the evaluation results for a job application to the backend.

    The request body carries ``job_application_id`` and the results under
    ``"evaluations"``; the API key is read from the ``X-API-KEY`` environment
    variable. The response status code and content are printed. A non-2xx
    status is only printed, not raised.

    Args:
        full_results: Evaluation dicts to report (as produced by
            ``process_batch``).
        job_application_id: Identifier of the job application the results
            belong to.
        timeout: Seconds to wait for the backend (passed to ``requests``)
            before giving up, so a stalled server cannot block the caller
            forever.

    Raises:
        requests.RequestException: On connection failure or when ``timeout``
            is exceeded.
    """
    print(f"Sending results back with job_app_id {job_application_id}")
    url = "https://ta-2-sistem-cerdas-be-vi2jkj4riq-et.a.run.app/api/anti-cheat/result"
    headers = {
        "Content-Type": "application/json",
        "x-api-key": os.environ.get("X-API-KEY"),
    }

    body = {
        "job_application_id": job_application_id,
        "evaluations": full_results,
    }

    # Without an explicit timeout, requests waits indefinitely for a response.
    response = requests.patch(url, json=body, headers=headers, timeout=timeout)
    print(f"Data sent with status code {response.status_code}")
    print(response.content)


def consume_messages():
    """Consume ``ai-detector`` Kafka messages and report evaluations.

    Connects to the broker named by the ``KAFKA_IP`` environment variable and,
    for every message, decodes the JSON payload, processes its ``"data"`` list
    in chunks of ``BATCH_SIZE`` via ``process_batch`` and sends the combined
    results with ``send_results_back``.

    Messages that cannot be decoded, or that lack a ``"data"`` list or a
    ``"job_application_id"``, are reported and skipped so that one malformed
    message does not stop the consumer. Errors raised while processing or
    sending a well-formed message are not caught here.
    """
    consumer = KafkaConsumer(
        "ai-detector",
        bootstrap_servers=[os.environ.get("KAFKA_IP")],
        auto_offset_reset='earliest',
        client_id="ai-detector-1",
        group_id="ai-detector",
        api_version=(0, 10, 2)
    )

    print("Successfully connected to Kafka at", os.environ.get("KAFKA_IP"))

    BATCH_SIZE = 5
    gpt_helper = GetGPTAnswer()

    for message in consumer:
        try:
            incoming_message = json.loads(message.value.decode("utf-8"))
        except (AttributeError, UnicodeDecodeError, json.JSONDecodeError):
            # AttributeError covers a record with no value (``value`` is None);
            # UnicodeDecodeError covers payloads that are not valid UTF-8.
            print("Failed to decode JSON from message:", message.value)
            print("Continuing...")
            continue

        # Valid JSON is not necessarily a valid job: require an object with a
        # list under "data" and a "job_application_id" before processing.
        if (
            not isinstance(incoming_message, dict)
            or "job_application_id" not in incoming_message
            or not isinstance(incoming_message.get("data"), list)
        ):
            print("Message is missing 'data' or 'job_application_id':",
                  message.value)
            print("Continuing...")
            continue

        full_batch = incoming_message["data"]
        job_application_id = incoming_message["job_application_id"]

        print("Parsing successful. Processing job_app_id {0}".format(
            job_application_id))

        full_results = []
        for i in range(0, len(full_batch), BATCH_SIZE):
            batch = full_batch[i:i+BATCH_SIZE]
            batch_results = process_batch(batch, BATCH_SIZE, gpt_helper)
            full_results.extend(batch_results)

        send_results_back(full_results, job_application_id)