Test_Magus / app.py
SergeyO7's picture
Update app.py
7fed0af verified
raw
history blame
11.3 kB
import os
import gradio as gr
import requests
import pandas as pd
import json
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from langchain_core.messages import HumanMessage
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from agent import AdvancedAgent
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
CACHE_FILE = "questions_cache.json"
CACHE_EXPIRATION_SECONDS = 86400 # 1 day
MAX_RETRIES = 3
INITIAL_BACKOFF = 2 # seconds
def create_retry_session():
"""Create a requests session with retry logic for handling 429 errors."""
session = requests.Session()
retries = Retry(
total=MAX_RETRIES,
backoff_factor=INITIAL_BACKOFF,
status_forcelist=[429],
allowed_methods=["GET", "POST"]
)
adapter = HTTPAdapter(max_retries=retries)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def load_cached_questions():
"""Load cached questions if the cache is still valid."""
cache_path = Path(CACHE_FILE)
if cache_path.exists():
try:
with cache_path.open('r') as f:
data = json.load(f)
cached_time = data.get("timestamp")
if cached_time and (time.time() - cached_time) < CACHE_EXPIRATION_SECONDS:
questions = [
{
"task_id": item["task_id"],
"question": HumanMessage(content=item["question"])
}
for item in data["questions"]
]
print(f"Loaded {len(questions)} questions from cache.")
return questions
else:
print("Cache expired.")
except Exception as e:
print(f"Error loading cached questions: {e}")
return None
def cache_questions(questions_data):
"""Cache questions with a timestamp."""
cache_path = Path(CACHE_FILE)
try:
cache_data = {
"timestamp": time.time(),
"questions": [
{
"task_id": item["task_id"],
"question": item["question"].content
}
for item in questions_data
]
}
with cache_path.open('w') as f:
json.dump(cache_data, f, indent=2)
print(f"Cached {len(questions_data)} questions to {CACHE_FILE}.")
except Exception as e:
print(f"Error caching questions: {e}")
async def process_question(agent, item):
"""Process a single question using the agent."""
task_id = item["task_id"]
question = item["question"]
try:
loop = asyncio.get_event_loop()
submitted_answer = await loop.run_in_executor(None, agent, question.content)
return {
"task_id": task_id,
"submitted_answer": submitted_answer,
"question_text": question.content
}
except Exception as e:
print(f"Error processing task {task_id}: {e}")
return {
"task_id": task_id,
"submitted_answer": f"AGENT ERROR: {e}",
"question_text": question.content
}
async def run_and_submit_all(profile: gr.OAuthProfile | None):
"""
Fetches all questions, runs the AdvancedAgent on them, submits all answers,
and displays the results.
"""
# --- Determine HF Space Runtime URL and Repo URL ---
space_id = os.getenv("SPACE_ID")
if profile:
username = f"{profile.username}"
print(f"User logged in: {username}")
else:
print("User not logged in.")
return "Please Login to Hugging Face with the button.", None
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
submit_url = f"{api_url}/submit"
# 1. Instantiate Agent
try:
agent = AdvancedAgent()
except Exception as e:
print(f"Error instantiating agent: {e}")
return f"Error initializing agent: {e}", None
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
print(agent_code)
# 2. Fetch or Load Questions
questions_data = load_cached_questions()
if questions_data is None:
print(f"Fetching questions from: {questions_url}")
try:
session = create_retry_session()
response = session.get(questions_url, timeout=15)
response.raise_for_status()
raw_questions = response.json()
if not raw_questions:
print("Fetched questions list is empty.")
return "Fetched questions list is empty or invalid format.", None
questions_data = [
{
"task_id": item["task_id"],
"question": HumanMessage(content=item["question"])
}
for item in raw_questions
]
print(f"Fetched {len(questions_data)} questions.")
cache_questions(questions_data)
except requests.exceptions.RequestException as e:
print(f"Error fetching questions: {e}")
# Fallback to cache even if expired
questions_data = load_cached_questions()
if questions_data:
print("Using expired cache due to API failure.")
else:
return f"Error fetching questions and no valid cache: {e}", None
except requests.exceptions.JSONDecodeError as e:
print(f"Error decoding JSON response: {e}")
return f"Error decoding server response: {e}", None
except Exception as e:
print(f"An unexpected error occurred fetching questions: {e}")
return f"An unexpected error occurred: {e}", None
# 3. Run Agent Asynchronously
results_log = []
answers_payload = []
print(f"Running agent on {len(questions_data)} questions asynchronously...")
with ThreadPoolExecutor(max_workers=5) as executor:
tasks = [process_question(agent, item) for item in questions_data]
responses = await asyncio.gather(*tasks, return_exceptions=True)
for response in responses:
if isinstance(response, Exception):
print(f"Unexpected error in async processing: {response}")
continue
task_id = response["task_id"]
submitted_answer = response["submitted_answer"]
question_text = response["question_text"]
answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
results_log.append({
"Task ID": task_id,
"Question": question_text,
"Submitted Answer": submitted_answer
})
if not answers_payload:
print("Agent did not produce any answers to submit.")
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
# 4. Prepare Submission
submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
print(status_update)
# 5. Submit
print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
try:
session = create_retry_session()
response = session.post(submit_url, json=submission_data, timeout=60)
response.raise_for_status()
result_data = response.json()
final_status = (
f"Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Overall Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Message: {result_data.get('message', 'No message received.')}"
)
print("Submission successful.")
results_df = pd.DataFrame(results_log)
return final_status, results_df
except requests.exceptions.HTTPError as e:
error_detail = f"Server responded with status {e.response.status_code}."
try:
error_json = e.response.json()
error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
except requests.exceptions.JSONDecodeError:
error_detail += f" Response: {e.response.text[:500]}"
status_message = f"Submission Failed: {error_detail}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.Timeout:
status_message = "Submission Failed: The request timed out."
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.RequestException as e:
status_message = f"Submission Failed: Network error - {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except Exception as e:
status_message = f"An unexpected error occurred during submission: {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
gr.Markdown("# Advanced Agent Evaluation Runner")
gr.Markdown(
"""
**Instructions:**
1. Modify the `agent.py` to define your agent's logic, tools, and packages.
2. Log in to your Hugging Face account using the button below.
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
---
**Disclaimers:**
The submission process may take time due to the number of questions.
Questions are cached locally to reduce API calls.
"""
)
gr.LoginButton()
run_button = gr.Button("Run Evaluation & Submit All Answers")
status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)
run_button.click(
fn=run_and_submit_all,
outputs=[status_output, results_table]
)
if __name__ == "__main__":
print("\n" + "-"*30 + " App Starting " + "-"*30)
space_host_startup = os.getenv("SPACE_HOST")
space_id_startup = os.getenv("SPACE_ID")
if space_host_startup:
print(f"✅ SPACE_HOST found: {space_host_startup}")
print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
else:
print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
if space_id_startup:
print(f"✅ SPACE_ID found: {space_id_startup}")
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
else:
print("ℹ️ SPACE_ID environment variable not found (running locally?).")
print("-"*(60 + len(" App Starting ")) + "\n")
print("Launching Gradio Interface for Advanced Agent Evaluation...")
demo.launch(debug=True, share=False)