import os
import re
import time
import json
import io
import requests
import gradio as gr
import google.generativeai as genai
from google.generativeai import types  # Import types for configuration and tools
from huggingface_hub import create_repo, list_models, upload_file, constants
from huggingface_hub.utils import build_hf_headers, get_session, hf_raise_for_status

# Removed the debugging print that attempts to read GOOGLE_API_KEY from the environment
# --- Define Gemini Model Information ---

# Dictionary mapping internal model name to (Display Name, Description)
GEMINI_MODELS = {
    "gemini-1.5-flash": ("Gemini 1.5 Flash", "Fast and versatile performance across a diverse variety of tasks."),
    "gemini-1.5-pro": ("Gemini 1.5 Pro", "Complex reasoning tasks requiring more intelligence."),
    "gemini-1.5-flash-8b": ("Gemini 1.5 Flash 8B", "High volume and lower intelligence tasks."),
    "gemini-2.0-flash": ("Gemini 2.0 Flash", "Next generation features, speed, thinking, realtime streaming, and multimodal generation."),
    "gemini-2.0-flash-lite": ("Gemini 2.0 Flash-Lite", "Cost efficiency and low latency."),
    # Note: Preview models may have shorter lifespans or different capabilities. Uncomment to include them.
    # "gemini-2.5-flash-preview-04-17": ("Gemini 2.5 Flash Preview (04-17)", "Adaptive thinking, cost efficiency."),
    # "gemini-2.5-pro-preview-03-25": ("Gemini 2.5 Pro Preview (03-25)", "Enhanced thinking and reasoning, multimodal understanding, advanced coding, and more."),
}

# Create the list of choices for the Gradio Radio component.
# Each choice is a (Display Name, Internal Name) tuple.
GEMINI_MODEL_CHOICES = [
    (display_name, internal_name)
    for internal_name, (display_name, description) in GEMINI_MODELS.items()
]

# Define the default model to be selected
DEFAULT_GEMINI_MODEL = "gemini-1.5-flash"  # Ensure this key exists in GEMINI_MODELS
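
# A quick illustration of the resulting shape (values taken from GEMINI_MODELS above):
#   GEMINI_MODEL_CHOICES[0] == ("Gemini 1.5 Flash", "gemini-1.5-flash")
#   DEFAULT_GEMINI_MODEL in GEMINI_MODELS  # True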

# --- Helper functions for Hugging Face integration ---

def show_profile(profile: gr.OAuthProfile | None) -> str:
    """Displays the logged-in Hugging Face profile username."""
    if profile is None:
        return "*Not logged in.*"
    return f"✅ Logged in as **{profile.username}**"


def list_private_models(
    profile: gr.OAuthProfile | None,
    oauth_token: gr.OAuthToken | None
) -> str:
    """Lists the user's models, tagged private/public (not used in the main workflow, but kept)."""
    if profile is None or oauth_token is None:
        return "Please log in to see your models."
    try:
        models = [
            f"{m.id} ({'private' if m.private else 'public'})"
            for m in list_models(author=profile.username, token=oauth_token.token)
        ]
        return "No models found." if not models else "Models:\n\n - " + "\n - ".join(models)
    except Exception as e:
        # Catching a generic exception is acceptable for helper functions
        return f"Error listing models: {e}"

def create_space_action(repo_name: str, sdk: str, profile: gr.OAuthProfile, token: gr.OAuthToken):
    """Creates a new Hugging Face Space repository."""
    if not profile or not token:
        raise ValueError("Hugging Face profile or token is missing.")
    repo_id = f"{profile.username}/{repo_name}"
    try:
        create_repo(
            repo_id=repo_id,
            token=token.token,
            exist_ok=True,  # Allow creating if it already exists
            repo_type="space",
            space_sdk=sdk
        )
        url = f"https://huggingface.co/spaces/{repo_id}"
        iframe = f'<iframe src="{url}" width="100%" height="500px"></iframe>'
        return repo_id, iframe
    except Exception as e:
        raise RuntimeError(f"Failed to create Space `{repo_id}`: {e}")

def upload_file_to_space_action(
    file_obj: io.StringIO,  # Type hint for clarity
    path_in_repo: str,
    repo_id: str,
    profile: gr.OAuthProfile,
    token: gr.OAuthToken
) -> None:
    """Uploads a file to a Hugging Face Space repository."""
    if not (profile and token and repo_id):
        raise ValueError("Hugging Face profile, token, or repo_id is missing.")
    try:
        upload_file(
            path_or_fileobj=file_obj,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            token=token.token,
            repo_type="space"
        )
    except Exception as e:
        raise RuntimeError(f"Failed to upload `{path_in_repo}` to `{repo_id}`: {e}")

def _fetch_space_logs_level(repo_id: str, level: str, token: str) -> str:
    """Fetches build or run logs for a Space."""
    if not repo_id or not token:
        return f"Cannot fetch {level} logs: repo_id or token missing."
    jwt_url = f"{constants.ENDPOINT}/api/spaces/{repo_id}/jwt"
    try:
        r = get_session().get(jwt_url, headers=build_hf_headers(token=token))
        hf_raise_for_status(r)  # Raise HTTPError for bad responses (4xx or 5xx)
        jwt = r.json()["token"]
        logs_url = f"https://api.hf.space/v1/{repo_id}/logs/{level}"
        lines, count = [], 0
        # Using stream=True is good for potentially large logs
        with get_session().get(logs_url, headers=build_hf_headers(token=jwt), stream=True, timeout=30) as resp:
            hf_raise_for_status(resp)
            for raw in resp.iter_lines():
                if count >= 200:  # Limit output lines to prevent UI overload
                    lines.append("... truncated ...")
                    break
                if not raw.startswith(b"data: "):  # EventStream protocol expected from the HF logs API
                    continue
                payload = raw[len(b"data: "):]
                try:
                    event = json.loads(payload.decode())
                    ts = event.get("timestamp", "")
                    txt = event.get("data", "").strip()
                    if txt:
                        lines.append(f"[{ts}] {txt}")
                        count += 1
                except json.JSONDecodeError:
                    # Skip lines that aren't valid JSON events
                    continue
        return "\n".join(lines) if lines else f"No {level} logs found."
    except Exception as e:
        # Catching a generic exception is acceptable for helper functions
        return f"Error fetching {level} logs for `{repo_id}`: {e}"

def get_build_logs_action(repo_id, profile, token):
    """Action to fetch build logs with a small delay."""
    if not (repo_id and profile and token):
        return "⚠️ Cannot fetch build logs: log in and create a Space first."
    # Small delay to allow the build process to start on the HF side
    time.sleep(5)
    return _fetch_space_logs_level(repo_id, "build", token.token)


def get_container_logs_action(repo_id, profile, token):
    """Action to fetch container logs with a delay."""
    if not (repo_id and profile and token):
        return "⚠️ Cannot fetch container logs: log in and create a Space first."
    # Longer delay to allow the container to start after the build completes
    time.sleep(10)
    return _fetch_space_logs_level(repo_id, "run", token.token)

# --- Google Gemini integration with model selection and grounding ---

def configure_gemini(api_key: str | None, model_name: str | None) -> str:
    """Configures the Gemini API and checks that the model is accessible."""
    # Check for the empty string "" as well as None
    if not api_key:
        return "⚠️ Gemini API key is not set."
    # Check whether model_name is None or not a valid key in GEMINI_MODELS
    if not model_name or model_name not in GEMINI_MODELS:
        return "⚠️ Please select a valid Gemini model."
    try:
        genai.configure(api_key=api_key)
        # Attempt a simple call to verify credentials and model availability.
        # This raises an exception if the key is invalid or the model is not found.
        genai.GenerativeModel(model_name).generate_content("ping", stream=False)
        # This message indicates the API call *for the configuration check* succeeded
        return f"✅ Gemini configured successfully with **{GEMINI_MODELS[model_name][0]}**."
    except Exception as e:
        # This message indicates the API call *for the configuration check* failed
        return f"❌ Error configuring Gemini: {e}"


def get_model_description(model_name: str | None) -> str:
    """Retrieves the description for a given model name."""
    if model_name is None or model_name not in GEMINI_MODELS:
        return "Select a model to see its description."
    # Use .get with a default value in case the key is not found
    return GEMINI_MODELS.get(model_name, (model_name, "No description available."))[1]
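
# e.g., get_model_description("gemini-1.5-pro")
#   -> "Complex reasoning tasks requiring more intelligence."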

def call_gemini(prompt: str, api_key: str, model_name: str, use_grounding: bool = False) -> str:
    """Calls the Gemini API with a given prompt, optionally using grounding."""
    # This check is crucial: it raises an error *before* the API call if prerequisites aren't met.
    # Check for the empty string "" as well as None.
    if not isinstance(api_key, str) or api_key == "" or not model_name:
        # This error indicates a failure in the workflow logic or state propagation,
        # because this function should only be called when prerequisites are met.
        raise ValueError(f"Gemini API call prerequisites not met: api_key={api_key}, model_name={model_name}")
    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel(model_name)
        # Define tools for grounding if requested.
        # Using genai.types.GoogleSearch() is recommended for Gemini 2.0+
        # and is backwards compatible with 1.5 for retrieval.
        tools_config = [types.Tool(google_search=types.GoogleSearch())] if use_grounding else None
        # Using generate_content with stream=False for simplicity here
        response = model.generate_content(
            prompt,
            stream=False,
            tools=tools_config  # Pass the tools configuration
        )
        # Check whether the response was blocked
        if response.prompt_feedback and response.prompt_feedback.block_reason:
            raise RuntimeError(f"Gemini API call blocked: {response.prompt_feedback.block_reason}")
        if not response.candidates:
            # Check safety ratings if no candidates are returned but the call wasn't blocked
            if response.prompt_feedback and response.prompt_feedback.safety_ratings:
                ratings = "; ".join(f"{r.category}: {r.probability}" for r in response.prompt_feedback.safety_ratings)
                raise RuntimeError(f"Gemini API call returned no candidates. Safety ratings: {ratings}")
            else:
                raise RuntimeError("Gemini API call returned no candidates.")
        # If response.candidates is non-empty, get the text.
        # response.text is a convenient way to read the first candidate's text parts.
        return response.text or ""  # Return an empty string if there is no text
    except Exception as e:
        # Re-raise as RuntimeError for the workflow to catch and manage
        raise RuntimeError(f"Gemini API call failed: {e}")

# --- AI workflow logic (state machine) ---

# Define the states for the workflow
STATE_IDLE = "idle"
STATE_AWAITING_REPO_NAME = "awaiting_repo_name"
STATE_CREATING_SPACE = "creating_space"
STATE_GENERATING_CODE = "generating_code"
STATE_UPLOADING_APP_PY = "uploading_app_py"
STATE_GENERATING_REQUIREMENTS = "generating_requirements"
STATE_UPLOADING_REQUIREMENTS = "uploading_requirements"
STATE_GENERATING_README = "generating_readme"
STATE_UPLOADING_README = "uploading_readme"
STATE_CHECKING_LOGS_BUILD = "checking_logs_build"
STATE_CHECKING_LOGS_RUN = "checking_logs_run"
STATE_DEBUGGING_CODE = "debugging_code"
STATE_UPLOADING_FIXED_APP_PY = "uploading_fixed_app_py"
STATE_COMPLETE = "complete"

MAX_DEBUG_ATTEMPTS = 3  # Limit the number of automatic debug attempts


def add_bot_message(history: list[dict], bot_message: str) -> list[dict]:
    """Helper to add a new assistant message to the chatbot history."""
    history.append({"role": "assistant", "content": bot_message})
    return history
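
# e.g., add_bot_message([], "Hello")
#   -> [{"role": "assistant", "content": "Hello"}]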

# Initial welcome message for the chatbot (defined outside Blocks so the load chain can call it)
def greet():
    # The welcome message reflects the change in API key handling
    return [{"role": "assistant", "content": "Welcome! Please log in to Hugging Face and provide your Google AI Studio API key to start building Spaces. Once ready, type 'generate me a gradio app called myapp' or 'create' to begin."}]


# Helper function to update Send button interactivity based on prerequisites.
# This function has the clean signature it expects; wrappers handle Gradio's argument passing.
def check_send_button_ready(hf_profile: gr.OAuthProfile | None, hf_token: gr.OAuthToken | None, gemini_key: str | None, gemini_model: str | None) -> gr.update:
    """Checks that HF login and Gemini configuration are complete and returns an update for button interactivity."""
    # --- START ENHANCED DEBUGGING LOGS ---
    print("\n--- check_send_button_ready START ---")
    print(f"  Received hf_profile: {hf_profile is not None}")
    print(f"  Received hf_token: {hf_token is not None}")
    # For the API key, print part of the key (if present) for verification
    api_key_display = gemini_key[:5] if isinstance(gemini_key, str) and gemini_key else ('Empty String' if isinstance(gemini_key, str) and gemini_key == "" else 'None')
    print(f"  Received gemini_key: Value starts with '{api_key_display}'")
    print(f"  Received gemini_model: {gemini_model}")
    # --- END ENHANCED DEBUGGING LOGS ---
    is_logged_in = hf_profile is not None and hf_token is not None
    # Use bool() for simplicity; it handles both None and "" correctly
    is_gemini_ready = bool(gemini_key) and bool(gemini_model)
    is_ready = is_logged_in and is_gemini_ready
    print(f"check_send_button_ready - HF Ready: {is_logged_in}, Gemini Ready: {is_gemini_ready}, Button Ready: {is_ready}")
    print("--- check_send_button_ready END ---\n")
    return gr.update(interactive=is_ready)

# --- Wrappers to handle Gradio's argument passing in event chains ---
# These wrappers accept whatever Gradio passes (*args, **kwargs) and call the target function
# with the specific arguments it expects, extracted from *args based on the expected call signature.

# Wrapper for functions called in .then() chains with a specific inputs list: expects (prev_output, *input_values).
# e.g., .then(wrapper, inputs=[s1, s2]) -> wrapper receives (prev_out, s1_val, s2_val)
def wrapper_from_then_inputs(func, num_inputs):
    def wrapped(*args, **kwargs):
        # We expect num_inputs values at the end of *args, after prev_output (index 0)
        if len(args) > num_inputs:
            required_args = args[-num_inputs:]
            try:
                return func(*required_args)
            except Exception as e:
                print(f"Error calling wrapped function {func.__name__} with args {required_args}: {e}")
                # Provide a fallback or re-raise depending on context
                if func == configure_gemini: return f"❌ Error configuring Gemini: {e}"
                if func == get_model_description: return f"Error getting description: {e}"
                if func == check_send_button_ready: return gr.update(interactive=False)
                raise  # Re-raise if no specific fallback
        else:
            print(f"Warning: wrapper_from_then_inputs for {func.__name__} received unexpected args (expecting at least {num_inputs + 1}): {args}")
            # Provide a fallback or re-raise
            if func == configure_gemini: return "❌ Error configuring Gemini: unexpected arguments received."
            if func == get_model_description: return "No description available (unexpected arguments received)."
            if func == check_send_button_ready: return gr.update(interactive=False)
            raise ValueError(f"Unexpected args received for {func.__name__}: {args}")
    return wrapped


# Wrapper for functions called by a .change() trigger with a specific inputs list: expects (changed_value, *input_values).
# e.g., component.change(wrapper, inputs=[s1, s2]) -> wrapper receives (changed_val, s1_val, s2_val)
def wrapper_from_change_inputs(func, num_inputs):
    def wrapped(*args, **kwargs):
        # We expect num_inputs values at the end of *args, after the changed_value (index 0)
        if len(args) > num_inputs:
            required_args = args[-num_inputs:]
            try:
                return func(*required_args)
            except Exception as e:
                print(f"Error calling wrapped function {func.__name__} with args {required_args}: {e}")
                if func == check_send_button_ready: return gr.update(interactive=False)
                raise  # Re-raise if no specific fallback
        else:
            print(f"Warning: wrapper_from_change_inputs for {func.__name__} received unexpected args (expecting at least {num_inputs + 1}): {args}")
            if func == check_send_button_ready: return gr.update(interactive=False)
            raise ValueError(f"Unexpected args received for {func.__name__}: {args}")
    return wrapped


# Wrapper for functions called in .then() chains with inputs=None: expects (prev_output,).
# e.g., .then(wrapper, inputs=None) -> wrapper receives (prev_out,)
def wrapper_from_prev_output(func):
    def wrapped(*args, **kwargs):
        # We may receive only prev_output, or nothing if the chain starts here; accept anything.
        try:
            # The target function expects 0 args, so call it with none
            return func()
        except Exception as e:
            print(f"Error calling wrapped function {func.__name__} with args {args}: {e}")
            # Provide a fallback or re-raise
            if func == greet: return [{"role": "assistant", "content": f"❌ Error loading initial message: {e}"}]
            raise  # Re-raise if no specific fallback
    return wrapped


# Instantiate specific wrappers using the generic ones
wrapper_check_button_change = wrapper_from_change_inputs(check_send_button_ready, 4)     # Expects (changed, s1, s2, s3, s4)
wrapper_check_button_then = wrapper_from_then_inputs(check_send_button_ready, 4)         # Expects (prev_out, s1, s2, s3, s4)
wrapper_configure_gemini_then = wrapper_from_then_inputs(configure_gemini, 2)            # Expects (prev_out, s1, s2) -> api_key, model_name
wrapper_get_model_description_then = wrapper_from_then_inputs(get_model_description, 1)  # Expects (prev_out, s1) -> model_name
wrapper_greet_then = wrapper_from_prev_output(greet)                                     # Expects (prev_out,), needs 0 args
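
# Illustration of the calling convention (assuming Gradio prepends the previous step's
# output to the listed input values, as described above):
#   wrapped = wrapper_from_then_inputs(configure_gemini, 2)
#   wrapped("prev_step_output", "AIza...", "gemini-1.5-flash")
#   # -> calls configure_gemini("AIza...", "gemini-1.5-flash")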

# This is the main generator function for the workflow, triggered by the 'Send' button.
# The inputs and outputs lists must match exactly; the generator receives values from the inputs list.
def ai_workflow_chat(
    message: str,
    history: list[dict],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    # gemini_api_key and gemini_model come from the State variables
    gemini_api_key_state: str | None,
    gemini_model_state: str | None,
    repo_id_state: str | None,
    workflow_state: str,
    space_sdk: str,
    # NOTE: UI component values are passed *by value* to the generator
    preview_html: str,     # Value from the iframe HTML
    container_logs: str,   # Value from the run_txt Textbox
    build_logs: str,       # Value from the build_txt Textbox
    debug_attempts_state: int,
    app_description_state: str | None,
    repo_name_state: str | None,
    generated_code_state: str | None,
    use_grounding_state: bool,  # Value from use_grounding_checkbox
    # Absorb potential extra args passed by Gradio event listeners (important for generators)
    *args,    # Generators might receive extra args; accept them, but don't yield them unless they are state
    **kwargs  # Generators might receive extra kwargs
) -> tuple[
    list[dict],  # 0: Updated chat history (for chatbot)
    str | None,  # 1: Updated repo_id (for repo_id state)
    str,         # 2: Updated workflow state (for workflow state)
    str,         # 3: Updated iframe HTML (for iframe UI component)
    str,         # 4: Updated container logs (for run_txt UI component)
    str,         # 5: Updated build logs (for build_txt UI component)
    int,         # 6: Updated debug attempts count (for debug_attempts state)
    str | None,  # 7: Updated app description (for app_description state)
    str | None,  # 8: Updated repo name (for repo_name_state state)
    str | None,  # 9: Updated generated code (for generated_code_state state)
    bool,        # 10: Updated use_grounding_state (for use_grounding_state state)
    str | None,  # 11: Explicitly yield gemini_api_key_state
    str | None,  # 12: Explicitly yield gemini_model_state
]:
""" | |
Generator function to handle the AI workflow state machine. | |
Each 'yield' pauses execution and sends values to update Gradio outputs/state. | |
""" | |
# Unpack state variables from Gradio State components passed as inputs | |
repo_id = repo_id_state | |
state = workflow_state | |
attempts = debug_attempts_state | |
app_desc = app_description_state | |
repo_name = repo_name_state | |
generated_code = generated_code_state | |
use_grounding = use_grounding_state # Unpack grounding state | |
# Use the input parameters for Gemini key/model directly in the generator | |
current_gemini_key = gemini_api_key_state | |
current_gemini_model = gemini_model_state | |
# --- START DEBUGGING ai_workflow_chat inputs --- | |
print("\n--- ai_workflow_chat START (Inputs received) ---") | |
print(f" message: {message}") | |
print(f" history len: {len(history)}") | |
print(f" hf_profile: {hf_profile is not None}") | |
print(f" hf_token: {hf_token is not None}") | |
api_key_display = current_gemini_key[:5] if isinstance(current_gemini_key, str) and current_gemini_key else ('Empty String' if isinstance(current_gemini_key, str) and current_gemini_key == "" else 'None') | |
print(f" gemini_api_key_state: Value starts with '{api_key_display}'") | |
print(f" gemini_model_state: {current_gemini_model}") | |
print(f" repo_id_state: {repo_id_state}") # Check value here | |
print(f" workflow_state: {workflow_state}") | |
print(f" space_sdk: {space_sdk}") | |
print(f" use_grounding_state: {use_grounding_state}") | |
print(f" debug_attempts_state: {debug_attempts_state}") | |
print(f" app_description_state: {app_description_state}") | |
print(f" repo_name_state: {repo_name_state}") | |
print(f" generated_code_state: {'Present' if generated_code_state is not None else 'None'}") | |
print(f" *args (unexpected by generator): {args}") # Added debug for unexpected args | |
print(f" **kwargs (unexpected by generator): {kwargs}") # Added debug for unexpected kwargs | |
print("--- END DEBUGGING ai_workflow_chat inputs ---\n") | |
# Keep copies of potentially updated UI elements passed as inputs to update them later | |
# These are the *current values* of the UI components as of the button click | |
updated_preview = preview_html | |
updated_build = build_logs | |
updated_run = container_logs | |
# Add the user's message to the chat history immediately | |
user_message_entry = {"role": "user", "content": message} | |
# Add username if logged in (optional, but nice) | |
if hf_profile and hf_profile.username: | |
user_message_entry["name"] = hf_profile.username | |
history.append(user_message_entry) | |
# Yield immediately to update the chat UI with the user's message | |
# This provides immediate feedback to the user while the AI processes | |
# Ensure all state variables and UI outputs are yielded back in the correct order | |
# Include gemini_api_key_state and gemini_model_state in the yield tuple | |
# The yielded tuple must match the send_btn.click outputs list exactly. | |
yield (history, repo_id, state, updated_preview, updated_run, updated_build, | |
attempts, app_desc, repo_name, generated_code, use_grounding, | |
current_gemini_key, current_gemini_model) # Explicitly pass back current state values | |
    try:
        # --- State machine logic based on the current 'state' variable ---
        # Although button interactivity prevents reaching here without a key/model,
        # these checks remain as a safeguard for the workflow logic itself.
        # Use the local variables derived from state inputs (current_gemini_key, current_gemini_model).
        if not (hf_profile and hf_token):
            history = add_bot_message(history, "Workflow paused: Please log in to Hugging Face first.")
            # Re-yield to update the chat and keep the current state values
            yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                   attempts, app_desc, repo_name, generated_code, use_grounding,
                   current_gemini_key, current_gemini_model)
            return  # Stop workflow execution for this click

        if not (isinstance(current_gemini_key, str) and current_gemini_key != "" and current_gemini_model):
            history = add_bot_message(history, "Workflow cannot start: Please ensure your Gemini API key is entered and a model is selected.")
            # Re-yield to update the chat and keep the current state values
            yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                   attempts, app_desc, repo_name, generated_code, use_grounding,
                   current_gemini_key, current_gemini_model)  # Yield back the received (likely empty/None) Gemini states
            return  # Stop workflow execution for this click

        if state == STATE_IDLE:
            # Look for specific commands in the user's message
            reset_match = "reset" in message.lower()
            # Capture the app description AND repo name via regex
            generate_match = re.search(r'generate (?:me )?(?:a|an) (.+) app called (\w+)', message, re.I)
            # Capture the repo name for a simple 'create space' command
            create_match = re.search(r'create (?:a|an)? space called (\w+)', message, re.I)
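            # Illustrative matches for the patterns above:
            #   "generate me a gradio app called myapp" -> groups ("gradio", "myapp")
            #   "create a space called demo"            -> group  ("demo",)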

            if reset_match:
                history = add_bot_message(history, "Workflow reset.")
                # Reset the relevant states and UI outputs, passing through the current Gemini state
                yield (history, None, STATE_IDLE, "<p>No Space created yet.</p>", "", "", 0,
                       None, None, None, False, current_gemini_key, current_gemini_model)

            elif generate_match:
                # User requested generation with a description and a name
                new_app_desc = generate_match.group(1).strip()   # Capture the description part
                new_repo_name = generate_match.group(2).strip()  # Capture the name part
                history = add_bot_message(history, f"Acknowledged: '{message}'. Starting workflow to create Space `{hf_profile.username}/{new_repo_name}` for a '{new_app_desc}' app.")
                # Update state variables for the next step (creation)
                state = STATE_CREATING_SPACE
                repo_name = new_repo_name
                app_desc = new_app_desc
                # Yield the updated state variables, passing others through
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)

            elif create_match:
                # User requested simple Space creation with a name
                new_repo_name = create_match.group(1).strip()
                history = add_bot_message(history, f"Acknowledged: '{message}'. Starting workflow to create Space `{hf_profile.username}/{new_repo_name}`.")
                state = STATE_CREATING_SPACE  # Transition to the creation state
                repo_name = new_repo_name     # Store the validated repo name
                # Yield the updated state variables, passing others through
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)

            elif "create" in message.lower() and not repo_id:
                # User wants to create but didn't specify a name yet
                history = add_bot_message(history, "Okay, what should the Space be called? (e.g., `my-awesome-app`)")
                state = STATE_AWAITING_REPO_NAME  # Transition to the state where we wait for the name
                # Yield the updated state, passing others through
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)

            else:
                # Command not recognized in the IDLE state
                history = add_bot_message(history, "Command not recognized. Try 'generate me a gradio app called myapp', or 'reset'.")
                # Yield the current state, passing others through
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)

        elif state == STATE_AWAITING_REPO_NAME:
            # The user's message is expected to be the repo name
            new_repo_name = message.strip()
            # Basic validation for the Hugging Face repo name format:
            # allow letters, numbers, hyphens, and underscores; max 100 chars (HF limit)
            if not new_repo_name or re.search(r'[^a-zA-Z0-9_-]', new_repo_name) or len(new_repo_name) > 100:
                history = add_bot_message(history, "Invalid name. Please provide a single word/slug for the Space name (letters, numbers, underscores, hyphens only, max 100 chars).")
                # Stay in AWAITING_REPO_NAME and yield the message (passing UI outputs through)
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)
            else:
                history = add_bot_message(history, f"Using Space name `{new_repo_name}`. Creating Space `{hf_profile.username}/{new_repo_name}`...")
                state = STATE_CREATING_SPACE  # Transition to the creation state
                repo_name = new_repo_name     # Store the validated repo name
                # Yield the updated state variables, passing others through
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)

        elif state == STATE_CREATING_SPACE:
            # Ensure repo_name is available (it should have been set in the previous step)
            if not repo_name:
                history = add_bot_message(history, "Internal error: Repo name missing for creation. Resetting.")
                # Reset state on error, passing through the current Gemini state
                yield (history, None, STATE_IDLE, "<p>Error creating space.</p>", "", "", 0,
                       None, None, None, use_grounding, current_gemini_key, current_gemini_model)
            else:
                try:
                    new_repo_id, iframe_html = create_space_action(repo_name, space_sdk, hf_profile, hf_token)
                    updated_preview = iframe_html
                    repo_id = new_repo_id  # Update the repo_id state variable
                    history = add_bot_message(history, f"✅ Space `{repo_id}` created. Click 'Send' to generate and upload code.")
                    state = STATE_GENERATING_CODE
                    # Yield the updated state variables, passing others through
                    yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                           attempts, app_desc, repo_name, generated_code, use_grounding,
                           current_gemini_key, current_gemini_model)
                except Exception as e:
                    history = add_bot_message(history, f"❌ Error creating space: {e}. Click 'reset'.")
                    # Reset state on error, passing through the current Gemini state
                    yield (history, None, STATE_IDLE, "<p>Error creating space.</p>", "", "", 0,
                           None, None, None, use_grounding, current_gemini_key, current_gemini_model)

        elif state == STATE_GENERATING_CODE:
            # Define the prompt for Gemini based on the app description, or a default
            prompt_desc = app_desc if app_desc else f'a simple {space_sdk} app'
            prompt = f"""
You are an AI assistant specializing in Hugging Face Spaces using the {space_sdk} SDK.
Generate a full, single-file Python app based on:
'{prompt_desc}'
Ensure the code is runnable as `app.py` in a Hugging Face Space using the `{space_sdk}` SDK. Include necessary imports and setup.
Return **only** the python code block for `app.py`. Do not include any extra text, explanations, or markdown outside the code block.
"""
            try:
                history = add_bot_message(history, f"🧠 Generating `{prompt_desc}` `{space_sdk}` app (`app.py`) code with Gemini...")
                if use_grounding:
                    history = add_bot_message(history, "(Using Grounding with Google Search)")
                # Yield the message before the API call
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)

                code = call_gemini(prompt, current_gemini_key, current_gemini_model, use_grounding=use_grounding)
                code = code.strip()
                # Clean up markdown fences
                if code.startswith("```python"): code = code[len("```python"):].strip()
                if code.startswith("```"): code = code[len("```"):].strip()
                if code.endswith("```"): code = code[:-len("```")].strip()
                if not code: raise ValueError("Gemini returned empty code.")

                history = add_bot_message(history, "✅ `app.py` code generated. Click 'Send' to upload.")
                state = STATE_UPLOADING_APP_PY
                generated_code = code
                # Yield the updated state variables
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)
            except Exception as e:
                history = add_bot_message(history, f"❌ Error generating code: {e}. Click 'reset'.")
                # Reset state on error, passing through the current Gemini state
                yield (history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0,
                       None, None, None, use_grounding, current_gemini_key, current_gemini_model)

        elif state == STATE_UPLOADING_APP_PY:
            # Retrieve the generated code from the state variable
            code_to_upload = generated_code
            if not code_to_upload:
                history = add_bot_message(history, "Internal error: No code to upload. Resetting.")
                yield (history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0,
                       None, None, None, use_grounding, current_gemini_key, current_gemini_model)
            else:
                history = add_bot_message(history, "☁️ Uploading `app.py`...")
                # Yield the message before uploading
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)
                try:
                    upload_file_to_space_action(io.StringIO(code_to_upload), "app.py", repo_id, hf_profile, hf_token)
                    history = add_bot_message(history, "✅ Uploaded `app.py`. Click 'Send' to generate requirements.")
                    state = STATE_GENERATING_REQUIREMENTS
                    generated_code = None
                    # Yield the updated state variables
                    yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                           attempts, app_desc, repo_name, generated_code, use_grounding,
                           current_gemini_key, current_gemini_model)
                except Exception as e:
                    history = add_bot_message(history, f"❌ Error uploading `app.py`: {e}. Click 'reset'.")
                    # Reset state on error, passing through the current Gemini state
                    yield (history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0,
                           None, None, None, use_grounding, current_gemini_key, current_gemini_model)

        elif state == STATE_GENERATING_REQUIREMENTS:
            history = add_bot_message(history, "📄 Generating `requirements.txt`...")
            # Yield the message before generating requirements
            yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                   attempts, app_desc, repo_name, generated_code, use_grounding,
                   current_gemini_key, current_gemini_model)

            # Determine required packages based on the SDK and keywords in the app description
            reqs_list = ["gradio"] if space_sdk == "gradio" else ["streamlit"]
            # Add essential libraries regardless of description keywords or grounding.
            # If we are here, the key and model are available per the STATE_IDLE checks.
            essential_libs = ["google-generativeai", "huggingface_hub"]
            reqs_list.extend(essential_libs)

            # Add common libraries if the description suggests they might be needed
            if app_desc:
                app_desc_lower = app_desc.lower()
                if "requests" in app_desc_lower or "api" in app_desc_lower:
                    reqs_list.append("requests")
                # Image processing libraries
                if "image" in app_desc_lower or "upload" in app_desc_lower or "blur" in app_desc_lower or "vision" in app_desc_lower or "photo" in app_desc_lower:
                    reqs_list.append("Pillow")
                if "numpy" in app_desc_lower: reqs_list.append("numpy")
                if "pandas" in app_desc_lower or "dataframe" in app_desc_lower: reqs_list.append("pandas")
                # Add scikit-image and opencv if image processing is heavily implied
                if any(lib in app_desc_lower for lib in ["scikit-image", "skimage", "cv2", "opencv"]):
                    reqs_list.extend(["scikit-image", "opencv-python"])  # Note: opencv-python for pip
                # Add transformers if large models are implied
                if any(lib in app_desc_lower for lib in ["transformer", "llama", "mistral", "bert", "gpt2"]):
                    reqs_list.append("transformers")
                # Add torch and tensorflow if deep learning frameworks are implied
                if any(lib in app_desc_lower for lib in ["torch", "pytorch", "tensorflow", "keras"]):
                    reqs_list.extend(["torch", "tensorflow"])  # Users might need specific versions, but this is a start

            # Use dict.fromkeys to deduplicate while preserving insertion order (Python 3.7+)
            reqs_list = list(dict.fromkeys(reqs_list))
            # Sort alphabetically for a cleaner requirements.txt
            reqs_list.sort()
            reqs_content = "\n".join(reqs_list) + "\n"
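            # For illustration, a description like "image classifier" with the gradio SDK
            # yields (after dedup and ASCII sort):
            #   "Pillow\ngoogle-generativeai\ngradio\nhuggingface_hub\n"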
history = add_bot_message(history, "✅ `requirements.txt` generated. Click 'Send' to upload.") | |
state = STATE_UPLOADING_REQUIREMENTS # Transition state | |
generated_code = reqs_content # Store requirements content | |
# Yield updated state variables and history (pass UI outputs and other states through) | |
yield (history, repo_id, state, updated_preview, updated_run, updated_build, | |
attempts, app_desc, repo_name, generated_code, use_grounding, | |
current_gemini_key, current_gemini_model) # Correctly yielding back the received Gemini states | |

        elif state == STATE_UPLOADING_REQUIREMENTS:
            # Retrieve the requirements content from the state variable
            reqs_content_to_upload = generated_code
            if not reqs_content_to_upload:
                history = add_bot_message(history, "Internal error: No requirements content to upload. Resetting.")
                yield (history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0,
                       None, None, None, use_grounding, current_gemini_key, current_gemini_model)
            else:
                history = add_bot_message(history, "☁️ Uploading `requirements.txt`...")
                # Yield the message before uploading (passing UI outputs and states through)
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)
                try:
                    # Upload the requirements file
                    upload_file_to_space_action(io.StringIO(reqs_content_to_upload), "requirements.txt", repo_id, hf_profile, hf_token)
                    history = add_bot_message(history, "✅ Uploaded `requirements.txt`. Click 'Send' to generate README.")
                    state = STATE_GENERATING_README  # Transition state
                    generated_code = None            # Clear the content after use
                    # Yield the updated state variables and history
                    yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                           attempts, app_desc, repo_name, generated_code, use_grounding,
                           current_gemini_key, current_gemini_model)
                except Exception as e:
                    history = add_bot_message(history, f"❌ Error uploading `requirements.txt`: {e}. Click 'reset'.")
                    # Yield the error message and reset state on failure
                    yield (history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0,
                           None, None, None, use_grounding, current_gemini_key, current_gemini_model)

        elif state == STATE_GENERATING_README:
            history = add_bot_message(history, "📝 Generating `README.md`...")
            # Yield the message before generating the README (passing UI outputs and states through)
            yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                   attempts, app_desc, repo_name, generated_code, use_grounding,
                   current_gemini_key, current_gemini_model)

            # Generate simple README content with a Space metadata header
            readme_title = repo_name if repo_name else "My Awesome Space"
            readme_description = app_desc if app_desc else f"This Hugging Face Space hosts an AI-generated {space_sdk} application."
            readme_content = f"""---
title: {readme_title}
emoji: 🚀
colorFrom: blue
colorTo: yellow
sdk: {space_sdk}
app_file: app.py
pinned: false
---

# {readme_title}

{readme_description}

This Space was automatically generated by an AI workflow using Google Gemini and Gradio.
"""  # Space metadata header plus a short body

            history = add_bot_message(history, "✅ `README.md` generated. Click 'Send' to upload.")
            state = STATE_UPLOADING_README    # Transition state
            generated_code = readme_content   # Store the README content
            # Yield the updated state variables and history
            yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                   attempts, app_desc, repo_name, generated_code, use_grounding,
                   current_gemini_key, current_gemini_model)

        elif state == STATE_UPLOADING_README:
            # Retrieve the README content from the state variable
            readme_content_to_upload = generated_code
            if not readme_content_to_upload:
                history = add_bot_message(history, "Internal error: No README content to upload. Resetting.")
                yield (history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0,
                       None, None, None, use_grounding, current_gemini_key, current_gemini_model)
            else:
                history = add_bot_message(history, "☁️ Uploading `README.md`...")
                # Yield the message before uploading (passing UI outputs and states through)
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)
                try:
                    # Upload the README file
                    upload_file_to_space_action(io.StringIO(readme_content_to_upload), "README.md", repo_id, hf_profile, hf_token)
                    history = add_bot_message(history, "✅ Uploaded `README.md`. All files uploaded. Space is now building. Click 'Send' to check build logs.")
                    state = STATE_CHECKING_LOGS_BUILD  # Transition to checking build logs
                    generated_code = None              # Clear the content after use
                    # Yield the updated state variables and history
                    yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                           attempts, app_desc, repo_name, generated_code, use_grounding,
                           current_gemini_key, current_gemini_model)
                except Exception as e:
                    history = add_bot_message(history, f"❌ Error uploading `README.md`: {e}. Click 'reset'.")
                    # Yield the error message and reset state on failure
                    yield (history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0,
                           None, None, None, use_grounding, current_gemini_key, current_gemini_model)

        elif state == STATE_CHECKING_LOGS_BUILD:
            history = add_bot_message(history, "🔍 Fetching build logs...")
            # Yield the message before fetching logs (which includes a delay)
            yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                   attempts, app_desc, repo_name, generated_code, use_grounding,
                   current_gemini_key, current_gemini_model)

            # Fetch build logs from the HF Space
            build_logs_text = get_build_logs_action(repo_id, hf_profile, hf_token)
            updated_build = build_logs_text  # Update the logs display variable

            # Simple check for common error indicators in the logs (case-insensitive)
            if "error" in updated_build.lower() or "exception" in updated_build.lower() or "build failed" in updated_build.lower():
                history = add_bot_message(history, "⚠️ Build logs indicate potential issues. Please inspect above. Click 'Send' to check container logs (the app might still start despite build warnings).")
                state = STATE_CHECKING_LOGS_RUN  # Transition even on build error, to see whether the container starts
                # Yield the updated state, logs, and variables
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)
            else:
                history = add_bot_message(history, "✅ Build logs fetched. Click 'Send' to check container logs.")
                state = STATE_CHECKING_LOGS_RUN  # Transition to the next log check
                # Yield the updated state, logs, and variables
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)

        elif state == STATE_CHECKING_LOGS_RUN:
            history = add_bot_message(history, "🔍 Fetching container logs...")
            # Yield the message before fetching logs (which includes a delay)
            yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                   attempts, app_desc, repo_name, generated_code, use_grounding,
                   current_gemini_key, current_gemini_model)

            # Fetch container logs from the HF Space
            container_logs_text = get_container_logs_action(repo_id, hf_profile, hf_token)
            updated_run = container_logs_text  # Update the logs display variable

            # Check for errors in the run logs and whether debug attempts remain
            if ("error" in updated_run.lower() or "exception" in updated_run.lower()) and attempts < MAX_DEBUG_ATTEMPTS:
                attempts += 1  # Increment the debug attempts counter
                history = add_bot_message(history, f"❌ Errors detected in container logs. Attempting debug fix #{attempts}/{MAX_DEBUG_ATTEMPTS}. Click 'Send' to proceed.")
                state = STATE_DEBUGGING_CODE  # Transition to the debugging state
                # Yield the updated state, logs, attempts, and variables
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)
            elif ("error" in updated_run.lower() or "exception" in updated_run.lower()) and attempts >= MAX_DEBUG_ATTEMPTS:
                # Max debug attempts reached
                history = add_bot_message(history, f"❌ Errors detected in container logs. Max debug attempts ({MAX_DEBUG_ATTEMPTS}) reached. Please inspect logs manually or click 'reset'.")
                state = STATE_COMPLETE  # The workflow ends on failure after max attempts
                # Yield the updated state, logs, attempts, and variables
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)
            else:
                # No significant errors found in the logs; assume success
                history = add_bot_message(history, "✅ App appears to be running successfully! Check the iframe above. Click 'reset' to start a new project.")
                state = STATE_COMPLETE  # The workflow ends on success
                # Yield the updated state, logs, attempts, and variables
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)

        elif state == STATE_DEBUGGING_CODE:
            history = add_bot_message(history, "🧠 Calling Gemini to generate a fix based on the logs...")
            if use_grounding:
                history = add_bot_message(history, "(Using Grounding with Google Search)")
            # Yield the message before the Gemini API call (passing UI outputs and states through)
            yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                   attempts, app_desc, repo_name, generated_code, use_grounding,
                   current_gemini_key, current_gemini_model)

            # Construct the prompt for Gemini, including the container logs
            debug_prompt = f"""
You are debugging a {space_sdk} Space. The goal is to fix the code in `app.py` based on the container logs provided.

Here are the container logs:

{updated_run}

Generate the *complete, fixed* content for `app.py` based on these logs.
Return **only** the python code block for app.py. Do not include any extra text, explanations, or markdown outside the code block.
"""
            try:
                # Call Gemini to generate the corrected code, optionally using grounding.
                # Note: Grounding may be less effective for debugging based *only* on logs,
                # but the option is included as requested.
                # Use current_gemini_key and current_gemini_model derived from the state inputs.
                fix_code = call_gemini(debug_prompt, current_gemini_key, current_gemini_model, use_grounding=use_grounding)
                fix_code = fix_code.strip()
                # Clean up potential markdown formatting
                if fix_code.startswith("```python"):
                    fix_code = fix_code[len("```python"):].strip()
                if fix_code.startswith("```"):
                    fix_code = fix_code[len("```"):].strip()
                if fix_code.endswith("```"):
                    fix_code = fix_code[:-len("```")].strip()
                if not fix_code:
                    raise ValueError("Gemini returned empty fix code.")

                history = add_bot_message(history, "✅ Fix code generated. Click 'Send' to upload.")
                state = STATE_UPLOADING_FIXED_APP_PY  # Transition to the upload state for the fix
                generated_code = fix_code             # Store the generated fix code
                # Yield the updated state, code, and variables
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)
            except Exception as e:
                history = add_bot_message(history, f"❌ Error generating debug code: {e}. Click 'reset'.")
                # Yield the error message and reset state on failure
                yield (history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0,
                       None, None, None, use_grounding,
                       current_gemini_key, current_gemini_model)

        elif state == STATE_UPLOADING_FIXED_APP_PY:
            # Retrieve the fixed code from the state variable
            fixed_code_to_upload = generated_code
            if not fixed_code_to_upload:
                history = add_bot_message(history, "Internal error: No fixed code available to upload. Resetting.")
                yield (history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0,
                       None, None, None, use_grounding, current_gemini_key, current_gemini_model)
            else:
                history = add_bot_message(history, "☁️ Uploading fixed `app.py`...")
                # Yield the message before uploading (passing UI outputs and states through)
                yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                       attempts, app_desc, repo_name, generated_code, use_grounding,
                       current_gemini_key, current_gemini_model)
                try:
                    # Upload the fixed app.py
                    upload_file_to_space_action(io.StringIO(fixed_code_to_upload), "app.py", repo_id, hf_profile, hf_token)
                    history = add_bot_message(history, "✅ Fixed `app.py` uploaded. Space will rebuild. Click 'Send' to check logs again.")
                    state = STATE_CHECKING_LOGS_RUN  # Go back to checking run logs after uploading the fix
                    generated_code = None            # Clear the code after use
                    # Yield the updated state, code, and variables
                    yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                           attempts, app_desc, repo_name, generated_code, use_grounding,
                           current_gemini_key, current_gemini_model)
                except Exception as e:
                    history = add_bot_message(history, f"❌ Error uploading fixed `app.py`: {e}. Click 'reset'.")
                    # Yield the error message and reset state on failure
                    yield (history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0,
                           None, None, None, use_grounding, current_gemini_key, current_gemini_model)

        elif state == STATE_COMPLETE:
            # In the complete state, the workflow is finished for this project.
            # Subsequent clicks just add user messages; we simply yield the current state.
            yield (history, repo_id, state, updated_preview, updated_run, updated_build,
                   attempts, app_desc, repo_name, generated_code, use_grounding,
                   current_gemini_key, current_gemini_model)

    except Exception as e:
        # This catches any unexpected errors that occur within any state's logic
        error_message = f"Workflow step failed unexpectedly ({state}): {e}. Click 'Send' to re-attempt this step or 'reset'."
        history = add_bot_message(history, error_message)
        print(f"Critical Error in state {state}: {e}")  # Log the error for debugging purposes
        # On unexpected error, reset to IDLE but pass through the current Gemini state
        yield (history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0,
               None, None, None, use_grounding,
               current_gemini_key, current_gemini_model)

# --- Build the Gradio UI ---

with gr.Blocks(title="AI-Powered HF Space App Builder") as ai_builder_tab:
    # Gradio State variables persist their values across user interactions (clicks).
    # Define these first, as they may be used in default values for components.
    hf_profile = gr.State(None)
    hf_token = gr.State(None)
    # Initialize gemini_api_key_state to an empty string (start with no key)
    gemini_api_key_state = gr.State("")
    # Initialize gemini_model_state with the default model key
    gemini_model_state = gr.State(DEFAULT_GEMINI_MODEL)
    repo_id = gr.State(None)           # Stores the ID of the created Space
    workflow = gr.State(STATE_IDLE)    # Stores the current state of the AI workflow
    sdk_state = gr.State("gradio")     # Stores the selected Space SDK (Gradio or Streamlit)
    debug_attempts = gr.State(0)       # Counts how many debugging attempts have been made
    app_description = gr.State(None)   # Stores the user's initial description of the desired app
    repo_name_state = gr.State(None)   # Stores the chosen repository name for the Space
    generated_code_state = gr.State(None)  # Temporary storage for generated file content (app.py, reqs, README)
    # State variable for the grounding checkbox
    use_grounding_state = gr.State(False)
with gr.Row(): | |
# Sidebar column for inputs and status displays | |
with gr.Column(scale=1, min_width=300): | |
gr.Markdown("## Hugging Face Login") | |
# Define login_status before it's used in login_btn.click outputs | |
login_status = gr.Markdown("*Not logged in.*") | |
# Hugging Face Login Button | |
login_btn = gr.LoginButton(variant="huggingface") | |
gr.Markdown("## Google AI Studio / Gemini") | |
# Define gemini_input and gemini_status before they are used in change handlers | |
            # The API key textbox starts empty; the user supplies their own key at runtime
            gemini_input = gr.Textbox(
                label="Your Google AI Studio API Key",
                type="password",  # Hides input for security
                interactive=True,
                value="",  # Never pre-filled from an environment variable
                info="Enter your own key here"
            )
            gemini_status = gr.Markdown("")  # Display Gemini configuration status
            # Define model_selector before it's used in its change handler
            model_selector = gr.Radio(
                # Use the list of choices generated from the GEMINI_MODELS dictionary
                choices=GEMINI_MODEL_CHOICES,
                value=DEFAULT_GEMINI_MODEL,  # Default selection using the key
                label="Select model",
                interactive=True
            )
            # Markdown field to display the model description
            # Initialize with the description of the default model
            model_description_text = gr.Markdown(get_model_description(DEFAULT_GEMINI_MODEL))
            # Define grounding checkbox before its change handler
            use_grounding_checkbox = gr.Checkbox(
                label="Enable Grounding with Google Search",
                value=False,  # Default to off
                interactive=True,
                info="Use Google Search results to inform Gemini's response (may improve factuality)."
            )
            gr.Markdown("## Space SDK")
            # Define sdk_selector before its change handler
            sdk_selector = gr.Radio(choices=["gradio", "streamlit"], value="gradio", label="Template SDK", interactive=True)
            gr.Markdown("## Workflow Status")
            # Define status_text and repo_id_text before they are used in change handlers
            status_text = gr.Textbox(label="Current State", value=STATE_IDLE, interactive=False)
            repo_id_text = gr.Textbox(label="Current Space ID", value="None", interactive=False)
        # Main content area column
        with gr.Column(scale=3):
            # Define chatbot, user_input, send_btn before send_btn.click
            chatbot = gr.Chatbot(type='messages', label="AI Workflow Chat")
            user_input = gr.Textbox(placeholder="Type your message…", interactive=True)
            # Define send_btn here, BEFORE it's used in send_button_update_output
            send_btn = gr.Button("Send", interactive=False)  # Start disabled by default
            # Define iframe, build_txt, run_txt after send_btn
            # These are UI components, NOT State variables
            iframe = gr.HTML("<p>No Space created yet.</p>")  # HTML element for the Space iframe
            build_txt = gr.Textbox(label="Build Logs", lines=10, interactive=False, value="", max_lines=20)  # Set max_lines for scrollability
            run_txt = gr.Textbox(label="Container Logs", lines=10, interactive=False, value="", max_lines=20)  # Set max_lines for scrollability
    # --- Define Event Handlers and Chains AFTER all components and required lists are defined ---
    # Define the inputs used for checking prerequisites (These are State components)
    send_button_interactive_binding_inputs = [
        hf_profile,
        hf_token,
        gemini_api_key_state,
        gemini_model_state
    ]
    # Define the output for updating the send button interactivity
    send_button_update_output = [send_btn]
    # Trigger check_send_button_ready whenever any prerequisite state changes
    # Use the specific change wrapper which expects (changed_value, *input_values)
    hf_profile.change(
        wrapper_check_button_change,
        inputs=send_button_interactive_binding_inputs,  # Pass all 4 prerequisite states
        outputs=send_button_update_output,  # Update only the send button
    )
    hf_token.change(
        wrapper_check_button_change,
        inputs=send_button_interactive_binding_inputs,
        outputs=send_button_update_output,
    )
    # When gemini_api_key_state changes (updated by gemini_input.change), check button readiness
    gemini_api_key_state.change(
        wrapper_check_button_change,
        inputs=send_button_interactive_binding_inputs,
        outputs=send_button_update_output,
    )
    # When gemini_model_state changes (updated by model_selector.change), check button readiness
    gemini_model_state.change(
        wrapper_check_button_change,
        inputs=send_button_interactive_binding_inputs,
        outputs=send_button_update_output,
    )
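    # wrapper_check_button_change is defined earlier in this file; the sketch below is only a
    # hypothetical illustration (assumed name and simplified signature, never wired up) of the
    # readiness check such a wrapper performs: the Send button becomes interactive only once the
    # HF login (profile + token) and the Gemini key and model are all present.
    def _sketch_check_button_ready(profile, token, gemini_key, gemini_model):
        ready = bool(profile) and bool(token) and bool(gemini_key) and bool(gemini_model)
        return gr.update(interactive=ready)  # only the Send button's interactivity is toggled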
    # Handle login button click: Update profile/token state -> Their .change handlers trigger check_send_button_ready
    login_btn.click(
        lambda x: (x[0], x[1]),  # Lambda takes the LoginButton output (profile, token tuple) and returns it
        inputs=[login_btn],  # Pass the LoginButton itself to get its output
        outputs=[hf_profile, hf_token]  # Update state variables
    )
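    # Note: the more common Gradio pattern for OAuth on Spaces is to let Gradio inject the session
    # into handlers whose parameters are annotated with gr.OAuthProfile / gr.OAuthToken (as other
    # helpers in this file do), rather than reading values off the LoginButton. A hypothetical,
    # commented-out sketch of that alternative (names here are illustrative, not part of this app):
    #
    #     def _load_session(profile: gr.OAuthProfile | None, token: gr.OAuthToken | None):
    #         return profile, token
    #
    #     ai_builder_tab.load(_load_session, inputs=None, outputs=[hf_profile, hf_token])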
    # Handle Gemini Key Input change: Update key state -> Configure Gemini status -> Update send button state
    gemini_input.change(
        # Pass the textbox as input so the lambda receives its new value
        lambda new_key_value: new_key_value,
        inputs=gemini_input,
        outputs=[gemini_api_key_state]  # This output becomes the implicit first arg for the next .then() in this chain
    ).then(
        # Configure Gemini using the updated state variables
        # Use the then_inputs wrapper which expects (prev_output, api_key_val, model_name_val)
        wrapper_configure_gemini_then,
        inputs=[gemini_api_key_state, gemini_model_state],  # Explicitly pass the required states
        outputs=[gemini_status]  # Update Gemini status display. This output becomes the implicit first arg for the next .then()
    )
    # The send-button readiness check is triggered separately by the gemini_api_key_state.change handler above
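    # wrapper_configure_gemini_then is defined earlier in this file. The sketch below is a
    # hypothetical illustration (assumed name, body, and messages, never wired up) of the shape the
    # file's comments describe for these "_then" wrappers: a leading prev_output argument that is
    # ignored, followed by the explicitly passed states used to (re)configure the Gemini client.
    def _sketch_configure_gemini_then(prev_output, api_key_val, model_name_val):
        # prev_output is the chained value described in the comments above; it is not used here.
        if not api_key_val:
            return "⚠️ Please enter your Google AI Studio API key."
        genai.configure(api_key=api_key_val)  # configure the google.generativeai client with the user's key
        return f"✅ Gemini configured to use `{model_name_val}`."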
    # Handle Gemini Model Selector change: Update model state -> Update description -> Configure Gemini status -> Update send button state
    model_selector.change(
        # Pass the radio component as input so the lambda receives the newly selected model name
        lambda new_model_name: new_model_name,
        inputs=model_selector,
        outputs=[gemini_model_state]  # This output becomes the implicit first arg for the next .then()
    ).then(
        # Update the model description display
        # Use the then_inputs wrapper which expects (prev_output, model_name_val)
        wrapper_get_model_description_then,
        inputs=[gemini_model_state],  # Get the new state value
        outputs=[model_description_text]  # Update description UI. This output becomes implicit first arg for next .then()
    ).then(
        # Configure Gemini using the updated state variables
        # Use the then_inputs wrapper which expects (prev_output, api_key_val, model_name_val)
        wrapper_configure_gemini_then,
        inputs=[gemini_api_key_state, gemini_model_state],  # Explicitly pass the required states
        outputs=[gemini_status]  # Update Gemini status display
    )
    # The send-button readiness check is triggered separately by the gemini_model_state.change handler above
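    # wrapper_get_model_description_then is defined earlier in this file. A hypothetical one-line
    # sketch (assumed name, never wired up) of what it amounts to: ignore the chained value and look
    # up the human-readable description for the selected model key via get_model_description, which
    # reads from the GEMINI_MODELS table defined near the top of this file.
    def _sketch_model_description_then(prev_output, model_name_val):
        return get_model_description(model_name_val)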
    # Handle Grounding checkbox change: update grounding state
    use_grounding_checkbox.change(
        lambda v: v, inputs=use_grounding_checkbox, outputs=use_grounding_state
    )
    # Handle SDK selector change: update sdk state
    sdk_selector.change(
        lambda s: s, inputs=sdk_selector, outputs=sdk_state
    )
    # Link Workflow State variable change to UI status display
    # Pass the state itself as input so the lambda receives its new value
    workflow.change(lambda new_state_value: new_state_value, inputs=workflow, outputs=status_text)
    # Link Repo ID State variable change to UI status display
    repo_id.change(lambda new_repo_id_value: new_repo_id_value if new_repo_id_value else "None", inputs=repo_id, outputs=repo_id_text)
    # The main event handler for the Send button (generator)
    # This .click() event triggers the ai_workflow_chat generator function
    # Inputs are read from UI components AND State variables
    # Outputs are updated by the values yielded from the generator
    # Ensure inputs and outputs match the ai_workflow_chat signature and yield tuple EXACTLY.
    # This call is direct, not in a .then() chain, so it does NOT receive a prev_output arg.
    # It receives args only from the inputs list.
    send_btn.click(
        ai_workflow_chat,  # The generator function to run (signature handles potential extra args, just in case)
        inputs=[
            user_input, chatbot,  # UI component inputs (message, current chat history)
            hf_profile, hf_token,  # HF State variables
            gemini_api_key_state, gemini_model_state,  # Gemini State variables
            repo_id, workflow, sdk_state,  # Workflow State variables
            iframe, run_txt, build_txt,  # UI component inputs (current values)
            debug_attempts, app_description, repo_name_state, generated_code_state,  # Other State variables
            use_grounding_state  # Grounding state input
        ],
        outputs=[
            chatbot,  # Updates Chatbot
            repo_id, workflow,  # Updates State variables (repo_id, workflow)
            iframe, run_txt, build_txt,  # Updates UI components (iframe, logs)
            debug_attempts, app_description, repo_name_state, generated_code_state,  # Updates other State variables
            use_grounding_state,  # Updates Grounding state
            gemini_api_key_state, gemini_model_state  # Updates Gemini State variables
        ]
    ).success(  # Chain a .success() event to run *after* the .click() handler completes without error
        # Clear the user input textbox after the message is sent and processed
        lambda: gr.update(value=""),
        inputs=None,
        outputs=user_input  # Update the user input textbox
    )
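    # For reference, the ordering contract this wiring relies on (a sketch; parameter names below are
    # illustrative, not the actual signature): ai_workflow_chat receives one positional argument per
    # entry in `inputs` above, roughly
    #     (message, history, profile, token, gemini_key, gemini_model,
    #      repo_id, state, sdk, preview_html, container_logs, build_logs,
    #      attempts, app_desc, repo_name, generated_code, use_grounding)
    # and every tuple it yields must line up one-to-one with `outputs`, i.e.
    #     (history, repo_id, state, preview_html, container_logs, build_logs,
    #      attempts, app_desc, repo_name, generated_code, use_grounding,
    #      gemini_key, gemini_model)
    # The yields visible at the end of the workflow function above follow this 13-element shape.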
    # --- Initial Load Event Chain (Defined INSIDE gr.Blocks, AFTER components and required lists are defined) ---
    # This chain runs once when the app loads
    ai_builder_tab.load(
        # Action 1: Show profile (loads cached login if available)
        # show_profile expects 1 arg (profile) or None. It receives 1 from load. Correct.
        show_profile,
        inputs=None,
        outputs=login_status  # Updates UI. This output becomes the implicit first arg for the next .then()
    ).then(
        # Action 2: Configure Gemini using initial state
        # Use the then_inputs wrapper which expects (prev_output, api_key_val, model_name_val)
        wrapper_configure_gemini_then,
        inputs=[gemini_api_key_state, gemini_model_state],  # Explicitly pass the required states
        outputs=[gemini_status]  # Update Gemini status display. This output becomes the implicit first arg for the next .then()
    ).then(
        # Action 3: After initial load checks, update the button state based on initial states
        # Use the then_inputs wrapper which expects (prev_output, *prereq_state_values)
        wrapper_check_button_then,
        inputs=send_button_interactive_binding_inputs,  # Pass all 4 prerequisite states
        outputs=send_button_update_output,  # Update the send button. This output becomes implicit first arg for next .then()
    ).then(
        # Action 4: Update the model description text based on the default selected model
        # Use the then_inputs wrapper which expects (prev_output, model_name_val)
        wrapper_get_model_description_then,
        inputs=[gemini_model_state],  # Get the default model name from state
        outputs=[model_description_text]  # Update description UI. This output becomes implicit first arg for next .then()
    ).then(
        # Action 5: Add the initial welcome message to the chat history
        # Use the prev_output wrapper which expects (prev_output,)
        wrapper_greet_then,
        inputs=None,  # Greet takes no explicit inputs
        outputs=chatbot  # Updates the chatbot display
    )
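    # wrapper_greet_then is defined earlier in this file. A minimal, hypothetical sketch (assumed
    # name and message text, never wired up) of what such a greeting wrapper typically does: ignore
    # the chained value and return the opening chat history in the "messages" format expected by the
    # Chatbot component declared above.
    def _sketch_greet_then(prev_output):
        return [{"role": "assistant",
                 "content": "Welcome! Log in to Hugging Face, enter your Gemini key, and describe the app you want to build."}]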
# The main workflow function and other helper functions are correctly defined OUTSIDE the gr.Blocks context
# because they operate on the *values* passed to them by Gradio event triggers, not the UI component objects themselves.
if __name__ == "__main__":
    # Optional: Configure retries for huggingface_hub requests to make them more robust
    # from requests.adapters import HTTPAdapter
    # from urllib3.util.retry import Retry
    # retry_strategy = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])  # Define retry strategy for specific HTTP codes
    # adapter = HTTPAdapter(max_retries=retry_strategy)
    # session = get_session()  # Get the session object used internally by huggingface_hub
    # session.mount("http://", adapter)
    # session.mount("https://", adapter)
    # Optional: Configure Gradio settings using environment variables
    # Set max upload size (e.g., 100MB) for files like app.py
    os.environ["GRADIO_MAX_FILE_SIZE"] = "100MB"
    # Optional: Set a local temporary directory for Gradio uploads
    os.environ["GRADIO_TEMP_DIR"] = "./tmp"
    os.makedirs(os.environ["GRADIO_TEMP_DIR"], exist_ok=True)  # Ensure the directory exists
    # Launch the Gradio UI
    # The Gradio launch call blocks execution.
    ai_builder_tab.launch()