diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,270 +1,2107 @@ -# Import necessary libraries -from kokoro import KPipeline + + +from kokoro import KPipeline # Keep Kokoro separate as it's not from moviepy + import soundfile as sf import torch -import os + +from PIL import Image +import tempfile +import random +import cv2 +import math +import os, requests, io, time, re, random from moviepy.editor import ( - VideoFileClip, concatenate_videoclips, AudioFileClip, ImageClip, - CompositeVideoClip, TextClip + VideoFileClip, # Corrected typo here + concatenate_videoclips, + AudioFileClip, + ImageClip, + CompositeVideoClip, + TextClip, + CompositeAudioClip, + ColorClip # Included ColorClip in the main import ) -import moviepy.video.fx.all as vfx +import moviepy.video.fx.all as vfx # Keep this separate for fx effects import moviepy.config as mpy_config -from PIL import Image +from pydub import AudioSegment +from pydub.generators import Sine + import numpy as np -import requests -import gradio as gr -import tempfile -import random -import re -import shutil +from bs4 import BeautifulSoup +import base64 +from urllib.parse import quote +# pysrt is imported but not used in the provided code snippets, keeping for completeness +# import pysrt +from gtts import gTTS +import gradio as gr # Import Gradio +import shutil # Needed for temp folder cleanup +import subprocess # Needed for sudo commands in fix_imagemagick_policy + # Initialize Kokoro TTS pipeline (using American English) -pipeline = KPipeline(lang_code='a') # Use voice 'af_heart' for American English -# Ensure ImageMagick binary is set (adjust path as needed) -mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"}) +# Ensure you have the required voice models downloaded for Kokoro if needed, +# or it will fall back to gTTS. 'a' for American English uses voice 'af_heart'. +# Add a flag to check if Kokoro initialized successfully +kokoro_initialized = False +pipeline = None # Initialize pipeline to None +try: + # Check if the required voice model is available or if it needs downloading + # Depending on Kokoro version/setup, this might implicitly check/download + # If Kokoro initialization itself is problematic, this try/except will catch it + pipeline = KPipeline(lang_code='a') # 'a' is often mapped to 'af_heart' or similar US voice + kokoro_initialized = True + print("Kokoro TTS pipeline initialized successfully.") +except Exception as e: + print(f"Warning: Could not initialize Kokoro TTS pipeline: {e}. Will rely on gTTS.") + # pipeline remains None -# Global Configuration + +# Ensure ImageMagick binary is set (Adjust path as needed for your system) +# This line requires imagemagick to be installed and the path correct. +# If TextClip fails, check ImageMagick installation and policy.xml (handled by fix_imagemagick_policy). +# Common paths: "/usr/bin/convert", "/usr/local/bin/convert", "C:\\Program Files\\ImageMagick-X.Y.Z-Q16\\convert.exe" +# You might need to adjust this based on your OS and installation +IMAGICK_BINARY_DEFAULT_PATH = "/usr/bin/convert" # Default path, check your system +# Add more common paths to check +common_imagemagick_paths = [ + "/usr/bin/convert", + "/usr/local/bin/convert", + "/opt/homebrew/bin/convert", # Homebrew on macOS ARM + "/usr/local/opt/imagemagick/bin/convert", # Older Homebrew + "C:\\Program Files\\ImageMagick-X.Y.Z-Q16\\convert.exe", # Windows example, adjust version + # Add other paths as needed for your environment +] + +found_imagemagick_binary = None +for path in common_imagemagick_paths: + # Check if path is not None or empty before checking existence + if path and os.path.exists(path): + found_imagemagick_binary = path + break + +if found_imagemagick_binary: + print(f"Found ImageMagick binary at: {found_imagemagick_binary}") + mpy_config.change_settings({"IMAGEMAGICK_BINARY": found_imagemagick_binary}) +else: + print("Warning: ImageMagick binary 'convert' not found in common locations.") + print("TextClip may fail. Please install ImageMagick or update the IMAGICK_BINARY setting if it's installed elsewhere.") + # Still try to set a default path, though it might be wrong + mpy_config.change_settings({"IMAGEMAGICK_BINARY": IMAGICK_BINARY_DEFAULT_PATH}) + + +# ---------------- Global Configuration ---------------- # +# Using the user's provided API keys PEXELS_API_KEY = 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna' OPENROUTER_API_KEY = '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' -OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" +OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" # Or another preferred model OUTPUT_VIDEO_FILENAME = "final_video.mp4" +USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" + +# Maximum number of script segments to display for editing +MAX_SEGMENTS_FOR_EDITING = 15 # Limit for UI manageability -# Global variables +# Global placeholder for the temporary folder, will be created per run TEMP_FOLDER = None -# Helper Functions -def generate_script(user_input): +# ---------------- Helper Functions ---------------- # +# Most helper functions remain the same, but some might need minor adjustments +# to use passed parameters instead of global state (e.g., resolution). +# We'll update create_clip and add_background_music accordingly. + +def generate_script(user_input, api_key, model): + """Generate documentary script with proper OpenRouter handling.""" + if not api_key or api_key.startswith('YOUR_OPENROUTER_API_KEY'): + print("OpenRouter API key not set or is placeholder. Skipping script generation.") + return "[Error] API key not configured." + headers = { - 'Authorization': f'Bearer {OPENROUTER_API_KEY}', - 'HTTP-Referer': 'https://your-domain.com', - 'X-Title': 'AI Video Generator' + 'Authorization': f'Bearer {api_key}', + 'HTTP-Referer': 'https://your-domain.com', # Replace with your app's domain + 'X-Title': 'AI Documentary Maker' } - prompt = f"Generate a short, creative script based on: {user_input}. Format each segment with a prompt in square brackets followed by narrative text." + + # Updated prompt based on user's specific requirements for tone and style + prompt = f"""Short Documentary Script Generator + +Instructions: +If I say "use this," output the script exactly as given. +If I only give topics, generate a script based on them. +If I provide a full script, rewrite it without any changes. +Make everything short, simple, humorous, funny, and act serious but be humorous. +Don't say anything off topic. +Always end with a funny call to subscribe based on the video topic. +Use normal conversational text, avoid AI phrases, humanize the statements. +Always focus on negative or cynical views, avoid positive statements. +The full script should be for a single video about one topic. + +Formatting Rules: +Title in Square Brackets: Each section starts with a one-word title inside [ ] (max two words if necessary). This title will be used as a search term for Pexels footage. +Casual & Funny Narration: Each section has 5-15 words of narration. Keep it natural, funny, and unpredictable. +No Special Formatting: No bold, italics, or special characters. +Generalized Search Terms: If a term is too specific, make it more general for Pexels search. +Scene-Specific Writing: Each section describes only what should be shown in the video. +Output Only the Script, make it funny/humorous/hilarious, and add a funny subscribe statement. +No extra text, just the script. + +Example Output: +[Cats] +They plot world domination while napping. +[Dogs] +Loyalty is just a bribe for snacks. +[Humans] +The only species that pays to live on a planet they destroy. +[Future] +It looks suspiciously like the present, but with more screens. +[Warning] +Subscribe or a cat will steal your bandwidth. + +Now here is the Topic/script: {user_input} +""" + + data = { - 'model': OPENROUTER_MODEL, + 'model': model, 'messages': [{'role': 'user', 'content': prompt}], - 'temperature': 0.7, - 'max_tokens': 1000 + 'temperature': 0.7, # Increased temperature slightly for more unpredictable humor + 'max_tokens': 500 # Limit token response to keep scripts short } + try: response = requests.post( 'https://openrouter.ai/api/v1/chat/completions', headers=headers, json=data, - timeout=30 + timeout=45 # Increased timeout ) - response.raise_for_status() - return response.json()['choices'][0]['message']['content'] + + response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) + + response_data = response.json() + if 'choices' in response_data and len(response_data['choices']) > 0: + script_text = response_data['choices'][0]['message']['content'] + # Basic post-processing to remove potential markdown code blocks + if script_text.startswith("```") and script_text.endswith("```"): + # Find the first and last ``` lines + first_code_block = script_text.find("```") + last_code_block = script_text.rfind("```") + if first_code_block != -1 and last_code_block != -1 and first_code_block < last_code_block: + # Extract content between the markers, removing the language specifier line if present + content_start = script_text.find('\n', first_code_block) + 1 + content_end = last_code_block + script_text = script_text[content_start:content_end].strip() + else: # Simple case, remove from start and end + script_text = script_text.strip("` \n") + + return script_text + else: + print("Unexpected response format:", response_data) + return "[Error] Unexpected API response format." + + except requests.exceptions.RequestException as e: + print(f"API Request failed: {str(e)}") + return f"[Error] API request failed: {str(e)}" except Exception as e: - print(f"Script generation error: {e}") - return "Error generating script." + print(f"An unexpected error occurred during script generation: {e}") + return f"[Error] An unexpected error occurred: {str(e)}" + def parse_script(script_text): + """ + Parse the generated script into a list of segment dictionaries. + Each dictionary includes original prompt, narration text, estimated duration, and placeholder for uploaded media. + Handles potential API errors returned as strings. + """ + if script_text.startswith("[Error]"): + print(f"Skipping parse due to script generation error: {script_text}") + return [] + segments = [] + current_title = None + current_text = "" + try: - for line in script_text.splitlines(): + lines = script_text.strip().splitlines() + if not lines: + print("Script text is empty.") + return [] + + for line in lines: line = line.strip() if line.startswith("[") and "]" in line: - prompt = line[line.find("[")+1:line.find("]")] - text = line[line.find("]")+1:].strip() - if prompt and text: - duration = max(3, len(text.split()) * 0.15) # Estimate duration - segments.append({"prompt": prompt, "text": text, "duration": duration}) + bracket_start = line.find("[") + bracket_end = line.find("]", bracket_start) # Corrected line here + if bracket_start != -1 and bracket_end != -1: + # Add previous segment if title and text are found + if current_title is not None and current_text.strip(): + # Estimate duration based on word count (adjust factor as needed) + duration = max(2.0, len(current_text.split()) * 0.4) # Minimum 2s, approx 0.4s per word + segments.append({ + "original_prompt": current_title.strip(), + "text": current_text.strip(), + "duration": duration, + "uploaded_media": None # Placeholder for user uploaded file path + }) + current_title = line[bracket_start+1:bracket_end].strip() + current_text = line[bracket_end+1:].strip() + elif current_title: # Append text if no new title found but currently parsing a segment + current_text += line + " " + elif current_title: # Append text to the current segment + current_text += line + " " + # Ignore lines before the first [Title] + + # Add the last segment + if current_title is not None and current_text.strip(): + duration = max(2.0, len(current_text.split()) * 0.4) + segments.append({ + "original_prompt": current_title.strip(), + "text": current_text.strip(), + "duration": duration, + "uploaded_media": None + }) + + # Limit segments to MAX_SEGMENTS_FOR_EDITING + if len(segments) > MAX_SEGMENTS_FOR_EDITING: + print(f"Warning: Script generated {len(segments)} segments, limiting to {MAX_SEGMENTS_FOR_EDITING} for editing.") + segments = segments[:MAX_SEGMENTS_FOR_EDITING] + + print(f"Parsed {len(segments)} segments.") return segments except Exception as e: print(f"Error parsing script: {e}") return [] -def generate_media(prompt, temp_folder, user_upload=None): - safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_') - if user_upload: - file_ext = os.path.splitext(user_upload)[1].lower() - dest_path = os.path.join(temp_folder, f"{safe_prompt}{file_ext}") - shutil.copy(user_upload, dest_path) - return {"path": dest_path, "type": "video" if file_ext in ['.mp4', '.avi'] else "image"} - # Fallback to Pexels if no upload - image_path = os.path.join(temp_folder, f"{safe_prompt}.jpg") +# Pexels and Google Image search and download functions remain unchanged +# Using the global PEXELS_API_KEY directly now. +def search_pexels_videos(query): + """Search for a video on Pexels by query and return a random HD video.""" + if not PEXELS_API_KEY or PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'): + print("Pexels API key not set or is placeholder. Skipping video search.") + return None headers = {'Authorization': PEXELS_API_KEY} - response = requests.get(f"https://api.pexels.com/v1/search?query={prompt}&per_page=1", headers=headers) - if response.status_code == 200 and response.json().get('photos'): - img_url = response.json()['photos'][0]['src']['medium'] - img_data = requests.get(img_url).content - with open(image_path, 'wb') as f: - f.write(img_data) - return {"path": image_path, "type": "image"} + base_url = "https://api.pexels.com/videos/search" + num_pages = 3 + videos_per_page = 15 + max_retries = 2 # Reduced retries for faster failure + retry_delay = 1 + + search_query = query + all_videos = [] + + for page in range(1, num_pages + 1): + for attempt in range(max_retries): + try: + params = {"query": search_query, "per_page": videos_per_page, "page": page, "orientation": "landscape"} # Added orientation + response = requests.get(base_url, headers=headers, params=params, timeout=10) + + if response.status_code == 200: + data = response.json() + videos = data.get("videos", []) + + # Filter for HD videos first, then fallback to other qualities + hd_videos_on_page = [] + other_videos_on_page = [] + for video in videos: + video_files = video.get("video_files", []) + # Sort video files by quality preference if possible + video_files_sorted = sorted(video_files, key=lambda x: {'hd': 0, 'sd': 1}.get(x.get('quality'), 2)) + + for file in video_files_sorted: + link = file.get("link") + quality = file.get("quality") + if link: + if quality == "hd": + hd_videos_on_page.append(link) + break # Found the best quality for this video entry + else: + other_videos_on_page.append(link) + # Don't break, keep looking for HD for this video entry + + all_videos.extend(hd_videos_on_page) # Add HD videos found + if not hd_videos_on_page: # If no HD found on this page, add other videos found on this page + all_videos.extend(other_videos_on_page) + + if not videos: + print(f"No videos found on page {page} for query '{query}'.") + break # No videos on this page or subsequent ones + + + break # Success for this page attempt + + elif response.status_code == 429: + print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s for query '{query}'...") + time.sleep(retry_delay) + retry_delay *= 2 + else: + print(f"Pexels video search error {response.status_code}: {response.text} for query '{query}'") + break # Non-recoverable error or too many retries + + except requests.exceptions.RequestException as e: + print(f"Pexels video request exception (attempt {attempt+1}/{max_retries}) for query '{query}': {e}") + if attempt < max_retries - 1: + time.sleep(retry_delay) + retry_delay *= 2 + else: + break # Too many retries + + # Stop searching if no videos were found on the last page check + if not videos and page > 1: + print(f"Stopping Pexels video search for '{query}' as no videos were found on page {page}.") + break + + + if all_videos: + # Prioritize picking an HD video if any were collected + hd_options = [link for link in all_videos if 'hd' in link.lower()] # Simple check, might not be perfect + if hd_options: + random_video = random.choice(hd_options) + print(f"Selected random HD video from {len(hd_options)} options for query '{query}'.") + else: + # If no HD options, pick from the entire list (which includes SD and potentially others) + random_video = random.choice(all_videos) + print(f"Selected random video (likely SD or other quality) from {len(all_videos)} options for query '{query}' (no HD found).") + return random_video + else: + print(f"No suitable videos found after searching all pages for query '{query}'.") + return None + + +def search_pexels_images(query): + """Search for an image on Pexels by query.""" + if not PEXELS_API_KEY or PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'): + print("Pexels API key not set or is placeholder. Skipping image search.") + return None + headers = {'Authorization': PEXELS_API_KEY} + url = "https://api.pexels.com/v1/search" + params = {"query": query, "per_page": 15, "orientation": "landscape"} # Increased per_page + + max_retries = 2 + retry_delay = 1 + + for attempt in range(max_retries): + try: + response = requests.get(url, headers=headers, params=params, timeout=10) + + if response.status_code == 200: + data = response.json() + photos = data.get("photos", []) + if photos: + # Choose from the top results + photo = random.choice(photos[:min(10, len(photos))]) + img_url = photo.get("src", {}).get("original") + print(f"Found {len(photos)} images on Pexels for query '{query}', selected one.") + return img_url + else: + print(f"No images found for query: {query} on Pexels.") + return None + + elif response.status_code == 429: + print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s for query '{query}'...") + time.sleep(retry_delay) + retry_delay *= 2 + else: + print(f"Pexels image search error {response.status_code}: {response.text} for query '{query}'") + break # Non-recoverable error or too many retries + + except requests.exceptions.RequestException as e: + print(f"Pexels image request exception (attempt {attempt+1}/{max_retries}) for query '{query}': {e}") + if attempt < max_retries - 1: + time.sleep(retry_delay) + retry_delay *= 2 + else: + break # Too many retries + + print(f"No Pexels images found for query: {query} after all attempts.") return None -def generate_tts(text, temp_folder): - safe_text = re.sub(r'[^\w\s-]', '', text[:10]).strip().replace(' ', '_') - file_path = os.path.join(temp_folder, f"tts_{safe_text}.wav") +def search_google_images(query): + """Search for images on Google Images (fallback/news)""" try: - generator = pipeline(text, voice='af_heart', speed=0.9) - audio_segments = [audio for _, _, audio in generator] - full_audio = np.concatenate(audio_segments) if audio_segments else audio_segments[0] - sf.write(file_path, full_audio, 24000) - return file_path + # Using a simple text search method; dedicated Google Image Search APIs are better but may require setup. + # This is prone to breaking if Google changes its HTML structure. + search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch" + headers = {"User-Agent": USER_AGENT} + print(f"Searching Google Images for: {query}") + response = requests.get(search_url, headers=headers, timeout=15) + response.raise_for_status() + soup = BeautifulSoup(response.text, "html.parser") + + # Find img tags, look for src attributes + # This is a very fragile parsing method, might need adjustment + img_tags = soup.find_all("img") + image_urls = [] + # Look for src attributes that start with http and aren't data URIs or specific gstatic patterns + # This is a heuristic and might grab incorrect URLs + for img in img_tags: + src = img.get("src", "") + if src.startswith("http") and "encrypted" not in src and "base64" not in src: # Basic filtering + image_urls.append(src) + elif img.get("data-src", "").startswith("http"): # Some sites use data-src + image_urls.append(img.get("data-src", "")) + + + # Filter out potential tiny icons or invalid URLs + valid_image_urls = [url for url in image_urls if url and "gstatic" not in url and url.split('.')[-1].lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp']] + + if valid_image_urls: + print(f"Found {len(valid_image_urls)} potential Google Images for query '{query}', picking one.") + return random.choice(valid_image_urls[:min(10, len(valid_image_urls))]) + else: + print(f"No valid Google Images found for query: {query}") + return None except Exception as e: - print(f"TTS error: {e}") + print(f"Error in Google Images search for query '{query}': {e}") + return None + + +def download_image(image_url, filename): + """Download an image from a URL to a local file with enhanced error handling.""" + if not image_url: + print("No image URL provided for download.") return None -def resize_to_fill(clip, target_resolution): - w, h = target_resolution - clip_w, clip_h = clip.size - scale = max(w / clip_w, h / clip_h) - new_w, new_h = int(clip_w * scale), int(clip_h * scale) - clip = clip.resize((new_w, new_h)) - return clip.crop(x_center=new_w/2, y_center=new_h/2, width=w, height=h) - -def create_clip(media_info, tts_path, duration, customizations, target_resolution): try: - audio = AudioFileClip(tts_path).audio_fadeout(0.2) - if media_info['type'] == "video": - clip = VideoFileClip(media_info['path']) - clip = resize_to_fill(clip, target_resolution) - clip = clip.loop(duration=duration) if clip.duration < duration else clip.subclip(0, duration) + headers = {"User-Agent": USER_AGENT} + # print(f"Attempting to download image from: {image_url}") # Keep less noisy + response = requests.get(image_url, headers=headers, stream=True, timeout=20) # Increased timeout + response.raise_for_status() + + # Check content type before saving + content_type = response.headers.get('Content-Type', '') + if not content_type.startswith('image/'): + print(f"URL did not return an image Content-Type ({content_type}). Skipping download.") + return None + + # Ensure the directory exists + os.makedirs(os.path.dirname(filename), exist_ok=True) + + with open(filename, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + + # print(f"Potential image downloaded to: {filename}") # Keep less noisy + + # Validate and process the image + try: + img = Image.open(filename) + img.verify() # Verify it's an image file + img = Image.open(filename) # Re-open after verify + if img.mode != 'RGB': + # print("Converting image to RGB") # Keep less noisy + img = img.convert('RGB') + img.save(filename) + # print(f"Image validated and converted to RGB: {filename}") # Keep less noisy + return filename + except Exception as e_validate: + print(f"Downloaded file is not a valid image or processing failed for {filename}: {e_validate}") + if os.path.exists(filename): + os.remove(filename) # Clean up invalid file + return None + + except requests.exceptions.RequestException as e_download: + print(f"Image download error for {image_url}: {e_download}") + if os.path.exists(filename): + os.remove(filename) # Clean up partially downloaded file + return None + except Exception as e_general: + print(f"General error during image download/processing for {filename}: {e_general}") + if os.path.exists(filename): + os.remove(filename) # Clean up if needed + return None + + +def download_video(video_url, filename): + """Download a video from a URL to a local file.""" + if not video_url: + print("No video URL provided for download.") + return None + try: + headers = {"User-Agent": USER_AGENT} # Some sites block direct downloads + print(f"Attempting to download video from: {video_url}") + response = requests.get(video_url, stream=True, timeout=45) # Increased timeout for videos + response.raise_for_status() + + # Check content type + content_type = response.headers.get('Content-Type', '') + if not content_type.startswith('video/'): + print(f"URL did not return a video Content-Type ({content_type}). Skipping download.") + return None + + os.makedirs(os.path.dirname(filename), exist_ok=True) + + # Use smaller chunk size for potentially large files + chunk_size = 4096 + downloaded_size = 0 + total_size = int(response.headers.get('content-length', 0)) + + with open(filename, 'wb') as f: + for chunk in response.iter_content(chunk_size=chunk_size): + f.write(chunk) + downloaded_size += len(chunk) + # Optional: Add progress updates if needed, but noisy for console + + print(f"Video downloaded successfully to: {filename} ({downloaded_size} bytes)") + # Basic check if the file seems valid (not just 0 bytes) + if os.path.exists(filename) and os.path.getsize(filename) > 1024: # Check for > 1KB + return filename else: - clip = ImageClip(media_info['path'], duration=duration).resize(target_resolution) - clip = clip.fx(vfx.fadein, 0.5).fx(vfx.fadeout, 0.5) - if customizations['show_captions']: - txt = TextClip( - customizations['text'], - fontsize=45, - color=customizations['text_color'], - bg_color=customizations['bg_color'], - size=(target_resolution[0] * 0.8, None), - method='caption' - ).set_position(('center', 'bottom')).set_duration(duration) - clip = CompositeVideoClip([clip, txt]) - return clip.set_audio(audio) + print(f"Downloaded video file {filename} is too small or empty ({os.path.getsize(filename)} bytes).") + if os.path.exists(filename): + os.remove(filename) + return None + + except requests.exceptions.RequestException as e: + print(f"Video download error for {video_url}: {e}") + if os.path.exists(filename): + os.remove(filename) + return None + except Exception as e_general: + print(f"General error during video download for {filename}: {e_general}") + if os.path.exists(filename): + os.remove(filename) + return None + + +def generate_media_asset(prompt, uploaded_media_path): + """ + Generate a visual asset (video or image). Prioritizes user upload, + then searches Pexels video, then Pexels image, then Google Image. + Returns a dict: {'path': , 'asset_type': 'video' or 'image'}. + Ensures the returned path is within the TEMP_FOLDER. + """ + safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_') + if not TEMP_FOLDER: + print("Error: TEMP_FOLDER not set for generate_media_asset.") + return None + + os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists + + # 1. Use user uploaded media if provided + if uploaded_media_path and os.path.exists(uploaded_media_path): + print(f"Using user uploaded media: {uploaded_media_path}") + file_ext = os.path.splitext(uploaded_media_path)[1].lower() + asset_type = 'video' if file_ext in ['.mp4', '.mov', '.avi', '.webm', '.mkv'] else 'image' + # Copy the user file to temp folder to manage cleanup + temp_user_path = os.path.join(TEMP_FOLDER, f"user_upload_{os.path.basename(uploaded_media_path)}") + try: + # Use copy2 to preserve metadata like modification time + shutil.copy2(uploaded_media_path, temp_user_path) + print(f"Copied user upload to temp: {temp_user_path}") + return {"path": temp_user_path, "asset_type": asset_type} + # Handle case where source and destination might be the same (e.g., user uploads from temp) + except shutil.SameFileError: + print(f"User upload is already in temp folder: {uploaded_media_path}") + return {"path": uploaded_media_path, "asset_type": asset_type} + except Exception as e: + print(f"Error copying user file {uploaded_media_path}: {e}. Falling back to search.") + + + # 2. Search Pexels Videos (Increased chance) + # Let's slightly increase video search preference when available + if random.random() < 0.4: # Increase video search chance + video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4") + print(f"Attempting Pexels video search for: '{prompt}'") + video_url = search_pexels_videos(prompt) # Use global API key + if video_url: + downloaded_video = download_video(video_url, video_file) + if downloaded_video: + print(f"Pexels video asset saved to {downloaded_video}") + return {"path": downloaded_video, "asset_type": "video"} + else: + print(f"Pexels video search failed or found no video for: '{prompt}'") + + # 3. Search Pexels Images + image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}.jpg") + print(f"Attempting Pexels image search for: '{prompt}'") + image_url = search_pexels_images(prompt) # Use global API key + if image_url: + downloaded_image = download_image(image_url, image_file) + if downloaded_image: + print(f"Pexels image asset saved to {downloaded_image}") + return {"path": downloaded_image, "asset_type": "image"} + else: + print(f"Pexels image search failed or found no image for: '{prompt}'") + + # 4. Fallback: Search Google Images (especially useful for news/specific things Pexels might not have) + print(f"Attempting Google Images fallback for: '{prompt}'") + google_image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google.jpg") + google_image_url = search_google_images(prompt) + if google_image_url: + downloaded_google_image = download_image(google_image_url, google_image_file) + if downloaded_google_image: + print(f"Google Image asset saved to {downloaded_google_image}") + return {"path": downloaded_google_image, "asset_type": "image"} + else: + print(f"Google Images fallback failed for: '{prompt}'") + + + # 5. Final Fallback: Generic Images if specific search failed + fallback_terms = ["nature", "city", "abstract", "background"] # More generic fallbacks + for term in fallback_terms: + print(f"Trying generic fallback image search with term: '{term}'") + fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}.jpg") + fallback_url = search_pexels_images(term) # Use Pexels for fallbacks, global API key + if fallback_url: + downloaded_fallback = download_image(fallback_url, fallback_file) + if downloaded_fallback: + print(f"Generic fallback image saved to {downloaded_fallback}") + return {"path": downloaded_fallback, "asset_type": "image"} + else: + print(f"Generic fallback image download failed for term: '{term}'") + else: + print(f"Generic fallback image search failed for term: '{term}'") + + + print(f"Failed to generate any visual asset for prompt: '{prompt}' after all attempts.") + return None + +def generate_silent_audio(duration, sample_rate=24000): + """Generate a silent WAV audio file lasting 'duration' seconds.""" + print(f"Generating {duration:.2f}s of silent audio.") + num_samples = int(duration * sample_rate) + silence = np.zeros(num_samples, dtype=np.float32) + # Use unique filename to avoid conflicts + # Ensure TEMP_FOLDER exists before generating path + if not TEMP_FOLDER: + print("Error: TEMP_FOLDER not set for generate_silent_audio.") + return None + os.makedirs(TEMP_FOLDER, exist_ok=True) + + silent_path = os.path.join(TEMP_FOLDER, f"silent_{abs(hash(duration)) % (10**8)}_{int(time.time())}.wav") + try: + sf.write(silent_path, silence, sample_rate) + print(f"Silent audio generated: {silent_path}") + return silent_path except Exception as e: - print(f"Clip creation error: {e}") + print(f"Error generating silent audio to {silent_path}: {e}") + return None + + +def generate_tts(text, voice='en'): + """ + Generate TTS audio using Kokoro, falling back to gTTS or silent audio if needed. + Ensures temp folder exists. + """ + if not text or not text.strip(): + print("TTS text is empty. Generating silent audio.") + return generate_silent_audio(duration=2.0) # Default silence for empty text + + if not TEMP_FOLDER: + print("Error: TEMP_FOLDER not set for generate_tts.") + return generate_silent_audio(duration=max(2.0, len(text.split()) * 0.4)) + + os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists + safe_text_hash = str(abs(hash(text)) % (10**10)) # Use a hash for potentially long text + file_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.wav") + + if os.path.exists(file_path): + # print(f"Using cached TTS for text hash '{safe_text_hash}'") # Keep less noisy + return file_path + + # Estimate duration based on word count (adjust factor as needed), used if TTS fails + target_duration_fallback = max(2.0, len(text.split()) * 0.4) + + # Use the global kokoro_initialized flag + if kokoro_initialized and pipeline: + try: + print(f"Attempting Kokoro TTS for text: '{text[:50]}...'") + kokoro_voice = 'af_heart' if voice == 'en' else voice # Kokoro default American English voice + # Kokoro pipeline might return multiple segments for long text + generator = pipeline(text, voice=kokoro_voice, speed=1.0, split_pattern=r'\n+') # Use speed 1.0 + audio_segments = [] + total_kokoro_duration = 0 # Track actual generated audio duration + + # Some text might result in many small segments, let's limit total time spent on Kokoro + max_kokoro_total_time = 60 # seconds + + start_time = time.time() # Start time for total timeout check + + for i, (gs, ps, audio) in enumerate(generator): + if time.time() - start_time > max_kokoro_total_time: + print(f"Kokoro TTS total time exceeded {max_kokoro_total_time}s.") + break # Exit loop on total timeout + + audio_segments.append(audio) + segment_duration = len(audio) / 24000.0 # Assuming 24000 Hz sample rate + total_kokoro_duration += segment_duration + + if audio_segments: + full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0] + # Calculate actual duration + total_kokoro_duration = len(full_audio) / 24000.0 # Assuming 24000 Hz sample rate + sf.write(file_path, full_audio, 24000) # Use 24000Hz standard + # print(f"TTS audio saved to {file_path} (Kokoro, {total_kokoro_duration:.2f}s)") # Keep less noisy + return file_path + else: + print("Kokoro pipeline returned no audio segments.") + + except Exception as e: + print(f"Error with Kokoro TTS: {e}") + # Continue to gTTS fallback + + try: + print(f"Falling back to gTTS for text: '{text[:50]}...'") + tts = gTTS(text=text, lang='en', slow=False) # Use standard speed + mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.mp3") + tts.save(mp3_path) + audio = AudioSegment.from_mp3(mp3_path) + audio.export(file_path, format="wav") + if os.path.exists(mp3_path): + os.remove(mp3_path) # Clean up intermediate mp3 + # print(f"Fallback TTS saved to {file_path} (gTTS, {audio.duration_seconds:.2f}s)") # Keep less noisy + return file_path + except Exception as fallback_error: + print(f"Both TTS methods failed for text: '{text[:50]}...'. Error: {fallback_error}") + # Use the estimated duration for silent audio + print(f"Generating silent audio of estimated duration {target_duration_fallback:.2f}s.") + return generate_silent_audio(duration=target_duration_fallback) + +def apply_kenburns_effect(clip, target_resolution, effect_type=None): + """Apply a smooth Ken Burns effect with a single movement pattern.""" + target_w, target_h = target_resolution + clip_aspect = clip.w / clip.h + target_aspect = target_w / target_h + + # Resize clip to fill target resolution while maintaining aspect ratio, then scale up + # This ensures the image covers the whole frame even after scaling and panning + if clip_aspect > target_aspect: + # Wider than target: match height, scale width + clip = clip.resize(height=target_h) + else: + # Taller than target: match width, scale height + clip = clip.resize(width=target_w) + + # Now scale the resized clip up for the Ken Burns movement margin + initial_w, initial_h = clip.size + scale_factor = 1.15 # Scale up by 15% + new_width = int(initial_w * scale_factor) + new_height = int(initial_h * scale_factor) + clip = clip.resize(newsize=(new_width, new_height)) + + max_offset_x = new_width - target_w + max_offset_y = new_height - target_h + + available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "up-left", "down-right"] + if effect_type is None or effect_type == "random": + effect_type = random.choice(available_effects) + + # Define start and end positions of the top-left corner of the target_resolution window + start_x, start_y = 0, 0 + end_x, end_y = 0, 0 + start_zoom_relative = 1.0 # Relative to target_resolution size + end_zoom_relative = 1.0 + + # Set start/end positions and zoom based on effect type. + # Positions are top-left corner of the target frame within the scaled image coordinates (new_width, new_height). + if effect_type == "zoom-in": + start_zoom_relative = 1.0 # Start covering target_resolution size + end_zoom_relative = scale_factor # End covering target_resolution / scale_factor size (zoomed in) + # Stay centered in the *scaled* image + start_x = max_offset_x / 2 + start_y = max_offset_y / 2 + end_x = max_offset_x / 2 + end_y = max_offset_y / 2 + + elif effect_type == "zoom-out": + start_zoom_relative = scale_factor # Start zoomed in + end_zoom_relative = 1.0 # End at target_resolution size + # Stay centered in the *scaled* image + start_x = max_offset_x / 2 + start_y = max_offset_y / 2 + end_x = max_offset_x / 2 + end_y = max_offset_y / 2 + + # For pan effects, the crop size is constant (target_resolution, which corresponds to zoom_relative=1.0) + elif effect_type == "pan-left": + start_x = max_offset_x + start_y = max_offset_y / 2 + end_x = 0 + end_y = max_offset_y / 2 + elif effect_type == "pan-right": + start_x = 0 + start_y = max_offset_y / 2 + end_x = max_offset_x + end_y = max_offset_y / 2 + elif effect_type == "pan-up": + start_x = max_offset_x / 2 + start_y = max_offset_y + end_x = max_offset_x / 2 + end_y = 0 + elif effect_type == "pan-down": + start_x = max_offset_x / 2 + start_y = 0 + end_x = max_offset_x / 2 + end_y = max_offset_y + elif effect_type == "up-left": + start_x = max_offset_x + start_y = max_offset_y + end_x = 0 + end_y = 0 + elif effect_type == "down-right": + start_x = 0 + start_y = 0 + end_x = max_offset_x + end_y = max_offset_y + else: + # Default to pan-right if type is random but somehow invalid (shouldn't happen with random.choice) + effect_type = 'pan-right' + start_x = 0 + start_y = max_offset_y / 2 + end_x = max_offset_x + end_y = max_offset_y / 2 + print(f"Warning: Unexpected effect type '{effect_type}'. Defaulting to 'pan-right'.") + + + def transform_frame(get_frame, t): + frame = get_frame(t) + # Use a smooth ease-in/ease-out function + progress = t / clip.duration if clip.duration > 0 else 0 + eased_progress = 0.5 - 0.5 * math.cos(math.pi * progress) # Cosine easing + + # Interpolate zoom relative to target_resolution + current_zoom_relative = start_zoom_relative + (end_zoom_relative - start_zoom_relative) * eased_progress + + # Calculate crop size based on current zoom relative to target resolution + # If zoom_relative is 1, crop size is target_resolution. If zoom_relative is scale_factor, crop size is target_resolution/scale_factor + crop_w = int(target_w / current_zoom_relative) + crop_h = int(target_h / current_zoom_relative) + + # Interpolate position (top-left corner of the target frame within the scaled image) + current_x = start_x + (end_x - start_x) * eased_progress + current_y = start_y + (end_y - start_y) * eased_progress + + # Calculate the center point for cv2.getRectSubPix + center_x = current_x + crop_w / 2 + center_y = current_y + crop_h / 2 + + # Ensure center stays within the bounds of the scaled image (new_width, new_height) + center_x = max(crop_w / 2.0, min(center_x, new_width - crop_w / 2.0)) # Use float division + center_y = max(crop_h / 2.0, min(center_y, new_height - crop_h / 2.0)) + + + try: + # Perform the crop using cv2.getRectSubPix (expects floating point center) + # Ensure frame is a numpy array (moviepy returns numpy arrays) + # Clamp center coordinates just in case, although max/min should handle it + center_x = np.clip(center_x, 0, new_width) + center_y = np.clip(center_y, 0, new_height) + + # Ensure crop dimensions are positive integers + crop_w = max(1, crop_w) + crop_h = max(1, crop_h) + + # Handle cases where crop dimensions might exceed frame dimensions (shouldn't happen with correct logic) + crop_w = min(crop_w, frame.shape[1]) + crop_h = min(crop_h, frame.shape[0]) + + # Ensure crop size is not zero or negative + if crop_w <= 0 or crop_h <= 0: + print(f"Warning: Calculated crop size is non-positive ({crop_w}, {crop_h}) at t={t:.2f}s. Skipping crop/resize.") + return np.zeros((target_h, target_w, 3), dtype=np.uint8) # Return black frame + + + cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (center_x, center_y)) + # Resize the cropped frame back to the target resolution + resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) + + return resized_frame + except Exception as e: + # Log details helpful for debugging Ken Burns issues + frame_shape_info = frame.shape if frame is not None else 'None' + print(f"Error applying Ken Burns transform at t={t:.2f}s: {e}. Frame shape: {frame_shape_info}, Scaled Size: ({new_width}, {new_height}), Center: ({center_x:.2f}, {center_y:.2f}), Crop Size: ({crop_w}, {crop_h}), Target Size: ({target_w}, {target_h})") + # Return a black frame or placeholder in case of error + return np.zeros((target_h, target_w, 3), dtype=np.uint8) + + + # Apply the transformation function + return clip.fl(transform_frame) + + +def resize_to_fill(clip, target_resolution): + """Resize and crop a clip to fill the target resolution while maintaining aspect ratio.""" + target_w, target_h = target_resolution + clip_aspect = clip.w / clip.h + target_aspect = target_w / target_h + + # print(f"Resizing clip {clip.size} to fill target {target_resolution}") + + if clip_aspect > target_aspect: # Clip is wider than target + clip = clip.resize(height=target_h) + # Calculate crop amount to make width match target_w + crop_amount_x = max(0.0, (clip.w - target_w) / 2.0) # Use float division + # Ensure crop coordinates are integers + x1 = int(crop_amount_x) + x2 = int(clip.w - crop_amount_x) + # Handle potential edge cases with integer rounding + x2 = max(x1 + 1, x2) # Ensure at least 1 pixel width if needed + # Ensure crop region is within bounds + x1 = max(0, x1) + x2 = min(clip.w, x2) + + clip = clip.crop(x1=x1, x2=x2, y1=0, y2=clip.h) + else: # Clip is taller than target or same aspect + clip = clip.resize(width=target_w) + # Calculate crop amount to make height match target_h + crop_amount_y = max(0.0, (clip.h - target_h) / 2.0) # Use float division + # Ensure crop coordinates are integers + y1 = int(crop_amount_y) + y2 = int(clip.h - crop_amount_y) + # Handle potential edge cases with integer rounding + y2 = max(y1 + 1, y2) # Ensure at least 1 pixel height if needed + # Ensure crop region is within bounds + y1 = max(0, y1) + y2 = min(clip.h, y2) + + clip = clip.crop(x1=0, x2=clip.w, y1=y1, y2=y2) + + # Final check and resize if dimensions are slightly off due to rounding + if clip.size != target_resolution: + print(f"Warning: Clip size {clip.size} after resize_to_fill does not match target {target_resolution}. Resizing again.") + clip = clip.resize(newsize=target_resolution) + + + # print(f"Clip resized to {clip.size}") + return clip + +def find_mp3_files(): + """Search for any MP3 files in the current directory and subdirectories.""" + mp3_files = [] + # Check relative paths first + for root, dirs, files in os.walk('.'): + for file in files: + if file.lower().endswith('.mp3'): + mp3_path = os.path.join(root, file) + # Exclude files that are likely temporary or part of internal libraries + if not any(keyword in mp3_path.lower() for keyword in ['temp', '.gradio', 'site-packages', 'dist-packages', 'venv', 'tmp']): # Added 'tmp' + mp3_files.append(mp3_path) + print(f"Found MP3 file: {mp3_path}") + + if mp3_files: + return mp3_files[0] # Return the first one found that isn't excluded + else: + # print("No user-provided MP3 files found in the current directory or subdirectories.") # Keep less noisy return None -def generate_final_video(clips_data, bg_volume, target_resolution, temp_folder): + +def add_background_music(final_video, bg_music_path, bg_music_volume=0.08): + """Add background music to the final video.""" + if not bg_music_path or not os.path.exists(bg_music_path): + print("No valid background music path provided or file not found. Skipping background music.") + return final_video + + try: + print(f"Adding background music from: {bg_music_path} with volume {bg_music_volume}") + bg_music = AudioFileClip(bg_music_path) + + # Loop background music if shorter than video + if bg_music.duration < final_video.duration: + loops_needed = math.ceil(final_video.duration / bg_music.duration) + bg_segments = [bg_music.copy() for _ in range(loops_needed)] # Use copy to avoid issues + bg_music = concatenate_audioclips(bg_segments) + # print(f"Looped background music to {bg_music.duration:.2f}s") # Keep less noisy + + # Subclip background music to match video duration + bg_music = bg_music.subclip(0, final_video.duration) + # print(f"Subclipped background music to {bg_music.duration:.2f}s") # Keep less noisy + + # Adjust volume + bg_music = bg_music.volumex(bg_music_volume) + # print(f"Set background music volume to {bg_music_volume}") # Keep less noisy + + # Composite audio + video_audio = final_video.audio + if video_audio: + # Ensure video audio matches video duration before compositing + if abs(video_audio.duration - final_video.duration) > 0.1: + print(f"Adjusting video audio duration ({video_audio.duration:.2f}s) to match video duration ({final_video.duration:.2f}s) for final mix") + try: + video_audio = video_audio.fx(vfx.speedx, factor=video_audio.duration / final_video.duration) + except Exception as e: + print(f"Error adjusting final video audio speed: {e}. Using original audio.") + pass # Proceed with original audio if speedx fails + + mixed_audio = CompositeAudioClip([video_audio, bg_music]) + # print("Composited video audio and background music") # Keep less noisy + else: + # Handle case where video might not have audio track initially + mixed_audio = bg_music + print("Warning: Video had no original audio track, only adding background music.") + + final_video = final_video.set_audio(mixed_audio) + print("Background music added successfully.") + return final_video + except Exception as e: + print(f"Error adding background music: {e}") + print("Continuing without background music.") + return final_video + + +def create_clip(media_asset, tts_path, estimated_duration, target_resolution, + caption_enabled, caption_color, caption_size, caption_position, + caption_bg_color, caption_stroke_color, caption_stroke_width, + narration_text, segment_index): + """Create a video clip with synchronized subtitles and narration.""" + try: + print(f"Creating clip #{segment_index} from asset: {media_asset.get('path')}, type: {media_asset.get('asset_type')}") + media_path = media_asset.get('path') + asset_type = media_asset.get('asset_type') + + # Determine actual audio duration + audio_clip = None + audio_duration = estimated_duration # Default to estimated duration + target_clip_duration = estimated_duration # Default target duration + + if tts_path and os.path.exists(tts_path): + try: + audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2) # Fade out TTS slightly + audio_duration = audio_clip.duration + # Ensure clip duration is slightly longer than audio for transitions/padding + target_clip_duration = audio_duration + 0.3 # Add a small buffer after TTS ends + # Ensure target duration is not excessively long + target_clip_duration = min(target_clip_duration, estimated_duration * 3 + 5) # Prevent very long clips if TTS audio is unexpectedly long + # Also ensure a minimum duration even if TTS is very short + target_clip_duration = max(target_clip_duration, 2.0) # Minimum clip duration 2 seconds + + + print(f"TTS audio duration: {audio_duration:.2f}s. Target clip duration: {target_clip_duration:.2f}s (estimated {estimated_duration:.2f}s)") + except Exception as e: + print(f"Error loading TTS audio clip {tts_path}: {e}. Using estimated duration {estimated_duration:.2f}s for clip.") + audio_clip = None # Ensure audio_clip is None if loading fails + target_clip_duration = estimated_duration # Fallback to estimated duration + target_clip_duration = max(target_clip_duration, 2.0) # Ensure minimum duration + + else: + # If no TTS path, use estimated duration as target, ensure minimum + target_clip_duration = max(estimated_duration, 2.0) + + + # Handle missing or invalid media first + if not media_path or not os.path.exists(media_path): + print(f"Skipping clip {segment_index}: Missing or invalid media file {media_path}") + # Create a black clip with silent audio for the target duration + clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) + print(f"Created placeholder black clip for segment {segment_index}") + # Add placeholder text if captions are enabled and text exists + if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): + txt_clip = TextClip( + "[Missing Media]\n" + narration_text, # Indicate missing media + fontsize=caption_size, + font='Arial-Bold', # Ensure this font is available + color=caption_color, + bg_color=caption_bg_color, + method='caption', + align='center', + stroke_width=caption_stroke_width, + stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(target_clip_duration) # Duration matches black clip + clip = CompositeVideoClip([clip, txt_clip]) + + # Add silent audio to the placeholder clip + silent_audio_path = generate_silent_audio(target_clip_duration) + if silent_audio_path and os.path.exists(silent_audio_path): + try: + silent_audio_clip = AudioFileClip(silent_audio_path) + # Ensure silent audio duration matches video clip duration + if abs(silent_audio_clip.duration - clip.duration) > 0.1: + silent_audio_clip = silent_audio_clip.fx(vfx.speedx, factor=silent_audio_clip.duration / clip.duration) + clip = clip.set_audio(silent_audio_clip) + except Exception as e: + print(f"Error setting silent audio to placeholder clip {segment_index}: {e}") + clip = clip.set_audio(None) # Set audio to None if silent audio fails loading + else: + clip = clip.set_audio(None) # Set audio to None if silent audio generation fails + + return clip # Return the placeholder clip + + # Process media if path is valid + if asset_type == "video": + try: + clip = VideoFileClip(media_path) + print(f"Loaded video clip from {media_path} with duration {clip.duration:.2f}s") + clip = resize_to_fill(clip, target_resolution) + if clip.duration < target_clip_duration: + print("Looping video clip") + # Loop the video to match the target duration + clip = clip.loop(duration=target_clip_duration) + else: + # Subclip the video to match the target duration + clip = clip.subclip(0, target_clip_duration) + clip = clip.fadein(0.2).fadeout(0.2) # Add simple transitions + print(f"Video clip processed to duration {clip.duration:.2f}s") + + except Exception as e: + print(f"Error processing video clip {media_path} for segment {segment_index}: {e}") + # Fallback to a black clip if video processing fails + print(f"Creating placeholder black clip instead for segment {segment_index}") + clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) + if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): + txt_clip = TextClip( + "[Video Error]\n" + narration_text, # Indicate video error + fontsize=caption_size, color=caption_color, bg_color=caption_bg_color, method='caption', align='center', + stroke_width=caption_stroke_width, stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(target_clip_duration) + clip = CompositeVideoClip([clip, txt_clip]) + + + elif asset_type == "image": + try: + img = Image.open(media_path) + # Ensure image is in RGB format before passing to ImageClip + if img.mode != 'RGB': + print("Converting image to RGB") + img = img.convert('RGB') + # ImageClip accepts numpy arrays + img_array = np.array(img) + img.close() # Close the PIL image + clip = ImageClip(img_array).set_duration(target_clip_duration) + else: + img.close() # Close the PIL image + clip = ImageClip(media_path).set_duration(target_clip_duration) + + # print(f"Loaded image clip from {media_path} with duration {clip.duration:.2f}s") # Keep less noisy + clip = apply_kenburns_effect(clip, target_resolution) # Ken Burns with random effect + clip = clip.fadein(0.3).fadeout(0.3) # Add simple transitions + # print(f"Image clip processed to duration {clip.duration:.2f}s with Ken Burns") # Keep less noisy + + + except Exception as e: + print(f"Error processing image clip {media_path} for segment {segment_index}: {e}") + # Fallback to a black clip if image processing fails + print(f"Creating placeholder black clip instead for segment {segment_index}") + clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) + if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): + txt_clip = TextClip( + "[Image Error]\n" + narration_text, # Indicate image error + fontsize=caption_size, color=caption_color, bg_color=caption_bg_color, method='caption', align='center', + stroke_width=caption_stroke_width, stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(target_clip_duration) + clip = CompositeVideoClip([clip, txt_clip]) + + else: + print(f"Unknown asset type '{asset_type}' for segment {segment_index}. Creating placeholder.") + # Create a placeholder black clip + clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) + if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): + txt_clip = TextClip( + "[Unknown Media Type Error]\n" + narration_text, # Indicate unknown type error + fontsize=caption_size, color=caption_color, bg_color=caption_bg_color, method='caption', align='center', + stroke_width=caption_stroke_width, stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(target_clip_duration) + clip = CompositeVideoClip([clip, txt_clip]) + + + # Set the audio for the clip + if audio_clip: + # Ensure audio clip duration matches video clip duration after processing + if abs(audio_clip.duration - clip.duration) > 0.1: # Allow slight difference (e.g., 100ms) + print(f"Adjusting audio duration ({audio_clip.duration:.2f}s) to match video duration ({clip.duration:.2f}s) for segment {segment_index}") + try: + audio_clip = audio_clip.fx(vfx.speedx, factor=audio_clip.duration / clip.duration) + except Exception as e: + print(f"Error adjusting audio speed for segment {segment_index}: {e}. Using original audio duration.") + # If speeding fails, maybe just loop or subclip the audio? Or regenerate silent audio. + # For now, if speedx fails, let's just attach the original audio and hope for the best timing wise. + pass # Keep the original audio_clip if speedx fails + + clip = clip.set_audio(audio_clip) + else: + # If TTS failed or audio loading failed, ensure video clip has no audio or silent audio + print(f"No valid audio for clip {segment_index}. Setting silent audio.") + silent_audio_path = generate_silent_audio(clip.duration) # Generate silent audio matching the clip's final duration + if silent_audio_path and os.path.exists(silent_audio_path): + try: + silent_audio_clip = AudioFileClip(silent_audio_path) + # Should match duration, but double check + if abs(silent_audio_clip.duration - clip.duration) > 0.1: + silent_audio_clip = silent_audio_clip.fx(vfx.speedx, factor=silent_audio_clip.duration / clip.duration) + clip = clip.set_audio(silent_audio_clip) + except Exception as e: + print(f"Error setting silent audio for segment {segment_index}: {e}") + clip = clip.set_audio(None) # Set audio to None if silent audio fails loading + else: + clip = clip.set_audio(None) # Set audio to None if silent audio generation fails + + + # Add subtitles if enabled and text exists + if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip(): + try: + # Determine total audio duration (using actual if available, else estimated) + # Use clip.duration for subtitle timing as the clip's duration is final + actual_clip_duration_for_subtitles = clip.duration + if actual_clip_duration_for_subtitles <= 0: + print(f"Clip duration is zero or negative for segment {segment_index}, cannot add subtitles.") + else: + # Simple word-based chunking for subtitles + words = narration_text.split() + # Calculate average word duration based on clip duration and word count + total_words = len(words) + average_word_duration = actual_clip_duration_for_subtitles / total_words if total_words > 0 else 0.5 # Default if no words + + subtitle_clips = [] + current_time = 0 + chunk_size = 6 # Words per caption chunk (adjust as needed for readability) + + for i in range(0, total_words, chunk_size): + chunk_words = words[i:i+chunk_size] + chunk_text = ' '.join(chunk_words) + # Estimate chunk duration based on word count * average word duration + estimated_chunk_duration = len(chunk_words) * average_word_duration + + start_time = current_time + # Ensure end time doesn't exceed the *clip* duration + end_time = min(current_time + estimated_chunk_duration, clip.duration) + # Ensure minimal duration for a chunk + if end_time - start_time < 0.1 and i + chunk_size < total_words: + end_time = min(start_time + 0.1, clip.duration) # Give it at least 0.1s + + if start_time >= end_time: break # Avoid 0 or negative duration clips + + + # Determine vertical position + if caption_position == "Top": + subtitle_y_position = int(target_resolution[1] * 0.05) # Slightly lower than top edge + elif caption_position == "Middle": + # Calculate vertical center, then subtract half the estimated text height + # Estimate text height based on font size and number of lines (adjust factor as needed) + estimated_text_lines = max(1, math.ceil(len(chunk_words) / chunk_size)) # Crude estimate, at least 1 line + estimated_total_text_height = estimated_text_lines * caption_size * 1.2 # 1.2 is line spacing approx + subtitle_y_position = int(target_resolution[1] * 0.5) - int(estimated_total_text_height / 2) + # Ensure position is not off-screen (allow negative slightly for vertical alignment) + # subtitle_y_position = max(0, subtitle_y_position) # Don't clamp to 0 for Middle, let moviepy handle it + + else: # Default to Bottom + # Position from the bottom edge + # positioning the top-left of the text box at 85% of height often looks good for bottom captions. + subtitle_y_position = int(target_resolution[1] * 0.85) # Top-left of text box is at 85% height + + + txt_clip = TextClip( + chunk_text, + fontsize=caption_size, + font='Arial-Bold', # Ensure this font is available or use a common system font + color=caption_color, + bg_color=caption_bg_color, # Use background color + method='caption', # Enables text wrapping + align='center', + stroke_width=caption_stroke_width, # Use stroke + stroke_color=caption_stroke_color, # Use stroke color + size=(target_resolution[0] * 0.9, None) # Caption width max 90% of video width + ).set_start(start_time).set_end(end_time) + + # Position is tuple ('center', y_position) + txt_clip = txt_clip.set_position(('center', subtitle_y_position)) + subtitle_clips.append(txt_clip) + current_time = end_time # Move to the end of the current chunk + + if subtitle_clips: + clip = CompositeVideoClip([clip] + subtitle_clips) + # print(f"Added {len(subtitle_clips)} subtitle chunks to clip {segment_index}.") # Keep less noisy + # else: + # print(f"No subtitle clips generated for segment {segment_index} (might be due to text/duration issues).") # Keep less noisy + + + except Exception as sub_error: + print(f"Error adding subtitles for segment {segment_index}: {sub_error}") + # Fallback to a single centered text overlay if detailed subtitling fails + try: + txt_clip = TextClip( + narration_text, + fontsize=caption_size, + font='Arial-Bold', + color=caption_color, + bg_color=caption_bg_color, + method='caption', + align='center', + stroke_width=caption_stroke_width, + stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.8, None) + ).set_position(('center', int(target_resolution[1] * 0.75))).set_duration(clip.duration) # Position slightly above bottom + clip = CompositeVideoClip([clip, txt_clip]) + print(f"Added simple fallback subtitle for segment {segment_index}.") + except Exception as fallback_sub_error: + print(f"Simple fallback subtitle failed for segment {segment_index}: {fallback_sub_error}") + + + # Ensure final clip duration is explicitly set (already done earlier based on audio) + # clip = clip.set_duration(clip.duration) + + # print(f"Clip {segment_index} created successfully: {clip.duration:.2f}s") # Keep less noisy + return clip + except Exception as e: + print(f"Critical error in create_clip for segment {segment_index}: {str(e)}") + # Create a black clip with error message if anything goes wrong during the main process + # Use a safe duration if previous duration calculation also failed + error_duration = target_clip_duration if 'target_clip_duration' in locals() and target_clip_duration > 0 else (estimated_duration if estimated_duration > 0 else 3.0) + print(f"Creating error placeholder black clip for segment {segment_index} with duration {error_duration:.2f}s.") + black_clip = ColorClip(size=target_resolution, color=(0,0,0), duration=error_duration) + error_text = f"Error in segment {segment_index}" + if narration_text: error_text += f":\n{narration_text[:50]}..." + error_txt_clip = TextClip( + error_text, + fontsize=30, + color="red", + align='center', + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(black_clip.duration) # Error text duration matches placeholder + clip = CompositeVideoClip([black_clip, error_txt_clip]) + silent_audio_path = generate_silent_audio(clip.duration) # Generate silent audio matching placeholder duration + if silent_audio_path and os.path.exists(silent_audio_path): + try: + # Use the actual placeholder clip duration for silent audio + clip = clip.set_audio(AudioFileClip(silent_audio_path).subclip(0, clip.duration)) + except Exception as audio_e: + print(f"Error setting silent audio for error clip {segment_index}: {audio_e}") + clip = clip.set_audio(None) + else: + clip = clip.set_audio(None) + return clip + + +def fix_imagemagick_policy(): + """Attempt to fix ImageMagick security policies required by TextClip.""" + print("Attempting to fix ImageMagick security policies...") + + # Use the found binary path if available, otherwise use default list + if found_imagemagick_binary: + # Assuming policy.xml is relative to the binary path or in a standard location + # This is a heuristic, may need manual path depending on installation + # Normalize binary path to handle symlinks etc. + real_imagemagick_binary_path = os.path.realpath(found_imagemagick_binary) + binary_dir = os.path.dirname(real_imagemagick_binary_path) + policy_paths_to_check = [ + os.path.join(binary_dir, '..', 'etc', 'ImageMagick-7', 'policy.xml'), + os.path.join(binary_dir, '..', 'etc', 'ImageMagick-6', 'policy.xml'), + os.path.join(binary_dir, '..', 'etc', 'ImageMagick', 'policy.xml'), + os.path.join(binary_dir, '..', 'share', 'ImageMagick-7', 'policy.xml'), + os.path.join(binary_dir, '..', 'share', 'ImageMagick-6', 'policy.xml'), + os.path.join(binary_dir, '..', 'share', 'ImageMagick', 'policy.xml'), + # Add more paths relative to binary if needed + ] + # Add standard system paths as fallbacks + policy_paths_to_check.extend([ + "/etc/ImageMagick-6/policy.xml", + "/etc/ImageMagick-7/policy.xml", + "/etc/ImageMagick/policy.xml", + "/usr/local/etc/ImageMagick-7/policy.xml", # macports/homebrew path + "/usr/share/ImageMagick/policy.xml", + "/usr/share/ImageMagick-6/policy.xml", + "/usr/share/ImageMagick-7/policy.xml", + os.path.join(os.environ.get('MAGICK_HOME', '') if os.environ.get('MAGICK_HOME') else '.', 'policy.xml'), # Check MAGICK_HOME + ]) + else: + # Only check standard system paths if binary wasn't found + policy_paths_to_check = [ + "/etc/ImageMagick-6/policy.xml", + "/etc/ImageMagick-7/policy.xml", + "/etc/ImageMagick/policy.xml", + "/usr/local/etc/ImageMagick-7/policy.xml", # macports/homebrew path + "/usr/share/ImageMagick/policy.xml", + "/usr/share/ImageMagick-6/policy.xml", + "/usr/share/ImageMagick-7/policy.xml", + os.path.join(os.environ.get('MAGICK_HOME', '') if os.environ.get('MAGICK_HOME') else '.', 'policy.xml'), # Check MAGICK_HOME + ] + + + # Filter out empty paths and check existence, prioritize unique paths + existing_policy_paths = [] + seen_paths = set() + for path in policy_paths_to_check: + if path and os.path.exists(path) and path not in seen_paths: + existing_policy_paths.append(path) + seen_paths.add(path) + + + found_policy = None + if existing_policy_paths: + found_policy = existing_policy_paths[0] # Use the first unique one found + + if not found_policy: + print("No policy.xml found in common locations. TextClip may fail.") + print("Consider installing ImageMagick and checking its installation path and policy.xml location.") + return False + + print(f"Attempting to modify policy file at {found_policy}") + try: + # Create a backup - use a unique name + backup_path = f"{found_policy}.bak_aivgen_{int(time.time())}" + if os.path.exists(found_policy): + shutil.copy2(found_policy, backup_path) + print(f"Created backup at {backup_path}") + else: + print(f"Warning: Policy file {found_policy} not found at copy stage, cannot create backup.") + + + # Read the original policy file (handle potential permission issues) + policy_content = None + try: + with open(found_policy, 'r') as f: + policy_content = f.read() + except Exception as e: + print(f"Error reading policy file {found_policy}: {e}. Attempting with sudo cat...") + try: + # Use sudo cat to read if direct read fails + process = subprocess.Popen(['sudo', 'cat', found_policy], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + if process.returncode == 0: + policy_content = stdout.decode('utf-8') + print("Read policy file content using sudo.") + else: + print(f"Failed to read policy file using sudo cat. Result code: {process.returncode}. Error: {stderr.decode('utf-8')}") + print("Manual intervention may be required.") + return False + except FileNotFoundError: + print(f"sudo command not found. Cannot read policy file with sudo.") + return False + except Exception as e_sudo_read: + print(f"Error executing sudo cat: {e_sudo_read}") + print("Manual intervention may be required.") + return False + + if policy_content is None: + print("Failed to read policy file content.") + return False + + # Use regex to find and replace the specific policy lines + # Allow read and write rights for PDF, EPS, PS, etc. potentially restricted formats + # Also ensure path policies allow reading/writing files + # Be more specific with replacements to avoid unintended side effects + modified_content = re.sub( + r'', # Added /? for self-closing tag + r'', # Ensure it ends with self-closing tag + modified_content + ) + + # Also handle a more general case if the above didn't match, but with caution + # This attempts to change any 'rights="none"' on 'coder' or 'path' domains + # if the specific patterns weren't matched. + def _replace_none_rights(match): + domain = match.group(1) + rest = match.group(2) + # Only replace if rights is currently "none" + if 'rights="none"' in match.group(0): + print(f"Applying general policy fix for domain '{domain}'") + return f'' + return match.group(0) # Return original if no "none" rights found + + modified_content = re.sub( + r'', + _replace_none_rights, + modified_content + ) + + + # Write the modified content back (handle potential permission issues) + try: + with open(found_policy, 'w') as f: + f.write(modified_content) + print("ImageMagick policies updated successfully (direct write).") + return True + except IOError as e: + print(f"Direct write failed: {e}. Attempting with sudo tee...") + # Fallback to using os.system with sudo tee if direct write fails + # This requires the user to be able to run sudo commands without a password prompt for the script's execution + # and tee needs to be available. + # Using subprocess is safer than os.system for piping + try: + # Write modified content to a temporary file first + # Ensure TEMP_FOLDER is set before creating a temp file path + if not TEMP_FOLDER: + print("Error: TEMP_FOLDER not set for sudo write fallback.") + return False + os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists + + temp_policy_file = os.path.join(TEMP_FOLDER, "temp_policy_modified.xml") + with open(temp_policy_file, 'w') as f: + f.write(modified_content) + + # Use sudo tee to overwrite the original file + # sudo tee < temp_file + cmd = ['sudo', 'tee', found_policy] + print(f"Executing: {' '.join(cmd)} < {temp_policy_file}") + + # Using subprocess with stdin redirection + with open(temp_policy_file, 'rb') as f_in: # Open in binary mode for input + process = subprocess.Popen(cmd, stdin=f_in, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + stdout, stderr = process.communicate() + + if process.returncode == 0: + print("ImageMagick policies updated successfully using sudo tee.") + return True + else: + print(f"Failed to update ImageMagick policies using sudo tee. Result code: {process.returncode}. Error: {stderr.decode('utf-8')}") + print("Please manually edit your policy.xml to grant read/write rights for coder and path domains.") + print("Example: Change to ") + return False + except FileNotFoundError: + print(f"sudo or tee command not found. Cannot write policy file with sudo.") + return False + except Exception as e_sudo_write: + print(f"Error executing sudo tee process: {e_sudo_write}") + print("Manual intervention may be required.") + return False + finally: + # Clean up the temporary file + if 'temp_policy_file' in locals() and os.path.exists(temp_policy_file): + os.remove(temp_policy_file) + + + except Exception as e_general: + print(f"General error during ImageMagick policy modification: {e_general}") + print("Manual intervention may be required.") + return False + + +# ---------------- Gradio Interface Functions ---------------- # + +def generate_script_and_show_editor(user_input, resolution_choice, + caption_enabled_choice, caption_color, + caption_size, caption_position, caption_bg_color, + caption_stroke_color, caption_stroke_width, + bg_music_file): # Added bg_music_file input here + """ + Generates the script, parses it, stores segments in state, + and prepares the UI updates to show the editing interface. + Uses yield to update status. + """ + global TEMP_FOLDER + # Clean up previous run's temp folder if it exists + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): + print(f"Cleaning up previous temp folder: {TEMP_FOLDER}") + try: + # Use onerror to log errors during cleanup + def onerror(func, path, exc_info): + print(f"Error cleaning up {path}: {exc_info[1]}") + shutil.rmtree(TEMP_FOLDER, onerror=onerror) + except Exception as e: + print(f"Error starting cleanup of temp folder {TEMP_FOLDER}: {e}") + + # Create a new unique temporary folder for this run + # Add a suffix based on time to minimize collision risk if cleanup fails + TEMP_FOLDER = tempfile.mkdtemp(prefix="aivgen_") + print(f"Created new temp folder: {TEMP_FOLDER}") + + # Store global style choices and music file path in state + run_config = { + "resolution": (1920, 1080) if resolution_choice == "Full (1920x1080)" else (1080, 1920), + "caption_enabled": caption_enabled_choice == "Yes", + "caption_color": caption_color, + "caption_size": caption_size, + "caption_position": caption_position, + "caption_bg_color": caption_bg_color, + "caption_stroke_color": caption_stroke_color, + "caption_stroke_width": caption_stroke_width, + "temp_folder": TEMP_FOLDER, # Store temp folder path + "bg_music_path": bg_music_file # Store background music file path + } + + # Initial status update and hide editing/video areas + # Yielding multiple updates in a list/tuple works for simultaneous updates + # The outputs need to match the order specified in the .click() outputs list + # Outputs list order: run_config_state, status_output, editing_area, final_video_output, script_preview_markdown, + # segment_text_inputs (MAX), segment_file_inputs (MAX), segment_editing_groups (MAX), segment_prompt_labels (MAX), + # segments_state (LAST) + num_dynamic_outputs_per_segment = 3 # Textbox, File, Group + num_total_dynamic_outputs = MAX_SEGMENTS_FOR_EDITING * num_dynamic_outputs_per_segment + + # Prepare initial updates for all dynamic components to be hidden + initial_textbox_updates = [gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)] + initial_file_updates = [gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)] + initial_group_visibility_updates = [gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)] + # Initial prompt labels should be cleared and hidden + initial_label_updates = [gr.update(value="", visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)] + + + yield (run_config, # 0 + gr.update(value="Generating script...", visible=True), # 1 + gr.update(visible=False), # 2 editing area + gr.update(value=None, visible=False), # 3 video output + gr.update(visible=False, value="### Generated Script Preview\n\nGenerating script..."), # 4 raw script preview + # Outputs for dynamic components (initially hide/clear all) - Indices 5 onwards + *initial_textbox_updates, # 5... + *initial_file_updates, # ... + *initial_group_visibility_updates, # ... + *initial_label_updates, # ... Update prompt labels visibility and value + [], # segments_state - This is the LAST element updated + ) + + + script_text = generate_script(user_input, OPENROUTER_API_KEY, OPENROUTER_MODEL) + + # Determine raw script preview content + raw_script_preview_content = f"### Generated Script Preview\n\n```\n{script_text}\n```" if script_text and not script_text.startswith("[Error]") else f"### Generated Script Preview\n\n{script_text}" + + if not script_text or script_text.startswith("[Error]"): + # Update status and keep editing/video areas hidden + yield (run_config, + gr.update(value=f"Script generation failed: {script_text}", visible=True), + gr.update(visible=False), + gr.update(value=None, visible=False), + gr.update(visible=True, value=raw_script_preview_content), # Show raw script preview on error + # Outputs for dynamic components (all hidden) + *[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(value="", visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], # Clear prompt labels + [], # segments_state remains empty + ) + return # Stop execution + + + yield (run_config, + gr.update(value="Parsing script...", visible=True), + gr.update(visible=False), + gr.update(value=None, visible=False), + gr.update(visible=True, value=raw_script_preview_content), # Show raw script preview + *[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(value="", visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], # Clear prompt labels + [], # segments_state will be updated next + ) + + + segments = parse_script(script_text) + + if not segments: + yield (run_config, + gr.update(value="Failed to parse script or script is empty after parsing.", visible=True), + gr.update(visible=False), + gr.update(value=None, visible=False), + gr.update(visible=True, value=raw_script_preview_content), # Show raw script preview + # Outputs for dynamic components (all hidden) + *[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], + *[gr.update(value="", visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], # Clear prompt labels + [], # segments_state remains empty + ) + return # Stop execution + + + # Prepare updates for dynamic editing components based on parsed segments + textbox_updates = [] + file_updates = [] + group_visibility_updates = [] + label_updates = [] # Updates for prompt labels + + for i in range(MAX_SEGMENTS_FOR_EDITING): + if i < len(segments): + # Show group, populate text, clear file upload, set prompt label + textbox_updates.append(gr.update(value=segments[i]['text'], visible=True)) + file_updates.append(gr.update(value=None, visible=True)) # Clear previous uploads + group_visibility_updates.append(gr.update(visible=True)) + label_updates.append(gr.update(value=f"Segment {i+1} (Prompt: {segments[i]['original_prompt']})", visible=True)) # Set label value and show + else: + # Hide unused groups and clear their values + textbox_updates.append(gr.update(value="", visible=False)) + file_updates.append(gr.update(value=None, visible=False)) + group_visibility_updates.append(gr.update(visible=False)) + label_updates.append(gr.update(value="", visible=False)) # Clear label value and hide + + + # Final yield to update UI: show editing area, populate fields, update state + yield (run_config, # 0 + gr.update(value=f"Script generated with {len(segments)} segments. Edit segments below.", visible=True), # 1 + gr.update(visible=True), # 2 Show Editing area + gr.update(value=None, visible=False), # 3 Ensure video output is hidden and cleared + gr.update(visible=True, value=raw_script_preview_content), # 4 Show raw script preview + # Dynamic outputs - Indices 5 onwards + *textbox_updates, # 5 ... + *file_updates, # ... + *group_visibility_updates, # ... + *label_updates, # ... Update prompt labels visibility and value + segments, # LAST - Update the state with parsed segments + ) + + +# Update generate_video_from_edited to accept the unpacked dynamic arguments +# It will receive: run_config_val, segments_data_val, then MAX_SEGMENTS_FOR_EDITING text values, +# then MAX_SEGMENTS_FOR_EDITING file values, then bg_music_volume_val. +def generate_video_from_edited(run_config_val, segments_data_val, *dynamic_segment_inputs, bg_music_volume_val): + """ + Takes the edited segment data (text, uploaded files) and configuration, + and generates the final video. + Uses yield to update status. + """ + # Re-pack the dynamic inputs into lists + # dynamic_segment_inputs contains all text and file inputs from the UI, in order. + # The first MAX_SEGMENTS_FOR_EDITING are text inputs, the next MAX_SEGMENTS_FOR_EDITING are file inputs. + num_dynamic_per_segment = 2 # Textbox and File + expected_dynamic_count = MAX_SEGMENTS_FOR_EDITING * num_dynamic_per_segment + + if len(dynamic_segment_inputs) != expected_dynamic_count: + print(f"Error: Expected {expected_dynamic_count} dynamic inputs (text+files), but received {len(dynamic_segment_inputs)}.") + yield "Error: Mismatch in segment inputs received. Please regenerate script.", None + return # Cannot proceed with incorrect inputs + + segment_texts = list(dynamic_segment_inputs[:MAX_SEGMENTS_FOR_EDITING]) + segment_uploads = list(dynamic_segment_inputs[MAX_SEGMENTS_FOR_EDITING:]) + + + # Now use the re-packed lists and the correctly named arguments + run_config = run_config_val + segments_data = segments_data_val + bg_music_volume = bg_music_volume_val + + + if not segments_data: + yield "No segments to process. Generate script first.", None + return + + global TEMP_FOLDER + # Ensure TEMP_FOLDER is correctly set from run_config + TEMP_FOLDER = run_config.get("temp_folder") + if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): + yield "Error: Temporary folder not found from run config. Please regenerate script.", None + # Attempt cleanup just in case temp folder existed but was invalid + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): + try: + shutil.rmtree(TEMP_FOLDER) + except Exception as e: + print(f"Error cleaning up invalid temp folder {TEMP_FOLDER}: {e}") + TEMP_FOLDER = None # Reset global + return + + # Extract other config from run_config + TARGET_RESOLUTION = run_config.get("resolution", (1920, 1080)) # Default if missing + CAPTION_ENABLED = run_config.get("caption_enabled", True) # Default if missing + CAPTION_COLOR = run_config.get("caption_color", "#FFFFFF") # Default if missing + CAPTION_SIZE = run_config.get("caption_size", 45) # Default if missing + CAPTION_POSITION = run_config.get("caption_position", "Bottom") # Default if missing + CAPTION_BG_COLOR = run_config.get("caption_bg_color", "rgba(0, 0, 0, 0.4)") # Default if missing + CAPTION_STROKE_COLOR = run_config.get("caption_stroke_color", "#000000") # Default if missing + CAPTION_STROKE_WIDTH = run_config.get("caption_stroke_width", 2) # Default if missing + BG_MUSIC_UPLOAD_PATH = run_config.get("bg_music_path") # Get uploaded music path from config + + + # Update segments_data with potentially edited text and uploaded file paths + # segment_texts and segment_uploads are lists of values from the Gradio components + processed_segments = [] + # Iterate up to the minimum of state segments and provided inputs + num_segments_to_process = min(len(segments_data), len(segment_texts), len(segment_uploads), MAX_SEGMENTS_FOR_EDITING) + + if num_segments_to_process == 0: + yield "No segments to process after reading editor inputs. Script might be empty or inputs missing.", None + # Clean up + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): + try: + shutil.rmtree(TEMP_FOLDER) + print(f"Cleaned up temp folder: {TEMP_FOLDER}") + except Exception as e: + print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}") + TEMP_FOLDER = None # Reset global + return + + + for i in range(num_segments_to_process): + segment = segments_data[i] # Get original segment data + processed_segment = segment.copy() # Make a copy + # Use edited text, strip whitespace + processed_segment['text'] = segment_texts[i].strip() if segment_texts[i] is not None else segment.get('text', '').strip() + # Use uploaded media path (will be None if nothing uploaded) + processed_segment['uploaded_media'] = segment_uploads[i] + processed_segments.append(processed_segment) + + + yield "Fixing ImageMagick policy...", None + # Call fix_imagemagick_policy again just before video generation as a safeguard + fix_imagemagick_policy() + clips = [] - for clip_data in clips_data: + yield "Generating media and audio for clips...", None + + total_segments = len(processed_segments) + for idx, segment in enumerate(processed_segments): + yield f"Processing segment {idx+1}/{total_segments}: Generating media and audio...", None + print(f"\nProcessing segment {idx+1}/{total_segments} (Prompt: '{segment.get('original_prompt', 'N/A')[:30]}...')") + + # Determine media source: uploaded or generated + media_asset = generate_media_asset( + segment.get('original_prompt', 'background'), # Use original prompt for search if available, else a generic term + segment.get('uploaded_media') # Pass uploaded media path + ) + + # Generate TTS audio + tts_path = generate_tts(segment.get('text', '')) # Use edited text, default to empty string if None/missing + + # Create the video clip for this segment + yield f"Processing segment {idx+1}/{total_segments}: Creating clip...", None clip = create_clip( - clip_data['media'], - clip_data['tts_path'], - clip_data['duration'], - clip_data['customizations'], - target_resolution + media_asset=media_asset if media_asset else {"path": None, "asset_type": None}, # Pass dummy if generate_media_asset failed + tts_path=tts_path, + estimated_duration=segment.get('duration', 3.0), # Use estimated duration as a fallback reference + target_resolution=TARGET_RESOLUTION, + caption_enabled=CAPTION_ENABLED, + caption_color=CAPTION_COLOR, + caption_size=CAPTION_SIZE, + caption_position=CAPTION_POSITION, + caption_bg_color=CAPTION_BG_COLOR, + caption_stroke_color=CAPTION_STROKE_COLOR, + caption_stroke_width=CAPTION_STROKE_WIDTH, + narration_text=segment.get('text', ''), # Pass narration text for captions + segment_index=idx+1 ) + if clip: clips.append(clip) + else: + print(f"Skipping segment {idx+1} due to clip creation failure.") + # If create_clip returns None (shouldn't happen with fallback logic, but as safety) + # Add a placeholder black clip + placeholder_duration = segment.get('duration', 3.0) # Use estimated duration or default + placeholder_clip = ColorClip(size=TARGET_RESOLUTION, color=(0,0,0), duration=placeholder_duration) + silent_audio_path = generate_silent_audio(placeholder_duration) + if silent_audio_path and os.path.exists(silent_audio_path): + placeholder_clip = placeholder_clip.set_audio(AudioFileClip(silent_audio_path)) + error_text = f"Segment {idx+1} Failed" + if segment.get('text'): error_text += f":\n{segment['text'][:50]}..." + error_txt_clip = TextClip(error_text, fontsize=30, color="red", align='center', size=(TARGET_RESOLUTION[0] * 0.9, None)).set_position('center').set_duration(placeholder_clip.duration) # Error text duration matches placeholder + placeholder_clip = CompositeVideoClip([placeholder_clip, error_txt_clip]) + clips.append(placeholder_clip) + + if not clips: - return None - final = concatenate_videoclips(clips, method="compose") - # Add background music (using a placeholder audio file) - bg_music = AudioFileClip("sample_music.mp3").volumex(bg_volume / 100.0).subclip(0, final.duration) - final_audio = CompositeVideoClip([final.audio, bg_music]).audio_fadeout(0.5) - final = final.set_audio(final_audio) - final.write_videofile(OUTPUT_VIDEO_FILENAME, codec='libx264', fps=24) - return OUTPUT_VIDEO_FILENAME - -# Main Logic -def generate_initial_clips(concept, resolution_str): - global TEMP_FOLDER - TEMP_FOLDER = tempfile.mkdtemp() - script = generate_script(concept) - segments = parse_script(script) - clips_data = [] - for seg in segments: - media = generate_media(seg['prompt'], TEMP_FOLDER) - tts_path = generate_tts(seg['text'], TEMP_FOLDER) - if media and tts_path: - clips_data.append({ - "prompt": seg['prompt'], - "text": seg['text'], - "media": media, - "tts_path": tts_path, - "duration": seg['duration'], - "customizations": {"text": seg['text']} - }) - target_res = (1080, 1080) if resolution_str == "1080x1080" else (1920, 1080) - return clips_data, clips_data, script, target_res - -# Gradio Interface -with gr.Blocks(theme=gr.themes.Default(primary_hue="orange", neutral_hue="black")) as iface: - gr.Markdown("# AI Video Generator") - with gr.Row(): - concept = gr.Textbox(label="Video Concept") - resolution = gr.Radio(["1080x1080", "1920x1080"], label="Resolution", value="1080x1080") - output_format = gr.Dropdown(["MP4 (H.264)"], label="Output Format", value="MP4 (H.264)") - bg_volume = gr.Slider(0, 100, value=50, label="Background Music Volume") - with gr.Group(): - gr.Markdown("## Caption Settings") - show_captions = gr.Checkbox(label="Show Captions", value=True) - text_color = gr.ColorPicker(label="Text Color", value="#FFFFFF") - bg_color = gr.ColorPicker(label="Background Color", value="#000000") - transparency = gr.Slider(0, 100, value=40, label="Transparency") - stroke_width = gr.Slider(0, 10, value=2, label="Stroke Width") - stroke_color = gr.ColorPicker(label="Stroke Color", value="#000000") - generate_btn = gr.Button("Generate Script") - script_output = gr.Textbox(label="Generated Script") - with gr.Column(): - gr.Markdown("## Edit Segments") - clip_editors = gr.Column() - clips_state = gr.State() - text_inputs = gr.State([]) - target_res_state = gr.State() - - def create_editors(clips_data, text_list, script): - if not clips_data: - return gr.update(visible=False), clips_data, text_list, script - with clip_editors: - editors = [] - text_inputs_list = [] - for i, clip in enumerate(clips_data): - with gr.Group(): - gr.Markdown(f"### {clip['prompt']}") - text_input = gr.Textbox(label="Narrative", value=clip['text']) - media_upload = gr.File(label="Upload Media") - text_inputs_list.append(text_input) - editors.append(media_upload) - return gr.update(visible=True), clips_data, text_inputs_list, script - - generate_btn.click( - fn=generate_initial_clips, - inputs=[concept, resolution], - outputs=[clips_state, text_inputs, script_output, target_res_state] - ).then( - fn=create_editors, - inputs=[clips_state, text_inputs, script_output], - outputs=[clip_editors, clips_state, text_inputs, script_output] - ) + yield "No clips were successfully created. Video generation failed.", None + # Clean up + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): + try: + shutil.rmtree(TEMP_FOLDER) + print(f"Cleaned up temp folder: {TEMP_FOLDER}") + except Exception as e: + print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}") + TEMP_FOLDER = None # Reset global + return + + yield "Concatenating clips...", None + print("\nConcatenating clips...") + try: + final_video = concatenate_videoclips(clips, method="compose") + except Exception as e: + print(f"Error concatenating clips: {e}") + yield f"Error concatenating clips: {e}", None + # Clean up + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): + try: + shutil.rmtree(TEMP_FOLDER) + print(f"Cleaned up temp folder: {TEMP_FOLDER}") + except Exception as e: + print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}") + TEMP_FOLDER = None # Reset global + return + + + yield "Adding background music...", None + # Use the uploaded music path first, fall back to finding an MP3 + bg_music_to_use = BG_MUSIC_UPLOAD_PATH if BG_MUSIC_UPLOAD_PATH else find_mp3_files() + final_video = add_background_music(final_video, bg_music_to_use, bg_music_volume=bg_music_volume) # Use volume from input + + + yield f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...", None + print(f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...") + output_path = None + try: + # Use a temporary output file first for safety, within TEMP_FOLDER + temp_output_filename = os.path.join(TEMP_FOLDER, f"temp_final_video_{int(time.time())}.mp4") + final_video.write_videofile(temp_output_filename, codec='libx264', fps=24, preset='veryfast') + + # Ensure the destination directory for the final output exists (current dir) + os.makedirs(os.path.dirname(OUTPUT_VIDEO_FILENAME) or '.', exist_ok=True) + + # Move the final file to the intended location after successful export + final_output_path = OUTPUT_VIDEO_FILENAME + try: + shutil.move(temp_output_filename, final_output_path) + print(f"Final video saved as {final_output_path}") + output_path = final_output_path + except shutil.SameFileError: + print(f"Output path is the same as temp path, no move needed: {temp_output_filename}") + output_path = temp_output_filename + except Exception as e: + print(f"Error moving temporary file {temp_output_filename} to final destination {final_output_path}: {e}") + # If move fails, return the temp file path or None + output_path = temp_output_filename # Return temp path so user can access it + print(f"Returning video from temporary path: {output_path}") + + + except Exception as e: + print(f"Video export failed: {e}") + output_path = None + yield f"Video export failed: {e}", None # Provide error message in status + + # Clean up temporary folder + yield "Cleaning up temporary files...", output_path # Update status before cleanup + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): + try: + # Use onerror to log errors during cleanup + def onerror(func, path, exc_info): + print(f"Error cleaning up {path}: {exc_info[1]}") + shutil.rmtree(TEMP_FOLDER, onerror=onerror) + print(f"Cleaned up temp folder: {TEMP_FOLDER}") + except Exception as e: + print(f"Error starting cleanup of temp folder {TEMP_FOLDER}: {e}") + TEMP_FOLDER = None # Reset global + + yield "Done!", output_path # Final status update + + +# ---------------- Gradio Interface Definition (Blocks) ---------------- # + +# Need lists to hold the dynamic UI components for segments +segment_editing_groups = [] +segment_prompt_labels = [] # List to hold the prompt Labels +segment_text_inputs = [] +segment_file_inputs = [] - generate_video_btn = gr.Button("Generate Video") - video_output = gr.Video(label="Generated Video") - - def process_video(clips_data, target_res, *args): - text_inputs = args[:len(clips_data)] - media_uploads = args[len(clips_data):2*len(clips_data)] - cust_args = args[2*len(clips_data):] - for i, (text, upload) in enumerate(zip(text_inputs, media_uploads)): - clips_data[i]['text'] = text - clips_data[i]['customizations']['text'] = text - if upload: - media = generate_media(clips_data[i]['prompt'], TEMP_FOLDER, upload) - if media: - clips_data[i]['media'] = media - customizations = { - 'show_captions': cust_args[0], - 'text_color': cust_args[1], - 'bg_color': cust_args[2], - 'transparency': cust_args[3], - 'stroke_width': cust_args[4], - 'stroke_color': cust_args[5] - } - for clip in clips_data: - clip['customizations'].update(customizations) - bg_volume_val = cust_args[6] - return generate_final_video(clips_data, bg_volume_val, target_res, TEMP_FOLDER) +with gr.Blocks() as demo: + gr.Markdown("# 🤖 AI Documentary Video Generator 🎬") + gr.Markdown("Enter a concept to generate a funny documentary script. You can then edit the script text and replace the suggested media for each segment before generating the final video.") + # --- Global Settings --- + with gr.Accordion("Global Settings", open=True): + user_concept_input = gr.Textbox(label="Video Concept", placeholder="e.g., The secret life of pigeons, Why socks disappear in the laundry, The futility of alarm clocks...") + with gr.Row(): + resolution_radio = gr.Radio(["Full (1920x1080)", "Short (1080x1920)"], label="Video Resolution", value="Full (1920x1080)") + bg_music_volume_slider = gr.Slider(minimum=0, maximum=0.5, value=0.08, step=0.01, label="Background Music Volume", info="Lower volume keeps narration clear.") # Adjusted max volume + # Added music upload component + bg_music_upload = gr.File(label="Upload Background Music (MP3, WAV)", type="filepath", interactive=True, file_types=[".mp3", ".wav"]) + + + # --- Caption Settings --- + with gr.Accordion("Caption Settings", open=False): + caption_enabled_radio = gr.Radio(["Yes", "No"], label="Show Captions?", value="Yes") + with gr.Row(): + caption_color_picker = gr.ColorPicker(label="Caption Text Color", value="#FFFFFF") # Default white + caption_bg_color_picker = gr.ColorPicker(label="Caption Background Color (with transparency)", value="rgba(0, 0, 0, 0.4)") # Default semi-transparent black, slightly more opaque + with gr.Row(): + caption_size_slider = gr.Slider(minimum=20, maximum=80, value=45, step=1, label="Caption Font Size") # Adjusted max size + caption_stroke_width_slider = gr.Slider(minimum=0, maximum=5, value=2, step=0.5, label="Caption Stroke Width") + with gr.Row(): + caption_position_radio = gr.Radio(["Top", "Middle", "Bottom"], label="Caption Position", value="Bottom") + caption_stroke_color_picker = gr.ColorPicker(label="Caption Stroke Color", value="#000000") # Default black stroke + + + generate_script_btn = gr.Button("Generate Script", variant="primary") + + # --- Status and Script Output --- + status_output = gr.Label(label="Status", value="", visible=True) # Always visible + # Using Markdown to show raw script content + script_preview_markdown = gr.Markdown("### Generated Script Preview\n\nScript will appear here...", visible=False) # Initially hidden + + # --- State to hold parsed segments data and run config --- + segments_state = gr.State([]) # List of segment dictionaries + run_config_state = gr.State({}) # Dictionary for run configuration + + # --- Dynamic Editing Area (Initially hidden) --- + # We create MAX_SEGMENTS_FOR_EDITING groups, and show/hide them dynamically + with gr.Column(visible=False, elem_id="editing_area_id") as editing_area: # Added elem_id + gr.Markdown("### Edit Script Segments") + gr.Markdown("Review the AI-generated text and media suggestions below. Edit the text and/or upload your own image/video for any segment. If no file is uploaded, AI will fetch media based on the original prompt.") + for i in range(MAX_SEGMENTS_FOR_EDITING): + # Use gr.Group instead of gr.Box for compatibility + with gr.Group(visible=False) as segment_group: # Each group represents one segment + segment_editing_groups.append(segment_group) + # Use a Label to display the original prompt - it's non-interactive text + # The value will be updated by JS or Python outputs + segment_prompt_label = gr.Label( + f"Segment {i+1} Prompt:", # Initial placeholder text, will be overwritten + show_label=False, + ) + segment_prompt_labels.append(segment_prompt_label) + + + segment_text = gr.Textbox(label="Narration Text", lines=2, interactive=True) + segment_text_inputs.append(segment_text) + + segment_file = gr.File(label="Upload Custom Media (Image or Video)", type="filepath", interactive=True) + segment_file_inputs.append(segment_file) + + generate_video_btn = gr.Button("Generate Video", variant="primary") + + + # --- Final Video Output --- + final_video_output = gr.Video(label="Generated Video", visible=False) # Initially hidden + + # --- Event Handlers --- + + # Generate Script Button Click + # Outputs list must match the order of components being updated by yield in generate_script_and_show_editor + generate_script_btn.click( + fn=generate_script_and_show_editor, + inputs=[ + user_concept_input, + resolution_radio, + caption_enabled_radio, + caption_color_picker, + caption_size_slider, + caption_position_radio, + caption_bg_color_picker, + caption_stroke_color_picker, + caption_stroke_width_slider, + bg_music_upload # Added music upload input here + ], + outputs=[ + run_config_state, # 0 + status_output, # 1 + editing_area, # 2 Show/hide editing area column + final_video_output, # 3 Hide and clear video output + script_preview_markdown, # 4 Update raw script preview + # Outputs for dynamic components (visibility and value updates) - Indices 5 onwards + *segment_text_inputs, # 5 ... + *segment_file_inputs, # ... + *segment_editing_groups, # ... + *segment_prompt_labels, # ... Update prompt labels visibility and value + segments_state, # LAST - Update the state with parsed segments + ] + ) + + # Generate Video Button Click + # Inputs must match the definition of generate_video_from_edited generate_video_btn.click( - fn=process_video, - inputs=[clips_state, target_res_state] + text_inputs.value + [gr.File(type="filepath")] * len(text_inputs.value) + [show_captions, text_color, bg_color, transparency, stroke_width, stroke_color, bg_volume], - outputs=video_output + fn=generate_video_from_edited, + inputs=[ + run_config_state, # 1st arg + segments_state, # 2nd arg + # All segment text and file inputs will be collected by *dynamic_segment_inputs + *segment_text_inputs, + *segment_file_inputs, + bg_music_volume_slider # Last arg + ], + outputs=[status_output, final_video_output] # Yield status updates and final video ) -iface.launch(share=True) \ No newline at end of file + # We don't need a segments_state.change JS handler anymore because the prompt labels + # are updated directly by the Python function via the outputs list. + # Removing the segments_state.change event listener entirely. + + +# Launch the interface +if __name__ == "__main__": + # Attempt ImageMagick policy fix on script startup + # This helps but might still require manual sudo depending on system config + fix_imagemagick_policy() + + print("Launching Gradio interface...") + + # Check if API keys are still placeholders (unlikely with hardcoded keys, but good practice) + if PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'): + print("Warning: PEXELS_API_KEY is not configured. Media search may fail.") + if OPENROUTER_API_KEY.startswith('YOUR_OPENROUTER_API_KEY'): + print("Warning: OPENROUTER_API_KEY is not configured. Script generation will fail.") + + demo.launch(share=True) # Set share=True to get a public link