Test_Magus / agent.py
SergeyO7's picture
Update agent.py
a966bbf verified
raw
history blame
11.1 kB
from smolagents import CodeAgent, LiteLLMModel, tool, Tool, load_tool, DuckDuckGoSearchTool, WikipediaSearchTool #, HfApiModel, OpenAIServerModel
import asyncio
import os
import re
import pandas as pd
from typing import Optional
from token_bucket import Limiter
import yaml
from PIL import Image
import requests
from io import BytesIO
from markdownify import markdownify
import whisper
# Simulated additional tools (implementation depends on external APIs or setup)
#@tool
#def GoogleSearchTool(query: str) -> str:
# """Tool for performing Google searches using Custom Search JSON API
# Args:
# query (str): Search query string
# Returns:
# str: Formatted search results
# """
# cse_id = os.environ.get("GOOGLE_CSE_ID")
# if not api_key or not cse_id:
# raise ValueError("GOOGLE_API_KEY and GOOGLE_CSE_ID must be set in environment variables.")
# url = "https://www.googleapis.com/customsearch/v1"
# params = {
# "key": api_key,
# "cx": cse_id,
# "q": query,
# "num": 5 # Number of results to return
# }
# try:
# response = requests.get(url, params=params)
# response.raise_for_status()
# results = response.json().get("items", [])
# return "\n".join([f"{item['title']}: {item['link']}" for item in results]) or "No results found."
# except Exception as e:
# return f"Error performing Google search: {str(e)}"
#@tool
#def ImageAnalysisTool(question: str, model: LiteLLMModel) -> str:
# """Tool for analyzing images mentioned in the question.
# Args:
# question (str): The question text which may contain an image URL.
# Returns:
# str: Image description or error message.
# """
# # Extract URL from question using regex
# url_pattern = r'https?://\S+'
# match = re.search(url_pattern, question)
# if not match:
# return "No image URL found in the question."
# image_url = match.group(0)
#
# headers = {
# "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
# }
# try:
# response = requests.get(image_url, headers=headers)
# response.raise_for_status()
# image = Image.open(BytesIO(response.content)).convert("RGB")
# except Exception as e:
# return f"Error fetching image: {e}"
#
# agent = CodeAgent(
# tools=[],
# model=model,
# max_steps=10,
# verbosity_level=2
# )
#
# response = agent.run(
# "Describe in details the chess position you see in the image.",
# images=[image]
# )
#
# return f"The image description: '{response}'"
class VisitWebpageTool(Tool):
name = "visit_webpage"
description = "Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages."
inputs = {'url': {'type': 'string', 'description': 'The url of the webpage to visit.'}}
output_type = "string"
def forward(self, url: str) -> str:
try:
import requests
from markdownify import markdownify
from requests.exceptions import RequestException
from smolagents.utils import truncate_content
except ImportError as e:
raise ImportError(
"You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
) from e
try:
# Send a GET request to the URL with a 20-second timeout
response = requests.get(url, timeout=20)
response.raise_for_status() # Raise an exception for bad status codes
# Convert the HTML content to Markdown
markdown_content = markdownify(response.text).strip()
# Remove multiple line breaks
markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)
return truncate_content(markdown_content, 10000)
except requests.exceptions.Timeout:
return "The request timed out. Please try again later or check the URL."
except RequestException as e:
return f"Error fetching the webpage: {str(e)}"
except Exception as e:
return f"An unexpected error occurred: {str(e)}"
def __init__(self, *args, **kwargs):
self.is_initialized = False
class DownloadTaskAttachmentTool(Tool):
name = "download_file"
description = "Downloads the file attached to the task ID"
inputs = {'task_id': {'type': 'string', 'description': 'The task id to download attachment from.'}}
output_type = "string"
def forward(self, task_id: str) -> str:
"""
Downloads a file associated with the given task ID.
Returns the file path where the file is saved locally.
"""
file_url = f"{DEFAULT_API_URL}/files/{task_id}"
local_file_path = f"downloads/{task_id}.file"
print(f"Downloading file for task ID {task_id} from {file_url}...")
try:
file_url = f"{DEFAULT_API_URL}/files/{task_id}"
local_path = f"downloads/{task_id}.xlsx"
response = requests.get(file_url, stream=True, timeout=15)
response.raise_for_status()
os.makedirs("downloads", exist_ok=True)
with open(local_file_path, "wb") as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
print(f"File downloaded successfully: {local_file_path}")
return local_file_path
except requests.exceptions.RequestException as e:
print(f"Error downloading file for task {task_id}: {e}")
raise
def __init__(self, *args, **kwargs):
self.is_initialized = False
@tool
def SpeechToTextTool(audio_path: str) -> str:
"""Tool for converting an audio file to text using OpenAI Whisper.
Args:
audio_path (str): Path to audio file
Returns:
str: audio speech text
"""
model = whisper.load_model("base")
if not os.path.exists(audio_path):
return f"Error: File not found at {audio_path}"
result = model.transcribe(audio_path)
return result.get("text", "")
class ExcelReaderTool(Tool):
name = "excel_reader"
description = """
This tool reads and processes Excel files (.xlsx, .xls).
It can extract data, calculate statistics, and perform data analysis on spreadsheets.
"""
inputs = {
"excel_path": {
"type": "string",
"description": "The path to the Excel file to read",
},
"sheet_name": {
"type": "string",
"description": "The name of the sheet to read (optional, defaults to first sheet)",
"nullable": True
}
}
output_type = "string"
def forward(self, excel_path: str, sheet_name: str = None) -> str:
"""
Reads and processes the given Excel file.
"""
try:
# Check if the file exists
if not os.path.exists(excel_path):
return f"Error: Excel file not found at {excel_path}"
import pandas as pd
# Read the Excel file
if sheet_name:
df = pd.read_excel(excel_path, sheet_name=sheet_name)
else:
df = pd.read_excel(excel_path)
# Get basic info about the data
info = {
"shape": df.shape,
"columns": list(df.columns),
"dtypes": df.dtypes.to_dict(),
"head": df.head(5).to_dict()
}
# Return formatted info
result = f"Excel file: {excel_path}\n"
result += f"Shape: {info['shape'][0]} rows × {info['shape'][1]} columns\n\n"
result += "Columns:\n"
for col in info['columns']:
result += f"- {col} ({info['dtypes'].get(col)})\n"
result += "\nPreview (first 5 rows):\n"
result += df.head(5).to_string()
return result
except Exception as e:
return f"Error reading Excel file: {str(e)}"
#@tool
#class LocalFileAudioTool:
# """Tool for transcribing audio files"""
#
# @tool
# def transcribe(self, file_path: str) -> str:
# """Transcribe audio from file
# Args:
# file_path (str): Path to audio file
# Returns:
# str: Transcription text
# """
# return f"Transcribed audio from '{file_path}' (simulated)."
class MagAgent:
def __init__(self, rate_limiter: Optional[Limiter] = None):
"""Initialize the MagAgent with search tools."""
self.rate_limiter = rate_limiter
print("Initializing MagAgent with search tools...")
model = LiteLLMModel(
model_id="gemini/gemini-2.0-flash",
api_key= os.environ.get("GEMINI_KEY"),
max_tokens=8192
)
# Load prompt templates
with open("prompts.yaml", 'r') as stream:
prompt_templates = yaml.safe_load(stream)
self.agent = CodeAgent(
model= model,
tools=[
# GoogleSearchTool,
DownloadTaskAttachmentTool(),
DuckDuckGoSearchTool(),
WikipediaSearchTool(),
# ImageAnalysisTool,
SpeechToTextTool,
ExcelReaderTool()
# LocalFileAudioTool()
],
verbosity_level=2,
add_base_tools=True,
max_steps=20
)
print("MagAgent initialized.")
async def __call__(self, question: str, tast_id) -> str:
"""Process a question asynchronously using the MagAgent."""
print(f"MagAgent received question (first 50 chars): {question[:50]}... Task ID: {task_id}")
try:
if self.rate_limiter:
while not self.rate_limiter.consume(1):
await asyncio.sleep(60 / RATE_LIMIT)
# Define a task with fallback search logic
task = (
f"Answer the following question accurately and concisely: {question}\n"
f"If the question references an attachment, use the download_file tool with task_id: {task_id}\n"
f"Return the answer as a string."
)
response = await asyncio.to_thread(
self.agent.run,
task=task
)
# Ensure response is a string, fixing the integer error
response = str(response)
if response is None:
print(f"No answer found.")
print(f"MagAgent response: {response[:50]}...")
return response
except Exception as e:
error_msg = f"Error processing question: {str(e)}. Check API key or network connectivity."
print(error_msg)
return error_msg