# Unit4_scoring / main.py
# Jofthomas's picture
# Update main.py
# 719b8ed verified
# raw
# history blame
# 28.4 kB
# Import necessary libraries (ensure all required imports are at the top)
import logging
import mimetypes
import os
import random
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import pandas as pd
import uvicorn
from datasets import Dataset, DatasetDict, load_dataset
from fastapi import Body, FastAPI, HTTPException
from fastapi.responses import FileResponse
from huggingface_hub import HfApi, hf_hub_download
from pydantic import BaseModel, Field
# --- Constants and Config ---
# Logging is configured once at import time; every function below logs through `logger`.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Hub dataset holding the leaderboard rows (username / score / timestamp).
HF_DATASET_ID = "agents-course/unit4-students-scores"

# Only files whose absolute path lies under this directory may be served.
ALLOWED_CACHE_BASE = os.path.abspath("/app/.cache")

# A GAIA item is kept only when its annotated tool and step counts are
# strictly below these limits.
tool_threshold = 3
step_threshold = 5

# In-memory state, (re)populated by load_questions().
filtered_dataset = None
questions_for_api: List[Dict[str, Any]] = []
ground_truth_answers: Dict[str, str] = {}
task_file_paths: Dict[str, str] = {}
# --- Define ErrorResponse if not already defined ---
class ErrorResponse(BaseModel):
    # Error payload shape advertised in the endpoints' `responses` metadata.
    detail: str
def load_questions():
    """
    Rebuild the in-memory question bank from the GAIA validation split.

    Keeps only items whose 'Annotator Metadata' reports fewer than
    ``tool_threshold`` tools and fewer than ``step_threshold`` steps. For each
    kept item with a task id, question text and final answer, this:

    * appends an API-safe dictionary (no answer, no server path) to
      ``questions_for_api``;
    * stores the expected answer in ``ground_truth_answers``;
    * stores the attachment path in ``task_file_paths`` when the item names a
      file that exists on this server.

    Raises:
        RuntimeError: if the GAIA dataset cannot be loaded.
    """
    global filtered_dataset, questions_for_api, ground_truth_answers, task_file_paths

    tempo_filtered = []

    # Start from a clean slate so repeated calls do not accumulate entries.
    questions_for_api.clear()
    ground_truth_answers.clear()
    task_file_paths.clear()

    logger.info("Starting to load and filter GAIA dataset (validation split)...")
    try:
        dataset = load_dataset("gaia-benchmark/GAIA", "2023_level1", split="validation", trust_remote_code=True)
        logger.info(f"GAIA dataset validation split loaded. Features: {dataset.features}")
    except Exception as e:
        logger.error(f"Failed to load GAIA dataset: {e}", exc_info=True)
        raise RuntimeError("Could not load the primary GAIA dataset.") from e

    # --- Pass 1: keep items whose annotated tool/step counts are under the thresholds ---
    for item in dataset:
        metadata = item.get('Annotator Metadata')
        if not metadata:
            logger.warning(f"Skipping Task ID: {item.get('task_id', 'N/A')} - Missing 'Annotator Metadata'.")
            continue

        num_tools_str = metadata.get('Number of tools')
        num_steps_str = metadata.get('Number of steps')
        if num_tools_str is None or num_steps_str is None:
            logger.warning(f"Skipping Task ID: {item.get('task_id', 'N/A')} - 'Number of tools' or 'Number of steps' missing in Metadata.")
            continue

        try:
            num_tools = int(num_tools_str)
            num_steps = int(num_steps_str)
        except ValueError:
            logger.warning(f"Skipping Task ID: {item.get('task_id', 'N/A')} - Could not convert tool/step count in metadata: tools='{num_tools_str}', steps='{num_steps_str}'.")
            continue

        if num_tools < tool_threshold and num_steps < step_threshold:
            tempo_filtered.append(item)

    filtered_dataset = tempo_filtered
    logger.info(f"Found {len(filtered_dataset)} questions matching the criteria (tools < {tool_threshold}, steps < {step_threshold}).")

    # --- Pass 2: publish each kept item and map its attachment, if any ---
    processed_count = 0
    for item in filtered_dataset:
        task_id = item.get('task_id')
        original_question_text = item.get('Question')
        final_answer = item.get('Final answer')
        local_file_path = item.get('file_path')  # server-local path, never exposed
        file_name = item.get('file_name')

        # An item without id, question or answer cannot be scored; skip it.
        if not (task_id and original_question_text and final_answer is not None):
            logger.warning(f"Skipping item processing due to missing essential fields: task_id={task_id}, has_question={original_question_text is not None}, has_answer={final_answer is not None}")
            continue

        task_key = str(task_id)

        # Public view of the question; keys whose value is None are dropped.
        public_fields = {
            "task_id": task_key,
            "question": str(original_question_text),
            "Level": item.get("Level"),
            "file_name": file_name,
        }
        questions_for_api.append(
            {field: value for field, value in public_fields.items() if value is not None}
        )

        ground_truth_answers[task_key] = str(final_answer)

        if local_file_path and file_name:
            if not os.path.isabs(local_file_path):
                logger.warning(f"Task {task_id}: Path '{local_file_path}' from dataset is not absolute. This might cause issues finding the file on the server.")
            # Only map files that are really present on this server.
            if os.path.isfile(local_file_path):
                task_file_paths[task_key] = local_file_path
                logger.debug(f"Stored file path mapping for task_id {task_id}: {local_file_path}")
            else:
                logger.warning(f"File path '{local_file_path}' for task_id {task_id} does NOT exist or is not a file on server. Mapping skipped.")
        elif not local_file_path and not file_name:
            logger.debug(f"Task {task_id}: No 'file_path' or 'file_name' found in dataset item. No file mapping stored.")
        elif not local_file_path:
            logger.debug(f"Task {task_id}: 'file_path' is missing in dataset item (file_name: '{file_name}'). No file mapping stored.")
        else:
            logger.debug(f"Task {task_id}: 'file_name' is missing in dataset item (file_path: '{local_file_path}'). No file mapping stored.")

        processed_count += 1

    logger.info(f"Successfully processed {processed_count} questions for the API.")
    logger.info(f"Stored file path mappings for {len(task_file_paths)} tasks.")

    if not questions_for_api:
        logger.error("CRITICAL: No valid questions were loaded after filtering and processing. API endpoints like /questions will fail.")
# --- Pydantic models (each defined exactly once) ---
# ErrorResponse is declared near the top of this module and is not repeated here.


class Question(BaseModel):
    # Public view of a question; the server-side file path is deliberately absent.
    task_id: str
    question: str
    Level: Optional[str] = None
    file_name: Optional[str] = None  # Keep filename for info


class AnswerItem(BaseModel):
    # One answer inside a submission, keyed by the task it responds to.
    task_id: str
    submitted_answer: str = Field(..., description="The agent's answer for the task_id")


class Submission(BaseModel):
    # Request body of POST /submit.
    username: str = Field(..., description="Hugging Face username", min_length=1)
    agent_code: str = Field(..., description="The Python class code for the agent", min_length=10)  # Basic check
    answers: List[AnswerItem] = Field(..., description="List of answers submitted by the agent")


class ScoreResponse(BaseModel):
    # Response body of POST /submit.
    username: str
    score: float
    correct_count: int
    total_attempted: int
    message: str
    timestamp: str
# --- FastAPI Application ---
# Single application instance; the route decorators in this module register on it.
app = FastAPI(
    description="API to fetch questions and submit agent answers for scoring.",
    title="Agent Evaluation API",
)
# --- Startup Event ---
@app.on_event("startup")
async def startup_event():
    """Load the question bank at server start; failures are logged and not re-raised."""
    logger.info("Application startup: Loading questions...")
    try:
        load_questions()
        if questions_for_api:
            logger.info(f"Successfully loaded {len(questions_for_api)} questions.")
        else:
            logger.error("CRITICAL: No questions were loaded during startup.")
    except Exception as e:
        # The server keeps running without questions; the endpoints then answer 404.
        logger.error(f"CRITICAL ERROR DURING STARTUP while loading questions: {e}", exc_info=True)
# --- Your Endpoints ---
@app.get("/files/{task_id}",
         summary="Get Associated File by Task ID",
         description="Downloads the file associated with the given task_id, if one exists and is mapped.",
         responses={
             200: {
                 "description": "File content.",
                 "content": {"*/*": {}}  # Indicates response can be any file type
             },
             403: {"model": ErrorResponse, "description": "Access denied (e.g., path traversal attempt)."},
             404: {"model": ErrorResponse, "description": "Task ID not found, no file associated, or file missing on server."},
             500: {"model": ErrorResponse, "description": "Server error reading file."}
         })
async def get_task_file(task_id: str):
    """
    Serve the file mapped to ``task_id``.

    The path comes from ``task_file_paths`` (never from the request) and is
    additionally required to lie inside ``ALLOWED_CACHE_BASE``.

    Args:
        task_id: Identifier of the task whose attachment is requested.

    Returns:
        A ``FileResponse`` for the mapped file, with a guessed media type
        (``application/octet-stream`` when unknown).

    Raises:
        HTTPException: 404 when the task has no mapping or the file is gone,
            403 when the mapped path lies outside the allowed base directory,
            500 when the path cannot be validated.
    """
    logger.info(f"Request received for file associated with task_id: {task_id}")

    if task_id not in task_file_paths:
        logger.warning(f"File request failed: task_id '{task_id}' not found in file path mapping.")
        raise HTTPException(status_code=404, detail=f"No file path associated with task_id {task_id}.")

    local_file_path = task_file_paths[task_id]
    logger.debug(f"Mapped task_id '{task_id}' to local path: {local_file_path}")

    # --- CRUCIAL SECURITY CHECK ---
    try:
        # Normalise away any '..' components before comparing.
        abs_file_path = os.path.abspath(local_file_path)
        abs_base_path = ALLOWED_CACHE_BASE  # Already absolute

        # Compare whole path components. A plain string-prefix test would also
        # accept sibling directories such as '/app/.cache_other/...'.
        try:
            inside_base = os.path.commonpath([abs_base_path, abs_file_path]) == abs_base_path
        except ValueError:
            # commonpath refuses paths that cannot share a root (e.g. different
            # drives); such a path is by definition outside the base.
            inside_base = False

        if not inside_base:
            logger.error(f"SECURITY ALERT: Path traversal attempt denied for task_id '{task_id}'. Path '{local_file_path}' resolves outside base '{abs_base_path}'.")
            raise HTTPException(status_code=403, detail="File access denied.")

        # The file was present when the mapping was built; re-check at request time.
        if not os.path.isfile(abs_file_path):
            logger.error(f"File not found on server for task_id '{task_id}' at expected path: {abs_file_path}")
            raise HTTPException(status_code=404, detail=f"File associated with task_id {task_id} not found on server disk.")
    except HTTPException:
        raise  # Our own 403/404 responses pass through untouched
    except Exception as path_err:
        logger.error(f"Error resolving or checking path '{local_file_path}' for task_id '{task_id}': {path_err}", exc_info=True)
        raise HTTPException(status_code=500, detail="Server error validating file path.")
    # --- END SECURITY CHECK ---

    # Content-Type: guessed from the file name, generic binary when unknown.
    mime_type, _ = mimetypes.guess_type(abs_file_path)
    media_type = mime_type if mime_type else "application/octet-stream"

    # Suggested download name for the Content-Disposition header.
    file_name_for_download = os.path.basename(abs_file_path)

    logger.info(f"Serving file '{file_name_for_download}' (type: {media_type}) for task_id '{task_id}' from path: {abs_file_path}")
    return FileResponse(path=abs_file_path, media_type=media_type, filename=file_name_for_download)
def update_huggingface_dataset(username: str, score: float):
    """
    Record ``score`` for ``username`` in the leaderboard dataset if it is a new best.

    Loads the 'train' split of ``HF_DATASET_ID`` (falling back to an empty
    table when it cannot be loaded), replaces the user's rows with a single
    new row when ``score`` beats their current maximum, or adds a row for a
    previously unseen user.

    Note: the actual ``push_to_hub`` call is commented out, so a return value
    of True means the updated table was built, not that it was uploaded.

    Returns:
        True when the table was changed, False when the existing score stands.

    Raises:
        HTTPException: 500 on any error while loading or rebuilding the table.
    """

    def _blank_scores():
        # Empty table with the leaderboard schema.
        return pd.DataFrame({'username': pd.Series(dtype='str'),
                             'score': pd.Series(dtype='float'),
                             'timestamp': pd.Series(dtype='str')})

    try:
        # 1. Load the current leaderboard
        logger.info(f"Loading dataset '{HF_DATASET_ID}'...")
        try:
            # Probe for the parquet file first so an empty/new repo fails fast here.
            hf_hub_download(repo_id=HF_DATASET_ID, filename="data/train-00000-of-00001.parquet", repo_type="dataset")
            ds_dict = load_dataset(HF_DATASET_ID)
            logger.info("Dataset loaded successfully.")
            if "train" in ds_dict:
                df = ds_dict['train'].to_pandas()
            else:
                logger.warning(f"Dataset '{HF_DATASET_ID}' does not contain a 'train' split. Creating one.")
                df = _blank_scores()
        except Exception as load_error:
            logger.warning(f"Could not load dataset '{HF_DATASET_ID}' or it might be empty/new ({load_error}). Creating structure.")
            df = _blank_scores()

        # Add any column the stored table lacks.
        for col, dtype in (('username', 'str'), ('score', 'float'), ('timestamp', 'str')):
            if col not in df.columns:
                logger.warning(f"Column '{col}' not found in dataset. Adding it.")
                df[col] = pd.Series(dtype=dtype)

        # Non-numeric scores become NaN and are then treated as 0.0.
        df['score'] = pd.to_numeric(df['score'], errors='coerce').fillna(0.0)

        # 2. Decide whether this submission changes the table
        existing_entries = df[df['username'] == username]
        current_timestamp = datetime.now(timezone.utc).isoformat()

        if existing_entries.empty:
            logger.info(f"User {username} not found. Adding new entry.")
        else:
            max_existing_score = existing_entries['score'].max()
            if not score > max_existing_score:
                logger.info(f"New score {score} is not higher than existing max {max_existing_score} for {username}. No update needed.")
                return False  # No update was pushed
            logger.info(f"New score {score} is higher than existing max {max_existing_score} for {username}. Updating.")
            # Drop every previous row for this user; one fresh row replaces them.
            df = df[df['username'] != username]

        new_entry = pd.DataFrame([{'username': username, 'score': score, 'timestamp': current_timestamp}])
        df = pd.concat([df, new_entry], ignore_index=True)

        # 3. Build the dataset that would be pushed
        logger.info(f"Pushing updated dataset to '{HF_DATASET_ID}'...")
        for col, caster in (('username', str), ('score', float), ('timestamp', str)):
            df[col] = df[col].astype(caster)
        updated_ds = DatasetDict({'train': Dataset.from_pandas(df)})
        logger.info(f"Dataset to push: {updated_ds}")
        # updated_ds.push_to_hub(HF_DATASET_ID) # Uncomment this line to enable leaderboard updates
        logger.warning("Dataset push to hub is currently commented out. Uncomment the line above to enable leaderboard updates.")
        logger.info("Dataset push simulated/attempted.")
        return True
    except Exception as e:
        logger.error(f"Error interacting with Hugging Face dataset '{HF_DATASET_ID}': {e}", exc_info=True)
        # Surface the failure to the endpoint handler as a 500.
        raise HTTPException(status_code=500, detail=f"Failed to update Hugging Face dataset: {e}")
# --- API Endpoints (Modified response_model) ---
@app.get("/questions",
         # Return a list of dictionaries with arbitrary keys/values
         response_model=List[Dict[str, Any]],
         summary="Get All Filtered Questions (Full Data)",
         description="Returns the complete list of questions with all associated data (excluding answer/annotation) filtered based on criteria.")
async def get_questions():
    """
    Return every loaded question as a list of dictionaries.

    Raises:
        HTTPException: 404 when no questions are loaded.
    """
    if questions_for_api:
        return questions_for_api

    logger.error("GET /questions requested but no questions are loaded.")
    raise HTTPException(status_code=404, detail="No questions available.")
@app.get("/random-question",
         # Return a single dictionary with arbitrary keys/values
         response_model=Dict[str, Any],
         summary="Get One Random Question (Full Data)",
         description="Returns a single random question with all associated data (excluding answer/annotation) from the available filtered set.",
         responses={
             200: {"description": "A random question with its full data."},
             404: {"model": ErrorResponse, "description": "No questions available to choose from."}
         })
async def get_random_question():
    """
    Return one question picked at random from the loaded set.

    Raises:
        HTTPException: 404 when no questions are loaded.
    """
    if questions_for_api:
        random_question = random.choice(questions_for_api)
        logger.info(f"Returning random question with task_id: {random_question.get('task_id', 'N/A')}")
        return random_question

    logger.warning("GET /random-question requested but no questions are loaded.")
    raise HTTPException(status_code=404, detail="No questions available to choose from.")
# --- Submit Endpoint (remains the same, uses ground_truth_answers) ---
@app.post("/submit",
          response_model=ScoreResponse,
          summary="Submit Agent Answers",
          description="Submit answers from an agent, calculate score, and update leaderboard on Hugging Face.",
          responses={
              200: {"description": "Submission successful, score calculated."},
              400: {"model": ErrorResponse, "description": "Invalid input data."},
              404: {"model": ErrorResponse, "description": "Task ID not found in submission or ground truth."},
              500: {"model": ErrorResponse, "description": "Server error (e.g., failed to update dataset)."}
          })
async def submit_answers(submission: Submission = Body(...)):
    """
    Score a submission and record it on the leaderboard.

    Each answer is compared with the stored ground truth after stripping
    whitespace and lower-casing both sides. Repeated task ids (after the first
    occurrence) and unknown task ids are skipped. The score is the number of
    correct answers divided by the total number of loaded questions, as a
    percentage rounded to two decimals.

    Args:
        submission: Username, agent code and the list of answers.

    Returns:
        A ``ScoreResponse``; ``total_attempted`` counts only answers whose
        task id was known and not a duplicate.

    Raises:
        HTTPException: 400 when the agent code is shorter than 10 characters
            after stripping or no answers are given; 500 when the leaderboard
            update fails.
    """
    logger.info(f"Received submission from username: {submission.username}")

    # The model enforces min_length=10; this also rejects whitespace-only padding.
    if not submission.agent_code or len(submission.agent_code.strip()) < 10:
        logger.warning(f"Submission rejected for {submission.username}: Agent code missing or too short.")
        raise HTTPException(status_code=400, detail="Agent code is required and must be sufficiently long.")

    if not submission.answers:
        logger.warning(f"Submission rejected for {submission.username}: No answers provided.")
        raise HTTPException(status_code=400, detail="No answers provided in the submission.")

    correct_count = 0
    total_attempted_in_payload = len(submission.answers)
    valid_attempted_count = 0  # Answers whose task_id was known and not repeated
    processed_ids = set()

    for answer_item in submission.answers:
        task_id = str(answer_item.task_id)
        submitted = str(answer_item.submitted_answer)

        # Only the first answer for a given task_id counts.
        if task_id in processed_ids:
            logger.warning(f"Duplicate task_id '{task_id}' in submission from {submission.username}. Skipping.")
            continue
        processed_ids.add(task_id)

        if task_id not in ground_truth_answers:
            logger.warning(f"Task ID '{task_id}' submitted by {submission.username} not found in ground truth list. Skipping this answer.")
            continue

        valid_attempted_count += 1
        ground_truth = ground_truth_answers[task_id]

        # Case-insensitive comparison, ignoring surrounding whitespace.
        if submitted.strip().lower() == ground_truth.strip().lower():
            correct_count += 1
            logger.debug(f"Correct answer for {task_id} from {submission.username}")
        else:
            logger.debug(f"Incorrect answer for {task_id} from {submission.username}. Submitted: '{submitted}', Expected: '{ground_truth}'")

    # The empty-ground-truth case is tested first: with no ground truth every
    # task id is unknown, so valid_attempted_count is always 0 and a check
    # placed after the "no valid answers" test could never be reached.
    if not ground_truth_answers:
        score = 0.0
        message = "Score cannot be calculated because no ground truth answers are loaded."
        logger.error(f"Cannot calculate score for {submission.username}: ground_truth_answers is empty.")
    elif valid_attempted_count == 0:
        score = 0.0
        message = f"Submission received, but no valid/matching task IDs were found in the {total_attempted_in_payload} answers provided."
        logger.warning(f"No valid answers processed for {submission.username} out of {total_attempted_in_payload} submitted.")
    else:
        # The denominator is the whole question set, not just the attempted tasks.
        score = round((correct_count / len(ground_truth_answers)) * 100, 2)
        message = f"Score calculated successfully: {correct_count}/{len(ground_truth_answers)} total questions answered correctly ({valid_attempted_count} valid tasks attempted)."
        if valid_attempted_count < total_attempted_in_payload:
            message += f" ({total_attempted_in_payload - valid_attempted_count} submitted answers had invalid or duplicate task IDs)."
        logger.info(f"Score for {submission.username}: {score}% ({correct_count}/{len(ground_truth_answers)} correct, based on {valid_attempted_count} valid attempts)")

    # Update Hugging Face dataset
    try:
        updated = update_huggingface_dataset(submission.username, score)
        if updated:
            message += " High score updated on leaderboard."
            logger.info(f"Leaderboard updated for {submission.username}.")
        else:
            message += " Score did not improve previous record, leaderboard not updated."
            logger.info(f"Leaderboard not updated for {submission.username} as score was not higher.")
    except HTTPException:
        # Already an HTTP error from the helper; pass it through unchanged.
        raise
    except Exception as e:
        logger.error(f"Unexpected error during dataset update for {submission.username}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="An unexpected error occurred while updating the leaderboard.")

    return ScoreResponse(
        username=submission.username,
        score=score,
        correct_count=correct_count,
        # Return the count of *valid* attempts for clarity
        total_attempted=valid_attempted_count,
        message=message,
        timestamp=datetime.now(timezone.utc).isoformat()
    )
# --- Run the application ---
if __name__ == "__main__":
    # Local development entry point: serve on localhost only, and only when
    # the question bank loaded successfully.
    logger.info("Starting FastAPI server for local development...")
    try:
        load_questions()  # Load questions before starting server
        if questions_for_api:
            local_port = int(os.getenv("PORT", "8000"))
            logger.info(f"Running Uvicorn locally on http://127.0.0.1:{local_port}")
            uvicorn.run(app, host="127.0.0.1", port=local_port, log_level="info")
        else:
            logger.error("EXITING: Cannot start server without loaded questions.")
    except Exception as e:
        logger.error(f"Failed to start server: {e}", exc_info=True)