# NOTE: removed non-code residue ("Spaces: / Sleeping / Sleeping") — artifact of the
# page/table extraction this file was copied from, not part of the program.
import json
import os

from dotenv import load_dotenv
from openai import OpenAI

# Pull API credentials from a local .env file before any client is constructed.
load_dotenv()
class BatchProcessor:
    """Run text files through the OpenAI Batch API for content review.

    Workflow: prepare a JSONL input file from a folder of ``.txt`` files,
    upload it, create a batch job, poll its status, and download results.
    """

    # Batch states from which the job can never reach "completed".
    _TERMINAL_FAILURE_STATES = {"failed", "expired", "cancelled"}

    def __init__(self):
        # Credentials come from the environment (populated by load_dotenv()
        # at import time); missing vars surface as auth errors on first call.
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            organization=os.getenv("OPENAI_ORG_ID"),
        )

    def prepare_batch(self, folder_path, output_file):
        """Prepare a batch input file (JSONL) from a folder of text files.

        Each ``.txt`` file in *folder_path* becomes one /v1/chat/completions
        request line whose ``custom_id`` is the source filename. Non-.txt
        files are skipped. Writes *output_file* and returns None.
        """
        with open(output_file, "w", encoding="utf-8") as out_file:
            # sorted() makes the batch file deterministic across platforms
            # (os.listdir order is filesystem-dependent).
            for filename in sorted(os.listdir(folder_path)):
                if not filename.endswith(".txt"):
                    continue
                file_path = os.path.join(folder_path, filename)
                # Explicit encoding avoids locale-dependent decode failures.
                with open(file_path, "r", encoding="utf-8") as file:
                    text = file.read()
                batch_entry = {
                    "custom_id": filename,
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": {
                        "model": "gpt-4o-mini",
                        "messages": [
                            {
                                "role": "system",
                                "content": (
                                    "You are a helpful assistant designed to check if there's any racial content. "
                                    "Please review this document for any racial or discriminatory expressions. "
                                    "If yes, return 'Yes'; if there's none, please return 'No racial content found'. "
                                    "If there is any doubt or ambiguity, assume the text contains racial content and respond 'Yes'."
                                )
                            },
                            {"role": "user", "content": text}
                        ],
                        "max_tokens": 1000
                    }
                }
                out_file.write(json.dumps(batch_entry) + "\n")
        print(f"Batch file created: {output_file}")

    def upload_batch_file(self, batch_file_path):
        """Upload the prepared batch input file; return its file id."""
        with open(batch_file_path, "rb") as f:
            batch_input_file = self.client.files.create(
                file=f,
                purpose="batch"
            )
        print(f"Batch input file uploaded. File ID: {batch_input_file.id}")
        return batch_input_file.id

    def create_batch(self, file_id):
        """Create a batch job for *file_id*; return the new batch id."""
        batch = self.client.batches.create(
            input_file_id=file_id,
            endpoint="/v1/chat/completions",
            completion_window="24h",
            metadata={
                "description": "Deed analysis batch"
            }
        )
        print(f"Batch created. Batch ID: {batch.id}")
        return batch.id

    def check_batch_status(self, batch_id):
        """Check a batch job once; return the output file id when completed.

        Returns None while the batch is still in progress.

        Raises:
            RuntimeError: if the batch reached a terminal failure state
                (failed / expired / cancelled) — previously these looked
                identical to "in progress" and made pollers loop forever.
        """
        batch_status = self.client.batches.retrieve(batch_id)
        print(f"Batch Status: {batch_status.status}")
        if batch_status.status == "completed":
            output_file_id = batch_status.output_file_id
            print(f"Output File ID: {output_file_id}")
            return output_file_id
        if batch_status.status in self._TERMINAL_FAILURE_STATES:
            raise RuntimeError(
                f"Batch {batch_id} ended in terminal state {batch_status.status!r}"
            )
        return None

    def retrieve_results(self, output_file_id, output_path):
        """Download the results of a completed batch job to *output_path*."""
        file_response = self.client.files.content(output_file_id)
        with open(output_path, "wb") as out_file:
            out_file.write(file_response.read())
        print(f"Batch results downloaded to {output_path}")
if __name__ == "__main__":
    import time

    processor = BatchProcessor()

    # TODO: set this to the folder containing the .txt files to analyze.
    folder_path = ""
    batch_input_file = "batch_input.jsonl"
    batch_output_file = "batch_output.jsonl"

    # Fail fast with a clear message instead of letting os.listdir("")
    # raise a confusing FileNotFoundError deep inside prepare_batch.
    if not folder_path or not os.path.isdir(folder_path):
        raise SystemExit("folder_path is not set to an existing directory; edit the script first.")

    # Step 1: Prepare the batch input file.
    processor.prepare_batch(folder_path, batch_input_file)

    # Step 2: Upload the batch input file.
    file_id = processor.upload_batch_file(batch_input_file)

    # Step 3: Create a batch job.
    batch_id = processor.create_batch(file_id)

    # Step 4: Poll for batch status. The completion window is 24h, so poll
    # every 30 minutes but give up after ~48h instead of looping forever
    # (the original `while True` never exited for failed/expired batches).
    output_file_id = None
    for _ in range(96):
        output_file_id = processor.check_batch_status(batch_id)
        if output_file_id:
            break
        print("Batch not complete. Retrying in 30 minutes...")
        time.sleep(1800)
    else:
        raise SystemExit("Batch did not complete within the polling window.")

    # Step 5: Retrieve the results.
    processor.retrieve_results(output_file_id, batch_output_file)