# Spaces: Sleeping
import os
import re
import tempfile
from pathlib import Path

import gradio as gr
import pandas as pd
import requests
from smolagents import CodeAgent, OpenAIServerModel
from smolagents import DuckDuckGoSearchTool, WikipediaSearchTool

from tools import AnswerTool, SpeechToTextTool, ExcelToTextTool
# --- Constants ---
# Base URL of the scoring service. This file requests the /questions,
# /random-question, /files/{task_id} and /submit endpoints underneath it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
def _safe_filename(candidate: str, fallback: str) -> str:
    """Return the last path component of *candidate*, or *fallback* if none is usable."""
    # Backslashes are treated as separators too, so Windows-style paths are
    # reduced to their final component on every platform.
    name = Path(candidate.replace("\\", "/")).name
    return name if name not in {"", ".", ".."} else fallback


def download_file_if_any(base_api_url: str, task_id: str) -> str | None:
    """Download the file attached to a task, if the server has one.

    Issues ``GET {base_api_url}/files/{task_id}``.

    - On success -> save the body under ``<tempdir>/gaia_files`` and return
      the local path.
    - On HTTP 404 -> return ``None``.

    Args:
        base_api_url: Base URL of the API, without a trailing slash.
        task_id: Identifier of the task whose file should be fetched; also
            used as the file name when the response does not supply one.

    Returns:
        The path of the saved file as a string, or ``None`` on HTTP 404.

    Raises:
        requests.exceptions.RequestException: On connection errors, timeouts,
            or any error status other than 404.
    """
    url = f"{base_api_url}/files/{task_id}"
    resp = requests.get(url, timeout=30)
    if resp.status_code == 404:
        return None
    resp.raise_for_status()

    # Prefer the name advertised by the server; accept both the quoted form
    # (filename="a b.xlsx") and the unquoted form (filename=a.xlsx).
    cdisp = resp.headers.get("content-disposition", "")
    advertised = None
    match = re.search(r'filename="([^"]+)"|filename=([^";\s]+)', cdisp)
    if match:
        advertised = match.group(1) or match.group(2)

    # Security: the header is remote input. Keep only the bare file name so a
    # value such as "../../x" or "/etc/x" cannot escape the download folder.
    fallback_name = _safe_filename(task_id, "download")
    filename = _safe_filename(advertised, fallback_name) if advertised else fallback_name

    tmp_dir = Path(tempfile.gettempdir()) / "gaia_files"
    tmp_dir.mkdir(parents=True, exist_ok=True)
    file_path = tmp_dir / filename
    file_path.write_bytes(resp.content)
    return str(file_path)
class BasicAgent: | |
def __init__(self): | |
model = OpenAIServerModel(model_id="gpt-4o") | |
# Tool priority: Wiki β Web β Python REPL (via base tools) β Audio β Excel β Fallback | |
tools = [ | |
WikipediaSearchTool(), | |
DuckDuckGoSearchTool(), | |
SpeechToTextTool(), | |
ExcelToTextTool(), | |
AnswerTool(), | |
] | |
self.agent = CodeAgent( | |
model=model, | |
tools=tools, | |
add_base_tools=True, # enable python REPL, calculator, etc. | |
max_steps=6, # allow up to 6 planning/execution steps | |
verbosity_level=0, | |
planning_interval=1, | |
) | |
def __call__(self, question: str, task_id: str = None) -> str: | |
prompt = question | |
if task_id: | |
file_path = download_file_if_any(DEFAULT_API_URL, task_id) | |
if file_path: | |
prompt += f"\n\n---\nA file was downloaded for this task and saved locally at:\n{file_path}\n---\n" | |
return self.agent.run(prompt) | |
def run_and_submit_all(username: str) -> "tuple[str, pd.DataFrame | None]":
    """Run the agent on every question from the scoring API and submit the answers.

    Args:
        username: Hugging Face username the submission is made under.

    Returns:
        A ``(status, results)`` pair. ``status`` is a human-readable message.
        ``results`` is a DataFrame with ``Task ID``, ``Question`` and
        ``Answer`` columns, or ``None`` when the run stopped before any
        question was attempted.
    """
    if not username:
        return "Please enter your Hugging Face username.", None

    # --- 1. Fetch the question list ---
    try:
        resp = requests.get(f"{DEFAULT_API_URL}/questions", timeout=15)
        if resp.status_code == 429:
            return "Server rate limited the requests. Please wait a moment and try again.", None
        resp.raise_for_status()
        questions = resp.json()
    except Exception as e:  # UI boundary: report instead of crashing the callback
        return f"Error fetching questions: {e}", None

    # A JSON object or scalar here would otherwise fail later with an
    # unhandled AttributeError on ``q.get``.
    if not isinstance(questions, list):
        return (
            f"Error fetching questions: unexpected response format ({type(questions).__name__}).",
            None,
        )

    # --- 2. Build the agent once and answer each question ---
    try:
        agent = BasicAgent()
    except Exception as e:  # surface construction failures in the UI
        return f"Error initializing agent: {e}", None

    results = []
    payload = []
    for q in questions:
        if not isinstance(q, dict):
            continue
        tid = q.get("task_id")
        text = q.get("question")
        if not (tid and text):
            continue
        try:
            ans = agent(text, task_id=tid)
        except Exception as e:
            # One failing question must not abort the whole evaluation.
            ans = f"ERROR: {e}"
        results.append({"Task ID": tid, "Question": text, "Answer": ans})
        payload.append({"task_id": tid, "submitted_answer": ans})

    if not payload:
        return "Agent returned no answers.", pd.DataFrame(results)

    # --- 3. Submit ---
    submission = {
        "username": username,
        # Link to this Space's source on huggingface.co (the previous value
        # pointed at a third-party proxy mirror host). If SPACE_ID is not set
        # in the environment, os.getenv returns None and the link reads
        # ".../spaces/None/tree/main".
        "agent_code": f"https://huggingface.co/spaces/{os.getenv('SPACE_ID')}/tree/main",
        "answers": payload,
    }
    try:
        sub_resp = requests.post(f"{DEFAULT_API_URL}/submit", json=submission, timeout=60)
        sub_resp.raise_for_status()
        data = sub_resp.json()
        status = (
            f"Submission Successful!\n"
            f"User: {data.get('username')}\n"
            f"Score: {data.get('score')}% ({data.get('correct_count')}/{data.get('total_attempted')})\n"
            f"Message: {data.get('message')}"
        )
    except Exception as e:
        status = f"Submission Failed: {e}"
    return status, pd.DataFrame(results)
def test_random_question(username: str) -> "tuple[str, str]":
    """Fetch one random question and run a fresh agent on it.

    Args:
        username: Hugging Face username; only checked for being non-empty.

    Returns:
        ``(question, answer)`` on success. On failure the first element holds
        the message to display and the second is an empty string.
    """
    if not username:
        return "Please enter your Hugging Face username.", ""
    try:
        resp = requests.get(f"{DEFAULT_API_URL}/random-question", timeout=15)
        if resp.status_code == 429:
            return "Server rate limited the requests. Please wait a moment and try again.", ""
        # Without this an error payload would be parsed as if it were a question.
        resp.raise_for_status()
        q = resp.json()
        question = q.get("question", "")
        if not question:
            # Do not spend an agent run on an empty prompt.
            return "Error during test: server returned no question.", ""
        ans = BasicAgent()(question, task_id=q.get("task_id"))
        return question, ans
    except Exception as e:  # UI boundary: report instead of crashing the callback
        return f"Error during test: {e}", ""


# The name matches pytest's default ``test_*`` pattern; opt out of collection
# so a test run over this module does not try to execute the UI callback.
test_random_question.__test__ = False
# --- Gradio UI ---
# Components are created in display order; the two buttons are wired to their
# callbacks once every output widget exists.
with gr.Blocks() as demo:
    gr.Markdown(value="# Basic Agent Evaluation Runner")
    gr.Markdown(
        value=(
            "\n"
            "**Instructions:**\n"
            "1. Enter your Hugging Face username.\n"
            "2. Use **Test Random Question** to check a single question.\n"
            "3. Use **Run Evaluation & Submit All Answers** to evaluate on all questions.\n"
        )
    )

    # Input and actions.
    username_input = gr.Textbox(
        label="Hugging Face Username",
        placeholder="your-username",
    )
    run_btn = gr.Button(value="Run Evaluation & Submit All Answers")
    test_btn = gr.Button(value="Test Random Question")

    # Outputs of the full evaluation run.
    status_out = gr.Textbox(label="Status / Result", lines=5, interactive=False)
    table_out = gr.DataFrame(label="Full Results Table", wrap=True)

    # Outputs of the single-question check.
    question_out = gr.Textbox(label="Random Question", lines=3, interactive=False)
    answer_out = gr.Textbox(label="Agent Answer", lines=3, interactive=False)

    # Both callbacks take the username and fill their own pair of outputs.
    run_btn.click(run_and_submit_all, [username_input], [status_out, table_out])
    test_btn.click(test_random_question, [username_input], [question_out, answer_out])

if __name__ == "__main__":
    demo.launch(debug=True, share=False)