|
""" |
|
agent.py - Claude implementation for GAIA challenge |
|
----------------------------------------------------------- |
|
A simplified implementation with direct litellm access to Anthropic's Claude |
|
""" |
|
|
|
import base64 |
|
import mimetypes |
|
import os |
|
import re |
|
import tempfile |
|
import time |
|
import random |
|
from typing import List, Dict, Any, Optional |
|
import requests |
|
from urllib.parse import urlparse |
|
|
|
from smolagents import CodeAgent, DuckDuckGoSearchTool, PythonInterpreterTool, tool |
|
|
|
|
|
|
|
|
|
# Base URL of the GAIA API; the GAIA_API_URL environment variable overrides
# the built-in default.
DEFAULT_API_URL = os.environ.get(
    "GAIA_API_URL",
    "https://agents-course-unit4-scoring.hf.space",
)

# Matches an attachment marker such as "<file:abc.csv>" and captures the id
# between the colon and the closing bracket.
FILE_TAG = re.compile(r"<file:([^>]+)>")
|
|
|
def _download_file(file_id: str) -> bytes:
    """Fetch the raw bytes of a GAIA task attachment.

    Args:
        file_id: Identifier appended to the ``/files/`` endpoint of
            ``DEFAULT_API_URL``.

    Returns:
        The response body as bytes.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    response = requests.get(f"{DEFAULT_API_URL}/files/{file_id}", timeout=30)
    response.raise_for_status()
    return response.content
|
|
|
|
|
|
|
|
|
class DirectClaudeModel:
    """
    Direct interface to Claude via litellm that works with smolagents.

    The model is a plain callable: it takes one prompt string, sends it
    together with a fixed system prompt and returns the reply text. This
    avoids message-format issues by keeping things very simple.

    Rate-limit errors are retried a bounded number of times; every other
    error is printed and re-raised.
    """

    # System message sent with every request.
    _SYSTEM_PROMPT = (
        "You are a concise, highly accurate assistant specialized in solving challenges.\n"
        "Your answers should be precise, direct, and exactly match the expected format.\n"
        "All answers are graded by exact string match, so format carefully!"
    )

    def __init__(
        self,
        api_key: Optional[str] = None,
        temperature: float = 0.1,
        max_retries: int = 5,
        retry_wait: float = 30.0,
    ):
        """Initialize the Claude model.

        Args:
            api_key: Anthropic API key; falls back to the ANTHROPIC_API_KEY
                environment variable when omitted.
            temperature: Sampling temperature passed to litellm.
            max_retries: How many times a rate-limited call is retried
                before the error is re-raised.
            retry_wait: Seconds to wait before each retry.

        Raises:
            ValueError: If no API key is given and none is in the environment.
        """
        self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY")
        if not self.api_key:
            raise ValueError("No Anthropic API key provided")

        self.temperature = temperature
        self.model_name = "anthropic/claude-3-5-sonnet-20240620"
        self.max_retries = max(0, max_retries)
        self.retry_wait = retry_wait

        print(f"Initialized DirectClaudeModel with {self.model_name}")

        # Random start-up delay, presumably to spread out API calls.
        time.sleep(random.uniform(1, 3))

    @staticmethod
    def _is_rate_limit_error(exc: BaseException) -> bool:
        """Return True when the error text mentions ``rate_limit``."""
        return "rate_limit" in str(exc).lower()

    def __call__(self, prompt: str, **kwargs) -> str:
        """
        Simple call method that works with smolagents.

        Args:
            prompt: The user prompt.
            **kwargs: Additional parameters (ignored).

        Returns:
            Claude's response as a string.

        Raises:
            Exception: Whatever litellm raised, once the error is not a rate
                limit or the retry budget is used up.
        """
        # Imported lazily so the module can be loaded without litellm.
        from litellm import completion

        messages = [
            {"role": "system", "content": self._SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ]

        # A loop instead of recursion: a persistent rate limit can no longer
        # recurse without bound; it surfaces after ``max_retries`` retries.
        retries_left = self.max_retries
        while True:
            # Small random pause before every attempt.
            time.sleep(random.uniform(0.5, 2.0))
            try:
                response = completion(
                    model=self.model_name,
                    messages=messages,
                    temperature=self.temperature,
                    max_tokens=1024,
                    api_key=self.api_key,
                )
                return response.choices[0].message.content
            except Exception as e:
                if retries_left <= 0 or not self._is_rate_limit_error(e):
                    print(f"Error: {str(e)}")
                    raise
                retries_left -= 1
                print(f"Rate limit hit, waiting {self.retry_wait:g} seconds: {e}")
                time.sleep(self.retry_wait)
|
|
|
|
|
|
|
|
|
@tool |
|
def gaia_file_reader(file_id: str) -> str:
    """
    Download a GAIA attachment and return its contents.

    Args:
        file_id: The identifier of the file to download from GAIA API.

    Returns:
        The content of the file as a string (text files) or base64-encoded (binary files).
    """
    try:
        payload = _download_file(file_id)

        # The MIME type is guessed from the id itself, so only ids that look
        # like file names with a known extension are treated as text.
        guessed_type, _ = mimetypes.guess_type(file_id)
        content_type = guessed_type if guessed_type else "application/octet-stream"
        is_textual = content_type.startswith("text") or content_type == "application/json"

        if not is_textual:
            return base64.b64encode(payload).decode()
        # Undecodable bytes are dropped rather than raising.
        return payload.decode(errors="ignore")
    except Exception as download_error:
        # Failures are reported as text so the calling agent can react.
        return f"ERROR downloading {file_id}: {download_error}"
|
|
|
@tool |
|
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a temporary file and return the path.

    Args:
        content: The content to save to the file.
        filename: Optional filename, will generate a random name if not provided. Only the final path component is used.

    Returns:
        Path to the saved file.
    """
    # Keep only the last path component so names such as "../x" or absolute
    # paths cannot place the file outside the temp directory.
    safe_name = os.path.basename(filename) if filename else ""

    if safe_name in ("", ".", ".."):
        # mkstemp hands back an open descriptor; wrapping it avoids the
        # leaked second handle of NamedTemporaryFile(delete=False).
        descriptor, filepath = tempfile.mkstemp()
        handle = os.fdopen(descriptor, "w", encoding="utf-8")
    else:
        filepath = os.path.join(tempfile.gettempdir(), safe_name)
        handle = open(filepath, "w", encoding="utf-8")

    # UTF-8 is explicit so the write does not depend on the platform's
    # default locale encoding.
    with handle:
        handle.write(content)

    return f"File saved to {filepath}."
|
|
|
@tool |
|
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Analyze a CSV file using pandas and answer questions about it.

    Args:
        file_path: Path to the CSV file to analyze.
        query: A question or instruction about what to analyze in the file.

    Returns:
        Analysis results as text.
    """
    # NOTE: `query` is accepted but not used; the report is always the same
    # shape (dimensions, column names, describe() output).
    try:
        import pandas as pd

        frame = pd.read_csv(file_path)
        row_count, column_count = len(frame), len(frame.columns)

        report_parts = [
            f"CSV file loaded with {row_count} rows and {column_count} columns.\n",
            f"Columns: {', '.join(frame.columns)}\n\n",
            "Summary statistics:\n",
            str(frame.describe()),
        ]
        return "".join(report_parts)
    except ImportError:
        return "Error: pandas is not installed."
    except Exception as analysis_error:
        # Any read or summary failure is returned as text for the agent.
        return f"Error analyzing CSV file: {str(analysis_error)}"
|
|
|
@tool |
|
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Analyze an Excel file using pandas and answer questions about it.

    Args:
        file_path: Path to the Excel file to analyze.
        query: A question or instruction about what to analyze in the file.

    Returns:
        Analysis results as text.
    """
    # NOTE: `query` is accepted but not used; the report is always the same
    # shape (dimensions, column names, describe() output).
    try:
        import pandas as pd

        df = pd.read_excel(file_path)

        # Excel headers can be numbers or dates, which pandas keeps as native
        # types; str() them so the join cannot raise TypeError.
        column_names = ", ".join(map(str, df.columns))

        result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {column_names}\n\n"
        result += "Summary statistics:\n"
        result += str(df.describe())

        return result
    except ImportError:
        return "Error: pandas and openpyxl are not installed."
    except Exception as e:
        # Any read or summary failure is returned as text for the agent.
        return f"Error analyzing Excel file: {str(e)}"
|
|
|
@tool |
|
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.

    Args:
        url: The URL to download from.
        filename: Optional filename, will generate one based on URL if not provided. Only the final path component is used.

    Returns:
        Path to the downloaded file.
    """
    try:
        # Prefer the caller's name, then the last segment of the URL path.
        candidate = filename or os.path.basename(urlparse(url).path)

        # Keep only the last path component so the file cannot be written
        # outside the temp directory.
        safe_name = os.path.basename(candidate) if candidate else ""
        if safe_name in ("", ".", ".."):
            import uuid

            safe_name = f"downloaded_{uuid.uuid4().hex[:8]}"

        filepath = os.path.join(tempfile.gettempdir(), safe_name)

        # The timeout matches _download_file so a stalled server cannot hang
        # the agent; the context manager releases the streamed connection.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        return f"File downloaded to {filepath}. You can now process this file."
    except Exception as e:
        # Failures are returned as text so the calling agent can react.
        return f"Error downloading file: {str(e)}"
|
|
|
@tool |
|
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using pytesseract (if available).

    Args:
        image_path: Path to the image file to extract text from.

    Returns:
        Extracted text from the image.
    """
    try:
        # Optional dependencies are imported here so a missing install is
        # reported as text instead of failing at module import.
        import pytesseract
        from PIL import Image

        recognized = pytesseract.image_to_string(Image.open(image_path))
        return f"Extracted text from image:\n\n{recognized}"
    except ImportError:
        return "Error: pytesseract is not installed. Please install it with 'pip install pytesseract' and ensure Tesseract OCR is installed on your system."
    except Exception as ocr_error:
        # Unreadable files and OCR failures are returned as text.
        return f"Error extracting text from image: {str(ocr_error)}"
|
|
|
|
|
|
|
|
|
class ClaudeAgent:
    """A simplified Claude agent for the GAIA challenge.

    Wraps a smolagents ``CodeAgent`` driven by ``DirectClaudeModel``. Calling
    the instance with a question builds a prompt (downloading a tagged
    attachment when present), runs the agent and tidies the answer.
    """

    # Lead-in phrases removed from the start of an answer.
    _ANSWER_PREFIXES = (
        "The answer is ",
        "Answer: ",
        "Final answer: ",
        "The result is ",
        "Based on the information provided, ",
    )

    def __init__(self):
        """Initialize the agent with Claude.

        Raises:
            ValueError: If ANTHROPIC_API_KEY is not set.
            Exception: Any error raised while building the model or agent is
                printed and re-raised.
        """
        try:
            api_key = os.getenv("ANTHROPIC_API_KEY")
            if not api_key:
                raise ValueError("ANTHROPIC_API_KEY environment variable not found")

            print("✅ Initializing ClaudeAgent")

            model = DirectClaudeModel(api_key=api_key, temperature=0.1)

            tools = [
                DuckDuckGoSearchTool(),
                PythonInterpreterTool(),
                save_and_read_file,
                analyze_csv_file,
                analyze_excel_file,
                gaia_file_reader,
                download_file_from_url,
                extract_text_from_image,
            ]

            self.agent = CodeAgent(
                tools=tools,
                model=model,
                additional_authorized_imports=["pandas", "numpy", "json", "re", "math"],
                executor_type="local",
                verbosity_level=2,
            )

            print("Agent initialized successfully")

        except Exception as e:
            print(f"Error initializing ClaudeAgent: {e}")
            raise

    def __call__(self, question: str) -> str:
        """Process a question and return the answer.

        Args:
            question: The task text, optionally containing a ``<file:ID>`` tag.

        Returns:
            The cleaned answer, or ``"Error processing question: ..."`` when
            anything fails (errors are reported, never raised).
        """
        try:
            # Build the preview first so the "Processing question:" label is
            # printed for short questions too.
            preview = f"{question[:100]}..." if len(question) > 100 else question
            print(f"Processing question: {preview}")

            # Random pause before each question, presumably to stay under
            # API rate limits.
            time.sleep(random.uniform(1.0, 3.0))

            prompt = self._build_prompt(question)
            answer = self._clean_answer(self.agent.run(prompt))

            print(f"Generated answer: {answer}")
            return answer

        except Exception as e:
            print(f"Error: {str(e)}")
            return f"Error processing question: {str(e)}"

    def _build_prompt(self, question: str) -> str:
        """Turn a raw question into the prompt handed to the agent."""
        file_match = FILE_TAG.search(question)
        if file_match:
            file_id = file_match.group(1)
            print(f"Detected file: {file_id}")

            try:
                file_path = self._save_attachment(file_id)
            except Exception as e:
                # Best effort: fall back to the untouched question.
                print(f"Error downloading file: {e}")
                return question

            clean_question = FILE_TAG.sub("", question).strip()
            return (
                "\n"
                f"Question: {clean_question}\n"
                f"There is a file available at path: {file_path}\n"
                "Use appropriate tools to analyze this file if needed.\n"
                "Answer the question directly and precisely.\n"
            )

        # Heuristic for questions written backwards: they start with the
        # final full stop or contain "as the answer." reversed.
        if question.startswith(".") or ".rewsna eht sa" in question:
            return (
                "\n"
                "This question is in reversed text. Here's the normal version:\n"
                f"{question[::-1]}\n"
                "Answer the question directly and precisely.\n"
            )

        return question

    @staticmethod
    def _save_attachment(file_id: str) -> str:
        """Download a GAIA attachment into the temp directory; return its path."""
        file_content = _download_file(file_id)

        # Use only the last path component so a crafted id such as
        # "../../x" cannot write outside the temp directory.
        safe_name = os.path.basename(file_id)
        if safe_name in ("", ".", ".."):
            safe_name = "gaia_attachment"

        file_path = os.path.join(tempfile.gettempdir(), safe_name)
        with open(file_path, "wb") as f:
            f.write(file_content)
        return file_path

    def _clean_answer(self, answer: Any) -> str:
        """Clean up the answer for exact matching.

        Args:
            answer: The agent's result; non-strings are returned as
                ``str(answer)`` without further cleaning.

        Returns:
            The answer with surrounding whitespace, known lead-in phrases and
            one pair of matching outer quotes removed.
        """
        if not isinstance(answer, str):
            return str(answer)

        answer = answer.strip()

        # Repeat until stable so stacked lead-ins ("Answer: The answer is X")
        # are removed whatever their order.
        stripped = True
        while stripped:
            stripped = False
            for prefix in self._ANSWER_PREFIXES:
                if answer.startswith(prefix):
                    answer = answer[len(prefix):].strip()
                    stripped = True

        # Requires two characters so a lone quote is not reduced to "".
        if len(answer) >= 2 and answer[0] == answer[-1] and answer[0] in "\"'":
            answer = answer[1:-1].strip()

        return answer