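# Page header captured with the file (not part of the program):
#   author avatar caption: mhattingpete's picture
#   last commit: "fixed issue with missing api_url" (4a8d3f6)
#   page links: raw / history / blame
#   file size: 7.94 kB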
import asyncio
import json
import os
from pathlib import Path
import aiohttp
import gradio as gr
import pandas as pd
from agent import Agent
# --- Constants --------------------------------------------------------------
# Base URL of the scoring service; "/questions" and "/submit" are appended to it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Local answer cache (JSON object mapping str(task_id) -> answer).
CACHE_PATH = Path("answers_cache.json")
# Upper bound on simultaneously running agent calls; override with the
# MAX_CONCURRENCY environment variable.  The value feeds an asyncio.Semaphore,
# so it must be at least 1: zero would make every task wait forever and a
# negative value makes the Semaphore constructor raise.
try:
    MAX_CONCURRENCY = max(1, int(os.getenv("MAX_CONCURRENCY", "8")))
except ValueError:
    print("⚠️ MAX_CONCURRENCY is not an integer – falling back to 8.")
    MAX_CONCURRENCY = 8
# ----------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Small helpers
# ---------------------------------------------------------------------------
def load_cache() -> dict[str, str]:
if CACHE_PATH.is_file():
try:
return json.loads(CACHE_PATH.read_text())
except Exception:
print("⚠️ Cache file corrupt – starting fresh.")
return {}
def save_cache(cache: dict[str, str]) -> None:
tmp = CACHE_PATH.with_suffix(".tmp")
tmp.write_text(json.dumps(cache, ensure_ascii=False, indent=2))
tmp.replace(CACHE_PATH)
# ---------------------------------------------------------------------------
# Core async logic
# ---------------------------------------------------------------------------
async def _fetch_questions(
    session: aiohttp.ClientSession, url: str
) -> list[dict]:
    """GET *url* and return the decoded JSON body.

    Args:
        session: Open aiohttp session used for the request.
        url: Endpoint that serves the question list.

    Raises:
        aiohttp.ClientResponseError: the server answered with an error status
            (via ``raise_for_status``).
    """
    # Use an explicit ClientTimeout object rather than a bare number.
    timeout = aiohttp.ClientTimeout(total=15)
    async with session.get(url, timeout=timeout) as response:
        response.raise_for_status()
        return await response.json()
async def _submit_answers(session: aiohttp.ClientSession, url: str, data: dict):
    """POST *data* as JSON to *url* and return the decoded JSON reply.

    Args:
        session: Open aiohttp session used for the request.
        url: Submission endpoint.
        data: JSON-serialisable payload.

    Raises:
        aiohttp.ClientResponseError: the server answered with an error status
            (via ``raise_for_status``).
    """
    # Use an explicit ClientTimeout object rather than a bare number.
    timeout = aiohttp.ClientTimeout(total=60)
    async with session.post(url, json=data, timeout=timeout) as response:
        response.raise_for_status()
        return await response.json()
async def _run_agent_async(
    agent: Agent,
    question: str,
    task_id: str | int,
    file_name: str,
    cache: dict[str, str],
    semaphore: asyncio.Semaphore,
) -> tuple[str | int, str]:
    """
    Answer one question, reusing a cached answer when there is one.

    On a cache miss the (blocking) agent callable is run on the event loop's
    default executor while holding *semaphore*, and the answer is stored in
    *cache* under ``str(task_id)``.

    Returns ``(task_id, answer)``.
    """
    cache_key = str(task_id)
    if cache_key in cache:
        # Cache hit: no semaphore slot and no agent call needed.
        return task_id, cache[cache_key]

    async with semaphore:
        event_loop = asyncio.get_running_loop()
        # executor=None selects the loop's default executor.
        result = await event_loop.run_in_executor(
            None, agent, question, task_id, file_name, DEFAULT_API_URL
        )

    cache[cache_key] = result
    return task_id, result
async def _async_main(
    profile: gr.OAuthProfile | None,
) -> tuple[str, pd.DataFrame | None]:
    """
    Async counterpart of run_and_submit_all.

    Fetches the questions, answers them with the agent (cached, with bounded
    concurrency), persists the cache and submits the answers.

    Args:
        profile: OAuth profile of the logged-in user, or None when nobody is
            logged in.

    Returns:
        ``(status_msg, results_df)``.  ``results_df`` is None when the run
        stopped before any question was processed.  A question whose agent
        call raised appears in the table as ``AGENT ERROR: ...`` and is left
        out of the submission.
    """
    if not profile:
        return "Please login with the Hugging Face button.", None

    username = profile.username
    space_id = os.getenv("SPACE_ID")
    # NOTE(review): this host looks like a mirror of huggingface.co — confirm
    # it is the link the scoring service should receive.
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Build agent (sync, cheap)
    try:
        agent = Agent()
    except Exception as e:
        return f"Error initializing agent: {e}", None

    async with aiohttp.ClientSession() as session:
        # 2. Fetch questions
        try:
            questions = await _fetch_questions(session, questions_url)
        except Exception as e:
            return f"Error fetching questions: {e}", None
        if not questions:
            return "Fetched questions list is empty.", None

        # 3. Run agent with cache + limited concurrency
        runnable = [
            q
            for q in questions
            if q.get("task_id") and q.get("question") is not None
        ]
        cache = load_cache()
        sem = asyncio.Semaphore(MAX_CONCURRENCY)
        # return_exceptions=True: one failing question must not abort the
        # whole batch and throw away the answers the others produced.
        outcomes = await asyncio.gather(
            *(
                _run_agent_async(
                    agent,
                    q["question"],
                    q["task_id"],
                    q.get("file_name", ""),  # tolerate a missing key
                    cache,
                    sem,
                )
                for q in runnable
            ),
            return_exceptions=True,
        )

        results_log: list[dict] = []
        answers_json: list[dict] = []
        # gather() returns results in argument order, so each outcome lines
        # up with the question that produced it.
        for q, outcome in zip(runnable, outcomes):
            task_id = q["task_id"]
            if isinstance(outcome, BaseException):
                results_log.append(
                    {
                        "Task ID": task_id,
                        "Question": q["question"],
                        "Submitted Answer": f"AGENT ERROR: {outcome}",
                    }
                )
                continue
            _, answer = outcome
            answers_json.append(
                {"task_id": task_id, "submitted_answer": answer}
            )
            results_log.append(
                {
                    "Task ID": task_id,
                    "Question": q["question"],
                    "Submitted Answer": answer,
                }
            )

        # 3b. Persist cache for later runs.  A failed write only costs the
        # speed-up on the next run, so it must not block the submission.
        try:
            save_cache(cache)
        except (OSError, TypeError, ValueError) as e:
            print(f"⚠️ Could not persist answer cache: {e}")

        if not answers_json:
            return "Agent produced no answers.", pd.DataFrame(results_log)

        # 4. Submit
        submission = {
            "username": username.strip(),
            "agent_code": agent_code,
            "answers": answers_json,
        }
        try:
            result = await _submit_answers(session, submit_url, submission)
        except Exception as e:
            status = f"Submission failed: {e}"
            return status, pd.DataFrame(results_log)

    # Format success message
    final = (
        "Submission Successful!\n"
        f"User: {result.get('username')}\n"
        f"Overall Score: {result.get('score', 'N/A')}% "
        f"({result.get('correct_count', '?')}/{result.get('total_attempted', '?')} correct)\n"
        f"Message: {result.get('message', 'No message received.')}"
    )
    return final, pd.DataFrame(results_log)
# ---------------------------------------------------------------------------
# Thin sync wrapper required by gradio
# ---------------------------------------------------------------------------
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Blocking entry point wired to the Gradio button.

    Drives the async pipeline to completion on a fresh event loop and hands
    its (status_msg, results_df) result straight back.
    """
    pipeline = _async_main(profile)
    return asyncio.run(pipeline)
# ---------------------------------------------------------------------------
# Gradio UI (unchanged except for doc‑string tweaks)
# ---------------------------------------------------------------------------
# Components are created in display order inside the Blocks context; `demo`
# is launched from the __main__ section below.
with gr.Blocks() as demo:
    # Page title and short usage instructions.
    gr.Markdown("# Async‑cached Agent Evaluation Runner")
    gr.Markdown(
        """
**Quick‑start**
1. Log in with the HF button (needed for ranking).
2. Hit **Run Evaluation & Submit All Answers** – answers are cached
locally so reruns are instant; agent calls & HTTP are parallel.
"""
    )
    # Login button; run_and_submit_all reports "Please login ..." when it
    # receives no profile.
    gr.LoginButton()
    run_btn = gr.Button("Run Evaluation & Submit All Answers")
    # Output widgets: status text and the per-question results table.
    status_box = gr.Textbox(label="Status / Submission Result", lines=5)
    results_table = gr.DataFrame(label="Questions & Answers", wrap=True)
    # No explicit inputs are wired; the handler's two return values fill the
    # two outputs in order.
    run_btn.click(
        fn=run_and_submit_all,
        outputs=[status_box, results_table],
    )
if __name__ == "__main__":
    # Script entry point: print startup diagnostics, then launch the UI.
    print("\n" + "-" * 30 + " App Starting " + "-" * 30)
    # Check for SPACE_HOST and SPACE_ID at startup for information
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")  # Get SPACE_ID at startup
    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        # SPACE_HOST may already end in ".hf.space"; strip it before adding
        # the suffix so the printed URL never ends in ".hf.space.hf.space".
        host_label = space_host_startup.removesuffix(".hf.space")
        print(f" Runtime URL should be: https://{host_label}.hf.space")
    else:
        print(
            "ℹ️ SPACE_HOST environment variable not found (running locally?)."
        )
    if space_id_startup:  # Print repo URLs if SPACE_ID is found
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(
            f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main"
        )
    else:
        print(
            "ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined."
        )
    # Closing rule, same width as the opening banner line.
    print("-" * (60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)