Spaces:
Sleeping
Sleeping
File size: 13,004 Bytes
a23082c 114747f a23082c b8f6b7f a23082c 68bd1d5 a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c b8f6b7f a23082c 114747f a23082c b8f6b7f a23082c 68bd1d5 a23082c b8f6b7f 114747f b8f6b7f 68bd1d5 b8f6b7f a23082c b8f6b7f a23082c b8f6b7f 114747f b8f6b7f a23082c 114747f a23082c 68bd1d5 a23082c 114747f a23082c b8f6b7f a23082c b8f6b7f a23082c 114747f a23082c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 |
import os
import logging
from typing import List, Dict
from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.google_genai import GoogleGenAI
# Setup logging
logger = logging.getLogger(__name__)
# Helper function to load prompt from file
def load_prompt_from_file(filename: str, default_prompt: str) -> str:
    """Load a prompt from a text file located relative to this script.

    Args:
        filename (str): Prompt file path, relative to this module's directory.
        default_prompt (str): Fallback prompt used when the file cannot be read.

    Returns:
        str: The file contents, or ``default_prompt`` on any read failure.
    """
    # Resolve relative to this module so loading works regardless of the
    # process's current working directory.
    script_dir = os.path.dirname(__file__)
    prompt_path = os.path.join(script_dir, filename)
    try:
        with open(prompt_path, "r", encoding="utf-8") as f:
            prompt = f.read()
        logger.info(f"Successfully loaded prompt from {prompt_path}")
        return prompt
    except FileNotFoundError:
        # BUG FIX: the original messages logged the literal "(unknown)"
        # instead of interpolating the filename.
        logger.warning(f"Prompt file {filename} not found at {prompt_path}. Using default.")
        return default_prompt
    except Exception as e:
        logger.error(f"Error loading prompt file {filename}: {e}", exc_info=True)
        return default_prompt
# --- Tool Functions ---
def plan(objective: str) -> List[str]:
    """
    Generate a list of sub-steps (4-8) from the given objective using an LLM.

    Args:
        objective (str): The research or task objective.

    Returns:
        List[str]: A list of sub-steps as strings. On failure an error
        message string (not a list) is returned; callers should check the type.
    """
    logger.info(f"Generating plan for objective: {objective[:100]}...")
    # Model is configurable via env var, with a sensible default.
    planner_llm_model = os.getenv("PLANNER_TOOL_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for planning tool LLM.")
        return "Error: GEMINI_API_KEY not set for planning."
    # Prompt for the LLM to generate sub-steps
    input_prompt = (
        "You are a research assistant. "
        "Given an objective, break it down into a list of 4-8 concise, actionable sub-steps. "
        "Ensure the steps are logically ordered.\n"
        f"Objective: {objective}\n"
        "Sub-steps (one per line, numbered):"
    )
    try:
        # BUG FIX: the configured model (PLANNER_TOOL_LLM_MODEL) was logged
        # but a hard-coded model string was passed to GoogleGenAI instead.
        llm = GoogleGenAI(api_key=gemini_api_key, model=planner_llm_model, temperature=0.05)
        logger.info(f"Using planning LLM: {planner_llm_model}")
        response = llm.complete(input_prompt)
        # Post-process: one sub-step per non-empty line, stripping any
        # leading "1. " numbering or "- " bullet markers.
        sub_steps = []
        for line in response.text.strip().split("\n"):
            line = line.strip()
            if not line:
                continue
            if line[0].isdigit() and "." in line[:3]:
                text = line.split(".", 1)[1].strip()
            elif line.startswith("- "):
                text = line[2:].strip()
            else:
                text = line
            if text:
                sub_steps.append(text)
        if not sub_steps:
            logger.warning("LLM generated no sub-steps for the objective.")
            return "Error: Failed to generate sub-steps."
        logger.info(f"Generated {len(sub_steps)} sub-steps.")
        return sub_steps
    except Exception as e:
        logger.error(f"LLM call failed during planning: {e}", exc_info=True)
        return f"Error during planning: {e}"
def synthesize_and_report(results: List[Dict[str, str]]) -> str:
    """
    Aggregate results from sub-steps into a coherent final report using an LLM.

    Args:
        results (List[Dict[str, str]]): List of dictionaries, each with
            "sub_step" and "answer" keys (missing keys get placeholders).

    Returns:
        str: A unified, well-structured report, or an error message.
    """
    logger.info(f"Synthesizing results from {len(results)} sub-steps...")
    if not results:
        logger.warning("Synthesize called with empty results list.")
        return "No results provided to synthesize."
    # Format the results for the synthesis prompt.
    summary_blocks = ""
    for i, result in enumerate(results):
        sub_step = result.get("sub_step", f"Step {i+1}")
        answer = result.get("answer", "No answer provided.")
        summary_blocks += f"Sub-step {i+1}: {sub_step}\nAnswer {i+1}: {answer}\n\n"
    # Model is configurable via env var, with a sensible default.
    synthesizer_llm_model = os.getenv("SYNTHESIZER_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for synthesis tool LLM.")
        return "Error: GEMINI_API_KEY not set for synthesis."
    # Prompt for the LLM
    input_prompt = f"""You are an expert synthesizer. Given the following sub-steps and their answers derived
from an initial objective, produce a single, coherent, comprehensive final report that
addresses the original objective:
--- SUB-STEP RESULTS ---
{summary_blocks.strip()}
--- END SUB-STEP RESULTS ---
Generate the Final Report:
"""
    try:
        # BUG FIX: the configured model (SYNTHESIZER_LLM_MODEL) was logged
        # but a hard-coded model string was passed to GoogleGenAI instead.
        llm = GoogleGenAI(api_key=gemini_api_key, model=synthesizer_llm_model, temperature=0.05)
        logger.info(f"Using synthesis LLM: {synthesizer_llm_model}")
        response = llm.complete(input_prompt)
        logger.info("Synthesis successful.")
        return response.text
    except Exception as e:
        logger.error(f"LLM call failed during synthesis: {e}", exc_info=True)
        return f"Error during synthesis: {e}"
def answer_question(question: str) -> str:
    """
    Answer any question by following this strict format:
    1. Include your chain of thought (your reasoning steps).
    2. End your reply with the exact template:
       FINAL ANSWER: [YOUR FINAL ANSWER]
    YOUR FINAL ANSWER must be:
    - A number, or
    - As few words as possible, or
    - A comma-separated list of numbers and/or strings.
    Formatting rules:
    * If asked for a number, do not use commas or units (e.g., $, %), unless explicitly requested.
    * If asked for a string, do not include articles or abbreviations (e.g., city names), and write digits in plain text.
    * If asked for a comma-separated list, apply the above rules to each element.
    This tool should be invoked immediately after completing the final planning sub-step.
    """
    logger.info(f"Answering question: {question[:100]}")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not set for answer_question tool.")
        return "Error: GEMINI_API_KEY not set."
    # Model is configurable via env var, with a sensible default.
    model_name = os.getenv("ANSWER_TOOL_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    # Build the assistant prompt enforcing the required format.
    assistant_prompt = (
        "You are a general AI assistant. I will ask you a question. "
        "Report your thoughts, and finish your answer with the following template: "
        "FINAL ANSWER: [YOUR FINAL ANSWER]. "
        "YOUR FINAL ANSWER should be a number OR as few words as possible "
        "OR a comma separated list of numbers and/or strings. "
        "If you are asked for a number, don't use commas for thousands or any units like $ or % unless specified. "
        "If you are asked for a string, omit articles and abbreviations, and write digits in plain text. "
        "If you are asked for a comma separated list, apply these rules to each element.\n\n"
        f"Question: {question}\n"
        "Answer:"
    )
    try:
        # BUG FIX: the configured model (ANSWER_TOOL_LLM_MODEL) was logged
        # but a hard-coded model string was passed to GoogleGenAI instead.
        llm = GoogleGenAI(api_key=gemini_api_key, model=model_name, temperature=0.05)
        logger.info(f"Using answer LLM: {model_name}")
        response = llm.complete(assistant_prompt)
        logger.info("Answer generated successfully.")
        return response.text
    except Exception as e:
        logger.error(f"LLM call failed during answer generation: {e}", exc_info=True)
        return f"Error during answer generation: {e}"
# --- Tool Definitions ---
# Each FunctionTool wraps one of the plain functions above so the ReActAgent
# can invoke it by name.
synthesize_tool = FunctionTool.from_defaults(
    fn=synthesize_and_report,
    name="synthesize_and_report",
    description=(
        "Aggregates results from multiple sub-steps into a final coherent report. "
        "Input: results (List[Dict[str, str]]) where each dict has \"sub_step\" and \"answer\". "
        "Output: A unified report (str) or error message."
    ),
)
generate_substeps_tool = FunctionTool.from_defaults(
    fn=plan,
    name="generate_substeps",
    description=(
        "Decomposes a high-level objective into a concise roadmap of 4–8 actionable sub-steps using an LLM. "
        "Input: objective (str). Output: List of sub-step strings (List[str]) or error list."
    )
)
# BUG FIX: the tool was previously bound directly to the name `answer_question`,
# shadowing the function of the same name at module level. Bind it to
# `answer_question_tool` (consistent with the sibling *_tool names) and keep the
# old binding as an alias so existing importers are unaffected.
answer_question_tool = FunctionTool.from_defaults(
    fn=answer_question,
    name="answer_question",
    description=(
        "Answers any question and returns the full text, always ending with "
        "‘FINAL ANSWER: ...’ in accordance with the formatting rules."
    ),
)
answer_question = answer_question_tool
# --- Agent Initialization ---
def initialize_planner_agent() -> ReActAgent:
    """Initialize and return the Planner Agent.

    Reads configuration from environment variables:
      - PLANNER_AGENT_LLM_MODEL: model name for the agent's LLM (optional).
      - GEMINI_API_KEY: API key for Google GenAI (required).

    Returns:
        ReActAgent: The configured planner agent.

    Raises:
        ValueError: If GEMINI_API_KEY is not set.
        Exception: Re-raises any failure during LLM/agent construction.
    """
    logger.info("Initializing PlannerAgent...")
    # Configuration for the agent's main LLM.
    agent_llm_model = os.getenv("PLANNER_AGENT_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for PlannerAgent.")
        raise ValueError("GEMINI_API_KEY must be set for PlannerAgent")
    try:
        # BUG FIX: the configured model (PLANNER_AGENT_LLM_MODEL) was logged
        # but a hard-coded model string was passed to GoogleGenAI instead.
        llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model, temperature=0.05)
        logger.info(f"Using agent LLM: {agent_llm_model}")
        # Load system prompt; fall back to a placeholder default.
        default_system_prompt = (
            "You are PlannerAgent... [Default prompt content - replace with actual]"  # Placeholder
        )
        system_prompt = load_prompt_from_file("../prompts/planner_agent_prompt.txt", default_system_prompt)
        if system_prompt == default_system_prompt:
            logger.warning("Using default/fallback system prompt for PlannerAgent.")
        # Tools the planner may invoke directly.
        tools = [generate_substeps_tool, synthesize_tool]
        # Specialized agents the planner may hand sub-steps off to.
        valid_handoffs = [
            "code_agent",
            "research_agent",
            "math_agent",
            "role_agent",
            "image_analyzer_agent",
            "text_analyzer_agent",
            "reasoning_agent",
            "long_context_management_agent",
            "advanced_validation_agent",
            "video_analyzer_agent",
        ]
        agent = ReActAgent(
            name="planner_agent",
            description=(
                "Strategically plans tasks by breaking down objectives into sub-steps using `generate_substeps`. "
                "Orchestrates execution by handing off sub-steps to specialized agents. "
                "Synthesizes final results using `synthesize_and_report`."
            ),
            tools=tools,
            llm=llm,
            system_prompt=system_prompt,
            can_handoff_to=valid_handoffs,
        )
        logger.info("PlannerAgent initialized successfully.")
        return agent
    except Exception as e:
        logger.error(f"Error during PlannerAgent initialization: {e}", exc_info=True)
        raise
# Smoke-test entry point: exercises the planning and synthesis tools when the
# module is executed directly (requires GEMINI_API_KEY in the environment).
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger.info("Running planner_agent.py directly for testing...")
    if not os.getenv("GEMINI_API_KEY"):
        # Every LLM-backed tool below needs the API key; bail out early.
        print("Error: GEMINI_API_KEY environment variable not set. Cannot run test.")
    else:
        try:
            # 1) Exercise plan generation.
            print("\nTesting plan generation...")
            objective_under_test = "Analyze the market trends for electric vehicles in Europe for 2024."
            generated_steps = plan(objective_under_test)
            print(f"Generated Sub-steps:\n{generated_steps}")

            # 2) Exercise synthesis with canned sub-step results.
            print("\nTesting synthesis...")
            canned_results = [
                {"sub_step": "Identify key EV manufacturers in Europe.", "answer": "Tesla, VW, Stellantis, Renault."},
                {"sub_step": "Find recent sales data.", "answer": "EV sales grew 25% year-over-year in Q1 2024."},
                {"sub_step": "Analyze government incentives.", "answer": "Germany reduced subsidies, France maintained them."},
            ]
            synthesized_report = synthesize_and_report(canned_results)
            print(f"Synthesized Report:\n{synthesized_report}")

            # Initialize the agent (optional)
            # test_agent = initialize_planner_agent()
            # print("\nPlanner Agent initialized successfully for testing.")
        except Exception as e:
            print(f"Error during testing: {e}")
|