# NOTE: page-header residue from the hosting site ("Spaces: Sleeping") was captured here; it is not part of the program.
import os | |
import re | |
import time | |
import json | |
import io | |
import requests | |
import gradio as gr | |
import google.generativeai as genai | |
from huggingface_hub import create_repo, list_models, upload_file, constants | |
from huggingface_hub.utils import build_hf_headers, get_session, hf_raise_for_status | |
# --- Helper functions for Hugging Face integration --- | |
def show_profile(profile: gr.OAuthProfile | None) -> str:
    """Return a Markdown status line for the Hugging Face login.

    Args:
        profile: The OAuth profile of the signed-in user, or ``None`` when
            nobody is signed in.

    Returns:
        A Markdown string naming the logged-in user, or a "not logged in"
        placeholder.
    """
    if profile is not None:
        return f"✅ Logged in as **{profile.username}**"
    return "*Not logged in.*"
def list_private_models(
    profile: gr.OAuthProfile | None,
    oauth_token: gr.OAuthToken | None,
) -> str:
    """List the models owned by the logged-in user as a Markdown bullet list.

    Args:
        profile: OAuth profile of the signed-in user, or ``None``.
        oauth_token: OAuth token of the signed-in user, or ``None``.

    Returns:
        A Markdown string: a prompt to log in, "No models found.", a bullet
        list of ``<model id> (private|public)`` entries, or an error message
        if the listing call fails.
    """
    if profile is None or oauth_token is None:
        return "Please log in to see your models."
    try:
        models = [
            f"{m.id} ({'private' if m.private else 'public'})"
            for m in list_models(author=profile.username, token=oauth_token.token)
        ]
    except Exception as e:
        # Best-effort UI helper: surface the failure as text instead of raising.
        return f"Error listing models: {e}"
    if not models:
        return "No models found."
    # Prefix the first entry with the bullet as well; joining alone only puts
    # the " - " marker between entries and leaves the first one unbulleted.
    return "Models:\n\n - " + "\n - ".join(models)
def create_space_action(repo_name: str, sdk: str, profile: gr.OAuthProfile, token: gr.OAuthToken):
    """Create (or reuse) a Space for the user and build preview HTML for it.

    Args:
        repo_name: Name of the Space, without the owner prefix.
        sdk: Space SDK passed to ``create_repo`` as ``space_sdk``.
        profile: OAuth profile of the signed-in user; its ``username`` is the
            repo owner.
        token: OAuth token used to authenticate the request.

    Returns:
        A ``(repo_id, iframe_html)`` tuple where ``repo_id`` is
        ``"<username>/<repo_name>"``.

    Raises:
        RuntimeError: If ``create_repo`` fails; the original exception is
            attached as the cause.
    """
    repo_id = f"{profile.username}/{repo_name}"
    try:
        create_repo(
            repo_id=repo_id,
            token=token.token,
            exist_ok=True,  # re-running the workflow on an existing Space is fine
            repo_type="space",
            space_sdk=sdk,
        )
    except Exception as e:
        raise RuntimeError(f"Failed to create Space {repo_id}: {e}") from e
    # Build the page URL from the hub endpoint configured in huggingface_hub
    # (the same constant the log-fetching helper uses) rather than a
    # hard-coded third-party mirror host.
    url = f"{constants.ENDPOINT}/spaces/{repo_id}"
    iframe = f'<iframe src="{url}" width="100%" height="500px"></iframe>'
    return repo_id, iframe
def upload_file_to_space_action(
    file_obj,
    path_in_repo: str,
    repo_id: str,
    profile: gr.OAuthProfile,
    token: gr.OAuthToken,
) -> None:
    """Upload one file to a Space repository.

    Args:
        file_obj: Content to upload. May be a local file path (``str``),
            ``bytes``, a binary file object, or a text stream such as
            ``io.StringIO`` (which is encoded as UTF-8 before upload).
        path_in_repo: Destination path inside the Space repository.
        repo_id: Target Space, as ``"<owner>/<name>"``.
        profile: OAuth profile of the signed-in user (only checked for presence).
        token: OAuth token used to authenticate the upload.

    Returns:
        None on success.

    Raises:
        ValueError: If the profile, token, or repo_id is missing.
        RuntimeError: If the upload fails; the original exception is attached
            as the cause.
    """
    if not (profile and token and repo_id):
        raise ValueError("Hugging Face profile, token, or repo_id is missing.")
    # upload_file accepts a path, raw bytes, or a *binary* file object. The
    # workflow in this file hands over io.StringIO objects, which are text
    # streams and would be rejected, so convert them to bytes first. A plain
    # str is left alone because upload_file treats it as a local path.
    if isinstance(file_obj, io.TextIOBase):
        file_obj = file_obj.read().encode("utf-8")
    try:
        upload_file(
            path_or_fileobj=file_obj,
            path_in_repo=path_in_repo,
            repo_id=repo_id,
            token=token.token,
            repo_type="space",
        )
    except Exception as e:
        raise RuntimeError(f"Failed to upload `{path_in_repo}` to {repo_id}: {e}") from e
def _fetch_space_logs_level(repo_id: str, level: str, token: str) -> str:
    """Fetch up to 200 log lines of one kind for a Space.

    A short-lived JWT is requested from the hub for the Space and then used
    to read the log event stream from ``api.hf.space``.

    Args:
        repo_id: Space identifier, as ``"<owner>/<name>"``.
        level: Log kind placed in the URL path (this file uses ``"build"``
            and ``"run"``).
        token: Hugging Face access token used to request the JWT.

    Returns:
        The collected lines formatted as ``"[timestamp] text"`` and joined by
        newlines, a "No ... logs found." notice, or an error message. This
        function never raises; failures are reported in the returned text so
        they can be shown in the logs box.
    """
    if not repo_id or not token:
        return f"Cannot fetch {level} logs: repo_id or token missing."  # Handle missing state gracefully
    jwt_url = f"{constants.ENDPOINT}/api/spaces/{repo_id}/jwt"
    logs_url = f"https://api.hf.space/v1/{repo_id}/logs/{level}"
    prefix = b"data: "
    max_lines = 200
    lines = []
    try:
        # Bound the JWT request too, so a hung connection cannot block the UI.
        r = get_session().get(jwt_url, headers=build_hf_headers(token=token), timeout=20)
        hf_raise_for_status(r)
        jwt = r.json()["token"]
        with get_session().get(logs_url, headers=build_hf_headers(token=jwt), stream=True, timeout=20) as resp:
            hf_raise_for_status(resp)
            try:
                for raw in resp.iter_lines():
                    if len(lines) >= max_lines:
                        lines.append("... truncated ...")
                        break
                    if not raw.startswith(prefix):
                        continue
                    try:
                        event = json.loads(raw[len(prefix):].decode())
                    except ValueError:
                        # Malformed JSON or undecodable bytes: skip this event only.
                        continue
                    if not isinstance(event, dict):
                        continue
                    ts = event.get("timestamp", "")
                    txt = str(event.get("data") or "").strip()
                    if txt:
                        lines.append(f"[{ts}] {txt}")
            except requests.exceptions.RequestException:
                # The stream stalled or dropped mid-read. Keep whatever was
                # already received instead of replacing it with an error; only
                # report the failure when nothing was collected.
                if not lines:
                    raise
    except Exception as e:
        # Don't raise here, just return error message in logs box
        return f"Error fetching {level} logs: {e}"
    return "\n".join(lines) if lines else f"No {level} logs found."
def get_build_logs_action(repo_id, profile, token):
    """Return the build logs of a Space, or a hint when prerequisites are missing.

    Args:
        repo_id: Space identifier, as ``"<owner>/<name>"``.
        profile: OAuth profile of the signed-in user (only checked for presence).
        token: OAuth token; its ``token`` attribute authenticates the fetch.

    Returns:
        The text produced by the log fetcher for the ``"build"`` level, or a
        warning asking the user to log in and create a Space.
    """
    if not repo_id or not profile or not token:
        return "⚠️ Please log in and create a Space first."
    access_token = token.token
    return _fetch_space_logs_level(repo_id, "build", access_token)
def get_container_logs_action(repo_id, profile, token):
    """Return the container (run) logs of a Space after a short pause.

    Args:
        repo_id: Space identifier, as ``"<owner>/<name>"``.
        profile: OAuth profile of the signed-in user (only checked for presence).
        token: OAuth token; its ``token`` attribute authenticates the fetch.

    Returns:
        The text produced by the log fetcher for the ``"run"`` level, or a
        warning asking the user to log in and create a Space.
    """
    if not repo_id or not profile or not token:
        return "⚠️ Please log in and create a Space first."
    # Wait briefly before reading run logs: the build may only just have finished.
    time.sleep(5)
    access_token = token.token
    return _fetch_space_logs_level(repo_id, "run", access_token)
# --- Google Gemini integration with model selection --- | |
def configure_gemini(api_key: str | None, model_name: str | None) -> str: | |
if not api_key: | |
return "Gemini API key is not set." | |
if not model_name: | |
return "Please select a Gemini model." | |
try: | |
genai.configure(api_key=api_key) | |
# Test a simple ping | |
genai.GenerativeModel(model_name).generate_content("ping", stream=False) # Use stream=False for sync ping | |
return f"Gemini configured successfully with **{model_name}**." | |
except Exception as e: | |
return f"Error configuring Gemini: {e}" | |
def call_gemini(prompt: str, api_key: str, model_name: str) -> str:
    """Send a prompt to Gemini and return the response text.

    Args:
        prompt: Text sent to the model.
        api_key: Google AI Studio API key.
        model_name: Gemini model identifier.

    Returns:
        The response text, or an empty string when the response text is falsy.

    Raises:
        ValueError: If the API key or model name is missing.
        RuntimeError: If configuring the client, generating content, or
            reading the response text fails.
    """
    if not (api_key and model_name):
        raise ValueError("Gemini API key or model not set.")
    try:
        genai.configure(api_key=api_key)
        # Non-streaming call: the full response is returned in one piece.
        reply = genai.GenerativeModel(model_name).generate_content(prompt, stream=False)
        text = reply.text
    except Exception as e:
        raise RuntimeError(f"Gemini API call failed: {e}")  # Raise exception
    return text or ""
# --- AI workflow logic (State Machine) --- | |
# Define States | |
# Waiting for a command / waiting for the user to name the Space.
STATE_IDLE = "idle"
STATE_AWAITING_REPO_NAME = "awaiting_repo_name"

# Space creation and file generation/upload steps.
STATE_CREATING_SPACE = "creating_space"
STATE_GENERATING_CODE = "generating_code"
STATE_UPLOADING_APP_PY = "uploading_app_py"
STATE_GENERATING_REQUIREMENTS = "generating_requirements"
STATE_UPLOADING_REQUIREMENTS = "uploading_requirements"
STATE_GENERATING_README = "generating_readme"
STATE_UPLOADING_README = "uploading_readme"

# Log inspection and the automated debug loop.
STATE_CHECKING_LOGS_BUILD = "checking_logs_build"
STATE_CHECKING_LOGS_RUN = "checking_logs_run"
STATE_DEBUGGING_CODE = "debugging_code"
STATE_UPLOADING_FIXED_APP_PY = "uploading_fixed_app_py"

# Terminal state (reached on success or after the debug attempts run out).
STATE_COMPLETE = "complete"

# Upper bound on automated fix attempts before the workflow gives up.
MAX_DEBUG_ATTEMPTS = 3
def update_chat(history, bot_message):
    """Append a bot-only row to the chat history and return that same list.

    The list is modified in place. If the last row is a user message whose
    reply slot is still ``None``, the slot is set to an empty string before
    the new ``[None, bot_message]`` row is added.
    """
    reply_pending = bool(history) and history[-1][1] is None
    if reply_pending:
        history[-1][1] = ""
    history.append([None, bot_message])
    return history
def _strip_code_fence(text: str) -> str:
    """Return *text* without a surrounding Markdown code fence, if present.

    The opening fence may carry a Python language tag or none at all; a
    trailing closing fence is removed as well.
    """
    text = text.strip()
    text = re.sub(r"^```(?:python3|python|py)?", "", text, count=1, flags=re.I).strip()
    if text.endswith("```"):
        text = text[: -len("```")].strip()
    return text


def ai_workflow_chat(
    message: str,
    history: list[list[str | None]],
    hf_profile: gr.OAuthProfile | None,
    hf_token: gr.OAuthToken | None,
    gemini_api_key: str | None,
    gemini_model: str | None,
    repo_id_state: str | None,
    workflow_state: str,
    space_sdk: str,
    preview_html: str,
    container_logs: str,
    build_logs: str,
    debug_attempts_state: int,
    app_description_state: str | None,  # Persist initial request
    repo_name_state: str | None,  # Persist chosen name
    generated_code_state: str | None,  # Persist generated code between steps
) -> tuple[
    list[list[str | None]],  # history
    str | None,  # repo_id
    str,  # workflow_state
    str,  # preview_html
    str,  # container_logs
    str,  # build_logs
    int,  # debug_attempts_state
    str | None,  # app_description_state
    str | None,  # repo_name_state
    str | None,  # generated_code_state
]:
    """Run one step of the Space-building workflow for a chat message.

    This is a generator: each ``yield`` produces the 10-tuple described by the
    return annotation (history, repo_id, workflow state, preview HTML,
    container logs, build logs, debug attempts, app description, repo name,
    generated code / temporary file content). Intermediate yields refresh the
    UI; the last yield of a call carries the state for the next call.

    Exactly one state of the state machine is processed per call, so the user
    advances the workflow by sending another message. Sending ``reset``
    returns to the idle state from any state.

    Args:
        message: The text the user just sent.
        history: Chat history as ``[user, bot]`` pairs; modified in place.
        hf_profile: OAuth profile of the signed-in user, or ``None``.
        hf_token: OAuth token of the signed-in user, or ``None``.
        gemini_api_key: Google AI Studio API key, or ``None``.
        gemini_model: Gemini model identifier, or ``None``.
        repo_id_state: ``"<owner>/<name>"`` of the created Space, or ``None``.
        workflow_state: Current state (one of the ``STATE_*`` constants).
        space_sdk: SDK used for the Space and mentioned in the prompts.
        preview_html: HTML currently shown in the preview area.
        container_logs: Text currently shown in the container-logs box.
        build_logs: Text currently shown in the build-logs box.
        debug_attempts_state: Number of debug attempts made so far.
        app_description_state: The user's original request, if any.
        repo_name_state: The chosen Space name, if any.
        generated_code_state: Generated code or file content waiting to be
            uploaded by the next step.

    Yields:
        The 10-tuple described above. Failures inside a step are reported in
        the chat and reset the workflow to idle instead of raising.
    """
    # Unpack state variables
    repo_id = repo_id_state
    state = workflow_state
    attempts = debug_attempts_state
    app_desc = app_description_state
    repo_name = repo_name_state
    generated_code = generated_code_state
    updated_preview = preview_html
    updated_build = build_logs
    updated_run = container_logs
    # An empty chat may arrive as None; start a fresh list in that case.
    if history is None:
        history = []
    # Add user message to history for context
    history.append([message, None])
    # Yield immediately to show user message
    yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
    try:
        # An explicit "reset" is honoured in every state. Without this the
        # command was only recognised while idle, leaving STATE_COMPLETE (and
        # any half-finished run) with no way back.
        if message.strip().lower() == "reset":
            history = update_chat(history, "Workflow reset.")
            yield history, None, STATE_IDLE, "<p>No Space created yet.</p>", "", "", 0, None, None, None
            return
        # --- State Machine Logic ---
        if state == STATE_IDLE:
            # Check prerequisites first
            if not (hf_profile and hf_token):
                history = update_chat(history, "Please log in to Hugging Face first.")
                yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
                return  # Stop workflow until login
            if not (gemini_api_key and gemini_model):
                history = update_chat(history, "Please enter your API key and select a Gemini model.")
                yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
                return  # Stop workflow until API key/model set
            # Look for commands
            reset_match = "reset" in message.lower()
            # Space names may contain hyphens (the prompt below even suggests
            # `my-awesome-app`), so capture [\w-]+ rather than \w+. The article
            # in the "create" form is optional together with its space.
            generate_match = re.search(r'generate (?:me )?(?:a|an) \w+ app called ([\w-]+)', message, re.I)
            create_match = re.search(r'create (?:an? )?space called ([\w-]+)', message, re.I)
            if reset_match:
                history = update_chat(history, "Workflow reset.")
                # Reset all state variables
                yield history, None, STATE_IDLE, "<p>No Space created yet.</p>", "", "", 0, None, None, None
                return  # End workflow
            elif generate_match:
                new_repo_name = generate_match.group(1)
                new_app_desc = message  # Store the full request
                history = update_chat(history, f"Acknowledged: '{message}'. Starting workflow to create Space `{hf_profile.username}/{new_repo_name}`.")
                # Transition to creating space state, passing name and description
                yield history, repo_id, STATE_CREATING_SPACE, updated_preview, updated_run, updated_build, attempts, new_app_desc, new_repo_name, generated_code
            elif create_match:
                new_repo_name = create_match.group(1)
                history = update_chat(history, f"Acknowledged: '{message}'. Starting workflow to create Space `{hf_profile.username}/{new_repo_name}`.")
                # Transition to creating space state, just passing the name (desc will be default)
                yield history, repo_id, STATE_CREATING_SPACE, updated_preview, updated_run, updated_build, attempts, app_desc, new_repo_name, generated_code
            elif "create" in message.lower() and not repo_id:  # Generic create trigger
                history = update_chat(history, "Okay, what should the Space be called? (e.g., `my-awesome-app`)")
                # Transition to awaiting name state
                yield history, repo_id, STATE_AWAITING_REPO_NAME, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            else:
                # Anything else is an unknown command; stay idle.
                history = update_chat(history, "Command not recognized. Try 'generate me a gradio app called myapp', or 'reset'.")
                yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
        elif state == STATE_AWAITING_REPO_NAME:
            new_repo_name = message.strip()
            if not new_repo_name or re.search(r'\s', new_repo_name):  # Basic validation for repo name
                history = update_chat(history, "Invalid name. Please provide a single word/slug for the Space name.")
                # Stay in this state
                yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            else:
                history = update_chat(history, f"Using Space name `{new_repo_name}`. Creating Space `{hf_profile.username}/{new_repo_name}`...")
                # Transition to creating space state, pass the received name
                yield history, repo_id, STATE_CREATING_SPACE, updated_preview, updated_run, updated_build, attempts, app_desc, new_repo_name, generated_code
        elif state == STATE_CREATING_SPACE:
            # This state is triggered when we *already have* the repo_name in state
            if not repo_name:  # Safety check
                history = update_chat(history, "Internal error: Repo name missing for creation. Resetting.")
                yield history, None, STATE_IDLE, "<p>Error creating space.</p>", "", "", 0, None, None, None
                return
            try:
                new_repo_id, iframe_html = create_space_action(repo_name, space_sdk, hf_profile, hf_token)
                updated_preview = iframe_html
                history = update_chat(history, f"✅ Space `{new_repo_id}` created.")
                # Transition to generating code state, update repo_id and preview
                yield history, new_repo_id, STATE_GENERATING_CODE, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            except Exception as e:
                history = update_chat(history, f"❌ Error creating space: {e}")
                # Reset state on failure
                yield history, None, STATE_IDLE, "<p>Error creating space.</p>", "", "", 0, None, None, None
        elif state == STATE_GENERATING_CODE:
            # Use the stored app description or a default
            prompt_desc = app_desc if app_desc else 'a Gradio image-blur test app with upload and slider controls'
            prompt = f"""
You are an AI assistant specializing in Hugging Face Spaces using the {space_sdk} SDK.
Generate a full, single-file Python app based on:
'{prompt_desc}'
Return **only** the python code block for app.py. Do not include any extra text, explanations, or markdown outside the code block.
"""
            try:
                history = update_chat(history, "🧠 Generating `app.py` code with Gemini...")
                # Update UI to show "Generating..."
                yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
                code = _strip_code_fence(call_gemini(prompt, gemini_api_key, gemini_model))
                if not code:
                    raise ValueError("Gemini returned empty code.")
                history = update_chat(history, "✅ `app.py` code generated. Click 'Send' to upload.")
                # Transition to uploading state, store the generated code
                yield history, repo_id, STATE_UPLOADING_APP_PY, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, code
            except Exception as e:
                history = update_chat(history, f"❌ Error generating code: {e}. Click 'reset'.")
                # Reset state on failure
                yield history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0, None, None, None
        elif state == STATE_UPLOADING_APP_PY:
            # Use the generated_code stored in state
            if not generated_code:
                history = update_chat(history, "Internal error: No code to upload. Resetting.")
                yield history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0, None, None, None
                return
            history = update_chat(history, "☁️ Uploading `app.py`...")
            yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            try:
                # Hand over bytes: the hub upload accepts a path, bytes, or a
                # binary file object, not a text stream such as io.StringIO.
                upload_file_to_space_action(generated_code.encode("utf-8"), "app.py", repo_id, hf_profile, hf_token)
                history = update_chat(history, "✅ Uploaded `app.py`. Click 'Send' to generate requirements.")
                # Keep the code in state: the requirements step inspects it to
                # decide which packages to list. Clearing it here made those
                # checks operate on an empty string.
                yield history, repo_id, STATE_GENERATING_REQUIREMENTS, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            except Exception as e:
                history = update_chat(history, f"❌ Error uploading app.py: {e}. Click 'reset'.")
                yield history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0, None, None, None
        elif state == STATE_GENERATING_REQUIREMENTS:
            history = update_chat(history, "📄 Generating `requirements.txt`...")
            yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            # Simple heuristic for requirements based on SDK and common needs
            reqs_list = ["gradio"] if space_sdk == "gradio" else ["streamlit"]
            app_source = generated_code or ""
            # NOTE(review): the `or gemini_api_key` part is always true once the
            # workflow is running, so this package is listed unconditionally.
            if "google.generativeai" in app_source or gemini_api_key:
                reqs_list.append("google-generativeai")
            if "requests" in app_source:
                reqs_list.append("requests")
            reqs_list.append("huggingface_hub")
            reqs_content = "\n".join(reqs_list) + "\n"
            history = update_chat(history, "✅ `requirements.txt` generated. Click 'Send' to upload.")
            # Transition to uploading requirements; the content travels in the
            # generated-code slot.
            yield history, repo_id, STATE_UPLOADING_REQUIREMENTS, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, reqs_content
        elif state == STATE_UPLOADING_REQUIREMENTS:
            # Content was stored in the generated-code slot by the previous step
            reqs_content_to_upload = generated_code
            if not reqs_content_to_upload:
                history = update_chat(history, "Internal error: No requirements content to upload. Resetting.")
                yield history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0, None, None, None
                return
            history = update_chat(history, "☁️ Uploading `requirements.txt`...")
            yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, None
            try:
                upload_file_to_space_action(reqs_content_to_upload.encode("utf-8"), "requirements.txt", repo_id, hf_profile, hf_token)
                history = update_chat(history, "✅ Uploaded `requirements.txt`. Click 'Send' to generate README.")
                # Transition to generating README
                yield history, repo_id, STATE_GENERATING_README, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, None
            except Exception as e:
                history = update_chat(history, f"❌ Error uploading requirements.txt: {e}. Click 'reset'.")
                yield history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0, None, None, None
        elif state == STATE_GENERATING_README:
            history = update_chat(history, "📝 Generating `README.md`...")
            yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            # Generate a simple README based on app_desc or repo_name
            readme_title = repo_name if repo_name else "My Awesome Space"
            readme_description = app_desc if app_desc else "This Hugging Face Space hosts an AI-generated application."
            readme_content = f"# {readme_title}\n\n{readme_description}\n\n" \
                "This Space was automatically generated by an AI workflow.\n"
            history = update_chat(history, "✅ `README.md` generated. Click 'Send' to upload.")
            # Transition to uploading README, store content temporarily
            yield history, repo_id, STATE_UPLOADING_README, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, readme_content
        elif state == STATE_UPLOADING_README:
            # Content was stored in the generated-code slot by the previous step
            readme_content_to_upload = generated_code
            if not readme_content_to_upload:
                history = update_chat(history, "Internal error: No README content to upload. Resetting.")
                yield history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0, None, None, None
                return
            history = update_chat(history, "☁️ Uploading `README.md`...")
            yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, None
            try:
                upload_file_to_space_action(readme_content_to_upload.encode("utf-8"), "README.md", repo_id, hf_profile, hf_token)
                history = update_chat(history, "✅ Uploaded `README.md`. Files uploaded. Space is now building. Click 'Send' to check build logs.")
                # Transition to checking build logs
                yield history, repo_id, STATE_CHECKING_LOGS_BUILD, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, None
            except Exception as e:
                history = update_chat(history, f"❌ Error uploading README.md: {e}. Click 'reset'.")
                yield history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0, None, None, None
        elif state == STATE_CHECKING_LOGS_BUILD:
            history = update_chat(history, "🔍 Fetching build logs...")
            yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            updated_build = get_build_logs_action(repo_id, hf_profile, hf_token)
            # Keyword scan only: "Error"/"Exception" in the text is treated as a
            # possible build problem. Either way the next step is the run logs.
            if "Error" in updated_build or "Exception" in updated_build:
                history = update_chat(history, "⚠️ Build logs may contain errors. Please inspect. Click 'Send' to check container logs (app might still start).")
            else:
                history = update_chat(history, "✅ Build logs fetched. Click 'Send' to check container logs.")
            yield history, repo_id, STATE_CHECKING_LOGS_RUN, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
        elif state == STATE_CHECKING_LOGS_RUN:
            history = update_chat(history, "🔍 Fetching container logs...")
            yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            updated_run = get_container_logs_action(repo_id, hf_profile, hf_token)
            # Check for errors/exceptions in run logs
            has_errors = "Error" in updated_run or "Exception" in updated_run
            if has_errors and attempts < MAX_DEBUG_ATTEMPTS:
                attempts += 1
                history = update_chat(history, f"❌ Errors detected in container logs. Attempting debug fix #{attempts}/{MAX_DEBUG_ATTEMPTS}. Click 'Send' to proceed.")
                # Transition to debugging state with the incremented counter
                yield history, repo_id, STATE_DEBUGGING_CODE, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            elif has_errors:
                history = update_chat(history, f"❌ Errors detected in container logs. Max debug attempts ({MAX_DEBUG_ATTEMPTS}) reached. Please inspect logs manually or click 'reset'.")
                # Give up after the allowed number of attempts
                yield history, repo_id, STATE_COMPLETE, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            else:
                history = update_chat(history, "✅ App appears to be running successfully! Check the iframe above. Click 'reset' to start a new project.")
                yield history, repo_id, STATE_COMPLETE, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
        elif state == STATE_DEBUGGING_CODE:
            history = update_chat(history, f"🧠 Calling Gemini to generate fix based on logs...")
            yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
            debug_prompt = f"""
You are debugging a {space_sdk} Space. The goal is to fix the code in `app.py` based on the container logs provided.
Here are the container logs:
{updated_run}
Generate the *complete, fixed* content for `app.py` based on these logs.
Return **only** the python code block for app.py. Do not include any extra text, explanations, or markdown outside the code block.
"""
            try:
                fix_code = _strip_code_fence(call_gemini(debug_prompt, gemini_api_key, gemini_model))
                if not fix_code:
                    raise ValueError("Gemini returned empty fix code.")
                history = update_chat(history, "✅ Fix code generated. Click 'Send' to upload.")
                # Transition to uploading fixed code, pass the fixed code
                yield history, repo_id, STATE_UPLOADING_FIXED_APP_PY, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, fix_code
            except Exception as e:
                history = update_chat(history, f"❌ Error generating debug code: {e}. Click 'reset'.")
                # Reset on failure
                yield history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0, None, None, None
        elif state == STATE_UPLOADING_FIXED_APP_PY:
            # The fixed code was stored in the generated-code slot
            fixed_code_to_upload = generated_code
            if not fixed_code_to_upload:
                history = update_chat(history, "Internal error: No fixed code available to upload. Resetting.")
                yield history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0, None, None, None
                return
            history = update_chat(history, "☁️ Uploading fixed `app.py`...")
            yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, None
            try:
                upload_file_to_space_action(fixed_code_to_upload.encode("utf-8"), "app.py", repo_id, hf_profile, hf_token)
                history = update_chat(history, "✅ Fixed `app.py` uploaded. Space will rebuild. Click 'Send' to check logs again.")
                # Transition back to checking run logs (after rebuild)
                yield history, repo_id, STATE_CHECKING_LOGS_RUN, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, None
            except Exception as e:
                history = update_chat(history, f"❌ Error uploading fixed app.py: {e}. Click 'reset'.")
                yield history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0, None, None, None
        elif state == STATE_COMPLETE:
            # Nothing left to do in this run; remind the user how to leave
            # this state instead of silently ignoring the message.
            history = update_chat(history, "Workflow finished. Type 'reset' to start a new project.")
            yield history, repo_id, state, updated_preview, updated_run, updated_build, attempts, app_desc, repo_name, generated_code
        else:  # Should not happen
            history = update_chat(history, f"Internal error: Unknown state '{state}'. Resetting.")
            yield history, None, STATE_IDLE, "<p>Unknown state error.</p>", "", "", 0, None, None, None
    except Exception as e:
        # Catch-all for unexpected exceptions in any state. The workflow goes
        # back to idle so it cannot get stuck in a failing step.
        history = update_chat(history, f"Workflow step failed unexpectedly: {e}. Click 'Send' to re-attempt this step or 'reset'.")
        yield history, None, STATE_IDLE, updated_preview, updated_run, updated_build, 0, None, None, None
# --- Build the Gradio UI --- | |
# Builds the single-page UI: a sidebar (login, Gemini settings, SDK choice,
# workflow status) and a main column (chat, Space preview, log boxes).
#
# The Hugging Face profile and token are never stored in gr.State or listed in
# `inputs`: Gradio injects them into any handler parameter annotated with
# gr.OAuthProfile / gr.OAuthToken. Listing them as inputs as well would
# duplicate those arguments, and an un-annotated handler with no inputs
# receives nothing at all.
with gr.Blocks(title="AI-Powered HF Space App Builder") as ai_builder_tab:
    # State variables
    gemini_key = gr.State(None)
    gemini_model = gr.State("gemini-2.5-flash-preview-04-17")  # Default model
    repo_id = gr.State(None)  # ID of the created Space (e.g., 'user/repo')
    workflow = gr.State(STATE_IDLE)  # Current state of the AI workflow
    sdk_state = gr.State("gradio")  # Selected SDK
    debug_attempts = gr.State(0)  # Counter for debug attempts
    app_description = gr.State(None)  # Stores the user's original request string
    repo_name_state = gr.State(None)  # Stores the parsed repo name
    generated_code_state = gr.State(None)  # Temporarily stores generated code or file content

    def update_send_button_state(
        profile: gr.OAuthProfile | None,
        token: gr.OAuthToken | None,
        key,
        model,
    ):
        """Enable the Send button only when logged in and Gemini is filled in."""
        is_logged_in = profile is not None and token is not None
        # Truthiness, not `is not None`: a cleared textbox yields "" and must
        # not count as a usable key.
        is_gemini_ready = bool(key) and bool(model)
        return gr.update(interactive=is_logged_in and is_gemini_ready)

    with gr.Row():
        # Sidebar
        with gr.Column(scale=1, min_width=300):
            gr.Markdown("## Hugging Face Login")
            login_status = gr.Markdown("*Not logged in.*")
            login_btn = gr.LoginButton(variant="huggingface")
            # Show the login status on page load and on button click
            ai_builder_tab.load(show_profile, outputs=login_status)
            login_btn.click(show_profile, outputs=login_status)

            gr.Markdown("## Google AI Studio API Key")
            gemini_input = gr.Textbox(label="API Key", type="password", interactive=True)
            gemini_status = gr.Markdown("")
            # Mirror the key into state for the workflow handler
            gemini_input.change(lambda k: k, inputs=gemini_input, outputs=gemini_key)

            gr.Markdown("## Gemini Model")
            model_selector = gr.Radio(
                choices=[
                    ("Gemini 2.5 Flash Preview 04-17", "gemini-2.5-flash-preview-04-17"),
                    ("Gemini 2.5 Pro Preview 03-25", "gemini-2.5-pro-preview-03-25"),
                    ("Gemini 2.0 Flash", "gemini-2.0-flash"),
                    ("Gemini 2.0 Flash‑Lite", "gemini-2.0-flash-lite"),
                    ("Gemini 1.5 Flash", "gemini-1.5-flash"),
                ],
                value="gemini-2.5-flash-preview-04-17",
                label="Select model",
                interactive=True,
            )
            # Mirror the model into state for the workflow handler
            model_selector.change(lambda m: m, inputs=model_selector, outputs=gemini_model)

            # Configure Gemini on load and whenever key/model changes. The
            # handlers read the components directly: the gr.State copies are
            # written by a separate listener on the same event and may still
            # hold the previous value when this one runs.
            gemini_inputs = [gemini_input, model_selector]
            ai_builder_tab.load(configure_gemini, inputs=gemini_inputs, outputs=[gemini_status])
            gemini_input.change(configure_gemini, inputs=gemini_inputs, outputs=[gemini_status])
            model_selector.change(configure_gemini, inputs=gemini_inputs, outputs=[gemini_status])

            gr.Markdown("## Space SDK")
            sdk_selector = gr.Radio(choices=["gradio", "streamlit"], value="gradio", label="Template SDK", interactive=True)
            sdk_selector.change(lambda s: s, inputs=sdk_selector, outputs=sdk_state)

            gr.Markdown("## Workflow Status")
            status_text = gr.Textbox(label="Current State", value=STATE_IDLE, interactive=False)
            repo_id_text = gr.Textbox(label="Current Space ID", value="None", interactive=False)

        # Main content
        with gr.Column(scale=3):
            chatbot = gr.Chatbot()
            user_input = gr.Textbox(placeholder="Type your message…", interactive=True)
            send_btn = gr.Button("Send", interactive=False)

            # Re-evaluate the Send button on load, on login click, and when
            # the key or model changes. Profile and token are injected.
            ai_builder_tab.load(update_send_button_state, inputs=gemini_inputs, outputs=[send_btn])
            login_btn.click(update_send_button_state, inputs=gemini_inputs, outputs=[send_btn])
            gemini_input.change(update_send_button_state, inputs=gemini_inputs, outputs=[send_btn])
            model_selector.change(update_send_button_state, inputs=gemini_inputs, outputs=[send_btn])

            iframe = gr.HTML("<p>No Space created yet.</p>")
            build_txt = gr.Textbox(label="Build Logs", lines=10, interactive=False, value="")
            run_txt = gr.Textbox(label="Container Logs", lines=10, interactive=False, value="")

            # The main event handler for the Send button. hf_profile and
            # hf_token (3rd and 4th parameters of ai_workflow_chat) are
            # injected by Gradio, so they are not listed here.
            send_btn.click(
                ai_workflow_chat,
                inputs=[
                    user_input, chatbot,
                    gemini_key, gemini_model,
                    repo_id, workflow, sdk_state,
                    iframe, run_txt, build_txt,  # Pass current UI values
                    debug_attempts, app_description, repo_name_state, generated_code_state,
                ],
                outputs=[
                    chatbot,
                    repo_id, workflow,
                    iframe, run_txt, build_txt,  # Update UI values
                    debug_attempts, app_description, repo_name_state, generated_code_state,
                ],
            ).success(  # Clear input after successful send
                lambda: gr.update(value=""),
                inputs=None,
                outputs=user_input,
            )

    # Link state variables to UI status displays
    workflow.change(lambda s: s, inputs=workflow, outputs=status_text)
    repo_id.change(lambda r: r if r else "None", inputs=repo_id, outputs=repo_id_text)
    # The preview and log boxes are written directly by the Send handler's
    # outputs; handlers that copy a component's value back onto itself are
    # redundant and are intentionally not registered.

    # Add an initial message to the chatbot on load
    def greet():
        """Return the initial chat history containing the welcome message."""
        return [[None, "Welcome! Please log in to Hugging Face and provide your Google AI Studio API key to start building Spaces. Once ready, type 'generate me a gradio app called myapp' or 'create' to begin."]]

    ai_builder_tab.load(greet, outputs=chatbot)
# Script entry point: launch the Gradio app when this file is run directly.
if __name__ == "__main__":
    # Optional and currently disabled: mount a retrying HTTP adapter on the
    # session used by huggingface_hub to ride out transient network issues.
    # from requests.adapters import HTTPAdapter
    # from urllib3.util.retry import Retry
    # retry_strategy = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
    # adapter = HTTPAdapter(max_retries=retry_strategy)
    # get_session().mount("http://", adapter)
    # get_session().mount("https://", adapter)
    ai_builder_tab.launch()