CultriX's picture
Update app.py
69af1c5 verified
raw
history blame
16 kB
import os
import sys
import asyncio
import logging
import threading
import queue
import gradio as gr
import httpx
from typing import Generator, Any, Dict, List, Optional
# -------------------- Configuration --------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
# -------------------- External Model Call --------------------
async def call_model(prompt: str, model: str = "gpt-4o", api_key: str = None) -> str:
"""
Sends a prompt to the OpenAI API endpoint.
"""
if api_key is None:
api_key = os.getenv("OPENAI_API_KEY")
if api_key is None:
raise ValueError("OpenAI API key not found.")
url = "https://api.openai.com/v1/chat/completions"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": [{"role": "user", "content": prompt}],
}
async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
response = await client.post(url, headers=headers, json=payload)
response.raise_for_status()
response_json = response.json()
return response_json["choices"][0]["message"]["content"]
# -------------------- Agent Classes --------------------
class PromptOptimizerAgent:
async def optimize_prompt(self, user_prompt: str, api_key: str) -> str:
"""Optimizes the user's initial prompt."""
system_prompt = (
"You are a prompt optimization expert. Improve the given user prompt. "
"Be clear, specific, and complete. Maintain the user's original intent."
"Return ONLY the revised prompt."
)
full_prompt = f"{system_prompt}\n\nUser's initial prompt:\n{user_prompt}"
optimized = await call_model(full_prompt, model="gpt-4o", api_key=api_key)
return optimized
class OrchestratorAgent:
def __init__(self, log_queue: queue.Queue, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue) -> None:
self.log_queue = log_queue
self.human_in_the_loop_event = human_in_the_loop_event
self.human_input_queue = human_input_queue
async def generate_plan(self, task: str, api_key: str, human_feedback: Optional[str] = None) -> str:
"""
Generates a plan, potentially requesting human feedback.
"""
if human_feedback: # Use human feedback if provided
prompt = (
f"You are a master planner. You previously generated a partial plan for the task: '{task}'.\n"
"You requested human feedback, and here's the feedback you received:\n"
f"{human_feedback}\n\n"
"Now, complete or revise the plan, incorporating the human feedback. "
"Output the plan as a numbered list."
)
plan = await call_model(prompt, model="gpt-4o", api_key=api_key)
return plan
prompt = (
f"You are a master planner. Given the task: '{task}', create a detailed, step-by-step plan. "
"Break down the task into sub-tasks. Assign each sub-task to agents: Coder, Code Reviewer, Quality Assurance Tester, and Documentation Agent. "
"Include steps for review and revision. Consider potential issues and error handling. "
"Include instructions for documentation.\n\n"
"HOWEVER, if at ANY point you are unsure how to proceed, you can request human feedback. "
"To do this, output ONLY the following phrase (and nothing else): 'REQUEST_HUMAN_FEEDBACK'\n"
"Followed by a newline and a clear and concise question for the human. Example:\n\nREQUEST_HUMAN_FEEDBACK\nShould the output be in JSON or XML format?"
"\n\nOutput the plan as a numbered list (or as much as you can before requesting feedback)."
)
plan = await call_model(prompt, model="gpt-4o", api_key=api_key)
if "REQUEST_HUMAN_FEEDBACK" in plan:
self.log_queue.put("[Orchestrator]: Requesting human feedback...")
question = plan.split("REQUEST_HUMAN_FEEDBACK\n", 1)[1].strip()
self.log_queue.put(f"[Orchestrator]: Question for human: {question}")
self.human_in_the_loop_event.set() # Signal the human input thread
human_response = self.human_input_queue.get() # Wait for human input
self.human_in_the_loop_event.clear() # Reset the event
self.log_queue.put(f"[Orchestrator]: Received human feedback: {human_response}")
return await self.generate_plan(task, api_key, human_response) # Recursive call with feedback
return plan
class CoderAgent:
async def generate_code(self, instructions: str, api_key: str, model: str = "gpt-4o") -> str:
"""Generates code based on instructions."""
prompt = (
"You are a highly skilled coding agent. Output ONLY the code. "
"Adhere to best practices. Include error handling.\n\n"
f"Instructions:\n{instructions}"
)
code = await call_model(prompt, model=model, api_key=api_key)
return code
class CodeReviewerAgent:
async def review_code(self, code: str, task: str, api_key: str) -> str:
"""Reviews code. Provides concise, actionable feedback or 'APPROVE'."""
prompt = (
"You are a meticulous code reviewer. Provide CONCISE feedback. "
"Focus on correctness, efficiency, readability, error handling, security, and adherence to the task. "
"Suggest improvements. If acceptable, respond with ONLY 'APPROVE'. "
"Do NOT generate code.\n\n"
f"Task: {task}\n\nCode:\n{code}"
)
review = await call_model(prompt, model="gpt-4o", api_key=api_key)
return review
class QualityAssuranceTesterAgent:
async def generate_test_cases(self, code: str, task: str, api_key: str) -> str:
"""Generates test cases."""
prompt = (
"You are a quality assurance testing agent. Generate test cases. "
"Consider edge cases and error scenarios. Output in a clear format.\n\n"
f"Task: {task}\n\nCode:\n{code}"
)
test_cases = await call_model(prompt, model="gpt-4o", api_key=api_key)
return test_cases
async def run_tests(self, code:str, test_cases:str, api_key:str) -> str:
"""Runs tests and reports results."""
prompt = (
"Run the generated test cases. Compare actual vs expected output. "
"State discrepancies. If all pass, output 'TESTS PASSED'.\n\n"
f"Code:\n{code}\n\nTest Cases:\n{test_cases}"
)
test_results = await call_model(prompt, model="gpt-4o", api_key=api_key)
return test_results
class DocumentationAgent:
async def generate_documentation(self, code: str, api_key: str) -> str:
"""Generates documentation, including a --help message."""
prompt = (
"Generate clear and concise documentation. "
"Include a brief description, explanation, and a --help message.\n\n"
f"Code:\n{code}"
)
documentation = await call_model(prompt, model="gpt-4o", api_key=api_key)
return documentation
# -------------------- Multi-Agent Conversation --------------------
async def multi_agent_conversation(task_message: str, log_queue: queue.Queue, api_key: str, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue) -> None:
"""
Conducts the multi-agent conversation.
"""
conversation: List[Dict[str, str]] = []
# Step 0: Optimize Prompt
log_queue.put("[Prompt Optimizer]: Optimizing prompt...")
prompt_optimizer = PromptOptimizerAgent()
optimized_task = await prompt_optimizer.optimize_prompt(task_message, api_key=api_key)
conversation.append({"agent": "Prompt Optimizer", "message": f"Optimized Task:\n{optimized_task}"})
log_queue.put(f"[Prompt Optimizer]: Optimized task prompt:\n{optimized_task}")
# Step 1: Generate Plan
log_queue.put("[Orchestrator]: Generating plan...")
orchestrator = OrchestratorAgent(log_queue, human_in_the_loop_event, human_input_queue)
plan = await orchestrator.generate_plan(optimized_task, api_key=api_key)
conversation.append({"agent": "Orchestrator", "message": f"Plan:\n{plan}"})
log_queue.put(f"[Orchestrator]: Plan generated:\n{plan}")
# Step 2: Generate Code
coder = CoderAgent()
coder_instructions = f"Implement the task:\n{plan}"
log_queue.put("[Coder]: Generating code...")
code = await coder.generate_code(coder_instructions, api_key=api_key)
conversation.append({"agent": "Coder", "message": f"Code:\n{code}"})
log_queue.put(f"[Coder]: Code generated:\n{code}")
# Step 3: Code Review and Revision
reviewer = CodeReviewerAgent()
tester = QualityAssuranceTesterAgent()
approval_keyword = "approve"
revision_iteration = 0
while True:
log_queue.put(f"[Code Reviewer]: Reviewing code (Iteration {revision_iteration})...")
review = await reviewer.review_code(code, optimized_task, api_key=api_key)
conversation.append({"agent": "Code Reviewer", "message": f"Review (Iteration {revision_iteration}):\n{review}"})
log_queue.put(f"[Code Reviewer]: Review (Iteration {revision_iteration}):\n{review}")
if approval_keyword in review.lower():
log_queue.put("[Code Reviewer]: Code approved.")
break
revision_iteration += 1
if revision_iteration >= 5:
log_queue.put("Unable to solve task satisfactorily.")
sys.exit("Unable to solve task satisfactorily.")
log_queue.put("[QA Tester]: Generating test cases...")
test_cases = await tester.generate_test_cases(code, optimized_task, api_key=api_key)
conversation.append({"agent": "QA Tester", "message": f"Test Cases:\n{test_cases}"})
log_queue.put(f"[QA Tester]: Test Cases:\n{test_cases}")
log_queue.put("[QA Tester]: Running tests...")
test_results = await tester.run_tests(code, test_cases, api_key)
conversation.append({"agent": "QA Tester", "message": f"Test Results:\n{test_results}"})
log_queue.put(f"[QA Tester]: Test Results:\n{test_results}")
log_queue.put(f"[Orchestrator]: Revising code (Iteration {revision_iteration})...")
update_instructions = f"Revise:\nReview:\n{review}\nTests:\n{test_results}\nPlan:\n{plan}"
revised_code = await coder.generate_code(update_instructions, api_key=api_key, model="gpt-3.5-turbo-16k")
conversation.append({"agent": "Coder", "message": f"Revised Code (Iteration {revision_iteration}):\n{revised_code}"})
log_queue.put(f"[Coder]: Revised (Iteration {revision_iteration}):\n{revised_code}")
code = revised_code
# Step 4: Generate Documentation
doc_agent = DocumentationAgent()
log_queue.put("[Documentation Agent]: Generating documentation...")
documentation = await doc_agent.generate_documentation(code, api_key=api_key)
conversation.append({"agent": "Documentation Agent", "message": f"Documentation:\n{documentation}"})
log_queue.put(f"[Documentation Agent]: Documentation generated:\n{documentation}")
log_queue.put("Conversation complete.")
log_queue.put(("result", conversation))
# -------------------- Process Generator and Human Input --------------------
def process_conversation_generator(task_message: str, api_key: str, human_in_the_loop_event: threading.Event, human_input_queue: queue.Queue) -> Generator[str, None, None]:
"""
Wraps the conversation and yields log messages. Handles human input.
"""
log_q: queue.Queue = queue.Queue()
def run_conversation() -> None:
asyncio.run(multi_agent_conversation(task_message, log_q, api_key, human_in_the_loop_event, human_input_queue))
thread = threading.Thread(target=run_conversation)
thread.start()
final_result = None
while thread.is_alive() or not log_q.empty():
try:
msg = log_q.get(timeout=0.1)
if isinstance(msg, tuple) and msg[0] == "result":
final_result = msg[1]
yield "Conversation complete."
else:
yield msg
except queue.Empty:
continue
thread.join()
if final_result:
conv_text = "\n=== Conversation ===\n"
for entry in final_result:
conv_text += f"[{entry['agent']}]: {entry['message']}\n\n"
yield conv_text
def get_human_feedback(placeholder_text):
"""Gets human input using a Gradio Textbox."""
with gr.Blocks() as human_feedback_interface:
with gr.Row():
human_input = gr.Textbox(lines=4, placeholder=placeholder_text, label="Human Feedback")
with gr.Row():
submit_button = gr.Button("Submit Feedback")
feedback_queue = queue.Queue()
def submit_feedback(input_text):
feedback_queue.put(input_text)
return ""
submit_button.click(submit_feedback, inputs=human_input, outputs=human_input)
human_feedback_interface.load(None, [], []) # This is needed to keep the interface alive
return human_feedback_interface, feedback_queue
# -------------------- Chat Function for Gradio --------------------
def multi_agent_chat(message: str, history: List[Any], openai_api_key: str = None) -> Generator[str, None, None]:
"""Chat function for Gradio."""
if not openai_api_key:
openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
yield "Error: API key not provided."
return
human_in_the_loop_event = threading.Event()
human_input_queue = queue.Queue()
yield from process_conversation_generator(message, openai_api_key, human_in_the_loop_event, human_input_queue)
while human_in_the_loop_event.is_set():
yield "Waiting for human feedback..."
placeholder = "Please provide your feedback."
human_interface, feedback_queue = get_human_feedback(placeholder)
#This is a hacky but currently only working way to make this work with gradio
yield gr.Textbox.update(visible=False), gr.update(visible=True)
try:
human_feedback = feedback_queue.get(timeout=300) # Wait for up to 5 minutes
human_input_queue.put(human_feedback)
human_in_the_loop_event.clear()
yield gr.Textbox.update(visible=True), human_interface.close()
yield from process_conversation_generator(message, openai_api_key, human_in_the_loop_event, human_input_queue)
except queue.Empty:
human_input_queue.put("No feedback provided.") #Timeout
human_in_the_loop_event.clear()
yield gr.Textbox.update(visible=True), human_interface.close()
yield from process_conversation_generator(message, openai_api_key, human_in_the_loop_event, human_input_queue)
# -------------------- Launch the Chatbot --------------------
# Create the main chat interface
iface = gr.ChatInterface(
fn=multi_agent_chat,
additional_inputs=[gr.Textbox(label="OpenAI API Key (optional)", type="password", placeholder="Leave blank to use env variable")],
title="Multi-Agent Task Solver with Human-in-the-Loop",
description="""
- Collaborative workflow with Human-in-the-Loop capability.
- The Orchestrator can ask for human feedback if needed.
- Enter a task, and the agents will work on it. You may be prompted for input.
- Max 5 revision iterations.
- Provide your OpenAI API Key below.
"""
)
#Need a dummy interface to make the human feedback interface update
dummy_iface = gr.Interface(lambda x:x, "textbox", "textbox")
if __name__ == "__main__":
demo = gr.TabbedInterface([iface, dummy_iface], ["Chatbot", "Dummy"])
demo.launch()