diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,2107 +1,353 @@
-
-
-from kokoro import KPipeline # Keep Kokoro separate as it's not from moviepy
-
-import soundfile as sf
-import torch
-
-from PIL import Image
+# Import necessary libraries
+import gradio as gr
+import os
+import shutil
 import tempfile
 import random
-import cv2
-import math
-import os, requests, io, time, re, random
+import requests
+import soundfile as sf
 from moviepy.editor import (
-    VideoFileClip, # Corrected typo here
-    concatenate_videoclips,
-    AudioFileClip,
-    ImageClip,
-    CompositeVideoClip,
-    TextClip,
-    CompositeAudioClip,
-    ColorClip # Included ColorClip in the main import
+    VideoFileClip, concatenate_videoclips, AudioFileClip, ImageClip, CompositeVideoClip, TextClip,
+    CompositeAudioClip, concatenate_audioclips  # both are used by add_background_music below
 )
-import moviepy.video.fx.all as vfx # Keep this separate for fx effects
-import moviepy.config as mpy_config
-from pydub import AudioSegment
-from pydub.generators import Sine
-
-import numpy as np
-from bs4 import BeautifulSoup
-import base64
-from urllib.parse import quote
-# pysrt is imported but not used in the provided code snippets, keeping for completeness
-# import pysrt
+import moviepy.video.fx.all as vfx
+from kokoro import KPipeline
 from gtts import gTTS
-import gradio as gr # Import Gradio
-import shutil # Needed for temp folder cleanup
-import subprocess # Needed for sudo commands in fix_imagemagick_policy
+from pydub import AudioSegment
+import math
+import re
+from PIL import Image
 
 # Initialize Kokoro TTS pipeline (using American English)
-# Ensure you have the required voice models downloaded for Kokoro if needed,
-# or it will fall back to gTTS. 'a' for American English uses voice 'af_heart'.
-# Add a flag to check if Kokoro initialized successfully
-kokoro_initialized = False
-pipeline = None # Initialize pipeline to None
-try:
-    # Check if the required voice model is available or if it needs downloading
-    # Depending on Kokoro version/setup, this might implicitly check/download
-    # If Kokoro initialization itself is problematic, this try/except will catch it
-    pipeline = KPipeline(lang_code='a') # 'a' is often mapped to 'af_heart' or similar US voice
-    kokoro_initialized = True
-    print("Kokoro TTS pipeline initialized successfully.")
-except Exception as e:
-    print(f"Warning: Could not initialize Kokoro TTS pipeline: {e}. Will rely on gTTS.")
-    # pipeline remains None
+pipeline = KPipeline(lang_code='a')
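Note that the rewrite constructs the Kokoro pipeline unconditionally, so a missing voice model now raises at import time instead of falling back to gTTS as the removed block above did. A minimal guarded sketch, assuming the same `kokoro` package (the `kokoro_ok` flag name is illustrative):

```python
# Guarded initialization: remember whether Kokoro loaded so the TTS helper
# can fall back to gTTS instead of crashing at import time.
from kokoro import KPipeline

pipeline = None
kokoro_ok = False  # illustrative flag name
try:
    pipeline = KPipeline(lang_code='a')  # 'a' = American English voices
    kokoro_ok = True
except Exception as e:
    print(f"Kokoro unavailable ({e}); falling back to gTTS.")
```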
-
-# Ensure ImageMagick binary is set (Adjust path as needed for your system)
-# This line requires imagemagick to be installed and the path correct.
-# If TextClip fails, check ImageMagick installation and policy.xml (handled by fix_imagemagick_policy).
-# Common paths: "/usr/bin/convert", "/usr/local/bin/convert", "C:\\Program Files\\ImageMagick-X.Y.Z-Q16\\convert.exe"
-# You might need to adjust this based on your OS and installation
-IMAGICK_BINARY_DEFAULT_PATH = "/usr/bin/convert" # Default path, check your system
-# Add more common paths to check
-common_imagemagick_paths = [
-    "/usr/bin/convert",
-    "/usr/local/bin/convert",
-    "/opt/homebrew/bin/convert", # Homebrew on macOS ARM
-    "/usr/local/opt/imagemagick/bin/convert", # Older Homebrew
-    "C:\\Program Files\\ImageMagick-X.Y.Z-Q16\\convert.exe", # Windows example, adjust version
-    # Add other paths as needed for your environment
-]
-
-found_imagemagick_binary = None
-for path in common_imagemagick_paths:
-    # Check if path is not None or empty before checking existence
-    if path and os.path.exists(path):
-        found_imagemagick_binary = path
-        break
-
-if found_imagemagick_binary:
-    print(f"Found ImageMagick binary at: {found_imagemagick_binary}")
-    mpy_config.change_settings({"IMAGEMAGICK_BINARY": found_imagemagick_binary})
-else:
-    print("Warning: ImageMagick binary 'convert' not found in common locations.")
-    print("TextClip may fail. Please install ImageMagick or update the IMAGICK_BINARY setting if it's installed elsewhere.")
-    # Still try to set a default path, though it might be wrong
-    mpy_config.change_settings({"IMAGEMAGICK_BINARY": IMAGICK_BINARY_DEFAULT_PATH})
-
-# ---------------- Global Configuration ---------------- #
-# Using the user's provided API keys
+# Global Configuration
 PEXELS_API_KEY = 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna'
 OPENROUTER_API_KEY = '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'
-OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" # Or another preferred model
+OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free"
 OUTPUT_VIDEO_FILENAME = "final_video.mp4"
 USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
-# Maximum number of script segments to display for editing
-MAX_SEGMENTS_FOR_EDITING = 15 # Limit for UI manageability
-
-# Global placeholder for the temporary folder, will be created per run
-TEMP_FOLDER = None
-
-# ---------------- Helper Functions ---------------- #
-# Most helper functions remain the same, but some might need minor adjustments
-# to use passed parameters instead of global state (e.g., resolution).
-# We'll update create_clip and add_background_music accordingly.
-
-def generate_script(user_input, api_key, model):
-    """Generate documentary script with proper OpenRouter handling."""
-    if not api_key or api_key.startswith('YOUR_OPENROUTER_API_KEY'):
-        print("OpenRouter API key not set or is placeholder. Skipping script generation.")
-        return "[Error] API key not configured."
-
+# Helper Functions
+def generate_script(user_input):
+    """Generate a documentary script using OpenRouter API."""
     headers = {
-        'Authorization': f'Bearer {api_key}',
-        'HTTP-Referer': 'https://your-domain.com', # Replace with your app's domain
+        'Authorization': f'Bearer {OPENROUTER_API_KEY}',
+        'HTTP-Referer': 'https://your-domain.com',
         'X-Title': 'AI Documentary Maker'
     }
-
-    # Updated prompt based on user's specific requirements for tone and style
-    prompt = f"""Short Documentary Script Generator
-
-Instructions:
+    prompt = f"""Short Documentary Script Generator
+Instructions:
 If I say "use this," output the script exactly as given.
-If I only give topics, generate a script based on them.
-If I provide a full script, rewrite it without any changes.
-Make everything short, simple, humorous, funny, and act serious but be humorous.
-Don't say anything off topic.
-Always end with a funny call to subscribe based on the video topic.
-Use normal conversational text, avoid AI phrases, humanize the statements.
-Always focus on negative or cynical views, avoid positive statements.
-The full script should be for a single video about one topic.
-
+If I give topics, generate a script based on them.
+If I provide a full script, rewrite it unchanged.
+Keep it short, simple, humorous, and serious but funny.
+Use normal conversational text.
 Formatting Rules:
-Title in Square Brackets: Each section starts with a one-word title inside [ ] (max two words if necessary). This title will be used as a search term for Pexels footage.
-Casual & Funny Narration: Each section has 5-15 words of narration. Keep it natural, funny, and unpredictable.
-No Special Formatting: No bold, italics, or special characters.
-Generalized Search Terms: If a term is too specific, make it more general for Pexels search.
-Scene-Specific Writing: Each section describes only what should be shown in the video.
-Output Only the Script, make it funny/humorous/hilarious, and add a funny subscribe statement.
-No extra text, just the script.
-
-Example Output:
-[Cats]
-They plot world domination while napping.
-[Dogs]
-Loyalty is just a bribe for snacks.
-[Humans]
-The only species that pays to live on a planet they destroy.
-[Future]
-It looks suspiciously like the present, but with more screens.
-[Warning]
-Subscribe or a cat will steal your bandwidth.
-
-Now here is the Topic/script: {user_input}
+- Title in square brackets: [Title]
+- Each section starts with a one-word title in [ ] (max two words).
+- Narration: 5-10 words, casual, funny, unpredictable.
+- No special formatting, just script text.
+- Generalized search terms for Pexels.
+- End with a funny subscribe statement.
+Example:
+[North Korea]
+Top 5 unknown facts about North Korea.
+[Invisibility]
+North Korea’s internet speed doesn’t exist.
+[Leadership]
+Kim Jong-un won 100% votes… against himself.
+[Subscribe]
+Subscribe, or Kim sends you a ticket to nowhere.
+Topic: {user_input}
 """
-
     data = {
-        'model': model,
+        'model': OPENROUTER_MODEL,
         'messages': [{'role': 'user', 'content': prompt}],
-        'temperature': 0.7, # Increased temperature slightly for more unpredictable humor
-        'max_tokens': 500 # Limit token response to keep scripts short
+        'temperature': 0.4,
+        'max_tokens': 5000
     }
-
     try:
-        response = requests.post(
-            'https://openrouter.ai/api/v1/chat/completions',
-            headers=headers,
-            json=data,
-            timeout=45 # Increased timeout
-        )
-
-        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
-
-        response_data = response.json()
-        if 'choices' in response_data and len(response_data['choices']) > 0:
-            script_text = response_data['choices'][0]['message']['content']
-            # Basic post-processing to remove potential markdown code blocks
-            if script_text.startswith("```") and script_text.endswith("```"):
-                # Find the first and last ``` lines
-                first_code_block = script_text.find("```")
-                last_code_block = script_text.rfind("```")
-                if first_code_block != -1 and last_code_block != -1 and first_code_block < last_code_block:
-                    # Extract content between the markers, removing the language specifier line if present
-                    content_start = script_text.find('\n', first_code_block) + 1
-                    content_end = last_code_block
-                    script_text = script_text[content_start:content_end].strip()
-                else: # Simple case, remove from start and end
-                    script_text = script_text.strip("` \n")
-
-            return script_text
-        else:
-            print("Unexpected response format:", response_data)
-            return "[Error] Unexpected API response format."
-
-    except requests.exceptions.RequestException as e:
-        print(f"API Request failed: {str(e)}")
-        return f"[Error] API request failed: {str(e)}"
+        response = requests.post('https://openrouter.ai/api/v1/chat/completions', headers=headers, json=data, timeout=30)
+        response.raise_for_status()
+        return response.json()['choices'][0]['message']['content']
     except Exception as e:
-        print(f"An unexpected error occurred during script generation: {e}")
-        return f"[Error] An unexpected error occurred: {str(e)}"
+        print(f"Script generation failed: {e}")
+        return None
 
 def parse_script(script_text):
-    """
-    Parse the generated script into a list of segment dictionaries.
-    Each dictionary includes original prompt, narration text, estimated duration, and placeholder for uploaded media.
-    Handles potential API errors returned as strings.
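For reference, the new `parse_script` (just below) pairs lines strictly two at a time, so a blank line between sections silently drops a segment. A slightly more tolerant sketch of the same [Title]/narration format; the function and regex names are illustrative:

```python
import re

# Tolerant parser sketch: accept blank lines and stray whitespace instead of
# assuming strict [title]/narration pairs on consecutive lines.
SECTION_RE = re.compile(r'^\[([^\]]+)\]$')

def parse_sections(script_text):
    sections, title = [], None
    for raw in script_text.splitlines():
        line = raw.strip()
        if not line:
            continue
        m = SECTION_RE.match(line)
        if m:
            title = m.group(1).strip()
        elif title:
            sections.append({'prompt': title, 'text': line})
            title = None
    return sections

print(parse_sections("[Cats]\nThey plot world domination while napping.\n\n[Warning]\nSubscribe or a cat will steal your bandwidth."))
```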
- """ - if script_text.startswith("[Error]"): - print(f"Skipping parse due to script generation error: {script_text}") - return [] - - segments = [] - current_title = None - current_text = "" - + """Parse script into a list of elements with media prompts and TTS text.""" + elements = [] + lines = script_text.splitlines() + for i in range(0, len(lines), 2): + if i + 1 < len(lines) and lines[i].startswith('[') and lines[i].endswith(']'): + title = lines[i][1:-1].strip() + text = lines[i + 1].strip() + if title and text: + elements.append({'type': 'media', 'prompt': title}) + elements.append({'type': 'tts', 'text': text, 'voice': 'en'}) + return elements + +def search_pexels_videos(query, api_key): + """Search Pexels for a random HD video.""" + headers = {'Authorization': api_key} + params = {"query": query, "per_page": 15} try: - lines = script_text.strip().splitlines() - if not lines: - print("Script text is empty.") - return [] - - for line in lines: - line = line.strip() - if line.startswith("[") and "]" in line: - bracket_start = line.find("[") - bracket_end = line.find("]", bracket_start) # Corrected line here - if bracket_start != -1 and bracket_end != -1: - # Add previous segment if title and text are found - if current_title is not None and current_text.strip(): - # Estimate duration based on word count (adjust factor as needed) - duration = max(2.0, len(current_text.split()) * 0.4) # Minimum 2s, approx 0.4s per word - segments.append({ - "original_prompt": current_title.strip(), - "text": current_text.strip(), - "duration": duration, - "uploaded_media": None # Placeholder for user uploaded file path - }) - current_title = line[bracket_start+1:bracket_end].strip() - current_text = line[bracket_end+1:].strip() - elif current_title: # Append text if no new title found but currently parsing a segment - current_text += line + " " - elif current_title: # Append text to the current segment - current_text += line + " " - # Ignore lines before the first [Title] - - # Add the last segment - if current_title is not None and current_text.strip(): - duration = max(2.0, len(current_text.split()) * 0.4) - segments.append({ - "original_prompt": current_title.strip(), - "text": current_text.strip(), - "duration": duration, - "uploaded_media": None - }) - - # Limit segments to MAX_SEGMENTS_FOR_EDITING - if len(segments) > MAX_SEGMENTS_FOR_EDITING: - print(f"Warning: Script generated {len(segments)} segments, limiting to {MAX_SEGMENTS_FOR_EDITING} for editing.") - segments = segments[:MAX_SEGMENTS_FOR_EDITING] - - print(f"Parsed {len(segments)} segments.") - return segments + response = requests.get("https://api.pexels.com/videos/search", headers=headers, params=params, timeout=10) + response.raise_for_status() + videos = response.json().get("videos", []) + hd_videos = [v["video_files"][0]["link"] for v in videos if v["video_files"] and v["video_files"][0]["quality"] == "hd"] + return random.choice(hd_videos) if hd_videos else None except Exception as e: - print(f"Error parsing script: {e}") - return [] - -# Pexels and Google Image search and download functions remain unchanged -# Using the global PEXELS_API_KEY directly now. -def search_pexels_videos(query): - """Search for a video on Pexels by query and return a random HD video.""" - if not PEXELS_API_KEY or PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'): - print("Pexels API key not set or is placeholder. 
Skipping video search.") - return None - headers = {'Authorization': PEXELS_API_KEY} - base_url = "https://api.pexels.com/videos/search" - num_pages = 3 - videos_per_page = 15 - max_retries = 2 # Reduced retries for faster failure - retry_delay = 1 - - search_query = query - all_videos = [] - - for page in range(1, num_pages + 1): - for attempt in range(max_retries): - try: - params = {"query": search_query, "per_page": videos_per_page, "page": page, "orientation": "landscape"} # Added orientation - response = requests.get(base_url, headers=headers, params=params, timeout=10) - - if response.status_code == 200: - data = response.json() - videos = data.get("videos", []) - - # Filter for HD videos first, then fallback to other qualities - hd_videos_on_page = [] - other_videos_on_page = [] - for video in videos: - video_files = video.get("video_files", []) - # Sort video files by quality preference if possible - video_files_sorted = sorted(video_files, key=lambda x: {'hd': 0, 'sd': 1}.get(x.get('quality'), 2)) - - for file in video_files_sorted: - link = file.get("link") - quality = file.get("quality") - if link: - if quality == "hd": - hd_videos_on_page.append(link) - break # Found the best quality for this video entry - else: - other_videos_on_page.append(link) - # Don't break, keep looking for HD for this video entry - - all_videos.extend(hd_videos_on_page) # Add HD videos found - if not hd_videos_on_page: # If no HD found on this page, add other videos found on this page - all_videos.extend(other_videos_on_page) - - if not videos: - print(f"No videos found on page {page} for query '{query}'.") - break # No videos on this page or subsequent ones - - - break # Success for this page attempt - - elif response.status_code == 429: - print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s for query '{query}'...") - time.sleep(retry_delay) - retry_delay *= 2 - else: - print(f"Pexels video search error {response.status_code}: {response.text} for query '{query}'") - break # Non-recoverable error or too many retries - - except requests.exceptions.RequestException as e: - print(f"Pexels video request exception (attempt {attempt+1}/{max_retries}) for query '{query}': {e}") - if attempt < max_retries - 1: - time.sleep(retry_delay) - retry_delay *= 2 - else: - break # Too many retries - - # Stop searching if no videos were found on the last page check - if not videos and page > 1: - print(f"Stopping Pexels video search for '{query}' as no videos were found on page {page}.") - break - - - if all_videos: - # Prioritize picking an HD video if any were collected - hd_options = [link for link in all_videos if 'hd' in link.lower()] # Simple check, might not be perfect - if hd_options: - random_video = random.choice(hd_options) - print(f"Selected random HD video from {len(hd_options)} options for query '{query}'.") - else: - # If no HD options, pick from the entire list (which includes SD and potentially others) - random_video = random.choice(all_videos) - print(f"Selected random video (likely SD or other quality) from {len(all_videos)} options for query '{query}' (no HD found).") - return random_video - else: - print(f"No suitable videos found after searching all pages for query '{query}'.") - return None - - -def search_pexels_images(query): - """Search for an image on Pexels by query.""" - if not PEXELS_API_KEY or PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'): - print("Pexels API key not set or is placeholder. 
Skipping image search.") + print(f"Pexels video search failed: {e}") return None - headers = {'Authorization': PEXELS_API_KEY} - url = "https://api.pexels.com/v1/search" - params = {"query": query, "per_page": 15, "orientation": "landscape"} # Increased per_page - - max_retries = 2 - retry_delay = 1 - for attempt in range(max_retries): - try: - response = requests.get(url, headers=headers, params=params, timeout=10) - - if response.status_code == 200: - data = response.json() - photos = data.get("photos", []) - if photos: - # Choose from the top results - photo = random.choice(photos[:min(10, len(photos))]) - img_url = photo.get("src", {}).get("original") - print(f"Found {len(photos)} images on Pexels for query '{query}', selected one.") - return img_url - else: - print(f"No images found for query: {query} on Pexels.") - return None - - elif response.status_code == 429: - print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s for query '{query}'...") - time.sleep(retry_delay) - retry_delay *= 2 - else: - print(f"Pexels image search error {response.status_code}: {response.text} for query '{query}'") - break # Non-recoverable error or too many retries - - except requests.exceptions.RequestException as e: - print(f"Pexels image request exception (attempt {attempt+1}/{max_retries}) for query '{query}': {e}") - if attempt < max_retries - 1: - time.sleep(retry_delay) - retry_delay *= 2 - else: - break # Too many retries - - print(f"No Pexels images found for query: {query} after all attempts.") - return None - -def search_google_images(query): - """Search for images on Google Images (fallback/news)""" +def search_pexels_images(query, api_key): + """Search Pexels for a random image.""" + headers = {'Authorization': api_key} + params = {"query": query, "per_page": 5, "orientation": "landscape"} try: - # Using a simple text search method; dedicated Google Image Search APIs are better but may require setup. - # This is prone to breaking if Google changes its HTML structure. 
- search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch" - headers = {"User-Agent": USER_AGENT} - print(f"Searching Google Images for: {query}") - response = requests.get(search_url, headers=headers, timeout=15) + response = requests.get("https://api.pexels.com/v1/search", headers=headers, params=params, timeout=10) response.raise_for_status() - soup = BeautifulSoup(response.text, "html.parser") - - # Find img tags, look for src attributes - # This is a very fragile parsing method, might need adjustment - img_tags = soup.find_all("img") - image_urls = [] - # Look for src attributes that start with http and aren't data URIs or specific gstatic patterns - # This is a heuristic and might grab incorrect URLs - for img in img_tags: - src = img.get("src", "") - if src.startswith("http") and "encrypted" not in src and "base64" not in src: # Basic filtering - image_urls.append(src) - elif img.get("data-src", "").startswith("http"): # Some sites use data-src - image_urls.append(img.get("data-src", "")) - - - # Filter out potential tiny icons or invalid URLs - valid_image_urls = [url for url in image_urls if url and "gstatic" not in url and url.split('.')[-1].lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp']] - - if valid_image_urls: - print(f"Found {len(valid_image_urls)} potential Google Images for query '{query}', picking one.") - return random.choice(valid_image_urls[:min(10, len(valid_image_urls))]) - else: - print(f"No valid Google Images found for query: {query}") - return None + photos = response.json().get("photos", []) + return random.choice(photos)["src"]["original"] if photos else None except Exception as e: - print(f"Error in Google Images search for query '{query}': {e}") - return None - - -def download_image(image_url, filename): - """Download an image from a URL to a local file with enhanced error handling.""" - if not image_url: - print("No image URL provided for download.") + print(f"Pexels image search failed: {e}") return None +def download_file(url, filename): + """Download a file from a URL.""" try: - headers = {"User-Agent": USER_AGENT} - # print(f"Attempting to download image from: {image_url}") # Keep less noisy - response = requests.get(image_url, headers=headers, stream=True, timeout=20) # Increased timeout + response = requests.get(url, stream=True, timeout=15) response.raise_for_status() - - # Check content type before saving - content_type = response.headers.get('Content-Type', '') - if not content_type.startswith('image/'): - print(f"URL did not return an image Content-Type ({content_type}). 
Skipping download.") - return None - - # Ensure the directory exists - os.makedirs(os.path.dirname(filename), exist_ok=True) - with open(filename, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) - - # print(f"Potential image downloaded to: {filename}") # Keep less noisy - - # Validate and process the image - try: - img = Image.open(filename) - img.verify() # Verify it's an image file - img = Image.open(filename) # Re-open after verify - if img.mode != 'RGB': - # print("Converting image to RGB") # Keep less noisy - img = img.convert('RGB') - img.save(filename) - # print(f"Image validated and converted to RGB: {filename}") # Keep less noisy - return filename - except Exception as e_validate: - print(f"Downloaded file is not a valid image or processing failed for {filename}: {e_validate}") - if os.path.exists(filename): - os.remove(filename) # Clean up invalid file - return None - - except requests.exceptions.RequestException as e_download: - print(f"Image download error for {image_url}: {e_download}") - if os.path.exists(filename): - os.remove(filename) # Clean up partially downloaded file - return None - except Exception as e_general: - print(f"General error during image download/processing for {filename}: {e_general}") - if os.path.exists(filename): - os.remove(filename) # Clean up if needed - return None - - -def download_video(video_url, filename): - """Download a video from a URL to a local file.""" - if not video_url: - print("No video URL provided for download.") - return None - try: - headers = {"User-Agent": USER_AGENT} # Some sites block direct downloads - print(f"Attempting to download video from: {video_url}") - response = requests.get(video_url, stream=True, timeout=45) # Increased timeout for videos - response.raise_for_status() - - # Check content type - content_type = response.headers.get('Content-Type', '') - if not content_type.startswith('video/'): - print(f"URL did not return a video Content-Type ({content_type}). Skipping download.") - return None - - os.makedirs(os.path.dirname(filename), exist_ok=True) - - # Use smaller chunk size for potentially large files - chunk_size = 4096 - downloaded_size = 0 - total_size = int(response.headers.get('content-length', 0)) - - with open(filename, 'wb') as f: - for chunk in response.iter_content(chunk_size=chunk_size): - f.write(chunk) - downloaded_size += len(chunk) - # Optional: Add progress updates if needed, but noisy for console - - print(f"Video downloaded successfully to: {filename} ({downloaded_size} bytes)") - # Basic check if the file seems valid (not just 0 bytes) - if os.path.exists(filename) and os.path.getsize(filename) > 1024: # Check for > 1KB - return filename - else: - print(f"Downloaded video file {filename} is too small or empty ({os.path.getsize(filename)} bytes).") - if os.path.exists(filename): - os.remove(filename) - return None - - except requests.exceptions.RequestException as e: - print(f"Video download error for {video_url}: {e}") - if os.path.exists(filename): - os.remove(filename) - return None - except Exception as e_general: - print(f"General error during video download for {filename}: {e_general}") - if os.path.exists(filename): - os.remove(filename) + return filename + except Exception as e: + print(f"Download failed: {e}") return None - -def generate_media_asset(prompt, uploaded_media_path): - """ - Generate a visual asset (video or image). Prioritizes user upload, - then searches Pexels video, then Pexels image, then Google Image. 
- Returns a dict: {'path': , 'asset_type': 'video' or 'image'}. - Ensures the returned path is within the TEMP_FOLDER. - """ +def generate_media(prompt, video_percentage, temp_folder): + """Generate media based on prompt and video percentage.""" safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_') - if not TEMP_FOLDER: - print("Error: TEMP_FOLDER not set for generate_media_asset.") - return None - - os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists - - # 1. Use user uploaded media if provided - if uploaded_media_path and os.path.exists(uploaded_media_path): - print(f"Using user uploaded media: {uploaded_media_path}") - file_ext = os.path.splitext(uploaded_media_path)[1].lower() - asset_type = 'video' if file_ext in ['.mp4', '.mov', '.avi', '.webm', '.mkv'] else 'image' - # Copy the user file to temp folder to manage cleanup - temp_user_path = os.path.join(TEMP_FOLDER, f"user_upload_{os.path.basename(uploaded_media_path)}") - try: - # Use copy2 to preserve metadata like modification time - shutil.copy2(uploaded_media_path, temp_user_path) - print(f"Copied user upload to temp: {temp_user_path}") - return {"path": temp_user_path, "asset_type": asset_type} - # Handle case where source and destination might be the same (e.g., user uploads from temp) - except shutil.SameFileError: - print(f"User upload is already in temp folder: {uploaded_media_path}") - return {"path": uploaded_media_path, "asset_type": asset_type} - except Exception as e: - print(f"Error copying user file {uploaded_media_path}: {e}. Falling back to search.") - - - # 2. Search Pexels Videos (Increased chance) - # Let's slightly increase video search preference when available - if random.random() < 0.4: # Increase video search chance - video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4") - print(f"Attempting Pexels video search for: '{prompt}'") - video_url = search_pexels_videos(prompt) # Use global API key - if video_url: - downloaded_video = download_video(video_url, video_file) - if downloaded_video: - print(f"Pexels video asset saved to {downloaded_video}") - return {"path": downloaded_video, "asset_type": "video"} - else: - print(f"Pexels video search failed or found no video for: '{prompt}'") - - # 3. Search Pexels Images - image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}.jpg") - print(f"Attempting Pexels image search for: '{prompt}'") - image_url = search_pexels_images(prompt) # Use global API key - if image_url: - downloaded_image = download_image(image_url, image_file) - if downloaded_image: - print(f"Pexels image asset saved to {downloaded_image}") - return {"path": downloaded_image, "asset_type": "image"} - else: - print(f"Pexels image search failed or found no image for: '{prompt}'") - - # 4. Fallback: Search Google Images (especially useful for news/specific things Pexels might not have) - print(f"Attempting Google Images fallback for: '{prompt}'") - google_image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google.jpg") - google_image_url = search_google_images(prompt) - if google_image_url: - downloaded_google_image = download_image(google_image_url, google_image_file) - if downloaded_google_image: - print(f"Google Image asset saved to {downloaded_google_image}") - return {"path": downloaded_google_image, "asset_type": "image"} - else: - print(f"Google Images fallback failed for: '{prompt}'") - - - # 5. 
Final Fallback: Generic Images if specific search failed - fallback_terms = ["nature", "city", "abstract", "background"] # More generic fallbacks - for term in fallback_terms: - print(f"Trying generic fallback image search with term: '{term}'") - fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}.jpg") - fallback_url = search_pexels_images(term) # Use Pexels for fallbacks, global API key - if fallback_url: - downloaded_fallback = download_image(fallback_url, fallback_file) - if downloaded_fallback: - print(f"Generic fallback image saved to {downloaded_fallback}") - return {"path": downloaded_fallback, "asset_type": "image"} - else: - print(f"Generic fallback image download failed for term: '{term}'") - else: - print(f"Generic fallback image search failed for term: '{term}'") - - - print(f"Failed to generate any visual asset for prompt: '{prompt}' after all attempts.") + if random.random() < video_percentage / 100: + video_file = os.path.join(temp_folder, f"{safe_prompt}_video.mp4") + video_url = search_pexels_videos(prompt, PEXELS_API_KEY) + if video_url and download_file(video_url, video_file): + return {"path": video_file, "asset_type": "video"} + image_file = os.path.join(temp_folder, f"{safe_prompt}.jpg") + image_url = search_pexels_images(prompt, PEXELS_API_KEY) + if image_url and download_file(image_url, image_file): + return {"path": image_file, "asset_type": "image"} return None -def generate_silent_audio(duration, sample_rate=24000): - """Generate a silent WAV audio file lasting 'duration' seconds.""" - print(f"Generating {duration:.2f}s of silent audio.") - num_samples = int(duration * sample_rate) - silence = np.zeros(num_samples, dtype=np.float32) - # Use unique filename to avoid conflicts - # Ensure TEMP_FOLDER exists before generating path - if not TEMP_FOLDER: - print("Error: TEMP_FOLDER not set for generate_silent_audio.") - return None - os.makedirs(TEMP_FOLDER, exist_ok=True) - - silent_path = os.path.join(TEMP_FOLDER, f"silent_{abs(hash(duration)) % (10**8)}_{int(time.time())}.wav") +def generate_tts(text, voice, temp_folder): + """Generate TTS audio with fallback.""" + safe_text = re.sub(r'[^\w\s-]', '', text[:10]).strip().replace(' ', '_') + file_path = os.path.join(temp_folder, f"tts_{safe_text}.wav") try: - sf.write(silent_path, silence, sample_rate) - print(f"Silent audio generated: {silent_path}") - return silent_path - except Exception as e: - print(f"Error generating silent audio to {silent_path}: {e}") - return None - - -def generate_tts(text, voice='en'): - """ - Generate TTS audio using Kokoro, falling back to gTTS or silent audio if needed. - Ensures temp folder exists. - """ - if not text or not text.strip(): - print("TTS text is empty. 
Generating silent audio.") - return generate_silent_audio(duration=2.0) # Default silence for empty text - - if not TEMP_FOLDER: - print("Error: TEMP_FOLDER not set for generate_tts.") - return generate_silent_audio(duration=max(2.0, len(text.split()) * 0.4)) - - os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists - safe_text_hash = str(abs(hash(text)) % (10**10)) # Use a hash for potentially long text - file_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.wav") - - if os.path.exists(file_path): - # print(f"Using cached TTS for text hash '{safe_text_hash}'") # Keep less noisy + generator = pipeline(text, voice='af_heart', speed=0.9) + audio = next(generator)[2] + sf.write(file_path, audio, 24000) return file_path - - # Estimate duration based on word count (adjust factor as needed), used if TTS fails - target_duration_fallback = max(2.0, len(text.split()) * 0.4) - - # Use the global kokoro_initialized flag - if kokoro_initialized and pipeline: + except Exception: try: - print(f"Attempting Kokoro TTS for text: '{text[:50]}...'") - kokoro_voice = 'af_heart' if voice == 'en' else voice # Kokoro default American English voice - # Kokoro pipeline might return multiple segments for long text - generator = pipeline(text, voice=kokoro_voice, speed=1.0, split_pattern=r'\n+') # Use speed 1.0 - audio_segments = [] - total_kokoro_duration = 0 # Track actual generated audio duration - - # Some text might result in many small segments, let's limit total time spent on Kokoro - max_kokoro_total_time = 60 # seconds - - start_time = time.time() # Start time for total timeout check - - for i, (gs, ps, audio) in enumerate(generator): - if time.time() - start_time > max_kokoro_total_time: - print(f"Kokoro TTS total time exceeded {max_kokoro_total_time}s.") - break # Exit loop on total timeout - - audio_segments.append(audio) - segment_duration = len(audio) / 24000.0 # Assuming 24000 Hz sample rate - total_kokoro_duration += segment_duration - - if audio_segments: - full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0] - # Calculate actual duration - total_kokoro_duration = len(full_audio) / 24000.0 # Assuming 24000 Hz sample rate - sf.write(file_path, full_audio, 24000) # Use 24000Hz standard - # print(f"TTS audio saved to {file_path} (Kokoro, {total_kokoro_duration:.2f}s)") # Keep less noisy - return file_path - else: - print("Kokoro pipeline returned no audio segments.") - + tts = gTTS(text=text, lang='en') + mp3_path = os.path.join(temp_folder, f"tts_{safe_text}.mp3") + tts.save(mp3_path) + audio = AudioSegment.from_mp3(mp3_path) + audio.export(file_path, format="wav") + os.remove(mp3_path) + return file_path except Exception as e: - print(f"Error with Kokoro TTS: {e}") - # Continue to gTTS fallback - - try: - print(f"Falling back to gTTS for text: '{text[:50]}...'") - tts = gTTS(text=text, lang='en', slow=False) # Use standard speed - mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.mp3") - tts.save(mp3_path) - audio = AudioSegment.from_mp3(mp3_path) - audio.export(file_path, format="wav") - if os.path.exists(mp3_path): - os.remove(mp3_path) # Clean up intermediate mp3 - # print(f"Fallback TTS saved to {file_path} (gTTS, {audio.duration_seconds:.2f}s)") # Keep less noisy - return file_path - except Exception as fallback_error: - print(f"Both TTS methods failed for text: '{text[:50]}...'. 
Error: {fallback_error}") - # Use the estimated duration for silent audio - print(f"Generating silent audio of estimated duration {target_duration_fallback:.2f}s.") - return generate_silent_audio(duration=target_duration_fallback) + print(f"TTS generation failed: {e}") + return None -def apply_kenburns_effect(clip, target_resolution, effect_type=None): - """Apply a smooth Ken Burns effect with a single movement pattern.""" +def resize_to_fill(clip, target_resolution): + """Resize and crop clip to fill target resolution.""" target_w, target_h = target_resolution clip_aspect = clip.w / clip.h target_aspect = target_w / target_h - - # Resize clip to fill target resolution while maintaining aspect ratio, then scale up - # This ensures the image covers the whole frame even after scaling and panning if clip_aspect > target_aspect: - # Wider than target: match height, scale width clip = clip.resize(height=target_h) + crop_amount = (clip.w - target_w) / 2 + clip = clip.crop(x1=crop_amount, x2=clip.w - crop_amount) else: - # Taller than target: match width, scale height - clip = clip.resize(width=target_w) - - # Now scale the resized clip up for the Ken Burns movement margin - initial_w, initial_h = clip.size - scale_factor = 1.15 # Scale up by 15% - new_width = int(initial_w * scale_factor) - new_height = int(initial_h * scale_factor) - clip = clip.resize(newsize=(new_width, new_height)) - - max_offset_x = new_width - target_w - max_offset_y = new_height - target_h - - available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "up-left", "down-right"] - if effect_type is None or effect_type == "random": - effect_type = random.choice(available_effects) - - # Define start and end positions of the top-left corner of the target_resolution window - start_x, start_y = 0, 0 - end_x, end_y = 0, 0 - start_zoom_relative = 1.0 # Relative to target_resolution size - end_zoom_relative = 1.0 - - # Set start/end positions and zoom based on effect type. - # Positions are top-left corner of the target frame within the scaled image coordinates (new_width, new_height). 
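These start/end offsets are interpolated with the cosine ease used in `transform_frame` further below; a tiny worked example of that easing:

```python
import math

# Cosine ease: progress 0..1 maps to 0..1 with zero velocity at both ends,
# which is what makes the pan/zoom start and stop smoothly.
def eased(progress):
    return 0.5 - 0.5 * math.cos(math.pi * progress)

start_x, end_x = 0.0, 120.0  # pan offsets in pixels (illustrative values)
for t in (0.0, 0.25, 0.5, 0.75, 1.0):
    x = start_x + (end_x - start_x) * eased(t)
    print(f"t={t:.2f} -> x={x:.1f}")  # 0.0, 17.6, 60.0, 102.4, 120.0
```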
- if effect_type == "zoom-in": - start_zoom_relative = 1.0 # Start covering target_resolution size - end_zoom_relative = scale_factor # End covering target_resolution / scale_factor size (zoomed in) - # Stay centered in the *scaled* image - start_x = max_offset_x / 2 - start_y = max_offset_y / 2 - end_x = max_offset_x / 2 - end_y = max_offset_y / 2 - - elif effect_type == "zoom-out": - start_zoom_relative = scale_factor # Start zoomed in - end_zoom_relative = 1.0 # End at target_resolution size - # Stay centered in the *scaled* image - start_x = max_offset_x / 2 - start_y = max_offset_y / 2 - end_x = max_offset_x / 2 - end_y = max_offset_y / 2 - - # For pan effects, the crop size is constant (target_resolution, which corresponds to zoom_relative=1.0) - elif effect_type == "pan-left": - start_x = max_offset_x - start_y = max_offset_y / 2 - end_x = 0 - end_y = max_offset_y / 2 - elif effect_type == "pan-right": - start_x = 0 - start_y = max_offset_y / 2 - end_x = max_offset_x - end_y = max_offset_y / 2 - elif effect_type == "pan-up": - start_x = max_offset_x / 2 - start_y = max_offset_y - end_x = max_offset_x / 2 - end_y = 0 - elif effect_type == "pan-down": - start_x = max_offset_x / 2 - start_y = 0 - end_x = max_offset_x / 2 - end_y = max_offset_y - elif effect_type == "up-left": - start_x = max_offset_x - start_y = max_offset_y - end_x = 0 - end_y = 0 - elif effect_type == "down-right": - start_x = 0 - start_y = 0 - end_x = max_offset_x - end_y = max_offset_y - else: - # Default to pan-right if type is random but somehow invalid (shouldn't happen with random.choice) - effect_type = 'pan-right' - start_x = 0 - start_y = max_offset_y / 2 - end_x = max_offset_x - end_y = max_offset_y / 2 - print(f"Warning: Unexpected effect type '{effect_type}'. Defaulting to 'pan-right'.") - - - def transform_frame(get_frame, t): - frame = get_frame(t) - # Use a smooth ease-in/ease-out function - progress = t / clip.duration if clip.duration > 0 else 0 - eased_progress = 0.5 - 0.5 * math.cos(math.pi * progress) # Cosine easing - - # Interpolate zoom relative to target_resolution - current_zoom_relative = start_zoom_relative + (end_zoom_relative - start_zoom_relative) * eased_progress - - # Calculate crop size based on current zoom relative to target resolution - # If zoom_relative is 1, crop size is target_resolution. 
If zoom_relative is scale_factor, crop size is target_resolution/scale_factor - crop_w = int(target_w / current_zoom_relative) - crop_h = int(target_h / current_zoom_relative) - - # Interpolate position (top-left corner of the target frame within the scaled image) - current_x = start_x + (end_x - start_x) * eased_progress - current_y = start_y + (end_y - start_y) * eased_progress - - # Calculate the center point for cv2.getRectSubPix - center_x = current_x + crop_w / 2 - center_y = current_y + crop_h / 2 - - # Ensure center stays within the bounds of the scaled image (new_width, new_height) - center_x = max(crop_w / 2.0, min(center_x, new_width - crop_w / 2.0)) # Use float division - center_y = max(crop_h / 2.0, min(center_y, new_height - crop_h / 2.0)) - - - try: - # Perform the crop using cv2.getRectSubPix (expects floating point center) - # Ensure frame is a numpy array (moviepy returns numpy arrays) - # Clamp center coordinates just in case, although max/min should handle it - center_x = np.clip(center_x, 0, new_width) - center_y = np.clip(center_y, 0, new_height) - - # Ensure crop dimensions are positive integers - crop_w = max(1, crop_w) - crop_h = max(1, crop_h) - - # Handle cases where crop dimensions might exceed frame dimensions (shouldn't happen with correct logic) - crop_w = min(crop_w, frame.shape[1]) - crop_h = min(crop_h, frame.shape[0]) - - # Ensure crop size is not zero or negative - if crop_w <= 0 or crop_h <= 0: - print(f"Warning: Calculated crop size is non-positive ({crop_w}, {crop_h}) at t={t:.2f}s. Skipping crop/resize.") - return np.zeros((target_h, target_w, 3), dtype=np.uint8) # Return black frame - - - cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (center_x, center_y)) - # Resize the cropped frame back to the target resolution - resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) - - return resized_frame - except Exception as e: - # Log details helpful for debugging Ken Burns issues - frame_shape_info = frame.shape if frame is not None else 'None' - print(f"Error applying Ken Burns transform at t={t:.2f}s: {e}. 
Frame shape: {frame_shape_info}, Scaled Size: ({new_width}, {new_height}), Center: ({center_x:.2f}, {center_y:.2f}), Crop Size: ({crop_w}, {crop_h}), Target Size: ({target_w}, {target_h})") - # Return a black frame or placeholder in case of error - return np.zeros((target_h, target_w, 3), dtype=np.uint8) - - - # Apply the transformation function - return clip.fl(transform_frame) - - -def resize_to_fill(clip, target_resolution): - """Resize and crop a clip to fill the target resolution while maintaining aspect ratio.""" - target_w, target_h = target_resolution - clip_aspect = clip.w / clip.h - target_aspect = target_w / target_h - - # print(f"Resizing clip {clip.size} to fill target {target_resolution}") - - if clip_aspect > target_aspect: # Clip is wider than target - clip = clip.resize(height=target_h) - # Calculate crop amount to make width match target_w - crop_amount_x = max(0.0, (clip.w - target_w) / 2.0) # Use float division - # Ensure crop coordinates are integers - x1 = int(crop_amount_x) - x2 = int(clip.w - crop_amount_x) - # Handle potential edge cases with integer rounding - x2 = max(x1 + 1, x2) # Ensure at least 1 pixel width if needed - # Ensure crop region is within bounds - x1 = max(0, x1) - x2 = min(clip.w, x2) - - clip = clip.crop(x1=x1, x2=x2, y1=0, y2=clip.h) - else: # Clip is taller than target or same aspect clip = clip.resize(width=target_w) - # Calculate crop amount to make height match target_h - crop_amount_y = max(0.0, (clip.h - target_h) / 2.0) # Use float division - # Ensure crop coordinates are integers - y1 = int(crop_amount_y) - y2 = int(clip.h - crop_amount_y) - # Handle potential edge cases with integer rounding - y2 = max(y1 + 1, y2) # Ensure at least 1 pixel height if needed - # Ensure crop region is within bounds - y1 = max(0, y1) - y2 = min(clip.h, y2) - - clip = clip.crop(x1=0, x2=clip.w, y1=y1, y2=y2) - - # Final check and resize if dimensions are slightly off due to rounding - if clip.size != target_resolution: - print(f"Warning: Clip size {clip.size} after resize_to_fill does not match target {target_resolution}. 
Resizing again.") - clip = clip.resize(newsize=target_resolution) - - - # print(f"Clip resized to {clip.size}") + crop_amount = (clip.h - target_h) / 2 + clip = clip.crop(y1=crop_amount, y2=clip.h - crop_amount) return clip -def find_mp3_files(): - """Search for any MP3 files in the current directory and subdirectories.""" - mp3_files = [] - # Check relative paths first - for root, dirs, files in os.walk('.'): - for file in files: - if file.lower().endswith('.mp3'): - mp3_path = os.path.join(root, file) - # Exclude files that are likely temporary or part of internal libraries - if not any(keyword in mp3_path.lower() for keyword in ['temp', '.gradio', 'site-packages', 'dist-packages', 'venv', 'tmp']): # Added 'tmp' - mp3_files.append(mp3_path) - print(f"Found MP3 file: {mp3_path}") +def create_clip(media_path, asset_type, tts_path, duration, narration_text, text_color, text_size, caption_bg, target_resolution): + """Create a video clip with media, TTS, and subtitles.""" + try: + audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2) + target_duration = audio_clip.duration + 0.2 + if asset_type == "video": + clip = VideoFileClip(media_path) + clip = resize_to_fill(clip, target_resolution) + clip = clip.loop(duration=target_duration) if clip.duration < target_duration else clip.subclip(0, target_duration) + else: # image + clip = ImageClip(media_path).set_duration(target_duration).resize(target_resolution).fadein(0.3).fadeout(0.3) + + if narration_text and caption_bg != "transparent": + words = narration_text.split() + chunks = [' '.join(words[i:i+5]) for i in range(0, len(words), 5)] + chunk_duration = audio_clip.duration / len(chunks) + subtitle_clips = [ + TextClip( + chunk, + fontsize=text_size, + color=text_color, + bg_color=caption_bg, + size=(target_resolution[0] * 0.8, None), + method='caption', + align='center' + ).set_position(('center', target_resolution[1] * 0.7)).set_start(i * chunk_duration).set_end((i + 1) * chunk_duration) + for i, chunk in enumerate(chunks) + ] + clip = CompositeVideoClip([clip] + subtitle_clips) - if mp3_files: - return mp3_files[0] # Return the first one found that isn't excluded - else: - # print("No user-provided MP3 files found in the current directory or subdirectories.") # Keep less noisy + clip = clip.set_audio(audio_clip) + return clip + except Exception as e: + print(f"Clip creation failed: {e}") return None - -def add_background_music(final_video, bg_music_path, bg_music_volume=0.08): - """Add background music to the final video.""" - if not bg_music_path or not os.path.exists(bg_music_path): - print("No valid background music path provided or file not found. 
Skipping background music.") - return final_video - +def add_background_music(final_video, custom_music_path, music_volume): + """Add background music to the video.""" try: - print(f"Adding background music from: {bg_music_path} with volume {bg_music_volume}") - bg_music = AudioFileClip(bg_music_path) - - # Loop background music if shorter than video - if bg_music.duration < final_video.duration: - loops_needed = math.ceil(final_video.duration / bg_music.duration) - bg_segments = [bg_music.copy() for _ in range(loops_needed)] # Use copy to avoid issues - bg_music = concatenate_audioclips(bg_segments) - # print(f"Looped background music to {bg_music.duration:.2f}s") # Keep less noisy - - # Subclip background music to match video duration - bg_music = bg_music.subclip(0, final_video.duration) - # print(f"Subclipped background music to {bg_music.duration:.2f}s") # Keep less noisy - - # Adjust volume - bg_music = bg_music.volumex(bg_music_volume) - # print(f"Set background music volume to {bg_music_volume}") # Keep less noisy - - # Composite audio - video_audio = final_video.audio - if video_audio: - # Ensure video audio matches video duration before compositing - if abs(video_audio.duration - final_video.duration) > 0.1: - print(f"Adjusting video audio duration ({video_audio.duration:.2f}s) to match video duration ({final_video.duration:.2f}s) for final mix") - try: - video_audio = video_audio.fx(vfx.speedx, factor=video_audio.duration / final_video.duration) - except Exception as e: - print(f"Error adjusting final video audio speed: {e}. Using original audio.") - pass # Proceed with original audio if speedx fails - - mixed_audio = CompositeAudioClip([video_audio, bg_music]) - # print("Composited video audio and background music") # Keep less noisy + if custom_music_path and os.path.exists(custom_music_path): + bg_music = AudioFileClip(custom_music_path) else: - # Handle case where video might not have audio track initially - mixed_audio = bg_music - print("Warning: Video had no original audio track, only adding background music.") - - final_video = final_video.set_audio(mixed_audio) - print("Background music added successfully.") + bg_music = AudioFileClip("default_music.mp3") # Assume a default music file exists + if bg_music.duration < final_video.duration: + bg_music = concatenate_audioclips([bg_music] * math.ceil(final_video.duration / bg_music.duration)) + bg_music = bg_music.subclip(0, final_video.duration).volumex(music_volume) + final_video = final_video.set_audio(CompositeAudioClip([final_video.audio, bg_music])) return final_video except Exception as e: - print(f"Error adding background music: {e}") - print("Continuing without background music.") + print(f"Background music failed: {e}") return final_video +# Gradio Interface +with gr.Blocks(title="AI Documentary Video Generator") as app: + ### Initial Inputs + with gr.Column(): + concept = gr.Textbox(label="Video Concept", placeholder="Enter your video concept...") + resolution = gr.Radio(["Full", "Short"], label="Resolution", value="Full") + captions = gr.Radio(["Yes", "No"], label="Captions", value="Yes") + video_percentage = gr.Slider(0, 100, label="Video Percentage", value=50) + text_color = gr.ColorPicker(label="Text Color", value="#FFFFFF") + text_size = gr.Slider(20, 60, label="Text Size", value=28) + caption_bg = gr.ColorPicker(label="Caption Background Color", value="transparent") + music_volume = gr.Slider(0, 1, label="Music Volume", value=0.08) + custom_music = gr.File(label="Upload Custom Background Music", type="file") 
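The wiring that follows relies on a fixed pool of pre-built components whose visibility is toggled from callbacks. A distilled sketch of that pattern, with illustrative names and row count:

```python
import gradio as gr

# Pre-build MAX_ROWS hidden components once; a single callback returns one
# gr.update per component to set its value and visibility.
MAX_ROWS = 10

with gr.Blocks() as demo:
    topic = gr.Textbox(label="Topic")
    fill_btn = gr.Button("Fill rows")
    rows = [gr.Textbox(label=f"Clip {i+1}", visible=False) for i in range(MAX_ROWS)]

    def fill(t):
        texts = [f"{t} segment {i+1}" for i in range(3)]  # stand-in for parsed script lines
        return [gr.update(value=texts[i] if i < len(texts) else "",
                          visible=i < len(texts)) for i in range(MAX_ROWS)]

    fill_btn.click(fill, inputs=topic, outputs=rows)

# demo.launch()
```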
+ generate_script_btn = gr.Button("Generate Script") + + ### States + num_clips = gr.State(value=0) + titles_state = gr.State(value=[]) + initial_texts_state = gr.State(value=[]) + + ### Clip Editing Section + with gr.Column(visible=False) as clip_section: + clip_textboxes = [] + clip_files = [] + for i in range(10): # Max 10 clips + with gr.Row(): + text_box = gr.Textbox(label=f"Clip {i+1} Text", visible=False) + file_upload = gr.File(label=f"Upload Media for Clip {i+1}", type="file", visible=False) + clip_textboxes.append(text_box) + clip_files.append(file_upload) + generate_video_btn = gr.Button("Generate Video", visible=False) + + ### Output + video_output = gr.Video(label="Generated Video") + + ### Script Generation Logic + def generate_script_fn(concept): + script = generate_script(concept) + if not script: + return 0, [], [] + elements = parse_script(script) + titles = [e['prompt'] for e in elements if e['type'] == 'media'] + texts = [e['text'] for e in elements if e['type'] == 'tts'] + return len(titles), titles, texts + + def update_textboxes(texts): + return [gr.update(value=texts[i] if i < len(texts) else "", visible=i < len(texts)) for i in range(10)] + + def update_files(n): + return [gr.update(visible=i < n) for i in range(10)] -def create_clip(media_asset, tts_path, estimated_duration, target_resolution, - caption_enabled, caption_color, caption_size, caption_position, - caption_bg_color, caption_stroke_color, caption_stroke_width, - narration_text, segment_index): - """Create a video clip with synchronized subtitles and narration.""" - try: - print(f"Creating clip #{segment_index} from asset: {media_asset.get('path')}, type: {media_asset.get('asset_type')}") - media_path = media_asset.get('path') - asset_type = media_asset.get('asset_type') - - # Determine actual audio duration - audio_clip = None - audio_duration = estimated_duration # Default to estimated duration - target_clip_duration = estimated_duration # Default target duration - - if tts_path and os.path.exists(tts_path): - try: - audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2) # Fade out TTS slightly - audio_duration = audio_clip.duration - # Ensure clip duration is slightly longer than audio for transitions/padding - target_clip_duration = audio_duration + 0.3 # Add a small buffer after TTS ends - # Ensure target duration is not excessively long - target_clip_duration = min(target_clip_duration, estimated_duration * 3 + 5) # Prevent very long clips if TTS audio is unexpectedly long - # Also ensure a minimum duration even if TTS is very short - target_clip_duration = max(target_clip_duration, 2.0) # Minimum clip duration 2 seconds - - - print(f"TTS audio duration: {audio_duration:.2f}s. Target clip duration: {target_clip_duration:.2f}s (estimated {estimated_duration:.2f}s)") - except Exception as e: - print(f"Error loading TTS audio clip {tts_path}: {e}. 
Using estimated duration {estimated_duration:.2f}s for clip.") - audio_clip = None # Ensure audio_clip is None if loading fails - target_clip_duration = estimated_duration # Fallback to estimated duration - target_clip_duration = max(target_clip_duration, 2.0) # Ensure minimum duration - - else: - # If no TTS path, use estimated duration as target, ensure minimum - target_clip_duration = max(estimated_duration, 2.0) - - - # Handle missing or invalid media first - if not media_path or not os.path.exists(media_path): - print(f"Skipping clip {segment_index}: Missing or invalid media file {media_path}") - # Create a black clip with silent audio for the target duration - clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) - print(f"Created placeholder black clip for segment {segment_index}") - # Add placeholder text if captions are enabled and text exists - if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): - txt_clip = TextClip( - "[Missing Media]\n" + narration_text, # Indicate missing media - fontsize=caption_size, - font='Arial-Bold', # Ensure this font is available - color=caption_color, - bg_color=caption_bg_color, - method='caption', - align='center', - stroke_width=caption_stroke_width, - stroke_color=caption_stroke_color, - size=(target_resolution[0] * 0.9, None) - ).set_position('center').set_duration(target_clip_duration) # Duration matches black clip - clip = CompositeVideoClip([clip, txt_clip]) - - # Add silent audio to the placeholder clip - silent_audio_path = generate_silent_audio(target_clip_duration) - if silent_audio_path and os.path.exists(silent_audio_path): - try: - silent_audio_clip = AudioFileClip(silent_audio_path) - # Ensure silent audio duration matches video clip duration - if abs(silent_audio_clip.duration - clip.duration) > 0.1: - silent_audio_clip = silent_audio_clip.fx(vfx.speedx, factor=silent_audio_clip.duration / clip.duration) - clip = clip.set_audio(silent_audio_clip) - except Exception as e: - print(f"Error setting silent audio to placeholder clip {segment_index}: {e}") - clip = clip.set_audio(None) # Set audio to None if silent audio fails loading - else: - clip = clip.set_audio(None) # Set audio to None if silent audio generation fails - - return clip # Return the placeholder clip - - # Process media if path is valid - if asset_type == "video": - try: - clip = VideoFileClip(media_path) - print(f"Loaded video clip from {media_path} with duration {clip.duration:.2f}s") - clip = resize_to_fill(clip, target_resolution) - if clip.duration < target_clip_duration: - print("Looping video clip") - # Loop the video to match the target duration - clip = clip.loop(duration=target_clip_duration) - else: - # Subclip the video to match the target duration - clip = clip.subclip(0, target_clip_duration) - clip = clip.fadein(0.2).fadeout(0.2) # Add simple transitions - print(f"Video clip processed to duration {clip.duration:.2f}s") - - except Exception as e: - print(f"Error processing video clip {media_path} for segment {segment_index}: {e}") - # Fallback to a black clip if video processing fails - print(f"Creating placeholder black clip instead for segment {segment_index}") - clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) - if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): - txt_clip = TextClip( - "[Video Error]\n" + narration_text, # Indicate video error - fontsize=caption_size, 
color=caption_color, bg_color=caption_bg_color, method='caption', align='center', - stroke_width=caption_stroke_width, stroke_color=caption_stroke_color, - size=(target_resolution[0] * 0.9, None) - ).set_position('center').set_duration(target_clip_duration) - clip = CompositeVideoClip([clip, txt_clip]) - - - elif asset_type == "image": - try: - img = Image.open(media_path) - # Ensure image is in RGB format before passing to ImageClip - if img.mode != 'RGB': - print("Converting image to RGB") - img = img.convert('RGB') - # ImageClip accepts numpy arrays - img_array = np.array(img) - img.close() # Close the PIL image - clip = ImageClip(img_array).set_duration(target_clip_duration) - else: - img.close() # Close the PIL image - clip = ImageClip(media_path).set_duration(target_clip_duration) - - # print(f"Loaded image clip from {media_path} with duration {clip.duration:.2f}s") # Keep less noisy - clip = apply_kenburns_effect(clip, target_resolution) # Ken Burns with random effect - clip = clip.fadein(0.3).fadeout(0.3) # Add simple transitions - # print(f"Image clip processed to duration {clip.duration:.2f}s with Ken Burns") # Keep less noisy - - - except Exception as e: - print(f"Error processing image clip {media_path} for segment {segment_index}: {e}") - # Fallback to a black clip if image processing fails - print(f"Creating placeholder black clip instead for segment {segment_index}") - clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) - if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): - txt_clip = TextClip( - "[Image Error]\n" + narration_text, # Indicate image error - fontsize=caption_size, color=caption_color, bg_color=caption_bg_color, method='caption', align='center', - stroke_width=caption_stroke_width, stroke_color=caption_stroke_color, - size=(target_resolution[0] * 0.9, None) - ).set_position('center').set_duration(target_clip_duration) - clip = CompositeVideoClip([clip, txt_clip]) - - else: - print(f"Unknown asset type '{asset_type}' for segment {segment_index}. Creating placeholder.") - # Create a placeholder black clip - clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) - if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): - txt_clip = TextClip( - "[Unknown Media Type Error]\n" + narration_text, # Indicate unknown type error - fontsize=caption_size, color=caption_color, bg_color=caption_bg_color, method='caption', align='center', - stroke_width=caption_stroke_width, stroke_color=caption_stroke_color, - size=(target_resolution[0] * 0.9, None) - ).set_position('center').set_duration(target_clip_duration) - clip = CompositeVideoClip([clip, txt_clip]) - - - # Set the audio for the clip - if audio_clip: - # Ensure audio clip duration matches video clip duration after processing - if abs(audio_clip.duration - clip.duration) > 0.1: # Allow slight difference (e.g., 100ms) - print(f"Adjusting audio duration ({audio_clip.duration:.2f}s) to match video duration ({clip.duration:.2f}s) for segment {segment_index}") - try: - audio_clip = audio_clip.fx(vfx.speedx, factor=audio_clip.duration / clip.duration) - except Exception as e: - print(f"Error adjusting audio speed for segment {segment_index}: {e}. Using original audio duration.") - # If speeding fails, maybe just loop or subclip the audio? Or regenerate silent audio. 
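Pulling the duration-matching logic discussed here into one place, a sketch that falls back to a plain subclip when `speedx` fails (the name and tolerance are illustrative):

```python
import moviepy.video.fx.all as vfx

# Stretch/squeeze the narration so it ends with the clip; a large factor
# distorts pitch, so a subclip is the gentler fallback the comments suggest.
def match_audio_duration(audio_clip, target_duration, tolerance=0.1):
    if abs(audio_clip.duration - target_duration) <= tolerance:
        return audio_clip
    try:
        return audio_clip.fx(vfx.speedx, factor=audio_clip.duration / target_duration)
    except Exception:
        return audio_clip.subclip(0, min(audio_clip.duration, target_duration))
```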
- # If speedx fails, attach the original audio unchanged and accept the timing drift.
- pass # Keep the original audio_clip if speedx fails
+ # Chain the post-generation UI updates: fill the per-clip textboxes and file
+ # inputs, then reveal the clip editor and the Generate Video button.
+ generate_script_btn.click(
+ fn=generate_script_fn,
+ inputs=[concept],
+ outputs=[num_clips, titles_state, initial_texts_state]
+ ).then(
+ fn=update_textboxes,
+ inputs=[initial_texts_state],
+ outputs=clip_textboxes
+ ).then(
+ fn=update_files,
+ inputs=[num_clips],
+ outputs=clip_files
+ ).then(
+ fn=lambda: gr.update(visible=True),
+ outputs=[clip_section]
+ ).then(
+ fn=lambda: gr.update(visible=True),
+ outputs=[generate_video_btn]
+ )
- clip = clip.set_audio(audio_clip)
- else:
- # If TTS failed or audio loading failed, ensure the video clip gets silent audio or none at all
- print(f"No valid audio for clip {segment_index}. Setting silent audio.")
- silent_audio_path = generate_silent_audio(clip.duration) # Generate silent audio matching the clip's final duration
- if silent_audio_path and os.path.exists(silent_audio_path):
- try:
- silent_audio_clip = AudioFileClip(silent_audio_path)
- # Should match duration, but double check
- if abs(silent_audio_clip.duration - clip.duration) > 0.1:
- silent_audio_clip = silent_audio_clip.fx(vfx.speedx, factor=silent_audio_clip.duration / clip.duration)
- clip = clip.set_audio(silent_audio_clip)
- except Exception as e:
- print(f"Error setting silent audio for segment {segment_index}: {e}")
- clip = clip.set_audio(None) # Set audio to None if silent audio fails loading
+ ### Video Generation Logic
+ def generate_video_fn(resolution, captions, video_percentage, text_color, text_size, caption_bg, music_volume, custom_music, num_clips, titles, *clip_data):
+ # Gradio flattens the inputs list into positional arguments, so *clip_data
+ # carries the 10 textbox values first, followed by the 10 file-input values.
+ texts = clip_data[:10]
+ files = clip_data[10:]
+ temp_folder = tempfile.mkdtemp()
+ target_resolution = (1920, 1080) if resolution == "Full" else (1080, 1920)
+ clips = []
+ 
+ for i in range(int(num_clips)): # num_clips may arrive as a float from the UI control
+ text = texts[i]
+ media_file = files[i]
+ title = titles[i]
+ if media_file:
+ ext = os.path.splitext(media_file)[1].lower()
+ media_path = os.path.join(temp_folder, f"clip_{i}{ext}")
+ shutil.copy(media_file, media_path)
+ asset_type = "video" if ext in ['.mp4', '.avi', '.mov'] else "image"
 else:
- clip = clip.set_audio(None) # Set audio to None if silent audio generation fails
- 
- 
- # Add subtitles if enabled and text exists
- if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip():
- try:
- # Use clip.duration for subtitle timing, since the clip's duration is final at this point
- actual_clip_duration_for_subtitles = clip.duration
- if actual_clip_duration_for_subtitles <= 0:
- print(f"Clip duration is zero or negative for segment {segment_index}, cannot add subtitles.")
- else:
- # Simple word-based chunking for subtitles
- words = narration_text.split()
- # Calculate average word duration from the clip duration and word count
- total_words = len(words)
- average_word_duration = actual_clip_duration_for_subtitles / total_words if total_words > 0 else 0.5 # Default if no words
- 
- subtitle_clips = []
- current_time = 0
- chunk_size = 6 # Words per caption chunk (adjust as needed for readability)
- 
- for i in range(0, total_words, chunk_size):
- chunk_words = words[i:i+chunk_size]
- chunk_text = ' '.join(chunk_words)
- # Estimate chunk duration as word count * average word duration
- estimated_chunk_duration = len(chunk_words) * average_word_duration
- 
- start_time = current_time
- # Ensure end time doesn't exceed the *clip*
duration - end_time = min(current_time + estimated_chunk_duration, clip.duration) - # Ensure minimal duration for a chunk - if end_time - start_time < 0.1 and i + chunk_size < total_words: - end_time = min(start_time + 0.1, clip.duration) # Give it at least 0.1s - - if start_time >= end_time: break # Avoid 0 or negative duration clips - - - # Determine vertical position - if caption_position == "Top": - subtitle_y_position = int(target_resolution[1] * 0.05) # Slightly lower than top edge - elif caption_position == "Middle": - # Calculate vertical center, then subtract half the estimated text height - # Estimate text height based on font size and number of lines (adjust factor as needed) - estimated_text_lines = max(1, math.ceil(len(chunk_words) / chunk_size)) # Crude estimate, at least 1 line - estimated_total_text_height = estimated_text_lines * caption_size * 1.2 # 1.2 is line spacing approx - subtitle_y_position = int(target_resolution[1] * 0.5) - int(estimated_total_text_height / 2) - # Ensure position is not off-screen (allow negative slightly for vertical alignment) - # subtitle_y_position = max(0, subtitle_y_position) # Don't clamp to 0 for Middle, let moviepy handle it - - else: # Default to Bottom - # Position from the bottom edge - # positioning the top-left of the text box at 85% of height often looks good for bottom captions. - subtitle_y_position = int(target_resolution[1] * 0.85) # Top-left of text box is at 85% height - - - txt_clip = TextClip( - chunk_text, - fontsize=caption_size, - font='Arial-Bold', # Ensure this font is available or use a common system font - color=caption_color, - bg_color=caption_bg_color, # Use background color - method='caption', # Enables text wrapping - align='center', - stroke_width=caption_stroke_width, # Use stroke - stroke_color=caption_stroke_color, # Use stroke color - size=(target_resolution[0] * 0.9, None) # Caption width max 90% of video width - ).set_start(start_time).set_end(end_time) - - # Position is tuple ('center', y_position) - txt_clip = txt_clip.set_position(('center', subtitle_y_position)) - subtitle_clips.append(txt_clip) - current_time = end_time # Move to the end of the current chunk - - if subtitle_clips: - clip = CompositeVideoClip([clip] + subtitle_clips) - # print(f"Added {len(subtitle_clips)} subtitle chunks to clip {segment_index}.") # Keep less noisy - # else: - # print(f"No subtitle clips generated for segment {segment_index} (might be due to text/duration issues).") # Keep less noisy - - - except Exception as sub_error: - print(f"Error adding subtitles for segment {segment_index}: {sub_error}") - # Fallback to a single centered text overlay if detailed subtitling fails - try: - txt_clip = TextClip( - narration_text, - fontsize=caption_size, - font='Arial-Bold', - color=caption_color, - bg_color=caption_bg_color, - method='caption', - align='center', - stroke_width=caption_stroke_width, - stroke_color=caption_stroke_color, - size=(target_resolution[0] * 0.8, None) - ).set_position(('center', int(target_resolution[1] * 0.75))).set_duration(clip.duration) # Position slightly above bottom - clip = CompositeVideoClip([clip, txt_clip]) - print(f"Added simple fallback subtitle for segment {segment_index}.") - except Exception as fallback_sub_error: - print(f"Simple fallback subtitle failed for segment {segment_index}: {fallback_sub_error}") - - - # Ensure final clip duration is explicitly set (already done earlier based on audio) - # clip = clip.set_duration(clip.duration) - - # print(f"Clip {segment_index} created 
successfully: {clip.duration:.2f}s") # Keep less noisy - return clip - except Exception as e: - print(f"Critical error in create_clip for segment {segment_index}: {str(e)}") - # Create a black clip with error message if anything goes wrong during the main process - # Use a safe duration if previous duration calculation also failed - error_duration = target_clip_duration if 'target_clip_duration' in locals() and target_clip_duration > 0 else (estimated_duration if estimated_duration > 0 else 3.0) - print(f"Creating error placeholder black clip for segment {segment_index} with duration {error_duration:.2f}s.") - black_clip = ColorClip(size=target_resolution, color=(0,0,0), duration=error_duration) - error_text = f"Error in segment {segment_index}" - if narration_text: error_text += f":\n{narration_text[:50]}..." - error_txt_clip = TextClip( - error_text, - fontsize=30, - color="red", - align='center', - size=(target_resolution[0] * 0.9, None) - ).set_position('center').set_duration(black_clip.duration) # Error text duration matches placeholder - clip = CompositeVideoClip([black_clip, error_txt_clip]) - silent_audio_path = generate_silent_audio(clip.duration) # Generate silent audio matching placeholder duration - if silent_audio_path and os.path.exists(silent_audio_path): - try: - # Use the actual placeholder clip duration for silent audio - clip = clip.set_audio(AudioFileClip(silent_audio_path).subclip(0, clip.duration)) - except Exception as audio_e: - print(f"Error setting silent audio for error clip {segment_index}: {audio_e}") - clip = clip.set_audio(None) - else: - clip = clip.set_audio(None) - return clip - - -def fix_imagemagick_policy(): - """Attempt to fix ImageMagick security policies required by TextClip.""" - print("Attempting to fix ImageMagick security policies...") - - # Use the found binary path if available, otherwise use default list - if found_imagemagick_binary: - # Assuming policy.xml is relative to the binary path or in a standard location - # This is a heuristic, may need manual path depending on installation - # Normalize binary path to handle symlinks etc. 
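+ # Hypothetical shortcut (not part of the original app) for locating the active
+ # policy file: ask ImageMagick itself. This assumes the CLI is on PATH and that
+ # `convert -list policy` prints "Path: /etc/ImageMagick-6/policy.xml"-style
+ # header lines, which can vary across ImageMagick versions.
+ def _query_policy_paths(binary="convert"):
+ import subprocess
+ try:
+ out = subprocess.run([binary, "-list", "policy"], capture_output=True, text=True, timeout=10).stdout
+ except Exception:
+ return []
+ paths = [line.split("Path:", 1)[1].strip() for line in out.splitlines() if "Path:" in line]
+ return [p for p in paths if p.startswith("/")] # skip entries like "Path: [built-in]"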
- real_imagemagick_binary_path = os.path.realpath(found_imagemagick_binary) - binary_dir = os.path.dirname(real_imagemagick_binary_path) - policy_paths_to_check = [ - os.path.join(binary_dir, '..', 'etc', 'ImageMagick-7', 'policy.xml'), - os.path.join(binary_dir, '..', 'etc', 'ImageMagick-6', 'policy.xml'), - os.path.join(binary_dir, '..', 'etc', 'ImageMagick', 'policy.xml'), - os.path.join(binary_dir, '..', 'share', 'ImageMagick-7', 'policy.xml'), - os.path.join(binary_dir, '..', 'share', 'ImageMagick-6', 'policy.xml'), - os.path.join(binary_dir, '..', 'share', 'ImageMagick', 'policy.xml'), - # Add more paths relative to binary if needed - ] - # Add standard system paths as fallbacks - policy_paths_to_check.extend([ - "/etc/ImageMagick-6/policy.xml", - "/etc/ImageMagick-7/policy.xml", - "/etc/ImageMagick/policy.xml", - "/usr/local/etc/ImageMagick-7/policy.xml", # macports/homebrew path - "/usr/share/ImageMagick/policy.xml", - "/usr/share/ImageMagick-6/policy.xml", - "/usr/share/ImageMagick-7/policy.xml", - os.path.join(os.environ.get('MAGICK_HOME', '') if os.environ.get('MAGICK_HOME') else '.', 'policy.xml'), # Check MAGICK_HOME - ]) - else: - # Only check standard system paths if binary wasn't found - policy_paths_to_check = [ - "/etc/ImageMagick-6/policy.xml", - "/etc/ImageMagick-7/policy.xml", - "/etc/ImageMagick/policy.xml", - "/usr/local/etc/ImageMagick-7/policy.xml", # macports/homebrew path - "/usr/share/ImageMagick/policy.xml", - "/usr/share/ImageMagick-6/policy.xml", - "/usr/share/ImageMagick-7/policy.xml", - os.path.join(os.environ.get('MAGICK_HOME', '') if os.environ.get('MAGICK_HOME') else '.', 'policy.xml'), # Check MAGICK_HOME - ] - - - # Filter out empty paths and check existence, prioritize unique paths - existing_policy_paths = [] - seen_paths = set() - for path in policy_paths_to_check: - if path and os.path.exists(path) and path not in seen_paths: - existing_policy_paths.append(path) - seen_paths.add(path) - - - found_policy = None - if existing_policy_paths: - found_policy = existing_policy_paths[0] # Use the first unique one found - - if not found_policy: - print("No policy.xml found in common locations. TextClip may fail.") - print("Consider installing ImageMagick and checking its installation path and policy.xml location.") - return False - - print(f"Attempting to modify policy file at {found_policy}") - try: - # Create a backup - use a unique name - backup_path = f"{found_policy}.bak_aivgen_{int(time.time())}" - if os.path.exists(found_policy): - shutil.copy2(found_policy, backup_path) - print(f"Created backup at {backup_path}") - else: - print(f"Warning: Policy file {found_policy} not found at copy stage, cannot create backup.") - - - # Read the original policy file (handle potential permission issues) - policy_content = None - try: - with open(found_policy, 'r') as f: - policy_content = f.read() - except Exception as e: - print(f"Error reading policy file {found_policy}: {e}. Attempting with sudo cat...") - try: - # Use sudo cat to read if direct read fails - process = subprocess.Popen(['sudo', 'cat', found_policy], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - if process.returncode == 0: - policy_content = stdout.decode('utf-8') - print("Read policy file content using sudo.") - else: - print(f"Failed to read policy file using sudo cat. Result code: {process.returncode}. 
Error: {stderr.decode('utf-8')}")
- print("Manual intervention may be required.")
- return False
- except FileNotFoundError:
- print("sudo command not found. Cannot read policy file with sudo.")
- return False
- except Exception as e_sudo_read:
- print(f"Error executing sudo cat: {e_sudo_read}")
- print("Manual intervention may be required.")
- return False
- 
- if policy_content is None:
- print("Failed to read policy file content.")
- return False
- 
- # Use regex to find and replace the specific policy lines
- # Allow read and write rights for PDF, EPS, PS, etc. potentially restricted formats
- # Also ensure path policies allow reading/writing files
- # Be more specific with replacements to avoid unintended side effects
- modified_content = re.sub(
- r'<policy domain="coder" rights="none" pattern="([^"]*)"\s*/?>', # Added /? for self-closing tag
- r'<policy domain="coder" rights="read|write" pattern="\1"/>', # Ensure it ends with a self-closing tag
- policy_content
- )
- 
- # Also handle a more general case if the above didn't match, but with caution
- # This attempts to change any 'rights="none"' on 'coder' or 'path' domains
- # if the specific patterns weren't matched.
- def _replace_none_rights(match):
- domain = match.group(1)
- rest = match.group(2)
- # Only replace if rights is currently "none"
- if 'rights="none"' in match.group(0):
- print(f"Applying general policy fix for domain '{domain}'")
- rest = rest.replace('rights="none"', '') # Drop the old rights attribute
- return f'<policy domain="{domain}" rights="read|write"{rest}/>'
- return match.group(0) # Return original if no "none" rights found
- 
- modified_content = re.sub(
- r'<policy domain="(coder|path)"([^>]*?)/?>',
- _replace_none_rights,
- modified_content
- )
- 
- 
- # Write the modified content back (handle potential permission issues)
- try:
- with open(found_policy, 'w') as f:
- f.write(modified_content)
- print("ImageMagick policies updated successfully (direct write).")
- return True
- except IOError as e:
- print(f"Direct write failed: {e}. Attempting with sudo tee...")
- # Fall back to piping the file through sudo tee if direct write fails.
- # This requires the user to be able to run sudo commands without a password prompt for the script's execution,
- # and tee needs to be available.
- # Using subprocess is safer than os.system for piping.
- try:
- # Write modified content to a temporary file first
- # Ensure TEMP_FOLDER is set before creating a temp file path
- if not TEMP_FOLDER:
- print("Error: TEMP_FOLDER not set for sudo write fallback.")
- return False
- os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists
- 
- temp_policy_file = os.path.join(TEMP_FOLDER, "temp_policy_modified.xml")
- with open(temp_policy_file, 'w') as f:
- f.write(modified_content)
- 
- # Use sudo tee to overwrite the original file:
- # sudo tee <found_policy> < temp_policy_file
- cmd = ['sudo', 'tee', found_policy]
- print(f"Executing: {' '.join(cmd)} < {temp_policy_file}")
- 
- # Using subprocess with stdin redirection
- with open(temp_policy_file, 'rb') as f_in: # Open in binary mode for input
- process = subprocess.Popen(cmd, stdin=f_in, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = process.communicate()
- 
- if process.returncode == 0:
- print("ImageMagick policies updated successfully using sudo tee.")
- return True
- else:
- print(f"Failed to update ImageMagick policies using sudo tee. Result code: {process.returncode}. Error: {stderr.decode('utf-8')}")
- print("Please manually edit your policy.xml to grant read/write rights for coder and path domains.")
- print('Example: Change <policy domain="path" rights="none" pattern="@*"/> to <policy domain="path" rights="read|write" pattern="@*"/>')
- return False
- except FileNotFoundError:
- print("sudo or tee command not found.
Cannot write policy file with sudo.") - return False - except Exception as e_sudo_write: - print(f"Error executing sudo tee process: {e_sudo_write}") - print("Manual intervention may be required.") - return False - finally: - # Clean up the temporary file - if 'temp_policy_file' in locals() and os.path.exists(temp_policy_file): - os.remove(temp_policy_file) - - - except Exception as e_general: - print(f"General error during ImageMagick policy modification: {e_general}") - print("Manual intervention may be required.") - return False - - -# ---------------- Gradio Interface Functions ---------------- # - -def generate_script_and_show_editor(user_input, resolution_choice, - caption_enabled_choice, caption_color, - caption_size, caption_position, caption_bg_color, - caption_stroke_color, caption_stroke_width, - bg_music_file): # Added bg_music_file input here - """ - Generates the script, parses it, stores segments in state, - and prepares the UI updates to show the editing interface. - Uses yield to update status. - """ - global TEMP_FOLDER - # Clean up previous run's temp folder if it exists - if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): - print(f"Cleaning up previous temp folder: {TEMP_FOLDER}") - try: - # Use onerror to log errors during cleanup - def onerror(func, path, exc_info): - print(f"Error cleaning up {path}: {exc_info[1]}") - shutil.rmtree(TEMP_FOLDER, onerror=onerror) - except Exception as e: - print(f"Error starting cleanup of temp folder {TEMP_FOLDER}: {e}") - - # Create a new unique temporary folder for this run - # Add a suffix based on time to minimize collision risk if cleanup fails - TEMP_FOLDER = tempfile.mkdtemp(prefix="aivgen_") - print(f"Created new temp folder: {TEMP_FOLDER}") - - # Store global style choices and music file path in state - run_config = { - "resolution": (1920, 1080) if resolution_choice == "Full (1920x1080)" else (1080, 1920), - "caption_enabled": caption_enabled_choice == "Yes", - "caption_color": caption_color, - "caption_size": caption_size, - "caption_position": caption_position, - "caption_bg_color": caption_bg_color, - "caption_stroke_color": caption_stroke_color, - "caption_stroke_width": caption_stroke_width, - "temp_folder": TEMP_FOLDER, # Store temp folder path - "bg_music_path": bg_music_file # Store background music file path - } - - # Initial status update and hide editing/video areas - # Yielding multiple updates in a list/tuple works for simultaneous updates - # The outputs need to match the order specified in the .click() outputs list - # Outputs list order: run_config_state, status_output, editing_area, final_video_output, script_preview_markdown, - # segment_text_inputs (MAX), segment_file_inputs (MAX), segment_editing_groups (MAX), segment_prompt_labels (MAX), - # segments_state (LAST) - num_dynamic_outputs_per_segment = 3 # Textbox, File, Group - num_total_dynamic_outputs = MAX_SEGMENTS_FOR_EDITING * num_dynamic_outputs_per_segment - - # Prepare initial updates for all dynamic components to be hidden - initial_textbox_updates = [gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)] - initial_file_updates = [gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)] - initial_group_visibility_updates = [gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)] - # Initial prompt labels should be cleared and hidden - initial_label_updates = [gr.update(value="", visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)] - - - yield (run_config, # 0 - gr.update(value="Generating 
script...", visible=True), # 1 - gr.update(visible=False), # 2 editing area - gr.update(value=None, visible=False), # 3 video output - gr.update(visible=False, value="### Generated Script Preview\n\nGenerating script..."), # 4 raw script preview - # Outputs for dynamic components (initially hide/clear all) - Indices 5 onwards - *initial_textbox_updates, # 5... - *initial_file_updates, # ... - *initial_group_visibility_updates, # ... - *initial_label_updates, # ... Update prompt labels visibility and value - [], # segments_state - This is the LAST element updated - ) - - - script_text = generate_script(user_input, OPENROUTER_API_KEY, OPENROUTER_MODEL) - - # Determine raw script preview content - raw_script_preview_content = f"### Generated Script Preview\n\n```\n{script_text}\n```" if script_text and not script_text.startswith("[Error]") else f"### Generated Script Preview\n\n{script_text}" - - if not script_text or script_text.startswith("[Error]"): - # Update status and keep editing/video areas hidden - yield (run_config, - gr.update(value=f"Script generation failed: {script_text}", visible=True), - gr.update(visible=False), - gr.update(value=None, visible=False), - gr.update(visible=True, value=raw_script_preview_content), # Show raw script preview on error - # Outputs for dynamic components (all hidden) - *[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)], - *[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)], - *[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], - *[gr.update(value="", visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], # Clear prompt labels - [], # segments_state remains empty - ) - return # Stop execution - - - yield (run_config, - gr.update(value="Parsing script...", visible=True), - gr.update(visible=False), - gr.update(value=None, visible=False), - gr.update(visible=True, value=raw_script_preview_content), # Show raw script preview - *[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)], - *[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)], - *[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], - *[gr.update(value="", visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], # Clear prompt labels - [], # segments_state will be updated next - ) - - - segments = parse_script(script_text) - - if not segments: - yield (run_config, - gr.update(value="Failed to parse script or script is empty after parsing.", visible=True), - gr.update(visible=False), - gr.update(value=None, visible=False), - gr.update(visible=True, value=raw_script_preview_content), # Show raw script preview - # Outputs for dynamic components (all hidden) - *[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)], - *[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)], - *[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], - *[gr.update(value="", visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], # Clear prompt labels - [], # segments_state remains empty - ) - return # Stop execution - - - # Prepare updates for dynamic editing components based on parsed segments - textbox_updates = [] - file_updates = [] - group_visibility_updates = [] - label_updates = [] # Updates for prompt labels - - for i in range(MAX_SEGMENTS_FOR_EDITING): - if i < len(segments): - # Show group, populate text, clear file upload, set prompt label - textbox_updates.append(gr.update(value=segments[i]['text'], 
visible=True)) - file_updates.append(gr.update(value=None, visible=True)) # Clear previous uploads - group_visibility_updates.append(gr.update(visible=True)) - label_updates.append(gr.update(value=f"Segment {i+1} (Prompt: {segments[i]['original_prompt']})", visible=True)) # Set label value and show - else: - # Hide unused groups and clear their values - textbox_updates.append(gr.update(value="", visible=False)) - file_updates.append(gr.update(value=None, visible=False)) - group_visibility_updates.append(gr.update(visible=False)) - label_updates.append(gr.update(value="", visible=False)) # Clear label value and hide - - - # Final yield to update UI: show editing area, populate fields, update state - yield (run_config, # 0 - gr.update(value=f"Script generated with {len(segments)} segments. Edit segments below.", visible=True), # 1 - gr.update(visible=True), # 2 Show Editing area - gr.update(value=None, visible=False), # 3 Ensure video output is hidden and cleared - gr.update(visible=True, value=raw_script_preview_content), # 4 Show raw script preview - # Dynamic outputs - Indices 5 onwards - *textbox_updates, # 5 ... - *file_updates, # ... - *group_visibility_updates, # ... - *label_updates, # ... Update prompt labels visibility and value - segments, # LAST - Update the state with parsed segments - ) - - -# Update generate_video_from_edited to accept the unpacked dynamic arguments -# It will receive: run_config_val, segments_data_val, then MAX_SEGMENTS_FOR_EDITING text values, -# then MAX_SEGMENTS_FOR_EDITING file values, then bg_music_volume_val. -def generate_video_from_edited(run_config_val, segments_data_val, *dynamic_segment_inputs, bg_music_volume_val): - """ - Takes the edited segment data (text, uploaded files) and configuration, - and generates the final video. - Uses yield to update status. - """ - # Re-pack the dynamic inputs into lists - # dynamic_segment_inputs contains all text and file inputs from the UI, in order. - # The first MAX_SEGMENTS_FOR_EDITING are text inputs, the next MAX_SEGMENTS_FOR_EDITING are file inputs. - num_dynamic_per_segment = 2 # Textbox and File - expected_dynamic_count = MAX_SEGMENTS_FOR_EDITING * num_dynamic_per_segment - - if len(dynamic_segment_inputs) != expected_dynamic_count: - print(f"Error: Expected {expected_dynamic_count} dynamic inputs (text+files), but received {len(dynamic_segment_inputs)}.") - yield "Error: Mismatch in segment inputs received. Please regenerate script.", None - return # Cannot proceed with incorrect inputs - - segment_texts = list(dynamic_segment_inputs[:MAX_SEGMENTS_FOR_EDITING]) - segment_uploads = list(dynamic_segment_inputs[MAX_SEGMENTS_FOR_EDITING:]) - - - # Now use the re-packed lists and the correctly named arguments - run_config = run_config_val - segments_data = segments_data_val - bg_music_volume = bg_music_volume_val - - - if not segments_data: - yield "No segments to process. Generate script first.", None - return - - global TEMP_FOLDER - # Ensure TEMP_FOLDER is correctly set from run_config - TEMP_FOLDER = run_config.get("temp_folder") - if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): - yield "Error: Temporary folder not found from run config. 
Please regenerate script.", None - # Attempt cleanup just in case temp folder existed but was invalid - if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): - try: - shutil.rmtree(TEMP_FOLDER) - except Exception as e: - print(f"Error cleaning up invalid temp folder {TEMP_FOLDER}: {e}") - TEMP_FOLDER = None # Reset global - return - - # Extract other config from run_config - TARGET_RESOLUTION = run_config.get("resolution", (1920, 1080)) # Default if missing - CAPTION_ENABLED = run_config.get("caption_enabled", True) # Default if missing - CAPTION_COLOR = run_config.get("caption_color", "#FFFFFF") # Default if missing - CAPTION_SIZE = run_config.get("caption_size", 45) # Default if missing - CAPTION_POSITION = run_config.get("caption_position", "Bottom") # Default if missing - CAPTION_BG_COLOR = run_config.get("caption_bg_color", "rgba(0, 0, 0, 0.4)") # Default if missing - CAPTION_STROKE_COLOR = run_config.get("caption_stroke_color", "#000000") # Default if missing - CAPTION_STROKE_WIDTH = run_config.get("caption_stroke_width", 2) # Default if missing - BG_MUSIC_UPLOAD_PATH = run_config.get("bg_music_path") # Get uploaded music path from config - - - # Update segments_data with potentially edited text and uploaded file paths - # segment_texts and segment_uploads are lists of values from the Gradio components - processed_segments = [] - # Iterate up to the minimum of state segments and provided inputs - num_segments_to_process = min(len(segments_data), len(segment_texts), len(segment_uploads), MAX_SEGMENTS_FOR_EDITING) - - if num_segments_to_process == 0: - yield "No segments to process after reading editor inputs. Script might be empty or inputs missing.", None - # Clean up - if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): - try: - shutil.rmtree(TEMP_FOLDER) - print(f"Cleaned up temp folder: {TEMP_FOLDER}") - except Exception as e: - print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}") - TEMP_FOLDER = None # Reset global - return - - - for i in range(num_segments_to_process): - segment = segments_data[i] # Get original segment data - processed_segment = segment.copy() # Make a copy - # Use edited text, strip whitespace - processed_segment['text'] = segment_texts[i].strip() if segment_texts[i] is not None else segment.get('text', '').strip() - # Use uploaded media path (will be None if nothing uploaded) - processed_segment['uploaded_media'] = segment_uploads[i] - processed_segments.append(processed_segment) - - - yield "Fixing ImageMagick policy...", None - # Call fix_imagemagick_policy again just before video generation as a safeguard - fix_imagemagick_policy() - - clips = [] - yield "Generating media and audio for clips...", None - - total_segments = len(processed_segments) - for idx, segment in enumerate(processed_segments): - yield f"Processing segment {idx+1}/{total_segments}: Generating media and audio...", None - print(f"\nProcessing segment {idx+1}/{total_segments} (Prompt: '{segment.get('original_prompt', 'N/A')[:30]}...')") - - # Determine media source: uploaded or generated - media_asset = generate_media_asset( - segment.get('original_prompt', 'background'), # Use original prompt for search if available, else a generic term - segment.get('uploaded_media') # Pass uploaded media path - ) - - # Generate TTS audio - tts_path = generate_tts(segment.get('text', '')) # Use edited text, default to empty string if None/missing - - # Create the video clip for this segment - yield f"Processing segment {idx+1}/{total_segments}: Creating clip...", None - clip = create_clip( - 
media_asset=media_asset if media_asset else {"path": None, "asset_type": None}, # Pass dummy if generate_media_asset failed - tts_path=tts_path, - estimated_duration=segment.get('duration', 3.0), # Use estimated duration as a fallback reference - target_resolution=TARGET_RESOLUTION, - caption_enabled=CAPTION_ENABLED, - caption_color=CAPTION_COLOR, - caption_size=CAPTION_SIZE, - caption_position=CAPTION_POSITION, - caption_bg_color=CAPTION_BG_COLOR, - caption_stroke_color=CAPTION_STROKE_COLOR, - caption_stroke_width=CAPTION_STROKE_WIDTH, - narration_text=segment.get('text', ''), # Pass narration text for captions - segment_index=idx+1 - ) - - if clip: - clips.append(clip) - else: - print(f"Skipping segment {idx+1} due to clip creation failure.") - # If create_clip returns None (shouldn't happen with fallback logic, but as safety) - # Add a placeholder black clip - placeholder_duration = segment.get('duration', 3.0) # Use estimated duration or default - placeholder_clip = ColorClip(size=TARGET_RESOLUTION, color=(0,0,0), duration=placeholder_duration) - silent_audio_path = generate_silent_audio(placeholder_duration) - if silent_audio_path and os.path.exists(silent_audio_path): - placeholder_clip = placeholder_clip.set_audio(AudioFileClip(silent_audio_path)) - error_text = f"Segment {idx+1} Failed" - if segment.get('text'): error_text += f":\n{segment['text'][:50]}..." - error_txt_clip = TextClip(error_text, fontsize=30, color="red", align='center', size=(TARGET_RESOLUTION[0] * 0.9, None)).set_position('center').set_duration(placeholder_clip.duration) # Error text duration matches placeholder - placeholder_clip = CompositeVideoClip([placeholder_clip, error_txt_clip]) - clips.append(placeholder_clip) - - - if not clips: - yield "No clips were successfully created. 
Video generation failed.", None
- # Clean up
- if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
- try:
- shutil.rmtree(TEMP_FOLDER)
- print(f"Cleaned up temp folder: {TEMP_FOLDER}")
- except Exception as e:
- print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}")
- TEMP_FOLDER = None # Reset global
- return
+ # No upload for this slot: fetch stock media based on the clip title
+ media_asset = generate_media(title, video_percentage, temp_folder)
+ if not media_asset:
+ continue
+ media_path = media_asset['path']
+ asset_type = media_asset['asset_type']
+ 
+ tts_path = generate_tts(text, 'en', temp_folder)
+ if not tts_path:
+ continue
+ 
+ # Pacing heuristic: roughly half a second per word, never shorter than 3 seconds
+ duration = max(3, len(text.split()) * 0.5)
+ clip = create_clip(
+ media_path, asset_type, tts_path, duration, text,
+ text_color, text_size, caption_bg if captions == "Yes" else "transparent", target_resolution
+ )
+ if clip:
+ clips.append(clip)
+ 
+ if not clips:
+ shutil.rmtree(temp_folder)
+ return None
- yield "Concatenating clips...", None
- print("\nConcatenating clips...")
- try:
 final_video = concatenate_videoclips(clips, method="compose")
- except Exception as e:
- print(f"Error concatenating clips: {e}")
- yield f"Error concatenating clips: {e}", None
- # Clean up
- if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
- try:
- shutil.rmtree(TEMP_FOLDER)
- print(f"Cleaned up temp folder: {TEMP_FOLDER}")
- except Exception as e:
- print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}")
- TEMP_FOLDER = None # Reset global
- return
+ # Mix in background music, render H.264 at 24 fps, then drop the temp assets
+ final_video = add_background_music(final_video, custom_music, music_volume)
+ final_video.write_videofile(OUTPUT_VIDEO_FILENAME, codec='libx264', fps=24)
+ shutil.rmtree(temp_folder)
+ return OUTPUT_VIDEO_FILENAME
- 
- 
- yield "Adding background music...", None
- # Use the uploaded music path first, fall back to finding an MP3
- bg_music_to_use = BG_MUSIC_UPLOAD_PATH if BG_MUSIC_UPLOAD_PATH else find_mp3_files()
- final_video = add_background_music(final_video, bg_music_to_use, bg_music_volume=bg_music_volume) # Use volume from input
- 
- 
- yield f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...", None
- print(f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...")
- output_path = None
- try:
- # Use a temporary output file first for safety, within TEMP_FOLDER
- temp_output_filename = os.path.join(TEMP_FOLDER, f"temp_final_video_{int(time.time())}.mp4")
- final_video.write_videofile(temp_output_filename, codec='libx264', fps=24, preset='veryfast')
- 
- # Ensure the destination directory for the final output exists (current dir)
- os.makedirs(os.path.dirname(OUTPUT_VIDEO_FILENAME) or '.', exist_ok=True)
- 
- # Move the final file to the intended location after successful export
- final_output_path = OUTPUT_VIDEO_FILENAME
- try:
- shutil.move(temp_output_filename, final_output_path)
- print(f"Final video saved as {final_output_path}")
- output_path = final_output_path
- except shutil.SameFileError:
- print(f"Output path is the same as temp path, no move needed: {temp_output_filename}")
- output_path = temp_output_filename
- except Exception as e:
- print(f"Error moving temporary file {temp_output_filename} to final destination {final_output_path}: {e}")
- # If the move fails, return the temp path so the user can still access the file
- output_path = temp_output_filename
- print(f"Returning video from temporary path: {output_path}")
- 
- 
- except Exception as e:
- print(f"Video export failed: {e}")
- output_path = None
- yield f"Video export failed: {e}", None # Provide error message in status
- 
- # Clean up temporary folder
- yield "Cleaning up temporary files...", output_path # Update status before cleanup
- if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
- try:
- # Use onerror to log errors during cleanup
- def onerror(func, path, exc_info):
- print(f"Error cleaning up {path}:
{exc_info[1]}") - shutil.rmtree(TEMP_FOLDER, onerror=onerror) - print(f"Cleaned up temp folder: {TEMP_FOLDER}") - except Exception as e: - print(f"Error starting cleanup of temp folder {TEMP_FOLDER}: {e}") - TEMP_FOLDER = None # Reset global - - yield "Done!", output_path # Final status update - - -# ---------------- Gradio Interface Definition (Blocks) ---------------- # - -# Need lists to hold the dynamic UI components for segments -segment_editing_groups = [] -segment_prompt_labels = [] # List to hold the prompt Labels -segment_text_inputs = [] -segment_file_inputs = [] + final_video = add_background_music(final_video, custom_music, music_volume) + final_video.write_videofile(OUTPUT_VIDEO_FILENAME, codec='libx264', fps=24) + shutil.rmtree(temp_folder) + return OUTPUT_VIDEO_FILENAME -with gr.Blocks() as demo: - gr.Markdown("# 🤖 AI Documentary Video Generator 🎬") - gr.Markdown("Enter a concept to generate a funny documentary script. You can then edit the script text and replace the suggested media for each segment before generating the final video.") - - # --- Global Settings --- - with gr.Accordion("Global Settings", open=True): - user_concept_input = gr.Textbox(label="Video Concept", placeholder="e.g., The secret life of pigeons, Why socks disappear in the laundry, The futility of alarm clocks...") - with gr.Row(): - resolution_radio = gr.Radio(["Full (1920x1080)", "Short (1080x1920)"], label="Video Resolution", value="Full (1920x1080)") - bg_music_volume_slider = gr.Slider(minimum=0, maximum=0.5, value=0.08, step=0.01, label="Background Music Volume", info="Lower volume keeps narration clear.") # Adjusted max volume - # Added music upload component - bg_music_upload = gr.File(label="Upload Background Music (MP3, WAV)", type="filepath", interactive=True, file_types=[".mp3", ".wav"]) - - - # --- Caption Settings --- - with gr.Accordion("Caption Settings", open=False): - caption_enabled_radio = gr.Radio(["Yes", "No"], label="Show Captions?", value="Yes") - with gr.Row(): - caption_color_picker = gr.ColorPicker(label="Caption Text Color", value="#FFFFFF") # Default white - caption_bg_color_picker = gr.ColorPicker(label="Caption Background Color (with transparency)", value="rgba(0, 0, 0, 0.4)") # Default semi-transparent black, slightly more opaque - with gr.Row(): - caption_size_slider = gr.Slider(minimum=20, maximum=80, value=45, step=1, label="Caption Font Size") # Adjusted max size - caption_stroke_width_slider = gr.Slider(minimum=0, maximum=5, value=2, step=0.5, label="Caption Stroke Width") - with gr.Row(): - caption_position_radio = gr.Radio(["Top", "Middle", "Bottom"], label="Caption Position", value="Bottom") - caption_stroke_color_picker = gr.ColorPicker(label="Caption Stroke Color", value="#000000") # Default black stroke - - - generate_script_btn = gr.Button("Generate Script", variant="primary") - - # --- Status and Script Output --- - status_output = gr.Label(label="Status", value="", visible=True) # Always visible - # Using Markdown to show raw script content - script_preview_markdown = gr.Markdown("### Generated Script Preview\n\nScript will appear here...", visible=False) # Initially hidden - - # --- State to hold parsed segments data and run config --- - segments_state = gr.State([]) # List of segment dictionaries - run_config_state = gr.State({}) # Dictionary for run configuration - - # --- Dynamic Editing Area (Initially hidden) --- - # We create MAX_SEGMENTS_FOR_EDITING groups, and show/hide them dynamically - with gr.Column(visible=False, elem_id="editing_area_id") as 
editing_area: # Added elem_id - gr.Markdown("### Edit Script Segments") - gr.Markdown("Review the AI-generated text and media suggestions below. Edit the text and/or upload your own image/video for any segment. If no file is uploaded, AI will fetch media based on the original prompt.") - for i in range(MAX_SEGMENTS_FOR_EDITING): - # Use gr.Group instead of gr.Box for compatibility - with gr.Group(visible=False) as segment_group: # Each group represents one segment - segment_editing_groups.append(segment_group) - # Use a Label to display the original prompt - it's non-interactive text - # The value will be updated by JS or Python outputs - segment_prompt_label = gr.Label( - f"Segment {i+1} Prompt:", # Initial placeholder text, will be overwritten - show_label=False, - ) - segment_prompt_labels.append(segment_prompt_label) - - - segment_text = gr.Textbox(label="Narration Text", lines=2, interactive=True) - segment_text_inputs.append(segment_text) - - segment_file = gr.File(label="Upload Custom Media (Image or Video)", type="filepath", interactive=True) - segment_file_inputs.append(segment_file) - - generate_video_btn = gr.Button("Generate Video", variant="primary") - - - # --- Final Video Output --- - final_video_output = gr.Video(label="Generated Video", visible=False) # Initially hidden - - # --- Event Handlers --- - - # Generate Script Button Click - # Outputs list must match the order of components being updated by yield in generate_script_and_show_editor - generate_script_btn.click( - fn=generate_script_and_show_editor, - inputs=[ - user_concept_input, - resolution_radio, - caption_enabled_radio, - caption_color_picker, - caption_size_slider, - caption_position_radio, - caption_bg_color_picker, - caption_stroke_color_picker, - caption_stroke_width_slider, - bg_music_upload # Added music upload input here - ], - outputs=[ - run_config_state, # 0 - status_output, # 1 - editing_area, # 2 Show/hide editing area column - final_video_output, # 3 Hide and clear video output - script_preview_markdown, # 4 Update raw script preview - # Outputs for dynamic components (visibility and value updates) - Indices 5 onwards - *segment_text_inputs, # 5 ... - *segment_file_inputs, # ... - *segment_editing_groups, # ... - *segment_prompt_labels, # ... Update prompt labels visibility and value - segments_state, # LAST - Update the state with parsed segments - ] - ) - - # Generate Video Button Click - # Inputs must match the definition of generate_video_from_edited generate_video_btn.click( - fn=generate_video_from_edited, - inputs=[ - run_config_state, # 1st arg - segments_state, # 2nd arg - # All segment text and file inputs will be collected by *dynamic_segment_inputs - *segment_text_inputs, - *segment_file_inputs, - bg_music_volume_slider # Last arg - ], - outputs=[status_output, final_video_output] # Yield status updates and final video + fn=generate_video_fn, + inputs=[resolution, captions, video_percentage, text_color, text_size, caption_bg, music_volume, custom_music, num_clips, titles_state] + clip_textboxes + clip_files, + outputs=[video_output] ) - # We don't need a segments_state.change JS handler anymore because the prompt labels - # are updated directly by the Python function via the outputs list. - # Removing the segments_state.change event listener entirely. 
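+ # Note on the wiring above: Gradio flattens the inputs list into positional
+ # arguments, so (assuming clip_textboxes and clip_files each hold 10
+ # components) the handler is effectively invoked as
+ #   generate_video_fn(resolution, captions, ..., num_clips, titles,
+ #                     t1, ..., t10, f1, ..., f10)
+ # which is what the *clip_data parameter unpacks: clip_data[:10] are the
+ # textbox values and clip_data[10:] are the file values.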
- - -# Launch the interface -if __name__ == "__main__": - # Attempt ImageMagick policy fix on script startup - # This helps but might still require manual sudo depending on system config - fix_imagemagick_policy() - - print("Launching Gradio interface...") - - # Check if API keys are still placeholders (unlikely with hardcoded keys, but good practice) - if PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'): - print("Warning: PEXELS_API_KEY is not configured. Media search may fail.") - if OPENROUTER_API_KEY.startswith('YOUR_OPENROUTER_API_KEY'): - print("Warning: OPENROUTER_API_KEY is not configured. Script generation will fail.") - - demo.launch(share=True) # Set share=True to get a public link +app.launch(share=True) \ No newline at end of file