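"""HF Space Auto-Builder (Team AI).

A Gradio app in which a small team of Gemini-powered agents (architect,
code generator, debugger) plans, generates, pushes, and iteratively debugs a
Gradio or Streamlit application inside a Hugging Face Space, using the
logged-in user's Hugging Face OAuth token for repo and log access.
"""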
import re
import json
import time
import requests
import importlib.metadata
import gradio as gr
import os  # Needed for writing files
from huggingface_hub import (
    create_repo, upload_file, list_models, constants
)
from huggingface_hub.utils import build_hf_headers, get_session, hf_raise_for_status
from google import genai
# Import Content and Part types for structured input
from google.genai.types import Content, Part
from google.genai.types import Tool, GenerateContentConfig, GoogleSearch

# --- USER INFO & MODEL LISTING ---
def show_profile(profile: gr.OAuthProfile | None) -> str:
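    """Return a Markdown status line for the logged-in HF user, if any."""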
    return f"✅ Logged in as **{profile.username}**" if profile else "*Not logged in.*"

def list_private_models(
    profile: gr.OAuthProfile | None,
    oauth_token: gr.OAuthToken | None
) -> str:
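    """List the logged-in user's Hub models, marking each as private or public."""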
    # Gradio injects profile and oauth_token automatically when inputs=None
    # and the function signature has these parameter types.
    if not profile or not oauth_token or not hasattr(oauth_token, 'token') or not oauth_token.token:
        return "Please log in to see your models."
    try:
        models = [
            f"{m.id} ({'private' if m.private else 'public'})"
            for m in list_models(author=profile.username, token=oauth_token.token)
        ]
        return "No models found." if not models else "Models:\n\n" + "\n - ".join(models)
    except Exception as e:
        # Catching potential API errors during model listing
        return f"Error listing models: {e}"

# --- UTILITIES ---
def get_sdk_version(sdk_choice: str) -> str:
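    """Return the locally installed version of the chosen SDK package, or 'UNKNOWN'."""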
    pkg = "gradio" if sdk_choice == "gradio" else "streamlit"
    try:
        return importlib.metadata.version(pkg)
    except importlib.metadata.PackageNotFoundError:
        return "UNKNOWN"

def classify_errors(logs: str) -> str:
    errs = set()
    # Convert logs to lower for case-insensitive matching
    logs_lower = logs.lower()
    if "syntaxerror" in logs_lower:
        errs.add("syntax")
    elif "importerror" in logs_lower or "modulenotfounderror" in logs_lower:
        errs.add("import")
    # Catch common error indicators
    elif "traceback" in logs_lower or "exception" in logs_lower or "error" in logs_lower:
        errs.add("runtime/generic")  # More general error indication
    return ", ".join(errs) or "none"
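# Illustrative examples (the elif chain gives syntax > import > generic priority):
#   classify_errors("SyntaxError: invalid syntax")              -> "syntax"
#   classify_errors("ModuleNotFoundError: No module named 'x'") -> "import"
#   classify_errors("")                                         -> "none"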

# --- HF SPACE LOGGING ---
def _get_space_jwt(repo_id: str, token: str) -> str:
    """Fetches a JWT for Space logs using the user's HF token."""
    url = f"{constants.ENDPOINT}/api/spaces/{repo_id}/jwt"
    headers = build_hf_headers(token=token)
    r = get_session().get(url, headers=headers)
    hf_raise_for_status(r)  # Raises HTTPError for bad responses (e.g. 404 if repo doesn't exist)
    return r.json()["token"]

def fetch_logs(repo_id: str, level: str, token: str) -> str:
    """Fetches build or run logs from an HF Space."""
    if not token:
        return "Login required to fetch logs."
    try:
        jwt = _get_space_jwt(repo_id, token)
        url = f"https://api.hf.space/v1/{repo_id}/logs/{level}"
        lines = []
        headers = build_hf_headers(token=jwt)
        # Use a timeout for the request
        with get_session().get(url, headers=headers, stream=True, timeout=10) as resp:
            hf_raise_for_status(resp)
            # Parse the SSE-style "data: {...}" lines from the streamed response
            for raw in resp.iter_lines(decode_unicode=True, chunk_size=512):
                if raw is None:  # handle keep-alive or similar
                    continue
                if raw.startswith("data: "):
                    try:
                        ev = json.loads(raw[len("data: "):])
                        ts, txt = ev.get("timestamp", "N/A"), ev.get("data", "")
                        lines.append(f"[{ts}] {txt}")
                    except json.JSONDecodeError:
                        lines.append(f"Error decoding log line: {raw}")
                    except Exception as e:
                        lines.append(f"Unexpected error processing log line: {raw} - {e}")
        return "\n".join(lines)
    except requests.exceptions.Timeout:
        return f"Error: Timeout fetching {level} logs."
    except requests.exceptions.RequestException as e:
        return f"Error fetching {level} logs: {e}"
    except Exception as e:
        return f"An unexpected error occurred while fetching logs: {e}"

def check_iframe(url: str, timeout: int = 10) -> bool:
    """Checks if the iframe URL is reachable and returns a 200 status code."""
    try:
        # A GET (rather than HEAD) request confirms that content is actually being served.
        response = requests.get(url, timeout=timeout)
        return response.status_code == 200
    except requests.exceptions.RequestException:
        return False  # Any request exception (timeout, connection error, etc.) means it's not accessible

# --- AGENT PROMPTS ---
SYSTEM_ORCHESTRATOR = {
    "role": "system",
    "content": (
        "You are **Orchestrator Agent**, the project manager. "
        "Your role is to guide the development process from user request to a deployed HF Space application. "
        "You will analyze the current project state (requirements, plan, files, logs, feedback, status, attempt_count) "
        "and decide the *single* next step/task for the team. "
        "Output *only* the name of the next task from the following list: "
        "'PLANNING', 'CODING - {task_description}', 'PUSHING', 'LOGGING', 'DEBUGGING', 'COMPLETE', 'FAILED'. "
        "If moving to 'CODING', briefly describe the specific area to focus on (e.g., 'CODING - Initial UI', 'CODING - Adding Data Loading', 'CODING - Fixing Import Errors'). "
        "Analyze the debug feedback and logs carefully to decide the appropriate coding task description. "
        "If the debug feedback indicates 'All clear', transition to 'COMPLETE'. "
        "If maximum attempts are reached or a critical error occurs, transition to 'FAILED'."
    )
}

SYSTEM_ARCHITECT = {
    "role": "system",
    "content": (
        "You are **Architect Agent**, the lead planner. "
        "Given the user requirements and the current project state, your task is to devise or refine the high-level plan for the application. "
        "Outline the main features, suggest a logical structure, identify potential files (e.g., `app.py`, `utils.py`, `requirements.txt`), and key components needed. "
        "The target SDK is {sdk_choice}. The main application file should be `{main_app_file}`. "
        "Output the plan clearly, using bullet points or a simple numbered list. Do NOT write code. Focus only on the plan."
    )
}

SYSTEM_CODEGEN = {
    "role": "system",
    "content": (
        "You are **Code‑Gen Agent**, a proactive AI developer. "
        "Your sole responsibility is to author and correct code files based on the plan and the assigned task. "
        "You will receive the full project state, including the requirements, plan, existing files, and debug feedback. "
        "Based on the current task assigned by the Orchestrator ('{current_task}'), write or modify the necessary code *only* in the specified file(s). "
        "Output the *full content* of the updated file(s) in markdown code blocks, clearly indicating the filename(s) immediately before the code block like this: `filename`\n```<language>\ncode goes here\n``` "
        "If the task involves creating a new file, include it in the output. If modifying an existing file, provide the *complete* modified code for that file. "
        "Ensure the code adheres to the plan and addresses the debug feedback if provided. "
        "Only output the code blocks and their preceding filenames. Do not add extra commentary outside the code blocks."
    )
}

SYSTEM_DEBUG = {
    "role": "system",
    "content": (
        "You are **Debug Agent**, a meticulous code reviewer and tester. "
        "You have access to the full project state: requirements, plan, code files, build logs, and run logs. "
        "Your task is to analyze the logs and code in the context of the plan and requirements. "
        "Identify errors, potential issues, missing features based on the plan, and suggest concrete improvements or fixes for the Code-Gen agent. "
        "Pay close attention to the build and run logs for specific errors (SyntaxError, ImportError, runtime errors). "
        "Also check if the implemented features align with the plan. "
        "If the application appears to be working based on the logs and iframe check, and seems to meet the plan's core requirements, state 'All clear. Project appears complete.' as the *first line* of your feedback. "
        "Otherwise, provide actionable feedback, referencing file names and line numbers where possible. Format feedback clearly. "
        "Example feedback:\n'Error in `app.py`: ModuleNotFoundError for 'missing_library'. Add 'missing_library' to `requirements.txt`.'\n'Issue: The plan required a download button, but it's missing in `app.py`.'\n'Suggestion: Check the loop in `utils.py`, it might cause an infinite loop based on run logs.' "
        "Do NOT write or suggest large code blocks directly in your feedback. Focus on *what* needs fixing/adding and *why*."
    )
}

# --- AGENT RUNNER HELPER ---
def run_agent(client, model_name, system_prompt_template, user_input_state, config):
    """Helper to run a single agent interaction using the project state as input."""
    try:
        # Format the system prompt using state variables
        system_prompt = system_prompt_template["content"].format(**user_input_state)
    except KeyError as e:
        print(f"Error formatting system prompt: Missing key {e}. Prompt template: {system_prompt_template['content']}")
        return f"ERROR: Internal agent error - Missing key {e} for prompt formatting."

    # Prepare the message content by formatting the project state
    user_message_content = "Project State:\n" + json.dumps(user_input_state, indent=2)

    # Define the model to use based on the parameter
    model_to_use = model_name

    try:
        # Use a single Content object with role="user" and combined text in a Part.
        # The GenAI API for models like Gemini Flash Preview expects input roles to be 'user'.
        messages = [
            Content(role="user", parts=[Part(text=system_prompt + "\n\n" + user_message_content)])
        ]
        response = client.models.generate_content(
            model=model_to_use,
            contents=messages,  # Pass the list of Content objects
            config=config
        )
        # API errors are handled by the SDK raising exceptions caught below.

        # Some models return multiple parts; concatenate them.
        # Ensure a candidate and its content exist before accessing parts.
        if not response.candidates or not response.candidates[0].content:
            print("Agent returned no candidates or empty content.")
            # Check if there was a rejection reason
            if response.prompt_feedback and response.prompt_feedback.block_reason:
                block_reason = response.prompt_feedback.block_reason
                print(f"Prompt was blocked. Reason: {block_reason}")
                return f"ERROR: Agent response blocked by safety filters. Reason: {block_reason.name}"  # Return reason if available
            return "ERROR: Agent returned no response content."
        response_text = "".join([part.text for part in response.candidates[0].content.parts])

        print(f"--- Agent Response --- ({model_to_use})")
        # print(response_text)  # Careful: can be very long
        print("----------------------")

        # Return just the response text. The calling functions manage the project_state history.
        return response_text.strip()
    except Exception as e:
        print(f"Agent call failed: {e}")
        # Attempt to extract an error message from the response object if possible
        error_details = str(e)
        if hasattr(e, 'response') and e.response is not None:
            try:
                # Check if the response has a usable error structure
                error_json = e.response.json()
                error_details = json.dumps(error_json, indent=2)
            except Exception:
                try:
                    error_details = e.response.text  # Fallback to raw text
                except Exception:
                    pass  # Cannot get response text
        return f"ERROR: Agent failed - {error_details}"  # Indicate failure

# --- AGENT FUNCTIONS (called by Orchestrator) ---
# These functions now expect only the response text from run_agent
def run_planner(client, project_state, config):
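    """Ask the Architect agent for a plan and store it in project_state['plan']."""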
    print("Orchestrator: Running Planner Agent...")
    # Planner needs requirements and basic project info
    input_state_for_planner = {
        "requirements": project_state['requirements'],
        "sdk_choice": project_state['sdk_choice'],
        "main_app_file": project_state['main_app_file'],
        "files": project_state['files']  # Include existing files
    }
    response_text = run_agent(
        client=client,
        model_name="gemini-2.5-flash-preview-04-17",  # Use the specific model name
        system_prompt_template=SYSTEM_ARCHITECT,
        user_input_state=input_state_for_planner,
        config=config,
    )
    if response_text.startswith("ERROR:"):
        project_state['status_message'] = response_text
        return False  # Indicate failure
    project_state['plan'] = response_text
    print("Orchestrator: Planner Output Received.")
    project_state['status_message'] = "Planning complete."
    # Add plan to chat history for user
    project_state['chat_history'].append({"role": "assistant", "content": f"**Plan:**\n{project_state['plan']}"})
    return True

def run_codegen(client, project_state, config):
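    """Ask the Code-Gen agent for code, parse its filename/code-block output, and update project_state['files']."""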
    print(f"Orchestrator: Running Code-Gen Agent for task: {project_state['current_task']}...")
    # Code-Gen needs requirements, plan, existing files, and debug feedback
    input_state_for_codegen = {
        "current_task": project_state['current_task'],
        "requirements": project_state['requirements'],
        "plan": project_state['plan'],
        "files": project_state['files'],  # Pass current files so it can modify
        "feedback": project_state['feedback'] or 'None',
        "sdk_choice": project_state['sdk_choice'],
        "main_app_file": project_state['main_app_file']  # Ensure it knows the main file convention
    }
    response_text = run_agent(
        client=client,
        model_name="gemini-2.5-flash-preview-04-17",  # Use the specific model name
        system_prompt_template=SYSTEM_CODEGEN,
        user_input_state=input_state_for_codegen,
        config=config,
    )
    if response_text.startswith("ERROR:"):
        project_state['status_message'] = response_text
        # The error message added here will be processed by the debugger
        # No need to add to chat history here, debugger feedback will summarize
        return False  # Indicate failure

    # Parse the response text to extract code blocks for potentially multiple files
    files_updated = {}
    # Regex to find blocks like `filename`\n```[language]\ncode...\n```
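    # For example, the agent is expected to emit blocks shaped like:
    #   `app.py`
    #   ```python
    #   import gradio as gr
    #   ```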
    blocks = re.findall(r"(`[^`]+`)\s*```(?:\w*\n)?([\s\S]*?)```", response_text)
    if not blocks:
        print("Code-Gen Agent did not output any code blocks in expected format.")
        parse_error_msg = "ERROR: Code-Gen Agent failed to output code blocks in `filename`\\n```code``` format."
        project_state['status_message'] = parse_error_msg
        # Add the agent's raw response to feedback for debugging
        project_state['feedback'] = project_state['feedback'] + "\n\n" + parse_error_msg + "\nRaw Agent Response (no code blocks detected):\n" + response_text[:1000] + "..."  # Add truncated raw response
        project_state['chat_history'].append({"role": "assistant", "content": parse_error_msg + "\nSee Debug Feedback for raw response."})
        return False  # Indicate failure

    syntax_errors = []
    for filename_match, code_content in blocks:
        filename = filename_match.strip('`').strip()
        if not filename:
            syntax_errors.append("Code block found with empty filename.")
            continue  # Skip this block
        files_updated[filename] = code_content.strip()  # Store updated code
        # Quick syntax check for Python files
        if filename.endswith('.py'):
            try:
                compile(code_content, filename, "exec")
                # print(f"Syntax check passed for {filename}")  # Too verbose
            except SyntaxError as e:
                syntax_errors.append(f"Syntax Error in {filename}: {e}")
                print(f"Syntax Error in {filename}: {e}")
            except Exception as e:
                syntax_errors.append(f"Unexpected error during syntax check for {filename}: {e}")
                print(f"Unexpected error during syntax check for {filename}: {e}")

    if not files_updated:
        print("Code-Gen Agent outputted blocks but couldn't parse any valid filenames.")
        parse_error_msg = "ERROR: Code-Gen Agent outputted blocks but couldn't parse any valid filenames."
        project_state['status_message'] = parse_error_msg
        project_state['feedback'] = project_state['feedback'] + "\n\n" + parse_error_msg
        project_state['chat_history'].append({"role": "assistant", "content": parse_error_msg})
        return False  # Indicate failure

    if syntax_errors:
        # If syntax errors found, add them to feedback and signal failure for the CodeGen step
        syntax_error_msg = "ERROR: Code-Gen Agent introduced syntax errors."
        project_state['feedback'] = syntax_error_msg + "\n" + "\n".join(syntax_errors) + "\n\n" + project_state.get('feedback', '')  # Prepend errors
        project_state['status_message'] = syntax_error_msg + " Debugging needed."
        # Add syntax errors to chat history for user visibility
        project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
        project_state['chat_history'].append({"role": "assistant", "content": "Details:\n" + "\n".join(syntax_errors)})
        return False  # Indicate failure due to syntax errors

    project_state['files'].update(files_updated)  # Update existing files or add new ones
    print(f"Orchestrator: Code-Gen Agent updated files: {list(files_updated.keys())}")
    # Add the generated/updated code content snippet to the chat history for visibility
    code_summary = "\n".join([f"`{fn}`:\n```python\n{code[:500]}{'...' if len(code) > 500 else ''}\n```" for fn, code in files_updated.items()])  # Show snippet
    project_state['chat_history'].append({"role": "assistant", "content": f"**Code Generated/Updated:**\n\n{code_summary}"})
    project_state['status_message'] = f"Code generated/updated: {list(files_updated.keys())}"
    return True  # Indicate success

def run_debugger(client, project_state, config):
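    """Ask the Debug agent to review files and logs, storing its feedback in project_state['feedback']."""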
    print("Orchestrator: Running Debug Agent...")
    # Debugger needs requirements, plan, files, logs, and iframe status
    input_state_for_debugger = {
        "requirements": project_state['requirements'],
        "plan": project_state['plan'],
        "files": project_state['files'],
        "build_logs": project_state['logs'].get('build', 'No build logs.'),
        "run_logs": project_state['logs'].get('run', 'No run logs.'),
        "iframe_status": 'Responding OK' if project_state.get('iframe_ok', False) else 'Not responding or check failed.',
        "error_types_found": classify_errors(project_state['logs'].get('build', '') + '\n' + project_state['logs'].get('run', ''))
    }
    response_text = run_agent(
        client=client,
        model_name="gemini-2.5-flash-preview-04-17",  # Use the specific model name
        system_prompt_template=SYSTEM_DEBUG,
        user_input_state=input_state_for_debugger,
        config=config,
    )
    if response_text.startswith("ERROR:"):
        project_state['status_message'] = response_text
        # Add the debugger error to feedback for visibility
        project_state['feedback'] = project_state.get('feedback', '') + "\n\n" + response_text
        project_state['chat_history'].append({"role": "assistant", "content": response_text})  # Add error to chat
        return False  # Indicate failure
    project_state['feedback'] = response_text
    print("Orchestrator: Debug Agent Feedback Received.")
    project_state['status_message'] = "Debug feedback generated."
    # Add debug feedback to chat history
    project_state['chat_history'].append({"role": "assistant", "content": f"**Debug Feedback:**\n{project_state['feedback']}"})
    return True

# --- MAIN ORCHESTRATION LOGIC ---
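# The orchestrator walks the project through a simple task state machine:
#   START -> PLANNING -> CODING -> PUSHING -> LOGGING -> DEBUGGING,
# after which DEBUGGING either loops back to CODING with feedback or ends in
# FINISHED (status 'Complete' or 'Failed'). A failed CODING step skips PUSHING
# and goes straight to DEBUGGING, and the loop is capped at 7 attempts.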
def orchestrate_development(client, project_state, config, oauth_token_token):
    """Manages the overall development workflow."""
    # Initial step transition
    if project_state['current_task'] == 'START':
        project_state['current_task'] = 'PLANNING'
        project_state['status_message'] = "Starting project: Initializing and moving to Planning."
        # Add initial message to chat history
        project_state['chat_history'].append({"role": "assistant", "content": "Project initialized. Starting development team."})

    while project_state['status'] == 'In Progress' and project_state['attempt_count'] < 7:
        print(f"\n--- Attempt {project_state['attempt_count'] + 1} ---")
        print(f"Current Task: {project_state['current_task']}")
        current_task = project_state['current_task']

        # Add current task to history for UI visibility
        task_message = f"➡️ Task: {current_task}"
        if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != task_message.strip():
            project_state['chat_history'].append({"role": "assistant", "content": task_message})

        step_successful = True  # Flag to track if the current step completed without error

        if current_task == 'PLANNING':
            step_successful = run_planner(client, project_state, config)
            if step_successful:
                project_state['current_task'] = 'CODING - Initial Implementation'  # Move to coding after planning
                # Add plan to chat history if it wasn't added by run_planner (depends on its implementation)
                if project_state['plan'] and not any("**Plan:**" in msg['content'] for msg in project_state['chat_history']):
                    project_state['chat_history'].append({"role": "assistant", "content": f"**Plan:**\n{project_state['plan']}"})
            else:
                project_state['current_task'] = 'FAILED'  # Planning failed, end process
        elif current_task.startswith('CODING'):
            # Ensure minimum files exist before asking CodeGen to code.
            # This happens once at the start of the first coding task or if syntax errors occurred.
            # Simplify the stubbing logic - just ensure these files exist in the state before CodeGen runs.
            if project_state['main_app_file'] not in project_state['files']:
                print(f"Adding initial stub for {project_state['main_app_file']}...")
                project_state['files'][project_state['main_app_file']] = f"# Initial {project_state['sdk_choice']} app file\n"  # Start with a basic stub
                if project_state['sdk_choice'] == 'gradio':
                    project_state['files'][project_state['main_app_file']] += "import gradio as gr\n\n# Define a simple interface\n# For example: gr.Interface(...).launch()\n"
                elif project_state['sdk_choice'] == 'streamlit':
                    project_state['files'][project_state['main_app_file']] += "import streamlit as st\n\n# Your Streamlit app starts here\n# For example: st.write('Hello, world!')\n"
            if 'requirements.txt' not in project_state['files']:
                print("Adding initial requirements.txt stub...")
                req_content = "pandas\n" + ("streamlit\n" if project_state['sdk_choice'] == "streamlit" else "gradio\n") + "google-generativeai\nhuggingface-hub\n"
                project_state['files']['requirements.txt'] = req_content
            if 'README.md' not in project_state['files']:
                print("Adding initial README.md stub...")
                readme_content = f"""---
title: {project_state['repo_id']}
emoji: 🐢
sdk: {project_state['sdk_choice']}
sdk_version: {project_state['sdk_version']}
app_file: {project_state['main_app_file']}
pinned: false
---

# {project_state['repo_id']}

This is an auto-generated HF Space.

**Requirements:** {project_state['requirements']}

**Plan:**
{project_state['plan']}
"""
                project_state['files']['README.md'] = readme_content

            step_successful = run_codegen(client, project_state, config)

            if step_successful:
                project_state['current_task'] = 'PUSHING'  # Always push after attempting to code
            else:
                # Code-gen failed (syntax error, parsing issue, etc.)
                # The failure is handled within run_codegen by setting status_message and feedback.
                # We'll try debugging/coding again in the next attempt loop iteration if attempts allow.
                print("Code-Gen step failed. Moving to Debugging.")
                # attempt_count is incremented AFTER the debugging phase analyses results
                project_state['current_task'] = 'DEBUGGING'  # Go to debugging to analyze the failure
        elif current_task == 'PUSHING':
            try:
                # Create/update repo first
                create_repo(repo_id=project_state['repo_id'], token=oauth_token_token,
                            exist_ok=True, repo_type="space", space_sdk=project_state['sdk_choice'])

                # *** FIX: Filter out any empty string keys before iterating ***
                files_to_push = {
                    fn: content
                    for fn, content in project_state['files'].items()
                    if fn and fn.strip()  # Keep only non-empty, non-whitespace filenames
                }
                print(f"Attempting to push {len(files_to_push)} valid files...")

                # Write and upload all valid files
                for fn, content in files_to_push.items():
                    # Ensure directories exist for files like utils/data.py
                    dirpath = os.path.dirname(fn)
                    if dirpath:  # Only create dir if filename has a path component
                        os.makedirs(dirpath, exist_ok=True)
                    with open(fn, "w") as f:
                        f.write(content)
                    upload_file(
                        path_or_fileobj=fn, path_in_repo=fn,
                        repo_id=project_state['repo_id'], token=oauth_token_token,
                        repo_type="space"
                    )
                print(f"Pushed {len(files_to_push)} files to {project_state['repo_id']}")
                project_state['status_message'] = f"Pushed code to HF Space **{project_state['repo_id']}**. Waiting for build..."
                project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
                project_state['current_task'] = 'LOGGING'  # Move to fetching logs
            except Exception as e:
                step_successful = False
                project_state['status'] = 'Failed'  # Pushing is critical, fail if it fails
                project_state['status_message'] = f"ERROR: Failed to push to HF Space: {e}"
                project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
                print(project_state['status_message'])
                project_state['current_task'] = 'FINISHED'  # End process
        elif current_task == 'LOGGING':
            # Wait a moment for build to start
            time.sleep(5)  # Initial wait
            wait_time = 5
            max_log_wait = 150  # Increased max wait time for logs
            elapsed_log_wait = 0
            logs_fetched = False
            iframe_checked = False

            status_logging_message = "Fetching logs and checking iframe..."
            if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != status_logging_message.strip():
                project_state['chat_history'].append({"role": "assistant", "content": status_logging_message})
            project_state['status_message'] = status_logging_message

            while elapsed_log_wait < max_log_wait:
                try:
                    build_logs = fetch_logs(project_state['repo_id'], "build", oauth_token_token)
                    run_logs = fetch_logs(project_state['repo_id'], "run", oauth_token_token)
                    project_state['logs']['build'] = build_logs
                    project_state['logs']['run'] = run_logs
                    logs_fetched = True

                    # Only check iframe once logs indicate something might be running, or after a delay
                    if elapsed_log_wait > 10 or len(run_logs) > 0 or len(build_logs) > 100:
                        project_state['iframe_ok'] = check_iframe(project_state['iframe_url'])
                        iframe_checked = True
                    else:
                        project_state['iframe_ok'] = False  # Assume not ready yet

                    print(f"Log/Iframe check at {elapsed_log_wait}s. Build logs len: {len(build_logs)}, Run logs len: {len(run_logs)}, Iframe OK: {project_state['iframe_ok']}")

                    # Conditions to proceed to debugging:
                    # 1. Iframe is OK (app is running and accessible) - strongest signal
                    # 2. Build logs show errors (need debugging ASAP)
                    # 3. Max wait time is almost reached (proceed with whatever logs we have)
                    # 4. Build logs exist and indicate *some* progress (e.g., contain "Building" or sufficient length)
                    # 5. Run logs exist (app is at least trying to run)
                    if project_state['iframe_ok'] or \
                       "ERROR" in build_logs.upper() or "FATAL" in build_logs.upper() or \
                       elapsed_log_wait >= max_log_wait - wait_time or \
                       (("Building" in build_logs or len(build_logs) > 100) and logs_fetched) or \
                       len(run_logs) > 0:
                        break  # Exit the log fetching wait loop
                    else:
                        print(f"Logs or iframe not ready. Waiting {wait_time}s...")
                        time.sleep(wait_time)
                        elapsed_log_wait += wait_time
                        wait_time = min(wait_time * 1.5, 20)  # Increase wait time, cap at 20s
                except Exception as e:
                    print(f"Error during log fetching or iframe check: {e}. Will retry.")
                    time.sleep(wait_time)
                    elapsed_log_wait += wait_time
                    wait_time = min(wait_time * 1.5, 20)

            # Update status message after the wait loop
            if logs_fetched or iframe_checked:  # Proceed if we got logs OR checked the iframe
                project_state['status_message'] = "Logs fetched and iframe checked (or timeout reached)."
            else:
                project_state['status_message'] = "Warning: Could not fetch logs or check iframe status within timeout."
                step_successful = False  # Indicate that this step didn't fully succeed

            project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
            project_state['current_task'] = 'DEBUGGING'  # Always move to debugging after attempting to log/check
        elif current_task == 'DEBUGGING':
            step_successful = run_debugger(client, project_state, config)
            # Debug feedback is added to chat history inside run_debugger now

            if step_successful:
                # Analyze feedback to decide next step
                feedback = project_state['feedback']
                iframe_ok = project_state.get('iframe_ok', False)
                error_types = classify_errors(project_state['logs'].get('build', '') + '\n' + project_state['logs'].get('run', ''))
                print(f"Debug Analysis - Feedback: {feedback[:100]}... | Iframe OK: {iframe_ok} | Errors: {error_types}")

                # Decision Logic:
                # 1. Success? Debugger says clear AND iframe works AND no/minor errors in logs AND run logs have some content
                is_complete = ("All clear. Project appears complete." in feedback) or \
                              (iframe_ok and error_types == "none" and "ERROR" not in feedback.upper() and len(project_state['logs'].get('run', '')) > 10)

                if is_complete:
                    project_state['status'] = 'Complete'
                    project_state['current_task'] = 'FINISHED'
                    project_state['status_message'] = "Debug Agent reports clear. Project appears complete."
                elif project_state['attempt_count'] >= 6:  # Max attempts reached AFTER debugging analysis
                    project_state['status'] = 'Failed'
                    project_state['current_task'] = 'FINISHED'
                    project_state['status_message'] = f"Max attempts ({project_state['attempt_count']+1}/7) reached after debugging. Project failed."
                else:
                    # Errors or issues found, need more coding/debugging
                    project_state['current_task'] = 'CODING - Addressing Feedback'
                    project_state['status_message'] = "Debug Agent found issues. Returning to Coding phase to address feedback."
                    project_state['attempt_count'] += 1  # Increment attempt count AFTER a debug cycle points back to coding
                    backoff_wait = min(project_state['attempt_count'] * 5, 30)  # Backoff before next coding attempt
                    print(f"Waiting {backoff_wait} seconds before next coding attempt...")
                    time.sleep(backoff_wait)
            else:
                # Debugger failed (e.g. API error)
                project_state['status'] = 'Failed'
                project_state['current_task'] = 'FINISHED'
                # status_message and feedback already set by run_debugger
        elif current_task == 'FINISHED':
            # Exit the main loop
            pass  # Loop condition handles exit
        else:
            # Unknown task
            step_successful = False
            project_state['status'] = 'Failed'
            project_state['status_message'] = f"ERROR: Orchestrator entered an unknown task state: {current_task}"
            project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
            print(project_state['status_message'])
            project_state['current_task'] = 'FINISHED'  # End process

        # If a step failed, the logic above should either redirect to DEBUGGING or set an
        # explicit failure status (as PUSHING does). This check acts as a safeguard for
        # failures that did neither, so the loop cannot continue in a broken state.
        if not step_successful and project_state['status'] == 'In Progress' and project_state['current_task'] != 'DEBUGGING':
            print(f"Orchestration step '{current_task}' failed, but status is still 'In Progress'. Forcing Failure.")
            project_state['status'] = 'Failed'
            project_state['status_message'] = project_state.get('status_message', f'An unexpected error caused task failure: {current_task}')
            project_state['chat_history'].append({"role": "assistant", "content": project_state['status_message']})
            project_state['current_task'] = 'FINISHED'
    # --- End of Orchestration Loop ---

    # Final status message if loop exited without explicit FINISHED state
    if project_state['status'] == 'In Progress':
        project_state['status'] = 'Failed'
        project_state['status_message'] = project_state.get('status_message', 'Orchestration loop exited unexpectedly.')

    # Add final outcome message to history if not already the last message
    final_outcome_message = f"**Project Outcome:** {project_state['status']} - {project_state['status_message']}"
    if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != final_outcome_message.strip():
        project_state['chat_history'].append({"role": "assistant", "content": final_outcome_message})

    if project_state['status'] == 'Complete':
        completion_message = "✅ Application deployed successfully (likely)! Check the preview above."
        if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != completion_message.strip():
            project_state['chat_history'].append({"role": "assistant", "content": completion_message})
    elif project_state['status'] == 'Failed':
        failure_message = "❌ Project failed to complete. Review logs and feedback for details."
        if not project_state['chat_history'] or project_state['chat_history'][-1].get('content', '').strip() != failure_message.strip():
            project_state['chat_history'].append({"role": "assistant", "content": failure_message})

    # Return final state for UI update
    return (
        project_state['chat_history'],
        project_state['logs'].get('build', 'No build logs.'),
        project_state['logs'].get('run', 'No run logs.'),
        (f'<iframe src="{project_state["iframe_url"]}" width="100%" height="500px"></iframe>'
         + ("" if project_state.get('iframe_ok') else "<p style='color:red;'>⚠️ iframe not responding or check failed.</p>")),
        project_state['status_message']  # Return the final status message
    )

# --- MAIN HANDLER (Called by Gradio) ---
# Updated signature to include user_input
def handle_user_message(
    history,                 # This is the list of messages in the Gradio Chatbot (from previous turns)
    user_input: str,         # <--- The *new* text input from the user_in textbox
    sdk_choice: str,
    gemini_api_key: str,
    grounding_enabled: bool,
    temperature: float,
    max_output_tokens: int,
    profile: gr.OAuthProfile | None,   # Gradio auto-injects
    oauth_token: gr.OAuthToken | None  # Gradio auto-injects
):
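    """Gradio entry point: validate login, API key, and prompt, build the initial
    project_state, then hand the session over to orchestrate_development()."""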
    # The user_input is already the new prompt.
    # We need to add it to the history list here at the beginning,
    # as Gradio's Chatbot expects the handler to return the *updated* history.
    # Check if the last message is *not* a user message or is empty to avoid duplicates
    if not history or history[-1].get("role") != "user" or history[-1].get("content") != user_input:
        history.append({"role": "user", "content": user_input})

    if not profile or not oauth_token or not oauth_token.token:
        # Append error message to history for display
        error_msg = "⚠️ Please log in first via the Hugging Face button."
        if not history or history[-1].get("content") != error_msg:
            history.append({"role": "assistant", "content": error_msg})
        # Return current state, logs etc. + the new history
        return history, "", "", "<p>Please log in.</p>", "Login required."

    if not gemini_api_key:
        error_msg = "⚠️ Please provide your Gemini API Key."
        if not history or history[-1].get("content") != error_msg:
            history.append({"role": "assistant", "content": error_msg})
        return history, "", "", "<p>Please provide API Key.</p>", "API Key required."

    if not user_input or user_input.strip() == "":
        # Handle empty prompt case - the prompt is now the user_input parameter
        error_msg = "Please enter requirements for the application."
        if not history or history[-1].get("content") != error_msg:
            history.append({"role": "assistant", "content": error_msg})
        return history, "", "", "<p>Enter requirements.</p>", "Waiting for prompt."

    client = genai.Client(api_key=gemini_api_key)
    repo_id = f"{profile.username}/{profile.username}-auto-space"
    iframe_url = f"https://huggingface.co/spaces/{repo_id}"
    sdk_version = get_sdk_version(sdk_choice)
    code_fn = "app.py" if sdk_choice == "gradio" else "streamlit_app.py"  # Standard main file name convention

    # The user's latest prompt is the user_input parameter
    user_prompt = user_input

    # Initialize project state for this development session.
    # History will be updated throughout and returned at the end.
    project_state = {
        'requirements': user_prompt,
        'plan': '',
        'files': {},  # Use a dict to store multiple file contents {filename: code}
        'logs': {'build': '', 'run': ''},
        'feedback': '',
        'current_task': 'START',  # Start the orchestration state machine
        'status': 'In Progress',
        'status_message': 'Initializing...',
        'attempt_count': 0,
        'sdk_choice': sdk_choice,
        'sdk_version': sdk_version,
        'repo_id': repo_id,
        'iframe_url': iframe_url,
        'main_app_file': code_fn,
        'chat_history': history[:]  # Use the passed-in history to build upon
    }

    cfg = GenerateContentConfig(
        tools=[Tool(google_search=GoogleSearch())] if grounding_enabled else [],
        response_modalities=["TEXT"],
        temperature=temperature,
        max_output_tokens=int(max_output_tokens),  # Ensure integer
    )

    # Start the orchestration process
    final_history, final_build_logs, final_run_logs, final_iframe_html, final_status_message = orchestrate_development(
        client, project_state, cfg, oauth_token.token  # Pass the token string
    )

    # Return the final state for the UI
    return (
        final_history,
        final_build_logs,
        final_run_logs,
        final_iframe_html,
        final_status_message
    )

# --- SIMPLE UI WITH HIGHER MAX TOKENS & STATUS DISPLAY ---
with gr.Blocks(title="HF Space Auto‑Builder (Team AI)") as demo:
    gr.Markdown("## 🐢 HF Space Auto‑Builder (Team AI)\nUse AI agents to build and deploy a simple Gradio or Streamlit app on a Hugging Face Space.")
    gr.Markdown("1) Log in with Hugging Face. 2) Enter your Gemini API Key. 3) Provide app requirements. 4) Click 'Start Development Team' and watch the process.")

    with gr.Row():
        with gr.Column(scale=1):
            # --- LOGIN BUTTON / PROFILE & MODEL LISTING ---
            login_btn = gr.LoginButton(variant="huggingface", size="lg")
            status_md = gr.Markdown("*Not logged in.*")
            models_md = gr.Markdown()

            # On app load, show “not logged in” and list public models (or none)
            # inputs=None tells Gradio to auto-inject LoginButton state if signature matches
            demo.load(show_profile, inputs=None, outputs=status_md, api_name="load_profile")
            demo.load(list_private_models, inputs=None, outputs=models_md, api_name="load_models")

            # When the user actually logs in:
            # inputs=None tells Gradio to auto-inject LoginButton state
            login_btn.click(
                fn=show_profile,
                inputs=None,
                outputs=status_md,
                api_name="login_profile"
            )
            login_btn.click(
                fn=list_private_models,
                inputs=None,
                outputs=models_md,
                api_name="login_models"
            )
            # --- END LOGIN FIX ---

            gr.Markdown("---")
            sdk_choice = gr.Radio(["gradio", "streamlit"], value="gradio", label="SDK", info="Choose the framework for your app.")
            api_key = gr.Textbox(label="Gemini API Key", type="password", info="Get one from Google AI Studio.")
            grounding = gr.Checkbox(label="Enable Google Search (Grounding)", value=False, info="Allow agents to use Google Search.")
            temp = gr.Slider(0, 1, value=0.2, label="Temperature", info="Creativity of agents. Lower is more focused.")
            max_tokens = gr.Number(value=4096, label="Max Output Tokens", minimum=1000, info="Max length of agent responses (code, feedback, etc.). Recommend 4096+.")

        with gr.Column(scale=2):
            project_status_md = gr.Markdown("Waiting for prompt...")
            chatbot = gr.Chatbot(type="messages", label="Team Communication & Status", show_copy_button=True)
            user_in = gr.Textbox(placeholder="Describe the application you want to build...", label="Application Requirements", lines=3)
            send_btn = gr.Button("🚀 Start Development Team")

            # Separate accordions for logs and preview
            with gr.Accordion("Logs", open=False):
                build_box = gr.Textbox(label="Build logs", lines=10, interactive=False, max_lines=20)
                run_box = gr.Textbox(label="Run logs", lines=10, interactive=False, max_lines=20)
                # Need login state for refresh button. For a lambda function,
                # auto-injection doesn't work based on type hints in the same way,
                # so we explicitly pass the component state.
                refresh_btn = gr.Button("🔄 Refresh Logs Only")
            with gr.Accordion("App Preview", open=True):
                preview = gr.HTML("<p>App preview will load here when available.</p>")
    # Update the button click handler - inputs match handle_user_message signature
    send_btn.click(
        fn=handle_user_message,
        inputs=[
            chatbot,     # history
            user_in,     # user_input
            sdk_choice,
            api_key,
            grounding,
            temp,
            max_tokens,
            # profile (auto-injected by LoginButton based on signature)
            # oauth_token (auto-injected by LoginButton based on signature)
        ],
        outputs=[chatbot, build_box, run_box, preview, project_status_md]
    )

    user_in.submit(
        fn=handle_user_message,
        inputs=[
            chatbot,
            user_in,
            sdk_choice,
            api_key,
            grounding,
            temp,
            max_tokens,
            # profile (auto-injected)
            # oauth_token (auto-injected)
        ],
        outputs=[chatbot, build_box, run_box, preview, project_status_md]
    )
    # Handler for refreshing logs manually.
    # For this lambda, we explicitly pass login_btn state as it's not a
    # function with OAuth type hints for auto-injection.
    refresh_btn.click(
        fn=lambda profile_token_state: (  # Receive the tuple (profile, token) from the login_btn state
            fetch_logs(f"{profile_token_state[0].username}/{profile_token_state[0].username}-auto-space", "build", profile_token_state[1].token) if profile_token_state and profile_token_state[0] and profile_token_state[1] and profile_token_state[1].token else "Login required to fetch logs.",
            fetch_logs(f"{profile_token_state[0].username}/{profile_token_state[0].username}-auto-space", "run", profile_token_state[1].token) if profile_token_state and profile_token_state[0] and profile_token_state[1] and profile_token_state[1].token else "Login required to fetch logs."
        ),
        inputs=[login_btn],  # Pass the login_btn component state
        outputs=[build_box, run_box]
    )

# Clean up files created during the process when the app stops (optional, good for Spaces).
# Consider adding more specific cleanup if needed.
# demo.on_event("close", lambda: [os.remove(f) for f in os.listdir() if os.path.isfile(f) and (f.endswith(".py") or f.endswith(".txt") or f.endswith(".md"))])  # Be careful with this in production

demo.launch(server_name="0.0.0.0", server_port=7860)