# NOTE: the original paste included a "Spaces: Running" page-status header
# here (a scraping artifact, not code); preserved only as this comment.
import logging
import os
import re
import shlex
import shutil
import subprocess
import tempfile
import traceback  # For detailed error logging
import uuid
from pathlib import Path

import gradio as gr
from moviepy.editor import VideoFileClip, AudioFileClip
from openai import OpenAI
from PIL import Image
# --- Configuration --- | |
# Configure logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
# Supported models configuration | |
MODELS = { | |
# Format: "Display Name": {"base_url": "...", "env_key": "...", "model_name_for_api": "..."} | |
# Add your models here | |
"deepseek-ai/DeepSeek-V3": { | |
"base_url": "https://api.deepseek.com/v1", | |
"env_key": "DEEPSEEK_API_KEY", | |
"model_name_for_api": "deepseek-chat", # Use the specific model name required by DeepSeek API | |
}, | |
"Qwen/Qwen2.5-Coder-32B-Instruct": { | |
"base_url": "https://api-inference.huggingface.co/v1/", # Check if correct for chat completions | |
"env_key": "HF_TOKEN", | |
# Note: HF Inference API might use a different endpoint or format for chat completions. | |
# This base URL might be for text-generation. Adjust if needed. | |
# Also, the model name might need /chat/completions appended or similar. | |
"model_name_for_api": "Qwen/Qwen2.5-Coder-32B-Instruct", # Usually the model ID on HF | |
}, | |
# Example using a local server (like LM Studio, Ollama) | |
# "Local Model (via Ollama)": { | |
# "base_url": "http://localhost:11434/v1", # Ollama's OpenAI-compatible endpoint | |
# "env_key": "OLLAMA_API_KEY", # Often not needed, use "NONE" or similar if no key | |
# "model_name_for_api": "qwen:14b", # The specific model name served by Ollama | |
# }, | |
} | |
# Allowed media file extensions | |
allowed_medias = [ | |
".png", ".jpg", ".webp", ".jpeg", ".tiff", ".bmp", ".gif", ".svg", | |
".mp3", ".wav", ".ogg", ".aac", ".flac", ".m4a", | |
".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm", ".mpg", ".mpeg", ".m4v", | |
".3gp", ".3g2", ".3gpp", | |
] | |
# --- Global Variables --- | |
client = None | |
initial_model_choice = None | |
# --- Helper Functions --- | |
def get_first_available_key_config(): | |
"""Finds the first model config with a valid API key in environment variables.""" | |
for model_display_name, config in MODELS.items(): | |
api_key = os.environ.get(config["env_key"]) | |
# Treat empty string "" as missing key, handle potential "NONE" placeholder | |
if api_key and api_key.upper() != "NONE": | |
logging.info(f"Found valid API key for model: {model_display_name}") | |
return model_display_name, config | |
logging.warning("No valid API keys found in environment variables for any configured models.") | |
return None, None | |
def initialize_client(): | |
"""Initializes the OpenAI client with the first available config.""" | |
global client, initial_model_choice | |
initial_model_choice, config = get_first_available_key_config() | |
if config: | |
try: | |
api_key = os.environ.get(config["env_key"]) | |
# Handle case where key is explicitly set to "NONE" or similar for keyless local models | |
effective_api_key = api_key if api_key and api_key.upper() != "NONE" else "required-but-not-used" # Placeholder for local models if needed | |
client = OpenAI( | |
base_url=config["base_url"], | |
api_key=effective_api_key, | |
) | |
logging.info(f"OpenAI client initialized for model: {initial_model_choice} using base_url: {config['base_url']}") | |
except Exception as e: | |
logging.error(f"Failed to initialize OpenAI client: {e}", exc_info=True) | |
client = None | |
initial_model_choice = list(MODELS.keys())[0] # Fallback UI selection | |
else: | |
client = None | |
# Set a default model choice for the UI even if client fails | |
initial_model_choice = list(MODELS.keys())[0] if MODELS else None | |
def get_files_infos(files): | |
"""Extracts metadata from uploaded files, handling potential errors.""" | |
results = [] | |
if not files: | |
return results | |
for file_obj in files: | |
file_path = Path(file_obj.name) | |
info = {"error": None, "original_name": file_path.name} | |
try: | |
info["size"] = os.path.getsize(file_path) | |
# Sanitize filename (used in ffmpeg command) | |
info["name"] = file_path.name.replace(" ", "_") | |
# Validate sanitized name (basic check) | |
if not info["name"] or "/" in info["name"] or "\\" in info["name"]: | |
raise ValueError(f"Invalid sanitized filename generated: '{info['name']}'") | |
file_extension = file_path.suffix.lower() | |
# Video Processing | |
if file_extension in (".mp4", ".avi", ".mov", ".mkv", ".flv", ".wmv", ".webm", ".mpg", ".mpeg", ".m4v", ".3gp", ".3g2", ".3gpp"): | |
info["type"] = "video" | |
try: | |
# Ensure ffmpeg is found by moviepy, handle potential issues | |
if not shutil.which("ffmpeg"): | |
raise FileNotFoundError("ffmpeg command not found in PATH. MoviePy cannot process video/audio.") | |
video = VideoFileClip(str(file_path), verbose=False) | |
info["duration"] = video.duration | |
info["dimensions"] = f"{video.size[0]}x{video.size[1]}" if video.size else "N/A" | |
if video.audio: | |
info["type"] = "video/audio" | |
info["audio_channels"] = video.audio.nchannels if hasattr(video.audio, 'nchannels') else "N/A" | |
video.close() # Release file handle | |
except UnicodeDecodeError as ude: | |
info["error"] = f"Metadata decoding error ({ude}). Duration/dimensions might be missing." | |
logging.warning(f"UnicodeDecodeError processing video '{info['original_name']}': {ude}") | |
except FileNotFoundError as fnf: | |
info["error"] = str(fnf) | |
logging.error(f"FFmpeg not found: {fnf}") | |
except Exception as e: | |
info["error"] = f"Error reading video metadata ({type(e).__name__})." | |
logging.warning(f"Error processing video '{info['original_name']}': {e}", exc_info=False) # Log less verbose traceback for common errors | |
# Audio Processing | |
elif file_extension in (".mp3", ".wav", ".ogg", ".aac", ".flac", ".m4a"): | |
info["type"] = "audio" | |
try: | |
if not shutil.which("ffmpeg"): | |
raise FileNotFoundError("ffmpeg command not found in PATH. MoviePy cannot process video/audio.") | |
audio = AudioFileClip(str(file_path), verbose=False) | |
info["duration"] = audio.duration | |
info["audio_channels"] = audio.nchannels if hasattr(audio, 'nchannels') else "N/A" | |
audio.close() | |
except UnicodeDecodeError as ude: | |
info["error"] = f"Metadata decoding error ({ude}). Duration/channels might be missing." | |
logging.warning(f"UnicodeDecodeError processing audio '{info['original_name']}': {ude}") | |
except FileNotFoundError as fnf: | |
info["error"] = str(fnf) | |
logging.error(f"FFmpeg not found: {fnf}") | |
except Exception as e: | |
info["error"] = f"Error reading audio metadata ({type(e).__name__})." | |
logging.warning(f"Error processing audio '{info['original_name']}': {e}", exc_info=False) | |
# Image Processing | |
elif file_extension in (".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".gif", ".svg", ".webp"): | |
info["type"] = "image" | |
try: | |
with Image.open(file_path) as img: | |
info["dimensions"] = f"{img.size[0]}x{img.size[1]}" | |
except Exception as e: | |
info["error"] = f"Error reading image metadata ({type(e).__name__})." | |
logging.warning(f"Error processing image '{info['original_name']}': {e}", exc_info=False) | |
else: | |
info["type"] = "unknown" | |
info["error"] = "Unsupported file type." | |
logging.warning(f"Unsupported file type: {info['original_name']}") | |
except OSError as ose: | |
info["error"] = f"File system error: {ose}" | |
logging.error(f"OSError accessing file {file_path}: {ose}", exc_info=True) | |
if "name" not in info: info["name"] = info["original_name"].replace(" ", "_") # Ensure sanitized name exists | |
except ValueError as ve: # Catch invalid sanitized name error | |
info["error"] = str(ve) | |
logging.error(f"Filename sanitization error for {info['original_name']}: {ve}") | |
if "name" not in info: info["name"] = f"invalid_name_{uuid.uuid4()}" # Provide a fallback name | |
except Exception as e: | |
info["error"] = f"Unexpected error processing file: {e}" | |
logging.error(f"Unexpected error processing file {file_path}: {e}", exc_info=True) | |
if "name" not in info: info["name"] = info["original_name"].replace(" ", "_") | |
results.append(info) | |
return results | |
def get_completion(prompt, files_info, top_p, temperature, model_choice): | |
"""Generates the FFMPEG command using the selected AI model.""" | |
global client # Ensure we use the potentially updated client | |
if client is None: | |
# This should ideally be caught earlier, but double-check | |
raise gr.Error("API Client not initialized. Cannot contact AI.") | |
if model_choice not in MODELS: | |
raise ValueError(f"Model '{model_choice}' is not found in configuration.") | |
model_config = MODELS[model_choice] | |
model_name_for_api = model_config["model_name_for_api"] | |
# --- Create files info table (Markdown for the AI) --- | |
files_info_string = "| Type | Name (for command) | Dimensions | Duration (s) | Audio Channels | Status |\n" | |
files_info_string += "|------|--------------------|------------|--------------|----------------|--------|\n" | |
valid_files_count = 0 | |
for file_info in files_info: | |
name_for_command = file_info.get("name", "N/A") # Use sanitized name | |
file_type = file_info.get("type", "N/A") | |
dimensions = file_info.get("dimensions", "-") | |
duration_val = file_info.get('duration') | |
duration_str = f"{duration_val:.2f}" if duration_val is not None else "-" | |
audio_ch_val = file_info.get('audio_channels') | |
audio_ch_str = str(audio_ch_val) if audio_ch_val is not None else "-" | |
status = "Error" if file_info.get("error") else "OK" | |
if not file_info.get("error"): | |
valid_files_count += 1 | |
files_info_string += f"| {file_type} | `{name_for_command}` | {dimensions} | {duration_str} | {audio_ch_str} | {status} |\n" | |
if file_info.get("error"): | |
# Provide error details clearly | |
files_info_string += f"| `Error Details` | `{file_info['error'][:100]}` | - | - | - | - |\n" # Truncate long errors | |
if valid_files_count == 0: | |
raise gr.Error("No valid media files could be processed. Please check the file formats or errors.") | |
# --- Construct Messages for the AI --- | |
system_prompt = """You are a highly skilled FFMPEG expert simulating a command-line interface. | |
Given a list of media assets and a user's objective, generate the SIMPLEST POSSIBLE, SINGLE ffmpeg command to achieve the goal. | |
**Input Files:** Use the filenames provided in the 'Name (for command)' column of the asset list. These names have spaces replaced with underscores. | |
**Output File:** The final output MUST be named exactly `output.mp4`. | |
**Output Format:** The final output MUST be a video/mp4 container. | |
**Key Requirements:** | |
1. **Single Command:** Output ONLY the ffmpeg command, on a single line. No explanations, no comments, no introductory text, no code blocks (like ```bash ... ```). | |
2. **Simplicity:** Use the minimum required options. Avoid `-filter_complex` unless absolutely necessary. Prefer direct mapping, simple filters (`-vf`, `-af`), concatenation (`concat` demuxer), etc. | |
3. **Correctness:** Ensure options, filter syntax, and stream mapping are correct. | |
4. **Input Names:** Strictly use the provided sanitized input filenames (e.g., `My_Video.mp4`). | |
5. **Output Name:** End the command with `-y output.mp4` (the `-y` allows overwriting). | |
6. **Handle Errors:** If an asset has an 'Error' status, try to work around it if possible (e.g., ignore a faulty audio stream if only video is needed), or generate a command that likely fails gracefully if the task is impossible without that asset. Do NOT output error messages yourself, just the command. | |
7. **Specific Tasks:** | |
* *Waveform:* If asked for waveform, use `showwaves` filter (e.g., `"[0:a]showwaves=s=1280x100:mode=line,format=pix_fmts=yuv420p[v]"`), map video and audio (`-map "[v]" -map 0:a?`), and consider making audio mono (`-ac 1`) unless stereo is requested. Use video dimensions if provided, otherwise default to something reasonable like 1280x100. | |
* *Image Sequence:* Use `-framerate` and pattern (`img%03d.png`) if applicable. For single images, use `-loop 1 -t duration`. | |
* *Text Overlay:* Use `drawtext` filter. Get position (e.g., `x=(w-text_w)/2:y=h-th-10`), font, size, color from user prompt if possible, otherwise use defaults. | |
* *Concatenation:* Prefer the `concat` demuxer (requires a temporary file list) over the `concat` filter if possible for simple cases without re-encoding. However, since you MUST output a single command, you might need to use the filter (`[0:v][1:v]concat=n=2:v=1[outv]`) if creating a temp file list isn't feasible within the single command constraint. Prioritize simplicity. | |
**Example Output:** | |
ffmpeg -i input_video.mp4 -vf "scale=1280:720" -c:a copy -y output.mp4 | |
**DO NOT include ```bash or ``` anywhere in your response.** Just the raw command. | |
""" | |
user_message_content = f"""Generate the single-line FFMPEG command based on the assets and objective. | |
**AVAILABLE ASSETS:** | |
{files_info_string} | |
**OBJECTIVE:** {prompt} | |
**FFMPEG Command:** | |
""" | |
messages = [ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_message_content}, | |
] | |
try: | |
logging.info(f"Sending request to AI model: {model_name_for_api} at {client.base_url}") | |
# Optional: Log the prompt itself (can be very long) | |
# logging.debug(f"System Prompt:\n{system_prompt}") | |
# logging.debug(f"User Message:\n{user_message_content}") | |
completion = client.chat.completions.create( | |
model=model_name_for_api, | |
messages=messages, | |
temperature=temperature, | |
top_p=top_p, | |
max_tokens=1024, # Adjust token limit as needed | |
) | |
content = completion.choices[0].message.content.strip() | |
logging.info(f"AI Raw Response: '{content}'") | |
# --- Command Validation and Cleaning --- | |
# Remove potential markdown code blocks manually if AI didn't follow instructions | |
if content.startswith("```") and content.endswith("```"): | |
content = re.sub(r"^```(?:bash|sh)?\s*", "", content) | |
content = re.sub(r"\s*```$", "", content) | |
content = content.strip() | |
logging.warning("AI included code blocks despite instructions, attempting cleanup.") | |
# Remove any leading text before "ffmpeg" if necessary | |
ffmpeg_index = content.lower().find("ffmpeg ") | |
if ffmpeg_index > 0: | |
logging.warning(f"AI included leading text, stripping: '{content[:ffmpeg_index]}'") | |
content = content[ffmpeg_index:] | |
elif ffmpeg_index == -1 and not content.lower().startswith("ffmpeg"): | |
logging.error(f"AI response does not contain 'ffmpeg': '{content}'") | |
raise ValueError("AI did not generate a valid ffmpeg command.") | |
# Ensure it ends with the expected output file pattern (flexible space before -y) | |
if not content.rstrip().endswith("-y output.mp4"): | |
logging.warning("AI response doesn't end with '-y output.mp4'. Appending it.") | |
# Append '-y output.mp4' if missing, trying to be robust | |
if content.rstrip().endswith("output.mp4"): | |
content = content.rstrip() + " -y output.mp4" # Add -y if only output.mp4 is there | |
elif not " output.mp4" in content: # Avoid adding if output.mp4 is elsewhere | |
content = content.rstrip() + " -y output.mp4" | |
# Remove potential extra newlines | |
command = content.replace('\n', ' ').replace('\r', '').strip() | |
if not command: | |
raise ValueError("AI generated an empty command string.") | |
logging.info(f"Cleaned AI Command: '{command}'") | |
return command | |
except Exception as e: | |
logging.error(f"Error during AI completion or processing: {e}", exc_info=True) | |
# Try to give a more specific error to the user | |
if "authentication" in str(e).lower(): | |
raise gr.Error(f"AI API Authentication Error. Check your API key ({model_config['env_key']}). Error: {e}") | |
elif "rate limit" in str(e).lower(): | |
raise gr.Error(f"AI API Rate Limit Exceeded. Please try again later. Error: {e}") | |
else: | |
raise gr.Error(f"Failed to get command from AI. Error: {e}") | |
# --- Main Gradio Update Function --- | |
def update( | |
files, | |
prompt, | |
top_p=1, | |
temperature=1, | |
model_choice=None, # Default to None, will use initial_model_choice | |
): | |
"""Handles the main logic: file processing, AI call, FFMPEG execution.""" | |
# *** Fix: Declare global client at the beginning *** | |
global client | |
# Use initial choice if none provided (e.g., from direct call) | |
if model_choice is None: | |
model_choice = initial_model_choice | |
# --- Input Validations --- | |
if not files: | |
raise gr.Error("β Please upload at least one media file.") | |
if not prompt: | |
raise gr.Error("π Please enter editing instructions (prompt).") | |
if not model_choice or model_choice not in MODELS: | |
raise gr.Error(f"β Invalid model selected: {model_choice}. Please choose from the list.") | |
# --- Check FFMPEG Availability --- | |
if not shutil.which("ffmpeg"): | |
error_msg = "β FFMPEG command not found in system PATH. This application requires FFMPEG to be installed and accessible." | |
logging.error(error_msg) | |
raise gr.Error(error_msg) | |
# --- Check and potentially update API client --- | |
model_config = MODELS[model_choice] | |
api_key_env_var = model_config["env_key"] | |
api_key = os.environ.get(api_key_env_var) | |
effective_api_key = api_key if api_key and api_key.upper() != "NONE" else "required-but-not-used" | |
# Check if key is missing (and not intentionally "NONE") | |
if not api_key and effective_api_key != "required-but-not-used": | |
raise gr.Error(f"π API Key ({api_key_env_var}) for the selected model '{model_choice}' is missing. Please set it as an environment variable.") | |
# Initialize or update client if needed | |
if client is None: | |
logging.warning(f"Client was None, attempting re-initialization for model: {model_choice}") | |
try: | |
client = OpenAI(base_url=model_config["base_url"], api_key=effective_api_key) | |
logging.info(f"API Client initialized/updated for model: {model_choice}") | |
except Exception as e: | |
logging.error(f"Failed to initialize API client: {e}", exc_info=True) | |
raise gr.Error(f"Failed to initialize API client: {e}") | |
# If client exists, check if base_url or key needs update for the selected model | |
elif client.base_url != model_config["base_url"] or client.api_key != effective_api_key: | |
logging.info(f"Updating API client configuration for selected model: {model_choice}") | |
client.base_url = model_config["base_url"] | |
client.api_key = effective_api_key | |
# --- Get File Infos and Check for Errors --- | |
logging.info("Processing uploaded files...") | |
files_info = get_files_infos(files) | |
file_errors = [f"- '{f.get('original_name', 'Unknown file')}': {f['error']}" | |
for f in files_info if f.get("error")] | |
if file_errors: | |
error_message = "β οΈ Errors occurred while processing uploaded files:\n" + "\n".join(file_errors) | |
logging.error(error_message) | |
# Allow proceeding if *some* files are okay, but warn the user. | |
# Let the AI decide how to handle the errored files based on the prompt. | |
# If *all* files have errors, then raise the error. | |
if len(file_errors) == len(files_info): | |
raise gr.Error(error_message + "\n\nCannot proceed as no files could be read.") | |
else: | |
gr.Warning(error_message + "\n\nAttempting to proceed with valid files. The AI will be informed about the errors.") | |
# --- Validate File Sizes and Durations (Optional limits) --- | |
for file_info in files_info: | |
if not file_info.get("error"): # Only check valid files | |
if "size" in file_info and file_info["size"] > 1024 * 1024 * 1024: # 150MB limit | |
raise gr.Error(f"File '{file_info.get('original_name')}' ({file_info['size'] / (1024*1024):.1f}MB) exceeds the 150MB size limit.") | |
# if file_info.get("type", "").startswith("video") and "duration" in file_info and file_info["duration"] > 3000: # 5 minute limit for videos | |
# raise gr.Error(f"Video '{file_info.get('original_name')}' ({file_info['duration']:.0f}s) exceeds the 50-minute duration limit.") | |
# --- Get FFMPEG Command from AI --- | |
command_string = None | |
try: | |
logging.info(f"Getting FFMPEG command from AI model: {model_choice}") | |
command_string = get_completion( | |
prompt, files_info, top_p, temperature, model_choice | |
) | |
except gr.Error as e: | |
raise e # Propagate Gradio errors directly | |
except Exception as e: | |
logging.error(f"Failed to get command from AI: {e}", exc_info=True) | |
raise gr.Error(f"Failed to get or process command from AI. Error: {e}") | |
if not command_string: | |
raise gr.Error("AI returned an empty command. Please try again or rephrase.") | |
# --- Prepare Temporary Directory and Execute FFMPEG --- | |
# Using 'with' ensures cleanup even if errors occur | |
with tempfile.TemporaryDirectory() as temp_dir: | |
logging.info(f"Created temporary directory: {temp_dir}") | |
final_output_location = None # Path to the final video outside temp dir | |
try: | |
# Copy necessary files to temp dir using sanitized names | |
logging.info("Copying files to temporary directory...") | |
input_file_mapping = {} # Map sanitized name to original path if needed | |
for i, file_obj in enumerate(files): | |
file_info = files_info[i] | |
# Only copy files that were processed without error | |
if not file_info.get("error"): | |
original_path = Path(file_obj.name) | |
sanitized_name = file_info['name'] | |
destination_path = Path(temp_dir) / sanitized_name | |
try: | |
shutil.copy(original_path, destination_path) | |
logging.info(f"Copied '{original_path.name}' -> '{destination_path}'") | |
input_file_mapping[sanitized_name] = original_path | |
except Exception as copy_err: | |
logging.error(f"Failed to copy file {original_path} to {destination_path}: {copy_err}") | |
# Raise error as ffmpeg will fail if inputs are missing | |
raise gr.Error(f"Failed to prepare input file: {original_path.name}. Error: {copy_err}") | |
# --- Parse and Validate FFMPEG Command --- | |
try: | |
# Split command string safely | |
args = shlex.split(command_string) | |
except ValueError as e: | |
logging.error(f"Command syntax error: {e}. Command: {command_string}") | |
raise gr.Error(f"Generated command has syntax errors (e.g., unbalanced quotes): {e}\nCommand: {command_string}") | |
if not args or args[0].lower() != "ffmpeg": | |
raise gr.Error(f"Generated command does not start with 'ffmpeg'. Command: {command_string}") | |
# --- Prepare Final Command Arguments --- | |
# Define the actual temporary output path *inside* the temp dir | |
temp_output_file_name = f"output_{uuid.uuid4()}.mp4" | |
temp_output_path = str(Path(temp_dir) / temp_output_file_name) | |
# Replace the placeholder 'output.mp4' with the actual temp output path | |
final_args = [] | |
output_placeholder_found = False | |
for arg in args: | |
if arg == "output.mp4": | |
# Check if it's preceded by -y, if not, add -y | |
if final_args and final_args[-1] != "-y": | |
final_args.append("-y") | |
final_args.append(temp_output_path) | |
output_placeholder_found = True | |
else: | |
final_args.append(arg) | |
# If AI forgot output.mp4, add it (shouldn't happen with good prompting) | |
if not output_placeholder_found: | |
logging.warning("AI command did not include 'output.mp4'. Appending target output path.") | |
if final_args[-1] != "-y": | |
final_args.append("-y") | |
final_args.append(temp_output_path) | |
# --- Execute FFMPEG --- | |
logging.info(f"Executing FFMPEG: {' '.join(final_args)}") | |
try: | |
process = subprocess.run( | |
final_args, | |
cwd=temp_dir, # Execute in the directory with copied files | |
capture_output=True, # Captures stdout and stderr | |
text=True, | |
encoding='utf-8', errors='replace', | |
check=True, # Raise CalledProcessError if return code is non-zero | |
) | |
logging.info("FFMPEG command executed successfully.") | |
# Log stderr as it often contains useful info/warnings | |
if process.stderr: logging.info(f"FFMPEG stderr:\n{process.stderr}") | |
# Log stdout only if needed for debugging | |
if process.stdout: logging.debug(f"FFMPEG stdout:\n{process.stdout}") | |
except subprocess.CalledProcessError as e: | |
error_output = e.stderr or e.stdout or "No output captured." | |
logging.error(f"FFMPEG execution failed! Return code: {e.returncode}\nCommand: {' '.join(e.cmd)}\nOutput:\n{error_output}") | |
error_summary = error_output.strip().split('\n')[-1] # Get last line | |
raise gr.Error(f"β FFMPEG execution failed: {error_summary}\n(Check logs/console for full command and error details)") | |
except subprocess.TimeoutExpired as e: | |
logging.error(f"FFMPEG command timed out after {e.timeout} seconds.\nCommand: {' '.join(e.cmd)}") | |
raise gr.Error(f"β³ FFMPEG command timed out after {e.timeout} seconds. The operation might be too complex or files too large.") | |
except FileNotFoundError as e: | |
# This should be caught earlier, but double-check | |
logging.error(f"FFMPEG command failed: {e}. Is ffmpeg installed and in PATH?") | |
raise gr.Error(f"β FFMPEG execution failed: '{e.filename}' not found. Ensure FFMPEG is installed and accessible.") | |
# --- Copy Result Out of Temp Directory --- | |
if Path(temp_output_path).exists() and os.path.getsize(temp_output_path) > 0: | |
# Create an output directory if it doesn't exist | |
output_dir = Path("./output_videos") | |
output_dir.mkdir(parents=True, exist_ok=True) | |
# Copy to a filename based on UUID to avoid collisions | |
final_output_location = shutil.copy(temp_output_path, output_dir / f"{Path(temp_output_path).stem}.mp4") | |
logging.info(f"Copied final output video to: {final_output_location}") | |
else: | |
logging.error(f"FFMPEG seemed to succeed, but output file '{temp_output_path}' is missing or empty.") | |
raise gr.Error("β FFMPEG finished, but the output file was not created or is empty. Check the generated command and logs.") | |
# --- Prepare Display Command (using original placeholder) --- | |
display_command_markdown = f"### Generated Command\n```bash\n{command_string}\n```" | |
# --- Return Results --- | |
return final_output_location, gr.update(value=display_command_markdown) | |
except Exception as e: | |
# Catch any other unexpected errors during setup or execution within the temp dir | |
logging.error(f"Error during processing: {e}", exc_info=True) | |
# No need to manually cleanup temp_dir, 'with' handles it | |
if isinstance(e, gr.Error): raise e # Re-raise Gradio errors | |
else: raise gr.Error(f"An unexpected error occurred: {e}") | |
# --- Initialize Client on Startup --- | |
initialize_client() | |
if client is None and initial_model_choice: | |
logging.warning("Application starting without a functional AI client due to initialization errors or missing keys.") | |
# Consider showing a warning in the UI if possible, or rely on errors during `update` | |
# --- Gradio Interface Definition --- | |
with gr.Blocks(title="AI Video Editor - Edit with Natural Language", theme=gr.themes.Soft(primary_hue=gr.themes.colors.sky)) as demo: | |
gr.Markdown( | |
""" | |
# ποΈ AI Video Editor: Your Smart Editing Assistant π¬ | |
Welcome to the AI Video Editor! This tool uses AI models like **DeepSeek-V3** or **Qwen** to understand your editing needs in plain English. | |
Upload your media, describe the desired result, and the AI generates the **FFMPEG command** to create your video. | |
**No complex software needed!** Ideal for quick edits, learning FFMPEG, or automating simple video tasks. Trim, merge, add text, change speed, apply filters, combine media β just tell the AI! | |
**Get started:** Upload files, type instructions, click **"π Run Edit"**! | |
""", | |
elem_id="header", | |
) | |
with gr.Accordion("π Usage Instructions & Examples", open=False): | |
gr.Markdown( | |
""" | |
### How to Use | |
1. **Upload Files**: Use the "Upload Media Files" area. | |
2. **Write Instructions**: Describe the edit in the "Instructions" box. | |
3. **(Optional) Adjust Parameters**: Select AI model, tweak Top-p/Temperature for creativity. | |
4. **Generate**: Click **"π Run Edit"**. | |
5. **Review**: Watch the result in "Generated Video Output". The FFMPEG command used appears below. | |
### Example Instructions | |
* `Trim the video to keep only the segment from 10s to 25s.` | |
* `Concatenate video1.mp4 and video2.mp4.` | |
* `Add text "Hello World" at the bottom center, white font, size 24.` | |
* `Convert video to black and white.` | |
* `Create slideshow from image1.jpg, image2.png (5s each) with background.mp3.` | |
* `Resize video to 1280x720.` | |
* `Speed up video 2x.` | |
* `Generate waveform visualization for the audio file, 1280x120 pixels.` | |
### Tips | |
* **Be Specific**: "remove first 5 seconds" is better than "make shorter". | |
* **Use Filenames**: Refer to files like `Combine intro.mp4 and main.mp4` (AI uses names with underscores). | |
* **Details Matter**: For text, specify position, color, size. For fades, mention duration. | |
* **Keep it Simple**: One main goal per instruction works best. | |
""" | |
) | |
with gr.Row(): | |
with gr.Column(scale=1): | |
user_files = gr.File( | |
file_count="multiple", | |
label="π€ Upload Media Files", | |
file_types=allowed_medias, | |
) | |
user_prompt = gr.Textbox( | |
placeholder="e.g., 'Combine video1.mp4 and video2.mp4'", | |
label="π Instructions / Editing Objective", | |
lines=3, | |
) | |
with gr.Accordion("βοΈ Advanced Parameters", open=False): | |
# Ensure initial_model_choice is valid before setting value | |
valid_initial_model = initial_model_choice if initial_model_choice in MODELS else (list(MODELS.keys())[0] if MODELS else None) | |
model_choice_dropdown = gr.Dropdown( # Changed to Dropdown for better UI with many models | |
choices=list(MODELS.keys()), | |
value=valid_initial_model, | |
label="π§ Select AI Model", | |
) | |
top_p_slider = gr.Slider( | |
minimum=0.0, maximum=1.0, value=0.7, step=0.05, | |
label="Top-p (Diversity)", info="Lower values = more focused, higher = more random." | |
) | |
temperature_slider = gr.Slider( | |
minimum=0.0, maximum=2.0, value=0.2, step=0.1, # Default lower temp for more predictable ffmpeg | |
label="Temperature (Randomness)", info="Lower values = more deterministic, higher = more creative/random." | |
) | |
run_button = gr.Button("π Run Edit", variant="primary") | |
with gr.Column(scale=1): | |
generated_video_output = gr.Video( | |
label="π¬ Generated Video Output", | |
interactive=False, | |
include_audio=True, | |
) | |
generated_command_output = gr.Markdown(label="π» Generated FFMPEG Command") | |
# --- Event Handling --- | |
run_button.click( | |
fn=update, | |
inputs=[user_files, user_prompt, top_p_slider, temperature_slider, model_choice_dropdown], | |
outputs=[generated_video_output, generated_command_output], | |
api_name="generate_edit" | |
) | |
# --- Examples --- | |
# IMPORTANT: Update example file paths relative to where you run the script! | |
# Create an 'examples' folder or adjust paths. | |
example_list = [ | |
[ | |
["./examples/video1.mp4"], # Make sure this path exists | |
"Add text 'Watermark' to the top right corner, white font, size 40, slightly transparent.", | |
0.7, 0.2, list(MODELS.keys())[0] if MODELS else None, | |
], | |
[ | |
["./examples/video2.mp4"], | |
"Cut the video to keep only 3 seconds, starting from 00:00:02.", | |
0.7, 0.2, list(MODELS.keys())[min(1, len(MODELS)-1)] if len(MODELS) > 1 else (list(MODELS.keys())[0] if MODELS else None), | |
], | |
[ | |
["./examples/video2.mp4"], # Make sure this path exists | |
"Convert the video to grayscale (black and white).", | |
0.7, 0.2, list(MODELS.keys())[0] if MODELS else None, | |
], | |
[ | |
["./examples/image1.jpg", "./examples/image2.jpg"], # Make sure paths exist | |
"Create a slideshow: image1.jpg for 3s, then image2.jpg for 3s. Output size 480x720.", | |
0.7, 0.2, list(MODELS.keys())[0] if MODELS else None, | |
], | |
] | |
# Filter out examples if no models are configured | |
valid_examples = [ex for ex in example_list if ex[4] is not None] | |
if valid_examples: | |
gr.Examples( | |
examples=valid_examples, | |
inputs=[user_files, user_prompt, top_p_slider, temperature_slider, model_choice_dropdown], | |
outputs=[generated_video_output, generated_command_output], | |
fn=update, | |
cache_examples=True, # Keep False unless examples are very stable and slow | |
label="β¨ Example Use Cases (Click to Run)", | |
run_on_click=False, | |
) | |
else: | |
gr.Markdown("_(Examples disabled as no models seem to be configured with API keys)_") | |
# Footer removed as requested | |
# --- Launch the App --- | |
if __name__ == "__main__": | |
# Set concurrency limit based on resources | |
demo.queue(default_concurrency_limit=20) | |
# Launch on 0.0.0.0 to make accessible on network if needed | |
# demo.launch(show_api=False, server_name="0.0.0.0") | |
demo.launch(show_api=False) # Default for local/Hugging Face Spaces |