bearking58's picture
fix: add api version to kafka
d13041b
raw
history blame
3.19 kB
import json
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List

import requests
from google.protobuf.json_format import MessageToDict
from kafka import KafkaConsumer

from get_gpt_answer import GetGPTAnswer
from predict_custom_model import predict_custom_trained_model
def get_gpt_responses(data: dict[str, Any], gpt_helper: GetGPTAnswer):
    """Attach a GPT-4o answer to a single question record.

    The record is modified in place: the answer produced by
    ``gpt_helper.generate_gpt4o_answer`` for ``data["question"]`` is stored
    under the ``"gpt4o_answer"`` key.

    Args:
        data: Record that must contain a ``"question"`` entry.
        gpt_helper: Object exposing ``generate_gpt4o_answer(question)``.

    Returns:
        The same ``data`` object that was passed in, now carrying
        ``"gpt4o_answer"``.

    Raises:
        KeyError: If ``data`` has no ``"question"`` key.
    """
    data["gpt4o_answer"] = gpt_helper.generate_gpt4o_answer(data["question"])
    return data
def process_batch(batch: List[dict[str, Any]], batch_size: int, gpt_helper: GetGPTAnswer):
    """Enrich a batch with GPT answers and run it through the custom model.

    Each record is first passed to ``get_gpt_responses`` on a thread pool,
    then the enriched records are sent as ``instances`` to
    ``predict_custom_trained_model``. The project and endpoint are read from
    the ``PROJECT_ID`` and ``ENDPOINT_ID`` environment variables.

    Args:
        batch: Records to process; each is mutated by ``get_gpt_responses``.
        batch_size: Number of worker threads used for the GPT calls.
        gpt_helper: Helper forwarded to ``get_gpt_responses``.

    Returns:
        A list with one dict per prediction, in the order the predictions
        were returned. An empty ``batch`` yields an empty list.

    Raises:
        ValueError: If ``batch`` is non-empty and ``batch_size`` is not
            positive (raised by ``ThreadPoolExecutor``).
    """
    # Nothing to enrich or predict: skip the thread pool and the endpoint
    # call instead of sending an empty instance list.
    if not batch:
        return []

    # Submit every record up front so the GPT calls overlap; collecting the
    # futures in submission order keeps the instances aligned with `batch`.
    with ThreadPoolExecutor(max_workers=batch_size) as executor:
        futures = [
            executor.submit(get_gpt_responses, data, gpt_helper)
            for data in batch
        ]
        instances = [future.result() for future in futures]

    predictions = predict_custom_trained_model(
        instances=instances,
        project=os.environ.get("PROJECT_ID"),
        endpoint_id=os.environ.get("ENDPOINT_ID"),
    )

    results = []
    for prediction in predictions:
        result_dict = {}
        # NOTE(review): relies on each prediction exposing a private `_pb`
        # mapping of protobuf values; confirm against the prediction client.
        for key, value in prediction._pb.items():
            # Only protobuf messages can go through MessageToDict; anything
            # else is reported and left out of the result.
            if hasattr(value, 'DESCRIPTOR'):
                result_dict[key] = MessageToDict(value)
            else:
                print(f"Item {key} is not a convertible protobuf message.")
        results.append(result_dict)
    return results
def send_results_back(full_results: list[dict[str, Any]], job_application_id: str):
    """PATCH the evaluation results for one job application to the backend.

    The request body carries ``job_application_id`` and the results under
    ``"evaluations"``. The API key is read from the ``X-API-KEY`` environment
    variable. The response status code and raw content are printed.

    Args:
        full_results: Evaluation dicts collected for the job application.
        job_application_id: Identifier echoed back to the backend.

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    print(f"Sending results back with job_app_id {job_application_id}")
    url = "https://ta-2-sistem-cerdas-be-vi2jkj4riq-et.a.run.app/api/anti-cheat/result"
    headers = {
        "Content-Type": "application/json",
        "x-api-key": os.environ.get("X-API-KEY")
    }
    body = {
        "job_application_id": job_application_id,
        "evaluations": full_results
    }
    # requests never times out by default; bound the call so a stalled
    # backend cannot block the caller forever.
    response = requests.patch(url, json=body, headers=headers, timeout=30)
    print(f"Data sent with status code {response.status_code}")
    print(response.content)
def consume_messages():
    """Consume ``ai-detector`` Kafka messages and report evaluations back.

    Connects to the broker named by the ``KAFKA_IP`` environment variable,
    then loops over incoming messages. Each message is expected to be a
    UTF-8 JSON object with a ``"data"`` list and a ``"job_application_id"``.
    The data is processed in slices of ``BATCH_SIZE`` via ``process_batch``
    and the combined results are passed to ``send_results_back``.

    Messages that cannot be parsed, or that lack the expected fields, are
    reported and skipped so one bad message does not stop the consumer.
    """
    kafka_ip = os.environ.get("KAFKA_IP")
    consumer = KafkaConsumer(
        "ai-detector",
        bootstrap_servers=[kafka_ip],
        auto_offset_reset='earliest',
        client_id="ai-detector-1",
        group_id="ai-detector",
        # Pinned explicitly rather than left for the client to detect.
        api_version=(0, 10, 2)
    )
    print("Successfully connected to Kafka at", kafka_ip)

    BATCH_SIZE = 5
    gpt_helper = GetGPTAnswer()

    for message in consumer:
        # Parse and validate everything needed from the message in one place
        # so a malformed payload is skipped before any processing starts.
        try:
            incoming_message = json.loads(message.value.decode("utf-8"))
            full_batch = incoming_message["data"]
            job_application_id = incoming_message["job_application_id"]
        except json.JSONDecodeError:
            print("Failed to decode JSON from message:", message.value)
            print("Continuing...")
            continue
        except (AttributeError, UnicodeDecodeError, KeyError, TypeError) as error:
            # Covers a None payload, non-UTF-8 bytes, missing keys and a
            # JSON value that is not an object.
            print("Skipping malformed message:", repr(error))
            print("Continuing...")
            continue

        if not isinstance(full_batch, list):
            print("Skipping message: 'data' is not a list:", message.value)
            print("Continuing...")
            continue

        print("Parsing successful. Processing job_app_id {0}".format(
            job_application_id))

        full_results = []
        for start in range(0, len(full_batch), BATCH_SIZE):
            batch = full_batch[start:start + BATCH_SIZE]
            full_results.extend(process_batch(batch, BATCH_SIZE, gpt_helper))

        send_results_back(full_results, job_application_id)