bearking58 commited on
Commit
84481d5
·
1 Parent(s): 562e1f8

feat: finish the pipeline to send back

Browse files
public-prediction/kafka_consumer.py CHANGED
@@ -1,5 +1,6 @@
1
  import json
2
  import os
 
3
  from kafka import KafkaConsumer
4
  from get_gpt_answer import GetGPTAnswer
5
  from typing import List
@@ -14,9 +15,8 @@ def get_gpt_responses(data: dict[str, any], gpt_helper: GetGPTAnswer):
14
  return data
15
 
16
 
17
- def process_batch(batch: List[dict[str, any]], batch_size: int, job_application_id: int):
18
  with ThreadPoolExecutor(max_workers=batch_size) as executor:
19
- gpt_helper = GetGPTAnswer()
20
  futures = [executor.submit(
21
  get_gpt_responses, data, gpt_helper) for data in batch]
22
  results = [future.result() for future in futures]
@@ -37,7 +37,23 @@ def process_batch(batch: List[dict[str, any]], batch_size: int, job_application_
37
  print(f"Item {key} is not a convertible protobuf message.")
38
  results.append(result_dict)
39
 
40
- print({"result": results})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
 
43
  def consume_messages():
@@ -46,12 +62,13 @@ def consume_messages():
46
  bootstrap_servers=[os.environ.get("KAFKA_IP")],
47
  auto_offset_reset='earliest',
48
  client_id="ai-detector-1",
49
- group_id=None,
50
  )
51
 
52
  print("Successfully connected to Kafka at", os.environ.get("KAFKA_IP"))
53
 
54
  BATCH_SIZE = 5
 
55
 
56
  for message in consumer:
57
  try:
@@ -62,7 +79,10 @@ def consume_messages():
62
  print("Continuing...")
63
  continue
64
 
 
65
  for i in range(0, len(full_batch), BATCH_SIZE):
66
  batch = full_batch[i:i+BATCH_SIZE]
67
- process_batch(batch, BATCH_SIZE,
68
- incoming_message["job_application_id"])
 
 
 
1
  import json
2
  import os
3
+ import requests
4
  from kafka import KafkaConsumer
5
  from get_gpt_answer import GetGPTAnswer
6
  from typing import List
 
15
  return data
16
 
17
 
18
+ def process_batch(batch: List[dict[str, any]], batch_size: int, gpt_helper: GetGPTAnswer):
19
  with ThreadPoolExecutor(max_workers=batch_size) as executor:
 
20
  futures = [executor.submit(
21
  get_gpt_responses, data, gpt_helper) for data in batch]
22
  results = [future.result() for future in futures]
 
37
  print(f"Item {key} is not a convertible protobuf message.")
38
  results.append(result_dict)
39
 
40
+ return results
41
+
42
+
43
def send_results_back(full_results: list[dict[str, any]], job_application_id: str):
    """PATCH the aggregated GPT evaluation results back to the backend.

    Args:
        full_results: Aggregated per-item evaluation dicts produced by
            process_batch. This is a list (the caller builds it with
            ``[]`` + ``extend``), not a dict as previously annotated.
        job_application_id: Identifier echoed back so the backend can match
            the evaluation to the originating job application.
    """
    url = "https://ta-2-sistem-cerdas-be-vi2jkj4riq-et.a.run.app/api/anti-cheat/update"
    headers = {
        "Content-Type": "application/json",
        # NOTE(review): an env var named with hyphens ("X-API-KEY") is
        # unusual — confirm it matches the deployment configuration.
        "x-api-key": os.environ.get("X-API-KEY"),
    }
    body = {
        "job_application_id": job_application_id,
        "evaluation": full_results,
    }

    try:
        # Explicit timeout so a hung backend cannot stall the Kafka
        # consumer loop indefinitely.
        response = requests.patch(url, json=body, headers=headers, timeout=30)
        print(f"Data sent with status code {response.status_code}")
    except requests.RequestException as e:
        # Best-effort delivery: log and let the consumer keep processing
        # subsequent messages instead of crashing the whole process.
        print(f"Failed to send results back: {e}")
57
 
58
 
59
  def consume_messages():
 
62
  bootstrap_servers=[os.environ.get("KAFKA_IP")],
63
  auto_offset_reset='earliest',
64
  client_id="ai-detector-1",
65
+ group_id="ai-detector",
66
  )
67
 
68
  print("Successfully connected to Kafka at", os.environ.get("KAFKA_IP"))
69
 
70
  BATCH_SIZE = 5
71
+ gpt_helper = GetGPTAnswer()
72
 
73
  for message in consumer:
74
  try:
 
79
  print("Continuing...")
80
  continue
81
 
82
+ full_results = []
83
  for i in range(0, len(full_batch), BATCH_SIZE):
84
  batch = full_batch[i:i+BATCH_SIZE]
85
+ batch_results = process_batch(batch, BATCH_SIZE, gpt_helper)
86
+ full_results.extend(batch_results)
87
+
88
+ send_results_back(full_results, incoming_message["job_application_id"])
public-prediction/requirements.txt CHANGED
@@ -3,4 +3,5 @@ langchain
3
  openai
4
  langchain-openai
5
  python-dotenv
6
- google-cloud-aiplatform
 
 
3
  openai
4
  langchain-openai
5
  python-dotenv
6
+ google-cloud-aiplatform
7
+ requests