"""Multi-agent coding assistant exposed through a Gradio chat interface.

A task description flows through five agents in sequence: a prompt
optimizer, an orchestrator (planner), a coder, a code reviewer and a
documentation writer.  Progress messages are pushed onto a queue and streamed
to the chat UI while the agents work.
"""

import os
import sys
import asyncio
import logging
import threading
import queue
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, Dict, Generator, List, Optional

import gradio as gr
import httpx

logger = logging.getLogger(__name__)

# NOTE(review): `call_openai` was referenced by every agent but was not defined
# anywhere in this file.  The endpoint/model below are a minimal implementation
# so the module is runnable -- confirm the model choice for your deployment.
OPENAI_CHAT_URL = "https://api.openai.com/v1/chat/completions"
DEFAULT_MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")

# Kill switch: maximum number of review/revise rounds before giving up.
MAX_REVIEW_ITERATIONS = 5

# Tag of the tuple that marks the end of the log stream on the queue.
_RESULT_TAG = "result"


async def call_openai(
    prompt: str,
    api_key: Optional[str] = None,
    model: str = DEFAULT_MODEL,
    timeout: float = 120.0,
) -> str:
    """Send *prompt* as a single user message and return the model's reply.

    Args:
        prompt: Full text of the request.
        api_key: OpenAI API key.  When empty or ``None`` the
            ``OPENAI_API_KEY`` environment variable is used instead.
        model: Chat model name.
        timeout: Request timeout in seconds.

    Returns:
        The text content of the first completion choice.

    Raises:
        ValueError: If no API key is available.
        httpx.HTTPStatusError: If the API answers with an error status.
    """
    key = api_key or os.environ.get("OPENAI_API_KEY")
    if not key:
        raise ValueError(
            "No OpenAI API key provided and OPENAI_API_KEY is not set."
        )

    headers = {
        "Authorization": f"Bearer {key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }
    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.post(
            OPENAI_CHAT_URL, headers=headers, json=payload
        )
        response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"]


class Agent(ABC):
    """Common interface of all agents: turn an input text into a reply."""

    @abstractmethod
    async def generate_response(self, prompt: str, api_key: str) -> str:
        pass


class PromptOptimizerAgent(Agent):
    """Rewrites the user's raw task into a clearer, more specific prompt."""

    async def generate_response(self, prompt: str, api_key: str) -> str:
        """Return an enhanced version of *prompt*."""
        system_prompt = (
            "Given the user's initial prompt below the ### characters please enhance it. "
            "1. Start with clear, precise instructions placed at the beginning of the prompt. "
            "2. Include specific details about the desired context, outcome, length, format, and style. "
            "3. Provide examples of the desired output format, if possible. "
            "4. Use appropriate leading words or phrases to guide the desired output, especially if code generation is involved. "
            "5. Avoid any vague or imprecise language. "
            "6. Rather than only stating what not to do, provide guidance on what should be done instead. "
            "Remember to ensure the revised prompt remains true to the user's original intent. "
            # The marker must be part of the string (it used to be a bare
            # comment) and the user's prompt must follow it; otherwise the
            # model never sees the text it is asked to enhance.
            "###User initial prompt###\n"
        )
        return await call_openai(f"{system_prompt}{prompt}", api_key)


class OrchestratorAgent(Agent):
    """Produces a step-by-step plan for the other agents."""

    async def generate_response(self, task_message: str, api_key: str) -> str:
        """Return a bullet-point plan for completing *task_message*."""
        plan = (
            "You are an orchestrator agent. The user has provided the task: "
            f"'{task_message}'. Generate a detailed, step-by-step plan for "
            "completing this task by coordinating a coder agent, a code "
            "reviewer agent, and a documentation agent. List the steps as "
            "bullet points."
        )
        return await call_openai(plan, api_key)


class CoderAgent(Agent):
    """Writes (or revises) code from a set of instructions."""

    async def generate_response(self, instructions: str, api_key: str) -> str:
        """Return code implementing *instructions*."""
        prompt = (
            "Implement the task as described in the following plan:\n"
            f"{instructions}"
        )
        return await call_openai(prompt, api_key)


class CodeReviewerAgent(Agent):
    """Reviews code against the task and either approves or gives feedback."""

    async def generate_response(self, code: str, task: str, api_key: str) -> str:
        """Return ``APPROVE`` or review feedback for *code*.

        Args:
            code: The code to review.
            task: The task/plan the code must satisfy.
            api_key: OpenAI API key.
        """
        feedback = await call_openai(
            "You are a code reviewer agent. "
            f"Review the provided code: '{code}' and check if it meets the "
            f"task specifications.\nTask specifications:\n{task}\n"
            # An explicit approval token gives the review loop a reliable
            # termination signal.
            "If the code is correct and meets the specifications, respond "
            "with exactly 'APPROVE'. Otherwise, provide concise feedback on "
            "what must be changed.",
            api_key=api_key,
        )
        return feedback


class DocumentationAgent(Agent):
    """Writes short documentation for the final code."""

    async def generate_response(self, code: str, api_key: str) -> str:
        """Return brief documentation for *code*."""
        prompt = (
            "You are a documentation agent. Generate a brief documentation "
            f"for the code:\nCode:\n{code}"
        )
        return await call_openai(prompt, api_key)


def _is_approved(review: str) -> bool:
    """Return True when the reviewer's reply starts with the APPROVE token."""
    return review.strip().strip("'\"`*. ").upper().startswith("APPROVE")


async def process_conversation_generator(
    conversation: list, log_q: queue.Queue, api_key: str
) -> AsyncGenerator[str, None]:
    """Stream log messages from *log_q* until the result sentinel arrives.

    Args:
        conversation: List that receives the final transcript entries carried
            by the ``("result", entries)`` sentinel.
        log_q: Queue the conversation writes its progress messages to.
        api_key: Unused; kept for interface compatibility.

    Yields:
        Each log message in arrival order, followed by a completion notice.
    """
    while True:
        try:
            # Non-blocking read: a blocking ``get`` would stall the event
            # loop that the producing coroutine runs on.
            msg = log_q.get_nowait()
        except queue.Empty:
            await asyncio.sleep(0.1)
            continue

        if isinstance(msg, tuple) and msg and msg[0] == _RESULT_TAG:
            final_result = msg[1] if len(msg) > 1 else None
            if (
                conversation is not None
                and isinstance(final_result, list)
                and final_result is not conversation
            ):
                conversation.extend(final_result)
            break
        yield msg

    yield "Final conversation complete."


async def multi_agent_conversation(
    task_message: str,
    log_q: queue.Queue,
    api_key: str,
    additional_inputs=None,
) -> None:
    """Run the full agent pipeline for *task_message*.

    Progress is reported by putting strings on *log_q*.  When the pipeline
    finishes (successfully or not) a ``("result", conversation)`` tuple is put
    on the queue so consumers know the stream has ended.

    Args:
        task_message: The user's task description.
        log_q: Queue receiving progress messages and the final sentinel.
        api_key: OpenAI API key (empty/None falls back to the environment).
        additional_inputs: Ignored; accepted for backward compatibility.
    """
    optimizer = PromptOptimizerAgent()
    orchestrator = OrchestratorAgent()
    coder = CoderAgent()
    reviewer = CodeReviewerAgent()
    documenter = DocumentationAgent()

    conversation: List[Dict[str, str]] = []
    try:
        log_q.put("[Prompt Optimizer]: Received initial task. Optimizing prompt...")

        # Step 0: Use Prompt Optimizer
        optimized_task = await optimizer.generate_response(task_message, api_key)
        conversation.append(
            {"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"}
        )
        log_q.put(f"[Prompt Optimizer]: Optimized Task:\n{optimized_task}")

        # Step 1: Generate Plan
        plan = await orchestrator.generate_response(optimized_task, api_key)
        conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"})
        log_q.put(f"[Orchestrator]: Plan generated:\n{plan}")

        # Step 2: Generate Code
        code = await coder.generate_response(plan, api_key)
        conversation.append({"agent": "Coder", "message": f"Code:\n{code}"})
        log_q.put(f"[Coder]: Code generated:\n{code}")

        # Step 3: Code Review -- review/revise until approved or the kill
        # switch trips.  (Previously the loop compared the code with itself
        # and therefore always stopped after one round.)
        for _ in range(MAX_REVIEW_ITERATIONS):
            code_review = await reviewer.generate_response(code, plan, api_key)
            conversation.append(
                {"agent": "Code Reviewer", "message": f"Feedback:\n{code_review}"}
            )
            log_q.put(f"[Code Reviewer]: Feedback received:\n{code_review}")
            if _is_approved(code_review):
                break

            # Include the current code so the coder revises it instead of
            # starting from the feedback alone.
            code = await coder.generate_response(
                "Please revise the code according to the following feedback: "
                f"{code_review}\n\nCurrent code:\n{code}",
                api_key,
            )
            conversation.append({"agent": "Coder", "message": f"Revised code:\n{code}"})
            log_q.put(f"[Code Reviewer]: Revised code:\n{code}")
        else:
            log_q.put(
                f"[Code Reviewer]: Code not approved after {MAX_REVIEW_ITERATIONS} "
                "iterations: terminating."
            )

        # Step 4: Documentation
        doc = await documenter.generate_response(code, api_key)
        conversation.append(
            {"agent": "Documentation Agent", "message": f"Documentation:\n{doc}"}
        )
        log_q.put(f"[Documentation Agent]: Documentation generated:\n{doc}")
    except Exception as e:
        # Top-level boundary: report to the user and keep the traceback.
        log_q.put(f"[All Agents]: An error occurred: {str(e)}")
        logger.exception("Error in multi_agent_conversation: %s", e)
    finally:
        # Always signal end-of-stream so consumers never wait forever.
        log_q.put((_RESULT_TAG, conversation))


async def multi_agent_conversation_wrapper(task_message: str, api_key: str) -> None:
    """Run the pipeline with a private log queue, discarding the log output."""
    await multi_agent_conversation(
        task_message,
        log_q=queue.Queue(),
        api_key=api_key,
    )


async def multi_agent_chat(
    message: str, history: List[Any], api_key: Optional[str] = None
) -> AsyncGenerator[str, None]:
    """Gradio chat handler: run the pipeline and stream its log.

    Gradio calls the handler as ``fn(message, history, *additional_inputs)``.
    Each yield replaces the displayed reply, so the accumulated transcript is
    yielded every time.

    Args:
        message: The user's task description.
        history: Previous chat messages (unused).
        api_key: Optional OpenAI API key from the additional input box.
    """
    log_q: queue.Queue = queue.Queue()
    conversation: List[Dict[str, str]] = []
    worker = asyncio.create_task(
        multi_agent_conversation(message, log_q, api_key or None)
    )
    transcript: List[str] = []
    try:
        async for entry in process_conversation_generator(conversation, log_q, api_key):
            transcript.append(entry)
            yield "\n\n".join(transcript)
    finally:
        # If the client disconnects mid-stream, stop the agents as well.
        if not worker.done():
            worker.cancel()
        await asyncio.gather(worker, return_exceptions=True)


if __name__ == "__main__":
    iface = gr.ChatInterface(
        fn=multi_agent_chat,
        additional_inputs=[
            gr.Textbox(label="OpenAI API Key (optional)", type="password")
        ],
        type="messages",
        title="Actual Multi-Agent Conversation Chatbot",
        description="""
        - Collaborative workflow between Prompt Enhancer, Orchestrator, Coder, Code-Reviewer and Documentation Agent agents.
        - Enter a task description to observe the iterative workflow between the agents.
        - NOTE: The kill-switch mechanism will terminate after five code rejection iterations to prevent endless loops.
        - NOTE3: You can input your OPENAI_API_KEY at the bottom of the page for this to work!
        """,
    )
    iface.launch()