""" agent.py - Claude implementation for GAIA challenge ----------------------------------------------------------- A simplified implementation with direct litellm access to Anthropic's Claude """ import base64 import mimetypes import os import re import tempfile import time import random from typing import List, Dict, Any, Optional import requests from urllib.parse import urlparse from smolagents import CodeAgent, DuckDuckGoSearchTool, PythonInterpreterTool, tool # --------------------------------------------------------------------------- # # Constants & helpers # --------------------------------------------------------------------------- # DEFAULT_API_URL = os.getenv( "GAIA_API_URL", "https://agents-course-unit4-scoring.hf.space" ) FILE_TAG = re.compile(r"]+)>") # def _download_file(file_id: str) -> bytes: """Download the attachment for a GAIA task.""" url = f"{DEFAULT_API_URL}/files/{file_id}" resp = requests.get(url, timeout=30) resp.raise_for_status() return resp.content # --------------------------------------------------------------------------- # # Direct Claude model implementation with litellm # --------------------------------------------------------------------------- # class DirectClaudeModel: """ Direct interface to Claude via litellm that works with smolagents This avoids the message format issues by keeping things very simple """ def __init__( self, api_key: Optional[str] = None, temperature: float = 0.1 ): """Initialize the Claude model""" self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY") if not self.api_key: raise ValueError("No Anthropic API key provided") self.temperature = temperature self.model_name = "anthropic/claude-3-5-sonnet-20240620" print(f"Initialized DirectClaudeModel with {self.model_name}") # Sleep random amount to avoid race conditions with many queries time.sleep(random.uniform(1, 3)) def __call__(self, prompt: str, **kwargs) -> str: """ Simple call method that works with smolagents Args: prompt: The user prompt **kwargs: Additional parameters (ignored) Returns: Claude's response as a string """ # Import here to avoid any circular imports from litellm import completion # Use a simple format: system message + user message messages = [ { "role": "system", "content": """You are a concise, highly accurate assistant specialized in solving challenges. Your answers should be precise, direct, and exactly match the expected format. All answers are graded by exact string match, so format carefully!""" }, { "role": "user", "content": prompt } ] # Add delay to avoid rate limits time.sleep(random.uniform(0.5, 2.0)) try: # Make API call with simple format response = completion( model=self.model_name, messages=messages, temperature=self.temperature, max_tokens=1024, api_key=self.api_key ) # Extract and return the text content only return response.choices[0].message.content except Exception as e: # If it's a rate limit error, wait and retry if "rate_limit" in str(e).lower(): print(f"Rate limit hit, waiting 30 seconds: {e}") time.sleep(30) return self.__call__(prompt, **kwargs) else: print(f"Error: {str(e)}") raise # --------------------------------------------------------------------------- # # Tools section - All tools used by the agent # --------------------------------------------------------------------------- # @tool def gaia_file_reader(file_id: str) -> str: """ Download a GAIA attachment and return its contents. Args: file_id: The identifier of the file to download from GAIA API. Returns: The content of the file as a string (text files) or base64-encoded (binary files). """ try: raw = _download_file(file_id) mime = mimetypes.guess_type(file_id)[0] or "application/octet-stream" if mime.startswith("text") or mime in ("application/json",): return raw.decode(errors="ignore") return base64.b64encode(raw).decode() except Exception as exc: return f"ERROR downloading {file_id}: {exc}" @tool def save_and_read_file(content: str, filename: Optional[str] = None) -> str: """ Save content to a temporary file and return the path. Args: content: The content to save to the file. filename: Optional filename, will generate a random name if not provided. Returns: Path to the saved file. """ temp_dir = tempfile.gettempdir() if filename is None: temp_file = tempfile.NamedTemporaryFile(delete=False) filepath = temp_file.name else: filepath = os.path.join(temp_dir, filename) with open(filepath, 'w') as f: f.write(content) return f"File saved to {filepath}." @tool def analyze_csv_file(file_path: str, query: str) -> str: """ Analyze a CSV file using pandas and answer questions about it. Args: file_path: Path to the CSV file to analyze. query: A question or instruction about what to analyze in the file. Returns: Analysis results as text. """ try: import pandas as pd df = pd.read_csv(file_path) result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n" result += f"Columns: {', '.join(df.columns)}\n\n" result += "Summary statistics:\n" result += str(df.describe()) return result except ImportError: return "Error: pandas is not installed." except Exception as e: return f"Error analyzing CSV file: {str(e)}" @tool def analyze_excel_file(file_path: str, query: str) -> str: """ Analyze an Excel file using pandas and answer questions about it. Args: file_path: Path to the Excel file to analyze. query: A question or instruction about what to analyze in the file. Returns: Analysis results as text. """ try: import pandas as pd df = pd.read_excel(file_path) result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n" result += f"Columns: {', '.join(df.columns)}\n\n" result += "Summary statistics:\n" result += str(df.describe()) return result except ImportError: return "Error: pandas and openpyxl are not installed." except Exception as e: return f"Error analyzing Excel file: {str(e)}" @tool def download_file_from_url(url: str, filename: Optional[str] = None) -> str: """ Download a file from a URL and save it to a temporary location. Args: url: The URL to download from. filename: Optional filename, will generate one based on URL if not provided. Returns: Path to the downloaded file. """ try: # Parse URL to get filename if not provided if not filename: path = urlparse(url).path filename = os.path.basename(path) if not filename: # Generate a random name if we couldn't extract one import uuid filename = f"downloaded_{uuid.uuid4().hex[:8]}" # Create temporary file temp_dir = tempfile.gettempdir() filepath = os.path.join(temp_dir, filename) # Download the file response = requests.get(url, stream=True) response.raise_for_status() # Save the file with open(filepath, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) return f"File downloaded to {filepath}. You can now process this file." except Exception as e: return f"Error downloading file: {str(e)}" @tool def extract_text_from_image(image_path: str) -> str: """ Extract text from an image using pytesseract (if available). Args: image_path: Path to the image file to extract text from. Returns: Extracted text from the image. """ try: # Try to import pytesseract import pytesseract from PIL import Image # Open the image image = Image.open(image_path) # Extract text text = pytesseract.image_to_string(image) return f"Extracted text from image:\n\n{text}" except ImportError: return "Error: pytesseract is not installed. Please install it with 'pip install pytesseract' and ensure Tesseract OCR is installed on your system." except Exception as e: return f"Error extracting text from image: {str(e)}" # --------------------------------------------------------------------------- # # ClaudeAgent - Main class for GAIA challenge # --------------------------------------------------------------------------- # class ClaudeAgent: """A simplified Claude agent for the GAIA challenge""" def __init__(self): """Initialize the agent with Claude""" try: # Get API key api_key = os.getenv("ANTHROPIC_API_KEY") if not api_key: raise ValueError("ANTHROPIC_API_KEY environment variable not found") print("✅ Initializing ClaudeAgent") # Create the model with direct implementation model = DirectClaudeModel(api_key=api_key, temperature=0.1) # Set up tools tools = [ DuckDuckGoSearchTool(), PythonInterpreterTool(), save_and_read_file, analyze_csv_file, analyze_excel_file, gaia_file_reader, download_file_from_url, extract_text_from_image ] # Create the CodeAgent self.agent = CodeAgent( tools=tools, model=model, additional_authorized_imports=["pandas", "numpy", "json", "re", "math"], executor_type="local", verbosity_level=2 ) print("Agent initialized successfully") except Exception as e: print(f"Error initializing ClaudeAgent: {e}") raise def __call__(self, question: str) -> str: """Process a question and return the answer""" try: print(f"Processing question: {question[:100]}..." if len(question) > 100 else question) # Add a small delay between questions time.sleep(random.uniform(1.0, 3.0)) # Handle file references file_match = re.search(r"]+)>", question) if file_match: file_id = file_match.group(1) print(f"Detected file: {file_id}") # Download file try: file_content = _download_file(file_id) temp_dir = tempfile.gettempdir() file_path = os.path.join(temp_dir, file_id) with open(file_path, 'wb') as f: f.write(file_content) # Remove file tag from question clean_question = re.sub(r"]+>", "", question).strip() # Build prompt with file context prompt = f""" Question: {clean_question} There is a file available at path: {file_path} Use appropriate tools to analyze this file if needed. Answer the question directly and precisely. """ except Exception as e: print(f"Error downloading file: {e}") prompt = question else: # Handle reversed text separately if question.startswith(".") or ".rewsna eht sa" in question: prompt = f""" This question is in reversed text. Here's the normal version: {question[::-1]} Answer the question directly and precisely. """ else: prompt = question # Execute agent with prompt answer = self.agent.run(prompt) # Clean up response answer = self._clean_answer(answer) print(f"Generated answer: {answer}") return answer except Exception as e: print(f"Error: {str(e)}") return f"Error processing question: {str(e)}" def _clean_answer(self, answer: any) -> str: """Clean up the answer for exact matching""" if not isinstance(answer, str): return str(answer) # Normalize spacing answer = answer.strip() # Remove common prefixes prefixes = [ "The answer is ", "Answer: ", "Final answer: ", "The result is ", "Based on the information provided, " ] for prefix in prefixes: if answer.startswith(prefix): answer = answer[len(prefix):].strip() # Remove quotes if (answer.startswith('"') and answer.endswith('"')) or ( answer.startswith("'") and answer.endswith("'") ): answer = answer[1:-1].strip() return answer