import gradio as gr import torch from transformers import pipeline, set_seed from diffusers import AutoPipelineForText2Image # 导入 AutoPipelineForText2Image 以便兼容不同模型 import openai import os import time import traceback # 用于详细错误日志记录 from typing import Optional, Tuple, Union # 用于类型提示 from PIL import Image # 用于图像类型提示 # ---- Configuration & API Key ---- # 检查 Hugging Face Secrets 中是否存在 OpenAI API Key api_key: Optional[str] = os.environ.get("OPENAI_API_KEY") openai_client: Optional[openai.OpenAI] = None openai_available: bool = False if api_key: try: # 使用 openai v1 版本,推荐实例化 client # openai.api_key = api_key # 老版本写法,新版本推荐下方实例化 openai_client = openai.OpenAI(api_key=api_key) # 可选:简单的测试检查密钥是否有效(可能产生少量费用/占用配额) # openai_client.models.list() openai_available = True print("OpenAI API key found and client initialized.") except Exception as e: print(f"Error initializing OpenAI client: {e}") print("Proceeding without OpenAI features.") else: print("WARNING: OPENAI_API_KEY secret not found. Prompt enhancement via OpenAI is disabled.") # 强制使用 CPU device: str = "cpu" print(f"Using device: {device}") # 定义 DummyPipe 类,用于模型加载失败时的占位符 # 需要在模型加载块之前定义 class DummyPipe: """ A placeholder class used when the actual image generation pipeline fails to load. Its __call__ method raises a RuntimeError indicating the failure. """ def __call__(self, *args, **kwargs) -> None: # 这个错误消息会被调用者 (process_input -> generate_image_cpu) 捕获并显示 raise RuntimeError("Image generation pipeline is not available (failed to load model).") # ---- Model Loading (CPU Focused) ---- # 1. 语音转文本模型 (Whisper) - 可选功能 asr_pipeline = None try: print("Loading ASR pipeline (Whisper) on CPU...") # 强制使用 CPU,并使用 float32 类型以兼容 CPU asr_pipeline = pipeline("automatic-speech-recognition", model="openai/whisper-base", device=device, torch_dtype=torch.float32) print("ASR pipeline loaded successfully on CPU.") except Exception as e: print(f"Could not load ASR pipeline (Whisper): {e}. Voice input will be disabled.") traceback.print_exc() # 打印完整 traceback 以便于调试 # 2. 文本到图像模型 (nota-ai/bk-sdm-tiny) - 资源友好模型 image_generator_pipe: Union[AutoPipelineForText2Image, DummyPipe] = DummyPipe() # 初始化为 DummyPipe model_id: str = "nota-ai/bk-sdm-tiny" # 使用 nota-ai/bk-sdm-tiny 模型 try: print(f"Loading Text-to-Image pipeline ({model_id}) on CPU...") print("NOTE: Using a small model for resource efficiency. Image quality and details may differ from larger models.") # 使用 AutoPipelineForText2Image 自动识别模型类型 pipeline_instance = AutoPipelineForText2Image.from_pretrained(model_id, torch_dtype=torch.float32) image_generator_pipe = pipeline_instance.to(device) print(f"Text-to-Image pipeline ({model_id}) loaded successfully on CPU.") except Exception as e: print(f"CRITICAL: Could not load Text-to-Image pipeline ({model_id}): {e}. Image generation will fail.") traceback.print_exc() # 打印完整 traceback 以便于调试 # image_generator_pipe 保持为初始化的 DummyPipe() # ---- Core Function Definitions ---- # Step 1: Prompt Enhancement (using OpenAI API or Fallback) def enhance_prompt_openai(short_prompt: str, style_modifier: str = "cinematic", quality_boost: str = "photorealistic, highly detailed") -> str: """使用 OpenAI API (如果可用) 增强用户输入的简短描述。""" if not short_prompt or not short_prompt.strip(): # 如果输入为空,直接抛出错误 raise gr.Error("Input description cannot be empty.") if not openai_available or not openai_client: # 如果 OpenAI 不可用,使用基本备用方案 print("OpenAI not available. Returning original prompt with modifiers.") return f"{short_prompt.strip()}, {style_modifier}, {quality_boost}" # 如果 OpenAI 可用,构建并发送请求 system_message: str = ( "You are an expert prompt engineer for AI image generation models. " "Expand the user's short description into a detailed, vivid, and coherent prompt, suitable for smaller, faster text-to-image models. " "Focus on clear subjects, objects, and main scene elements. " "Incorporate the requested style and quality keywords naturally, but keep the overall prompt concise enough for smaller models. Avoid conversational text." ) user_message: str = ( f"Enhance this description: \"{short_prompt.strip()}\". " f"Style: '{style_modifier}'. Quality: '{quality_boost}'." ) print(f"Sending request to OpenAI for prompt enhancement: '{short_prompt.strip()}'") try: response = openai_client.chat.completions.create( model="gpt-3.5-turbo", # 成本效益高的选择 messages=[ {"role": "system", "content": system_message}, {"role": "user", "content": user_message}, ], temperature=0.7, # 控制创造性 max_tokens=100, # 限制输出长度 n=1, # 生成一个响应 stop=None # 让模型决定何时停止 ) enhanced_prompt: str = response.choices[0].message.content.strip() print("OpenAI enhancement successful.") # 基本清理:移除可能出现在整个响应外部的引号 if enhanced_prompt.startswith('"') and enhanced_prompt.endswith('"'): enhanced_prompt = enhanced_prompt[1:-1] return enhanced_prompt except openai.AuthenticationError: print("OpenAI Authentication Error: Invalid API key?") raise gr.Error("OpenAI Authentication Error: Check your API key.") except openai.RateLimitError: print("OpenAI Rate Limit Error: You've exceeded your quota or rate limit.") raise gr.Error("OpenAI Error: Rate limit exceeded.") except openai.APIError as e: print(f"OpenAI API Error: {e}") raise gr.Error(f"OpenAI API Error: {e}") except Exception as e: print(f"An unexpected error occurred during OpenAI call: {e}") traceback.print_exc() raise gr.Error(f"Prompt enhancement failed: {e}") # Step 2: Image Generation (CPU) def generate_image_cpu(prompt: str, negative_prompt: str, guidance_scale: float, num_inference_steps: int) -> Image.Image: """在 CPU 上使用加载的模型生成图像。""" # 检查模型是否成功加载 (是否是 DummyPipe) if isinstance(image_generator_pipe, DummyPipe): # 如果是 DummyPipe,调用它会抛出加载失败的错误 image_generator_pipe() # 这会直接抛出 intended 的错误 # 如果不是 DummyPipe,它应该是 AutoPipelineForText2Image 的实例 if not prompt or "[Error:" in prompt or "Error:" in prompt: # 检查提示词本身是否是来自前一步的错误信息 raise gr.Error("Cannot generate image due to invalid or missing prompt.") print(f"Generating image on CPU for prompt: {prompt[:100]}...") # 记录截断的提示词 # 注意:负面提示词、guidance_scale 和 num_inference_steps 对小型模型影响可能较小或行为不同 print(f"Negative prompt: {negative_prompt}") print(f"Guidance scale: {guidance_scale}, Steps: {num_inference_steps}") start_time: float = time.time() try: # 使用 torch.no_grad() 提高效率 with torch.no_grad(): # 调用 pipeline # 传递标准参数,即使小型模型可能忽略其中一些 output = image_generator_pipe( prompt=prompt, negative_prompt=negative_prompt, guidance_scale=float(guidance_scale), num_inference_steps=int(num_inference_steps), # generator 和 height/width 参数可能需要根据具体小型模型进行调整或省略 # generator=torch.Generator(device=device).manual_seed(int(time.time())), # height=..., width=... ) # 获取生成的图像。假设标准的 diffusers 输出结构 (.images[0]) if hasattr(output, 'images') and isinstance(output.images, list) and len(output.images) > 0: image: Image.Image = output.images[0] # 获取第一张图片 else: # 处理输出格式不同的情况 (AutoPipelines 较少出现) print("Warning: Pipeline output format unexpected. Attempting to use the output directly.") # 尝试将整个输出视为图像,但这可能需要根据实际模型输出类型进行调整 if isinstance(output, Image.Image): image = output else: # 如果输出既没有 .images 也不是 PIL Image,则认为是失败 raise RuntimeError(f"Image generation pipeline returned unexpected output type: {type(output)}") end_time: float = time.time() print(f"Image generated successfully on CPU in {end_time - start_time:.2f} seconds (using {model_id}).") return image except Exception as e: print(f"Error during image generation on CPU ({model_id}): {e}") traceback.print_exc() # 将错误传播给 Gradio UI raise gr.Error(f"Image generation failed on CPU ({model_id}): {e}") # Bonus: Voice-to-Text (CPU) def transcribe_audio(audio_file_path: Optional[str]) -> Tuple[str, Optional[str]]: """使用 Whisper 在 CPU 上将音频转录为文本。""" # 检查 ASR pipeline 是否加载成功 if not asr_pipeline: # 返回错误信息 tuple return "[Error: ASR model not loaded]", audio_file_path if audio_file_path is None: # 没有音频输入,返回空字符串 return "", audio_file_path print(f"Transcribing audio file: {audio_file_path} on CPU...") start_time: float = time.time() try: # 假设 audio_file_path 是一个字符串路径,因为 Gradio Audio 组件 type="filepath" # asr_pipeline 期望输入是文件路径字符串或音频数据数组 # 这里假设 type="filepath" 传递的是文件路径 transcription: str = asr_pipeline(audio_file_path)["text"] end_time: float = time.time() print(f"Transcription successful in {end_time - start_time:.2f} seconds.") print(f"Transcription result: {transcription}") return transcription, audio_file_path except Exception as e: print(f"Error during audio transcription on CPU: {e}") traceback.print_exc() # 返回错误信息 tuple return f"[Error: Transcription failed: {e}]", audio_file_path # ---- Gradio Application Flow ---- def process_input( input_text: str, audio_file: Optional[str], # 根据 type="filepath" 是字符串路径或 None style_choice: str, quality_choice: str, neg_prompt: str, guidance: float, steps: int ) -> Tuple[str, Optional[Image.Image]]: """由 Gradio 按钮触发的主处理函数。""" final_text_input: str = "" enhanced_prompt: str = "" generated_image: Optional[Image.Image] = None status_message: str = "" # 用于在 prompt 输出框显示状态/错误 # 1. 确定输入 (文本或语音) if input_text and input_text.strip(): final_text_input = input_text.strip() print(f"Using text input: '{final_text_input}'") elif audio_file is not None: print("Processing audio input...") try: transcribed_text, _ = transcribe_audio(audio_file) if "[Error:" in transcribed_text: # 清晰显示转录错误 status_message = transcribed_text print(status_message) return status_message, None # 在 prompt 字段返回错误,不生成图像 elif transcribed_text and transcribed_text.strip(): # 确保转录结果不为空 final_text_input = transcribed_text.strip() print(f"Using transcribed audio input: '{final_text_input}'") else: status_message = "[Error: Audio input received but transcription was empty or whitespace.]" print(status_message) return status_message, None # 返回错误 except Exception as e: status_message = f"[Unexpected Audio Transcription Error: {e}]" print(status_message) traceback.print_exc() return status_message, None # 返回错误 else: status_message = "[Error: No input provided. Please enter text or record audio.]" print(status_message) return status_message, None # 返回错误 # 2. 增强提示词 (使用 OpenAI 如果可用) if final_text_input: try: enhanced_prompt = enhance_prompt_openai(final_text_input, style_choice, quality_choice) status_message = enhanced_prompt # 初始显示增强后的提示词 print(f"Enhanced prompt: {enhanced_prompt}") except gr.Error as e: # 捕获来自增强函数的 Gradio 特定的错误 status_message = f"[Prompt Enhancement Error: {e}]" print(status_message) # 返回错误,不尝试生成图像 return status_message, None except Exception as e: # 捕获其他意外错误 status_message = f"[Unexpected Prompt Enhancement Error: {e}]" print(status_message) traceback.print_exc() return status_message, None # 3. 生成图像 (如果提示词有效) # 检查增强提示词步骤是否返回了错误信息 if enhanced_prompt and not status_message.startswith("[Error:") and not status_message.startswith("[Prompt Enhancement Error:"): try: # 显示“正在生成...”消息 gr.Info(f"Starting image generation on CPU using {model_id}. This should be faster than full SD, but might still take time.") generated_image = generate_image_cpu(enhanced_prompt, neg_prompt, guidance, steps) gr.Info("Image generation complete!") except gr.Error as e: # 捕获来自生成函数的 Gradio 错误 # 在错误消息前加上原始的增强提示词以便提供上下文 status_message = f"{enhanced_prompt}\n\n[Image Generation Error: {e}]" print(f"Image Generation Error: {e}") generated_image = None # 确保错误时图像为 None except Exception as e: # 捕获其他意外错误 status_message = f"{enhanced_prompt}\n\n[Unexpected Image Generation Error: {e}]" print(f"Unexpected Image Generation Error: {e}") traceback.print_exc() generated_image = None # 确保错误时图像为 None else: # 如果提示词增强失败,status_message 已经包含了错误信息 # 此时,我们只返回现有的 status_message 和 None 图像 print("Skipping image generation due to prompt enhancement failure.") # 4. 将结果返回给 Gradio UI # 返回状态消息 (增强提示词或错误) 和图像 (如果出错则为 None) return status_message, generated_image # ---- Gradio Interface Construction ---- style_options: list[str] = ["cinematic", "photorealistic", "anime", "fantasy art", "cyberpunk", "steampunk", "watercolor", "illustration", "low poly"] quality_options: list[str] = ["highly detailed", "sharp focus", "intricate details", "4k", "masterpiece", "best quality", "professional lighting"] # 为小型模型调整步数/Guidance Scale 默认值和最大值,注意它们的影响可能不如大型模型显著 default_steps: int = 20 max_steps: int = 40 # 调整最大步数 default_guidance: float = 5.0 # 调整默认 Guidance Scale max_guidance: float = 10.0 # 调整最大 Guidance Scale with gr.Blocks(theme=gr.themes.Soft()) as demo: gr.Markdown("# AI Image Generator (CPU Version - Using Small Model)") gr.Markdown( "**Enter a short description or use voice input.** The app uses OpenAI (if API key is provided) " f"to create a detailed prompt, then generates an image using a **small model ({model_id}) on the CPU**." ) # 添加关于 CPU 速度和模型特性的警告 gr.HTML("

⚠️ Note: Using a small model for better compatibility on CPU. Generation should be faster than full Stable Diffusion, but quality/details may differ.

") gr.HTML("

⏰ CPU generation can still take 1-5 minutes per image depending on load and model specifics.

") # 显示 OpenAI 可用状态 if not openai_available: gr.Markdown("**Note:** OpenAI API key not found or invalid. Prompt enhancement will use a basic fallback.") else: gr.Markdown("**Note:** OpenAI API key found. Prompt will be enhanced using OpenAI.") # 显示模型加载状态 - 修改检查逻辑 # 检查 image_generator_pipe 是否是 DummyPipe,如果是则表示加载失败 if isinstance(image_generator_pipe, DummyPipe): gr.Markdown(f"**CRITICAL:** Image generation model ({model_id}) failed to load. Image generation is disabled. Check Space logs for details.") with gr.Row(): with gr.Column(scale=1): # --- 输入控件 --- inp_text = gr.Textbox(label="Enter short description", placeholder="e.g., A cute robot drinking coffee on Mars") # 只有当 ASR 模型加载成功时才显示音频输入控件 if asr_pipeline: # type="filepath" 会将录音保存为临时文件并传递文件路径 inp_audio = gr.Audio(sources=["microphone"], type="filepath", label="Or record your idea (clears text box if used)") else: gr.Markdown("**Voice input disabled:** Whisper model failed to load.") # 使用 gr.State 作为占位符,其值为 None inp_audio = gr.State(None) # --- 控制参数 --- # 注意:这些控制参数对小型模型的影响可能不如对大型模型显著 gr.Markdown("*(Optional controls - Note: Their impact might vary on this small model)*") # 控制 1: 下拉选择框 inp_style = gr.Dropdown(label="Base Style", choices=style_options, value="cinematic") # 控制 2: 单选按钮组 inp_quality = gr.Radio(label="Quality Boost", choices=quality_options, value="highly detailed") # 控制 3: 文本框 (负面提示词) inp_neg_prompt = gr.Textbox(label="Negative Prompt (optional)", placeholder="e.g., blurry, low quality, text, watermark, signature, deformed") # 控制 4: 滑块 (Guidance Scale) inp_guidance = gr.Slider(minimum=1.0, maximum=max_guidance, step=0.5, value=default_guidance, label="Guidance Scale (CFG)") # 降低最大值和默认值 # 控制 5: 滑块 (Inference Steps) - 调整最大值和默认值 inp_steps = gr.Slider(minimum=5, maximum=max_steps, step=1, value=default_steps, label=f"Inference Steps (lower = faster but less detail, max {max_steps})") # 调整最小值、最大值和默认值 # --- 操作按钮 --- # 如果模型加载失败 (是 DummyPipe),则禁用按钮 btn_generate = gr.Button("Generate Image", variant="primary", interactive=not isinstance(image_generator_pipe, DummyPipe)) with gr.Column(scale=1): # --- 输出控件 --- out_prompt = gr.Textbox(label="Generated Prompt / Status", interactive=False, lines=5) # 显示提示词或错误状态 out_image = gr.Image(label="Generated Image", type="pil", show_label=True) # 确保显示标签 # --- 事件处理 --- # 仔细定义输入列表,处理可能不可见的音频输入控件 inputs_list = [inp_text] # 如果 ASR 可用,将 inp_audio 加入输入列表 if asr_pipeline: inputs_list.append(inp_audio) else: # 如果 ASR 不可用,将 gr.State(None) 占位符加入输入列表 inputs_list.append(inp_audio) inputs_list.extend([inp_style, inp_quality, inp_neg_prompt, inp_guidance, inp_steps]) # 将按钮点击事件连接到主处理函数 btn_generate.click( fn=process_input, inputs=inputs_list, outputs=[out_prompt, out_image] ) # 如果使用了音频输入,则清空文本输入框 (仅当 ASR 可用时) if asr_pipeline: def clear_text_on_audio_change(audio_data: Optional[str]) -> Union[str, gr.update]: # 检查 audio_data 是否不是 None 或空 if audio_data is not None: print("Audio input detected, clearing text box.") return "" # 清空文本框 # 如果 audio_data 变为 None (例如,录音被清除),则不改变文本框 return gr.update() # .change 事件在值改变时触发,包括变为 None (如果控件支持) inp_audio.change(fn=clear_text_on_audio_change, inputs=inp_audio, outputs=inp_text, api_name="clear_text_on_audio") # ---- Application Launch ---- if __name__ == "__main__": # 最终检查并打印警告,基于 image_generator_pipe 是否为 DummyPipe if isinstance(image_generator_pipe, DummyPipe): print("\n" + "="*50) print("CRITICAL WARNING:") print(f"Image generation model ({model_id}) failed to load during startup.") print("The Gradio UI will launch, but the 'Generate Image' button will be disabled.") print("Check the Space logs above for the specific model loading error.") print("="*50 + "\n") # 启动 Gradio 应用 # 在 Hugging Face Spaces 中,需要监听 0.0.0.0 和 7860 端口 demo.launch(share=False, server_name="0.0.0.0", server_port=7860)