import os
import re
import time
import json
import io
import requests
import logging
from typing import List, Dict, Any, Tuple, Optional, Literal, Generator

import gradio as gr
import google.generativeai as genai
from google.generativeai import types  # Import types for configuration and tools
from huggingface_hub import create_repo, list_models, upload_file, constants
from huggingface_hub.utils import build_hf_headers, get_session, hf_raise_for_status
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# --- Configure Logging ---
# Use logging instead of print() statements.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# A file handler could be added here for persistent logs, but console output is fine for Spaces.
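# A minimal sketch of that optional file handler (assumption: "app.log" is a
# writable path inside the Space container):
# file_handler = logging.FileHandler("app.log")
# file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
# logging.getLogger().addHandler(file_handler)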
# --- Configure Hugging Face API Retries ---
# Retry strategy to make HF API calls more robust to transient errors.
retry_strategy = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])  # Retry on these HTTP codes
adapter = HTTPAdapter(max_retries=retry_strategy)
session = get_session()  # The session object used internally by huggingface_hub
session.mount("http://", adapter)
session.mount("https://", adapter)
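# With backoff_factor=1, urllib3 spaces retries with exponential backoff
# (roughly 1 s, 2 s, 4 s, ... between successive attempts, depending on the
# urllib3 version), so a transient 503 is retried for a while before failing.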
# --- Define Gemini Model Information ---
GEMINI_MODELS = {
    "gemini-1.5-flash": ("Gemini 1.5 Flash", "Fast and versatile performance across a diverse variety of tasks."),
    "gemini-1.5-pro": ("Gemini 1.5 Pro", "Complex reasoning tasks requiring more intelligence."),
    "gemini-1.5-flash-8b": ("Gemini 1.5 Flash 8B", "High volume and lower intelligence tasks."),
    "gemini-2.0-flash": ("Gemini 2.0 Flash", "Next generation features, speed, thinking, realtime streaming, and multimodal generation."),
    "gemini-2.0-flash-lite": ("Gemini 2.0 Flash-Lite", "Cost efficiency and low latency."),
    # Note: Preview models may have shorter lifespans or different capabilities. Uncomment to include them.
    # "gemini-2.5-flash-preview-04-17": ("Gemini 2.5 Flash Preview (04-17)", "Adaptive thinking, cost efficiency."),
    # "gemini-2.5-pro-preview-03-25": ("Gemini 2.5 Pro Preview (03-25)", "Enhanced thinking and reasoning, multimodal understanding, advanced coding, and more."),
}

# Create the list of choices for the Gradio Radio component
GEMINI_MODEL_CHOICES = [(display_name, internal_name) for internal_name, (display_name, description) in GEMINI_MODELS.items()]
DEFAULT_GEMINI_MODEL = "gemini-1.5-flash"
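# Each choice is a (label, value) pair, e.g.:
#   ("Gemini 1.5 Flash", "gemini-1.5-flash")
# so the Radio displays the friendly name but returns the internal model id.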
# --- Helper functions for Hugging Face integration ---
def show_profile(profile: gr.OAuthProfile | None) -> str:
    """Displays the logged-in Hugging Face profile username."""
    if profile is None:
        return "*Not logged in.*"
    return f"✅ Logged in as **{profile.username}**"

# list_private_models is not used in the main workflow; kept for reference.
def list_private_models(
    profile: gr.OAuthProfile | None,
    oauth_token: gr.OAuthToken | None
) -> str:
    """Lists the user's models, tagged private/public (not used in the main workflow, but kept)."""
    if profile is None or oauth_token is None:
        return "Please log in to see your models."
    try:
        models = [
            f"{m.id} ({'private' if m.private else 'public'})"
            for m in list_models(author=profile.username, token=oauth_token.token)
        ]
        # Prefix every entry, including the first, with a list bullet.
        return "No models found." if not models else "Models:\n\n- " + "\n- ".join(models)
    except Exception as e:
        logging.error(f"Error listing models: {e}")
        return f"Error listing models: {e}"

def create_space_action(repo_name: str, sdk: str, profile: gr.OAuthProfile, token: gr.OAuthToken) -> Tuple[str, str]:
    """Creates a new Hugging Face Space repository."""
    if not profile or not token:
        # This should not happen if the button gating is correct, but kept as a safeguard.
        raise ValueError("Hugging Face profile or token is missing.")
    repo_id = f"{profile.username}/{repo_name}"
    try:
        logging.info(f"Attempting to create Space: {repo_id} with SDK: {sdk}")
        create_repo(
            repo_id=repo_id,
            token=token.token,
            exist_ok=True,  # Allow creating if it already exists
            repo_type="space",
            space_sdk=sdk
        )
        url = f"https://huggingface.co/spaces/{repo_id}"
        iframe = f'<iframe src="{url}" width="100%" height="500px"></iframe>'
        logging.info(f"Successfully created/verified Space: {repo_id}")
        return repo_id, iframe
    except Exception as e:
        logging.error(f"Failed to create Space {repo_id}: {e}")
        # Surface HTTP errors from huggingface_hub with status details when possible.
        if isinstance(e, requests.exceptions.HTTPError):
            raise RuntimeError(f"HF API Error creating Space `{repo_id}`: {e.response.status_code} {e.response.reason}") from e
        raise RuntimeError(f"Failed to create Space `{repo_id}`: {e}") from e  # Re-raise as RuntimeError

def upload_file_to_space_action(
    file_obj: io.StringIO,  # Type hint for clarity
    path_in_repo: str,
    repo_id: str,
    profile: gr.OAuthProfile,
    token: gr.OAuthToken
) -> None:
    """Uploads a file to a Hugging Face Space repository."""
    if not (profile and token and repo_id):
        raise ValueError("Hugging Face profile, token, or repo_id is missing.")
    try:
        logging.info(f"Attempting to upload file: {path_in_repo} to Space: {repo_id}")
        upload_file(
            path_or_fileobj=file_obj,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            token=token.token,
            repo_type="space"
        )
        logging.info(f"Successfully uploaded file: {path_in_repo} to Space: {repo_id}")
    except Exception as e:
        logging.error(f"Failed to upload {path_in_repo} to {repo_id}: {e}")
        if isinstance(e, requests.exceptions.HTTPError):
            raise RuntimeError(f"HF API Error uploading {path_in_repo} to `{repo_id}`: {e.response.status_code} {e.response.reason}") from e
        raise RuntimeError(f"Failed to upload `{path_in_repo}` to `{repo_id}`: {e}") from e

def _fetch_space_logs_level(repo_id: str, level: str, token: str) -> str:
    """Fetches build or run logs for a Space."""
    if not repo_id or not token:
        logging.warning(f"Cannot fetch {level} logs: repo_id or token missing.")
        return f"Cannot fetch {level} logs: log in and create a Space first."
    jwt_url = f"{constants.ENDPOINT}/api/spaces/{repo_id}/jwt"
    try:
        logging.info(f"Attempting to fetch {level} logs for Space: {repo_id}")
        r = get_session().get(jwt_url, headers=build_hf_headers(token=token), timeout=10)
        hf_raise_for_status(r)  # Raise HTTPError for bad responses (4xx or 5xx)
        jwt = r.json()["token"]
        logs_url = f"https://api.hf.space/v1/{repo_id}/logs/{level}"
        lines, count = [], 0
        # stream=True avoids loading potentially large logs into memory at once.
        with get_session().get(logs_url, headers=build_hf_headers(token=jwt), stream=True, timeout=30) as resp:
            hf_raise_for_status(resp)
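            # Each EventStream line is expected to look roughly like (an assumption
            # inferred from the parsing below, not a documented contract):
            #   data: {"timestamp": "2024-01-01T00:00:00Z", "data": "log text"}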
            for raw in resp.iter_lines():
                if count >= 200:  # Limit output lines to prevent UI overload
                    lines.append("... truncated ...")
                    break
                if not raw.startswith(b"data: "):  # EventStream protocol expected from the HF logs API
                    continue
                payload = raw[len(b"data: "):]
                try:
                    event = json.loads(payload.decode())
                    ts = event.get("timestamp", "")
                    txt = event.get("data", "").strip()
                    if txt:
                        lines.append(f"[{ts}] {txt}")
                        count += 1
                except json.JSONDecodeError:
                    # Skip lines that aren't valid JSON events.
                    logging.warning(f"Skipping non-JSON log line for {repo_id} ({level}): {payload.decode()}")
                    continue
        log_output = "\n".join(lines) if lines else f"No {level} logs found."
        logging.info(f"Successfully fetched {count} {level} log lines for {repo_id}")
        return log_output
    except Exception as e:
        logging.error(f"Error fetching {level} logs for {repo_id}: {e}")
        if isinstance(e, requests.exceptions.HTTPError):
            return f"Error fetching {level} logs for `{repo_id}`: {e.response.status_code} {e.response.reason}"
        if isinstance(e, requests.exceptions.Timeout):
            return f"Timeout fetching {level} logs for `{repo_id}`. The Space might be starting slowly."
        return f"Error fetching {level} logs for `{repo_id}`: {e}"

def get_build_logs_action(repo_id, profile, token):
    """Action to fetch build logs with a small delay."""
    if not (repo_id and profile and token):
        return "⚠️ Cannot fetch build logs: log in and create a Space first."
    # Small delay to give the build process time to start on the HF side.
    time.sleep(5)
    return _fetch_space_logs_level(repo_id, "build", token.token)

def get_container_logs_action(repo_id, profile, token):
    """Action to fetch container logs with a delay."""
    if not (repo_id and profile and token):
        return "⚠️ Cannot fetch container logs: log in and create a Space first."
    # Longer delay so the container can start after the build completes.
    time.sleep(10)
    return _fetch_space_logs_level(repo_id, "run", token.token)

# --- Google Gemini integration with model selection and grounding ---
def configure_gemini(api_key: str | None, model_name: str | None) -> str:
    """Configures the Gemini API and checks that the model is accessible."""
    # Reject None as well as empty or whitespace-only strings.
    if not isinstance(api_key, str) or not api_key.strip():
        logging.info("Gemini API key is not set.")
        return "⚠️ Gemini API key is not set."
    # Reject None or any name that is not a key in GEMINI_MODELS.
    if not model_name or model_name not in GEMINI_MODELS:
        logging.warning(f"Invalid Gemini model selected: {model_name}")
        return "⚠️ Please select a valid Gemini model."
    try:
        logging.info(f"Attempting to configure Gemini with model: {model_name}")
        genai.configure(api_key=api_key)
        # A trivial generation verifies both the credentials and the model's
        # availability; it raises if the key is invalid or the model is not found.
        genai.GenerativeModel(model_name).generate_content("ping", stream=False)
        logging.info(f"Gemini configured successfully with model: {model_name}")
        return f"✅ Gemini configured successfully with **{GEMINI_MODELS[model_name][0]}**."
    except Exception as e:
        # The configuration check itself failed (e.g., an authentication error).
        logging.error(f"Error configuring Gemini with model {model_name}: {e}")
        return f"❌ Error configuring Gemini: {e}"

def get_model_description(model_name: str | None) -> str:
    """Retrieves the description for a given model name."""
    if model_name is None or model_name not in GEMINI_MODELS:
        return "Select a model to see its description."
    return GEMINI_MODELS[model_name][1]

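# e.g. get_model_description("gemini-1.5-flash")
#      -> "Fast and versatile performance across a diverse variety of tasks."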
def call_gemini(prompt: str, api_key: str, model_name: str, use_grounding: bool = False) -> str:
    """Calls the Gemini API with a given prompt, optionally using grounding."""
    # These checks raise *before* the API call if the prerequisites aren't met.
    if not isinstance(api_key, str) or not api_key.strip():
        raise ValueError("Gemini API key is empty or invalid.")
    if not model_name or model_name not in GEMINI_MODELS:
        raise ValueError(f"Gemini model '{model_name}' is invalid or not selected.")
    try:
        logging.info(f"Calling Gemini model '{model_name}' (Grounding: {use_grounding}) with prompt (first 50 chars): '{prompt[:50]}...'")
        genai.configure(api_key=api_key)  # Re-configure just in case
        model = genai.GenerativeModel(model_name)
        # Note: `types.GoogleSearch` belongs to the newer `google-genai` SDK; the
        # `google-generativeai` package imported here exposes search grounding for
        # 1.5 models via `tools='google_search_retrieval'` instead. Verify which
        # SDK is installed before relying on this line.
        tools_config = [types.Tool(google_search=types.GoogleSearch())] if use_grounding else None
        response = model.generate_content(
            prompt,
            stream=False,  # stream=False keeps this workflow simple
            tools=tools_config,
            request_options={'timeout': 120}  # Timeout for the API call
        )
        if response.prompt_feedback and response.prompt_feedback.block_reason:
            logging.warning(f"Gemini API call blocked: {response.prompt_feedback.block_reason}")
            raise RuntimeError(f"Gemini API call blocked: {response.prompt_feedback.block_reason}")
        if not response.candidates:
            if response.prompt_feedback and response.prompt_feedback.safety_ratings:
                ratings = "; ".join([f"{r.category}: {r.probability}" for r in response.prompt_feedback.safety_ratings])
                logging.warning(f"Gemini API call returned no candidates. Safety ratings: {ratings}")
                raise RuntimeError(f"Gemini API call returned no candidates. Safety ratings: {ratings}")
            else:
                logging.warning("Gemini API call returned no candidates.")
                raise RuntimeError("Gemini API call returned no candidates.")
        generated_text = response.text or ""
        logging.info(f"Gemini API call successful. Generated text length: {len(generated_text)}")
        return generated_text
    except Exception as e:
        logging.error(f"Gemini API call failed: {e}")
        # Re-raise as RuntimeError for the workflow to catch and manage.
        raise RuntimeError(f"Gemini API call failed: {e}") from e

# --- AI workflow logic (State Machine) ---
# Define states for the workflow using Literal for type safety.
WorkflowState = Literal[
    "idle", "awaiting_repo_name", "creating_space", "generating_code",
    "uploading_app_py", "generating_requirements", "uploading_requirements",
    "generating_readme", "uploading_readme", "checking_logs_build",
    "checking_logs_run", "debugging_code", "uploading_fixed_app_py", "complete"
]
STATE_IDLE: WorkflowState = "idle"
STATE_AWAITING_REPO_NAME: WorkflowState = "awaiting_repo_name"
STATE_CREATING_SPACE: WorkflowState = "creating_space"
STATE_GENERATING_CODE: WorkflowState = "generating_code"
STATE_UPLOADING_APP_PY: WorkflowState = "uploading_app_py"
STATE_GENERATING_REQUIREMENTS: WorkflowState = "generating_requirements"
STATE_UPLOADING_REQUIREMENTS: WorkflowState = "uploading_requirements"
STATE_GENERATING_README: WorkflowState = "generating_readme"
STATE_UPLOADING_README: WorkflowState = "uploading_readme"
STATE_CHECKING_LOGS_BUILD: WorkflowState = "checking_logs_build"
STATE_CHECKING_LOGS_RUN: WorkflowState = "checking_logs_run"
STATE_DEBUGGING_CODE: WorkflowState = "debugging_code"
STATE_UPLOADING_FIXED_APP_PY: WorkflowState = "uploading_fixed_app_py"
STATE_COMPLETE: WorkflowState = "complete"

MAX_DEBUG_ATTEMPTS = 3  # Limit the number of automatic debug attempts
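# Happy-path transitions (the user clicks 'Send' to advance each step):
#   idle -> creating_space -> generating_code -> uploading_app_py
#        -> generating_requirements -> uploading_requirements
#        -> generating_readme -> uploading_readme
#        -> checking_logs_build -> checking_logs_run -> complete
# A bare 'create' first detours through awaiting_repo_name; runtime errors send
# checking_logs_run to debugging_code (and uploading_fixed_app_py) up to
# MAX_DEBUG_ATTEMPTS times before the workflow gives up in complete.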
# Helper function to add a new assistant message to the chatbot history.
def add_bot_message(history: list[dict], bot_message: str) -> list[dict]:
    # Copy to avoid mutating the history in place.
    new_history = list(history)
    new_history.append({"role": "assistant", "content": bot_message})
    logging.info(f"Added bot message: {bot_message[:100]}...")
    return new_history

# Initial welcome message for the chatbot (defined outside Blocks so the load chain can call it).
def greet() -> List[Dict[str, str]]:
    logging.info("Generating initial welcome message.")
    return [{"role": "assistant", "content": "Welcome! Please log in to Hugging Face and provide your Google AI Studio API key to start building Spaces. Once ready, type 'generate me a gradio app called myapp' or 'create' to begin."}]

# Helper function to update the Send button's interactivity based on prerequisites.
def check_send_button_ready(
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_key: str | None,
    gemini_model: str | None,
    workflow_state: WorkflowState  # Also depends on the workflow state
) -> dict:
    """Checks if HF login and Gemini configuration are complete and returns an interactivity update for the button."""
    # The button should NOT be interactive while the workflow is running.
    # (gr.update replaces the Gradio 3.x gr.Button.update, which no longer exists in Gradio 4+.)
    if workflow_state != STATE_IDLE and workflow_state != STATE_AWAITING_REPO_NAME:
        logging.debug(f"check_send_button_ready: Workflow state is {workflow_state}, disabling button.")
        return gr.update(interactive=False)
    is_logged_in = hf_profile is not None and hf_token is not None
    # strip() handles keys that are only whitespace.
    is_gemini_ready = isinstance(gemini_key, str) and bool(gemini_key.strip()) and bool(gemini_model)
    is_ready = is_logged_in and is_gemini_ready
    logging.debug(f"check_send_button_ready - HF Ready: {is_logged_in}, Gemini Ready: {is_gemini_ready}, Button Ready: {is_ready}")
    # The button is interactive only in IDLE or AWAITING_REPO_NAME states AND when prerequisites are met.
    return gr.update(interactive=is_ready and (workflow_state == STATE_IDLE or workflow_state == STATE_AWAITING_REPO_NAME))

# --- State Handler Functions ---
# These functions encapsulate the logic for each state. They take all necessary
# inputs from the main generator's arguments and return the full tuple of
# outputs required by the generator's yield signature.
WorkflowInputs = Tuple[
    str, List[Dict[str, str]], Optional[gr.OAuthProfile], Optional[gr.OAuthToken],
    Optional[str], Optional[str], Optional[str], WorkflowState, str, str, str, str,
    int, Optional[str], Optional[str], Optional[str], bool
]
WorkflowOutputs = Tuple[
    List[Dict[str, str]], Optional[str], WorkflowState, str, str, str,
    int, Optional[str], Optional[str], Optional[str], bool, Optional[str], Optional[str]
]
def package_workflow_outputs(
    history: List[Dict[str, str]],
    repo_id: Optional[str],
    state: WorkflowState,
    updated_preview: str,
    updated_run: str,
    updated_build: str,
    attempts: int,
    app_desc: Optional[str],
    repo_name: Optional[str],
    generated_code: Optional[str],
    use_grounding: bool,
    current_gemini_key: Optional[str],  # Explicitly included
    current_gemini_model: Optional[str]  # Explicitly included
) -> WorkflowOutputs:
    """Helper to package all workflow state and UI outputs into the required tuple."""
    return (history, repo_id, state, updated_preview, updated_run, updated_build,
            attempts, app_desc, repo_name, generated_code, use_grounding,
            current_gemini_key, current_gemini_model)

def handle_idle(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_IDLE
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,
    use_grounding: bool,
    *args,  # Catch potential extra args
    **kwargs  # Catch potential extra kwargs
) -> WorkflowOutputs:
    """Handles logic when in the IDLE state."""
    logging.info(f"Handling STATE_IDLE with message: {message[:50]}...")
    reset_match = "reset" in message.lower()
    generate_match = re.search(r'generate (?:me )?(?:a|an) (.+) app called (\w+)', message, re.I)
    create_match = re.search(r'create (?:a|an)? space called (\w+)', message, re.I)  # Simple create command
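    # e.g. "generate me a gradio app called myapp" -> groups ("gradio", "myapp");
    #      "create a space called myapp"           -> group "myapp"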
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    if reset_match:
        logging.info("Reset command received.")
        history = add_bot_message(history, "Workflow reset.")
        # Reset relevant states and UI outputs.
        return package_workflow_outputs(
            history=history, repo_id=None, state=STATE_IDLE,
            updated_preview="<p>No Space created yet.</p>", updated_run="", updated_build="",
            attempts=0, app_desc=None, repo_name=None, generated_code=None,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    elif generate_match:
        logging.info("Generate command received.")
        new_app_desc = generate_match.group(1).strip()  # Description part
        new_repo_name = generate_match.group(2).strip()  # Name part
        # Basic validation of the repo name format.
        if not new_repo_name or re.search(r'[^a-zA-Z0-9_-]', new_repo_name) or len(new_repo_name) > 100:
            logging.warning(f"Invalid repo name format received: {new_repo_name}")
            history = add_bot_message(history, "Invalid name. Please provide a single word/slug for the Space name (letters, numbers, underscores, hyphens only, max 100 chars).")
            # Stay in IDLE and return the message.
            return package_workflow_outputs(
                history=history, repo_id=repo_id, state=STATE_IDLE,
                updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
                attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )
        history = add_bot_message(history, f"Acknowledged: '{message}'. Starting workflow to create Space `{hf_profile.username}/{new_repo_name}` for a '{new_app_desc}' app.")
        logging.info(f"Transitioning to STATE_CREATING_SPACE for repo '{new_repo_name}' and description '{new_app_desc}'")
        # Update state variables for the next step.
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=STATE_CREATING_SPACE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=0, app_desc=new_app_desc, repo_name=new_repo_name, generated_code=None,  # Reset attempts and generated_code
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    elif create_match:
        logging.info("Simple create command received.")
        new_repo_name = create_match.group(1).strip()
        # Basic validation of the repo name format.
        if not new_repo_name or re.search(r'[^a-zA-Z0-9_-]', new_repo_name) or len(new_repo_name) > 100:
            logging.warning(f"Invalid repo name format received: {new_repo_name}")
            history = add_bot_message(history, "Invalid name. Please provide a single word/slug for the Space name (letters, numbers, underscores, hyphens only, max 100 chars).")
            # Stay in IDLE and return the message.
            return package_workflow_outputs(
                history=history, repo_id=repo_id, state=STATE_IDLE,
                updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
                attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )
        history = add_bot_message(history, f"Acknowledged: '{message}'. Starting workflow to create Space `{hf_profile.username}/{new_repo_name}`.")
        logging.info(f"Transitioning to STATE_CREATING_SPACE for repo '{new_repo_name}'")
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=STATE_CREATING_SPACE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=0, app_desc=app_description, repo_name=new_repo_name, generated_code=None,  # Reset attempts and generated_code
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    elif "create" in message.lower() and not repo_id:
        logging.info("Create command without name received.")
        history = add_bot_message(history, "Okay, what should the Space be called? (e.g., `my-awesome-app`)")
        logging.info("Transitioning to STATE_AWAITING_REPO_NAME")
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=STATE_AWAITING_REPO_NAME,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    else:
        logging.info("Command not recognized in IDLE state.")
        history = add_bot_message(history, "Command not recognized. Try 'generate me a gradio app called myapp', or 'reset'.")
        # Stay in the IDLE state.
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=STATE_IDLE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )

def handle_awaiting_repo_name(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_AWAITING_REPO_NAME
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,
    use_grounding: bool,
    *args,
    **kwargs
) -> WorkflowOutputs:
    """Handles logic when in the AWAITING_REPO_NAME state."""
    logging.info(f"Handling STATE_AWAITING_REPO_NAME with message: {message[:50]}...")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    new_repo_name = message.strip()
    # Basic validation for the Hugging Face repo name format:
    # letters, numbers, hyphens, underscores, max 100 chars (HF limit).
    if not new_repo_name or re.search(r'[^a-zA-Z0-9_-]', new_repo_name) or len(new_repo_name) > 100:
        logging.warning(f"Invalid repo name format received while awaiting name: {new_repo_name}")
        history = add_bot_message(history, "Invalid name. Please provide a single word/slug for the Space name (letters, numbers, underscores, hyphens only, max 100 chars).")
        # Stay in the AWAITING_REPO_NAME state.
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=STATE_AWAITING_REPO_NAME,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    else:
        history = add_bot_message(history, f"Using Space name `{new_repo_name}`. Creating Space `{hf_profile.username}/{new_repo_name}`...")
        logging.info(f"Validated repo name '{new_repo_name}'. Transitioning to STATE_CREATING_SPACE.")
        # Transition state to creation.
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=STATE_CREATING_SPACE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=0, app_desc=app_description, repo_name=new_repo_name, generated_code=None,  # Reset attempts and generated_code
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )

def handle_creating_space(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_CREATING_SPACE
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,
    use_grounding: bool,
    *args,
    **kwargs
) -> WorkflowOutputs:
    """Handles logic when in the CREATING_SPACE state."""
    logging.info(f"Handling STATE_CREATING_SPACE for repo '{repo_name}'")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    # Ensure repo_name is available (it should have been set in the previous step).
    if not repo_name:
        logging.error("Internal error: Repo name missing in STATE_CREATING_SPACE. Resetting.")
        history = add_bot_message(history, "Internal error: Repo name missing for creation. Resetting.")
        # Reset state on error.
        return package_workflow_outputs(
            history=history, repo_id=None, state=STATE_IDLE,
            updated_preview="<p>Error creating space.</p>", updated_run="", updated_build="",
            attempts=0, app_desc=None, repo_name=None, generated_code=None,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    else:
        try:
            new_repo_id, iframe_html = create_space_action(repo_name, space_sdk, hf_profile, hf_token)
            history = add_bot_message(history, f"✅ Space `{new_repo_id}` created. Click 'Send' to generate and upload code.")
            logging.info(f"Space '{new_repo_id}' created. Transitioning to STATE_GENERATING_CODE.")
            # Update state variables for the next step (generation).
            return package_workflow_outputs(
                history=history, repo_id=new_repo_id, state=STATE_GENERATING_CODE,
                updated_preview=iframe_html, updated_run=container_logs, updated_build=build_logs,
                attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )
        except RuntimeError as e:  # Catch the RuntimeErrors raised by the actions
            logging.error(f"Caught RuntimeError creating space: {e}")
            history = add_bot_message(history, f"❌ Error creating space: {e}. Click 'reset'.")
            # Reset state on error.
            return package_workflow_outputs(
                history=history, repo_id=None, state=STATE_IDLE,
                updated_preview="<p>Error creating space.</p>", updated_run="", updated_build="",
                attempts=0, app_desc=None, repo_name=None, generated_code=None,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )

def handle_generating_code(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_GENERATING_CODE
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,
    use_grounding: bool,
    *args,
    **kwargs
) -> Generator[WorkflowOutputs, None, WorkflowOutputs]:
    """Handles logic when in the GENERATING_CODE state."""
    logging.info("Handling STATE_GENERATING_CODE")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    # Build the Gemini prompt from the app description, or fall back to a default.
    prompt_desc = app_description if app_description else f'a simple {space_sdk} app'
    prompt = f"""
You are an AI assistant specializing in Hugging Face Spaces using the {space_sdk} SDK.
Generate a full, single-file Python app based on:
'{prompt_desc}'
Ensure the code is runnable as `app.py` in a Hugging Face Space using the `{space_sdk}` SDK. Include necessary imports and setup.
Return **only** the python code block for `app.py`. Do not include any extra text, explanations, or markdown outside the code block.
"""
    try:
        history = add_bot_message(history, f"🧠 Generating `{prompt_desc}` `{space_sdk}` app (`app.py`) code with Gemini...")
        if use_grounding:
            history = add_bot_message(history, "(Using Grounding with Google Search)")
        # Yield a message before the API call to show immediate feedback.
        yield package_workflow_outputs(
            history=history, repo_id=repo_id, state=state,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
        code = call_gemini(prompt, current_gemini_key, current_gemini_model, use_grounding=use_grounding)
        code = code.strip()
        # Clean up markdown code fences.
        code = re.sub(r'^```python\s*', '', code, flags=re.MULTILINE).strip()
        code = re.sub(r'^```\s*', '', code, flags=re.MULTILINE).strip()  # Catch generic code fences too
        code = re.sub(r'\s*```$', '', code, flags=re.MULTILINE).strip()
        if not code:
            logging.warning("Gemini returned empty code.")
            raise ValueError("Gemini returned empty code.")
        history = add_bot_message(history, "✅ `app.py` code generated. Click 'Send' to upload.")
        logging.info("Code generated. Transitioning to STATE_UPLOADING_APP_PY.")
        # Transition state and store the generated code.
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=STATE_UPLOADING_APP_PY,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    except (RuntimeError, ValueError) as e:  # call_gemini raises RuntimeError; empty code raises ValueError
        logging.error(f"Caught error generating code: {e}")
        history = add_bot_message(history, f"❌ Error generating code: {e}. Click 'reset'.")
        # Reset state on error.
        return package_workflow_outputs(
            history=history, repo_id=None, state=STATE_IDLE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=0, app_desc=None, repo_name=None, generated_code=None,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )

def handle_uploading_app_py(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_UPLOADING_APP_PY
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,  # This should hold the code to upload
    use_grounding: bool,
    *args,
    **kwargs
) -> Generator[WorkflowOutputs, None, WorkflowOutputs]:
    """Handles logic when in the UPLOADING_APP_PY state."""
    logging.info("Handling STATE_UPLOADING_APP_PY")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    # Retrieve the generated code from the state variable.
    code_to_upload = generated_code
    if not code_to_upload:
        logging.error("Internal error: No code to upload in STATE_UPLOADING_APP_PY. Resetting.")
        history = add_bot_message(history, "Internal error: No code to upload. Resetting.")
        return package_workflow_outputs(
            history=history, repo_id=None, state=STATE_IDLE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=0, app_desc=None, repo_name=None, generated_code=None,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    else:
        history = add_bot_message(history, "☁️ Uploading `app.py`...")
        # Yield a message before the upload.
        yield package_workflow_outputs(
            history=history, repo_id=repo_id, state=state,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
        try:
            upload_file_to_space_action(io.StringIO(code_to_upload), "app.py", repo_id, hf_profile, hf_token)
            history = add_bot_message(history, "✅ Uploaded `app.py`. Click 'Send' to generate requirements.")
            logging.info("app.py uploaded. Transitioning to STATE_GENERATING_REQUIREMENTS.")
            # Transition state; clear the generated code after use.
            return package_workflow_outputs(
                history=history, repo_id=repo_id, state=STATE_GENERATING_REQUIREMENTS,
                updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
                attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=None,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )
        except RuntimeError as e:  # Catch the RuntimeErrors from upload_file_to_space_action
            logging.error(f"Caught RuntimeError uploading app.py: {e}")
            history = add_bot_message(history, f"❌ Error uploading `app.py`: {e}. Click 'reset'.")
            # Reset state on error.
            return package_workflow_outputs(
                history=history, repo_id=None, state=STATE_IDLE,
                updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
                attempts=0, app_desc=None, repo_name=None, generated_code=None,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )

def handle_generating_requirements(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_GENERATING_REQUIREMENTS
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,
    use_grounding: bool,
    *args,
    **kwargs
) -> Generator[WorkflowOutputs, None, WorkflowOutputs]:
    """Handles logic when in the GENERATING_REQUIREMENTS state."""
    logging.info("Handling STATE_GENERATING_REQUIREMENTS")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    history = add_bot_message(history, "📄 Generating `requirements.txt`...")
    # Yield a message before generating the requirements.
    yield package_workflow_outputs(
        history=history, repo_id=repo_id, state=state,
        updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
        attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
        use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
    )
    # Determine required packages from the SDK and keywords in the app description.
    reqs_list = ["gradio"] if space_sdk == "gradio" else ["streamlit"]
    # Add essential libraries regardless of description keywords or grounding.
    essential_libs = ["google-generativeai", "huggingface_hub"]
    reqs_list.extend(essential_libs)
    # Add common libraries if the description suggests they might be needed.
    if app_description:
        app_desc_lower = app_description.lower()
        if "requests" in app_desc_lower or "api" in app_desc_lower:
            reqs_list.append("requests")
        if "image" in app_desc_lower or "upload" in app_desc_lower or "blur" in app_desc_lower or "vision" in app_desc_lower or "photo" in app_desc_lower:
            reqs_list.append("Pillow")
        if "numpy" in app_desc_lower:
            reqs_list.append("numpy")
        if "pandas" in app_desc_lower or "dataframe" in app_desc_lower:
            reqs_list.append("pandas")
        if any(lib in app_desc_lower for lib in ["scikit-image", "skimage", "cv2", "opencv"]):
            reqs_list.extend(["scikit-image", "opencv-python"])
        if any(lib in app_desc_lower for lib in ["transformer", "llama", "mistral", "bert", "gpt2"]):
            reqs_list.append("transformers")
        if any(lib in app_desc_lower for lib in ["torch", "pytorch", "tensorflow", "keras"]):
            reqs_list.extend(["torch", "tensorflow"])  # Consider pinning hardware-specific versions if needed
    # dict.fromkeys keeps items unique while preserving insertion order (Python 3.7+).
    reqs_list = list(dict.fromkeys(reqs_list))
    # Sort alphabetically for a cleaner requirements.txt.
    reqs_list.sort()
    reqs_content = "\n".join(reqs_list) + "\n"
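    # e.g. a gradio Space described as "image blur" would produce:
    #   Pillow
    #   google-generativeai
    #   gradio
    #   huggingface_hub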
    history = add_bot_message(history, "✅ `requirements.txt` generated. Click 'Send' to upload.")
    logging.info("requirements.txt generated. Transitioning to STATE_UPLOADING_REQUIREMENTS.")
    # Transition state and store the requirements content.
    return package_workflow_outputs(
        history=history, repo_id=repo_id, state=STATE_UPLOADING_REQUIREMENTS,
        updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
        attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=reqs_content,
        use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
    )

def handle_uploading_requirements(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_UPLOADING_REQUIREMENTS
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,  # This should hold the requirements content
    use_grounding: bool,
    *args,
    **kwargs
) -> Generator[WorkflowOutputs, None, WorkflowOutputs]:
    """Handles logic when in the UPLOADING_REQUIREMENTS state."""
    logging.info("Handling STATE_UPLOADING_REQUIREMENTS")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    # Retrieve the requirements content from the state variable.
    reqs_content_to_upload = generated_code
    if not reqs_content_to_upload:
        logging.error("Internal error: No requirements content to upload in STATE_UPLOADING_REQUIREMENTS. Resetting.")
        history = add_bot_message(history, "Internal error: No requirements content to upload. Resetting.")
        return package_workflow_outputs(
            history=history, repo_id=None, state=STATE_IDLE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=0, app_desc=None, repo_name=None, generated_code=None,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    else:
        history = add_bot_message(history, "☁️ Uploading `requirements.txt`...")
        # Yield a message before the upload.
        yield package_workflow_outputs(
            history=history, repo_id=repo_id, state=state,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
        try:
            # Perform the requirements file upload.
            upload_file_to_space_action(io.StringIO(reqs_content_to_upload), "requirements.txt", repo_id, hf_profile, hf_token)
            history = add_bot_message(history, "✅ Uploaded `requirements.txt`. Click 'Send' to generate README.")
            logging.info("requirements.txt uploaded. Transitioning to STATE_GENERATING_README.")
            # Transition state; clear the content after use.
            return package_workflow_outputs(
                history=history, repo_id=repo_id, state=STATE_GENERATING_README,
                updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
                attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=None,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )
        except RuntimeError as e:  # Catch the specific RuntimeErrors
            logging.error(f"Caught RuntimeError uploading requirements.txt: {e}")
            history = add_bot_message(history, f"❌ Error uploading `requirements.txt`: {e}. Click 'reset'.")
            # Reset state on failure.
            return package_workflow_outputs(
                history=history, repo_id=None, state=STATE_IDLE,
                updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
                attempts=0, app_desc=None, repo_name=None, generated_code=None,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )

def handle_generating_readme(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_GENERATING_README
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,
    use_grounding: bool,
    *args,
    **kwargs
) -> Generator[WorkflowOutputs, None, WorkflowOutputs]:
    """Handles logic when in the GENERATING_README state."""
    logging.info("Handling STATE_GENERATING_README")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    history = add_bot_message(history, "📝 Generating `README.md`...")
    # Yield a message before generating the README.
    yield package_workflow_outputs(
        history=history, repo_id=repo_id, state=state,
        updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
        attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
        use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
    )
    # Generate simple README content with the Space metadata header.
    readme_title = repo_name if repo_name else "My Awesome Space"
    readme_description = app_description if app_description else f"This Hugging Face Space hosts an AI-generated {space_sdk} application."
    readme_content = f"""---
title: {readme_title}
emoji: 🚀
colorFrom: blue
colorTo: yellow
sdk: {space_sdk}
app_file: app.py
pinned: false
---

# {readme_title}

{readme_description}

This Space was automatically generated by an AI workflow using Google Gemini and Gradio.
"""
    history = add_bot_message(history, "✅ `README.md` generated. Click 'Send' to upload.")
    logging.info("README.md generated. Transitioning to STATE_UPLOADING_README.")
    # Transition state and store the README content.
    return package_workflow_outputs(
        history=history, repo_id=repo_id, state=STATE_UPLOADING_README,
        updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
        attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=readme_content,
        use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
    )

def handle_uploading_readme(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_UPLOADING_README
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,  # This should hold the README content
    use_grounding: bool,
    *args,
    **kwargs
) -> Generator[WorkflowOutputs, None, WorkflowOutputs]:
    """Handles logic when in the UPLOADING_README state."""
    logging.info("Handling STATE_UPLOADING_README")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    # Retrieve the README content from the state variable.
    readme_content_to_upload = generated_code
    if not readme_content_to_upload:
        logging.error("Internal error: No README content to upload in STATE_UPLOADING_README. Resetting.")
        history = add_bot_message(history, "Internal error: No README content to upload. Resetting.")
        return package_workflow_outputs(
            history=history, repo_id=None, state=STATE_IDLE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=0, app_desc=None, repo_name=None, generated_code=None,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    else:
        history = add_bot_message(history, "☁️ Uploading `README.md`...")
        # Yield a message before the upload.
        yield package_workflow_outputs(
            history=history, repo_id=repo_id, state=state,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
        try:
            # Perform the README file upload.
            upload_file_to_space_action(io.StringIO(readme_content_to_upload), "README.md", repo_id, hf_profile, hf_token)
            history = add_bot_message(history, "✅ Uploaded `README.md`. All files uploaded. Space is now building. Click 'Send' to check build logs.")
            logging.info("README.md uploaded. Transitioning to STATE_CHECKING_LOGS_BUILD.")
            # Transition to checking build logs; clear the content after use.
            return package_workflow_outputs(
                history=history, repo_id=repo_id, state=STATE_CHECKING_LOGS_BUILD,
                updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
                attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=None,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )
        except RuntimeError as e:  # Catch the specific RuntimeErrors
            logging.error(f"Caught RuntimeError uploading README.md: {e}")
            history = add_bot_message(history, f"❌ Error uploading `README.md`: {e}. Click 'reset'.")
            # Reset state on failure.
            return package_workflow_outputs(
                history=history, repo_id=None, state=STATE_IDLE,
                updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
                attempts=0, app_desc=None, repo_name=None, generated_code=None,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )

def handle_checking_logs_build(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_CHECKING_LOGS_BUILD
    space_sdk: str,
    preview_html: str,
    container_logs: str,  # Current UI value
    build_logs: str,  # Current UI value
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,
    use_grounding: bool,
    *args,
    **kwargs
) -> Generator[WorkflowOutputs, None, WorkflowOutputs]:
    """Handles logic when in the CHECKING_LOGS_BUILD state."""
    logging.info(f"Handling STATE_CHECKING_LOGS_BUILD for repo '{repo_id}'")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    history = add_bot_message(history, "🔍 Fetching build logs...")
    # Yield a message before fetching logs (which includes a delay).
    yield package_workflow_outputs(
        history=history, repo_id=repo_id, state=state,
        updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
        attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
        use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
    )
    # Fetch build logs from the HF Space.
    build_logs_text = get_build_logs_action(repo_id, hf_profile, hf_token)
    updated_build = build_logs_text  # Update the logs display variable
    # Simple case-insensitive check for common error indicators in the logs.
    if "error" in updated_build.lower() or "exception" in updated_build.lower() or "build failed" in updated_build.lower():
        logging.warning("Build logs indicate potential issues.")
        history = add_bot_message(history, "⚠️ Build logs indicate potential issues. Please inspect above. Click 'Send' to check container logs (app might still start despite build warnings).")
        state = STATE_CHECKING_LOGS_RUN  # Transition even on build error, to see if the container starts
        logging.info("Build logs show issues. Transitioning to STATE_CHECKING_LOGS_RUN.")
        # Return the updated state, logs, and variables.
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=state,
            updated_preview=preview_html, updated_run=container_logs, updated_build=updated_build,  # Updated build logs
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    else:
        logging.info("Build logs appear clean.")
        history = add_bot_message(history, "✅ Build logs fetched. Click 'Send' to check container logs.")
        state = STATE_CHECKING_LOGS_RUN  # Transition to the next log check
        logging.info("Transitioning to STATE_CHECKING_LOGS_RUN.")
        # Return the updated state, logs, and variables.
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=state,
            updated_preview=preview_html, updated_run=container_logs, updated_build=updated_build,  # Updated build logs
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )

def handle_checking_logs_run(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_CHECKING_LOGS_RUN
    space_sdk: str,
    preview_html: str,
    container_logs: str,  # Current UI value
    build_logs: str,  # Current UI value
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,
    use_grounding: bool,
    *args,
    **kwargs
) -> Generator[WorkflowOutputs, None, WorkflowOutputs]:  # Specify return type as Generator
    """Handles logic when in the CHECKING_LOGS_RUN state."""
    logging.info(f"Handling STATE_CHECKING_LOGS_RUN for repo '{repo_id}'")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    history = add_bot_message(history, "🔍 Fetching container logs...")
    # Yield message before fetching logs (includes a delay)
    yield package_workflow_outputs(
        history=history, repo_id=repo_id, state=state,
        updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
        attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
        use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
    )
    # Fetch container logs from the HF Space
    container_logs_text = get_container_logs_action(repo_id, hf_profile, hf_token)
    updated_run = container_logs_text  # Update the logs display variable
    # Check for errors in run logs and whether debug attempts remain
    if ("error" in updated_run.lower() or "exception" in updated_run.lower()) and debug_attempts < MAX_DEBUG_ATTEMPTS:
        new_attempts = debug_attempts + 1  # Increment debug attempts counter
        logging.warning(f"Errors detected in container logs. Attempting debug fix #{new_attempts}.")
        history = add_bot_message(history, f"❌ Errors detected in container logs. Attempting debug fix #{new_attempts}/{MAX_DEBUG_ATTEMPTS}. Click 'Send' to proceed.")
        state = STATE_DEBUGGING_CODE  # Transition to the debugging state
        logging.info("Transitioning to STATE_DEBUGGING_CODE.")
        # Yield updated state, logs, attempts, and variables
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=state,
            updated_preview=preview_html, updated_run=updated_run, updated_build=build_logs,  # Updated run logs
            attempts=new_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    elif ("error" in updated_run.lower() or "exception" in updated_run.lower()) and debug_attempts >= MAX_DEBUG_ATTEMPTS:
        # Max debug attempts reached
        logging.error(f"Errors detected in container logs. Max debug attempts ({MAX_DEBUG_ATTEMPTS}) reached.")
        history = add_bot_message(history, f"❌ Errors detected in container logs. Max debug attempts ({MAX_DEBUG_ATTEMPTS}) reached. Please inspect logs manually or click 'reset'.")
        state = STATE_COMPLETE  # Workflow ends on failure after exhausting attempts
        logging.info("Max debug attempts reached. Transitioning to STATE_COMPLETE.")
        # Yield updated state, logs, attempts, and variables
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=state,
            updated_preview=preview_html, updated_run=updated_run, updated_build=build_logs,  # Updated run logs
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    else:
        # No significant errors found in logs, assume success
        logging.info("No significant errors found in run logs.")
        history = add_bot_message(history, "✅ App appears to be running successfully! Check the iframe above. Click 'reset' to start a new project.")
        state = STATE_COMPLETE  # Workflow ends on success
        logging.info("Transitioning to STATE_COMPLETE.")
        # Yield updated state, logs, attempts, and variables
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=state,
            updated_preview=preview_html, updated_run=updated_run, updated_build=build_logs,  # Updated run logs
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
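# A minimal sketch of the substring heuristic used above, extracted as a pure
# predicate for clarity. Illustrative only (logs_indicate_failure is a
# hypothetical name; nothing in the workflow calls it). Note that plain
# substring matching can false-positive on lines like "0 errors" or
# "error handling enabled", so a stricter matcher (e.g. word boundaries)
# may be preferable in practice.
def logs_indicate_failure(logs: str) -> bool:
    """Return True if the logs contain common error indicators (case-insensitive)."""
    lowered = logs.lower()
    return any(marker in lowered for marker in ("error", "exception", "traceback"))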
def handle_debugging_code(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_DEBUGGING_CODE
    space_sdk: str,
    preview_html: str,
    container_logs: str,  # Current UI value (contains the logs to debug from)
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,
    use_grounding: bool,
    *args,
    **kwargs
) -> Generator[WorkflowOutputs, None, WorkflowOutputs]:  # Specify return type as Generator
    """Handles logic when in the DEBUGGING_CODE state."""
    logging.info(f"Handling STATE_DEBUGGING_CODE (attempt #{debug_attempts}) for repo '{repo_id}'")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    history = add_bot_message(history, "🧠 Calling Gemini to generate a fix based on the logs...")
    if use_grounding:
        history = add_bot_message(history, "(Using Grounding with Google Search)")
    # Yield message before the Gemini API call
    yield package_workflow_outputs(
        history=history, repo_id=repo_id, state=state,
        updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
        attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
        use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
    )
    # Construct the prompt for Gemini, embedding the container logs
    debug_prompt = f"""
You are debugging a {space_sdk} Space. The goal is to fix the code in `app.py` based on the container logs provided.

Here are the container logs:

{container_logs}

Generate the *complete, fixed* content for `app.py` based on these logs.
Return **only** the python code block for app.py. Do not include any extra text, explanations, or markdown outside the code block.
"""
    try:
        # Call Gemini to generate the corrected code, optionally using grounding.
        # Note: Grounding might be less effective for debugging based *only* on logs,
        # but we include the option as requested.
        # Use the current_gemini_key and current_gemini_model derived from state inputs
        fix_code = call_gemini(debug_prompt, current_gemini_key, current_gemini_model, use_grounding=use_grounding)
        fix_code = fix_code.strip()
        # Clean up potential markdown formatting around the returned code
        fix_code = re.sub(r'^```python\s*', '', fix_code, flags=re.MULTILINE).strip()
        fix_code = re.sub(r'^```\s*', '', fix_code, flags=re.MULTILINE).strip()
        fix_code = re.sub(r'\s*```$', '', fix_code, flags=re.MULTILINE).strip()
        if not fix_code:
            logging.warning("Gemini returned empty fix code.")
            raise ValueError("Gemini returned empty fix code.")
        history = add_bot_message(history, "✅ Fix code generated. Click 'Send' to upload.")
        logging.info("Fix code generated. Transitioning to STATE_UPLOADING_FIXED_APP_PY.")
        # Transition to the upload state for the fix, storing the generated code
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=STATE_UPLOADING_FIXED_APP_PY,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=fix_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    except (RuntimeError, ValueError) as e:  # ValueError covers the empty-fix case raised above
        logging.error(f"Caught error generating debug code: {e}")
        history = add_bot_message(history, f"❌ Error generating debug code: {e}. Click 'reset'.")
        # Return error message and reset state on failure
        return package_workflow_outputs(
            history=history, repo_id=None, state=STATE_IDLE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=0, app_desc=None, repo_name=None, generated_code=None,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
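# A minimal, self-contained sketch of the fence-stripping logic above, pulled
# out as a reusable helper. Illustrative only (the name strip_code_fences is
# hypothetical and nothing in the workflow calls it); it mirrors the re.sub
# sequence used in handle_debugging_code, generalized to any language tag.
def strip_code_fences(text: str) -> str:
    """Remove leading/trailing markdown code fences from model output."""
    text = text.strip()
    text = re.sub(r'^```[a-zA-Z]*\s*', '', text, flags=re.MULTILINE).strip()  # opening fence, optional language tag
    text = re.sub(r'\s*```$', '', text, flags=re.MULTILINE).strip()  # closing fence
    return text
# Example: strip_code_fences("```python\nprint('hi')\n```") -> "print('hi')"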
def handle_uploading_fixed_app_py(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_UPLOADING_FIXED_APP_PY
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,  # This should hold the fixed code
    use_grounding: bool,
    *args,
    **kwargs
) -> Generator[WorkflowOutputs, None, WorkflowOutputs]:  # Specify return type as Generator
    """Handles logic when in the UPLOADING_FIXED_APP_PY state."""
    logging.info(f"Handling STATE_UPLOADING_FIXED_APP_PY for repo '{repo_id}'")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    # Retrieve the fixed code from the state variable
    fixed_code_to_upload = generated_code
    if not fixed_code_to_upload:
        logging.error("Internal error: No fixed code available to upload in STATE_UPLOADING_FIXED_APP_PY. Resetting.")
        history = add_bot_message(history, "Internal error: No fixed code available to upload. Resetting.")
        return package_workflow_outputs(
            history=history, repo_id=None, state=STATE_IDLE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=0, app_desc=None, repo_name=None, generated_code=None,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    else:
        history = add_bot_message(history, "☁️ Uploading fixed `app.py`...")
        # Yield message before upload
        yield package_workflow_outputs(
            history=history, repo_id=repo_id, state=state,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
        try:
            # Perform the upload of the fixed app.py
            upload_file_to_space_action(io.StringIO(fixed_code_to_upload), "app.py", repo_id, hf_profile, hf_token)
            history = add_bot_message(history, "✅ Fixed `app.py` uploaded. Space will rebuild. Click 'Send' to check logs again.")
            state = STATE_CHECKING_LOGS_RUN  # Go back to checking run logs after uploading the fix
            logging.info("Fixed app.py uploaded. Transitioning to STATE_CHECKING_LOGS_RUN.")
            # Transition state, clear code after use
            return package_workflow_outputs(
                history=history, repo_id=repo_id, state=state,
                updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
                attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=None,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )
        except RuntimeError as e:  # Catch specific RuntimeErrors
            logging.error(f"Caught RuntimeError uploading fixed app.py: {e}")
            history = add_bot_message(history, f"❌ Error uploading fixed `app.py`: {e}. Click 'reset'.")
            # Return error message and reset state on failure
            return package_workflow_outputs(
                history=history, repo_id=None, state=STATE_IDLE,
                updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
                attempts=0, app_desc=None, repo_name=None, generated_code=None,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )
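# Illustrative sketch only (not wired into the workflow): assuming
# upload_file_to_space_action wraps huggingface_hub.upload_file, encoding the
# text into a bytes buffer avoids any text/binary ambiguity in the file object.
# The helper name below is hypothetical.
#
#   def upload_text_to_space(text: str, path_in_repo: str, repo_id: str, token: str) -> None:
#       upload_file(
#           path_or_fileobj=io.BytesIO(text.encode("utf-8")),  # binary buffer
#           path_in_repo=path_in_repo,
#           repo_id=repo_id,
#           repo_type="space",
#           token=token,
#       )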
def handle_complete(
    message: str,  # User might type something in COMPLETE state
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id: str | None,
    state: WorkflowState,  # Should be STATE_COMPLETE
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts: int,
    app_description: str | None,
    repo_name: str | None,
    generated_code: str | None,
    use_grounding: bool,
    *args,
    **kwargs
) -> WorkflowOutputs:
    """Handles logic when in the COMPLETE state."""
    logging.info("Handling STATE_COMPLETE")
    current_gemini_key = gemini_api_key  # Use the input vars directly
    current_gemini_model = gemini_model
    # If the user types something in the complete state, maybe interpret it?
    # For now, we'll just stay in COMPLETE unless they type 'reset'.
    if "reset" in message.lower():
        logging.info("Reset command received in COMPLETE state.")
        history = add_bot_message(history, "Workflow reset.")
        # Reset relevant states and UI outputs
        return package_workflow_outputs(
            history=history, repo_id=None, state=STATE_IDLE,
            updated_preview="<p>No Space created yet.</p>", updated_run="", updated_build="",
            attempts=0, app_desc=None, repo_name=None, generated_code=None,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
    else:
        # Stay in COMPLETE state
        history = add_bot_message(history, "Workflow is complete. Type 'reset' to start a new project.")
        return package_workflow_outputs(
            history=history, repo_id=repo_id, state=STATE_COMPLETE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
            attempts=debug_attempts, app_desc=app_description, repo_name=repo_name, generated_code=generated_code,
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
        )
# --- Dispatch Table ---
STATE_HANDLERS: Dict[WorkflowState, Any] = {  # Use Any for type hint simplicity here
    STATE_IDLE: handle_idle,
    STATE_AWAITING_REPO_NAME: handle_awaiting_repo_name,
    STATE_CREATING_SPACE: handle_creating_space,
    STATE_GENERATING_CODE: handle_generating_code,
    STATE_UPLOADING_APP_PY: handle_uploading_app_py,
    STATE_GENERATING_REQUIREMENTS: handle_generating_requirements,
    STATE_UPLOADING_REQUIREMENTS: handle_uploading_requirements,
    STATE_GENERATING_README: handle_generating_readme,
    STATE_UPLOADING_README: handle_uploading_readme,
    STATE_CHECKING_LOGS_BUILD: handle_checking_logs_build,
    STATE_CHECKING_LOGS_RUN: handle_checking_logs_run,
    STATE_DEBUGGING_CODE: handle_debugging_code,
    STATE_UPLOADING_FIXED_APP_PY: handle_uploading_fixed_app_py,
    STATE_COMPLETE: handle_complete,
}
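# Extending the state machine is a matter of adding one entry to the table
# above. A hedged sketch of the handler contract (STATE_EXAMPLE and
# handle_example are hypothetical, shown only for illustration): a handler
# receives the full workflow context as keyword arguments and returns (or
# yields progress updates, then returns) a package_workflow_outputs(...) tuple.
#
#   def handle_example(message, history, **context):
#       history = add_bot_message(history, "Example state reached.")
#       return package_workflow_outputs(
#           history=history, repo_id=context["repo_id"], state=STATE_COMPLETE,
#           updated_preview=context["preview_html"], updated_run=context["container_logs"],
#           updated_build=context["build_logs"], attempts=context["debug_attempts"],
#           app_desc=context["app_description"], repo_name=context["repo_name"],
#           generated_code=context["generated_code"], use_grounding=context["use_grounding"],
#           current_gemini_key=context["gemini_api_key"], current_gemini_model=context["gemini_model"],
#       )
#
#   STATE_HANDLERS[STATE_EXAMPLE] = handle_example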
# This is the main generator function for the workflow, triggered by the 'Send' button.
# Inputs and outputs lists must match exactly; the generator receives values from the inputs list.
def ai_workflow_chat(
    message: str,
    history: List[Dict[str, str]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    # Pass gemini_api_key and gemini_model as inputs - these come from the State variables
    gemini_api_key_state: str | None,
    gemini_model_state: str | None,
    repo_id_state: str | None,
    workflow_state: WorkflowState,  # Use the Literal type hint
    space_sdk: str,
    # NOTE: UI component values are passed *by value* to the generator
    preview_html: str,  # Value from iframe HTML
    container_logs: str,  # Value from run_txt Textbox
    build_logs: str,  # Value from build_txt Textbox
    debug_attempts_state: int,
    app_description_state: str | None,
    repo_name_state: str | None,
    generated_code_state: str | None,
    use_grounding_state: bool,  # Value from use_grounding_checkbox
    # Accept any extra args/kwargs passed by Gradio, common for generators
    *args,
    **kwargs
) -> Any:  # Use Any because it yields multiple output tuples over its lifetime
    """
    Generator function to handle the AI workflow state machine.
    Each 'yield' pauses execution and sends values to update Gradio outputs/state.
    """
    # Unpack state variables and UI values from Gradio inputs
    repo_id = repo_id_state
    state = workflow_state
    attempts = debug_attempts_state
    app_desc = app_description_state
    repo_name = repo_name_state
    generated_code = generated_code_state
    use_grounding = use_grounding_state
    current_gemini_key = gemini_api_key_state
    current_gemini_model = gemini_model_state
    logging.info(f"ai_workflow_chat generator started. State: {state}, Message: {message[:50]}...")
    # Log all inputs for debugging if needed
    # logging.debug(f"ai_workflow_chat inputs: {locals()}")
    # Add the user's message to the chat history immediately
    user_message_entry = {"role": "user", "content": message}
    if hf_profile and hf_profile.username:
        user_message_entry["name"] = hf_profile.username
    history.append(user_message_entry)
    logging.debug("User message added to history.")
    # Yield immediately to update the chat UI with the user's message.
    # This provides immediate feedback to the user while the AI processes.
    # Ensure all state variables and UI outputs are yielded back in the correct order.
    yield package_workflow_outputs(
        history=history, repo_id=repo_id, state=state,
        updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,
        attempts=attempts, app_desc=app_desc, repo_name=repo_name, generated_code=generated_code,
        use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
    )
    try:
        # --- State Machine Logic using Dispatch Table ---
        handler = STATE_HANDLERS.get(state)
        if handler:
            logging.debug(f"Invoking handler for state: {state}")
            # Call the state handler function, passing all necessary data
            handler_output = handler(
                message=message, history=history,
                hf_profile=hf_profile, hf_token=hf_token,
                gemini_api_key=current_gemini_key, gemini_model=current_gemini_model,  # Pass current values
                repo_id=repo_id, state=state, space_sdk=space_sdk,
                preview_html=preview_html, container_logs=container_logs, build_logs=build_logs,  # Pass current UI values
                debug_attempts=attempts, app_description=app_desc, repo_name=repo_name, generated_code=generated_code,  # Pass current state values
                use_grounding=use_grounding
            )
            # A handler may yield intermediate updates (e.g., "Generating...") and then
            # *return* its final output tuple. `yield from` re-yields the intermediate
            # updates but swallows the generator's return value (StopIteration.value),
            # so capture it and yield it explicitly to deliver the final step outputs.
            if isinstance(handler_output, Generator):
                logging.debug("Handler is a generator, yielding from it.")
                final_output = yield from handler_output
                if final_output is not None:
                    yield final_output
            else:
                # The handler returned the final tuple for this step directly; yield it
                logging.debug("Handler returned final output tuple, yielding it.")
                yield handler_output
        else:
            logging.error(f"No handler found for state: {state}. Resetting.")
            # Fallback for unknown state
            history = add_bot_message(history, f"Internal error: Unknown state `{state}`. Resetting.")
            yield package_workflow_outputs(
                history=history, repo_id=None, state=STATE_IDLE,
                updated_preview="<p>Error: Unknown state.</p>", updated_run="", updated_build="",
                attempts=0, app_desc=None, repo_name=None, generated_code=None,
                use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model
            )
    except Exception as e:
        # This catches any unexpected errors that occur within any state's logic.
        # Specific errors from helper functions (like RuntimeError) should ideally be
        # caught in handlers, but this is a safety net.
        error_message = f"Workflow step failed unexpectedly ({state}): {e}. Click 'Send' to re-attempt this step or 'reset'."
        history = add_bot_message(history, error_message)
        logging.exception(f"Critical error caught in ai_workflow_chat generator for state {state}")  # Log with traceback
        # On unexpected error, reset to IDLE, but pass through the current Gemini state
        yield package_workflow_outputs(
            history=history, repo_id=None, state=STATE_IDLE,
            updated_preview=preview_html, updated_run=container_logs, updated_build=build_logs,  # Keep existing UI logs
            attempts=0, app_desc=None, repo_name=None, generated_code=None,  # Reset project-specific states
            use_grounding=use_grounding, current_gemini_key=current_gemini_key, current_gemini_model=current_gemini_model  # Pass through Gemini states
        )
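# The dispatch above relies on a subtle generator property: `return value`
# inside a generator becomes StopIteration.value, and `yield from` evaluates
# to that value without yielding it. A minimal self-contained sketch of the
# pattern used in ai_workflow_chat (names here are illustrative):
#
#   def step():
#       yield "progress update"      # intermediate UI update
#       return "final outputs"       # captured by `yield from`, not yielded
#
#   def dispatcher():
#       final = yield from step()    # re-yields "progress update"
#       if final is not None:
#           yield final              # forward the returned final outputs
#
#   list(dispatcher())  # -> ["progress update", "final outputs"]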
# --- Build the Gradio UI ---
with gr.Blocks(title="AI-Powered HF Space App Builder") as ai_builder_tab:
    # Gradio State variables - these persist their values across user interactions (clicks)
    hf_profile = gr.State(None)
    hf_token = gr.State(None)
    gemini_api_key_state = gr.State("")  # Start with no key
    gemini_model_state = gr.State(DEFAULT_GEMINI_MODEL)  # Default selected model
    repo_id = gr.State(None)  # Stores the ID of the created Space
    workflow = gr.State(STATE_IDLE)  # Current workflow state; mirrored into status_text via workflow.change below (gr.State accepts no 'live' argument)
    sdk_state = gr.State("gradio")  # Stores the selected Space SDK (Gradio or Streamlit)
    debug_attempts = gr.State(0)  # Counter for how many debugging attempts have been made
    app_description = gr.State(None)  # Stores the user's initial description of the desired app
    repo_name_state = gr.State(None)  # Stores the chosen repository name for the Space
    generated_code_state = gr.State(None)  # Temporary storage for generated file content (app.py, requirements, README)
    use_grounding_state = gr.State(False)
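    # A minimal sketch of the gr.State round-trip pattern used throughout this
    # UI (illustrative only; `counter` and `increment_btn` are hypothetical and
    # not part of this app): an event reads a State's current *value* via
    # `inputs` and writes the new value back via `outputs`.
    #
    #   counter = gr.State(0)
    #   increment_btn = gr.Button("Increment")
    #   increment_btn.click(lambda n: n + 1, inputs=[counter], outputs=[counter])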
    with gr.Row():
        with gr.Column(scale=1, min_width=300):
            gr.Markdown("## Hugging Face Login")
            login_status = gr.Markdown("*Not logged in.*")
            login_btn = gr.LoginButton(variant="huggingface")
            gr.Markdown("## Google AI Studio / Gemini")
            gemini_input = gr.Textbox(
                label="Your Google AI Studio API Key",
                type="password",
                interactive=True,
                value="",
                info="Enter your own key here"
            )
            gemini_status = gr.Markdown("")
            model_selector = gr.Radio(
                choices=GEMINI_MODEL_CHOICES,
                value=DEFAULT_GEMINI_MODEL,
                label="Select model",
                interactive=True
            )
            model_description_text = gr.Markdown(get_model_description(DEFAULT_GEMINI_MODEL))
            use_grounding_checkbox = gr.Checkbox(
                label="Enable Grounding with Google Search",
                value=False,
                interactive=True,
                info="Use Google Search results to inform Gemini's response (may improve factuality)."
            )
            gr.Markdown("## Space SDK")
            sdk_selector = gr.Radio(choices=["gradio", "streamlit"], value="gradio", label="Template SDK", interactive=True)
            gr.Markdown("## Workflow Status")
            status_text = gr.Textbox(label="Current State", value=STATE_IDLE, interactive=False)
            repo_id_text = gr.Textbox(label="Current Space ID", value="None", interactive=False)
        with gr.Column(scale=3):
            chatbot = gr.Chatbot(type='messages', label="AI Workflow Chat")
            user_input = gr.Textbox(placeholder="Type your message…", interactive=True)
            send_btn = gr.Button("Send", interactive=False)
            iframe = gr.HTML("<p>No Space created yet.</p>")
            build_txt = gr.Textbox(label="Build Logs", lines=10, interactive=False, value="", max_lines=20)
            run_txt = gr.Textbox(label="Container Logs", lines=10, interactive=False, value="", max_lines=20)
    # --- Define Event Handlers and Chains ---
    # List of prerequisite State components for the send button logic
    prerequisite_states_for_button = [
        hf_profile, hf_token, gemini_api_key_state, gemini_model_state, workflow
    ]
    # Wire each prerequisite's .change event to the same readiness check.
    # Gradio passes the *values* of the components listed in `inputs` as
    # positional arguments. (The previous closure-based lambda forwarded the
    # State component objects themselves rather than their values, so the
    # check never saw real data.)
    for state_comp in prerequisite_states_for_button:
        state_comp.change(
            lambda profile_val, token_val, key_val, model_val, workflow_val: check_send_button_ready(
                profile_val, token_val, key_val, model_val, workflow_val
            ),
            inputs=prerequisite_states_for_button,  # Pass all required state values
            outputs=[send_btn],  # Update only the send button
        )
    # Handle login button click: mirror the OAuth profile/token into State so the
    # .change handlers on hf_profile and hf_token trigger check_send_button_ready.
    # Gradio injects the OAuth profile and token into any event handler that
    # declares parameters with these special type hints; they must NOT be listed
    # in `inputs`.
    def _on_login(profile: gr.OAuthProfile | None, oauth_token: gr.OAuthToken | None):
        return profile, oauth_token

    login_btn.click(
        _on_login,
        inputs=None,
        outputs=[hf_profile, hf_token]
    )  # The .change handlers on hf_profile and hf_token will trigger check_send_button_ready
    # Handle Gemini Key Input change: update key state -> configure Gemini status
    gemini_input.change(
        # Lambda receives the new value of gemini_input (1 arg) because inputs=[gemini_input]
        lambda new_key_value: new_key_value,
        inputs=[gemini_input],
        outputs=[gemini_api_key_state]  # This output updates the state
    ).then(
        # Configure Gemini using the updated state variables.
        # Note: .then() does NOT forward the previous step's output; the function
        # receives only the values of the components listed in `inputs`.
        lambda api_key_val, model_name_val: configure_gemini(api_key_val, model_name_val),
        inputs=[gemini_api_key_state, gemini_model_state],  # Explicitly pass the required states
        outputs=[gemini_status]  # Update Gemini status display
    )  # The gemini_api_key_state.change handler (wired in the loop above) handles button updates.
    # Handle Gemini Model Selector change: update model state -> update description -> configure Gemini status
    model_selector.change(
        # Lambda receives the new value of model_selector (1 arg) because inputs=[model_selector]
        lambda new_model_name: new_model_name,
        inputs=[model_selector],
        outputs=[gemini_model_state]  # This output updates the state
    ).then(
        # Update the model description display. As above, .then() passes only the
        # values listed in `inputs`, so the function takes exactly one argument.
        lambda model_name_val: get_model_description(model_name_val),
        inputs=[gemini_model_state],  # Get the new state value
        outputs=[model_description_text]  # Update description UI
    ).then(
        # Re-configure Gemini with the newly selected model
        lambda api_key_val, model_name_val: configure_gemini(api_key_val, model_name_val),
        inputs=[gemini_api_key_state, gemini_model_state],  # Explicitly pass the required states
        outputs=[gemini_status]  # Update Gemini status display
    )  # The gemini_model_state.change handler (wired in the loop above) handles button updates.
    # Handle Grounding checkbox change: update grounding state
    use_grounding_checkbox.change(
        lambda v: v, inputs=[use_grounding_checkbox], outputs=[use_grounding_state]
    )
    # Handle SDK selector change: update sdk state
    sdk_selector.change(
        lambda s: s, inputs=[sdk_selector], outputs=[sdk_state]
    )
    # Mirror the Workflow State variable into the UI status display
    workflow.change(
        lambda new_state_value: new_state_value,
        inputs=[workflow],
        outputs=[status_text]
    )
    # Mirror the Repo ID State variable into the UI status display
    repo_id.change(
        lambda new_repo_id_value: new_repo_id_value if new_repo_id_value else "None",
        inputs=[repo_id],
        outputs=[repo_id_text]
    )
    # The main event handler for the Send button (generator).
    # This .click() event triggers the ai_workflow_chat generator function.
    # Inputs are read from UI components AND State variables; outputs are updated
    # by the values yielded from the generator. The inputs and outputs lists must
    # match the ai_workflow_chat signature and yielded tuple EXACTLY.
    send_btn_inputs = [
        user_input, chatbot,  # UI component inputs (message, current chat history)
        hf_profile, hf_token,  # HF State variables
        gemini_api_key_state, gemini_model_state,  # Gemini State variables
        repo_id, workflow, sdk_state,  # Workflow State variables
        iframe, run_txt, build_txt,  # UI component inputs (current values)
        debug_attempts, app_description, repo_name_state, generated_code_state,  # Other State variables
        use_grounding_state  # Grounding state input
    ]
    send_btn_outputs = [
        chatbot,  # Updates Chatbot
        repo_id, workflow,  # Updates State variables (repo_id, workflow)
        iframe, run_txt, build_txt,  # Updates UI components (iframe, logs)
        debug_attempts, app_description, repo_name_state, generated_code_state,  # Updates other State variables
        use_grounding_state,  # Updates Grounding state
        gemini_api_key_state, gemini_model_state  # Gemini State variables, passed through the generator
    ]
    send_btn.click(
        ai_workflow_chat,
        inputs=send_btn_inputs,
        outputs=send_btn_outputs
    ).success(  # Chain a .success() event to run *after* the .click() handler completes without error
        # Clear the user input textbox after the message is sent and processed.
        # Returning a plain value sets the Textbox (gr.Textbox.update was removed
        # in Gradio 4; gr.update(value="") also works).
        lambda: "",
        inputs=None,
        outputs=[user_input]
    )
    # --- Initial Load Event Chain ---
    # This chain runs once when the app loads. Note that .then() steps receive
    # only the values of the components listed in their own `inputs`; outputs
    # from earlier steps are NOT forwarded automatically.
    ai_builder_tab.load(
        # Action 1: Show profile (loads cached login if available).
        # The untyped lambda avoids OAuth parameter injection and simply forwards
        # the initial profile state value.
        lambda initial_profile: show_profile(initial_profile),
        inputs=[hf_profile],  # Pass the initial profile state value
        outputs=[login_status]  # Updates UI
    ).then(
        # Action 2: Configure Gemini using the initial state
        lambda api_key_val, model_name_val: configure_gemini(api_key_val, model_name_val),
        inputs=[gemini_api_key_state, gemini_model_state],  # Explicitly pass the required states
        outputs=[gemini_status]  # Update Gemini status display
    ).then(
        # Action 3: Update the send button state based on the initial states
        lambda p1, p2, p3, p4, p5: check_send_button_ready(p1, p2, p3, p4, p5),  # Match check_send_button_ready signature
        inputs=prerequisite_states_for_button,  # Pass all 5 prerequisite states
        outputs=[send_btn],  # Update the send button
    ).then(
        # Action 4: Update the model description text based on the default selected model
        lambda model_name_val: get_model_description(model_name_val),
        inputs=[gemini_model_state],  # Get the default model name from state
        outputs=[model_description_text]  # Update description UI
    ).then(
        # Action 5: Add the initial welcome message to the chat history
        lambda: greet(),
        inputs=None,  # greet takes no explicit inputs
        outputs=[chatbot]  # Updates the chatbot display
    )
# The main workflow function and other helper functions are correctly defined OUTSIDE the gr.Blocks context
# because they operate on the *values* passed to them by Gradio event triggers, not the UI component objects themselves.
if __name__ == "__main__":
    # Optional: Configure Gradio settings using environment variables
    os.environ["GRADIO_MAX_FILE_SIZE"] = "100MB"
    os.environ["GRADIO_TEMP_DIR"] = "./tmp"
    os.makedirs(os.environ["GRADIO_TEMP_DIR"], exist_ok=True)
    logging.info("Starting Gradio app...")
    # Launch the Gradio UI
    ai_builder_tab.launch()
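# Note (assumption about the runtime version): on Gradio 3.x, generator-based
# event handlers such as ai_workflow_chat require the queue to be enabled
# before launching, e.g.:
#
#   ai_builder_tab.queue().launch()
#
# On Gradio 4.x the queue is enabled by default, so launch() alone suffices.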