# Page header captured together with the file (not part of the program):
# TzurVaich's picture / Abort if status file not found / 64d4c20
import re
import os
import gradio as gr
import requests
import inspect
import datetime
from markdownify import markdownify
import textwrap
from textwrap import dedent
import pandas as pd
import wikipedia
import requests
from requests.exceptions import RequestException
from youtube_transcript_api import YouTubeTranscriptApi
from urllib.parse import urlparse, parse_qs
import json
from dotenv import load_dotenv
# Import smolagents components
from smolagents import (
tool,
CodeAgent,
HfApiModel,
DuckDuckGoSearchTool,
FinalAnswerTool,
OpenAIServerModel,
ToolCallingAgent,
)
#from smolagents.tools.transcriber import TranscriberTool
from pypdf import PdfReader
# Load environment variables from .env file
load_dotenv()
# (Keep Constants as is)
# --- Constants ---
# Base URL of the scoring service; endpoint paths are appended to it
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# --- Basic Agent Definition ---
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
# Get the directory of the current script
current_dir = os.path.dirname(os.path.abspath(__file__))
# Create path to the validation directory
# (<script dir>/GAIA_level1/validation)
GAIA_LEVEL1_VALIDATION_FILES_PATH = os.path.join(
    current_dir, "GAIA_level1", "validation")
# Initialize the search tool (module-level DuckDuckGo search instance)
search_tool = DuckDuckGoSearchTool()
@tool
def load_docx_file(file_path: str) -> str:
    """
    Loads and returns text and tables from a DOCX file in their original order.
    Args:
        file_path: Path to the .docx file
    Returns:
        String with paragraphs and markdown-formatted tables in document order.
        Returns an error message if the file is missing or cannot be processed.
    """
    # NOTE: the docstring above is consumed by the @tool decorator as the tool
    # description, so its "Args:" / "Returns:" layout must stay parseable.

    # Report a missing file as a message (like the other loaders in this
    # module) instead of raising from inside python-docx.
    if not os.path.exists(file_path):
        return f"Error: DOCX file not found at path: {file_path}"

    def clean_cell(text):
        # Line breaks and pipes inside a cell would break the markdown row.
        return text.strip().replace("\n", " ").replace("|", "\\|")

    def table_to_markdown(table, table_idx):
        # Render one table as a markdown table headed by "### Table <n>".
        rows = [
            "| " + " | ".join(clean_cell(cell.text) for cell in row.cells) + " |"
            for row in table.rows
        ]
        if not rows:
            return ""
        # The first row is treated as the header; the separator follows it.
        separator = "| " + \
            " | ".join(["---"] * len(table.rows[0].cells)) + " |"
        return f"\n### Table {table_idx}\n" + \
            "\n".join([rows[0], separator] + rows[1:])

    try:
        # Imported lazily so python-docx is only needed when a file is read.
        from docx import Document
        from docx.table import Table
        from docx.text.paragraph import Paragraph

        doc = Document(file_path)
        content = []
        table_count = 0
        # Walk the body's block elements so paragraphs and tables keep
        # their original relative order.
        for block in doc.element.body:
            if block.tag.endswith('}p'):  # Paragraph
                text = Paragraph(block, doc).text.strip()
                if text:
                    content.append(text)
            elif block.tag.endswith('}tbl'):  # Table
                table_count += 1
                markdown = table_to_markdown(Table(block, doc), table_count)
                if markdown:
                    content.append(markdown)
        return "\n\n".join(content)
    except Exception as e:
        # Corrupt/unsupported files (or a missing python-docx) become a
        # message the calling agent can read.
        return f"Error processing DOCX file '{file_path}': {type(e).__name__}: {e}"
@tool
def load_pdf_file(file_path: str) -> str:
    """
    Loads and returns text content from a PDF file.
    Args:
        file_path (str): The path to the .pdf file.
    Returns:
        str: The extracted text content from the PDF file. Returns an error message if the file cannot be processed.
    """
    # The docstring is kept verbatim: the @tool decorator exposes it as the
    # tool description.
    try:
        # Bail out early with a readable message when the path is wrong.
        if not os.path.exists(file_path):
            return f"Error: PDF file not found at path: {file_path}"

        pdf = PdfReader(file_path)
        page_chunks = []
        # Pages are numbered from 1 in the output headers.
        for number, page in enumerate(pdf.pages, start=1):
            page_text = page.extract_text()
            if page_text:
                page_chunks.append(f"--- Page {number} ---\n{page_text.strip()}")
            else:
                page_chunks.append(f"--- Page {number} --- (No text extracted)")

        combined = "\n\n".join(page_chunks)
        if combined.strip():
            return combined
        return f"Warning: No text could be extracted from the PDF file: {file_path}"
    except Exception as exc:
        # Any failure while reading the PDF is reported as text.
        return f"Error processing PDF file '{file_path}': {type(exc).__name__}: {exc}"
@tool
def load_xlsx_file_as_markdown(file_path: str) -> str:
    """
    Loads data from all sheets of an XLSX file and returns it as a single
    markdown-formatted string.
    Args:
        file_path (str): The path to the .xlsx file.
    Returns:
        str: A string containing the data from all sheets, formatted as markdown tables.
        Returns an error message if the file cannot be processed.
    """
    # The docstring is kept verbatim: the @tool decorator exposes it as the
    # tool description.
    try:
        if not os.path.exists(file_path):
            return f"Error: XLSX file not found at path: {file_path}"

        # sheet_name=None yields a {sheet name: DataFrame} mapping of all sheets.
        sheets = pd.read_excel(file_path, sheet_name=None)
        if not sheets:
            return f"Warning: No sheets found or the XLSX file is empty: {file_path}"

        # One section per sheet; the DataFrame index is left out of the table.
        sections = [
            f"--- Sheet: {name} --- (Sheet is empty)"
            if frame.empty
            else f"--- Sheet: {name} ---\n{frame.to_markdown(index=False)}"
            for name, frame in sheets.items()
        ]

        rendered = "\n\n".join(sections)
        if rendered.strip():
            return rendered
        return f"Warning: No data could be extracted from the XLSX file: {file_path}"
    except FileNotFoundError:
        return f"Error: XLSX file not found at path: {file_path}"
    except Exception as exc:
        # pandas / engine errors and anything else end up here.
        return f"Error processing XLSX file '{file_path}': {type(exc).__name__}: {exc}"
@tool
def load_xlsx_file_as_dataframe(file_path: str) -> pd.DataFrame:
    """
    Loads data from the first sheet of an XLSX file and returns it as a pandas DataFrame.
    Args:
        file_path (str): The path to the .xlsx file.
    Returns:
        pd.DataFrame: A pandas DataFrame containing the data from the first sheet.
        Returns an empty DataFrame if the file is empty or cannot be processed.
    """
    # The docstring is kept verbatim: the @tool decorator exposes it as the
    # tool description.
    not_found_warning = (
        f"Warning: XLSX file not found at path: {file_path}. Returning empty DataFrame.")
    try:
        if os.path.exists(file_path):
            # sheet_name=0 selects the first sheet; an empty sheet simply
            # produces an empty DataFrame.
            return pd.read_excel(file_path, sheet_name=0)
        print(not_found_warning)
        return pd.DataFrame()
    except FileNotFoundError:
        # File vanished between the existence check and the read.
        print(not_found_warning)
        return pd.DataFrame()
    except Exception as exc:
        # Every other failure is logged and mapped to an empty DataFrame.
        print(
            f"Error processing XLSX file '{file_path}': {type(exc).__name__}: {exc}. Returning empty DataFrame.")
        return pd.DataFrame()
@tool
def visit_webpage(url: str) -> str:
    """Visits a webpage at the given URL and returns its content as a markdown string.
    Args:
        url: The URL of the webpage to visit.
    Returns:
        The content of the webpage converted to Markdown, or an error message if the request fails.
    """
    # The docstring is kept verbatim: the @tool decorator exposes it as the
    # tool description.
    try:
        # 10 second timeout; non-2xx responses are turned into exceptions.
        page = requests.get(url, timeout=10)
        page.raise_for_status()
        as_markdown = markdownify(page.text).strip()
        # Collapse runs of three or more newlines into a single blank line.
        return re.sub(r"\n{3,}", "\n\n", as_markdown)
    except RequestException as err:
        return f"Error fetching the webpage: {str(err)}"
    except Exception as err:
        return f"An unexpected error occurred: {str(err)}"
@tool
def query_wikipedia(query: str, sentences: int = 5) -> str:
    """
    Searches Wikipedia for a given query and returns a summary of the most relevant page.
    Use this tool especially when Wikipedia is mentioned in the context.
    Args:
        query (str): The search term or question for Wikipedia.
        sentences (int): The desired number of sentences for the summary (default: 5).
    Returns:
        str: A summary of the Wikipedia page, a list of options if the query is ambiguous,
        or an error message if the page is not found or another error occurs.
    """
    # The docstring is kept verbatim: the @tool decorator exposes it as the
    # tool description.
    try:
        # auto_suggest=False prevents Wikipedia from guessing if the exact
        # title isn't found; misses are handled in the PageError branch below.
        summary = wikipedia.summary(
            query, sentences=sentences, auto_suggest=False)
    except wikipedia.exceptions.DisambiguationError as e:
        # The query matches several pages: list at most 10 candidates.
        options_list = "\n - ".join(e.options[:10])
        return (f"Wikipedia query '{query}' is ambiguous. "
                f"Please be more specific or choose from these options:\n - {options_list}")
    except wikipedia.exceptions.PageError:
        # No such page: offer search suggestions. The search is itself a
        # network call, so it gets its own guard -- an exception raised inside
        # an except block would not be caught by a sibling handler.
        try:
            search_results = wikipedia.search(query, results=5)
        except Exception as e:
            return (f"Wikipedia page for '{query}' not found, and the suggestion "
                    f"search failed: {type(e).__name__}: {e}")
        if search_results:
            suggestions = "\n - ".join(search_results)
            return (f"Wikipedia page for '{query}' not found. "
                    f"Did you mean one of these?\n - {suggestions}")
        return f"Wikipedia page for '{query}' not found, and no suggestions available."
    except Exception as e:
        # Network problems and any other failure while fetching the summary.
        return f"Error querying Wikipedia for '{query}': {type(e).__name__}: {e}"

    # The summary is already in hand; the title lookup is only cosmetic, so a
    # failure here must not discard the summary.
    try:
        page_title = wikipedia.page(query, auto_suggest=False).title
    except wikipedia.exceptions.DisambiguationError as e:
        return f"Wikipedia Summary (Query: '{query}'):\n\n{summary}\n\nNote: Query might be ambiguous. Options include: {e.options}"
    except Exception:
        # Includes PageError as well as transient network errors.
        return f"Wikipedia Summary (Query: '{query}'):\n\n{summary}"
    return f"Wikipedia Summary for '{page_title}':\n\n{summary}"
@tool
def openai_reasoning(question: str) -> str:
    """
    Uses OpenAI's GPT-4o model for in-depth reasoning and analysis of complex questions.
    Use this for riddles, puzzles, or questions that require deep thinking rather than code execution.
    Args:
        question: The question or problem to analyze using GPT-4o's reasoning capabilities.
    Returns:
        The reasoned answer to the question.
    """
    # The docstring is kept verbatim: the @tool decorator exposes it as the
    # tool description.

    # System instructions sent ahead of the user's question.
    system_prompt = (
        "You are an expert reasoning engine specialized in solving complex problems, puzzles and riddles.\n"
        "When tackling problems:\n"
        "1. Understand the question thoroughly\n"
        "2. Break down complex problems into parts\n"
        "3. Consider multiple approaches before deciding on a solution\n"
        "4. Think step by step\n"
        "5. Provide only the final answer unless asked for reasoning\n"
        "Be precise and concise in your final response."
    )
    try:
        # A fresh model wrapper is built per call, with a low temperature.
        solver = OpenAIServerModel("gpt-4o", max_tokens=1024, temperature=0.05)
        conversation = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": question},
        ]
        reply = solver(conversation)
        # Only the text content of the reply is handed back.
        return reply.content
    except Exception as err:
        return f"Error when processing with reasoning model: {str(err)}"
@tool
def extract_youtube_id(url: str) -> str:
    """
    Extract the YouTube video ID from a URL.
    Args:
        url: The YouTube video URL (may contain spaces or formatting issues)
    Returns:
        The YouTube video ID, or an explanatory message if no ID can be extracted
    """
    # NOTE: the docstring above is consumed by the @tool decorator as the tool
    # description, so its "Args:" / "Returns:" layout must stay parseable.

    # A video ID ends at the first query/fragment/path delimiter or whitespace;
    # without '?', '#' and '/' in the exclusion set, URLs such as
    # ".../shorts/ID?feature=share" would yield "ID?feature=share".
    id_group = r'([^&?#/\s]+)'
    # Clean the URL by removing extra spaces
    cleaned_url = url.replace(" ", "")
    try:
        parsed_url = urlparse(cleaned_url)

        # youtube.com/watch?v=VIDEO_ID (also with other parameters around it)
        query_params = parse_qs(parsed_url.query)
        if 'v' in query_params:
            return query_params['v'][0]

        # youtu.be/VIDEO_ID -- only the first path segment is the ID
        path_parts = [part for part in parsed_url.path.split('/') if part]
        if 'youtu.be' in parsed_url.netloc and path_parts:
            return path_parts[0]

        # youtube.com/embed/VIDEO_ID, tolerating a trailing slash or sub-path
        if '/embed/' in parsed_url.path:
            embedded = parsed_url.path.split('/embed/')[1].split('/')[0]
            if embedded:
                return embedded

        # Regex fallbacks for shapes urlparse does not split usefully
        # (e.g. scheme-less "youtu.be/ID?t=10") and for path-style URLs.
        patterns = [
            r'(?:youtu\.be/|youtube\.com/(?:embed|shorts|live|v|e)/)' + id_group,
            # A "v=" parameter anywhere, but not as the tail of a longer
            # name such as "rev=".
            r'(?<![A-Za-z0-9_])v=' + id_group,
        ]
        for pattern in patterns:
            match = re.search(pattern, cleaned_url)
            if match:
                return match.group(1)
        return "Could not extract a valid YouTube video ID from the provided URL."
    except Exception as e:
        # URL parsing itself failed: attempt a direct extraction of "v=...".
        _, found, tail = url.partition('v=')
        if found:
            candidate = tail.split('&')[0].split('#')[0].strip()
            if candidate:
                return candidate
        return f"Error extracting YouTube ID: {str(e)}"
@tool
def get_youtube_transcript(video_id: str, language: str = "en") -> str:
    """
    Get the transcript of a YouTube video.
    Args:
        video_id: The YouTube video ID
        language: The language code for the transcript (default: 'en' for English)
    Returns:
        The transcript text of the YouTube video
    """
    # The docstring is kept verbatim: the @tool decorator exposes it as the
    # tool description.
    try:
        segments = YouTubeTranscriptApi.get_transcript(
            video_id, languages=[language])
        # Glue the per-segment texts together with single spaces.
        joined = " ".join(piece['text'] for piece in segments)
        return joined.strip()
    except Exception as err:
        return f"Error retrieving transcript: {str(err)}"
@tool
def load_text_file(file_path: str, detect_format: bool = True) -> str:
    """
    Loads a text file and optionally detects and processes its format (plain text, code, JSON, etc.).
    Args:
        file_path: Path to the text file to load
        detect_format: Whether to automatically detect and process the format (default: True)
    Returns:
        String containing the file content, possibly formatted based on detected type
    """
    # NOTE: the docstring above is consumed by the @tool decorator as the tool
    # description, so its "Args:" / "Returns:" layout must stay parseable.
    if not os.path.exists(file_path):
        return f"Error: File not found at {file_path}"

    base_name = os.path.basename(file_path)
    code_extensions = ('.py', '.js', '.ts', '.java', '.c', '.cpp', '.cs',
                       '.php', '.rb', '.go', '.rs', '.swift')
    # Code is recognised by statement-like constructs at the start of a line.
    # Plain substring checks ('from ', 'using ', 'let ', ...) fire on ordinary
    # English prose and mislabel text files as code.
    code_line = re.compile(
        r'^[ \t]*(?:'
        r'def \w+\s*\(|class \w+|function\s*\w*\s*\(|'
        r'import [\w.*{"\']|from \S+ import |'
        r'(?:var|let|const) [\w$]+\s*[=;:]|#include\b|'
        r'package [\w.]+\s*;?\s*$|using [\w.: ]+;|'
        r'public class |int main\s*\()',
        re.MULTILINE,
    )

    def pretty_json(raw):
        # ensure_ascii=False keeps non-ASCII text readable instead of \uXXXX.
        return json.dumps(json.loads(raw), indent=2, ensure_ascii=False)

    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        if not detect_format:
            return f"File content ({base_name}):\n\n{content}"

        ext = os.path.splitext(file_path)[1].lower()

        # --- Decisions driven by the file extension ---
        if ext in ('.json', '.geojson'):
            try:
                formatted_json = pretty_json(content)
            except json.JSONDecodeError:
                return f"Warning: File has JSON extension but content is not valid JSON.\n\n{content}"
            return f"JSON content ({base_name}):\n\n{formatted_json}"
        if ext in code_extensions:
            return f"Code file ({base_name}, {ext[1:]} language):\n\n{content}"
        if ext in ('.csv', '.tsv'):
            # Only a 10-line preview is returned for tabular files.
            lines = content.strip().split('\n')
            preview = '\n'.join(lines[:10])
            if len(lines) > 10:
                preview += f"\n\n[...and {len(lines) - 10} more lines]"
            return f"Tabular data file ({base_name}):\n\n{preview}"

        # --- Unknown extension: sniff the content ---
        stripped = content.strip()
        if stripped.startswith('{') and stripped.endswith('}'):
            try:
                return f"Detected JSON content ({base_name}):\n\n{pretty_json(content)}"
            except json.JSONDecodeError:
                pass  # Not valid JSON, continue with other detection
        if code_line.search(content):
            return f"Detected code file ({base_name}, unknown programming language):\n\n{content}"
        if re.search(r'<\w+>.*?</\w+>', content, re.DOTALL) or stripped.startswith('<?xml'):
            return f"Markup language file ({base_name}):\n\n{content}"
        return f"Plain text file ({base_name}):\n\n{content}"
    except UnicodeDecodeError:
        # Not valid UTF-8: latin-1 maps every byte, so this read cannot fail
        # on decoding.
        try:
            with open(file_path, 'r', encoding='latin-1') as file:
                content = file.read()
            return f"File content ({base_name}, non-UTF-8 encoding):\n\n{content}"
        except Exception as e:
            return f"Error reading file (encoding issues): {str(e)}"
    except Exception as e:
        return f"Error reading file: {str(e)}"
@tool
def transcribe_with_whisper(audio_path: str) -> str:
    """
    Transcribes audio using OpenAI's Whisper model.
    Args:
        audio_path: Path to the audio file to transcribe
    Returns:
        The transcribed text
    """
    # The docstring is kept verbatim: the @tool decorator exposes it as the
    # tool description.
    if not os.path.exists(audio_path):
        return f"Error: Audio file not found at {audio_path}"

    try:
        # faster-whisper is imported lazily; there is currently no fallback to
        # the original `whisper` package when it is unavailable.
        from faster_whisper import WhisperModel

        recognizer = WhisperModel("medium.en", device="cpu")
        pieces, _info = recognizer.transcribe(audio_path)
        return " ".join([piece.text for piece in pieces])
    except ImportError:
        return "Failed to transcribe audio with faster-whisper"
    except Exception as err:
        return f"Error transcribing audio: {str(err)}"
class BasicAgent:
def __init__(self):
print("BasicAgent initialized.")
self.store_questions_to_log_file = False
# Create a filename with current date and time
current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M")
self.filename = f"questions_{current_time}.txt"
if self.store_questions_to_log_file:
print(f"Questions will be written to {self.filename}")
# Clear the file if it exists or create a new one
with open(self.filename, 'w', encoding='utf-8') as f:
f.write('') # Create empty file
# Initialize the Large Language Model
# The model is used by both agents in this simple setup
# mistralai/Mixtral-8x7B-Instruct-v0.1
# meta-llama/Llama-3.3-70B-Instruct
self.model = HfApiModel(model_id="Qwen/Qwen2.5-Coder-32B-Instruct")
#self.model = HfApiModel(model_id="mistralai/Mixtral-8x7B-Instruct-v0.1")
# For TGI container
# self.model = OpenAIServerModel(
# api_base="http://localhost:8080/v1", # If using TGI container
# api_key="not-needed", # Local servers usually don't need API keys
# model_id="Qwen/Qwen3-1.7B"
# )
# self.model = LiteLLMModel(
# model_name="ollama/qwen3:1.7b", # Prefix with 'ollama/' to use the Ollama provider
# api_base="http://localhost:11434", # Your custom Ollama port
# flatten_messages_as_text=True,
# api_key="", # Try passing an empty API key
# )
#print(self.model)
#print(f"Model Name: {self.model.model_name}")
#print(f"API Base: {self.model.api_base}")
# Define the Web Search Agent
# This agent is specialised for searching the web using a specific tool
# self.web_search_agent = CodeAgent(
# model=self.model, # Assign the model to the agent [
# tools=[DuckDuckGoSearchTool(),
# FinalAnswerTool()], # Provide the web search tool
# name="web_search_agent", # Give the agent a name
# # Describe its capability [
# description="""Searches the web for information.
# In the end you have to return a final answer using the `final_answer` tool.""",
# verbosity_level=1, # Set verbosity level for logging
# max_steps=3, # Limit the steps the agent can take
# planning_interval=2,
# )
self.web_search_specialist_agent = ToolCallingAgent(
model=self.model, # Or any other compatible model instance
tools=[
DuckDuckGoSearchTool(),
query_wikipedia, # Make sure this is the @tool decorated function
visit_webpage,
FinalAnswerTool()
],
name="web_search_specialist_agent",
description=textwrap.dedent("""\
This agent specializes in finding information on the web and answering questions based on web content.
**Core Strategy:**
1. **Understand & Plan:** For any query, especially complex ones or those requiring information from specific sources, first formulate a clear, step-by-step plan. Think about what information is needed and which tools are best for each step.
2. **Execute & Adapt:** Execute your plan step-by-step. After each step, review the results and adapt your plan if necessary.
3. **Extract & Synthesize:** Once relevant information is found (e.g., on a webpage), don't just return raw data. Carefully extract the specific piece of information that answers the original question.
**Tool Usage Guidelines:**
- Use the `DuckDuckGoSearchTool` for general web searches, to find broad information, current events, or to locate specific websites or pages when the URL is unknown.
- If `DuckDuckGoSearchTool` returns URLs, evaluate them. If a URL seems promising for answering the question, a subsequent step in your plan should be to use the `visit_webpage` tool.
- Use the `query_wikipedia` tool when the question specifically asks for information from Wikipedia, or when Wikipedia is clearly the most authoritative source (e.g., for definitions, historical events, biographical information).
- Use the `visit_webpage` tool to get the content of a specific URL.
- **Crucially**: After using `visit_webpage`, your next step is to analyze its content and extract the precise information needed to answer the query. Do not just output the entire page content as the answer.
- If the query explicitly mentions a specific website (e.g., "Merriam-Webster", "Cornell Law School website"), your plan should prioritize searching that site.
- Use `DuckDuckGoSearchTool` with site-specific queries (e.g., "site:merriam-webster.com <your actual query terms>").
- If a direct URL from that site is found or can be inferred, use `visit_webpage` to get the content, then extract the specific information.
**Search & Iteration Tactics:**
- Before taking a new action, review the information and results from your previous steps. Use this history to inform your decisions and refine your plan.
- Do not repeat the exact same or very similar queries to the same tool if the initial attempt did not yield useful information. Use knowledge from past attempts to refine your strategy.
- If information is not found, consider: rephrasing your query, trying a different aspect of the question, or using an alternative search tool, always considering what you've learned.
- Be aware of date format sensitivity in searches. If a date is part of your query, try alternative formats (e.g., "27 July 2010" vs "27/7/2010").
**Final Output:**
- In the end, you must return a final answer using the `final_answer` tool, based on the information you have gathered and processed according to your plan.
"""),
verbosity_level=1, # Adjust as needed
max_steps=3, # Adjust as needed
planning_interval=1 # Adjust as needed
)
# Define your model
self.code_model = "gpt-4.1" # or whatever model you're using
reasoning_model = OpenAIServerModel(
self.code_model,
max_completion_tokens=8096
)
# Create your agent with the reasoning tool and other tools
self.reasoning_agent = ToolCallingAgent(
model=reasoning_model,
tools=[openai_reasoning, FinalAnswerTool()],
planning_interval=2,
max_steps=5,
verbosity_level=1,
name="reasoning_agent",
description="""Solves complex problems riddles and puzzles through reasoning rather than code execution.
In the end you have to return a final answer using the `final_answer` tool."""
)
self.youtube_qa_agent = ToolCallingAgent(
model=reasoning_model, #self.model,
tools=[extract_youtube_id, get_youtube_transcript,
FinalAnswerTool()],
name="youtube_qa_agent",
planning_interval=2,
max_steps=5,
verbosity_level=1,
description=textwrap.dedent("""\
You are an expert assistant that can answer questions about YouTube videos by analyzing their transcripts.
When given a YouTube URL and a question, follow these steps IN ORDER:
1. Extract the video ID from the URL using the `extract_youtube_id` tool
2. Retrieve the transcript of the video using the `get_youtube_transcript` tool
3. Provide a clear and concise answer based solely on the transcript content
4. Return your final answer using the `final_answer` tool
IMPORTANT INSTRUCTIONS:
- After getting the transcript, you MUST use the analyze_transcript tool. DO NOT call get_youtube_transcript twice.
- Never skip the analysis step - it's crucial for answering the question correctly.
- Each tool must be used in the correct sequence - ID extraction, then transcript retrieval, then analysis.
If you cannot find a direct answer to the question in the transcript:
- Acknowledge that you couldn't find a specific answer
- Provide the transcript for reference
- Suggest that the user might want to use a different approach
DO NOT run the same tool with the same arguments multiple times.
DO NOT make up information that is not in the transcript.
""")
)
self.python_code_executer = CodeAgent(
model=reasoning_model,
tools=[load_text_file,
FinalAnswerTool()],
name="python_code_executer",
description=textwrap.dedent("""\
You are an expert assistant that can execute Python code.
Execute the Python code and return the final answer using the `final_answer` tool.
"""),
additional_authorized_imports=["json", "re", "pandas", "numpy", "math", "collections", "itertools", "stat", "statistics", "queue", "unicodedata", "time", "random", "datetime"],
verbosity_level=1,
max_steps=5,
planning_interval=1,
#executor_type="e2b",
#use_e2b_executor=True
)
# Initialize the agent with the transcription tool
# self.transcriber_agent = CodeAgent(
# model=reasoning_model,
# tools=[],
# add_base_tools=True,
# name="transcriber_agent",
# description="Transcribes audio file to text, i.e. take audio file and generates the the transcription of the audio.",
# # Add any imports your agent might need
# additional_authorized_imports=["pandas", "numpy"],
# max_steps=5
# )
# Define the Manager Agent
# This agent manages tasks and delegates to other agents
self.manager_agent = CodeAgent(
model=self.model, # Assign the model to the manager
tools=[load_docx_file,
load_pdf_file,
load_xlsx_file_as_dataframe,
load_xlsx_file_as_markdown,
query_wikipedia,
load_text_file,
transcribe_with_whisper,
FinalAnswerTool()],
# Specify the agents this manager oversees
managed_agents=[self.web_search_specialist_agent,
self.reasoning_agent,
self.youtube_qa_agent,
self.python_code_executer],
#self.transcriber_agent],
name="manager_agent", # Give the manager agent a name
description="Manages tasks by delegating to other agents.", # Describe its role
additional_authorized_imports=[
"json", "re", "pandas", "numpy", "math", "collections", "itertools", "stat", "statistics", "queue", "unicodedata", "time", "random", "datetime"], # Allow specific imports
verbosity_level=1, # Set verbosity level
max_steps=5, # Limit the steps
planning_interval=1,
#final_answer_checks=[]
)
print("MultiAgentSystem initialization complete.")
def __call__(self, question: str,
file_name: str = None) -> str:
print(f"Agent received question (first 50 chars): {question[:50]}...")
# For all other questions, use the manager agent with web search
# manager_prompt = dedent(f"""
# I need to answer the following question accurately:
# {question}
# Please analyze this question and determine the best approach to answer it.
# If needed, use web search to find relevant information.
# Provide a concise, accurate answer to the question.
# """)
manager_prompt = textwrap.dedent(f"""
I need to answer the following question accurately:
{question}
using the following file: '{file_name}' if provided.
Please analyze this question and determine the best approach to answer it
using the available agents and tools.
Note that you are provided with a special agent to resolve logical problems, riddles and puzzles named "reasoning_agent".
If needed, use any of the available tools to find or load the relevant information.
Provide a concise, accurate answer to the question.
""")
manager_agent_response = "I apologize, but I couldn't find an answer to this question."
source = ""
try:
manager_agent_response = self.manager_agent.run(manager_prompt)
source = "manager_agent"
# Check if the answer contains a missing tool warning
# if "Missing Tool Warning:" in manager_agent_response:
# return manager_agent_response
except Exception as e:
print(f"Error in manager agent: {e}")
source = f"Exception {e} "
# Append the question to the file
if self.store_questions_to_log_file:
with open(self.filename, 'a', encoding='utf-8') as f:
f.write(f"{question}\n")
f.write(f"ANSWER by {source}: {manager_agent_response}\n")
f.write(f"{'*'*50}\n")
print(f"Final answer: {manager_agent_response}")
return manager_agent_response
def run_and_submit_all(profile: gr.OAuthProfile | None):
"""
Fetches all questions, runs the BasicAgent on them, submits all answers,
and displays the results.
"""
# --- Determine HF Space Runtime URL and Repo URL ---
# Get the SPACE_ID for sending link to the code
space_id = os.getenv("SPACE_ID")
if profile:
username = f"{profile.username}"
print(f"User logged in: {username}")
else:
print("User not logged in.")
return "Please Login to Hugging Face with the button.", None
api_url = DEFAULT_API_URL
questions_url = f"{api_url}/questions"
submit_url = f"{api_url}/submit"
# 1. Instantiate Agent ( modify this part to create your agent)
try:
agent = BasicAgent()
except Exception as e:
print(f"Error instantiating agent: {e}")
return f"Error initializing agent: {e}", None
# In the case of an app running as a hugging Face space, this link points toward your codebase ( usefull for others so please keep it public)
agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
print(agent_code)
# 2. Fetch Questions
print(f"Fetching questions from: {questions_url}")
try:
response = requests.get(questions_url, timeout=15)
response.raise_for_status()
questions_data = response.json()
if not questions_data:
print("Fetched questions list is empty.")
return "Fetched questions list is empty or invalid format.", None
print(f"Fetched {len(questions_data)} questions.")
# Save questions data to a JSON file
current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M")
questions_filename = f"questions_data_{current_time}.json"
with open(questions_filename, 'w') as f:
json.dump(questions_data, f, indent=4)
print(f"Saved questions data to {questions_filename}")
except requests.exceptions.RequestException as e:
print(f"Error fetching questions: {e}")
return f"Error fetching questions: {e}", None
except requests.exceptions.JSONDecodeError as e:
print(f"Error decoding JSON response from questions endpoint: {e}")
print(f"Response text: {response.text[:500]}")
return f"Error decoding server response for questions: {e}", None
except Exception as e:
print(f"An unexpected error occurred fetching questions: {e}")
return f"An unexpected error occurred fetching questions: {e}", None
# 3. Run your Agent
results_log = []
answers_payload = []
# Load previous results if available
previous_results = {}
current_script_dir = os.path.dirname(os.path.abspath(__file__))
results_log_status_path = os.path.join(
current_script_dir, 'results_log_status.json')
try:
if os.path.exists(results_log_status_path):
with open(results_log_status_path, 'r', encoding='utf-8') as f:
previous_results_list = json.load(f)
# Convert to dictionary with task_id as key for easier lookup
for item in previous_results_list:
previous_results[item.get("Task ID")] = item
print(f"Loaded {len(previous_results)} previous results")
else:
return f"{results_log_status_path} not found", None
except Exception as e:
print(f"Error loading previous results: {e}")
# Continue without previous results if there's an error
return "results_log_status.json not found", None
print(f"Running agent on {len(questions_data)} questions...")
for idx, item in enumerate(questions_data):
#if idx == 6:
# break
task_id = item.get("task_id")
question_text = item.get("question")
file_name = item.get("file_name", None)
if file_name:
file_name = os.path.join(
GAIA_LEVEL1_VALIDATION_FILES_PATH, file_name)
# Skip the "continue" statement that was in the original code
if not task_id or question_text is None:
print(f"Skipping item with missing task_id or question: {item}")
continue
# Check if we already have an answer for this task
if task_id in previous_results:
submitted_answer = previous_results[task_id].get(
"Submitted Answer")
print(f"Using cached result for task {task_id}")
answers_payload.append(
{"task_id": task_id, "submitted_answer": submitted_answer})
results_log.append({
"Task ID": task_id,
"Question": question_text,
"Submitted Answer": submitted_answer,
"recycled": "true"
})
else:
try:
submitted_answer = agent(question_text, file_name)
answers_payload.append(
{"task_id": task_id, "submitted_answer": submitted_answer})
results_log.append({
"Task ID": task_id,
"Question": question_text,
"Submitted Answer": submitted_answer
})
except Exception as e:
print(f"Error running agent on task {task_id}: {e}")
results_log.append({
"Task ID": task_id,
"Question": question_text,
"Submitted Answer": f"AGENT ERROR: {e}"
})
# Save progress after each question
with open('results_log_progress.json', 'w') as f:
json.dump(results_log, f, indent=4)
if not answers_payload:
print("Agent did not produce any answers to submit.")
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)
# 4. Prepare Submission
submission_data = {"username": username.strip(
), "agent_code": agent_code, "answers": answers_payload}
status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
print(status_update)
# 5. Submit
print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
try:
response = requests.post(submit_url, json=submission_data, timeout=60)
response.raise_for_status()
result_data = response.json()
final_status = (
f"Submission Successful!\n"
f"User: {result_data.get('username')}\n"
f"Overall Score: {result_data.get('score', 'N/A')}% "
f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
f"Message: {result_data.get('message', 'No message received.')}"
)
print("Submission successful.")
results_df = pd.DataFrame(results_log)
return final_status, results_df
except requests.exceptions.HTTPError as e:
error_detail = f"Server responded with status {e.response.status_code}."
try:
error_json = e.response.json()
error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
except requests.exceptions.JSONDecodeError:
error_detail += f" Response: {e.response.text[:500]}"
status_message = f"Submission Failed: {error_detail}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.Timeout:
status_message = "Submission Failed: The request timed out."
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except requests.exceptions.RequestException as e:
status_message = f"Submission Failed: Network error - {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
except Exception as e:
status_message = f"An unexpected error occurred during submission: {e}"
print(status_message)
results_df = pd.DataFrame(results_log)
return status_message, results_df
# --- Gradio UI: page layout and event wiring ---
with gr.Blocks() as demo:
    # Page heading, followed by the usage instructions and disclaimers.
    gr.Markdown(value="# Basic Agent Evaluation Runner")
    gr.Markdown(
        value="""
**Instructions:**
1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
---
**Disclaimers:**
Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions).
This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.
"""
    )

    # Hugging Face login button, rendered above the run button.
    gr.LoginButton()

    # Single action button that triggers the whole fetch / run / submit flow.
    run_button = gr.Button(value="Run Evaluation & Submit All Answers")

    # Read-only status box that receives the first value returned by the handler.
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        interactive=False,
        lines=5,
    )
    # Table that receives the second value returned by the handler
    # (no max_rows limit is set here).
    results_table = gr.DataFrame(
        wrap=True,
        label="Questions and Agent Answers",
    )

    # No explicit input components are wired; the handler's two return
    # values are routed to the status box and the results table, in order.
    run_button.click(
        fn=run_and_submit_all,
        inputs=None,
        outputs=[status_output, results_table],
    )
if __name__ == "__main__":
    # Script entry point: print an informational startup banner describing
    # the Hugging Face Space environment (when present), then launch the UI.
    print("\n" + "-"*30 + " App Starting " + "-"*30)

    # SPACE_HOST and SPACE_ID are read for diagnostics only; their absence
    # is reported as "running locally?" and does not stop the launch.
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")  # Get SPACE_ID at startup

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        # The Spaces runtime normally provides SPACE_HOST with the
        # ".hf.space" suffix already included; only append it when missing
        # so the printed URL is not "https://<host>.hf.space.hf.space".
        runtime_host = space_host_startup
        if not runtime_host.endswith(".hf.space"):
            runtime_host += ".hf.space"
        print(f" Runtime URL should be: https://{runtime_host}")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:  # Print repo URLs if SPACE_ID is found
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    # Closing rule, sized to match the opening banner line.
    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    # debug=True is passed through to Gradio; share=False requests no
    # public share link.
    demo.launch(debug=True, share=False)