import re
import os
import gradio as gr
import requests
import inspect
import datetime
from markdownify import markdownify
import textwrap
from textwrap import dedent
import pandas as pd
import wikipedia
from requests.exceptions import RequestException
from youtube_transcript_api import YouTubeTranscriptApi
from urllib.parse import urlparse, parse_qs
import json
from dotenv import load_dotenv

# Import smolagents components
from smolagents import (
    tool,
    CodeAgent,
    HfApiModel,
    DuckDuckGoSearchTool,
    FinalAnswerTool,
    OpenAIServerModel,
    ToolCallingAgent,
)
# from smolagents.tools.transcriber import TranscriberTool
from pypdf import PdfReader

# Load environment variables from .env file
load_dotenv()
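# Note: OpenAIServerModel typically reads OPENAI_API_KEY from the environment, and
# HfApiModel picks up your Hugging Face token (e.g. HF_TOKEN); both can be supplied
# via the .env file loaded above (exact variable names depend on your setup).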
# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"

# --- Basic Agent Definition ---
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------

# Get the directory of the current script
current_dir = os.path.dirname(os.path.abspath(__file__))

# Create path to the validation directory
GAIA_LEVEL1_VALIDATION_FILES_PATH = os.path.join(
    current_dir, "GAIA_level1", "validation")

# Initialize the search tool
search_tool = DuckDuckGoSearchTool()
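# Note: the agents defined further below create their own DuckDuckGoSearchTool
# instances, so this standalone `search_tool` is not wired into them; it is only
# kept available for ad-hoc use.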
# The helper functions below are passed to the agents via tools=[...], so they are
# wrapped with smolagents' @tool decorator (see the `tool` import above).
@tool
def load_docx_file(file_path: str) -> str:
    """
    Loads and returns text and tables from a DOCX file in their original order.

    Args:
        file_path: Path to the .docx file

    Returns:
        String with paragraphs and markdown-formatted tables in document order.
    """
    from docx import Document
    from docx.table import Table
    from docx.text.paragraph import Paragraph

    doc = Document(file_path)
    content = []
    table_count = 0

    # Helper function to convert a table to markdown
    def table_to_markdown(table, table_idx):
        rows = []
        for row in table.rows:
            cells = [cell.text.strip() for cell in row.cells]
            rows.append("| " + " | ".join(cells) + " |")
        # Add a markdown separator after the header if the table has at least one row
        if rows:
            separator = "| " + " | ".join(["---"] * len(table.rows[0].cells)) + " |"
            markdown = f"\n### Table {table_idx}\n" + "\n".join([rows[0], separator] + rows[1:])
            return markdown
        return ""

    # Iterate through the document's block elements in order
    for block in doc.element.body:
        if block.tag.endswith('}p'):  # Paragraph
            para = Paragraph(block, doc)
            text = para.text.strip()
            if text:
                content.append(text)
        elif block.tag.endswith('}tbl'):  # Table
            table_count += 1
            table = Table(block, doc)
            markdown = table_to_markdown(table, table_count)
            if markdown:
                content.append(markdown)

    return "\n\n".join(content)
@tool
def load_pdf_file(file_path: str) -> str:
    """
    Loads and returns text content from a PDF file.

    Args:
        file_path (str): The path to the .pdf file.

    Returns:
        str: The extracted text content from the PDF file. Returns an error message if the file cannot be processed.
    """
    extracted_text = []
    try:
        # Check if the file exists
        if not os.path.exists(file_path):
            return f"Error: PDF file not found at path: {file_path}"

        # Open the PDF file
        reader = PdfReader(file_path)

        # Iterate through each page and extract text
        for page_num, page in enumerate(reader.pages):
            text = page.extract_text()
            if text:  # Ensure text was extracted
                extracted_text.append(f"--- Page {page_num + 1} ---\n{text.strip()}")
            else:
                extracted_text.append(f"--- Page {page_num + 1} --- (No text extracted)")

        # Join the text from all pages
        full_text = "\n\n".join(extracted_text)
        if not full_text.strip():
            return f"Warning: No text could be extracted from the PDF file: {file_path}"
        return full_text
    except Exception as e:
        # Catch any other exceptions during PDF processing
        return f"Error processing PDF file '{file_path}': {type(e).__name__}: {e}"
@tool
def load_xlsx_file_as_markdown(file_path: str) -> str:
    """
    Loads data from all sheets of an XLSX file and returns it as a single
    markdown-formatted string.

    Args:
        file_path (str): The path to the .xlsx file.

    Returns:
        str: A string containing the data from all sheets, formatted as markdown tables.
             Returns an error message if the file cannot be processed.
    """
    extracted_content = []
    try:
        # Check if the file exists
        if not os.path.exists(file_path):
            return f"Error: XLSX file not found at path: {file_path}"

        # Read all sheets from the Excel file into a dictionary of DataFrames
        # sheet_name=None reads all sheets
        excel_data = pd.read_excel(file_path, sheet_name=None)

        if not excel_data:
            return f"Warning: No sheets found or the XLSX file is empty: {file_path}"

        # Iterate through each sheet and convert its DataFrame to markdown
        for sheet_name, df in excel_data.items():
            if not df.empty:
                # Convert DataFrame to markdown table string, excluding the index
                markdown_table = df.to_markdown(index=False)
                extracted_content.append(f"--- Sheet: {sheet_name} ---\n{markdown_table}")
            else:
                extracted_content.append(f"--- Sheet: {sheet_name} --- (Sheet is empty)")

        # Join the content from all sheets
        full_content = "\n\n".join(extracted_content)
        if not full_content.strip():
            return f"Warning: No data could be extracted from the XLSX file: {file_path}"
        return full_content
    except FileNotFoundError:
        return f"Error: XLSX file not found at path: {file_path}"
    except Exception as e:
        # Catch pandas-specific errors or other general exceptions
        return f"Error processing XLSX file '{file_path}': {type(e).__name__}: {e}"
# Note: returning a DataFrame from a tool is unusual; depending on the smolagents
# version, a string/markdown return (see load_xlsx_file_as_markdown) may be more robust.
@tool
def load_xlsx_file_as_dataframe(file_path: str) -> pd.DataFrame:
    """
    Loads data from the first sheet of an XLSX file and returns it as a pandas DataFrame.

    Args:
        file_path (str): The path to the .xlsx file.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the data from the first sheet.
                      Returns an empty DataFrame if the file is empty or cannot be processed.
    """
    try:
        # Check if the file exists
        if not os.path.exists(file_path):
            print(f"Warning: XLSX file not found at path: {file_path}. Returning empty DataFrame.")
            return pd.DataFrame()

        # Read the first sheet (index 0) from the Excel file
        # If the first sheet is empty or unreadable, the except blocks below handle it
        df = pd.read_excel(file_path, sheet_name=0)

        # Return the DataFrame (it will be empty if the sheet was empty)
        return df
    except FileNotFoundError:
        # Return an empty DataFrame if the file is not found (instead of raising)
        print(f"Warning: XLSX file not found at path: {file_path}. Returning empty DataFrame.")
        return pd.DataFrame()
    except Exception as e:
        # Catch pandas-specific errors or other general exceptions
        print(f"Error processing XLSX file '{file_path}': {type(e).__name__}: {e}. Returning empty DataFrame.")
        return pd.DataFrame()  # Return empty DataFrame on error
@tool
def visit_webpage(url: str) -> str:
    """Visits a webpage at the given URL and returns its content as a markdown string.

    Args:
        url: The URL of the webpage to visit.

    Returns:
        The content of the webpage converted to Markdown, or an error message if the request fails.
    """
    try:
        # Send a GET request to the URL
        response = requests.get(url, timeout=10)
        response.raise_for_status()  # Raise an exception for bad status codes

        # Convert the HTML content to Markdown
        markdown_content = markdownify(response.text).strip()

        # Collapse runs of three or more line breaks
        markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)

        return markdown_content
    except RequestException as e:
        return f"Error fetching the webpage: {str(e)}"
    except Exception as e:
        return f"An unexpected error occurred: {str(e)}"
@tool
def query_wikipedia(query: str, sentences: int = 5) -> str:
    """
    Searches Wikipedia for a given query and returns a summary of the most relevant page.
    Use this tool especially when Wikipedia is mentioned in the context.

    Args:
        query (str): The search term or question for Wikipedia.
        sentences (int): The desired number of sentences for the summary (default: 5).

    Returns:
        str: A summary of the Wikipedia page, a list of options if the query is ambiguous,
             or an error message if the page is not found or another error occurs.
    """
    try:
        # Set the language if needed; defaults to English
        # wikipedia.set_lang("en")

        # auto_suggest=False prevents Wikipedia from guessing if the exact title isn't found.
        # Potential suggestions are handled in the PageError branch below.
        summary = wikipedia.summary(query, sentences=sentences, auto_suggest=False)

        # Optionally, get the actual page title found
        try:
            page_title = wikipedia.page(query, auto_suggest=False).title
            return f"Wikipedia Summary for '{page_title}':\n\n{summary}"
        except wikipedia.exceptions.PageError:
            # If getting the page title fails after the summary worked (unlikely but possible)
            return f"Wikipedia Summary (Query: '{query}'):\n\n{summary}"
        except wikipedia.exceptions.DisambiguationError as e:
            # If getting the page title causes disambiguation after the summary worked
            return f"Wikipedia Summary (Query: '{query}'):\n\n{summary}\n\nNote: Query might be ambiguous. Options include: {e.options}"

    except wikipedia.exceptions.DisambiguationError as e:
        # Handle cases where the query matches multiple pages
        options_list = "\n - ".join(e.options[:10])  # Limit to 10 options
        return (f"Wikipedia query '{query}' is ambiguous. "
                f"Please be more specific or choose from these options:\n - {options_list}")
    except wikipedia.exceptions.PageError:
        # Handle cases where the page doesn't exist: try searching for suggestions
        search_results = wikipedia.search(query, results=5)
        if search_results:
            suggestions = "\n - ".join(search_results)
            return (f"Wikipedia page for '{query}' not found. "
                    f"Did you mean one of these?\n - {suggestions}")
        else:
            return f"Wikipedia page for '{query}' not found, and no suggestions available."
    except Exception as e:
        # Handle other potential errors (network issues, etc.)
        return f"Error querying Wikipedia for '{query}': {type(e).__name__}: {e}"
@tool
def openai_reasoning(question: str) -> str:
    """
    Uses OpenAI's GPT-4o model for in-depth reasoning and analysis of complex questions.
    Use this for riddles, puzzles, or questions that require deep thinking rather than code execution.

    Args:
        question: The question or problem to analyze using GPT-4o's reasoning capabilities.

    Returns:
        The reasoned answer to the question.
    """
    try:
        # Create a specialized reasoning model instance
        reasoning_model = OpenAIServerModel(
            "gpt-4o",
            max_tokens=1024,
            temperature=0.05
        )

        # Craft effective system and user prompts
        messages = [
            {
                "role": "system",
                "content": """You are an expert reasoning engine specialized in solving complex problems, puzzles and riddles.
When tackling problems:
1. Understand the question thoroughly
2. Break down complex problems into parts
3. Consider multiple approaches before deciding on a solution
4. Think step by step
5. Provide only the final answer unless asked for reasoning
Be precise and concise in your final response."""
            },
            {
                "role": "user",
                "content": question
            }
        ]

        # Get the response
        response = reasoning_model(messages)

        # Return just the content from the response
        return response.content
    except Exception as e:
        return f"Error when processing with reasoning model: {str(e)}"
@tool
def extract_youtube_id(url: str) -> str:
    """
    Extract the YouTube video ID from a URL.

    Args:
        url: The YouTube video URL (may contain spaces or formatting issues)

    Returns:
        The YouTube video ID
    """
    # Clean the URL by removing extra spaces
    cleaned_url = url.replace(" ", "")
    try:
        # Handle different YouTube URL formats
        parsed_url = urlparse(cleaned_url)

        # Check for a video ID in the query parameters (youtube.com/watch?v=VIDEO_ID)
        query_params = parse_qs(parsed_url.query)
        if 'v' in query_params:
            return query_params['v'][0]

        # Check for youtu.be short links (youtu.be/VIDEO_ID)
        if 'youtu.be' in parsed_url.netloc:
            path = parsed_url.path.strip('/')
            return path

        # Check for the embedded format (youtube.com/embed/VIDEO_ID)
        if '/embed/' in parsed_url.path:
            return parsed_url.path.split('/embed/')[1]

        # If URL parsing fails, try regex patterns
        patterns = [
            r'(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/embed\/|youtube\.com\/v\/|youtube\.com\/e\/|youtube\.com\/watch\?.*v=|youtube\.com\/watch\?.*&v=)([^&\s]+)',
            r'(?:youtube\.com\/shorts\/)([^&\s]+)',
            r'v=([^&\s]+)'
        ]
        for pattern in patterns:
            match = re.search(pattern, cleaned_url)
            if match:
                return match.group(1)

        # Try to extract directly from the raw string as a last resort
        if 'v=' in cleaned_url:
            v_index = cleaned_url.find('v=')
            video_id = cleaned_url[v_index + 2:].split('&')[0].split('#')[0].split('?')[0].split('/')[0]
            # YouTube IDs are typically 11 characters
            if video_id and len(video_id) in range(10, 12):
                return video_id

        return "Could not extract a valid YouTube video ID from the provided URL."
    except Exception as e:
        # Attempt direct extraction if parsing fails
        if 'v=' in url:
            parts = url.split('v=')
            if len(parts) > 1:
                return parts[1].split('&')[0].split('#')[0].strip()
        return f"Error extracting YouTube ID: {str(e)}"
def get_youtube_transcript(video_id: str, language: str = "en") -> str: | |
""" | |
Get the transcript of a YouTube video. | |
Args: | |
video_id: The YouTube video ID | |
language: The language code for the transcript (default: 'en' for English) | |
Returns: | |
The transcript text of the YouTube video | |
""" | |
try: | |
transcript_list = YouTubeTranscriptApi.get_transcript( | |
video_id, languages=[language]) | |
# Combine all transcript segments into a single text | |
transcript_text = "" | |
for segment in transcript_list: | |
transcript_text += segment['text'] + " " | |
return transcript_text.strip() | |
except Exception as e: | |
return f"Error retrieving transcript: {str(e)}" | |
@tool
def load_text_file(file_path: str, detect_format: bool = True) -> str:
    """
    Loads a text file and optionally detects and processes its format (plain text, code, JSON, etc.).

    Args:
        file_path: Path to the text file to load
        detect_format: Whether to automatically detect and process the format (default: True)

    Returns:
        String containing the file content, possibly formatted based on detected type
    """
    if not os.path.exists(file_path):
        return f"Error: File not found at {file_path}"

    try:
        # Read the file content
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()

        if not detect_format:
            return f"File content ({os.path.basename(file_path)}):\n\n{content}"

        # Get the file extension
        _, ext = os.path.splitext(file_path)
        ext = ext.lower()

        # Handle based on file extension or content detection
        if ext in ['.json', '.geojson']:
            # Process JSON
            try:
                parsed_json = json.loads(content)
                formatted_json = json.dumps(parsed_json, indent=2)
                return f"JSON content ({os.path.basename(file_path)}):\n\n{formatted_json}"
            except json.JSONDecodeError:
                return f"Warning: File has JSON extension but content is not valid JSON.\n\n{content}"
        elif ext in ['.py', '.js', '.ts', '.java', '.c', '.cpp', '.cs', '.php', '.rb', '.go', '.rs', '.swift']:
            # It's a code file, return with appropriate formatting
            return f"Code file ({os.path.basename(file_path)}, {ext[1:]} language):\n\n{content}"
        elif ext in ['.csv', '.tsv']:
            # Handle CSV/TSV files with a preview
            lines = content.strip().split('\n')
            preview_lines = lines[:min(10, len(lines))]
            preview = '\n'.join(preview_lines)
            if len(lines) > 10:
                preview += f"\n\n[...and {len(lines) - 10} more lines]"
            return f"Tabular data file ({os.path.basename(file_path)}):\n\n{preview}"
        else:
            # Try to detect JSON content regardless of extension
            if content.strip().startswith('{') and content.strip().endswith('}'):
                try:
                    parsed_json = json.loads(content)
                    formatted_json = json.dumps(parsed_json, indent=2)
                    return f"Detected JSON content ({os.path.basename(file_path)}):\n\n{formatted_json}"
                except json.JSONDecodeError:
                    pass  # Not valid JSON, continue with other detection

            # Try to detect whether it might be code
            code_indicators = [
                'def ', 'class ', 'function ', 'import ', 'from ', 'var ', 'let ', 'const ',
                '#include', 'package ', 'using ', 'public class', 'int main'
            ]
            if any(indicator in content for indicator in code_indicators):
                language = "unknown programming language"
                return f"Detected code file ({os.path.basename(file_path)}, {language}):\n\n{content}"

            # Check if it's XML/HTML-like
            if re.search(r'<\w+>.*?</\w+>', content, re.DOTALL) or content.strip().startswith('<?xml'):
                return f"Markup language file ({os.path.basename(file_path)}):\n\n{content}"

            # Default to plain text
            return f"Plain text file ({os.path.basename(file_path)}):\n\n{content}"
    except UnicodeDecodeError:
        # Try a different encoding if UTF-8 fails
        try:
            with open(file_path, 'r', encoding='latin-1') as file:
                content = file.read()
            return f"File content ({os.path.basename(file_path)}, non-UTF-8 encoding):\n\n{content}"
        except Exception as e:
            return f"Error reading file (encoding issues): {str(e)}"
    except Exception as e:
        return f"Error reading file: {str(e)}"
@tool
def transcribe_with_whisper(audio_path: str) -> str:
    """
    Transcribes audio using OpenAI's Whisper model.

    Args:
        audio_path: Path to the audio file to transcribe

    Returns:
        The transcribed text
    """
    if not os.path.exists(audio_path):
        return f"Error: Audio file not found at {audio_path}"

    # Try faster-whisper if available; otherwise return a clear error message
    try:
        try:
            # Try faster-whisper first (it's faster)
            from faster_whisper import WhisperModel
            model = WhisperModel("medium.en", device="cpu")
            segments, info = model.transcribe(audio_path)
            transcript = " ".join([segment.text for segment in segments])
            return transcript
        except ImportError:
            # Fallback to the original whisper package (left disabled here)
            # import whisper
            # model = whisper.load_model("small")
            # result = model.transcribe(audio_path)
            # return result["text"]
            return "Error: faster-whisper is not installed, so the audio could not be transcribed."
    except Exception as e:
        return f"Error transcribing audio: {str(e)}"
class BasicAgent:
    def __init__(self):
        print("BasicAgent initialized.")
        self.store_questions_to_log_file = False

        # Create a filename with the current date and time
        current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M")
        self.filename = f"questions_{current_time}.txt"

        if self.store_questions_to_log_file:
            print(f"Questions will be written to {self.filename}")
            # Clear the file if it exists or create a new one
            with open(self.filename, 'w', encoding='utf-8') as f:
                f.write('')  # Create empty file

        # Initialize the Large Language Model
        # The model is used by both agents in this simple setup
        # mistralai/Mixtral-8x7B-Instruct-v0.1
        # meta-llama/Llama-3.3-70B-Instruct
        self.model = HfApiModel(model_id="Qwen/Qwen2.5-Coder-32B-Instruct")
        # self.model = HfApiModel(model_id="mistralai/Mixtral-8x7B-Instruct-v0.1")

        # For TGI container
        # self.model = OpenAIServerModel(
        #     api_base="http://localhost:8080/v1",  # If using TGI container
        #     api_key="not-needed",                 # Local servers usually don't need API keys
        #     model_id="Qwen/Qwen3-1.7B"
        # )

        # self.model = LiteLLMModel(
        #     model_name="ollama/qwen3:1.7b",     # Prefix with 'ollama/' to use the Ollama provider
        #     api_base="http://localhost:11434",  # Your custom Ollama port
        #     flatten_messages_as_text=True,
        #     api_key="",                         # Try passing an empty API key
        # )
        # print(self.model)
        # print(f"Model Name: {self.model.model_name}")
        # print(f"API Base: {self.model.api_base}")
        # Define the Web Search Agent
        # This agent is specialised for searching the web using a specific tool
        # self.web_search_agent = CodeAgent(
        #     model=self.model,  # Assign the model to the agent
        #     tools=[DuckDuckGoSearchTool(),
        #            FinalAnswerTool()],  # Provide the web search tool
        #     name="web_search_agent",  # Give the agent a name
        #     # Describe its capability
        #     description="""Searches the web for information.
        #     In the end you have to return a final answer using the `final_answer` tool.""",
        #     verbosity_level=1,  # Set verbosity level for logging
        #     max_steps=3,  # Limit the steps the agent can take
        #     planning_interval=2,
        # )

        self.web_search_specialist_agent = ToolCallingAgent(
            model=self.model,  # Or any other compatible model instance
            tools=[
                DuckDuckGoSearchTool(),
                query_wikipedia,  # Make sure this is the @tool decorated function
                visit_webpage,
                FinalAnswerTool()
            ],
            name="web_search_specialist_agent",
            description=textwrap.dedent("""\
                This agent specializes in finding information on the web and answering questions based on web content.

                **Core Strategy:**
                1. **Understand & Plan:** For any query, especially complex ones or those requiring information from specific sources, first formulate a clear, step-by-step plan. Think about what information is needed and which tools are best for each step.
                2. **Execute & Adapt:** Execute your plan step by step. After each step, review the results and adapt your plan if necessary.
                3. **Extract & Synthesize:** Once relevant information is found (e.g., on a webpage), don't just return raw data. Carefully extract the specific piece of information that answers the original question.

                **Tool Usage Guidelines:**
                - Use the `DuckDuckGoSearchTool` for general web searches, to find broad information, current events, or to locate specific websites or pages when the URL is unknown.
                - If `DuckDuckGoSearchTool` returns URLs, evaluate them. If a URL seems promising for answering the question, a subsequent step in your plan should be to use the `visit_webpage` tool.
                - Use the `query_wikipedia` tool when the question specifically asks for information from Wikipedia, or when Wikipedia is clearly the most authoritative source (e.g., for definitions, historical events, biographical information).
                - Use the `visit_webpage` tool to get the content of a specific URL.
                - **Crucially**: After using `visit_webpage`, your next step is to analyze its content and extract the precise information needed to answer the query. Do not just output the entire page content as the answer.
                - If the query explicitly mentions a specific website (e.g., "Merriam-Webster", "Cornell Law School website"), your plan should prioritize searching that site.
                  - Use `DuckDuckGoSearchTool` with site-specific queries (e.g., "site:merriam-webster.com <your actual query terms>").
                  - If a direct URL from that site is found or can be inferred, use `visit_webpage` to get the content, then extract the specific information.

                **Search & Iteration Tactics:**
                - Before taking a new action, review the information and results from your previous steps. Use this history to inform your decisions and refine your plan.
                - Do not repeat the exact same or very similar queries to the same tool if the initial attempt did not yield useful information. Use knowledge from past attempts to refine your strategy.
                - If information is not found, consider: rephrasing your query, trying a different aspect of the question, or using an alternative search tool, always considering what you've learned.
                - Be aware of date format sensitivity in searches. If a date is part of your query, try alternative formats (e.g., "27 July 2010" vs "27/7/2010").

                **Final Output:**
                - In the end, you must return a final answer using the `final_answer` tool, based on the information you have gathered and processed according to your plan.
                """),
            verbosity_level=1,   # Adjust as needed
            max_steps=3,         # Adjust as needed
            planning_interval=1  # Adjust as needed
        )
        # Define your model
        self.code_model = "gpt-4.1"  # or whatever model you're using
        reasoning_model = OpenAIServerModel(
            self.code_model,
            max_completion_tokens=8096
        )

        # Create your agent with the reasoning tool and other tools
        self.reasoning_agent = ToolCallingAgent(
            model=reasoning_model,
            tools=[openai_reasoning, FinalAnswerTool()],
            planning_interval=2,
            max_steps=5,
            verbosity_level=1,
            name="reasoning_agent",
            description="""Solves complex problems, riddles and puzzles through reasoning rather than code execution.
            In the end you have to return a final answer using the `final_answer` tool."""
        )
        self.youtube_qa_agent = ToolCallingAgent(
            model=reasoning_model,  # self.model,
            tools=[extract_youtube_id, get_youtube_transcript,
                   FinalAnswerTool()],
            name="youtube_qa_agent",
            planning_interval=2,
            max_steps=5,
            verbosity_level=1,
            description=textwrap.dedent("""\
                You are an expert assistant that can answer questions about YouTube videos by analyzing their transcripts.
                When given a YouTube URL and a question, follow these steps IN ORDER:
                1. Extract the video ID from the URL using the `extract_youtube_id` tool
                2. Retrieve the transcript of the video using the `get_youtube_transcript` tool
                3. Provide a clear and concise answer based solely on the transcript content
                4. Return your final answer using the `final_answer` tool

                IMPORTANT INSTRUCTIONS:
                - After getting the transcript, you MUST analyze its content yourself. DO NOT call `get_youtube_transcript` twice.
                - Never skip the analysis step - it's crucial for answering the question correctly.
                - Each tool must be used in the correct sequence - ID extraction, then transcript retrieval, then analysis.

                If you cannot find a direct answer to the question in the transcript:
                - Acknowledge that you couldn't find a specific answer
                - Provide the transcript for reference
                - Suggest that the user might want to use a different approach

                DO NOT run the same tool with the same arguments multiple times.
                DO NOT make up information that is not in the transcript.
                """)
        )
        self.python_code_executer = CodeAgent(
            model=reasoning_model,
            tools=[load_text_file,
                   FinalAnswerTool()],
            name="python_code_executer",
            description=textwrap.dedent("""\
                You are an expert assistant that can execute Python code.
                Execute the Python code and return the final answer using the `final_answer` tool.
                """),
            additional_authorized_imports=["json", "re", "pandas", "numpy", "math", "collections", "itertools", "stat", "statistics", "queue", "unicodedata", "time", "random", "datetime"],
            verbosity_level=1,
            max_steps=5,
            planning_interval=1,
            # executor_type="e2b",
            # use_e2b_executor=True
        )
        # Initialize the agent with the transcription tool
        # self.transcriber_agent = CodeAgent(
        #     model=reasoning_model,
        #     tools=[],
        #     add_base_tools=True,
        #     name="transcriber_agent",
        #     description="Transcribes an audio file to text, i.e. takes an audio file and generates the transcription of the audio.",
        #     # Add any imports your agent might need
        #     additional_authorized_imports=["pandas", "numpy"],
        #     max_steps=5
        # )
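        # Note on the manager defined below: it is a CodeAgent, so (per smolagents'
        # managed-agents mechanism) it can delegate by calling each managed agent by
        # name from the Python it writes (roughly web_search_specialist_agent(task=...)),
        # in addition to using its own file-loading tools directly.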
        # Define the Manager Agent
        # This agent manages tasks and delegates to other agents
        self.manager_agent = CodeAgent(
            model=self.model,  # Assign the model to the manager
            tools=[load_docx_file,
                   load_pdf_file,
                   load_xlsx_file_as_dataframe,
                   load_xlsx_file_as_markdown,
                   query_wikipedia,
                   load_text_file,
                   transcribe_with_whisper,
                   FinalAnswerTool()],
            # Specify the agents this manager oversees
            managed_agents=[self.web_search_specialist_agent,
                            self.reasoning_agent,
                            self.youtube_qa_agent,
                            self.python_code_executer],
                            # self.transcriber_agent],
            name="manager_agent",  # Give the manager agent a name
            description="Manages tasks by delegating to other agents.",  # Describe its role
            additional_authorized_imports=[
                "json", "re", "pandas", "numpy", "math", "collections", "itertools", "stat", "statistics", "queue", "unicodedata", "time", "random", "datetime"],  # Allow specific imports
            verbosity_level=1,  # Set verbosity level
            max_steps=5,  # Limit the steps
            planning_interval=1,
            # final_answer_checks=[]
        )
        print("MultiAgentSystem initialization complete.")
    def __call__(self, question: str,
                 file_name: str = None) -> str:
        print(f"Agent received question (first 50 chars): {question[:50]}...")

        # For all other questions, use the manager agent with web search
        # manager_prompt = dedent(f"""
        #     I need to answer the following question accurately:
        #     {question}
        #     Please analyze this question and determine the best approach to answer it.
        #     If needed, use web search to find relevant information.
        #     Provide a concise, accurate answer to the question.
        #     """)
        manager_prompt = textwrap.dedent(f"""
            I need to answer the following question accurately:
            {question}
            using the following file: '{file_name}' if provided.
            Please analyze this question and determine the best approach to answer it
            using the available agents and tools.
            Note that you are provided with a special agent to resolve logical problems, riddles and puzzles named "reasoning_agent".
            If needed, use any of the available tools to find or load the relevant information.
            Provide a concise, accurate answer to the question.
            """)

        manager_agent_response = "I apologize, but I couldn't find an answer to this question."
        source = ""
        try:
            manager_agent_response = self.manager_agent.run(manager_prompt)
            source = "manager_agent"
            # Check if the answer contains a missing tool warning
            # if "Missing Tool Warning:" in manager_agent_response:
            #     return manager_agent_response
        except Exception as e:
            print(f"Error in manager agent: {e}")
            source = f"Exception {e} "

        # Append the question to the file
        if self.store_questions_to_log_file:
            with open(self.filename, 'a', encoding='utf-8') as f:
                f.write(f"{question}\n")
                f.write(f"ANSWER by {source}: {manager_agent_response}\n")
                f.write(f"{'*' * 50}\n")

        print(f"Final answer: {manager_agent_response}")
        return manager_agent_response
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the BasicAgent on them, submits all answers,
    and displays the results.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    # Get the SPACE_ID for sending a link to the code
    space_id = os.getenv("SPACE_ID")

    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent (modify this part to create your agent)
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # In the case of an app running as a Hugging Face space, this link points to your codebase (useful for others, so please keep it public)
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")

        # Save questions data to a JSON file
        current_time = datetime.datetime.now().strftime("%Y%m%d_%H%M")
        questions_filename = f"questions_data_{current_time}.json"
        with open(questions_filename, 'w') as f:
            json.dump(questions_data, f, indent=4)
        print(f"Saved questions data to {questions_filename}")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent
    results_log = []
    answers_payload = []

    # Load previous results if available
    previous_results = {}
    current_script_dir = os.path.dirname(os.path.abspath(__file__))
    results_log_status_path = os.path.join(
        current_script_dir, 'results_log_status.json')
    try:
        if os.path.exists(results_log_status_path):
            with open(results_log_status_path, 'r', encoding='utf-8') as f:
                previous_results_list = json.load(f)
            # Convert to a dictionary keyed by task_id for easier lookup
            for item in previous_results_list:
                previous_results[item.get("Task ID")] = item
            print(f"Loaded {len(previous_results)} previous results")
        else:
            return f"{results_log_status_path} not found", None
    except Exception as e:
        print(f"Error loading previous results: {e}")
        # Abort if the previous results cannot be loaded
        return "results_log_status.json not found", None
print(f"Running agent on {len(questions_data)} questions...") | |
for idx, item in enumerate(questions_data): | |
#if idx == 6: | |
# break | |
task_id = item.get("task_id") | |
question_text = item.get("question") | |
file_name = item.get("file_name", None) | |
if file_name: | |
file_name = os.path.join( | |
GAIA_LEVEL1_VALIDATION_FILES_PATH, file_name) | |
# Skip the "continue" statement that was in the original code | |
if not task_id or question_text is None: | |
print(f"Skipping item with missing task_id or question: {item}") | |
continue | |
# Check if we already have an answer for this task | |
if task_id in previous_results: | |
submitted_answer = previous_results[task_id].get( | |
"Submitted Answer") | |
print(f"Using cached result for task {task_id}") | |
answers_payload.append( | |
{"task_id": task_id, "submitted_answer": submitted_answer}) | |
results_log.append({ | |
"Task ID": task_id, | |
"Question": question_text, | |
"Submitted Answer": submitted_answer, | |
"recycled": "true" | |
}) | |
else: | |
try: | |
submitted_answer = agent(question_text, file_name) | |
answers_payload.append( | |
{"task_id": task_id, "submitted_answer": submitted_answer}) | |
results_log.append({ | |
"Task ID": task_id, | |
"Question": question_text, | |
"Submitted Answer": submitted_answer | |
}) | |
except Exception as e: | |
print(f"Error running agent on task {task_id}: {e}") | |
results_log.append({ | |
"Task ID": task_id, | |
"Question": question_text, | |
"Submitted Answer": f"AGENT ERROR: {e}" | |
}) | |
# Save progress after each question | |
with open('results_log_progress.json', 'w') as f: | |
json.dump(results_log, f, indent=4) | |
if not answers_payload: | |
print("Agent did not produce any answers to submit.") | |
return "Agent did not produce any answers to submit.", pd.DataFrame(results_log) | |
    # 4. Prepare Submission
    submission_data = {"username": username.strip(),
                       "agent_code": agent_code,
                       "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc.
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
        ---
        **Disclaimers:**
        Once you click the "Run Evaluation & Submit All Answers" button, it can take quite some time (this is the time the agent needs to work through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, to avoid the long-running submit button, you could cache the answers and submit them in a separate action, or even answer the questions asynchronously.
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    # Removed max_rows=10 from DataFrame constructor
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
if __name__ == "__main__": | |
print("\n" + "-"*30 + " App Starting " + "-"*30) | |
# Check for SPACE_HOST and SPACE_ID at startup for information | |
space_host_startup = os.getenv("SPACE_HOST") | |
space_id_startup = os.getenv("SPACE_ID") # Get SPACE_ID at startup | |
if space_host_startup: | |
print(f"✅ SPACE_HOST found: {space_host_startup}") | |
print(f" Runtime URL should be: https://{space_host_startup}.hf.space") | |
else: | |
print("ℹ️ SPACE_HOST environment variable not found (running locally?).") | |
if space_id_startup: # Print repo URLs if SPACE_ID is found | |
print(f"✅ SPACE_ID found: {space_id_startup}") | |
print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}") | |
print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main") | |
else: | |
print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.") | |
print("-"*(60 + len(" App Starting ")) + "\n") | |
print("Launching Gradio Interface for Basic Agent Evaluation...") | |
demo.launch(debug=True, share=False) |