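"""planner_agent.py

Planner agent built on llama_index's ReActAgent and Google Gemini (GoogleGenAI) LLMs.

Tool functions defined here:
  - plan: break a high-level objective into 4-8 ordered, actionable sub-steps.
  - synthesize_and_report: merge sub-step results into a single final report.
  - answer_question: answer a question, ending with "FINAL ANSWER: ...".

initialize_planner_agent() builds a ReActAgent around these tools that can hand
off sub-steps to specialized agents (research, code, math, etc.).
"""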
import os
import logging
from typing import List, Dict

from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.google_genai import GoogleGenAI


# Setup logging
logger = logging.getLogger(__name__)

# Helper function to load prompt from file
def load_prompt_from_file(filename: str, default_prompt: str) -> str:
    """Loads a prompt from a text file."""
    try:
        # Resolve the prompt path relative to this script's directory
        script_dir = os.path.dirname(__file__)
        prompt_path = os.path.join(script_dir, filename)
        with open(prompt_path, "r") as f:
            prompt = f.read()
            logger.info(f"Successfully loaded prompt from {prompt_path}")
            return prompt
    except FileNotFoundError:
        logger.warning(f"Prompt file {filename} not found at {prompt_path}. Using default.")
        return default_prompt
    except Exception as e:
        logger.error(f"Error loading prompt file {filename}: {e}", exc_info=True)
        return default_prompt

# --- Tool Functions ---

def plan(objective: str) -> List[str]:
    """
    Generate a list of sub-steps (4-8) from the given objective using an LLM.
    Args:
        objective (str): The research or task objective.
    Returns:
        List[str]: A list of sub-steps as strings, or an error message list.
    """
    logger.info(f"Generating plan for objective: {objective[:100]}...")
    
    # Configuration for the planning LLM (overridable via environment variable)
    planner_llm_model = os.getenv("PLANNER_TOOL_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for planning tool LLM.")
        return ["Error: GEMINI_API_KEY not set for planning."]

    # Prompt for the LLM to generate sub-steps
    input_prompt = (
        "You are a research assistant. "
        "Given an objective, break it down into a list of 4-8 concise, actionable sub-steps. "
        "Ensure the steps are logically ordered.\n"
        f"Objective: {objective}\n"
        "Sub-steps (one per line, numbered):"
    )

    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=planner_llm_model, temperature=0.05)
        logger.info(f"Using planning LLM: {planner_llm_model}")
        response = llm.complete(input_prompt)
        
        # Post-process: split lines into sub-steps, remove numbering if present
        lines = response.text.strip().split("\n")
        sub_steps = []
        for line in lines:
            line = line.strip()
            if not line:
                continue
            # Remove potential leading numbering (e.g., "1. ", "- ")
            if line and line[0].isdigit() and "." in line[:3]:
                text = line.split(".", 1)[1].strip()
            elif line.startswith("- "):
                text = line[2:].strip()
            else:
                text = line
            
            if text:
                sub_steps.append(text)
                
        if not sub_steps:
            logger.warning("LLM generated no sub-steps for the objective.")
            return ["Error: Failed to generate sub-steps."]
             
        logger.info(f"Generated {len(sub_steps)} sub-steps.")

        return sub_steps
        
    except Exception as e:
        logger.error(f"LLM call failed during planning: {e}", exc_info=True)
        return [f"Error during planning: {e}"]

def synthesize_and_report(results: List[Dict[str, str]]) -> str:
    """
    Aggregate results from sub-steps into a coherent final report using an LLM.
    Args:
        results (List[Dict[str, str]]): List of dictionaries, each with "sub_step" and "answer" keys.
    Returns:
        str: A unified, well-structured report, or an error message.
    """
    logger.info(f"Synthesizing results from {len(results)} sub-steps...")
    if not results:
        logger.warning("Synthesize called with empty results list.")
        return "No results provided to synthesize."

    # Format the results for the synthesis prompt
    summary_blocks = ""
    for i, result in enumerate(results):
        sub_step = result.get("sub_step", f"Step {i+1}")
        answer = result.get("answer", "No answer provided.")
        summary_blocks += f"Sub-step {i+1}: {sub_step}\nAnswer {i+1}: {answer}\n\n"

    # Configuration for the synthesis LLM (overridable via environment variable)
    synthesizer_llm_model = os.getenv("SYNTHESIZER_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for synthesis tool LLM.")
        return "Error: GEMINI_API_KEY not set for synthesis."

    # Prompt for the LLM
    input_prompt = (
        "You are an expert synthesizer. Given the following sub-steps and their answers derived "
        "from an initial objective, produce a single, coherent, comprehensive final report that "
        "addresses the original objective:\n\n"
        "--- SUB-STEP RESULTS ---\n"
        f"{summary_blocks.strip()}\n"
        "--- END SUB-STEP RESULTS ---\n\n"
        "Generate the Final Report:"
    )

    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=synthesizer_llm_model, temperature=0.05)
        logger.info(f"Using synthesis LLM: {synthesizer_llm_model}")
        response = llm.complete(input_prompt)
        logger.info("Synthesis successful.")
        return response.text
    except Exception as e:
        logger.error(f"LLM call failed during synthesis: {e}", exc_info=True)
        return f"Error during synthesis: {e}"

def answer_question(question: str) -> str:
    """
    Answer any question by following this strict format:
      1. Include your chain of thought (your reasoning steps).
      2. End your reply with the exact template:
         FINAL ANSWER: [YOUR FINAL ANSWER]
    YOUR FINAL ANSWER must be:
      - A number, or
      - As few words as possible, or
      - A comma-separated list of numbers and/or strings.
    Formatting rules:
      * If asked for a number, do not use commas or units (e.g., $, %), unless explicitly requested.
      * If asked for a string, do not include articles or abbreviations (e.g., city names), and write digits in plain text.
      * If asked for a comma-separated list, apply the above rules to each element.
    This tool should be invoked immediately after completing the final planning sub-step.
    """
    logger.info(f"Answering question: {question[:100]}")

    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not set for answer_question tool.")
        return "Error: GEMINI_API_KEY not set."

    model_name = os.getenv("ANSWER_TOOL_LLM_MODEL", "gemini-2.5-pro-preview-03-25")

    # Build the assistant prompt enforcing the required format
    assistant_prompt = (
        "You are a general AI assistant. I will ask you a question. "
        "Report your thoughts, and finish your answer with the following template: "
        "FINAL ANSWER: [YOUR FINAL ANSWER]. "
        "YOUR FINAL ANSWER should be a number OR as few words as possible "
        "OR a comma separated list of numbers and/or strings. "
        "If you are asked for a number, don't use commas for thousands or any units like $ or % unless specified. "
        "If you are asked for a string, omit articles and abbreviations, and write digits in plain text. "
        "If you are asked for a comma separated list, apply these rules to each element.\n\n"
        f"Question: {question}\n"
        "Answer:"
    )

    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=model_name, temperature=0.05)
        logger.info(f"Using answer LLM: {model_name}")
        response = llm.complete(assistant_prompt)
        logger.info("Answer generated successfully.")
        return response.text
    except Exception as e:
        logger.error(f"LLM call failed during answer generation: {e}", exc_info=True)
        return f"Error during answer generation: {e}"

# --- Tool Definitions ---
synthesize_tool = FunctionTool.from_defaults(
    fn=synthesize_and_report,
    name="synthesize_and_report",
    description=(
        "Aggregates results from multiple sub-steps into a final coherent report. "
        "Input: results (List[Dict[str, str]]) where each dict has \"sub_step\" and \"answer\". "
        "Output: A unified report (str) or error message."
    ),
)

generate_substeps_tool = FunctionTool.from_defaults(
    fn=plan,
    name="generate_substeps",
    description=(
        "Decomposes a high-level objective into a concise roadmap of 4–8 actionable sub-steps using an LLM. "
        "Input: objective (str). Output: List of sub-step strings (List[str]) or error list."
    )
)

answer_question_tool = FunctionTool.from_defaults(
    fn=answer_question,
    name="answer_question",
    description=(
        "Answers any question and returns the full text, always ending with "
        "‘FINAL ANSWER: ...’ in accordance with the formatting rules."
    ),
)

# --- Agent Initialization ---
def initialize_planner_agent() -> ReActAgent:
    """Initializes the Planner Agent."""
    logger.info("Initializing PlannerAgent...")

    # Configuration for the agent's main LLM
    agent_llm_model = os.getenv("PLANNER_AGENT_LLM_MODEL", "gemini-2.5-pro-preview-03-25")
    gemini_api_key = os.getenv("GEMINI_API_KEY")

    if not gemini_api_key:
        logger.error("GEMINI_API_KEY not found for PlannerAgent.")
        raise ValueError("GEMINI_API_KEY must be set for PlannerAgent")

    try:
        llm = GoogleGenAI(api_key=gemini_api_key, model=agent_llm_model, temperature=0.05)
        logger.info(f"Using agent LLM: {agent_llm_model}")

        # Load system prompt
        default_system_prompt = "You are PlannerAgent... [Default prompt content - replace with actual]"  # Placeholder
        system_prompt = load_prompt_from_file("../prompts/planner_agent_prompt.txt", default_system_prompt)
        if system_prompt == default_system_prompt:
            logger.warning("Using default/fallback system prompt for PlannerAgent.")

        # Define available tools
        tools = [generate_substeps_tool, synthesize_tool, answer_question_tool]

        # Define valid handoff targets
        valid_handoffs = [
            "code_agent",
            "research_agent",
            "math_agent",
            "role_agent",
            "image_analyzer_agent",
            "text_analyzer_agent",
            "reasoning_agent",
            "long_context_management_agent",
            "advanced_validation_agent",
            "video_analyzer_agent"
        ]

        agent = ReActAgent(
            name="planner_agent",
            description=(
                "Strategically plans tasks by breaking down objectives into sub-steps using `generate_substeps`. "
                "Orchestrates execution by handing off sub-steps to specialized agents. "
                "Synthesizes final results using `synthesize_and_report`."
            ),
            tools=tools,
            llm=llm,
            system_prompt=system_prompt,
            can_handoff_to=valid_handoffs,
        )
        logger.info("PlannerAgent initialized successfully.")
        return agent

    except Exception as e:
        logger.error(f"Error during PlannerAgent initialization: {e}", exc_info=True)
        raise

# Example usage (for testing if run directly)
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger.info("Running planner_agent.py directly for testing...")

    # Ensure API key is set
    if not os.getenv("GEMINI_API_KEY"):
        print("Error: GEMINI_API_KEY environment variable not set. Cannot run test.")
    else:
        try:
            # Test plan generation
            print("\nTesting plan generation...")
            test_objective = "Analyze the market trends for electric vehicles in Europe for 2024."
            substeps = plan(test_objective)
            print(f"Generated Sub-steps:\n{substeps}")

            # Test synthesis
            print("\nTesting synthesis...")
            test_results = [
                {"sub_step": "Identify key EV manufacturers in Europe.", "answer": "Tesla, VW, Stellantis, Renault."}, 
                {"sub_step": "Find recent sales data.", "answer": "EV sales grew 25% year-over-year in Q1 2024."}, 
                {"sub_step": "Analyze government incentives.", "answer": "Germany reduced subsidies, France maintained them."}
            ]
            report = synthesize_and_report(test_results)
            print(f"Synthesized Report:\n{report}")
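
            # Test answer generation (illustrative: the question below is just a
            # made-up example; this call reuses the same GEMINI_API_KEY as above)
            print("\nTesting answer generation...")
            test_question = "Which country currently leads EV adoption in Europe?"
            answer = answer_question(test_question)
            print(f"Generated Answer:\n{answer}")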

            # Initialize the agent (optional)
            # test_agent = initialize_planner_agent()
            # print("\nPlanner Agent initialized successfully for testing.")

        except Exception as e:
            print(f"Error during testing: {e}")