Yakobus Iryanto Prasethio commited on
Commit
6b1f3cb
·
unverified ·
2 Parent(s): 8b0574e 57db935

Merge pull request #12 from YakobusIP/main

Browse files

Move production ready deployment from main to production

public-prediction/get_gpt_answer.py ADDED
@@ -0,0 +1,17 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from langchain_openai import ChatOpenAI
2
+ from langchain_core.messages import HumanMessage, SystemMessage
3
+
4
+
5
+ class GetGPTAnswer:
6
+ def __init__(self):
7
+ self.llm_gpt4o = ChatOpenAI(model="gpt-4o")
8
+
9
+ def generate_gpt4o_answer(self, question: str):
10
+ messages = [
11
+ SystemMessage(
12
+ content="Please answer the following question based solely on your internal knowledge, without external references. Assume you are the human."),
13
+ HumanMessage(question)
14
+ ]
15
+
16
+ gpt4_answer = self.llm_gpt4o.invoke(messages)
17
+ return gpt4_answer.content
public-prediction/kafka_consumer.py ADDED
@@ -0,0 +1,90 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import os
3
+ import requests
4
+ from kafka import KafkaConsumer
5
+ from get_gpt_answer import GetGPTAnswer
6
+ from typing import List
7
+ from concurrent.futures import ThreadPoolExecutor
8
+ from predict_custom_model import predict_custom_trained_model
9
+ from google.protobuf.json_format import MessageToDict
10
+
11
+
12
+ def get_gpt_responses(data: dict[str, any], gpt_helper: GetGPTAnswer):
13
+ data["gpt4o_answer"] = gpt_helper.generate_gpt4o_answer(data["question"])
14
+ return data
15
+
16
+
17
+ def process_batch(batch: List[dict[str, any]], batch_size: int, gpt_helper: GetGPTAnswer):
18
+ with ThreadPoolExecutor(max_workers=batch_size) as executor:
19
+ futures = [executor.submit(
20
+ get_gpt_responses, data, gpt_helper) for data in batch]
21
+ results = [future.result() for future in futures]
22
+
23
+ predictions = predict_custom_trained_model(
24
+ instances=results, project=os.environ.get("PROJECT_ID"), endpoint_id=os.environ.get("ENDPOINT_ID"))
25
+
26
+ results = []
27
+ for prediction in predictions:
28
+ result_dict = {}
29
+ for key, value in prediction._pb.items():
30
+ # Ensure that 'value' is a protobuf message
31
+ if hasattr(value, 'DESCRIPTOR'):
32
+ result_dict[key] = MessageToDict(value)
33
+ else:
34
+ print(f"Item {key} is not a convertible protobuf message.")
35
+ results.append(result_dict)
36
+
37
+ return results
38
+
39
+
40
+ def send_results_back(full_results: dict[str, any], job_application_id: str):
41
+ print(f"Sending results back with job_app_id {job_application_id}")
42
+ url = "https://ta-2-sistem-cerdas-be-vi2jkj4riq-et.a.run.app/api/anti-cheat/result"
43
+ headers = {
44
+ "Content-Type": "application/json",
45
+ "x-api-key": os.environ.get("X-API-KEY")
46
+ }
47
+
48
+ body = {
49
+ "job_application_id": job_application_id,
50
+ "evaluations": full_results
51
+ }
52
+
53
+ response = requests.patch(url, json=body, headers=headers)
54
+ print(f"Data sent with status code {response.status_code}")
55
+ print(response.content)
56
+
57
+
58
+ def consume_messages():
59
+ consumer = KafkaConsumer(
60
+ "ai-detector",
61
+ bootstrap_servers=[os.environ.get("KAFKA_IP")],
62
+ auto_offset_reset='earliest',
63
+ client_id="ai-detector-1",
64
+ group_id="ai-detector",
65
+ )
66
+
67
+ print("Successfully connected to Kafka at", os.environ.get("KAFKA_IP"))
68
+
69
+ BATCH_SIZE = 5
70
+ gpt_helper = GetGPTAnswer()
71
+
72
+ for message in consumer:
73
+ try:
74
+ incoming_message = json.loads(message.value.decode("utf-8"))
75
+ full_batch = incoming_message["data"]
76
+ except json.JSONDecodeError:
77
+ print("Failed to decode JSON from message:", message.value)
78
+ print("Continuing...")
79
+ continue
80
+
81
+ print("Parsing successful. Processing job_app_id {0}".format(
82
+ incoming_message['job_application_id']))
83
+
84
+ full_results = []
85
+ for i in range(0, len(full_batch), BATCH_SIZE):
86
+ batch = full_batch[i:i+BATCH_SIZE]
87
+ batch_results = process_batch(batch, BATCH_SIZE, gpt_helper)
88
+ full_results.extend(batch_results)
89
+
90
+ send_results_back(full_results, incoming_message["job_application_id"])
public-prediction/main.py ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ from kafka_consumer import consume_messages
2
+ from dotenv import load_dotenv
3
+
4
+ if __name__ == "__main__":
5
+ load_dotenv()
6
+ consume_messages()
public-prediction/predict_custom_model.py ADDED
@@ -0,0 +1,45 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import Dict, List, Union
2
+ from google.cloud import aiplatform
3
+ from google.protobuf import json_format
4
+ from google.protobuf.struct_pb2 import Value
5
+ from google.oauth2 import service_account
6
+
7
+
8
+ def predict_custom_trained_model(
9
+ project: str,
10
+ endpoint_id: str,
11
+ instances: Union[Dict, List[Dict]],
12
+ location: str = "us-central1",
13
+ api_endpoint: str = "us-central1-aiplatform.googleapis.com",
14
+ ):
15
+ """
16
+ `instances` can be either single instance of type dict or a list
17
+ of instances.
18
+ """
19
+ # The AI Platform services require regional API endpoints.
20
+ client_options = {"api_endpoint": api_endpoint}
21
+
22
+ credentials = service_account.Credentials.from_service_account_file(
23
+ "steady-climate-416810-ea1536e1868c.json")
24
+ # Initialize client that will be used to create and send requests.
25
+ # This client only needs to be created once, and can be reused for multiple requests.
26
+ client = aiplatform.gapic.PredictionServiceClient(
27
+ credentials=credentials,
28
+ client_options=client_options)
29
+ # The format of each instance should conform to the deployed model's prediction input schema.
30
+ instances = instances if isinstance(instances, list) else [instances]
31
+ instances = [
32
+ json_format.ParseDict(instance_dict, Value()) for instance_dict in instances
33
+ ]
34
+ parameters_dict = {}
35
+ parameters = json_format.ParseDict(parameters_dict, Value())
36
+ endpoint = client.endpoint_path(
37
+ project=project, location=location, endpoint=endpoint_id
38
+ )
39
+ response = client.predict(
40
+ endpoint=endpoint, instances=instances, parameters=parameters
41
+ )
42
+ # The predictions are a google.protobuf.Value representation of the model's predictions.
43
+ predictions = response.predictions
44
+
45
+ return predictions
public-prediction/requirements.txt ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ kafka-python
2
+ langchain
3
+ openai
4
+ langchain-openai
5
+ python-dotenv
6
+ google-cloud-aiplatform
7
+ requests