Spaces:
Sleeping
Sleeping
import os | |
import logging | |
from typing import List, Dict | |
from llama_index.core.agent.workflow import ReActAgent | |
from llama_index.core.tools import FunctionTool | |
from llama_index.llms.google_genai import GoogleGenAI | |
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# Helper function to load prompt from file | |
def load_prompt_from_file(filename: str, default_prompt: str) -> str:
    """Load a prompt from a text file located relative to this script.

    Args:
        filename: Path of the prompt file; it is joined onto the directory
            that contains this script.
        default_prompt: Text returned when the file cannot be loaded.

    Returns:
        The file contents, or ``default_prompt`` when the file is missing or
        any other error occurs while loading it (the failure is logged, never
        raised).
    """
    try:
        # Resolved inside the ``try`` so that a failure here also falls back
        # to the default prompt instead of propagating.
        script_dir = os.path.dirname(__file__)
        prompt_path = os.path.join(script_dir, filename)
        # Explicit encoding: the platform default is locale-dependent and can
        # garble or reject non-ASCII prompt text.
        with open(prompt_path, "r", encoding="utf-8") as f:
            prompt = f.read()
    except FileNotFoundError:
        # ``prompt_path`` is always bound here: only ``open`` raises this error.
        logger.warning(f"Prompt file {filename} not found at {prompt_path}. Using default.")
        return default_prompt
    except Exception as e:
        logger.error(f"Error loading prompt file {filename}: {e}", exc_info=True)
        return default_prompt
    else:
        logger.info(f"Successfully loaded prompt from {prompt_path}")
        return prompt
# --- Tool Functions --- | |
def _extract_sub_steps(text: str) -> List[str]:
    """Split raw LLM output into sub-steps, dropping blank lines and list markers."""
    sub_steps = []
    for raw_line in text.strip().split("\n"):
        line = raw_line.strip()
        if not line:
            continue
        # Remove a leading "1. "-style number or a "- " bullet if present.
        if line[0].isdigit() and "." in line[:3]:
            step = line.split(".", 1)[1].strip()
        elif line.startswith("- "):
            step = line[2:].strip()
        else:
            step = line
        if step:
            sub_steps.append(step)
    return sub_steps


def plan(objective: str) -> List[str]:
    """Generate a list of sub-steps (4-8 requested) for an objective using an LLM.

    The model is taken from the ``PLANNER_TOOL_LLM_MODEL`` environment variable
    (falling back to ``gemini-2.5-pro-preview-03-25``) and the API key from
    ``GEMINI_API_KEY``.

    Args:
        objective (str): The research or task objective.

    Returns:
        List[str]: The sub-steps as strings. On failure (missing API key, LLM
        error, or no usable lines in the response) a single-element list
        holding the error message, so the return type is always a list.
    """
    logger.info(f"Generating plan for objective: {objective[:100]}...")

    # Configuration for the planning LLM.
    planner_llm_model = os.getenv("PLANNER_TOOL_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for planning tool LLM.")
        return ["Error: GEMINI_API_KEY not set for planning."]

    # Prompt asking the LLM for numbered sub-steps, one per line.
    input_prompt = (
        "You are a research assistant. "
        "Given an objective, break it down into a list of 4-8 concise, actionable sub-steps. "
        "Ensure the steps are logically ordered.\n"
        f"Objective: {objective}\n"
        "Sub-steps (one per line, numbered):"
    )

    try:
        # Use the configured model so the log line below reflects what is
        # actually called (previously the env override was read but ignored).
        llm = GoogleGenAI(api_key=gemini_api_key, model=planner_llm_model, temperature=0.05)
        logger.info(f"Using planning LLM: {planner_llm_model}")
        response = llm.complete(input_prompt)

        sub_steps = _extract_sub_steps(response.text)
        if not sub_steps:
            logger.warning("LLM generated no sub-steps for the objective.")
            return ["Error: Failed to generate sub-steps."]

        logger.info(f"Generated {len(sub_steps)} sub-steps.")
        return sub_steps
    except Exception as e:
        # Best-effort tool: report the failure to the caller instead of raising.
        logger.error(f"LLM call failed during planning: {e}", exc_info=True)
        return [f"Error during planning: {e}"]
def synthesize_and_report(results: List[Dict[str, str]]) -> str:
    """Aggregate sub-step results into a coherent final report using an LLM.

    The model is taken from the ``SYNTHESIZER_LLM_MODEL`` environment variable
    (falling back to ``gemini-2.5-pro-preview-03-25``) and the API key from
    ``GEMINI_API_KEY``.

    Args:
        results (List[Dict[str, str]]): One dictionary per sub-step, read via
            the "sub_step" and "answer" keys; missing keys get placeholder text.

    Returns:
        str: The report text produced by the LLM, or an error/notice message
        when there are no results, the API key is missing, or the LLM call fails.
    """
    # Check emptiness before touching ``len`` so that ``None`` is handled like
    # an empty list instead of raising ``TypeError``.
    if not results:
        logger.warning("Synthesize called with empty results list.")
        return "No results provided to synthesize."
    logger.info(f"Synthesizing results from {len(results)} sub-steps...")

    # Format each result as a numbered "Sub-step / Answer" pair for the prompt.
    blocks = []
    for index, result in enumerate(results, start=1):
        sub_step = result.get("sub_step", f"Step {index}")
        answer = result.get("answer", "No answer provided.")
        blocks.append(f"Sub-step {index}: {sub_step}\nAnswer {index}: {answer}")
    summary_blocks = "\n\n".join(blocks)

    # Configuration for the synthesis LLM.
    synthesizer_llm_model = os.getenv("SYNTHESIZER_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for synthesis tool LLM.")
        return "Error: GEMINI_API_KEY not set for synthesis."

    # Prompt for the LLM.
    input_prompt = f"""You are an expert synthesizer. Given the following sub-steps and their answers derived
from an initial objective, produce a single, coherent, comprehensive final report that
addresses the original objective:
--- SUB-STEP RESULTS ---
{summary_blocks.strip()}
--- END SUB-STEP RESULTS ---
Generate the Final Report:
"""

    try:
        # Use the configured model so the log line below reflects what is
        # actually called (previously the env override was read but ignored).
        llm = GoogleGenAI(api_key=gemini_api_key, model=synthesizer_llm_model, temperature=0.05)
        logger.info(f"Using synthesis LLM: {synthesizer_llm_model}")
        response = llm.complete(input_prompt)
        logger.info("Synthesis successful.")
        return response.text
    except Exception as e:
        # Best-effort tool: report the failure to the caller instead of raising.
        logger.error(f"LLM call failed during synthesis: {e}", exc_info=True)
        return f"Error during synthesis: {e}"
def answer_question(question: str) -> str:
    """
    Answer any question by following this strict format:
    1. Include your chain of thought (your reasoning steps).
    2. End your reply with the exact template:
    FINAL ANSWER: [YOUR FINAL ANSWER]
    YOUR FINAL ANSWER must be:
    - A number, or
    - As few words as possible, or
    - A comma-separated list of numbers and/or strings.
    Formatting rules:
    * If asked for a number, do not use commas or units (e.g., $, %), unless explicitly requested.
    * If asked for a string, do not include articles or abbreviations (e.g., city names), and write digits in plain text.
    * If asked for a comma-separated list, apply the above rules to each element.
    This tool should be invoked immediately after completing the final planning sub-step.

    The model is taken from the ``ANSWER_TOOL_LLM_MODEL`` environment variable
    (falling back to ``gemini-2.5-pro-preview-03-25``) and the API key from
    ``GEMINI_API_KEY``.

    Args:
        question (str): The question to answer.

    Returns:
        str: The LLM's full reply text, or an error message when the API key
        is missing or the LLM call fails.
    """
    logger.info(f"Answering question: {question[:100]}")

    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not set for answer_question tool.")
        return "Error: GEMINI_API_KEY not set."
    model_name = os.getenv("ANSWER_TOOL_LLM_MODEL", "gemini-2.5-pro-preview-03-25")

    # Build the assistant prompt enforcing the required answer format.
    assistant_prompt = (
        "You are a general AI assistant. I will ask you a question. "
        "Report your thoughts, and finish your answer with the following template: "
        "FINAL ANSWER: [YOUR FINAL ANSWER]. "
        "YOUR FINAL ANSWER should be a number OR as few words as possible "
        "OR a comma separated list of numbers and/or strings. "
        "If you are asked for a number, don't use commas for thousands or any units like $ or % unless specified. "
        "If you are asked for a string, omit articles and abbreviations, and write digits in plain text. "
        "If you are asked for a comma separated list, apply these rules to each element.\n\n"
        f"Question: {question}\n"
        "Answer:"
    )

    try:
        # Use the configured model so the log line below reflects what is
        # actually called (previously the env override was read but ignored).
        llm = GoogleGenAI(api_key=gemini_api_key, model=model_name, temperature=0.05)
        logger.info(f"Using answer LLM: {model_name}")
        response = llm.complete(assistant_prompt)
        logger.info("Answer generated successfully.")
        return response.text
    except Exception as e:
        # Best-effort tool: report the failure to the caller instead of raising.
        logger.error(f"LLM call failed during answer generation: {e}", exc_info=True)
        return f"Error during answer generation: {e}"
# --- Tool Definitions --- | |
# Tool object wrapping `synthesize_and_report` under the same tool name.
synthesize_tool = FunctionTool.from_defaults(
    name="synthesize_and_report",
    fn=synthesize_and_report,
    description="".join(
        (
            "Aggregates results from multiple sub-steps into a final coherent report. ",
            'Input: results (List[Dict[str, str]]) where each dict has "sub_step" and "answer". ',
            "Output: A unified report (str) or error message.",
        )
    ),
)
# Tool object wrapping `plan`, exposed under the tool name "generate_substeps".
generate_substeps_tool = FunctionTool.from_defaults(
    name="generate_substeps",
    fn=plan,
    description="".join(
        (
            "Decomposes a high-level objective into a concise roadmap of 4–8 actionable sub-steps using an LLM. ",
            "Input: objective (str). Output: List of sub-step strings (List[str]) or error list.",
        )
    ),
)
# Tool object wrapping the `answer_question` function, named `*_tool` for
# consistency with `synthesize_tool` and `generate_substeps_tool`.
answer_question_tool = FunctionTool.from_defaults(
    fn=answer_question,
    name="answer_question",
    description=(
        "Answers any question and returns the full text, always ending with "
        "‘FINAL ANSWER: ...’ in accordance with the formatting rules."
    ),
)
# Backward compatibility: this module has always rebound `answer_question` to
# the tool object (shadowing the plain function defined earlier), so keep that
# binding for existing importers.
answer_question = answer_question_tool
# --- Agent Initialization --- | |
def initialize_planner_agent() -> ReActAgent:
    """Build and return the Planner Agent.

    The agent's LLM model is taken from the ``PLANNER_AGENT_LLM_MODEL``
    environment variable (falling back to ``gemini-2.5-pro-preview-03-25``)
    and the API key from ``GEMINI_API_KEY``. The system prompt is loaded from
    ``../prompts/planner_agent_prompt.txt`` via ``load_prompt_from_file``,
    with a placeholder default used when that file cannot be loaded.

    Returns:
        ReActAgent: The agent named "planner_agent", equipped with the
        sub-step generation and synthesis tools and a fixed list of handoff
        targets.

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is not set.
        Exception: Any error raised while constructing the LLM or the agent is
            logged and re-raised unchanged.
    """
    logger.info("Initializing PlannerAgent...")

    # Configuration for the agent's main LLM.
    agent_llm_model = os.getenv("PLANNER_AGENT_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for PlannerAgent.")
        raise ValueError("GEMINI_API_KEY must be set for PlannerAgent")

    try:
        # Use the configured model so the log line below reflects what is
        # actually called (previously the env override was read but ignored).
        llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model, temperature=0.05)
        logger.info(f"Using agent LLM: {agent_llm_model}")

        # Load the system prompt; the default below is only a placeholder.
        default_system_prompt = "You are PlannerAgent... [Default prompt content - replace with actual]"
        system_prompt = load_prompt_from_file("../prompts/planner_agent_prompt.txt", default_system_prompt)
        # Equality with the default is used as the signal that the file was not loaded.
        if system_prompt == default_system_prompt:
            logger.warning("Using default/fallback system prompt for PlannerAgent.")

        # Tools available to the agent.
        # NOTE(review): the answer_question tool is not registered here — confirm that is intended.
        tools = [generate_substeps_tool, synthesize_tool]

        # Names of the agents this agent may hand off to.
        valid_handoffs = [
            "code_agent",
            "research_agent",
            "math_agent",
            "role_agent",
            "image_analyzer_agent",
            "text_analyzer_agent",
            "reasoning_agent",
            "long_context_management_agent",
            "advanced_validation_agent",
            "video_analyzer_agent",
        ]

        agent = ReActAgent(
            name="planner_agent",
            description=(
                "Strategically plans tasks by breaking down objectives into sub-steps using `generate_substeps`. "
                "Orchestrates execution by handing off sub-steps to specialized agents. "
                "Synthesizes final results using `synthesize_and_report`."
            ),
            tools=tools,
            llm=llm,
            system_prompt=system_prompt,
            can_handoff_to=valid_handoffs,
        )
        logger.info("PlannerAgent initialized successfully.")
        return agent
    except Exception as e:
        # Log with traceback for diagnosis, then let the caller handle it.
        logger.error(f"Error during PlannerAgent initialization: {e}", exc_info=True)
        raise
# Example usage (for testing if run directly) | |
if __name__ == "__main__":
    # Manual smoke test: exercises plan generation and synthesis against the
    # live LLM when this file is executed directly.
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )
    logger.info("Running planner_agent.py directly for testing...")

    # The tools cannot reach the LLM without an API key, so only run with one.
    if os.getenv("GEMINI_API_KEY"):
        try:
            # Plan generation.
            print("\nTesting plan generation...")
            demo_objective = "Analyze the market trends for electric vehicles in Europe for 2024."
            demo_steps = plan(demo_objective)
            print(f"Generated Sub-steps:\n{demo_steps}")

            # Synthesis over a small hand-written result set.
            print("\nTesting synthesis...")
            demo_results = [
                {"sub_step": "Identify key EV manufacturers in Europe.", "answer": "Tesla, VW, Stellantis, Renault."},
                {"sub_step": "Find recent sales data.", "answer": "EV sales grew 25% year-over-year in Q1 2024."},
                {"sub_step": "Analyze government incentives.", "answer": "Germany reduced subsidies, France maintained them."},
            ]
            demo_report = synthesize_and_report(demo_results)
            print(f"Synthesized Report:\n{demo_report}")

            # Agent construction is not exercised here; call
            # initialize_planner_agent() manually to test it.
        except Exception as exc:
            print(f"Error during testing: {exc}")
    else:
        print("Error: GEMINI_API_KEY environment variable not set. Cannot run test.")