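"""Batch-process a folder of .txt files through the OpenAI Batch API.

Each .txt file is screened by gpt-4o-mini for racial or discriminatory
content: requests are written to a JSONL input file, uploaded, run as a
batch job, and the results are downloaded once the job completes.
"""
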
import os
import json
import time

from dotenv import load_dotenv
from openai import OpenAI

# Load OPENAI_API_KEY (and, optionally, OPENAI_ORG_ID) from a local .env file.
load_dotenv()


class BatchProcessor:
    def __init__(self):
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            organization=os.getenv("OPENAI_ORG_ID"),
        )

    def prepare_batch(self, folder_path, output_file):
        """Prepare a batch input file from a folder of text files."""
        with open(output_file, "w", encoding="utf-8") as out_file:
            for filename in os.listdir(folder_path):
                if filename.endswith(".txt"):
                    file_path = os.path.join(folder_path, filename)
                    with open(file_path, "r", encoding="utf-8") as file:
                        text = file.read()
                    # One JSON request object per line, as the Batch API's
                    # .jsonl input format requires. The filename doubles as
                    # custom_id so results can be matched back to files.
                    batch_entry = {
                        "custom_id": filename,
                        "method": "POST",
                        "url": "/v1/chat/completions",
                        "body": {
                            "model": "gpt-4o-mini",
                            "messages": [
                                {
                                    "role": "system",
                                    "content": (
                                        "You are a helpful assistant designed to check if there's any racial content. "
                                        "Please review this document for any racial or discriminatory expressions. "
                                        "If yes, return 'Yes'; if there's none, please return 'No racial content found'. "
                                        "If there is any doubt or ambiguity, assume the text contains racial content and respond 'Yes'."
                                    ),
                                },
                                {"role": "user", "content": text},
                            ],
                            "max_tokens": 1000,
                        },
                    }
                    out_file.write(json.dumps(batch_entry) + "\n")
        print(f"Batch file created: {output_file}")

    def upload_batch_file(self, batch_file_path):
        """Upload the prepared batch input file."""
        with open(batch_file_path, "rb") as f:
            # purpose="batch" marks the file as Batch API input.
            batch_input_file = self.client.files.create(
                file=f,
                purpose="batch",
            )
        print(f"Batch input file uploaded. File ID: {batch_input_file.id}")
        return batch_input_file.id

    def create_batch(self, file_id):
        """Create a batch job with the uploaded input file."""
        batch = self.client.batches.create(
            input_file_id=file_id,
            endpoint="/v1/chat/completions",
            # "24h" is currently the only completion window the Batch API accepts.
            completion_window="24h",
            metadata={"description": "Deed analysis batch"},
        )
        print(f"Batch created. Batch ID: {batch.id}")
        return batch.id

    def check_batch_status(self, batch_id):
        """Check a batch job's status; return the output file ID once complete."""
        batch_status = self.client.batches.retrieve(batch_id)
        print(f"Batch Status: {batch_status.status}")
        if batch_status.status == "completed":
            output_file_id = batch_status.output_file_id
            print(f"Output File ID: {output_file_id}")
            return output_file_id
        # Fail fast on terminal states so callers don't poll forever.
        if batch_status.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch {batch_id} ended in state: {batch_status.status}")
        return None

    def retrieve_results(self, output_file_id, output_path):
        """Retrieve the results of a completed batch job."""
        file_response = self.client.files.content(output_file_id)
        with open(output_path, "wb") as out_file:
            out_file.write(file_response.read())
        print(f"Batch results downloaded to {output_path}")
if __name__ == "__main__":
processor = BatchProcessor()
folder_path = ""
batch_input_file = "batch_input.jsonl"
batch_output_file = "batch_output.jsonl"
# Step 1: Prepare the batch input file
processor.prepare_batch(folder_path, batch_input_file)
# Step 2: Upload the batch input file
file_id = processor.upload_batch_file(batch_input_file)
# Step 3: Create a batch job
batch_id = processor.create_batch(file_id)
# Step 4: Poll for batch status
import time
while True:
output_file_id = processor.check_batch_status(batch_id)
if output_file_id:
break
print("Batch not complete. Retrying in 30 minutes...")
time.sleep(1800)
# Step 5: Retrieve the results
processor.retrieve_results(output_file_id, batch_output_file)
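
    # Optional, using the illustrative parse_results helper sketched above:
    # results = processor.parse_results(batch_output_file)
    # for filename, verdict in results.items():
    #     print(f"{filename}: {verdict}")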