from smolagents import CodeAgent, LiteLLMModel, tool, Tool, load_tool, DuckDuckGoSearchTool, WikipediaSearchTool #, HfApiModel, OpenAIServerModel import asyncio import os import re import pandas as pd from typing import Optional from token_bucket import Limiter import yaml from PIL import Image import requests from io import BytesIO from markdownify import markdownify import whisper # Simulated additional tools (implementation depends on external APIs or setup) #@tool #def GoogleSearchTool(query: str) -> str: # """Tool for performing Google searches using Custom Search JSON API # Args: # query (str): Search query string # Returns: # str: Formatted search results # """ # cse_id = os.environ.get("GOOGLE_CSE_ID") # if not api_key or not cse_id: # raise ValueError("GOOGLE_API_KEY and GOOGLE_CSE_ID must be set in environment variables.") # url = "https://www.googleapis.com/customsearch/v1" # params = { # "key": api_key, # "cx": cse_id, # "q": query, # "num": 5 # Number of results to return # } # try: # response = requests.get(url, params=params) # response.raise_for_status() # results = response.json().get("items", []) # return "\n".join([f"{item['title']}: {item['link']}" for item in results]) or "No results found." # except Exception as e: # return f"Error performing Google search: {str(e)}" #@tool #def ImageAnalysisTool(question: str, model: LiteLLMModel) -> str: # """Tool for analyzing images mentioned in the question. # Args: # question (str): The question text which may contain an image URL. # Returns: # str: Image description or error message. # """ # # Extract URL from question using regex # url_pattern = r'https?://\S+' # match = re.search(url_pattern, question) # if not match: # return "No image URL found in the question." 
#    image_url = match.group(0)
#
#    headers = {
#        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
#    }
#    try:
#        response = requests.get(image_url, headers=headers)
#        response.raise_for_status()
#        image = Image.open(BytesIO(response.content)).convert("RGB")
#    except Exception as e:
#        return f"Error fetching image: {e}"
#
#    agent = CodeAgent(
#        tools=[],
#        model=model,
#        max_steps=10,
#        verbosity_level=2
#    )
#
#    response = agent.run(
#        "Describe in details the chess position you see in the image.",
#        images=[image]
#    )
#
#    return f"The image description: '{response}'"


class VisitWebpageTool(Tool):
    """Tool that fetches a web page and returns its content as Markdown."""

    name = "visit_webpage"
    description = "Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages."
    inputs = {'url': {'type': 'string', 'description': 'The url of the webpage to visit.'}}
    output_type = "string"

    def __init__(self, *args, **kwargs):
        """Run the base ``Tool`` initializer and mark the tool as not yet set up."""
        super().__init__(*args, **kwargs)
        self.is_initialized = False

    def forward(self, url: str) -> str:
        """Download ``url`` and return its HTML converted to Markdown.

        Args:
            url: Address of the page to fetch.

        Returns:
            The page as Markdown, passed through ``truncate_content`` with a
            limit of 10000, or a human-readable error message when the
            request fails.

        Raises:
            ImportError: If ``requests``, ``markdownify`` or ``smolagents``
                cannot be imported.
        """
        try:
            import requests
            from markdownify import markdownify
            from requests.exceptions import RequestException

            from smolagents.utils import truncate_content
        except ImportError as e:
            raise ImportError(
                "You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
            ) from e

        try:
            # Send a GET request to the URL with a 20-second timeout.
            response = requests.get(url, timeout=20)
            response.raise_for_status()  # Raise an exception for bad status codes

            # Convert the HTML content to Markdown.
            markdown_content = markdownify(response.text).strip()

            # Collapse runs of three or more line breaks into one blank line.
            markdown_content = re.sub(r"\n{3,}", "\n\n", markdown_content)

            return truncate_content(markdown_content, 10000)

        # Timeout is a RequestException subclass, so it must be handled first.
        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as e:
            return f"Error fetching the webpage: {str(e)}"
        except Exception as e:
            return f"An unexpected error occurred: {str(e)}"


class DownloadTaskAttachmentTool(Tool):
    """Tool that downloads the attachment of a task into ``downloads/``."""

    name = "download_file"
    description = "Downloads the file attached to the task ID"
    inputs = {'task_id': {'type': 'string', 'description': 'The task id to download attachment from.'}}
    output_type = "string"

    def __init__(self, *args, **kwargs):
        """Run the base ``Tool`` initializer and mark the tool as not yet set up."""
        super().__init__(*args, **kwargs)
        self.is_initialized = False

    def forward(self, task_id: str) -> str:
        """Download the file associated with ``task_id``.

        Args:
            task_id: Identifier of the task whose attachment is fetched.

        Returns:
            The local path (``downloads/<task_id>.file``) the file was saved to.

        Raises:
            requests.exceptions.RequestException: If the HTTP request fails or
                returns an error status; the error is logged and re-raised.
        """
        # NOTE(review): DEFAULT_API_URL is not defined in the visible part of
        # this module -- confirm it is defined or imported before this runs.
        file_url = f"{DEFAULT_API_URL}/files/{task_id}"
        local_file_path = f"downloads/{task_id}.file"
        print(f"Downloading file for task ID {task_id} from {file_url}...")

        try:
            # The context manager releases the streamed connection even when
            # the status check or the file write fails part-way through.
            with requests.get(file_url, stream=True, timeout=15) as response:
                response.raise_for_status()
                os.makedirs("downloads", exist_ok=True)
                with open(local_file_path, "wb") as file:
                    for chunk in response.iter_content(chunk_size=8192):
                        file.write(chunk)
        except requests.exceptions.RequestException as e:
            print(f"Error downloading file for task {task_id}: {e}")
            raise

        print(f"File downloaded successfully: {local_file_path}")
        return local_file_path


@tool
def SpeechToTextTool(audio_path: str) -> str:
    """Tool for converting an audio file to text using OpenAI Whisper.

    Args:
        audio_path (str): Path to audio file

    Returns:
        str: audio speech text
    """
    # Check the path before loading the model so a missing file does not pay
    # for the model load.
    if not os.path.exists(audio_path):
        return f"Error: File not found at {audio_path}"

    model = whisper.load_model("base")
    result = model.transcribe(audio_path)
    return result.get("text", "")


class ExcelReaderTool(Tool):
    """Tool that summarizes an Excel sheet: shape, column dtypes and a preview."""

    name = "excel_reader"
    description = """
    This tool reads and processes Excel files (.xlsx, .xls).
    It can extract data, calculate statistics, and perform data analysis on spreadsheets.
    """
    inputs = {
        "excel_path": {
            "type": "string",
            "description": "The path to the Excel file to read",
        },
        "sheet_name": {
            "type": "string",
            "description": "The name of the sheet to read (optional, defaults to first sheet)",
            "nullable": True
        }
    }
    output_type = "string"

    def forward(self, excel_path: str, sheet_name: Optional[str] = None) -> str:
        """Read an Excel sheet and return a textual summary of it.

        Args:
            excel_path: Path to the Excel file.
            sheet_name: Sheet to read; when empty or ``None`` the first sheet
                is used.

        Returns:
            A report with the row/column counts, one line per column with its
            dtype, and the first five rows; or an error message when the file
            is missing or cannot be read.
        """
        try:
            # Check if the file exists
            if not os.path.exists(excel_path):
                return f"Error: Excel file not found at {excel_path}"

            # Index 0 selects the first sheet; passing None to pandas would
            # return a dict of every sheet instead of a single DataFrame.
            df = pd.read_excel(excel_path, sheet_name=sheet_name or 0)

            row_count, column_count = df.shape
            column_lines = "".join(
                f"- {column} ({dtype})\n" for column, dtype in df.dtypes.items()
            )

            return (
                f"Excel file: {excel_path}\n"
                f"Shape: {row_count} rows × {column_count} columns\n\n"
                "Columns:\n"
                f"{column_lines}"
                "\nPreview (first 5 rows):\n"
                f"{df.head(5).to_string()}"
            )

        except Exception as e:
            return f"Error reading Excel file: {str(e)}"


#@tool
#class LocalFileAudioTool:
#    """Tool for transcribing audio files"""
#
#    @tool
#    def transcribe(self, file_path: str) -> str:
#        """Transcribe audio from file
#        Args:
#            file_path (str): Path to audio file
#        Returns:
#            str: Transcription text
#        """
#        return f"Transcribed audio from '{file_path}' (simulated)."
class MagAgent:
    """Question-answering agent that wraps a smolagents ``CodeAgent``.

    The agent is built with download, web-search, Wikipedia, speech-to-text
    and Excel tools, and is invoked asynchronously through ``__call__``.
    """

    # The annotation is quoted so the class definition does not need the
    # ``Limiter`` name to be resolvable at definition time.
    def __init__(self, rate_limiter: Optional["Limiter"] = None):
        """Initialize the MagAgent with search tools.

        Args:
            rate_limiter: Optional limiter; when given, ``__call__`` waits
                until ``consume(1)`` succeeds before running the agent.
        """
        self.rate_limiter = rate_limiter
        print("Initializing MagAgent with search tools...")
        model = LiteLLMModel(
            model_id="gemini/gemini-2.0-flash",
            api_key=os.environ.get("GEMINI_KEY"),
            max_tokens=8192
        )

        # Load prompt templates.
        # NOTE(review): the templates are kept on the instance but are not
        # passed to CodeAgent below -- confirm whether that is intentional.
        with open("prompts.yaml", 'r') as stream:
            self.prompt_templates = yaml.safe_load(stream)

        self.agent = CodeAgent(
            model=model,
            tools=[
                # GoogleSearchTool,
                DownloadTaskAttachmentTool(),
                DuckDuckGoSearchTool(),
                WikipediaSearchTool(),
                # ImageAnalysisTool,
                SpeechToTextTool,
                ExcelReaderTool()
                # LocalFileAudioTool()
            ],
            verbosity_level=2,
            add_base_tools=True,
            max_steps=20
        )
        print("MagAgent initialized.")

    async def __call__(
        self,
        question: str,
        task_id: Optional[str] = None,
        *,
        tast_id: Optional[str] = None,
    ) -> str:
        """Process a question asynchronously using the MagAgent.

        Args:
            question: The question to answer.
            task_id: Identifier of the task, embedded in the prompt so the
                agent can download an attachment with it.
            tast_id: Deprecated misspelling of ``task_id``, still accepted as
                a keyword for older callers; used only when ``task_id`` is
                not given.

        Returns:
            The agent's answer converted to ``str``, or an error message if
            anything raised while processing.
        """
        # The parameter used to be spelled ``tast_id`` while the body read
        # ``task_id``; accept both spellings and normalize to one name.
        if task_id is None:
            task_id = tast_id

        print(f"MagAgent received question (first 50 chars): {question[:50]}... Task ID: {task_id}")
        try:
            if self.rate_limiter:
                while not self.rate_limiter.consume(1):
                    # NOTE(review): RATE_LIMIT is not defined in the visible
                    # part of this module -- confirm where it comes from.
                    await asyncio.sleep(60 / RATE_LIMIT)

            # Define a task with fallback search logic
            task = (
                f"Answer the following question accurately and concisely: {question}\n"
                f"If the question references an attachment, use the download_file tool with task_id: {task_id}\n"
                f"Return the answer as a string."
            )
            # agent.run is blocking, so run it in a worker thread.
            response = await asyncio.to_thread(
                self.agent.run,
                task=task
            )

            # Check for a missing answer before the str() conversion below;
            # afterwards the value can never be None.
            if response is None:
                print("No answer found.")

            # Ensure response is a string, fixing the integer error
            response = str(response)
            print(f"MagAgent response: {response[:50]}...")
            return response
        except Exception as e:
            error_msg = f"Error processing question: {str(e)}. Check API key or network connectivity."
            print(error_msg)
            return error_msg