bearking58 committed
Commit bcd9850 · 1 Parent(s): 06a4ed9

feat: connect to vertex ai

public-prediction/kafka_consumer.py CHANGED
@@ -4,13 +4,13 @@ from kafka import KafkaConsumer
 from get_gpt_answer import GetGPTAnswer
 from typing import List
 from concurrent.futures import ThreadPoolExecutor
+from predict_custom_model import predict_custom_trained_model
+from google.protobuf.json_format import MessageToDict


 def get_gpt_responses(data: dict[str, any], gpt_helper: GetGPTAnswer):
-    # data["gpt35_answer"] = gpt_helper.generate_gpt35_answer(data["question"])
-    # data["gpt4_answer"] = gpt_helper.generate_gpt4_answer(data["question"])
-    data["gpt35_answer"] = "This is gpt35 answer"
-    data["gpt4_answer"] = "This is gpt4 answer"
+    data["gpt35_answer"] = gpt_helper.generate_gpt35_answer(data["question"])
+    data["gpt4_answer"] = gpt_helper.generate_gpt4_answer(data["question"])
     return data


@@ -23,6 +23,22 @@ def process_batch(batch: List[dict[str, any]], batch_size: int, job_application_

     print("Batch ready with gpt responses", results)

+    predictions = predict_custom_trained_model(
+        instances=results, project=os.environ.get("PROJECT_ID"), endpoint_id=os.environ.get("ENDPOINT_ID"))
+
+    results = []
+    for prediction in predictions:
+        result_dict = {}
+        for key, value in prediction._pb.items():
+            # Ensure that 'value' is a protobuf message
+            if hasattr(value, 'DESCRIPTOR'):
+                result_dict[key] = MessageToDict(value)
+            else:
+                print(f"Item {key} is not a convertible protobuf message.")
+        results.append(result_dict)
+
+    print({"result": results})
+

 def consume_messages():
     consumer = KafkaConsumer(
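With this change, each Kafka batch is enriched with real GPT answers and then sent to a Vertex AI endpoint for scoring. A minimal sketch of that flow outside the consumer, assuming `PROJECT_ID` and `ENDPOINT_ID` are set in the environment (and that `os` is imported near the top of `kafka_consumer.py`, which is outside this hunk's context); the payload fields are hypothetical, since the deployed model's input schema is not shown in this diff:

```python
# Minimal sketch, not part of the commit: exercising the new prediction path
# with a hand-built batch. The instance fields below are hypothetical.
import os

from google.protobuf.json_format import MessageToDict

from predict_custom_model import predict_custom_trained_model

batch = [
    {"question": "Explain polymorphism.", "answer": "..."},  # hypothetical schema
]

predictions = predict_custom_trained_model(
    instances=batch,
    project=os.environ.get("PROJECT_ID"),
    endpoint_id=os.environ.get("ENDPOINT_ID"),
)

# Depending on the client-library version, each prediction may arrive as a
# protobuf message (possibly proto-plus wrapped, exposing ._pb) or already
# unwrapped into native Python types.
results = []
for prediction in predictions:
    if hasattr(prediction, "_pb") or hasattr(prediction, "DESCRIPTOR"):
        results.append(MessageToDict(getattr(prediction, "_pb", prediction)))
    else:
        results.append(prediction)  # already a plain dict / list / scalar
print({"result": results})
```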
public-prediction/predict_custom_model.py ADDED
@@ -0,0 +1,40 @@
+from typing import Dict, List, Union
+from google.cloud import aiplatform
+from google.protobuf import json_format
+from google.protobuf.struct_pb2 import Value
+
+
+def predict_custom_trained_model(
+    project: str,
+    endpoint_id: str,
+    instances: Union[Dict, List[Dict]],
+    location: str = "us-central1",
+    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
+):
+    """
+    `instances` can be either a single instance of type dict or a list
+    of instances.
+    """
+    # The AI Platform services require regional API endpoints.
+    client_options = {"api_endpoint": api_endpoint}
+    # Initialize client that will be used to create and send requests.
+    # This client only needs to be created once, and can be reused for multiple requests.
+    client = aiplatform.gapic.PredictionServiceClient(
+        client_options=client_options)
+    # The format of each instance should conform to the deployed model's prediction input schema.
+    instances = instances if isinstance(instances, list) else [instances]
+    instances = [
+        json_format.ParseDict(instance_dict, Value()) for instance_dict in instances
+    ]
+    parameters_dict = {}
+    parameters = json_format.ParseDict(parameters_dict, Value())
+    endpoint = client.endpoint_path(
+        project=project, location=location, endpoint=endpoint_id
+    )
+    response = client.predict(
+        endpoint=endpoint, instances=instances, parameters=parameters
+    )
+    # The predictions are a google.protobuf.Value representation of the model's predictions.
+    predictions = response.predictions
+
+    return predictions
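For reference, a hedged example of calling this helper directly; the project number, endpoint ID, and instance payload below are placeholders, not values from this repository:

```python
from predict_custom_model import predict_custom_trained_model

# Placeholder identifiers; in kafka_consumer.py these come from the
# PROJECT_ID and ENDPOINT_ID environment variables.
predictions = predict_custom_trained_model(
    project="123456789012",             # hypothetical project number
    endpoint_id="9876543210123456789",  # hypothetical endpoint ID
    instances={"question": "What is a deadlock?"},  # a single dict is wrapped into a list
)
for prediction in predictions:
    print(prediction)
```

The gapic client authenticates via Application Default Credentials, so `GOOGLE_APPLICATION_CREDENTIALS` (or an ambient service account) must be configured before this call; reusing one `PredictionServiceClient` across requests, as the comment in the helper notes, avoids re-establishing the channel on every batch.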