SergeyO7 commited on
Commit
e948527
·
verified ·
1 Parent(s): 5b72b9c

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +33 -28
app.py CHANGED
@@ -15,15 +15,16 @@ import aiohttp
15
  import asyncio
16
  import json
17
  from agent import MagAgent
18
- import base64
19
 
20
  # (Keep Constants as is)
21
  # --- Constants ---
22
  DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
23
 
24
  # Rate limiting configuration
25
- MAX_CONCURRENT_REQUESTS = 1 # Adjust based on performance needs
26
- REQUEST_DELAY = 9.0 # 2 seconds delay to meet 30 RPM
 
27
 
28
  async def fetch_questions(session: aiohttp.ClientSession, questions_url: str) -> list:
29
  """Fetch questions asynchronously."""
@@ -44,7 +45,8 @@ async def fetch_questions(session: aiohttp.ClientSession, questions_url: str) ->
44
  print(f"An unexpected error occurred fetching questions: {e}")
45
  return None
46
 
47
- async def submit_answers(session: aiohttp.ClientSession, submit_url: str, submission_data: dict) -> dict:
 
48
  """Submit answers asynchronously."""
49
  try:
50
  async with session.post(submit_url, json=submission_data, timeout=60) as response:
@@ -60,20 +62,28 @@ async def submit_answers(session: aiohttp.ClientSession, submit_url: str, submis
60
  print(f"An unexpected error occurred during submission: {e}")
61
  return None
62
 
63
- async def process_question(agent, question_text: str, task_id: str, semaphore: asyncio.Semaphore, results_log: list):
64
- """Process a single question with rate limiting."""
65
- async with semaphore:
66
- try:
 
 
 
 
 
 
 
 
 
67
  submitted_answer = await agent(question_text)
68
  results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
69
  return {"task_id": task_id, "submitted_answer": submitted_answer}
70
- await asyncio.sleep(REQUEST_DELAY) # Enforce delay after each request
71
- except Exception as e:
72
- print(f"Error running agent on task {task_id}: {e}")
73
- results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
74
- return None
75
- finally:
76
- await asyncio.sleep(REQUEST_DELAY) # Enforce delay after each request
77
 
78
  async def run_and_submit_all(profile: gr.OAuthProfile | None):
79
  """
@@ -113,26 +123,21 @@ async def run_and_submit_all(profile: gr.OAuthProfile | None):
113
  return "Fetched questions list is empty or invalid format.", None
114
 
115
  # 3. Run Agent on Questions
116
-
117
- # Initialize semaphore and results log
118
- semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
119
  results_log = []
120
  answers_payload = []
121
-
122
  print(f"Running agent on {len(questions_data)} questions...")
123
 
124
- tasks = [
125
- process_question(agent, item["question"], item["task_id"], semaphore, results_log)
126
- for item in questions_data
127
- if item.get("task_id") and item.get("question") is not None
128
- ]
129
- results = await asyncio.gather(*tasks)
130
- answers_payload = [r for r in results if r is not None]
131
-
132
  if not answers_payload:
133
  print("Agent did not produce any answers to submit.")
134
  return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
135
-
136
  # 4. Prepare Submission
137
  submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
138
  status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
 
15
  import asyncio
16
  import json
17
  from agent import MagAgent
18
+ from token_bucket import TokenBucket
19
 
20
  # (Keep Constants as is)
21
  # --- Constants ---
22
  DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
23
 
24
  # Rate limiting configuration
25
+ RATE_LIMIT = 15 # Requests per minute
26
+ TOKEN_BUCKET_CAPACITY = RATE_LIMIT
27
+ TOKEN_BUCKET_REFILL_RATE = RATE_LIMIT / 60.0 # Tokens per second
28
 
29
  async def fetch_questions(session: aiohttp.ClientSession, questions_url: str) -> list:
30
  """Fetch questions asynchronously."""
 
45
  print(f"An unexpected error occurred fetching questions: {e}")
46
  return None
47
 
48
+ async def submit_answers(session: aiohttp.ClientSession, submit_url: str,
49
+ submission_data: dict) -> dict:
50
  """Submit answers asynchronously."""
51
  try:
52
  async with session.post(submit_url, json=submission_data, timeout=60) as response:
 
62
  print(f"An unexpected error occurred during submission: {e}")
63
  return None
64
 
65
+ async def process_question(agent, question_text: str, task_id: str, results_log: list):
66
+ """Process a single question with global rate limiting."""
67
+ try:
68
+ # Wait for a token before proceeding
69
+ await token_bucket.consume(1)
70
+ submitted_answer = await agent(question_text)
71
+ results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
72
+ return {"task_id": task_id, "submitted_answer": submitted_answer}
73
+ except aiohttp.ClientResponseError as e:
74
+ if e.status == 429:
75
+ print(f"Rate limit hit for task {task_id}. Retrying after delay...")
76
+ await asyncio.sleep(60 / RATE_LIMIT) # Wait before retry
77
+ await token_bucket.consume(1)
78
  submitted_answer = await agent(question_text)
79
  results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
80
  return {"task_id": task_id, "submitted_answer": submitted_answer}
81
+ else:
82
+ raise
83
+ except Exception as e:
84
+ print(f"Error running agent on task {task_id}: {e}")
85
+ results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})
86
+ return None
 
87
 
88
  async def run_and_submit_all(profile: gr.OAuthProfile | None):
89
  """
 
123
  return "Fetched questions list is empty or invalid format.", None
124
 
125
  # 3. Run Agent on Questions
126
+ # Process questions sequentially with rate limiting
 
 
127
  results_log = []
128
  answers_payload = []
 
129
  print(f"Running agent on {len(questions_data)} questions...")
130
 
131
+ for item in questions_data:
132
+ if item.get("task_id") and item.get("question"):
133
+ result = await process_question(agent, item["question"], item["task_id"], results_log)
134
+ if result:
135
+ answers_payload.append(result)
136
+
 
 
137
  if not answers_payload:
138
  print("Agent did not produce any answers to submit.")
139
  return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
140
+
141
  # 4. Prepare Submission
142
  submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
143
  status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."