from kokoro import KPipeline
import soundfile as sf
import torch
import soundfile as sf
import os
from moviepy.editor import VideoFileClip, AudioFileClip, ImageClip, ColorClip  # Added ColorClip
from PIL import Image
import tempfile
import random
import cv2
import math
import os, requests, io, time, re, random
from moviepy.editor import (
    VideoFileClip, concatenate_videoclips, AudioFileClip, ImageClip,
    CompositeVideoClip, TextClip, CompositeAudioClip
)
import moviepy.video.fx.all as vfx
import moviepy.config as mpy_config
from pydub import AudioSegment
from pydub.generators import Sine
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from bs4 import BeautifulSoup
import base64
from urllib.parse import quote
# pysrt is imported but not used in the provided code snippets, keeping for completeness
# import pysrt
from gtts import gTTS
import gradio as gr  # Import Gradio
import shutil  # Needed for temp folder cleanup

# Initialize Kokoro TTS pipeline (using American English)
# Ensure you have the required voice models downloaded for Kokoro if needed,
# or it will fall back to gTTS. 'a' for American English uses voice 'af_heart'.
# `pipeline` is a module-level global read by generate_tts(); it is None when
# Kokoro could not be initialized, which makes generate_tts() skip Kokoro.
try:
    pipeline = KPipeline(lang_code='a')
    print("Kokoro TTS pipeline initialized.")
except Exception as e:
    print(f"Warning: Could not initialize Kokoro TTS pipeline: {e}. Will rely on gTTS.")
    pipeline = None  # Set pipeline to None if initialization fails

# Ensure ImageMagick binary is set (Adjust path as needed for your system)
# This line requires imagemagick to be installed and the path correct.
# If TextClip fails, check ImageMagick installation and policy.xml (handled by fix_imagemagick_policy).
# Common paths: "/usr/bin/convert", "/usr/local/bin/convert", "C:\\Program Files\\ImageMagick-X.Y.Z-Q16\\convert.exe"
# You might need to adjust this based on your OS and installation.
# The IMAGEMAGICK_BINARY environment variable, when set, overrides the default path.
IMAGEMAGICK_BINARY_PATH = os.environ.get("IMAGEMAGICK_BINARY", "/usr/bin/convert")
if not os.path.exists(IMAGEMAGICK_BINARY_PATH):
    print(f"Warning: ImageMagick binary not found at {IMAGEMAGICK_BINARY_PATH}. TextClip may not work.")
    print("Please install ImageMagick or update the IMAGEMAGICK_BINARY_PATH.")
mpy_config.change_settings({"IMAGEMAGICK_BINARY": IMAGEMAGICK_BINARY_PATH})

# ---------------- Global Configuration ---------------- #
# SECURITY: API keys must not live in source control. Each key is now read from
# the environment first; the literals below are kept only as backward-compatible
# defaults and should be rotated and removed.
PEXELS_API_KEY = os.environ.get(
    "PEXELS_API_KEY",
    'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna',
)
OPENROUTER_API_KEY = os.environ.get(
    "OPENROUTER_API_KEY",
    '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',
)
OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free"  # Or another preferred model
OUTPUT_VIDEO_FILENAME = "final_video.mp4"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"

# Maximum number of script segments to display for editing
MAX_SEGMENTS_FOR_EDITING = 15

# Global placeholder for the temporary folder, will be created per run
TEMP_FOLDER = None

# ---------------- Helper Functions ---------------- #
# Most helper functions remain the same, but some might need minor adjustments
# to use passed parameters instead of global state (e.g., resolution).
# We'll update create_clip and add_background_music accordingly.
def _strip_code_fences(text):
    """Return *text* without a Markdown code fence wrapped around it.

    Text that is not fenced is returned unchanged. Surrounding whitespace is
    ignored when detecting the fence, and the remainder of the opening fence
    line (a language tag such as ``text``) is dropped.
    """
    candidate = text.strip()
    if not (candidate.startswith("```") and candidate.endswith("```")):
        return text
    if len(candidate) < 6:
        # Nothing but (part of) a fence: there is no payload to keep.
        return candidate.strip("` \n")
    inner = candidate[3:-3]
    _opening_line, newline, remainder = inner.partition("\n")
    if newline:
        inner = remainder
    return inner.strip()


def generate_script(user_input, api_key, model):
    """Generate a short documentary script through the OpenRouter chat API.

    Args:
        user_input: Topic or full script supplied by the user; it is appended
            to the instruction prompt.
        api_key: OpenRouter API key. Empty or placeholder keys short-circuit.
        model: OpenRouter model identifier.

    Returns:
        The script text on success. On any failure a string starting with
        "[Error]" is returned instead of raising, so callers can detect the
        failure with ``startswith("[Error]")``.
    """
    if not api_key or api_key.startswith('YOUR_OPENROUTER_API_KEY'):
        print("OpenRouter API key not set or is placeholder. Skipping script generation.")
        return "[Error] API key not configured."

    headers = {
        'Authorization': f'Bearer {api_key}',
        'HTTP-Referer': 'https://your-domain.com',  # Replace with your app's domain
        'X-Title': 'AI Documentary Maker'
    }

    # The example section is laid out one title per line because parse_script()
    # only recognises a [Title] at the start of a line.
    prompt = f"""Short Documentary Script Generator

Instructions:
If I say "use this," output the script exactly as given.
If I only give topics, generate a script based on them.
If I provide a full script, rewrite it without any changes.
Make everything short, simple, humorous, funny, and act serious but be humorous.
Don't say anything off topic.
Always end with a funny call to subscribe based on the video topic.
Use normal conversational text, avoid AI phrases, humanize the statements.
Always focus on negative or cynical views, avoid positive statements.
The full script should be for a single video about one topic.

Formatting Rules:
Title in Square Brackets: Each section starts with a one-word title inside [ ] (max two words if necessary). This title will be used as a search term for Pexels footage.
Casual & Funny Narration: Each section has 5-15 words of narration. Keep it natural, funny, and unpredictable.
No Special Formatting: No bold, italics, or special characters.
Generalized Search Terms: If a term is too specific, make it more general for Pexels search.
Scene-Specific Writing: Each section describes only what should be shown in the video.

Output Only the Script, make it funny/humorous/hilarious, and add a funny subscribe statement.
No extra text, just the script.

Example Output:
[Cats]
They plot world domination while napping.
[Dogs]
Loyalty is just a bribe for snacks.
[Humans]
The only species that pays to live on a planet they destroy.
[Future]
It looks suspiciously like the present, but with more screens.
[Warning]
Subscribe or a cat will steal your bandwidth.

Now here is the Topic/script: {user_input}
"""

    data = {
        'model': model,
        'messages': [{'role': 'user', 'content': prompt}],
        'temperature': 0.7,  # Slightly raised for more unpredictable humor
        'max_tokens': 500    # Keep scripts short
    }

    try:
        response = requests.post(
            'https://openrouter.ai/api/v1/chat/completions',
            headers=headers,
            json=data,
            timeout=45
        )
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
        response_data = response.json()

        choices = response_data.get('choices') if isinstance(response_data, dict) else None
        if not choices:
            print("Unexpected response format:", response_data)
            return "[Error] Unexpected API response format."

        script_text = (choices[0].get('message') or {}).get('content')
        if not isinstance(script_text, str):
            # The API may return a null content (e.g. filtered output).
            print("Unexpected response format:", response_data)
            return "[Error] Unexpected API response format."

        # Models sometimes wrap the whole answer in a Markdown code block.
        return _strip_code_fences(script_text)

    except requests.exceptions.RequestException as e:
        print(f"API Request failed: {str(e)}")
        return f"[Error] API request failed: {str(e)}"
    except Exception as e:
        print(f"An unexpected error occurred during script generation: {e}")
        return f"[Error] An unexpected error occurred: {str(e)}"
def _make_segment(title, text):
    """Build one segment dict; duration is ~0.4 s per word with a 2 s floor."""
    return {
        "original_prompt": title,
        "text": text,
        "duration": max(2.0, len(text.split()) * 0.4),
        "uploaded_media": None,  # Placeholder for user uploaded file path
    }


def parse_script(script_text, max_segments=None):
    """Parse a generated script into a list of segment dictionaries.

    A segment starts at a line beginning with ``[Title]``; text after the
    closing bracket and every following non-title line is its narration.
    Lines before the first title are ignored, and titles without narration
    are dropped.

    Args:
        script_text: Script as returned by generate_script(). Strings starting
            with "[Error]", empty strings and non-strings yield ``[]``.
        max_segments: Maximum number of segments to return. Defaults to the
            module constant MAX_SEGMENTS_FOR_EDITING.

    Returns:
        A list of dicts with keys "original_prompt", "text", "duration"
        (estimated seconds) and "uploaded_media" (always None here).
    """
    if not isinstance(script_text, str) or not script_text.strip():
        print("Script text is empty.")
        return []
    if script_text.startswith("[Error]"):
        print(f"Skipping parse due to script generation error: {script_text}")
        return []

    # Collect (title, [narration fragments]) pairs first, then join fragments
    # with single spaces so continuation lines never run into each other.
    sections = []
    for raw_line in script_text.strip().splitlines():
        line = raw_line.strip()
        if not line:
            continue
        if line.startswith("[") and "]" in line:
            closing = line.index("]")
            title = line[1:closing].strip()
            remainder = line[closing + 1:].strip()
            sections.append((title, [remainder] if remainder else []))
        elif sections:
            sections[-1][1].append(line)
        # Lines before the first [Title] are ignored.

    segments = []
    for title, fragments in sections:
        narration = " ".join(fragments)
        if narration:
            segments.append(_make_segment(title, narration))

    limit = MAX_SEGMENTS_FOR_EDITING if max_segments is None else max_segments
    if len(segments) > limit:
        print(f"Warning: Script generated {len(segments)} segments, limiting to {limit} for editing.")
        segments = segments[:limit]

    print(f"Parsed {len(segments)} segments.")
    return segments


# Pexels and Google Image search and download functions remain unchanged
# Using the global
# PEXELS_API_KEY directly now.

def _fetch_pexels_video_page(query, page, per_page=15, max_retries=2):
    """Fetch one page of Pexels video search results.

    Returns the list of video entries (possibly empty), or None when the page
    could not be retrieved after ``max_retries`` attempts.
    """
    headers = {'Authorization': PEXELS_API_KEY}
    params = {"query": query, "per_page": per_page, "page": page}
    retry_delay = 1  # Backoff starts fresh for every page
    for attempt in range(max_retries):
        try:
            response = requests.get(
                "https://api.pexels.com/videos/search",
                headers=headers, params=params, timeout=10,
            )
            if response.status_code == 200:
                return response.json().get("videos") or []
            if response.status_code != 429:
                print(f"Pexels video search error {response.status_code}: {response.text} for query '{query}'")
                return None  # Non-recoverable error
            print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s for query '{query}'...")
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a 200 response whose body is not valid JSON.
            print(f"Pexels video request exception (attempt {attempt+1}/{max_retries}) for query '{query}': {e}")
        if attempt < max_retries - 1:
            # No point sleeping after the final attempt.
            time.sleep(retry_delay)
            retry_delay *= 2
    return None


def search_pexels_videos(query):
    """Search Pexels for videos matching *query* and return one download link.

    Up to three result pages are scanned. A random link whose file is marked
    ``quality == "hd"`` is preferred; if no HD file exists in any result, a
    random link of another quality is returned.

    Returns:
        A video file URL, or None when the API key is missing, the search
        failed, or nothing was found.
    """
    if not PEXELS_API_KEY or PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'):
        print("Pexels API key not set or is placeholder. Skipping video search.")
        return None

    num_pages = 3
    hd_links = []
    other_links = []

    for page in range(1, num_pages + 1):
        videos = _fetch_pexels_video_page(query, page)
        if videos is None:
            # This page failed; a later page may still succeed.
            continue
        if not videos:
            print(f"No videos found on page {page} for query '{query}'.")
            print(f"Stopping Pexels video search for '{query}' as no videos were found on page {page}.")
            break
        for video in videos:
            files = video.get("video_files") or []
            hd_link = next(
                (f.get("link") for f in files if f.get("quality") == "hd" and f.get("link")),
                None,
            )
            if hd_link:
                hd_links.append(hd_link)
            else:
                other_links.extend(f.get("link") for f in files if f.get("link"))

    if hd_links:
        random_video = random.choice(hd_links)
        print(f"Selected random HD video from {len(hd_links)} options for query '{query}'.")
        return random_video
    if other_links:
        random_video = random.choice(other_links)
        print(f"Selected random video (likely SD or other quality) from {len(other_links)} options for query '{query}' (no HD found).")
        return random_video

    print(f"No suitable videos found after searching all pages for query '{query}'.")
    return None
def search_pexels_images(query):
    """Look up *query* on the Pexels photo search and return one image URL.

    One of the first ten landscape results is picked at random and its
    "original" source URL is returned. Returns None when the API key is not
    configured, no photo matches, or the request keeps failing.
    """
    key_missing = not PEXELS_API_KEY or PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY')
    if key_missing:
        print("Pexels API key not set or is placeholder. Skipping image search.")
        return None

    auth_headers = {'Authorization': PEXELS_API_KEY}
    endpoint = "https://api.pexels.com/v1/search"
    search_params = {"query": query, "per_page": 15, "orientation": "landscape"}
    max_retries = 2
    wait_seconds = 1

    attempt = 0
    while attempt < max_retries:
        try:
            reply = requests.get(endpoint, headers=auth_headers, params=search_params, timeout=10)
            status = reply.status_code

            if status == 200:
                photos = reply.json().get("photos", [])
                if not photos:
                    print(f"No images found for query: {query} on Pexels.")
                    return None
                # Choose from the top results
                chosen = random.choice(photos[:10])
                img_url = chosen.get("src", {}).get("original")
                print(f"Found {len(photos)} images on Pexels for query '{query}', selected one.")
                return img_url

            if status != 429:
                print(f"Pexels image search error {status}: {reply.text} for query '{query}'")
                break  # Non-recoverable error

            # Rate limited: back off and try again.
            print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {wait_seconds}s for query '{query}'...")
            time.sleep(wait_seconds)
            wait_seconds *= 2
        except requests.exceptions.RequestException as e:
            print(f"Pexels image request exception (attempt {attempt+1}/{max_retries}) for query '{query}': {e}")
            if attempt >= max_retries - 1:
                break  # Too many retries
            time.sleep(wait_seconds)
            wait_seconds *= 2
        attempt += 1

    print(f"No Pexels images found for query: {query} after all attempts.")
    return None
def search_google_images(query):
    """Scrape a Google Images result page for *query* and return one image URL.

    This is a best-effort fallback: it parses the result HTML directly, so it
    breaks whenever Google changes its markup. One of the first ten URLs that
    pass the filters is chosen at random. Any failure is reported and yields
    None.
    """
    accepted_extensions = ['jpg', 'jpeg', 'png', 'gif', 'bmp']

    def pick_source(tag):
        # Prefer a plain http(s) src; fall back to data-src used by lazy loaders.
        src = tag.get("src", "")
        if src.startswith("http") and "encrypted" not in src and "base64" not in src:
            return src
        lazy_src = tag.get("data-src", "")
        if lazy_src.startswith("http"):
            return tag.get("data-src", "")
        return None

    def looks_like_image(url):
        # Drop gstatic thumbnails and anything not ending in a known extension.
        if not url or "gstatic" in url:
            return False
        return url.rsplit('.', 1)[-1].lower() in accepted_extensions

    try:
        search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch"
        request_headers = {"User-Agent": USER_AGENT}
        print(f"Searching Google Images for: {query}")
        page = requests.get(search_url, headers=request_headers, timeout=15)
        page.raise_for_status()

        parsed = BeautifulSoup(page.text, "html.parser")
        candidates = []
        for tag in parsed.find_all("img"):
            source = pick_source(tag)
            if source is not None:
                candidates.append(source)

        usable = list(filter(looks_like_image, candidates))
        if not usable:
            print(f"No valid Google Images found for query: {query}")
            return None

        print(f"Found {len(usable)} potential Google Images for query '{query}', picking one.")
        return random.choice(usable[:10])
    except Exception as e:
        print(f"Error in Google Images search for query '{query}': {e}")
        return None
def download_image(image_url, filename):
    """Download an image to *filename*, validate it and normalise it to RGB.

    Args:
        image_url: URL to fetch. A falsy value returns None immediately.
        filename: Destination path. Its parent directory is created when the
            path has one; a bare file name is written to the working directory.

    Returns:
        *filename* on success; None when the URL is missing, the server does
        not answer with an ``image/*`` content type, the download fails, or
        the file is not a readable image. Partial or invalid files are removed.
    """
    if not image_url:
        print("No image URL provided for download.")
        return None

    try:
        headers = {"User-Agent": USER_AGENT}
        print(f"Attempting to download image from: {image_url}")
        # The context manager releases the streamed connection on every exit path.
        with requests.get(image_url, headers=headers, stream=True, timeout=20) as response:
            response.raise_for_status()

            content_type = response.headers.get('Content-Type', '')
            if not content_type.startswith('image/'):
                print(f"URL did not return an image Content-Type ({content_type}). Skipping download.")
                return None

            target_dir = os.path.dirname(filename)
            if target_dir:  # os.makedirs('') raises, so skip it for bare file names
                os.makedirs(target_dir, exist_ok=True)

            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        try:
            with Image.open(filename) as probe:
                probe.verify()  # Verify it's an image file
            # verify() leaves the image unusable, so open it again for processing.
            with Image.open(filename) as img:
                rgb_img = img if img.mode == 'RGB' else img.convert('RGB')
                rgb_img.save(filename)
            return filename
        except Exception as e_validate:
            print(f"Downloaded file is not a valid image or processing failed for {filename}: {e_validate}")
            if os.path.exists(filename):
                os.remove(filename)  # Clean up invalid file
            return None

    except requests.exceptions.RequestException as e_download:
        print(f"Image download error for {image_url}: {e_download}")
        if os.path.exists(filename):
            os.remove(filename)  # Clean up partially downloaded file
        return None
    except Exception as e_general:
        print(f"General error during image download/processing for {filename}: {e_general}")
        if os.path.exists(filename):
            os.remove(filename)  # Clean up if needed
        return None
def download_video(video_url, filename):
    """Download a video to *filename*.

    Args:
        video_url: URL to fetch. A falsy value returns None immediately.
        filename: Destination path. Its parent directory is created when the
            path has one.

    Returns:
        *filename* on success; None when the URL is missing, the server does
        not answer with a ``video/*`` content type, the download fails, or the
        result is 1 KB or smaller. Partial or undersized files are removed.
    """
    if not video_url:
        print("No video URL provided for download.")
        return None

    try:
        headers = {"User-Agent": USER_AGENT}  # Some sites block direct downloads
        print(f"Attempting to download video from: {video_url}")
        downloaded_size = 0
        # The context manager releases the streamed connection on every exit path.
        with requests.get(video_url, headers=headers, stream=True, timeout=45) as response:
            response.raise_for_status()

            content_type = response.headers.get('Content-Type', '')
            if not content_type.startswith('video/'):
                print(f"URL did not return a video Content-Type ({content_type}). Skipping download.")
                return None

            target_dir = os.path.dirname(filename)
            if target_dir:  # os.makedirs('') raises, so skip it for bare file names
                os.makedirs(target_dir, exist_ok=True)

            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=4096):
                    f.write(chunk)
                    downloaded_size += len(chunk)

        # Basic sanity check: anything of 1 KB or less cannot be a usable video.
        if downloaded_size > 1024:
            print(f"Video downloaded successfully to: {filename} ({downloaded_size} bytes)")
            return filename

        print(f"Downloaded video file {filename} is too small or empty ({downloaded_size} bytes).")
        if os.path.exists(filename):
            os.remove(filename)
        return None

    except requests.exceptions.RequestException as e:
        print(f"Video download error for {video_url}: {e}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
    except Exception as e_general:
        print(f"General error during video download for {filename}: {e_general}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
def generate_media_asset(prompt, uploaded_media_path):
    """
    Generate a visual asset (video or image) for one script segment.

    Sources are tried in this order, returning on the first success:
      1. the user's uploaded file (copied into TEMP_FOLDER),
      2. a Pexels video (only attempted when random.random() < 0.4),
      3. a Pexels image,
      4. a Google Images result,
      5. a Pexels image for a generic term ("nature", "city", ...).

    Args:
        prompt: Search term for the segment; also used, sanitised, in file names.
        uploaded_media_path: Path of a user-supplied file, or a falsy value.

    Returns:
        A dict {'path': <file path>, 'asset_type': 'video' or 'image'}, or
        None if every source failed. Downloaded and copied files are placed
        in TEMP_FOLDER.
    """
    # Keep only word characters, whitespace and hyphens so the prompt is safe in a file name.
    safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
    os.makedirs(TEMP_FOLDER, exist_ok=True)  # Ensure temp folder exists

    # 1. Use user uploaded media if provided
    if uploaded_media_path and os.path.exists(uploaded_media_path):
        print(f"Using user uploaded media: {uploaded_media_path}")
        file_ext = os.path.splitext(uploaded_media_path)[1].lower()
        # Anything that is not a known video extension is treated as an image.
        asset_type = 'video' if file_ext in ['.mp4', '.mov', '.avi', '.webm', '.mkv'] else 'image'
        # Copy the user file to temp folder to manage cleanup
        temp_user_path = os.path.join(TEMP_FOLDER, f"user_upload_{os.path.basename(uploaded_media_path)}")
        try:
            # Use copy2 to preserve metadata like modification time
            shutil.copy2(uploaded_media_path, temp_user_path)
            print(f"Copied user upload to temp: {temp_user_path}")
            return {"path": temp_user_path, "asset_type": asset_type}
        # Handle case where source and destination might be the same (e.g., user uploads from temp)
        except shutil.SameFileError:
            print(f"User upload is already in temp folder: {uploaded_media_path}")
            return {"path": uploaded_media_path, "asset_type": asset_type}
        except Exception as e:
            # A failed copy is not fatal: continue with the search-based sources below.
            print(f"Error copying user file {uploaded_media_path}: {e}. Falling back to search.")

    # 2. Search Pexels Videos (Increased chance)
    # Let's slightly increase video search preference when available
    if random.random() < 0.4:  # Increase video search chance
        video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4")
        print(f"Attempting Pexels video search for: '{prompt}'")
        video_url = search_pexels_videos(prompt)  # Use global API key
        if video_url:
            downloaded_video = download_video(video_url, video_file)
            if downloaded_video:
                print(f"Pexels video asset saved to {downloaded_video}")
                return {"path": downloaded_video, "asset_type": "video"}
        else:
            print(f"Pexels video search failed or found no video for: '{prompt}'")

    # 3. Search Pexels Images
    image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}.jpg")
    print(f"Attempting Pexels image search for: '{prompt}'")
    image_url = search_pexels_images(prompt)  # Use global API key
    if image_url:
        downloaded_image = download_image(image_url, image_file)
        if downloaded_image:
            print(f"Pexels image asset saved to {downloaded_image}")
            return {"path": downloaded_image, "asset_type": "image"}
    else:
        print(f"Pexels image search failed or found no image for: '{prompt}'")

    # 4. Fallback: Search Google Images (especially useful for news/specific things Pexels might not have)
    print(f"Attempting Google Images fallback for: '{prompt}'")
    google_image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google.jpg")
    google_image_url = search_google_images(prompt)
    if google_image_url:
        downloaded_google_image = download_image(google_image_url, google_image_file)
        if downloaded_google_image:
            print(f"Google Image asset saved to {downloaded_google_image}")
            return {"path": downloaded_google_image, "asset_type": "image"}
    else:
        print(f"Google Images fallback failed for: '{prompt}'")

    # 5. Final Fallback: Generic Images if specific search failed
    fallback_terms = ["nature", "city", "abstract", "background"]  # More generic fallbacks
    for term in fallback_terms:
        print(f"Trying generic fallback image search with term: '{term}'")
        fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}.jpg")
        fallback_url = search_pexels_images(term)  # Use Pexels for fallbacks, global API key
        if fallback_url:
            downloaded_fallback = download_image(fallback_url, fallback_file)
            if downloaded_fallback:
                print(f"Generic fallback image saved to {downloaded_fallback}")
                return {"path": downloaded_fallback, "asset_type": "image"}
            else:
                print(f"Generic fallback image download failed for term: '{term}'")
        else:
            print(f"Generic fallback image search failed for term: '{term}'")

    print(f"Failed to generate any visual asset for prompt: '{prompt}' after all attempts.")
    return None
def generate_silent_audio(duration, sample_rate=24000):
    """Write a silent mono WAV file of *duration* seconds into TEMP_FOLDER.

    Args:
        duration: Length in seconds; negative values are treated as 0.
        sample_rate: Samples per second of the generated file.

    Returns:
        Path of the generated WAV file, or None if it could not be written.
    """
    duration = max(0.0, float(duration))  # np.zeros() rejects negative lengths
    print(f"Generating {duration:.2f}s of silent audio.")
    silent_path = None
    try:
        # Callers may reach this function before the temp folder was created.
        os.makedirs(TEMP_FOLDER, exist_ok=True)
        silence = np.zeros(int(duration * sample_rate), dtype=np.float32)
        # Use unique filename to avoid conflicts
        silent_path = os.path.join(
            TEMP_FOLDER,
            f"silent_{abs(hash(duration)) % (10**8)}_{int(time.time())}.wav",
        )
        sf.write(silent_path, silence, sample_rate)
        print(f"Silent audio generated: {silent_path}")
        return silent_path
    except Exception as e:
        print(f"Error generating silent audio to {silent_path}: {e}")
        return None
def generate_tts(text, voice='en'):
    """Generate narration audio for *text* and return the path of a WAV file.

    Engines are tried in order: Kokoro (when the module-level ``pipeline`` was
    initialised), then gTTS, then silent audio whose length is estimated from
    the word count. Results are cached in TEMP_FOLDER per (voice, text) pair.

    Args:
        text: Narration text. Empty or whitespace-only text yields 2 s of silence.
        voice: 'en' selects Kokoro's 'af_heart' voice; any other value is passed
            to Kokoro unchanged. gTTS always speaks English.

    Returns:
        Path of the WAV file, or whatever generate_silent_audio() returns when
        both engines fail.
    """
    # Function-scope import keeps this block self-contained.
    import hashlib

    # Create the folder first: the empty-text path below writes into it too.
    os.makedirs(TEMP_FOLDER, exist_ok=True)

    if not text or not text.strip():
        print("TTS text is empty. Generating silent audio.")
        return generate_silent_audio(duration=2.0)  # Default silence for empty text

    # A digest of voice + text keeps cache entries for different voices apart
    # and, unlike hash(), does not change between interpreter runs.
    cache_key = hashlib.sha256(f"{voice}\n{text}".encode("utf-8")).hexdigest()[:16]
    file_path = os.path.join(TEMP_FOLDER, f"tts_{cache_key}.wav")
    if os.path.exists(file_path):
        return file_path

    # Used only if every TTS engine fails: ~0.4 s per word, at least 2 s.
    target_duration_fallback = max(2.0, len(text.split()) * 0.4)

    if pipeline:
        try:
            print(f"Attempting Kokoro TTS for text: '{text[:50]}...'")
            kokoro_voice = 'af_heart' if voice == 'en' else voice  # Kokoro default American English voice
            # Kokoro may yield several (graphemes, phonemes, audio) chunks for long text.
            generator = pipeline(text, voice=kokoro_voice, speed=1.0, split_pattern=r'\n+')
            audio_segments = [audio for _graphemes, _phonemes, audio in generator]
            if audio_segments:
                full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0]
                sf.write(file_path, full_audio, 24000)  # Kokoro output is written at 24000 Hz
                return file_path
            print("Kokoro pipeline returned no audio segments.")
        except Exception as e:
            print(f"Error with Kokoro TTS: {e}")
            # Continue to gTTS fallback

    try:
        print(f"Falling back to gTTS for text: '{text[:50]}...'")
        tts = gTTS(text=text, lang='en', slow=False)  # Use standard speed
        mp3_path = os.path.join(TEMP_FOLDER, f"tts_{cache_key}.mp3")
        tts.save(mp3_path)
        audio = AudioSegment.from_mp3(mp3_path)
        audio.export(file_path, format="wav")
        if os.path.exists(mp3_path):
            os.remove(mp3_path)  # Clean up intermediate mp3
        return file_path
    except Exception as fallback_error:
        print(f"Both TTS methods failed for text: '{text[:50]}...'. Error: {fallback_error}")
        # Use the estimated duration for silent audio
        print(f"Generating silent audio of estimated duration {target_duration_fallback:.2f}s.")
        return generate_silent_audio(duration=target_duration_fallback)
def apply_kenburns_effect(clip, target_resolution, effect_type=None):
    """Apply a smooth Ken Burns (slow zoom or pan) effect to *clip*.

    The clip is resized to cover ``target_resolution``, enlarged by a further
    15 % to leave a movement margin, and then cropped frame by frame along a
    cosine-eased path.

    Args:
        clip: A moviepy clip exposing ``w``, ``h``, ``size``, ``duration``,
            ``resize`` and ``fl``.
        target_resolution: ``(width, height)`` of the output frames.
        effect_type: One of "zoom-in", "zoom-out", "pan-left", "pan-right",
            "pan-up", "pan-down", "up-left", "down-right". None or "random"
            picks one at random; any other value falls back to "pan-right"
            with a warning.

    Returns:
        A new clip whose frames have exactly ``target_resolution``.
    """
    target_w, target_h = target_resolution
    clip_aspect = clip.w / clip.h
    target_aspect = target_w / target_h

    # Resize so the clip covers the whole target frame while keeping its aspect ratio.
    if clip_aspect > target_aspect:
        clip = clip.resize(height=target_h)  # Wider than target: match height
    else:
        clip = clip.resize(width=target_w)   # Taller than target: match width

    # Scale up further so there is room to move the crop window.
    initial_w, initial_h = clip.size
    scale_factor = 1.15
    new_width = int(initial_w * scale_factor)
    new_height = int(initial_h * scale_factor)
    clip = clip.resize(newsize=(new_width, new_height))

    max_offset_x = new_width - target_w
    max_offset_y = new_height - target_h

    available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right",
                         "pan-up", "pan-down", "up-left", "down-right"]
    if effect_type is None or effect_type == "random":
        effect_type = random.choice(available_effects)

    def eased_progress(t):
        """Map time *t* to 0..1 with cosine ease-in/ease-out."""
        duration = clip.duration
        # A clip without a (positive) duration simply stays at the start position.
        progress = t / duration if duration and duration > 0 else 0
        return 0.5 - 0.5 * math.cos(math.pi * progress)

    def clamp_center(center_x, center_y, crop_w, crop_h):
        """Keep the crop window fully inside the scaled image."""
        center_x = max(crop_w / 2, min(center_x, new_width - crop_w / 2))
        center_y = max(crop_h / 2, min(center_y, new_height - crop_h / 2))
        return center_x, center_y

    if effect_type in ("zoom-in", "zoom-out"):
        # Zoom is relative to the target size: 1.0 crops exactly the target
        # resolution, scale_factor crops a window that much smaller.
        if effect_type == "zoom-in":
            start_zoom, end_zoom = 1.0, scale_factor
        else:
            start_zoom, end_zoom = scale_factor, 1.0

        def transform_frame_zoom(get_frame, t):
            frame = get_frame(t)
            zoom = start_zoom + (end_zoom - start_zoom) * eased_progress(t)
            crop_w, crop_h = int(target_w / zoom), int(target_h / zoom)
            # The window stays centred on the scaled image while its size changes.
            center_x, center_y = clamp_center(new_width / 2, new_height / 2, crop_w, crop_h)
            cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (center_x, center_y))
            return cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)

        return clip.fl(transform_frame_zoom)

    # Pan effects: the crop size is constant and only the window's top-left
    # corner moves. Each entry is ((start_x, start_y), (end_x, end_y)).
    half_x, half_y = max_offset_x / 2, max_offset_y / 2
    pan_paths = {
        "pan-left": ((max_offset_x, half_y), (0, half_y)),
        "pan-right": ((0, half_y), (max_offset_x, half_y)),
        "pan-up": ((half_x, max_offset_y), (half_x, 0)),
        "pan-down": ((half_x, 0), (half_x, max_offset_y)),
        "up-left": ((max_offset_x, max_offset_y), (0, 0)),
        "down-right": ((0, 0), (max_offset_x, max_offset_y)),
    }
    if effect_type not in pan_paths:
        # Report the value that was actually passed before replacing it.
        print(f"Warning: Unexpected effect type '{effect_type}'. Defaulting to 'pan-right'.")
        effect_type = 'pan-right'
    (start_x, start_y), (end_x, end_y) = pan_paths[effect_type]
    crop_w, crop_h = target_w, target_h

    def transform_frame_pan(get_frame, t):
        frame = get_frame(t)
        eased = eased_progress(t)
        current_x = start_x + (end_x - start_x) * eased
        current_y = start_y + (end_y - start_y) * eased
        # cv2.getRectSubPix expects the (floating point) centre of the window.
        center_x, center_y = clamp_center(
            current_x + crop_w / 2, current_y + crop_h / 2, crop_w, crop_h
        )
        try:
            cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (center_x, center_y))
            if cropped_frame.shape[0] == target_h and cropped_frame.shape[1] == target_w:
                return cropped_frame  # Already at target size, no resize needed
            return cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)
        except Exception as e:
            print(f"Error applying Ken Burns transform at t={t:.2f}s: {e}")
            # Return a black frame so rendering can continue.
            return np.zeros((target_h, target_w, 3), dtype=np.uint8)

    return clip.fl(transform_frame_pan)
def resize_to_fill(clip, target_resolution):
    """Scale and center-crop a clip so it exactly fills ``target_resolution``.

    The clip is resized with its aspect ratio preserved until it covers the
    target frame, then the overflow is cropped equally from both sides.

    Args:
        clip: A moviepy-style clip exposing ``w``, ``h``, ``size``,
            ``resize()`` and ``crop()``.
        target_resolution: ``(width, height)`` of the desired frame.

    Returns:
        A clip whose size equals ``target_resolution``.
    """
    target_w, target_h = target_resolution
    clip_aspect = clip.w / clip.h
    target_aspect = target_w / target_h

    if clip_aspect > target_aspect:
        # Clip is wider than the target: match heights, trim left/right.
        clip = clip.resize(height=target_h)
        crop_amount_x = max(0, (clip.w - target_w) / 2)
        clip = clip.crop(
            x1=int(crop_amount_x),
            x2=int(clip.w - crop_amount_x),
            y1=0,
            y2=clip.h,
        )
    else:
        # Clip is taller than (or the same shape as) the target: match
        # widths, trim top/bottom.
        clip = clip.resize(width=target_w)
        crop_amount_y = max(0, (clip.h - target_h) / 2)
        clip = clip.crop(
            x1=0,
            x2=clip.w,
            y1=int(crop_amount_y),
            y2=int(clip.h - crop_amount_y),
        )

    # Compare as tuples: a list-valued ``size`` never equals a tuple target
    # (and vice versa), which would force a pointless second resize.
    if tuple(clip.size) != tuple(target_resolution):
        print(f"Warning: Clip size {clip.size} after resize_to_fill does not match target {target_resolution}. Resizing again.")
        clip = clip.resize(newsize=target_resolution)

    return clip
def find_mp3_files():
    """Return the first MP3 found under the current working directory.

    Walks ``.`` recursively, prints every ``.mp3`` file encountered
    (extension matched case-insensitively) and returns the first one seen.

    Returns:
        The relative path of the first MP3 found, or ``None`` when there is
        none.
    """
    mp3_files = []
    for root, _dirs, files in os.walk('.'):
        for file in files:
            if file.lower().endswith('.mp3'):
                mp3_path = os.path.join(root, file)
                mp3_files.append(mp3_path)
                print(f"Found MP3 file: {mp3_path}")
    return mp3_files[0] if mp3_files else None


def add_background_music(final_video, bg_music_path, bg_music_volume=0.08):
    """Mix a looping background track under the video's existing audio.

    Args:
        final_video: The assembled video clip.
        bg_music_path: Path to the music file; falsy or missing paths leave
            the video untouched.
        bg_music_volume: Gain applied to the music before mixing.

    Returns:
        The video with the mixed audio track, or ``final_video`` unchanged
        when no music is available or mixing fails.
    """
    if not bg_music_path or not os.path.exists(bg_music_path):
        print("No valid background music path provided or file not found. Skipping background music.")
        return final_video

    try:
        print(f"Adding background music from: {bg_music_path}")
        bg_music = AudioFileClip(bg_music_path)

        # Loop the track when it is shorter than the video.
        if bg_music.duration < final_video.duration:
            # Imported here because the module-level moviepy imports do not
            # include this name; without it the loop branch raised NameError
            # and the music was silently dropped by the handler below.
            from moviepy.editor import concatenate_audioclips

            loops_needed = math.ceil(final_video.duration / bg_music.duration)
            bg_music = concatenate_audioclips(
                [bg_music.copy() for _ in range(loops_needed)]
            )

        # Trim to the video length, then lower the volume.
        bg_music = bg_music.subclip(0, final_video.duration)
        bg_music = bg_music.volumex(bg_music_volume)

        video_audio = final_video.audio
        if video_audio:
            # Bring the existing track to the video's length before mixing.
            if abs(video_audio.duration - final_video.duration) > 0.1:
                print(f"Adjusting video audio duration ({video_audio.duration:.2f}s) to match video duration ({final_video.duration:.2f}s)")
                video_audio = video_audio.fx(
                    vfx.speedx,
                    factor=video_audio.duration / final_video.duration,
                )
            mixed_audio = CompositeAudioClip([video_audio, bg_music])
        else:
            # The video had no audio track of its own.
            mixed_audio = bg_music
            print("Warning: Video had no original audio track, only adding background music.")

        final_video = final_video.set_audio(mixed_audio)
        print("Background music added successfully.")
        return final_video
    except Exception as e:
        # Best effort: background music is optional, never fail the render.
        print(f"Error adding background music: {e}")
        print("Continuing without background music.")
        return final_video
def _captions_visible(caption_enabled, caption_color, narration_text):
    """Return True when captions are on, not transparent, and there is text."""
    return bool(
        caption_enabled
        and narration_text
        and caption_color.lower() != "transparent"
        and narration_text.strip()
    )


def _placeholder_clip(label, narration_text, duration, target_resolution,
                      show_text, text_style):
    """Return a black clip, optionally captioned with ``label`` + narration."""
    clip = ColorClip(size=target_resolution, color=(0, 0, 0), duration=duration)
    if show_text:
        txt_clip = TextClip(
            f"{label}\n{narration_text}",
            method='caption',
            align='center',
            # Integer width keeps the ImageMagick geometry free of float noise.
            size=(int(target_resolution[0] * 0.9), None),
            **text_style,
        ).set_position('center').set_duration(duration)
        clip = CompositeVideoClip([clip, txt_clip])
    return clip


def _with_silent_audio(clip, error_prefix):
    """Attach generated silence matching ``clip.duration`` (or no audio)."""
    silent_audio_path = generate_silent_audio(clip.duration)
    if not (silent_audio_path and os.path.exists(silent_audio_path)):
        return clip.set_audio(None)
    try:
        silent_audio_clip = AudioFileClip(silent_audio_path)
        if abs(silent_audio_clip.duration - clip.duration) > 0.1:
            silent_audio_clip = silent_audio_clip.fx(
                vfx.speedx,
                factor=silent_audio_clip.duration / clip.duration,
            )
        return clip.set_audio(silent_audio_clip)
    except Exception as e:
        print(f"{error_prefix}: {e}")
        return clip.set_audio(None)


def create_clip(media_asset, tts_path, estimated_duration, target_resolution,
                caption_enabled, caption_color, caption_size, caption_position,
                caption_bg_color, caption_stroke_color, caption_stroke_width,
                narration_text, segment_index):
    """Create one video segment with narration audio and optional captions.

    Args:
        media_asset: Dict with ``'path'`` and ``'asset_type'`` (``"video"`` or
            ``"image"``); anything else yields a black placeholder.
        tts_path: Path of the narration audio file, or a falsy value.
        estimated_duration: Fallback duration (seconds) when no narration
            audio can be loaded.
        target_resolution: ``(width, height)`` of the output frame.
        caption_enabled: Whether captions should be rendered.
        caption_color, caption_size, caption_position, caption_bg_color,
        caption_stroke_color, caption_stroke_width: Caption styling;
            ``caption_position`` is ``"Top"``, ``"Middle"`` or anything else
            for bottom.
        narration_text: Text used for the captions.
        segment_index: Segment number, used in log messages only.

    Returns:
        A clip for the segment. Any failure is converted into a black
        placeholder clip rather than raised.
    """
    # Set before the try so the error handler can always rely on it.
    target_clip_duration = estimated_duration
    try:
        print(f"Creating clip #{segment_index} from asset: {media_asset.get('path')}, type: {media_asset.get('asset_type')}")
        media_path = media_asset.get('path')
        asset_type = media_asset.get('asset_type')

        # --- Narration audio decides the clip length -------------------- #
        audio_clip = None
        audio_duration = estimated_duration
        if tts_path and os.path.exists(tts_path):
            try:
                audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2)
                audio_duration = audio_clip.duration
                # Small visual buffer after the narration ends.
                target_clip_duration = audio_duration + 0.3
                print(f"TTS audio duration: {audio_duration:.2f}s. Target clip duration: {target_clip_duration:.2f}s (estimated {estimated_duration:.2f}s)")
            except Exception as e:
                print(f"Error loading TTS audio clip {tts_path}: {e}. Using estimated duration {estimated_duration:.2f}s for clip.")
                audio_clip = None
                target_clip_duration = estimated_duration

        show_captions = _captions_visible(caption_enabled, caption_color, narration_text)
        # Styling shared by every caption-like TextClip in this function.
        text_style = {
            'fontsize': caption_size,
            'color': caption_color,
            'bg_color': caption_bg_color,
            'stroke_width': caption_stroke_width,
            'stroke_color': caption_stroke_color,
        }

        def placeholder(label, style=text_style):
            return _placeholder_clip(label, narration_text, target_clip_duration,
                                     target_resolution, show_captions, style)

        # --- Missing media: silent black placeholder --------------------- #
        if not media_path or not os.path.exists(media_path):
            print(f"Skipping clip {segment_index}: Missing media file {media_path}")
            clip = placeholder("[Missing Media]", {**text_style, 'font': 'Arial-Bold'})
            print(f"Created placeholder black clip for segment {segment_index}")
            return _with_silent_audio(
                clip,
                f"Error adding silent audio to placeholder clip {segment_index}",
            )

        # --- Visuals ------------------------------------------------------ #
        # The error placeholders below pass no explicit font, exactly as the
        # original fallbacks did.
        if asset_type == "video":
            try:
                clip = VideoFileClip(media_path)
                print(f"Loaded video clip from {media_path} with duration {clip.duration:.2f}s")
                clip = resize_to_fill(clip, target_resolution)
                if clip.duration < target_clip_duration:
                    print("Looping video clip")
                    clip = clip.loop(duration=target_clip_duration)
                else:
                    clip = clip.subclip(0, target_clip_duration)
                clip = clip.fadein(0.2).fadeout(0.2)
                print(f"Video clip processed to duration {clip.duration:.2f}s")
            except Exception as e:
                print(f"Error processing video clip {media_path} for segment {segment_index}: {e}")
                print(f"Creating placeholder black clip instead for segment {segment_index}")
                clip = placeholder("[Video Error]")
        elif asset_type == "image":
            try:
                # The context manager closes the file handle even when the
                # image is converted (the conversion returns a new object).
                with Image.open(media_path) as img:
                    if img.mode != 'RGB':
                        print("Converting image to RGB")
                        image_source = np.array(img.convert('RGB'))
                    else:
                        image_source = media_path
                clip = ImageClip(image_source).set_duration(target_clip_duration)
                clip = apply_kenburns_effect(clip, target_resolution)
                clip = clip.fadein(0.3).fadeout(0.3)
            except Exception as e:
                print(f"Error processing image clip {media_path} for segment {segment_index}: {e}")
                print(f"Creating placeholder black clip instead for segment {segment_index}")
                clip = placeholder("[Image Error]")
        else:
            print(f"Unknown asset type '{asset_type}' for segment {segment_index}. Creating placeholder.")
            clip = placeholder("[Unknown Media Type Error]")

        # --- Audio -------------------------------------------------------- #
        if audio_clip:
            narration_audio = audio_clip
            if narration_audio.duration > clip.duration:
                narration_audio = narration_audio.subclip(0, clip.duration)
            elif narration_audio.duration < clip.duration:
                # Pad with silence instead of time-stretching: the clip is
                # deliberately 0.3s longer than the narration, so stretching
                # would slow every narration and desync the captions, which
                # are timed from the unstretched duration.
                narration_audio = CompositeAudioClip([narration_audio]).set_duration(clip.duration)
            clip = clip.set_audio(narration_audio)
        else:
            print(f"No valid audio for clip {segment_index}. Setting silent audio.")
            clip = _with_silent_audio(
                clip,
                f"Error setting silent audio for segment {segment_index}",
            )

        # --- Captions ----------------------------------------------------- #
        if show_captions:
            try:
                speech_duration = audio_duration if audio_clip else target_clip_duration
                words = narration_text.split()
                total_words = len(words)
                # Even split of the speech time over the words; a forced
                # aligner would be needed for exact sync.
                average_word_duration = speech_duration / total_words if total_words > 0 else 0.5

                # Vertical placement is the same for every chunk.
                if caption_position == "Top":
                    subtitle_y_position = int(target_resolution[1] * 0.05)
                elif caption_position == "Middle":
                    subtitle_y_position = int(target_resolution[1] * 0.5) - int(caption_size * 1.2 / 2)
                else:
                    subtitle_y_position = int(target_resolution[1] * 0.9) - int(caption_size * 1.2)

                subtitle_clips = []
                current_time = 0
                chunk_size = 6  # Words per caption chunk.
                for i in range(0, total_words, chunk_size):
                    chunk_words = words[i:i + chunk_size]
                    end_time = min(
                        current_time + len(chunk_words) * average_word_duration,
                        clip.duration,
                    )
                    if current_time >= end_time:
                        break  # No room left for a visible chunk.

                    txt_clip = TextClip(
                        ' '.join(chunk_words),
                        font='Arial-Bold',
                        method='caption',  # Enables text wrapping.
                        align='center',
                        size=(int(target_resolution[0] * 0.9), None),
                        **text_style,
                    ).set_start(current_time).set_end(end_time)
                    subtitle_clips.append(
                        txt_clip.set_position(('center', subtitle_y_position))
                    )
                    current_time = end_time

                if subtitle_clips:
                    clip = CompositeVideoClip([clip] + subtitle_clips)
            except Exception as sub_error:
                print(f"Error adding subtitles for segment {segment_index}: {sub_error}")
                # Fall back to one static overlay for the whole clip.
                try:
                    txt_clip = TextClip(
                        narration_text,
                        font='Arial-Bold',
                        method='caption',
                        align='center',
                        size=(int(target_resolution[0] * 0.8), None),
                        **text_style,
                    ).set_position(('center', int(target_resolution[1] * 0.75))).set_duration(clip.duration)
                    clip = CompositeVideoClip([clip, txt_clip])
                    print(f"Added simple fallback subtitle for segment {segment_index}.")
                except Exception as fallback_sub_error:
                    print(f"Simple fallback subtitle failed for segment {segment_index}: {fallback_sub_error}")

        # Make the final duration explicit on the returned clip.
        clip = clip.set_duration(clip.duration)
        return clip

    except Exception as e:
        print(f"Critical error in create_clip for segment {segment_index}: {str(e)}")
        error_duration = target_clip_duration or estimated_duration or 3.0
        print(f"Creating error placeholder black clip for segment {segment_index} with duration {error_duration:.2f}s.")
        clip = ColorClip(size=target_resolution, color=(0, 0, 0), duration=error_duration)
        error_text = f"Error in segment {segment_index}"
        if narration_text:
            error_text += f":\n{narration_text[:50]}..."
        try:
            error_txt_clip = TextClip(
                error_text,
                fontsize=30,
                color="red",
                align='center',
                size=(int(target_resolution[0] * 0.9), None),
            ).set_position('center').set_duration(error_duration)
            clip = CompositeVideoClip([clip, error_txt_clip])
        except Exception as text_error:
            # Text rendering may be the very thing that failed; a plain black
            # clip is still better than letting the exception escape.
            print(f"Could not render error text for segment {segment_index}: {text_error}")
        return _with_silent_audio(
            clip,
            f"Error setting silent audio for error clip {segment_index}",
        )
def fix_imagemagick_policy(policy_paths=None):
    """Relax the ImageMagick security policy so TextClip can render text.

    Finds the first existing ``policy.xml`` among the candidates and rewrites
    every ``coder``/``path`` policy whose rights are ``"none"`` to
    ``"read|write"``. A timestamped backup is attempted before the file is
    changed. When the file cannot be read or written directly, ``sudo cat`` /
    ``sudo tee`` are tried as a fallback.

    NOTE(review): this loosens ImageMagick's security policy for every user
    of that policy file, not just this program.

    Args:
        policy_paths: Optional iterable of candidate ``policy.xml`` paths.
            ``None`` (the default) searches the common install locations
            plus ``$MAGICK_HOME/policy.xml``.

    Returns:
        True when the policy is permissive after the call (either it was
        updated or it needed no change), False otherwise.
    """
    # Local import: the file only imports subprocess after this definition.
    import subprocess

    print("Attempting to fix ImageMagick security policies...")

    if policy_paths is None:
        policy_paths = [
            "/etc/ImageMagick-6/policy.xml",
            "/etc/ImageMagick-7/policy.xml",
            "/etc/ImageMagick/policy.xml",
            "/usr/local/etc/ImageMagick-7/policy.xml",
            "/usr/share/ImageMagick/policy.xml",
            "/usr/share/ImageMagick-6/policy.xml",
            "/usr/share/ImageMagick-7/policy.xml",
        ]
        magick_home = os.environ.get('MAGICK_HOME')
        if magick_home:
            policy_paths.append(os.path.join(magick_home, 'policy.xml'))

    # First candidate that actually exists wins.
    found_policy = next(
        (path for path in policy_paths if path and os.path.exists(path)),
        None,
    )
    if not found_policy:
        print("No policy.xml found in common locations. TextClip may fail.")
        print("Consider installing ImageMagick and checking its installation path and policy.xml location.")
        return False

    print(f"Attempting to modify policy file at {found_policy}")
    try:
        # --- Read ---------------------------------------------------------- #
        try:
            with open(found_policy, 'r', encoding='utf-8') as f:
                policy_content = f.read()
        except Exception as e:
            print(f"Error reading policy file {found_policy}: {e}. Attempting with sudo cat...")
            try:
                result = subprocess.run(
                    ['sudo', 'cat', found_policy],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                )
            except Exception as e_sudo_read:
                print(f"Error executing sudo cat: {e_sudo_read}")
                print("Manual intervention may be required.")
                return False
            if result.returncode != 0:
                print(f"Failed to read policy file using sudo cat. Error: {result.stderr.decode('utf-8', errors='replace')}")
                print("Manual intervention may be required.")
                return False
            policy_content = result.stdout.decode('utf-8')
            print("Read policy file content using sudo.")

        # --- Transform ------------------------------------------------------ #
        # Only the rights attribute of coder/path policies is touched; every
        # other attribute and every other domain is left as-is.
        modified_content = re.sub(
            r'(<policy\s+domain="(?:coder|path)"\s+rights=")none(")',
            r'\1read|write\2',
            policy_content,
        )
        if modified_content == policy_content:
            print("ImageMagick policy already allows coder/path access; no changes needed.")
            return True

        # --- Backup (best effort) ------------------------------------------- #
        backup_path = f"{found_policy}.bak_aivgen_{int(time.time())}"
        try:
            shutil.copy2(found_policy, backup_path)
            print(f"Created backup at {backup_path}")
        except OSError as e:
            # Not being able to write next to the policy file must not stop
            # the sudo fallback below from being tried.
            print(f"Warning: Could not create backup at {backup_path}: {e}")

        # --- Write ---------------------------------------------------------- #
        try:
            with open(found_policy, 'w', encoding='utf-8') as f:
                f.write(modified_content)
            print("ImageMagick policies updated successfully (direct write).")
            return True
        except OSError as e:
            print(f"Direct write failed: {e}. Attempting with sudo tee...")

        # Feed the new content to `sudo tee` on stdin; no shell is involved.
        print(f"Executing: echo ... | sudo tee {found_policy} > /dev/null")
        try:
            result = subprocess.run(
                ['sudo', 'tee', found_policy],
                input=modified_content.encode('utf-8'),
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
        except Exception as e_sudo_write:
            print(f"Error executing sudo tee process: {e_sudo_write}")
            print("Manual intervention may be required.")
            return False

        if result.returncode == 0:
            print("ImageMagick policies updated successfully using sudo tee.")
            return True

        print(f"Failed to update ImageMagick policies using sudo tee. Result code: {result.returncode}. Error: {result.stderr.decode('utf-8', errors='replace')}")
        print("Please manually edit your policy.xml to grant read/write rights for coder and path domains.")
        print('Example: Change <policy domain="path" rights="none" pattern="@*"/> to <policy domain="path" rights="read|write" pattern="@*"/>')
        return False

    except Exception as e_general:
        print(f"General error during ImageMagick policy modification: {e_general}")
        print("Manual intervention may be required.")
        return False
# Import subprocess for sudo commands in fix_imagemagick_policy
import subprocess

# ---------------- Gradio Interface Functions ---------------- #

def generate_script_and_show_editor(user_input, resolution_choice,
                                    caption_enabled_choice, caption_color,
                                    caption_size, caption_position, caption_bg_color,
                                    caption_stroke_color, caption_stroke_width):
    """Generate and parse the script, then reveal the segment editor.

    This is a generator: each ``yield`` is one UI update made of nine values,
    in this order: run configuration dict, status update, editing-area
    update, video-output update, textbox updates, file-upload updates,
    segment-group updates, parsed segments, raw-script preview text.

    Side effects: deletes the previous run's temporary folder (if any) and
    creates a fresh one, stored in the module-level ``TEMP_FOLDER`` and in
    the run configuration under ``"temp_folder"``.

    Args:
        user_input: Topic/prompt passed to ``generate_script``.
        resolution_choice: ``"Full (1920x1080)"`` selects 1920x1080; any
            other value selects 1080x1920.
        caption_enabled_choice: ``"Yes"`` enables captions.
        caption_color, caption_size, caption_position, caption_bg_color,
        caption_stroke_color, caption_stroke_width: Caption styling, stored
            unchanged in the run configuration.
    """
    global TEMP_FOLDER

    # Remove the previous run's temp folder, logging (not raising) failures.
    if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
        print(f"Cleaning up previous temp folder: {TEMP_FOLDER}")
        try:
            def onerror(func, path, exc_info):
                print(f"Error cleaning up {path}: {exc_info[1]}")
            shutil.rmtree(TEMP_FOLDER, onerror=onerror)
        except Exception as e:
            print(f"Error starting cleanup of temp folder {TEMP_FOLDER}: {e}")

    # Fresh, unique temp folder for this run.
    TEMP_FOLDER = tempfile.mkdtemp(prefix="aivgen_")
    print(f"Created new temp folder: {TEMP_FOLDER}")

    run_config = {
        "resolution": (1920, 1080) if resolution_choice == "Full (1920x1080)" else (1080, 1920),
        "caption_enabled": caption_enabled_choice == "Yes",
        "caption_color": caption_color,
        "caption_size": caption_size,
        "caption_position": caption_position,
        "caption_bg_color": caption_bg_color,
        "caption_stroke_color": caption_stroke_color,
        "caption_stroke_width": caption_stroke_width,
        "temp_folder": TEMP_FOLDER,
    }

    def hidden_state(status_message, preview):
        """Build one update: status shown, editor/video hidden, no segments."""
        return (
            run_config,
            gr.update(value=status_message, visible=True),
            gr.update(visible=False),                 # Editing area
            gr.update(value=None, visible=False),     # Video output
            [gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)],
            [gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)],
            [gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)],
            [],                                       # segments_state
            preview,
        )

    # The first update used to carry only eight values (no preview), unlike
    # every later update; it now has the same nine-value shape.
    # NOTE(review): assumes the nine-value shape is what the event handler's
    # outputs expect - the wiring is not visible here, please confirm.
    yield hidden_state(
        "Generating script...",
        "### Generated Script Preview\n\nGenerating script...",
    )

    script_text = generate_script(user_input, OPENROUTER_API_KEY, OPENROUTER_MODEL)

    raw_script_preview = f"### Generated Script Preview\n\n```\n{script_text}\n```" if script_text else "### Generated Script Preview\n\nFailed to generate script."

    if not script_text or script_text.startswith("[Error]"):
        yield hidden_state(f"Script generation failed: {script_text}", raw_script_preview)
        return

    yield hidden_state("Parsing script...", raw_script_preview)

    segments = parse_script(script_text)

    if not segments:
        yield hidden_state(
            "Failed to parse script or script is empty after parsing.",
            raw_script_preview,
        )
        return

    # One editor slot per segment, up to MAX_SEGMENTS_FOR_EDITING; unused
    # slots are hidden and cleared.
    textbox_updates = []
    file_updates = []
    group_visibility_updates = []
    for slot in range(MAX_SEGMENTS_FOR_EDITING):
        in_use = slot < len(segments)
        textbox_updates.append(
            gr.update(value=segments[slot]['text'] if in_use else "", visible=in_use)
        )
        file_updates.append(gr.update(value=None, visible=in_use))
        group_visibility_updates.append(gr.update(visible=in_use))

    # Final update: show the editor, fill the fields, store the segments.
    yield (
        run_config,
        gr.update(value=f"Script generated with {len(segments)} segments. Edit segments below.", visible=True),
        gr.update(visible=True),                  # Editing area
        gr.update(value=None, visible=False),     # Video output stays hidden
        textbox_updates,
        file_updates,
        group_visibility_updates,
        segments,
        raw_script_preview,
    )
def _remove_temp_folder(folder):
    """Best-effort recursive removal of *folder*; failures are logged, never raised."""
    if not folder or not os.path.exists(folder):
        return

    def _log_failure(func, path, exc_info):
        # Keep deleting the remaining entries instead of aborting on the first error.
        print(f"Error cleaning up {path}: {exc_info[1]}")

    try:
        shutil.rmtree(folder, onerror=_log_failure)
        print(f"Cleaned up temp folder: {folder}")
    except Exception as e:
        print(f"Error cleaning up temp folder {folder}: {e}")


def _close_clips(clips):
    """Release the readers held by moviepy clips so their files can be deleted."""
    for clip in clips:
        try:
            clip.close()
        except Exception as e:
            print(f"Warning: could not close clip: {e}")


def _build_placeholder_clip(segment, segment_number, resolution):
    """Return a black clip (with an error caption when possible) for a failed segment."""
    duration = segment.get('duration', 3.0)  # Use estimated duration or default
    placeholder = ColorClip(size=resolution, color=(0, 0, 0), duration=duration)

    silent_audio_path = generate_silent_audio(duration)
    if silent_audio_path and os.path.exists(silent_audio_path):
        placeholder = placeholder.set_audio(AudioFileClip(silent_audio_path))

    error_text = f"Segment {segment_number} Failed"
    if segment.get('text'):
        error_text += f":\n{segment['text'][:50]}..."

    try:
        error_txt_clip = (
            TextClip(
                error_text,
                fontsize=30,
                color="red",
                align='center',
                # Pixel sizes must be integers; 0.9 * width is a float.
                size=(int(resolution[0] * 0.9), None),
            )
            .set_position('center')
            .set_duration(duration)
        )
    except Exception as e:
        # TextClip needs ImageMagick; a missing caption must not abort the whole video.
        print(f"Could not render error caption for segment {segment_number}: {e}")
        return placeholder

    return CompositeVideoClip([placeholder, error_txt_clip])


def generate_video_from_edited(run_config, segments_data, segment_texts, segment_uploads, bg_music_volume):
    """
    Build the final video from the edited segments, yielding progress as it goes.

    Args:
        run_config: Dict produced by the script-generation step. Reads
            "temp_folder" plus the resolution and caption_* settings; missing
            keys fall back to the defaults below.
        segments_data: List of segment dicts; this function reads the keys
            'text', 'original_prompt' and 'duration'.
        segment_texts: Edited narration per segment, index-aligned with
            segments_data. A None entry keeps the segment's original text.
        segment_uploads: Uploaded media path per segment (None when nothing
            was uploaded), index-aligned with segments_data.
        bg_music_volume: Volume forwarded to add_background_music.

    Yields:
        (status_message, video_path) tuples. video_path is None until the
        export has finished, and stays None when the export failed.

    Side effects:
        Sets the module-level TEMP_FOLDER from run_config and resets it to
        None when the run ends. The folder is deleted afterwards unless the
        finished video could not be moved out of it.
    """
    global TEMP_FOLDER

    if not segments_data:
        yield "No segments to process. Generate script first.", None
        return

    run_config = run_config or {}

    # Ensure TEMP_FOLDER is correctly set from run_config
    TEMP_FOLDER = run_config.get("temp_folder")
    if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER):
        yield "Error: Temporary folder not found from run config. Please regenerate script.", None
        TEMP_FOLDER = None  # Reset global; there is nothing on disk to clean up
        return

    # Extract config from run_config, with defaults for anything missing
    target_resolution = run_config.get("resolution", (1920, 1080))
    caption_enabled = run_config.get("caption_enabled", True)
    caption_color = run_config.get("caption_color", "#FFFFFF")
    caption_size = run_config.get("caption_size", 45)
    caption_position = run_config.get("caption_position", "Bottom")
    caption_bg_color = run_config.get("caption_bg_color", "rgba(0, 0, 0, 0.25)")
    caption_stroke_color = run_config.get("caption_stroke_color", "#000000")
    caption_stroke_width = run_config.get("caption_stroke_width", 2)

    # Merge the edited text and uploaded file paths into copies of the segments
    segment_texts = list(segment_texts or [])
    segment_uploads = list(segment_uploads or [])
    processed_segments = []
    for i, segment in enumerate(segments_data):
        if i >= len(segment_texts) or i >= len(segment_uploads):
            # This shouldn't happen if state and UI updates are in sync, but as a safeguard
            print(f"Warning: Missing input value(s) for segment index {i}. Using original segment data.")
            processed_segments.append(segment)
            continue

        processed_segment = segment.copy()
        edited_text = segment_texts[i]
        if edited_text is None:
            edited_text = segment.get('text') or ''
        processed_segment['text'] = edited_text.strip()
        # Will be None if nothing was uploaded for this segment
        processed_segment['uploaded_media'] = segment_uploads[i]
        processed_segments.append(processed_segment)

    yield "Fixing ImageMagick policy...", None
    fix_imagemagick_policy()  # Attempt policy fix before creating clips

    clips = []
    yield "Generating media and audio for clips...", None

    total_segments = len(processed_segments)
    for idx, segment in enumerate(processed_segments):
        yield f"Processing segment {idx+1}/{total_segments}...", None
        print(f"\nProcessing segment {idx+1}/{total_segments} (Prompt: '{segment.get('original_prompt', 'N/A')[:30]}...')")

        # Determine media source: uploaded or generated
        media_asset = generate_media_asset(
            segment.get('original_prompt', 'background'),  # Generic term if no prompt is available
            segment.get('uploaded_media'),
        )

        # Generate TTS audio from the (possibly edited) narration
        tts_path = generate_tts(segment.get('text', ''))

        clip = create_clip(
            # Pass a dummy asset if generate_media_asset failed
            media_asset=media_asset if media_asset else {"path": None, "asset_type": None},
            tts_path=tts_path,
            estimated_duration=segment.get('duration', 3.0),  # Fallback reference
            target_resolution=target_resolution,
            caption_enabled=caption_enabled,
            caption_color=caption_color,
            caption_size=caption_size,
            caption_position=caption_position,
            caption_bg_color=caption_bg_color,
            caption_stroke_color=caption_stroke_color,
            caption_stroke_width=caption_stroke_width,
            narration_text=segment.get('text', ''),  # Used for captions
            segment_index=idx+1,
        )

        if clip:
            clips.append(clip)
        else:
            # Keep the timeline intact with a black placeholder instead of dropping the segment
            print(f"Skipping segment {idx+1} due to clip creation failure.")
            clips.append(_build_placeholder_clip(segment, idx + 1, target_resolution))

    if not clips:
        yield "No clips were successfully created. Video generation failed.", None
        _remove_temp_folder(TEMP_FOLDER)
        TEMP_FOLDER = None  # Reset global
        return

    yield "Concatenating clips...", None
    print("\nConcatenating clips...")
    try:
        final_video = concatenate_videoclips(clips, method="compose")
    except Exception as e:
        print(f"Error concatenating clips: {e}")
        yield f"Error concatenating clips: {e}", None
        _close_clips(clips)
        _remove_temp_folder(TEMP_FOLDER)
        TEMP_FOLDER = None  # Reset global
        return

    yield "Adding background music...", None
    bg_music_path = find_mp3_files()  # Find background music
    final_video = add_background_music(final_video, bg_music_path, bg_music_volume=bg_music_volume)

    yield f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...", None
    print(f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...")

    output_path = None
    export_error = None
    keep_temp_folder = False
    # Render inside TEMP_FOLDER first so a failed export never leaves a partial final file
    temp_output_filename = os.path.join(TEMP_FOLDER, f"temp_final_video_{int(time.time())}.mp4")
    try:
        final_video.write_videofile(temp_output_filename, codec='libx264', fps=24, preset='veryfast')

        # Ensure the destination directory for the final output exists (current dir)
        os.makedirs(os.path.dirname(OUTPUT_VIDEO_FILENAME) or '.', exist_ok=True)

        try:
            shutil.move(temp_output_filename, OUTPUT_VIDEO_FILENAME)
            print(f"Final video saved as {OUTPUT_VIDEO_FILENAME}")
            output_path = OUTPUT_VIDEO_FILENAME
        except Exception as e:
            print(f"Error moving temporary file {temp_output_filename} to final destination {OUTPUT_VIDEO_FILENAME}: {e}")
            # The only copy of the video now lives in TEMP_FOLDER, so the folder
            # must survive this run or the returned path would point at nothing.
            output_path = temp_output_filename
            keep_temp_folder = True
            print(f"Returning video from temporary path: {output_path}")
    except Exception as e:
        print(f"Error exporting video: {e}")
        export_error = e
    finally:
        # Open readers keep files locked on some platforms and would block cleanup
        _close_clips([final_video, *clips])

    if export_error is not None:
        yield f"Video export failed: {export_error}", None

    if keep_temp_folder:
        yield "Could not move video to final location; keeping temporary files.", output_path
    else:
        yield "Cleaning up temporary files...", output_path
        _remove_temp_folder(TEMP_FOLDER)
    TEMP_FOLDER = None  # Reset global

    if export_error is not None:
        # Do not report success when nothing was produced
        yield f"Video export failed: {export_error}", None
    else:
        yield "Done!", output_path
# ---------------- Gradio Interface Definition (Blocks) ---------------- #

# Per-segment UI components, index-aligned with each other. They are created
# once (MAX_SEGMENTS_FOR_EDITING of each) and shown/hidden per run.
segment_editing_groups = []
segment_prompt_labels = []
segment_text_inputs = []
segment_file_inputs = []

# Layout of the final tuple yielded by generate_script_and_show_editor:
# (run_config, status, editing_area, video, textbox_updates, file_updates,
#  group_visibility_updates, segments, raw_script_preview)
_SCRIPT_RESULT_LENGTH = 9
_SCRIPT_LIST_POSITIONS = (4, 5, 6)
_SCRIPT_SEGMENTS_POSITION = 7


def _flatten_script_outputs(result):
    """Turn one script-handler result into one value per registered output.

    Gradio expects exactly one value per output component. The script handler
    yields its per-segment updates as three nested lists, so they are expanded
    here. The prompt-label updates are derived from the parsed segments and
    appended at the end.
    """
    values = list(result) if isinstance(result, (tuple, list)) else [result]
    segments = []

    if len(values) == _SCRIPT_RESULT_LENGTH:
        candidate = values[_SCRIPT_SEGMENTS_POSITION]
        if isinstance(candidate, list):
            segments = candidate
        flat = []
        for position, value in enumerate(values):
            if position in _SCRIPT_LIST_POSITIONS and isinstance(value, (list, tuple)):
                flat.extend(value)
            else:
                flat.append(value)
        values = flat

    label_updates = []
    for index in range(MAX_SEGMENTS_FOR_EDITING):
        if index < len(segments):
            prompt = segments[index].get('original_prompt', 'N/A')
            label_updates.append(gr.update(value=f"Segment {index+1} (Prompt: {prompt})"))
        else:
            label_updates.append(gr.update(value=""))

    return (*values, *label_updates)


def _script_click_handler(*ui_values):
    """Run script generation and reshape every yielded result for Gradio."""
    for result in generate_script_and_show_editor(*ui_values):
        yield _flatten_script_outputs(result)


def _video_click_handler(run_config, segments_data, *flat_inputs):
    """Regroup Gradio's positional inputs and stream video-generation progress.

    Gradio passes one positional argument per input component, i.e. all text
    values, then all file values, then the music volume. They are regrouped
    into the two lists generate_video_from_edited expects.
    """
    count = MAX_SEGMENTS_FOR_EDITING
    segment_texts = list(flat_inputs[:count])
    segment_uploads = list(flat_inputs[count:2 * count])
    bg_music_volume = flat_inputs[2 * count]

    for status, video_path in generate_video_from_edited(
        run_config, segments_data, segment_texts, segment_uploads, bg_music_volume
    ):
        # The video component starts hidden (and is hidden again on every new
        # script), so it has to be made visible explicitly once a file exists.
        yield status, gr.update(value=video_path, visible=video_path is not None)


with gr.Blocks() as demo:
    gr.Markdown("# 🤖 AI Documentary Video Generator 🎬")
    gr.Markdown("Enter a concept to generate a funny documentary script. You can then edit the script text and replace the suggested media for each segment before generating the final video.")

    # --- Global Settings ---
    with gr.Accordion("Global Settings", open=True):
        user_concept_input = gr.Textbox(
            label="Video Concept",
            placeholder="e.g., The secret life of pigeons, Why socks disappear in the laundry, The futility of alarm clocks...",
        )
        with gr.Row():
            resolution_radio = gr.Radio(
                ["Full (1920x1080)", "Short (1080x1920)"],
                label="Video Resolution",
                value="Full (1920x1080)",
            )
            bg_music_volume_slider = gr.Slider(
                minimum=0,
                maximum=0.5,
                value=0.08,
                step=0.01,
                label="Background Music Volume",
                info="Lower volume keeps narration clear.",
            )

    # --- Caption Settings ---
    with gr.Accordion("Caption Settings", open=False):
        caption_enabled_radio = gr.Radio(["Yes", "No"], label="Show Captions?", value="Yes")
        with gr.Row():
            caption_color_picker = gr.ColorPicker(label="Caption Text Color", value="#FFFFFF")
            caption_bg_color_picker = gr.ColorPicker(
                label="Caption Background Color (with transparency)",
                value="rgba(0, 0, 0, 0.4)",  # Semi-transparent black
            )
        with gr.Row():
            caption_size_slider = gr.Slider(minimum=20, maximum=80, value=45, step=1, label="Caption Font Size")
            caption_stroke_width_slider = gr.Slider(minimum=0, maximum=5, value=2, step=0.5, label="Caption Stroke Width")
        with gr.Row():
            caption_position_radio = gr.Radio(["Top", "Middle", "Bottom"], label="Caption Position", value="Bottom")
            caption_stroke_color_picker = gr.ColorPicker(label="Caption Stroke Color", value="#000000")

    generate_script_btn = gr.Button("Generate Script", variant="primary")

    # --- Status and Script Output ---
    status_output = gr.Label(label="Status", value="", visible=True)  # Always visible
    # Using Markdown to show raw script content; hidden until a script exists
    script_preview_markdown = gr.Markdown(
        "### Generated Script Preview\n\nScript will appear here...",
        visible=False,
    )

    # --- State to hold parsed segments data and run config ---
    segments_state = gr.State([])  # List of segment dictionaries
    run_config_state = gr.State({})  # Dictionary for run configuration

    # --- Dynamic Editing Area (Initially hidden) ---
    # We create MAX_SEGMENTS_FOR_EDITING groups, and show/hide them dynamically
    with gr.Column(visible=False) as editing_area:
        gr.Markdown("### Edit Script Segments")
        gr.Markdown("Review the AI-generated text and media suggestions below. Edit the text and/or upload your own image/video for any segment. If no file is uploaded, AI will fetch media based on the original prompt.")

        for i in range(MAX_SEGMENTS_FOR_EDITING):
            # gr.Group instead of gr.Box: Box no longer exists in newer Gradio
            # releases, Group is the container offered for visual grouping.
            with gr.Group(visible=False) as segment_group:
                segment_editing_groups.append(segment_group)

                # Non-interactive display of the original prompt; its value is
                # filled in by _flatten_script_outputs after script generation.
                segment_prompt_label = gr.Label(f"Segment {i+1} Prompt:", show_label=False)
                segment_prompt_labels.append(segment_prompt_label)

                segment_text = gr.Textbox(label="Narration Text", lines=2, interactive=True)
                segment_text_inputs.append(segment_text)

                segment_file = gr.File(
                    label="Upload Custom Media (Image or Video)",
                    type="filepath",
                    interactive=True,
                )
                segment_file_inputs.append(segment_file)

        generate_video_btn = gr.Button("Generate Video", variant="primary")

    # --- Final Video Output ---
    final_video_output = gr.Video(label="Generated Video", visible=False)  # Initially hidden

    # --- Event Handlers ---
    # Generate Script Button Click
    generate_script_btn.click(
        fn=_script_click_handler,
        inputs=[
            user_concept_input,
            resolution_radio,
            caption_enabled_radio,
            caption_color_picker,
            caption_size_slider,
            caption_position_radio,
            caption_bg_color_picker,
            caption_stroke_color_picker,
            caption_stroke_width_slider,
        ],
        outputs=[
            run_config_state,  # Update run config state
            status_output,  # Update status label
            editing_area,  # Show/hide editing area column
            final_video_output,  # Hide and clear video output
            # Per-segment components (visibility and value updates)
            *segment_text_inputs,
            *segment_file_inputs,
            *segment_editing_groups,
            segments_state,  # Update segments state
            script_preview_markdown,  # Update raw script preview
            # Prompt labels are appended last by _flatten_script_outputs
            *segment_prompt_labels,
        ],
    )

    # Generate Video Button Click
    generate_video_btn.click(
        fn=_video_click_handler,
        inputs=[
            run_config_state,  # Run config
            segments_state,  # Parsed segments (needed for original_prompt and duration)
            *segment_text_inputs,  # Edited text values, one argument each
            *segment_file_inputs,  # Uploaded file paths, one argument each
            bg_music_volume_slider,  # Background music volume
        ],
        outputs=[status_output, final_video_output],  # Status updates and final video
    )

# Launch the interface
if __name__ == "__main__":
    # Attempt ImageMagick policy fix on script startup
    # This helps but might still require manual sudo depending on system config
    fix_imagemagick_policy()

    print("Launching Gradio interface...")
    # Check if API keys are still placeholders (unlikely with hardcoded keys, but good practice)
    if PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'):
        print("Warning: PEXELS_API_KEY is not configured. Media search may fail.")
    if OPENROUTER_API_KEY.startswith('YOUR_OPENROUTER_API_KEY'):
        print("Warning: OPENROUTER_API_KEY is not configured. Script generation will fail.")

    demo.launch(share=True)  # Set share=True to get a public link