real-jiakai's picture
Update agent.py
524e312 verified
raw
history blame
14.2 kB
"""
agent.py - Claude implementation for GAIA challenge
-----------------------------------------------------------
A simplified implementation with direct litellm access to Anthropic's Claude
"""
import base64
import mimetypes
import os
import re
import tempfile
import time
import random
from typing import List, Dict, Any, Optional
import requests
from urllib.parse import urlparse
from smolagents import CodeAgent, DuckDuckGoSearchTool, PythonInterpreterTool, tool
# --------------------------------------------------------------------------- #
# Constants & helpers
# --------------------------------------------------------------------------- #
# Base URL of the GAIA scoring service. The GAIA_API_URL environment variable,
# when set, takes precedence over the built-in default.
DEFAULT_API_URL = os.environ.get(
    "GAIA_API_URL",
    "https://agents-course-unit4-scoring.hf.space",
)
# Matches an attachment tag of the form <file:xyz> and captures the identifier.
FILE_TAG = re.compile(r"<file:([^>]+)>")
def _download_file(file_id: str) -> bytes:
    """Fetch the raw bytes of a GAIA task attachment.

    Args:
        file_id: Identifier appended to ``{DEFAULT_API_URL}/files/``.

    Returns:
        The response body as bytes.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    response = requests.get(f"{DEFAULT_API_URL}/files/{file_id}", timeout=30)
    # Surface 4xx/5xx responses as exceptions instead of returning an error page.
    response.raise_for_status()
    return response.content
# --------------------------------------------------------------------------- #
# Direct Claude model implementation with litellm
# --------------------------------------------------------------------------- #
class DirectClaudeModel:
    """
    Direct interface to Claude via litellm that works with smolagents.

    Every call sends a fixed system message plus a single user message and
    returns only the text of the first completion choice.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        temperature: float = 0.1,
        max_retries: int = 5,
    ):
        """Initialize the Claude model.

        Args:
            api_key: Anthropic API key. Falls back to the ANTHROPIC_API_KEY
                environment variable when omitted.
            temperature: Sampling temperature passed to every completion call.
            max_retries: Maximum number of retries after a rate-limit error
                before the error is re-raised. Negative values are treated
                as zero.

        Raises:
            ValueError: If no API key is given and none is set in the
                environment.
        """
        self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY")
        if not self.api_key:
            raise ValueError("No Anthropic API key provided")
        self.temperature = temperature
        self.max_retries = max(0, max_retries)
        self.model_name = "anthropic/claude-3-5-sonnet-20240620"
        print(f"Initialized DirectClaudeModel with {self.model_name}")
        # Sleep random amount to avoid race conditions with many queries
        time.sleep(random.uniform(1, 3))

    def __call__(self, prompt: str, **kwargs) -> str:
        """
        Simple call method that works with smolagents.

        Args:
            prompt: The user prompt
            **kwargs: Additional parameters (ignored)

        Returns:
            Claude's response as a string

        Raises:
            Exception: Any error raised by litellm that is not a rate-limit
                error, or a rate-limit error once ``max_retries`` retries
                have been used up.
        """
        # Import here to avoid any circular imports
        from litellm import completion

        # Use a simple format: system message + user message
        messages = [
            {
                "role": "system",
                "content": (
                    "You are a concise, highly accurate assistant specialized in solving challenges.\n"
                    "Your answers should be precise, direct, and exactly match the expected format.\n"
                    "All answers are graded by exact string match, so format carefully!"
                ),
            },
            {
                "role": "user",
                "content": prompt,
            },
        ]

        retries_used = 0
        # A bounded loop replaces the previous unbounded self-recursion, which
        # never terminated while the API kept reporting a rate limit.
        while True:
            # Add delay to avoid rate limits
            time.sleep(random.uniform(0.5, 2.0))
            try:
                # Make API call with simple format
                response = completion(
                    model=self.model_name,
                    messages=messages,
                    temperature=self.temperature,
                    max_tokens=1024,
                    api_key=self.api_key,
                )
            except Exception as e:
                is_rate_limit = "rate_limit" in str(e).lower()
                if is_rate_limit and retries_used < self.max_retries:
                    retries_used += 1
                    print(f"Rate limit hit, waiting 30 seconds: {e}")
                    time.sleep(30)
                    continue
                print(f"Error: {str(e)}")
                raise
            else:
                # Extract and return the text content only
                return response.choices[0].message.content
# --------------------------------------------------------------------------- #
# Tools section - All tools used by the agent
# --------------------------------------------------------------------------- #
@tool
def gaia_file_reader(file_id: str) -> str:
    """
    Download a GAIA attachment and return its contents.
    Args:
        file_id: The identifier of the file to download from GAIA API.
    Returns:
        The content of the file as a string (text files) or base64-encoded (binary files).
    """
    # Any failure (network, HTTP status, decoding) is reported as text so the
    # calling agent receives a message instead of an exception.
    try:
        payload = _download_file(file_id)
        # The MIME type is guessed from the identifier itself; identifiers
        # without a recognizable extension are handled as binary.
        guessed_type, _ = mimetypes.guess_type(file_id)
        content_type = guessed_type if guessed_type else "application/octet-stream"
        is_textual = content_type.startswith("text") or content_type == "application/json"
        if not is_textual:
            return base64.b64encode(payload).decode()
        # Undecodable bytes are dropped rather than failing the whole read.
        return payload.decode(errors="ignore")
    except Exception as exc:
        return f"ERROR downloading {file_id}: {exc}"
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a temporary file and return the path.
    Args:
        content: The content to save to the file.
        filename: Optional filename, will generate a random name if not provided.
    Returns:
        Path to the saved file.
    """
    # Only the final path component of `filename` is honored, so the file is
    # always created directly inside the system temp directory even when the
    # caller passes an absolute path or one containing "..".
    safe_name = os.path.basename(filename) if filename else ""
    if safe_name in ("", ".", ".."):
        # No usable name: let tempfile pick a unique one. The descriptor is
        # closed right away so no handle is leaked and the path can be
        # reopened below on every platform.
        descriptor, filepath = tempfile.mkstemp()
        os.close(descriptor)
    else:
        filepath = os.path.join(tempfile.gettempdir(), safe_name)
    # An explicit encoding keeps non-ASCII content from failing on platforms
    # whose default text encoding is not UTF-8.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)
    return f"File saved to {filepath}."
@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Analyze a CSV file using pandas and answer questions about it.
    Args:
        file_path: Path to the CSV file to analyze.
        query: A question or instruction about what to analyze in the file.
    Returns:
        Analysis results as text.
    """
    # NOTE: `query` is part of the tool interface but is not used; the same
    # generic summary (shape, column names, describe()) is returned for any
    # query.
    try:
        # Imported lazily so a missing pandas install is reported as text.
        import pandas as pd

        df = pd.read_csv(file_path)
        # Column labels are not guaranteed to be strings (e.g. numeric
        # headers), so convert each one before joining.
        column_names = ", ".join(str(column) for column in df.columns)
        result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {column_names}\n\n"
        result += "Summary statistics:\n"
        result += str(df.describe())
        return result
    except ImportError:
        return "Error: pandas is not installed."
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyze an Excel file using pandas and answer questions about it.
    Args:
        file_path: Path to the Excel file to analyze.
        query: A question or instruction about what to analyze in the file.
    Returns:
        Analysis results as text.
    """
    # NOTE: `query` is part of the tool interface but is not used; the same
    # generic summary (shape, column names, describe()) is returned for any
    # query.
    try:
        # Imported lazily so a missing pandas (or Excel engine) install is
        # reported as text.
        import pandas as pd

        # Only the sheet pandas selects by default is read.
        df = pd.read_excel(file_path)
        # Spreadsheet headers are often numbers (years, ids); convert every
        # label to text before joining to avoid a TypeError.
        column_names = ", ".join(str(column) for column in df.columns)
        result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {column_names}\n\n"
        result += "Summary statistics:\n"
        result += str(df.describe())
        return result
    except ImportError:
        return "Error: pandas and openpyxl are not installed."
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.
    Args:
        url: The URL to download from.
        filename: Optional filename, will generate one based on URL if not provided.
    Returns:
        Path to the downloaded file.
    """
    # Every failure is returned as text so the calling agent receives a
    # message instead of an exception.
    try:
        import uuid

        # Use the caller's name when given, otherwise the last segment of the
        # URL path. Only the final path component is kept so the download
        # always lands directly inside the temp directory.
        candidate = filename if filename else urlparse(url).path
        filename = os.path.basename(candidate)
        if filename in ("", ".", ".."):
            # Generate a random name if we couldn't extract one
            filename = f"downloaded_{uuid.uuid4().hex[:8]}"
        filepath = os.path.join(tempfile.gettempdir(), filename)

        # The timeout keeps an unresponsive server from blocking forever, and
        # the context manager releases the streamed connection when done.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return f"File downloaded to {filepath}. You can now process this file."
    except Exception as e:
        return f"Error downloading file: {str(e)}"
@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using pytesseract (if available).
    Args:
        image_path: Path to the image file to extract text from.
    Returns:
        Extracted text from the image.
    """
    try:
        # Imported lazily so a missing OCR stack is reported as text.
        import pytesseract
        from PIL import Image

        # The context manager closes the underlying file once OCR has run,
        # instead of leaving the handle open.
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)
        return f"Extracted text from image:\n\n{text}"
    except ImportError:
        return "Error: pytesseract is not installed. Please install it with 'pip install pytesseract' and ensure Tesseract OCR is installed on your system."
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"
# --------------------------------------------------------------------------- #
# ClaudeAgent - Main class for GAIA challenge
# --------------------------------------------------------------------------- #
class ClaudeAgent:
    """A simplified Claude agent for the GAIA challenge.

    Wraps a smolagents ``CodeAgent`` driven by ``DirectClaudeModel`` and
    equipped with search, Python execution and the file tools defined in this
    module. Calling the instance with a question returns a cleaned-up answer
    string.
    """

    # Matches an attachment tag such as <file:xyz> and captures the identifier.
    _FILE_TAG_RE = re.compile(r"<file:([^>]+)>")

    def __init__(self):
        """Initialize the agent with Claude.

        Raises:
            ValueError: If the ANTHROPIC_API_KEY environment variable is unset.
            Exception: Any error raised while building the model or the agent
                is logged and re-raised.
        """
        try:
            # Get API key
            api_key = os.getenv("ANTHROPIC_API_KEY")
            if not api_key:
                raise ValueError("ANTHROPIC_API_KEY environment variable not found")
            print("✅ Initializing ClaudeAgent")
            # Create the model with direct implementation
            model = DirectClaudeModel(api_key=api_key, temperature=0.1)
            # Set up tools
            tools = [
                DuckDuckGoSearchTool(),
                PythonInterpreterTool(),
                save_and_read_file,
                analyze_csv_file,
                analyze_excel_file,
                gaia_file_reader,
                download_file_from_url,
                extract_text_from_image,
            ]
            # Create the CodeAgent
            self.agent = CodeAgent(
                tools=tools,
                model=model,
                additional_authorized_imports=["pandas", "numpy", "json", "re", "math"],
                executor_type="local",
                verbosity_level=2,
            )
            print("Agent initialized successfully")
        except Exception as e:
            print(f"Error initializing ClaudeAgent: {e}")
            raise

    def __call__(self, question: str) -> str:
        """Process a question and return the answer.

        Args:
            question: The task text, optionally containing a ``<file:ID>`` tag
                that names an attachment to download first.

        Returns:
            The cleaned answer, or a string starting with
            ``"Error processing question: "`` if anything fails.
        """
        try:
            # Log at most the first 100 characters, always with the prefix.
            preview = question if len(question) <= 100 else f"{question[:100]}..."
            print(f"Processing question: {preview}")
            # Add a small delay between questions
            time.sleep(random.uniform(1.0, 3.0))
            prompt = self._build_prompt(question)
            # Execute agent with prompt, then normalize its output
            answer = self._clean_answer(self.agent.run(prompt))
            print(f"Generated answer: {answer}")
            return answer
        except Exception as e:
            print(f"Error: {str(e)}")
            return f"Error processing question: {str(e)}"

    def _build_prompt(self, question: str) -> str:
        """Turn a raw question into the prompt handed to the agent."""
        file_match = self._FILE_TAG_RE.search(question)
        if file_match:
            file_id = file_match.group(1)
            print(f"Detected file: {file_id}")
            try:
                file_path = self._save_attachment(file_id)
            except Exception as e:
                # Best effort: without the file, fall back to the raw question.
                print(f"Error downloading file: {e}")
                return question
            # Remove file tag from question
            clean_question = self._FILE_TAG_RE.sub("", question).strip()
            return (
                "\n"
                f"Question: {clean_question}\n"
                f"There is a file available at path: {file_path}\n"
                "Use appropriate tools to analyze this file if needed.\n"
                "Answer the question directly and precisely.\n"
            )
        # Heuristic for reversed text: a leading period, or the reversed
        # phrase "as the answer." appearing in the question.
        if question.startswith(".") or ".rewsna eht sa" in question:
            return (
                "\n"
                "This question is in reversed text. Here's the normal version:\n"
                f"{question[::-1]}\n"
                "Answer the question directly and precisely.\n"
            )
        return question

    def _save_attachment(self, file_id: str) -> str:
        """Download an attachment into the temp directory and return its path."""
        file_content = _download_file(file_id)
        # Keep only the final path component so an identifier containing
        # separators or ".." cannot place the file outside the temp directory.
        safe_name = os.path.basename(file_id)
        if safe_name in ("", ".", ".."):
            safe_name = "gaia_attachment"
        file_path = os.path.join(tempfile.gettempdir(), safe_name)
        with open(file_path, 'wb') as f:
            f.write(file_content)
        return file_path

    def _clean_answer(self, answer: Any) -> str:
        """Clean up the answer for exact matching.

        Args:
            answer: The agent's result. Non-string values are converted with
                ``str()`` and returned without further cleaning.

        Returns:
            The answer with surrounding whitespace, known leading phrases and
            one pair of matching surrounding quotes removed.
        """
        if not isinstance(answer, str):
            return str(answer)
        # Normalize spacing
        answer = answer.strip()
        # Remove common prefixes
        prefixes = (
            "The answer is ", "Answer: ", "Final answer: ",
            "The result is ", "Based on the information provided, ",
        )
        for prefix in prefixes:
            if answer.startswith(prefix):
                answer = answer[len(prefix):].strip()
        # Remove quotes. The length check keeps a lone quote character from
        # being treated as an empty quoted string.
        if len(answer) >= 2 and answer[0] == answer[-1] and answer[0] in ('"', "'"):
            answer = answer[1:-1].strip()
        return answer