work / app.py
start3406's picture
Update app.py
f8eb849 verified
raw
history blame
22.5 kB
import gradio as gr
import torch
from transformers import pipeline, set_seed
# 导入 AutoPipelineForText2Image 以便兼容不同模型
from diffusers import AutoPipelineForText2Image
import openai
import os
import time
import traceback # For detailed error logging
# ---- Configuration & API Key ----
# Check for OpenAI API Key in Hugging Face Secrets
api_key = os.environ.get("OPENAI_API_KEY")
openai_client = None
openai_available = False
if api_key:
try:
# Starting with openai v1, client instantiation is preferred
openai_client = openai.OpenAI(api_key=api_key)
# Simple test to check if the key is valid (optional, but good)
# openai_client.models.list() # This call might incur small cost/quota usage
openai_available = True
print("OpenAI API key found and client initialized.")
except Exception as e:
print(f"Error initializing OpenAI client: {e}")
print("Proceeding without OpenAI features.")
else:
print("WARNING: OPENAI_API_KEY secret not found. Prompt enhancement via OpenAI is disabled.")
# Force CPU usage
device = "cpu"
print(f"Using device: {device}")
# ---- Model Loading (CPU Focused) ----
# 1. 语音转文本模型 (Whisper) - 加分项
asr_pipeline = None
try:
print("Loading ASR pipeline (Whisper) on CPU...")
# Force CPU usage with device=-1 or device="cpu"
# 使用 fp16 会更快但需要GPU,CPU上用 float32
asr_pipeline = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=device, torch_dtype=torch.float32)
print("ASR pipeline loaded successfully on CPU.")
except Exception as e:
print(f"Could not load ASR pipeline: {e}. Voice input will be disabled.")
traceback.print_exc() # Print full traceback for debugging
# 2. 文本到图像模型 (nota-ai/bk-sdm-tiny) - 资源友好模型
image_generator_pipe = None
# 使用 nota-ai/bk-sdm-tiny 模型
model_id = "nota-ai/bk-sdm-tiny"
try:
print(f"Loading Text-to-Image pipeline ({model_id}) on CPU...")
print("NOTE: Using a small model for resource efficiency. Image quality and details may differ from larger models.")
# 使用 AutoPipelineForText2Image 自动识别模型类型
image_generator_pipe = AutoPipelineForText2Image.from_pretrained(model_id, torch_dtype=torch.float32)
image_generator_pipe = image_generator_pipe.to(device)
print(f"Text-to-Image pipeline ({model_id}) loaded successfully on CPU.")
except Exception as e:
print(f"CRITICAL: Could not load Text-to-Image pipeline ({model_id}): {e}. Image generation will fail.")
traceback.print_exc() # Print full traceback for debugging
# Define a dummy object to prevent crashes later if loading failed
class DummyPipe:
def __call__(self, *args, **kwargs):
raise RuntimeError(f"Text-to-Image model failed to load: {e}")
image_generator_pipe = DummyPipe()
# ---- Core Function Definitions ----
# Step 1: Prompt-to-Prompt (using OpenAI API)
def enhance_prompt_openai(short_prompt, style_modifier="cinematic", quality_boost="photorealistic, highly detailed"):
"""Uses OpenAI API to enhance the short description."""
if not openai_available or not openai_client:
# Fallback or error if OpenAI key is missing/invalid
print("OpenAI not available. Returning original prompt with modifiers.")
# Basic fallback prompt enhancement
if short_prompt:
return f"{short_prompt}, {style_modifier}, {quality_boost}"
else:
# If short prompt is empty, fallback should also indicate error
raise gr.Error("Input description cannot be empty.")
if not short_prompt:
# Return an error message formatted for Gradio output
raise gr.Error("Input description cannot be empty.")
# Construct the prompt for the OpenAI model
system_message = (
"You are an expert prompt engineer for AI image generation models. "
"Expand the user's short description into a detailed, vivid, and coherent prompt, suitable for smaller, faster text-to-image models. "
"Focus on clear subjects, objects, and main scene elements. "
"Incorporate the requested style and quality keywords naturally, but keep the overall prompt concise enough for smaller models. Avoid conversational text."
# Adjusting guidance for smaller models
)
user_message = (
f"Enhance this description: \"{short_prompt}\". "
f"Style: '{style_modifier}'. Quality: '{quality_boost}'."
)
print(f"Sending request to OpenAI for prompt enhancement: {short_prompt}")
try:
response = openai_client.chat.completions.create(
model="gpt-3.5-turbo", # Cost-effective choice
messages=[
{"role": "system", "content": system_message},
{"role": "user", "content": user_message},
],
temperature=0.7, # Controls creativity vs predictability
max_tokens=100, # Limit output length - reduced for potentially shorter prompts for smaller models
n=1, # Generate one response
stop=None # Let the model decide when to stop
)
enhanced_prompt = response.choices[0].message.content.strip()
print("OpenAI enhancement successful.")
# Basic cleanup: remove potential quotes around the whole response
if enhanced_prompt.startswith('"') and enhanced_prompt.endswith('"'):
enhanced_prompt = enhanced_prompt[1:-1]
return enhanced_prompt
except openai.AuthenticationError:
print("OpenAI Authentication Error: Invalid API key?")
raise gr.Error("OpenAI Authentication Error: Check your API key.")
except openai.RateLimitError:
print("OpenAI Rate Limit Error: You've exceeded your quota or rate limit.")
raise gr.Error("OpenAI Error: Rate limit exceeded.")
except openai.APIError as e:
print(f"OpenAI API Error: {e}")
raise gr.Error(f"OpenAI API Error: {e}")
except Exception as e:
print(f"An unexpected error occurred during OpenAI call: {e}")
traceback.print_exc()
raise gr.Error(f"Prompt enhancement failed: {e}")
# Step 2: Prompt-to-Image (CPU)
def generate_image_cpu(prompt, negative_prompt, guidance_scale, num_inference_steps):
"""Generates image using the loaded model on CPU."""
# 检查加载的模型是否是期望的pipeline类型或DummyPipe
if not isinstance(image_generator_pipe, AutoPipelineForText2Image):
# If it's a DummyPipe or None for some reason
if isinstance(image_generator_pipe, DummyPipe):
# DummyPipe will raise its own error when called, so just let it
pass # The call below will raise the intended error
else:
# Handle unexpected case where pipe is not loaded correctly
raise gr.Error("Image generation pipeline is not available (failed to load model).")
if not prompt or "[Error:" in prompt or "Error:" in prompt:
# Check if the prompt itself is an error message from the previous step
raise gr.Error("Cannot generate image due to invalid or missing prompt.")
print(f"Generating image on CPU for prompt: {prompt[:100]}...") # Log truncated prompt
# Note: Negative prompt and guidance scale might have less impact or behave differently
# on very small models.
print(f"Negative prompt: {negative_prompt}") # Will likely be ignored by tiny model
print(f"Guidance scale: {guidance_scale}, Steps: {num_inference_steps}") # Steps might be fixed internally by tiny model
start_time = time.time()
try:
# Use torch.inference_mode() or torch.no_grad() for efficiency
with torch.no_grad():
# Seed for reproducibility (optional, but good practice)
# generator = torch.Generator(device=device).manual_seed(int(time.time())) # Tiny model might not use generator param
# Call the pipeline - assuming standard parameters are accepted
output = image_generator_pipe(
prompt=prompt,
# It's possible tiny models ignore some parameters, but passing them is safer
negative_prompt=negative_prompt,
guidance_scale=float(guidance_scale),
num_inference_steps=int(num_inference_steps),
# generator=generator, # Omit if tiny model pipeline doesn't accept it
# height and width might need to be specified or limited for tiny models
# height=..., width=...
)
# Access the generated image(s). Assuming standard diffusers output structure (.images[0])
if hasattr(output, 'images') and isinstance(output.images, list) and len(output.images) > 0:
image = output.images[0] # Access the first image
else:
# Handle cases where output format is different (less common for AutoPipelines)
print("Warning: Pipeline output format unexpected. Attempting to use the output directly.")
image = output # Assume output is the image
end_time = time.time()
print(f"Image generated successfully on CPU in {end_time - start_time:.2f} seconds (using {model_id}).")
return image
except Exception as e:
print(f"Error during image generation on CPU ({model_id}): {e}")
traceback.print_exc()
# Propagate error to Gradio UI
raise gr.Error(f"Image generation failed on CPU ({model_id}): {e}")
# Bonus: Voice-to-Text (CPU)
def transcribe_audio(audio_file_path):
"""Transcribes audio to text using Whisper on CPU."""
if not asr_pipeline:
# This case should ideally be handled by hiding the control, but double-check
return "[Error: ASR model not loaded]", audio_file_path
if audio_file_path is None:
return "", audio_file_path # No audio input
print(f"Transcribing audio file: {audio_file_path} on CPU...")
start_time = time.time()
try:
# Ensure the pipeline uses the correct device (should be CPU based on loading)
# Ensure input is in expected format for Whisper pipeline (filepath or audio array)
if isinstance(audio_file_path, tuple): # Handle case where Gradio might pass tuple
# Assuming tuple is (samplerate, numpy_array), need to save to temp file or process directly
# For simplicity with type="filepath", assume it passes path directly
print("Warning: Audio input was tuple, expecting filepath. This might fail.")
# Attempting to process numpy array if it's the second element
if isinstance(audio_file_path[1], torch.Tensor) or isinstance(audio_file_path[1], list) or isinstance(audio_file_path[1], (int, float)):
# This path is complex, sticking to filepath assumption for now
pass # Let the pipeline call below handle potential error
audio_input_for_pipeline = audio_file_path # Pass original tuple, let pipeline handle
else:
audio_input_for_pipeline = audio_file_path # Expected filepath
transcription = asr_pipeline(audio_input_for_pipeline)["text"]
end_time = time.time()
print(f"Transcription successful in {end_time - start_time:.2f} seconds.")
print(f"Transcription result: {transcription}")
return transcription, audio_file_path
except Exception as e:
print(f"Error during audio transcription on CPU: {e}")
traceback.print_exc()
# Return error message in the expected tuple format
return f"[Error: Transcription failed: {e}]", audio_file_path
# ---- Gradio Application Flow ----
def process_input(input_text, audio_file, style_choice, quality_choice, neg_prompt, guidance, steps):
"""Main function triggered by Gradio button."""
final_text_input = ""
enhanced_prompt = ""
generated_image = None
status_message = "" # To gather status/errors for the prompt box
# 1. Determine Input (Text or Audio)
if input_text and input_text.strip():
final_text_input = input_text.strip()
print(f"Using text input: '{final_text_input}'")
elif audio_file is not None:
print("Processing audio input...")
try:
# transcribe_audio handles different Gradio audio output types potentially
transcribed_text, _ = transcribe_audio(audio_file)
if "[Error:" in transcribed_text:
# Display transcription error clearly
status_message = transcribed_text
print(status_message)
return status_message, None # Return error in prompt field, no image
elif transcribed_text:
final_text_input = transcribed_text
print(f"Using transcribed audio input: '{final_text_input}'")
else:
status_message = "[Error: Audio input received but transcription was empty.]"
print(status_message)
return status_message, None # Return error
except Exception as e:
status_message = f"[Unexpected Audio Transcription Error: {e}]"
print(status_message)
traceback.print_exc()
return status_message, None # Return error
else:
status_message = "[Error: No input provided. Please enter text or record audio.]"
print(status_message)
return status_message, None # Return error
# 2. Enhance Prompt (using OpenAI if available)
if final_text_input:
try:
enhanced_prompt = enhance_prompt_openai(final_text_input, style_choice, quality_choice)
status_message = enhanced_prompt # Display the prompt initially
print(f"Enhanced prompt: {enhanced_prompt}")
except gr.Error as e:
# Catch Gradio-specific errors from enhancement function
status_message = f"[Prompt Enhancement Error: {e}]"
print(status_message)
# Return the error, no image generation attempt
return status_message, None
except Exception as e:
# Catch any other unexpected errors
status_message = f"[Unexpected Prompt Enhancement Error: {e}]"
print(status_message)
traceback.print_exc()
return status_message, None
# 3. Generate Image (if prompt is valid)
# Check if the enhanced prompt step resulted in an error message
if enhanced_prompt and not status_message.startswith("[Error:") and not status_message.startswith("[Prompt Enhancement Error:"):
try:
# Show "Generating..." message while waiting
gr.Info(f"Starting image generation on CPU using {model_id}. This should be faster than full SD, but might still take time.")
generated_image = generate_image_cpu(enhanced_prompt, neg_prompt, guidance, steps)
gr.Info("Image generation complete!")
except gr.Error as e:
# Catch Gradio errors from generation function
# Prepend original enhanced prompt to the error message for context
status_message = f"{enhanced_prompt}\n\n[Image Generation Error: {e}]"
print(f"Image Generation Error: {e}")
generated_image = None # Ensure image is None on error
except Exception as e:
# Catch any other unexpected errors
status_message = f"{enhanced_prompt}\n\n[Unexpected Image Generation Error: {e}]"
print(f"Unexpected Image Generation Error: {e}")
traceback.print_exc()
generated_image = None # Ensure image is None on error
else:
# If prompt enhancement failed, status_message already contains the error
# In this case, we just return the existing status_message and None image
print("Skipping image generation due to prompt enhancement failure.")
# 4. Return results to Gradio UI
# Return the status message (enhanced prompt or error) and the image (or None if error)
return status_message, generated_image
# ---- Gradio Interface Construction ----
style_options = ["cinematic", "photorealistic", "anime", "fantasy art", "cyberpunk", "steampunk", "watercolor", "illustration", "low poly"]
quality_options = ["highly detailed", "sharp focus", "intricate details", "4k", "masterpiece", "best quality", "professional lighting"]
# Adjust steps/guidance defaults for a smaller model, still might be ignored by some pipelines
default_steps = 20
max_steps = 40 # Adjusted max steps
default_guidance = 5.0 # Adjusted default guidance
with gr.Blocks(theme=gr.themes.Soft()) as demo:
gr.Markdown("# AI Image Generator (CPU Version - Using Small Model)")
gr.Markdown(
"**Enter a short description or use voice input.** The app uses OpenAI (if API key is provided) "
f"to create a detailed prompt, then generates an image using a **small model ({model_id}) on the CPU**."
)
# Add specific warning about CPU speed and potential resource issues for this specific model
gr.HTML("<p style='color:orange;font-weight:bold;'>⚠️ Note: Using a small model for better compatibility on CPU. Generation should be faster than full Stable Diffusion, but quality/details may differ.</p>")
gr.HTML("<p style='color:red;font-weight:bold;'>⏰ CPU generation can still take 1-5 minutes per image depending on load and model specifics.</p>")
# Display OpenAI availability status
if not openai_available:
gr.Markdown("**Note:** OpenAI API key not found or invalid. Prompt enhancement will use a basic fallback.")
else:
gr.Markdown("**Note:** OpenAI API key found. Prompt will be enhanced using OpenAI.")
# Display Model loading status
# Check against AutoPipelineForText2Image type
if not isinstance(image_generator_pipe, AutoPipelineForText2Image):
gr.Markdown(f"**CRITICAL:** Image generation model ({model_id}) failed to load. Image generation is disabled. Check Space logs for details.")
with gr.Row():
with gr.Column(scale=1):
# --- Inputs ---
inp_text = gr.Textbox(label="Enter short description", placeholder="e.g., A cute robot drinking coffee on Mars")
# Only show Audio input if ASR model loaded successfully
if asr_pipeline:
inp_audio = gr.Audio(sources=["microphone"], type="filepath", label="Or record your idea (clears text box if used)")
else:
gr.Markdown("**Voice input disabled:** Whisper model failed to load.")
# Using gr.State as a placeholder that holds None
inp_audio = gr.State(None)
# --- Controls ---
# Note: These controls might have less impact than on larger models
gr.Markdown("*(Optional controls - Note: Their impact might vary on this small model)*")
# Control 1: Dropdown
inp_style = gr.Dropdown(label="Base Style", choices=style_options, value="cinematic")
# Control 2: Radio
inp_quality = gr.Radio(label="Quality Boost", choices=quality_options, value="highly detailed")
# Control 3: Textbox (Negative Prompt)
inp_neg_prompt = gr.Textbox(label="Negative Prompt (optional)", placeholder="e.g., blurry, low quality, text, watermark, signature, deformed")
# Control 4: Slider (Guidance Scale)
inp_guidance = gr.Slider(minimum=1.0, maximum=10.0, step=0.5, value=default_guidance, label="Guidance Scale (CFG)") # Lower max guidance
# Control 5: Slider (Inference Steps) - Adjusted max/default
inp_steps = gr.Slider(minimum=5, maximum=max_steps, step=1, value=default_steps, label=f"Inference Steps (lower = faster but less detail, max {max_steps})") # Lower min steps
# --- Action Button ---
# Disable button if model failed to load
btn_generate = gr.Button("Generate Image", variant="primary", interactive=isinstance(image_generator_pipe, AutoPipelineForText2Image))
with gr.Column(scale=1):
# --- Outputs ---
out_prompt = gr.Textbox(label="Generated Prompt / Status", interactive=False, lines=5) # Show prompt or error status here
out_image = gr.Image(label="Generated Image", type="pil", show_label=True) # Ensure label is shown
# --- Event Handling ---
# Define inputs list carefully, handling potentially invisible audio input
inputs_list = [inp_text]
if asr_pipeline:
inputs_list.append(inp_audio)
else:
inputs_list.append(inp_audio) # Pass the gr.State(None) placeholder
inputs_list.extend([inp_style, inp_quality, inp_neg_prompt, inp_guidance, inp_steps])
# Link button click to processing function
btn_generate.click(
fn=process_input,
inputs=inputs_list,
outputs=[out_prompt, out_image]
)
# Clear text input if audio is used (only if ASR is available)
if asr_pipeline:
def clear_text_on_audio_change(audio_data):
# Check if audio_data is not None or empty (depending on how Gradio signals recording)
if audio_data is not None:
print("Audio input detected, clearing text box.")
return "" # Clear text box
# If audio_data becomes None (e.g., recording cleared), don't clear text
return gr.update()
# .change event fires when the value changes, including becoming None if cleared
inp_audio.change(fn=clear_text_on_audio_change, inputs=inp_audio, outputs=inp_text, api_name="clear_text_on_audio")
# ---- Application Launch ----
if __name__ == "__main__":
# Final check before launch
# Check against AutoPipelineForText2Image type
if not isinstance(image_generator_pipe, AutoPipelineForText2Image):
print("\n" + "="*50)
print("CRITICAL WARNING:")
print(f"Image generation model ({model_id}) failed to load during startup.")
print("The Gradio UI will launch, but the 'Generate Image' button will be disabled.")
print("Check the Space logs above for the specific model loading error.")
print("="*50 + "\n")
# Launch the Gradio app
# Running on 0.0.0.0 is necessary for Hugging Face Spaces
demo.launch(share=False, server_name="0.0.0.0", server_port=7860)