# Unit4_scoring / main.py
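"""
FastAPI scoring service for the Agents Course Unit 4 evaluation: it exposes a filtered
subset of GAIA level-1 validation questions and scores agent submissions against their
ground-truth answers, keeping each user's best score in a leaderboard dataset on the
Hugging Face Hub (see HF_DATASET_ID below).
"""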
import os
import pandas as pd
from fastapi import FastAPI, HTTPException, Body
from pydantic import BaseModel, Field
from typing import List, Dict, Any
from datasets import load_dataset, Dataset, DatasetDict
from huggingface_hub import HfApi, hf_hub_download
from datetime import datetime, timezone
import logging
import uvicorn # To run the app
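# Filter thresholds: only GAIA questions whose annotator metadata reports fewer than
# `tool_threshold` tools AND fewer than `step_threshold` steps are served by the API
# (see load_questions below).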
tool_threshold = 3
step_threshold = 5
# --- Configuration ---
HF_DATASET_ID = "agents-course/unit4-students-scores"
# Ensure you have write access to this dataset repository on Hugging Face
# and are logged in via `huggingface-cli login` or have HF_TOKEN env var set.
# Prepare data structures for the API
questions_for_api: List[Dict[str, str]] = []
ground_truth_answers: Dict[str, str] = {}
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
filtered_dataset = None
def load_questions():
    global filtered_dataset
    global questions_for_api
    global ground_truth_answers
    # Reset the globals so repeated calls (e.g. a local run plus the startup event) don't duplicate questions
    questions_for_api = []
    ground_truth_answers = {}
    tempo_filtered = []
    dataset = load_dataset("gaia-benchmark/GAIA", "2023_level1", trust_remote_code=True)
for question in dataset['validation']:
metadata = question.get('Annotator Metadata') # Use .get() for safety
if metadata: # Check if 'Annotator Metadata' exists
num_tools_str = metadata.get('Number of tools')
num_steps_str = metadata.get('Number of steps')
# Check if both numbers exist before trying to convert
if num_tools_str is not None and num_steps_str is not None:
try:
# Convert values to integers for comparison
num_tools = int(num_tools_str)
num_steps = int(num_steps_str)
# Apply the filter conditions
if num_tools < tool_threshold and num_steps < step_threshold:
print(f"MATCH FOUND (Task ID: {question.get('task_id', 'N/A')}) - Tools: {num_tools}, Steps: {num_steps}")
print(question) # Print the matching question dictionary
print("------------------------------------------------------------------")
tempo_filtered.append(question) # Add to the filtered list
# else: # Optional: Handle items that don't match the filter
# print(f"Skipping Task ID: {question.get('task_id', 'N/A')} - Tools: {num_tools}, Steps: {num_steps}")
except ValueError:
# Handle cases where 'Number of tools' or 'Number of steps' is not a valid integer
print(f"Skipping Task ID: {question.get('task_id', 'N/A')} - Could not convert tool/step count to integer.")
print("------------------------------------------------------------------")
    filtered_dataset = tempo_filtered
    logger.info(f"Filtered dataset contains {len(filtered_dataset)} questions.")
for item in filtered_dataset:
task_id = item.get('task_id')
question_text = item.get('Question')
final_answer = item.get('Final answer')
if task_id and question_text and final_answer is not None:
questions_for_api.append({
"task_id": str(task_id), # Ensure ID is string
"question": question_text
})
ground_truth_answers[str(task_id)] = str(final_answer) # Ensure answer is string
else:
logger.warning(f"Skipping item due to missing fields: {item}")
logger.info(f"Loaded {len(questions_for_api)} questions for the API.")
    logger.debug(questions_for_api)
if not questions_for_api:
logger.error("No valid questions loaded. API will not function correctly.")
# You might want to exit or raise an error here depending on requirements
# --- Pydantic Models for Data Validation ---
class Question(BaseModel):
task_id: str
question: str
class AnswerItem(BaseModel):
task_id: str
submitted_answer: str = Field(..., description="The agent's answer for the task_id")
class Submission(BaseModel):
username: str = Field(..., description="Hugging Face username", min_length=1)
agent_code: str = Field(..., description="The Python class code for the agent", min_length=10) # Basic check
answers: List[AnswerItem] = Field(..., description="List of answers submitted by the agent")
class ScoreResponse(BaseModel):
username: str
score: float
correct_count: int
total_attempted: int
message: str
timestamp: str
class ErrorResponse(BaseModel):
detail: str
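# Illustrative /submit payload matching the Submission model above (the task_id value
# is a made-up placeholder, not a real GAIA task ID):
# {
#     "username": "your-hf-username",
#     "agent_code": "class MyAgent:\n    def __call__(self, question: str) -> str: ...",
#     "answers": [
#         {"task_id": "example-task-id", "submitted_answer": "Paris"}
#     ]
# }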
# --- FastAPI Application ---
app = FastAPI(
title="Agent Evaluation API",
description="API to fetch questions and submit agent answers for scoring.",
)
# --- Startup Event Handler ---
@app.on_event("startup")
async def startup_event():
"""
Loads the questions when the FastAPI application starts.
"""
logger.info("Application startup: Loading questions...")
load_questions() # Call your loading function here
if not questions_for_api:
logger.error("CRITICAL: No questions were loaded during startup. The /questions endpoint will fail.")
# Depending on requirements, you might want the app to fail startup
# raise RuntimeError("Failed to load mandatory question data.")
else:
logger.info(f"Successfully loaded {len(questions_for_api)} questions.")
# --- Helper Function to interact with HF Dataset ---
def update_huggingface_dataset(username: str, score: float):
"""Loads the dataset, updates the score if higher, and pushes back."""
try:
# 1. Load the dataset
logger.info(f"Loading dataset '{HF_DATASET_ID}'...")
# Try loading, handle case where dataset might be empty or non-existent initially
try:
# Use hf_hub_download to check if the parquet file exists, avoiding full dataset load error if empty
# This assumes the dataset uses the default 'train' split and parquet format. Adjust if needed.
hf_hub_download(repo_id=HF_DATASET_ID, filename="data/train-00000-of-00001.parquet", repo_type="dataset")
ds = load_dataset(HF_DATASET_ID)
logger.info("Dataset loaded successfully.")
# Check if it has a 'train' split, common default
if "train" not in ds:
logger.warning(f"Dataset '{HF_DATASET_ID}' does not contain a 'train' split. Creating one.")
# Create an empty DataFrame with the correct schema if 'train' split is missing
df = pd.DataFrame({'username': pd.Series(dtype='str'),
'score': pd.Series(dtype='float'),
'timestamp': pd.Series(dtype='str')})
ds = DatasetDict({'train': Dataset.from_pandas(df)})
else:
# Convert the 'train' split to a pandas DataFrame for easier manipulation
df = ds['train'].to_pandas()
except Exception as load_error: # Catch broad exception for file not found or other loading issues
logger.warning(f"Could not load dataset '{HF_DATASET_ID}' or it might be empty/new ({load_error}). Creating structure.")
# Create an empty DataFrame with the correct schema
df = pd.DataFrame({'username': pd.Series(dtype='str'),
'score': pd.Series(dtype='float'),
'timestamp': pd.Series(dtype='str')})
# Ensure columns exist, add if they don't
for col, dtype in [('username', 'str'), ('score', 'float'), ('timestamp', 'str')]:
if col not in df.columns:
logger.warning(f"Column '{col}' not found in dataset. Adding it.")
df[col] = pd.Series(dtype=dtype)
# Convert score column to numeric, coercing errors
df['score'] = pd.to_numeric(df['score'], errors='coerce')
# 2. Find existing score for the user
existing_entries = df[df['username'] == username]
current_timestamp = datetime.now(timezone.utc).isoformat()
needs_update = False
if not existing_entries.empty:
# User exists, find their highest score
# Handle potential NaN scores from coercion or previous bad data
max_existing_score = existing_entries['score'].max()
if pd.isna(max_existing_score) or score > max_existing_score:
logger.info(f"New score {score} is higher than existing max {max_existing_score} for {username}. Updating.")
# Remove old entries for this user
df = df[df['username'] != username]
# Add new entry
new_entry = pd.DataFrame([{'username': username, 'score': score, 'timestamp': current_timestamp}])
df = pd.concat([df, new_entry], ignore_index=True)
needs_update = True
else:
logger.info(f"New score {score} is not higher than existing max {max_existing_score} for {username}. No update needed.")
else:
# User does not exist, add them
logger.info(f"User {username} not found. Adding new entry.")
new_entry = pd.DataFrame([{'username': username, 'score': score, 'timestamp': current_timestamp}])
df = pd.concat([df, new_entry], ignore_index=True)
needs_update = True
# 3. Push updated data back to Hugging Face Hub if changes were made
if needs_update:
logger.info(f"Pushing updated dataset to '{HF_DATASET_ID}'...")
# Convert potentially modified DataFrame back to a Dataset object
# Ensure the schema matches if columns were added/modified.
# Use 'train' split convention.
updated_ds = DatasetDict({'train': Dataset.from_pandas(df)})
            logger.debug(f"Updated dataset to push: {updated_ds}")
            updated_ds.push_to_hub(HF_DATASET_ID) # Token should be picked up from env or login
logger.info("Dataset push successful.")
return True
else:
return False # No update was pushed
except Exception as e:
logger.error(f"Error interacting with Hugging Face dataset '{HF_DATASET_ID}': {e}", exc_info=True)
# Re-raise the exception to be caught by the endpoint handler
raise HTTPException(status_code=500, detail=f"Failed to update Hugging Face dataset: {e}")
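# Leaderboard layout assumed by the helper above: a single 'train' split with columns
# username (str), score (float, percentage), and timestamp (ISO-8601 str); only a
# user's best score is kept.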
# --- API Endpoints ---
@app.get("/questions",
response_model=List[Question],
summary="Get Filtered Questions",
description="Returns a list of questions (task_id and question text only) for the agent evaluation.")
async def get_questions():
"""
Provides the list of questions that agents should answer.
"""
    logger.debug(questions_for_api)
if not questions_for_api:
raise HTTPException(status_code=404, detail="No questions available.")
return questions_for_api
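# Example request (assuming the API runs locally on port 8000; adjust host/port as needed):
#   curl http://127.0.0.1:8000/questions
# Returns a JSON list of {"task_id": "...", "question": "..."} objects.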
@app.post("/submit",
response_model=ScoreResponse,
summary="Submit Agent Answers",
description="Submit answers from an agent, calculate score, and update leaderboard on Hugging Face.",
responses={
200: {"description": "Submission successful, score calculated."},
400: {"model": ErrorResponse, "description": "Invalid input data."},
404: {"model": ErrorResponse, "description": "Task ID not found."},
500: {"model": ErrorResponse, "description": "Server error (e.g., failed to update dataset)."}
})
async def submit_answers(submission: Submission = Body(...)):
"""
Receives agent submissions:
- Validates input.
- Checks presence of agent code (basic anti-cheat).
- Calculates score based on submitted answers vs ground truth.
- Updates the score on the Hugging Face dataset if it's a new high score for the user.
"""
logger.info(f"Received submission from username: {submission.username}")
# Basic check for agent code presence
if not submission.agent_code or len(submission.agent_code.strip()) < 10:
logger.warning(f"Submission rejected for {submission.username}: Agent code missing or too short.")
raise HTTPException(status_code=400, detail="Agent code is required and must be sufficiently long.")
if not submission.answers:
logger.warning(f"Submission rejected for {submission.username}: No answers provided.")
raise HTTPException(status_code=400, detail="No answers provided in the submission.")
correct_count = 0
total_attempted = len(submission.answers)
processed_ids = set()
for answer_item in submission.answers:
task_id = str(answer_item.task_id) # Ensure string comparison
submitted = str(answer_item.submitted_answer) # Ensure string comparison
# Prevent duplicate task_id submissions in the same request
if task_id in processed_ids:
logger.warning(f"Duplicate task_id '{task_id}' in submission from {submission.username}. Skipping.")
total_attempted -= 1 # Adjust count as we skip it
continue
processed_ids.add(task_id)
# Check if task_id is valid
if task_id not in ground_truth_answers:
logger.warning(f"Task ID '{task_id}' submitted by {submission.username} not found in ground truth list.")
# Option 1: Reject the whole submission
# raise HTTPException(status_code=404, detail=f"Task ID '{task_id}' not found.")
# Option 2: Skip this answer and continue scoring others (chosen here)
total_attempted -= 1 # Don't count this attempt if the ID was invalid
continue
# Compare answers (case-insensitive, strip whitespace)
ground_truth = ground_truth_answers[task_id]
if submitted.strip().lower() == ground_truth.strip().lower():
correct_count += 1
logger.debug(f"Correct answer for {task_id} from {submission.username}")
else:
logger.debug(f"Incorrect answer for {task_id} from {submission.username}. Submitted: '{submitted}', Expected: '{ground_truth}'")
# Calculate score
if total_attempted == 0:
score = 0.0
message = "No valid answers submitted or processed."
logger.warning(f"No valid answers processed for {submission.username}.")
else:
score = round((correct_count / total_attempted) * 100, 2)
message = f"Score calculated successfully. {correct_count}/{total_attempted} correct."
logger.info(f"Score for {submission.username}: {score}% ({correct_count}/{total_attempted})")
# Update Hugging Face dataset
try:
updated = update_huggingface_dataset(submission.username, score)
if updated:
message += " High score updated on leaderboard."
logger.info(f"Leaderboard updated for {submission.username}.")
else:
message += " Score did not improve previous record, leaderboard not updated."
logger.info(f"Leaderboard not updated for {submission.username} as score was not higher.")
except HTTPException as http_exc:
# Propagate HTTPException from the helper function (e.g., 500 error)
raise http_exc
except Exception as e:
# Catch any other unexpected errors during HF update
logger.error(f"Unexpected error during dataset update for {submission.username}: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="An unexpected error occurred while updating the leaderboard.")
return ScoreResponse(
username=submission.username,
score=score,
correct_count=correct_count,
total_attempted=total_attempted,
message=message,
timestamp=datetime.now(timezone.utc).isoformat()
)
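# Example request (hypothetical local call; payload shape follows the Submission model above):
#   curl -X POST http://127.0.0.1:8000/submit \
#        -H "Content-Type: application/json" \
#        -d '{"username": "your-hf-username", "agent_code": "class MyAgent: ...", "answers": [{"task_id": "example-task-id", "submitted_answer": "Paris"}]}'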
# --- Run the application ---
# This part is mainly for local development without Docker.
# Docker uses the CMD instruction in the Dockerfile.
if __name__ == "__main__":
    logger.info("Starting FastAPI server for local development...")
    # Load questions eagerly so we can fail fast before starting the server;
    # the startup event handler will reload them when Uvicorn starts the app.
    load_questions()
    if not questions_for_api:
        logger.error("EXITING: Cannot start server without loaded questions.")
    else:
        # Read port from environment variable for consistency, default to 8000 for local if not set
        local_port = int(os.getenv("PORT", "8000"))
        logger.info(f"Running Uvicorn locally on port: {local_port}")
        # Note: host='127.0.0.1' is usually fine for local runs outside docker
        uvicorn.run(app, host="127.0.0.1", port=local_port, log_level="info")