Spaces:
Running
Running
File size: 3,188 Bytes
d9764fe 84481d5 d9764fe bcd9850 d9764fe ec9fbf7 d9764fe 84481d5 d9764fe bcd9850 84481d5 fc749f2 6582396 84481d5 4e584a6 84481d5 4e584a6 bcd9850 d9764fe 84481d5 d13041b d9764fe 2bf48c4 d9764fe 84481d5 d9764fe 2bf48c4 06a4ed9 2bf48c4 d9764fe 57db935 fc749f2 84481d5 d9764fe 84481d5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
import json
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List

import requests
from google.protobuf.json_format import MessageToDict
from kafka import KafkaConsumer

from get_gpt_answer import GetGPTAnswer
from predict_custom_model import predict_custom_trained_model
def get_gpt_responses(data: dict[str, Any], gpt_helper: "GetGPTAnswer"):
    """Attach a GPT-4o answer to a single record.

    Reads data["question"], asks *gpt_helper* for an answer, and stores it
    under data["gpt4o_answer"]. Mutates *data* in place and returns it so the
    function composes with executor.submit / future.result pipelines.
    """
    data["gpt4o_answer"] = gpt_helper.generate_gpt4o_answer(data["question"])
    return data
def process_batch(batch: List[dict[str, Any]], batch_size: int, gpt_helper: "GetGPTAnswer"):
    """Enrich one batch with GPT answers, score it, and unpack the predictions.

    Steps:
      1. Concurrently add a "gpt4o_answer" to every record (one thread per
         record, capped at *batch_size* workers).
      2. Send the enriched records to the custom model endpoint identified by
         the PROJECT_ID / ENDPOINT_ID environment variables.
      3. Convert each prediction's protobuf fields into plain dicts.

    Returns a list of dicts, one per prediction.
    """
    # Fan out the GPT calls; results are collected in submission order.
    with ThreadPoolExecutor(max_workers=batch_size) as executor:
        futures = [executor.submit(get_gpt_responses, data, gpt_helper)
                   for data in batch]
        enriched = [future.result() for future in futures]

    predictions = predict_custom_trained_model(
        instances=enriched,
        project=os.environ.get("PROJECT_ID"),
        endpoint_id=os.environ.get("ENDPOINT_ID"),
    )

    results = []
    for prediction in predictions:
        result_dict = {}
        # NOTE(review): reaches into the private `_pb` attribute and assumes
        # each value is a protobuf message — confirm against the return type
        # of predict_custom_trained_model.
        for key, value in prediction._pb.items():
            # Ensure that 'value' is a protobuf message
            if hasattr(value, 'DESCRIPTOR'):
                result_dict[key] = MessageToDict(value)
            else:
                print(f"Item {key} is not a convertible protobuf message.")
        results.append(result_dict)
    return results
def send_results_back(full_results: List[dict[str, Any]], job_application_id: str):
    """PATCH the evaluation results for one job application to the backend.

    Sends {"job_application_id": ..., "evaluations": full_results} to the
    anti-cheat result endpoint, authenticated via the X-API-KEY environment
    variable. Logs the response status and body; does not raise on HTTP
    error statuses.
    """
    print(f"Sending results back with job_app_id {job_application_id}")
    url = "https://ta-2-sistem-cerdas-be-vi2jkj4riq-et.a.run.app/api/anti-cheat/result"
    headers = {
        "Content-Type": "application/json",
        "x-api-key": os.environ.get("X-API-KEY")
    }
    body = {
        "job_application_id": job_application_id,
        "evaluations": full_results
    }
    # A timeout keeps a stalled backend from hanging the consumer loop forever.
    response = requests.patch(url, json=body, headers=headers, timeout=30)
    print(f"Data sent with status code {response.status_code}")
    print(response.content)
def consume_messages():
    """Consume "ai-detector" messages from Kafka and process them forever.

    Each message is a JSON object expected to contain "data" (a list of
    question records) and "job_application_id". Records are processed in
    chunks of BATCH_SIZE via process_batch, and the combined results are
    sent back with send_results_back. Malformed messages are logged and
    skipped so one bad payload cannot kill the consumer.
    """
    consumer = KafkaConsumer(
        "ai-detector",
        bootstrap_servers=[os.environ.get("KAFKA_IP")],
        auto_offset_reset='earliest',
        client_id="ai-detector-1",
        group_id="ai-detector",
        api_version=(0, 10, 2)
    )
    print("Successfully connected to Kafka at", os.environ.get("KAFKA_IP"))

    BATCH_SIZE = 5
    gpt_helper = GetGPTAnswer()

    for message in consumer:
        try:
            incoming_message = json.loads(message.value.decode("utf-8"))
            full_batch = incoming_message["data"]
            job_application_id = incoming_message["job_application_id"]
        except json.JSONDecodeError:
            print("Failed to decode JSON from message:", message.value)
            print("Continuing...")
            continue
        except KeyError as missing:
            # Previously an uncaught KeyError here crashed the whole consumer.
            print(f"Message missing required field {missing}; skipping.")
            continue

        print("Parsing successful. Processing job_app_id {0}".format(
            job_application_id))

        full_results = []
        # Process the batch in fixed-size chunks to bound concurrent GPT calls.
        for i in range(0, len(full_batch), BATCH_SIZE):
            batch = full_batch[i:i + BATCH_SIZE]
            batch_results = process_batch(batch, BATCH_SIZE, gpt_helper)
            full_results.extend(batch_results)

        send_results_back(full_results, job_application_id)
|