"""Gradio app: turn a short text or voice description into an image.

If an OpenAI API key is available, the description is expanded into a detailed
prompt; Whisper transcribes voice input; Stable Diffusion v1.5 renders the
image on CPU."""

import os
import time
import traceback

import gradio as gr
import openai
import torch
from diffusers import StableDiffusionPipeline
from transformers import pipeline
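
# --- OpenAI client setup (used for optional prompt enhancement) ---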
api_key = os.environ.get("OPENAI_API_KEY")
openai_client = None
openai_available = False

if api_key:
    try:
        # The openai>=1.0 SDK uses a client object; setting the legacy
        # module-level openai.api_key is no longer necessary.
        openai_client = openai.OpenAI(api_key=api_key)
        openai_available = True
        print("OpenAI API key found and client initialized.")
    except Exception as e:
        print(f"Error initializing OpenAI client: {e}")
        print("Proceeding without OpenAI features.")
else:
    print("WARNING: OPENAI_API_KEY secret not found. Prompt enhancement via OpenAI is disabled.")

# Everything runs on CPU; this host is assumed to have no usable GPU.
device = "cpu"
print(f"Using device: {device}")
asr_pipeline = None
try:
    print("Loading ASR pipeline (Whisper) on CPU...")
    # whisper-base is small enough to run tolerably fast on CPU.
    asr_pipeline = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=device)
    print("ASR pipeline loaded successfully on CPU.")
except Exception as e:
    print(f"Could not load ASR pipeline: {e}. Voice input will be disabled.")
    traceback.print_exc()
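
# --- Stable Diffusion pipeline (image generation) ---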
image_generator_pipe = None
try:
    print("Loading Stable Diffusion pipeline (v1.5) on CPU...")
    print("WARNING: Stable Diffusion on CPU is VERY SLOW (expect minutes per image).")
    # Note: if this repo id is no longer available on the Hugging Face Hub,
    # a mirror such as "sd-legacy/stable-diffusion-v1-5" may work instead.
    model_id = "runwayml/stable-diffusion-v1-5"
    image_generator_pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float32)
    image_generator_pipe = image_generator_pipe.to(device)
    print("Stable Diffusion pipeline loaded successfully on CPU.")
except Exception as e:
    print(f"CRITICAL: Could not load Stable Diffusion pipeline: {e}. Image generation will fail.")
    traceback.print_exc()
    # Capture the error now: the exception variable `e` is unbound once the
    # except block ends, so referencing it later would raise a NameError.
    sd_load_error = e

    class DummyPipe:
        """Stand-in that raises a clear error if generation is attempted."""

        def __call__(self, *args, **kwargs):
            raise RuntimeError(f"Stable Diffusion model failed to load: {sd_load_error}")

    image_generator_pipe = DummyPipe()
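
# Optional tweak (an assumption, not part of the original flow): attention
# slicing can lower peak memory on small hosts at a modest speed cost.
if isinstance(image_generator_pipe, StableDiffusionPipeline):
    image_generator_pipe.enable_attention_slicing()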


# --- Prompt enhancement ---
def enhance_prompt_openai(short_prompt, style_modifier="cinematic", quality_boost="photorealistic, highly detailed"):
    """Expands the short description into a detailed prompt via the OpenAI API."""
    if not short_prompt:
        raise gr.Error("Input description cannot be empty.")
    if not openai_available or not openai_client:
        # Fallback when OpenAI is unavailable: append the modifiers verbatim.
        print("OpenAI not available. Returning original prompt with modifiers.")
        return f"{short_prompt}, {style_modifier}, {quality_boost}"

    system_message = (
        "You are an expert prompt engineer for AI image generation models like Stable Diffusion. "
        "Expand the user's short description into a detailed, vivid, and coherent prompt. "
        "Focus on visual details: subjects, objects, environment, lighting, atmosphere, composition. "
        "Incorporate the requested style and quality keywords naturally. Avoid conversational text."
    )
    user_message = (
        f"Enhance this description: \"{short_prompt}\". "
        f"Style: '{style_modifier}'. Quality: '{quality_boost}'."
    )

    print(f"Sending request to OpenAI for prompt enhancement: {short_prompt}")
    try:
        response = openai_client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": user_message},
            ],
            temperature=0.7,
            max_tokens=150,
            n=1,
        )
        enhanced_prompt = response.choices[0].message.content.strip()
        print("OpenAI enhancement successful.")
        # Models sometimes wrap the whole prompt in quotes; strip them.
        if enhanced_prompt.startswith('"') and enhanced_prompt.endswith('"'):
            enhanced_prompt = enhanced_prompt[1:-1]
        return enhanced_prompt
    except openai.AuthenticationError:
        print("OpenAI Authentication Error: Invalid API key?")
        raise gr.Error("OpenAI Authentication Error: Check your API key.")
    except openai.RateLimitError:
        print("OpenAI Rate Limit Error: quota or rate limit exceeded.")
        raise gr.Error("OpenAI Error: Rate limit exceeded.")
    except openai.APIError as e:
        print(f"OpenAI API Error: {e}")
        raise gr.Error(f"OpenAI API Error: {e}")
    except Exception as e:
        print(f"Unexpected error during OpenAI call: {e}")
        traceback.print_exc()
        raise gr.Error(f"Prompt enhancement failed: {e}")


# --- Image generation ---
def generate_image_cpu(prompt, negative_prompt, guidance_scale, num_inference_steps):
    """Generates an image with Stable Diffusion on CPU."""
    if not isinstance(image_generator_pipe, StableDiffusionPipeline):
        raise gr.Error("Stable Diffusion model is not available (failed to load).")
    # Refuse prompts that are empty or carry an upstream error marker.
    if not prompt or "Error:" in prompt:
        raise gr.Error("Cannot generate image due to invalid or missing prompt.")

    print(f"Generating image on CPU for prompt: {prompt[:100]}...")
    print(f"Negative prompt: {negative_prompt}")
    print(f"Guidance scale: {guidance_scale}, Steps: {num_inference_steps}")
    start_time = time.time()

    try:
        with torch.no_grad():
            # Seed from the clock so repeated clicks yield different images.
            generator = torch.Generator(device=device).manual_seed(int(time.time()))
            image = image_generator_pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                guidance_scale=float(guidance_scale),
                num_inference_steps=int(num_inference_steps),
                generator=generator,
            ).images[0]
        end_time = time.time()
        print(f"Image generated successfully on CPU in {end_time - start_time:.2f} seconds.")
        return image
    except Exception as e:
        print(f"Error during image generation on CPU: {e}")
        traceback.print_exc()
        raise gr.Error(f"Image generation failed on CPU: {e}")


# --- Speech-to-text ---
def transcribe_audio(audio_file_path):
    """Transcribes audio to text using Whisper on CPU."""
    if not asr_pipeline:
        return "[Error: ASR model not loaded]", audio_file_path
    if audio_file_path is None:
        return "", audio_file_path

    print(f"Transcribing audio file: {audio_file_path} on CPU...")
    start_time = time.time()
    try:
        transcription = asr_pipeline(audio_file_path)["text"]
        end_time = time.time()
        print(f"Transcription successful in {end_time - start_time:.2f} seconds.")
        print(f"Transcription result: {transcription}")
        return transcription, audio_file_path
    except Exception as e:
        print(f"Error during audio transcription on CPU: {e}")
        traceback.print_exc()
        return f"[Error: Transcription failed: {e}]", audio_file_path


# --- Main handler wired to the Generate button ---
def process_input(input_text, audio_file, style_choice, quality_choice, neg_prompt, guidance, steps):
    """Main function triggered by the Gradio button."""
    final_text_input = ""
    enhanced_prompt = ""
    generated_image = None
    status_message = ""

    # 1. Determine the input source; typed text takes priority over audio.
    if input_text and input_text.strip():
        final_text_input = input_text.strip()
        print(f"Using text input: '{final_text_input}'")
    elif audio_file is not None:
        print("Processing audio input...")
        transcribed_text, _ = transcribe_audio(audio_file)
        if "[Error:" in transcribed_text:
            status_message = transcribed_text
            print(status_message)
            return status_message, None
        elif transcribed_text:
            final_text_input = transcribed_text
            print(f"Using transcribed audio input: '{final_text_input}'")
        else:
            status_message = "[Error: Audio input received but transcription was empty.]"
            print(status_message)
            return status_message, None
    else:
        status_message = "[Error: No input provided. Please enter text or record audio.]"
        print(status_message)
        return status_message, None

    # 2. Enhance the prompt (OpenAI if available, otherwise the local fallback).
    if final_text_input:
        try:
            enhanced_prompt = enhance_prompt_openai(final_text_input, style_choice, quality_choice)
            status_message = enhanced_prompt
            print(f"Enhanced prompt: {enhanced_prompt}")
        except gr.Error as e:
            status_message = f"[Prompt Enhancement Error: {e}]"
            print(status_message)
            return status_message, None
        except Exception as e:
            status_message = f"[Unexpected Prompt Enhancement Error: {e}]"
            print(status_message)
            traceback.print_exc()
            return status_message, None

    # 3. Generate the image (very slow on CPU).
    if enhanced_prompt and not status_message.startswith("[Error:") and not status_message.startswith("[Prompt Enhancement Error:"):
        try:
            gr.Info("Starting image generation on CPU... This will take a while (possibly several minutes).")
            generated_image = generate_image_cpu(enhanced_prompt, neg_prompt, guidance, steps)
            gr.Info("Image generation complete!")
        except gr.Error as e:
            # Keep the enhanced prompt visible alongside the error message.
            status_message = f"{enhanced_prompt}\n\n[Image Generation Error: {e}]"
            print(f"Image Generation Error: {e}")
        except Exception as e:
            status_message = f"{enhanced_prompt}\n\n[Unexpected Image Generation Error: {e}]"
            print(f"Unexpected Image Generation Error: {e}")
            traceback.print_exc()
            generated_image = None

    return status_message, generated_image


# --- UI options ---
style_options = ["cinematic", "photorealistic", "anime", "fantasy art", "cyberpunk", "steampunk", "watercolor", "illustration", "low poly"]
quality_options = ["highly detailed", "sharp focus", "intricate details", "4k", "masterpiece", "best quality", "professional lighting"]

# CPU inference time grows roughly linearly with step count, so keep defaults low.
default_steps = 20
max_steps = 50

# --- Gradio UI ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# AI Image Generator (CPU Version)")
    gr.Markdown(
        "**Enter a short description or use voice input.** The app uses OpenAI (if an API key is provided) "
        "to create a detailed prompt, then generates an image using Stable Diffusion v1.5 **on the CPU**."
    )
    gr.HTML("<p style='color:orange;font-weight:bold;'>⚠️ Warning: Image generation on CPU is very slow! Expect several minutes per image.</p>")

    if not openai_available:
        gr.Markdown("**Note:** OpenAI API key not found or invalid. Prompt enhancement will use a basic fallback.")

    with gr.Row():
        with gr.Column(scale=1):
            inp_text = gr.Textbox(label="Enter short description", placeholder="e.g., A cute robot drinking coffee on Mars")

            if asr_pipeline:
                inp_audio = gr.Audio(sources=["microphone"], type="filepath", label="Or record your idea (clears text box if used)")
            else:
                gr.Markdown("**Voice input disabled:** Whisper model failed to load.")
                inp_audio = gr.Textbox(visible=False)

            inp_style = gr.Dropdown(label="Base Style", choices=style_options, value="cinematic")
            inp_quality = gr.Radio(label="Quality Boost", choices=quality_options, value="highly detailed")
            inp_neg_prompt = gr.Textbox(label="Negative Prompt (optional)", placeholder="e.g., blurry, low quality, text, watermark, signature, deformed")
            inp_guidance = gr.Slider(minimum=1.0, maximum=15.0, step=0.5, value=7.0, label="Guidance Scale (CFG)")
            inp_steps = gr.Slider(minimum=10, maximum=max_steps, step=1, value=default_steps, label=f"Inference Steps (lower = faster but less detail, max {max_steps})")

            btn_generate = gr.Button("Generate Image", variant="primary")

        with gr.Column(scale=1):
            out_prompt = gr.Textbox(label="Generated Prompt / Status", interactive=False, lines=5)
            out_image = gr.Image(label="Generated Image", type="pil")

    # Assemble the handler inputs; substitute a constant None when voice input is disabled.
    inputs_list = [inp_text]
    if asr_pipeline:
        inputs_list.append(inp_audio)
    else:
        inputs_list.append(gr.State(None))
    inputs_list.extend([inp_style, inp_quality, inp_neg_prompt, inp_guidance, inp_steps])

    btn_generate.click(
        fn=process_input,
        inputs=inputs_list,
        outputs=[out_prompt, out_image]
    )

    # Recording audio clears the text box so the transcription is actually used.
    if asr_pipeline:
        def clear_text_on_audio(audio_data):
            if audio_data is not None:
                return ""
            return gr.update()

        inp_audio.change(fn=clear_text_on_audio, inputs=inp_audio, outputs=inp_text)


if __name__ == "__main__":
    if not isinstance(image_generator_pipe, StableDiffusionPipeline):
        print("CRITICAL FAILURE: Stable Diffusion pipeline did not load. The application UI will load, but image generation WILL NOT WORK.")
    demo.launch(share=False)
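
# Note: share=False keeps the app local; demo.launch(share=True) would create
# a temporary public Gradio link instead.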