"""Gradio app that runs a LangGraph tool-calling agent on the course questions.

The module wires a set of tools (web/Wikipedia/Arxiv search, audio
transcription, image / Python / YouTube / Excel question answering) into a
LangGraph agent, runs it on every question served by the scoring API and
submits the answers.
"""

import os
from typing import Annotated, Optional, TypedDict

import gradio as gr
import pandas as pd
import requests
from dotenv import load_dotenv
from langchain.tools import Tool
from langchain_core.messages import AnyMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START
from langgraph.graph.message import add_messages
from langgraph.prebuilt import tools_condition, ToolNode

from arxiv_searcher import ArxivSearcher
from chess_algebraic_notation_retriever import ChessAlgebraicNotationMoveRetriever
from excel_file_reader import ExcelFileReader
from image_question_answer_tool import ImageQuestionAnswerTool
from python_code_question_answer_tool import PythonCodeQuestionAnswerTool
from tavily_searcher import TavilySearcher
from transcriber import Transcriber
from wikipedia_searcher import WikipediaSearcher
from youtube_video_question_answer_tool import YoutubeVideoQuestionAnswerTool

load_dotenv()

# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
ASSOCIATED_FILE_ENDPOINT = f"{DEFAULT_API_URL}/files/"


# --- Basic Agent Definition ---
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
def retrieve_task_file(task_id: str) -> Optional[bytes]:
    """Download the file attached to a task.

    Args:
        task_id: Identifier of the task whose attachment is requested.

    Returns:
        The raw file content, or ``None`` when the request fails or the
        server answers with anything other than HTTP 200.
    """
    try:
        response = requests.get(ASSOCIATED_FILE_ENDPOINT + task_id, timeout=15)
        response.raise_for_status()
        # raise_for_status() only rejects 4xx/5xx; also refuse other non-200 codes.
        if response.status_code != 200:
            print(f"Error fetching file: {response.status_code}")
            return None
        return response.content
    except requests.exceptions.RequestException as e:
        print(f"Error fetching file: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred fetching file: {e}")
        return None


def retrieve_next_chess_move_in_algebraic_notation(task_file_path: str, is_black_turn: bool) -> str:
    """Retrieve the next chess move in algebraic notation from an image path.

    Args:
        task_file_path: Path of the chess board image.
        is_black_turn: Whether it is black's turn to move.

    Returns:
        The move produced by ``ChessAlgebraicNotationMoveRetriever``, or an
        error message when no path is given.
    """
    if task_file_path is None:
        return "Error: Task file not found."

    return ChessAlgebraicNotationMoveRetriever().retrieve(task_file_path, is_black_turn)


# NOTE(review): this tool is defined but not registered in BasicAgent's tool
# list, and the wrapped function takes two arguments — confirm how it is
# meant to be invoked before enabling it.
retrieve_next_chess_move_in_algebraic_notation_tool = Tool(
    name="retrieve_next_chess_move_in_algebraic_notation",
    func=retrieve_next_chess_move_in_algebraic_notation,
    description="Retrieve the next chess move in algebraic notation from an image path."
)


def transcribe_audio(file_path: str) -> str:
    """Transcribe the audio file at ``file_path``.

    Returns an error message instead of raising when no path is given.
    """
    if file_path is None:
        return "Error: Audio path not found."

    return Transcriber().transcribe(file_path)


transcribe_audio_tool = Tool(
    name="transcribe_audio",
    func=transcribe_audio,
    description="Transcribe the audio from an audio path."
)

# Project-provided tools that are already tool objects.
answer_python_code_tool = PythonCodeQuestionAnswerTool()
answer_image_question_tool = ImageQuestionAnswerTool()
answer_youtube_video_question_tool = YoutubeVideoQuestionAnswerTool()


def read_excel_file(file_path: str) -> str:
    """Read the Excel file at ``file_path`` via ``ExcelFileReader``.

    Returns an error message instead of raising when no path is given.
    """
    if file_path is None:
        return "Error: File not found."
    return ExcelFileReader().read_file(file_path)


read_excel_file_tool = Tool(
    name="read_excel_file",
    func=read_excel_file,
    description="Read the excel file."
)

wikipedia_search_tool = Tool(
    name="wikipedia_search",
    func=WikipediaSearcher().search,
    description="Search Wikipedia for a given query."
)

arxiv_search_tool = Tool(
    name="arxiv_search",
    func=ArxivSearcher().search,
    description="Search Arxiv for a given query."
)

tavily_search_tool = Tool(
    name="tavily_search",
    func=TavilySearcher().search,
    description="Search the web for a given query."
)


def format_gaia_answer(answer: str) -> str:
    """Ask the LLM to reduce ``answer`` to the bare form GAIA expects.

    Args:
        answer: Raw answer text produced by the agent.

    Returns:
        The model's reformatted answer, stripped of surrounding whitespace
        and with every double quote removed.
    """
    llm = ChatOpenAI(model="o3-mini", openai_api_key=os.getenv("OPENAI_API_KEY"))
    prompt = f"""
    You are formatting answers for the GAIA benchmark, which requires responses to be concise and unambiguous.
    Given the answer: {answer}
    Return the answer in the correct GAIA format:
    - If the answer is a single word or number, return it without any additional text or formatting.
    - If the answer is a list, return a comma-separated list without any additional text or formatting.
    - If the answer is a string, return it without any additional text or formatting.
    Do not include any prefixes, dots, enumerations, explanations, or quotation marks.
    Do not include any additional text or formatting.
    """
    response = llm.invoke(prompt)
    # Delete double quotes
    return response.content.strip().replace('"', '')


class AgentState(TypedDict):
    """State carried through the agent graph."""

    # Conversation so far; add_messages appends instead of overwriting.
    messages: Annotated[list[AnyMessage], add_messages]
    # Local path of the file attached to the task, or None when there is none.
    file_path: Optional[str]


class BasicAgent:
    """Tool-calling agent built as a two-node LangGraph (assistant <-> tools)."""

    def __init__(self):
        """Bind the tools to the LLM and compile the agent graph."""
        tools = [
            tavily_search_tool,
            arxiv_search_tool,
            wikipedia_search_tool,
            transcribe_audio_tool,
            answer_python_code_tool,
            answer_image_question_tool,
            answer_youtube_video_question_tool,
            read_excel_file_tool
        ]
        llm = ChatOpenAI(model="o3-mini", openai_api_key=os.getenv("OPENAI_API_KEY"))
        self.llm_with_tools = llm.bind_tools(tools)

        builder = StateGraph(AgentState)

        # Define nodes: these do the work
        builder.add_node("assistant", self.assistant)
        builder.add_node("tools", ToolNode(tools))

        # Define edges: these determine how the control flow moves
        builder.add_edge(START, "assistant")
        builder.add_conditional_edges(
            "assistant",
            # If the latest message requires a tool, route to tools
            # Otherwise, provide a direct response
            tools_condition,
        )
        builder.add_edge("tools", "assistant")
        self.agent = builder.compile()

        print("BasicAgent initialized.")

    def assistant(self, state: AgentState):
        """Graph node: call the tool-bound LLM with the system prompt and history.

        Args:
            state: Current graph state (messages and attached file path).

        Returns:
            A state update holding the model's reply and the unchanged
            ``file_path``.
        """
        # NOTE(review): this description lists download_youtube_video, which
        # is not among the tools registered in __init__ — confirm whether the
        # prompt or the tool list should change.
        textual_description_of_tools = """
        tavily_search(query: str) -> str:
            Search the web for a given query.
            Args:
                query: Query to search the web for (string).
            Returns:
                A single string containing the information found on the web.
        arxiv_search(query: str) -> str:
            Search Arxiv, that contains scientific papers, for a given query.
            Args:
                query: Query to search Arxiv for (string).
            Returns:
                A single string containing the answer to the question.
        wikipedia_search(query: str) -> str:
            Search Wikipedia for a given query.
            Args:
                query: Query to search Wikipedia for (string).
            Returns:
                A single string containing the answer to the question.
        transcribe_audio(file_path: str) -> str:
            Transcribe the audio from an audio path.
            Args:
                file_path: File path of the audio file (string).
            Returns:
                A single string containing the transcribed text from the audio.
        answer_python_code(file_path: str, question: str) -> str:
            Answer the question based on the python code.
            Args:
                file_path: File path of the python file (string).
                question: Question to answer (string).
            Returns:
                A single string containing the answer to the question.
        answer_image_question(file_path: str, question: str) -> str:
            Answer the question based on the image.
            Args:
                file_path: File path of the image (string).
                question: Question to answer (string).
            Returns:
                A single string containing the answer to the question.
        download_youtube_video(youtube_video_url: str) -> str:
            Download the Youtube video into a local file based on the URL
            Args:
                youtube_video_url: A youtube video url (string).
            Returns:
                A single string containing the file path of the downloaded youtube video.
        answer_youtube_video_question(file_path: str, question: str) -> str:
            Answer the question based on file path of the downloaded youtube video
            Args:
                file_path: File path of the downloaded youtube video (string).
                question: Question to answer (string).
            Returns:
                A single string containing the answer to the question.
        read_excel_file(file_path: str) -> str:
            Read the excel file.
            Args:
                file_path: File path of the excel file (string).
            Returns:
                A markdown formatted string containing the contents of the excel file.
        """
        file_path = state["file_path"]
        prompt = f"""
        You are a helpful assistant that can analyse images, videos, excel files and Python scripts and run computations with provided tools:
        {textual_description_of_tools}
        You have access to the file path of the attached file in case it's informed. Currently the file path is: {file_path}
        Be direct and specific. GAIA benchmark requires exact matching answers.
        For example, if asked "What is the capital of France?", respond simply with "Paris".
        Do not include any prefixes, dots, enumerations, explanations, or quotation marks.
        Do not include any additional text or formatting.
        If you are required a number, return a number, not the items.
        """
        sys_msg = SystemMessage(content=prompt)

        reply = self.llm_with_tools.invoke(
            [sys_msg] + state["messages"],
            config={"configurable": {"file_path": state["file_path"]}},
        )
        return {
            "messages": [reply],
            "file_path": state["file_path"]
        }

    def __call__(self, question: str, task_id: str, file_name: Optional[str] = None) -> str:
        """Answer one question, downloading its attached file first if any.

        Args:
            question: Question text.
            task_id: Task identifier, used to fetch the attached file.
            file_name: Name of the attached file; ``None`` or ``""`` when the
                task has no attachment.

        Returns:
            The agent's final answer after GAIA formatting.
        """
        print(f"######################### Agent received question (first 50 chars): {question[:50]}... with file_name: {file_name}")

        # Get the file path
        tmp_file_path = None
        if file_name:
            file_content = retrieve_task_file(task_id)
            if file_content is not None:
                print(f"Saving file {file_name} to tmp folder")
                # The tmp folder may not exist yet on a fresh checkout/Space.
                os.makedirs("tmp", exist_ok=True)
                # file_name comes from the remote API: keep only its last
                # component so the file cannot be written outside tmp/.
                safe_name = os.path.basename(file_name) or task_id
                tmp_file_path = f"tmp/{safe_name}"
                with open(tmp_file_path, "wb") as f:
                    f.write(file_content)

        # Show the file path
        print(f"File path: {tmp_file_path}")

        try:
            messages = self.agent.invoke({"messages": [HumanMessage(question)], "file_path": tmp_file_path})

            # Show the messages
            for m in messages['messages']:
                m.pretty_print()

            answer = messages["messages"][-1].content
            answer = format_gaia_answer(answer)
            print(f"######################### Agent returning answer: {answer}\n")
        finally:
            # Delete the file even when the agent fails, so tmp/ does not
            # accumulate attachments; cleanup is best-effort.
            if tmp_file_path is not None:
                try:
                    os.remove(tmp_file_path)
                except OSError as e:
                    print(f"Could not delete temporary file {tmp_file_path}: {e}")

        return answer


def run_and_submit_all( profile: gr.OAuthProfile | None):
    """Fetch all questions, run the BasicAgent on them and submit the answers.

    Args:
        profile: Hugging Face OAuth profile of the logged-in user, or
            ``None`` when nobody is logged in.

    Returns:
        A ``(status_message, results)`` tuple where ``results`` is a
        DataFrame of questions and answers, or ``None`` when the run stopped
        before any question was processed.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending link to the code

    if profile:
        username= f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent ( modify this part to create your agent)
    try:
        agent = BasicAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # In the case of an app running as a Hugging Face space, this link points toward your codebase ( useful for others so please keep it public)
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    # JSONDecodeError is a subclass of RequestException, so it must be
    # handled first or this branch can never run.
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            # The agent needs the task id and file name to fetch attachments.
            submitted_answer = agent(question_text, task_id, item.get("file_name"))
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Every failure path reports its message together with the per-question log.
    print(status_message)
    return status_message, pd.DataFrame(results_log)


# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**

        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.

        ---
        **Disclaimers:**
        Once clicking on the "submit button, it can take quite some time ( this is the time for the agent to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    # Removed max_rows=10 from DataFrame constructor
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # No explicit inputs: run_and_submit_all declares a gr.OAuthProfile
    # parameter for the logged-in user's profile.
    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )

if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # Check for SPACE_HOST and SPACE_ID at startup for information
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")  # Get SPACE_ID at startup

    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")

    if space_id_startup:  # Print repo URLs if SPACE_ID is found
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")

    print("-"*(60 + len(" App Starting ")) + "\n")

    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)