diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,19 +1,17 @@
-# --- Import necessary libraries ---
-import gradio as gr
+# Import necessary libraries (ensure all are installed: moviepy, soundfile, torch,
+# pydub, requests, pillow, numpy, beautifulsoup4, gtts, gradio, kokoro, opencv-python)
+
+from kokoro import KPipeline
+
+import soundfile as sf
+import torch
+
 import os
-import shutil
 import tempfile
-import time
-import re
 import random
-import math
-import requests
-import io
-import uuid # For unique IDs
-import traceback # For detailed error printing
-import numpy as np
-from PIL import Image, ImageDraw, ImageFont
 import cv2
+import math
+import requests, io, time, re
 from moviepy.editor import (
     VideoFileClip, concatenate_videoclips, AudioFileClip, ImageClip,
     CompositeVideoClip, TextClip, CompositeAudioClip
@@ -22,1611 +20,1566 @@
 import moviepy.video.fx.all as vfx
 import moviepy.config as mpy_config
 from pydub import AudioSegment
 from pydub.generators import Sine
-import soundfile as sf
-import torch # Assuming Kokoro needs it
-from kokoro import KPipeline # Kokoro TTS
+
+from PIL import Image, ImageDraw, ImageFont
+import numpy as np
 from bs4 import BeautifulSoup
+import base64
 from urllib.parse import quote
+# pysrt is imported but not used in the provided code snippets, keeping for completeness
+# import pysrt
 from gtts import gTTS
-from functools import partial # For event handlers
+import gradio as gr # Import Gradio
+import shutil # Needed for temp folder cleanup
 
-# --- Initialize Kokoro TTS Pipeline ---
-# Ensure this is done safely (e.g., check if already initialized if run multiple times)
+# Initialize Kokoro TTS pipeline (using American English)
+# Ensure you have the required voice models downloaded for Kokoro if needed,
+# or it will fall back to gTTS. 'a' for American English uses voice 'af_heart'.
 try:
-    # Use American English voice provided by Kokoro library example
-    pipeline = KPipeline(lang_code='a') # 'a' often corresponds to American English variant
-    print("Kokoro TTS Pipeline Initialized.")
+    pipeline = KPipeline(lang_code='a')
+    print("Kokoro TTS pipeline initialized.")
 except Exception as e:
-    print(f"Error initializing Kokoro TTS: {e}. TTS functionality might be limited.")
+    print(f"Warning: Could not initialize Kokoro TTS pipeline: {e}. Will rely on gTTS.")
     pipeline = None # Set pipeline to None if initialization fails
 
-# --- Configuration ---
-PEXELS_API_KEY = '***REDACTED***' # Replace with your actual key
-OPENROUTER_API_KEY = '***REDACTED***' # Replace with your actual key
-OPENROUTER_MODEL = "mistralai/mistral-small" # Use a reliable model
-OUTPUT_VIDEO_FILENAME_BASE = "ai_docu_video"
+# Ensure the ImageMagick binary is set (adjust the path as needed for your system).
+# This requires ImageMagick to be installed and the path to be correct.
+# If TextClip fails, check the ImageMagick installation and policy.xml (handled by fix_imagemagick_policy).
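+# Alternative sketch (an assumption, not part of the original change): resolve the
+# binary dynamically instead of hardcoding /usr/bin/convert. Uses only stdlib
+# shutil.which; expects ImageMagick's `convert` (v6) or `magick` (v7) on PATH.
+#   _im_binary = shutil.which("convert") or shutil.which("magick") or "/usr/bin/convert"
+#   mpy_config.change_settings({"IMAGEMAGICK_BINARY": _im_binary})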
+mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"})
+
+# ---------------- Global Configuration (Some now used as defaults/initial values) ---------------- #
+# Replace with your actual keys
+PEXELS_API_KEY = os.environ.get('PEXELS_API_KEY', 'YOUR_PEXELS_API_KEY') # Use env var or placeholder
+OPENROUTER_API_KEY = os.environ.get('OPENROUTER_API_KEY', 'YOUR_OPENROUTER_API_KEY') # Use env var or placeholder
+OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" # Or another preferred model
+OUTPUT_VIDEO_FILENAME = "final_video.mp4"
 USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
-# Optional: Set ImageMagick binary if moviepy doesn't find it automatically
-# try:
-#     # Check common paths or allow environment variable override
-#     imagemagick_path = os.environ.get("IMAGEMAGICK_BINARY", "/usr/bin/convert") # Example path
-#     if os.path.exists(imagemagick_path):
-#         mpy_config.change_settings({"IMAGEMAGICK_BINARY": imagemagick_path})
-#         print(f"ImageMagick binary set to: {imagemagick_path}")
-#     else:
-#         print(f"Warning: ImageMagick binary not found at {imagemagick_path}. Text rendering might use defaults.")
-# except Exception as e:
-#     print(f"Warning: Error configuring ImageMagick: {e}")
-
-# --- Helper Functions (Refactored for Parameters & Temp Dir) ---
-def fix_imagemagick_policy():
-    """Attempts to fix common ImageMagick security policy issues for caption rendering."""
-    # This function might require sudo privileges and is OS-dependent. Use with caution.
-    policy_paths = [
-        "/etc/ImageMagick-6/policy.xml",
-        "/etc/ImageMagick-7/policy.xml",
-        "/etc/ImageMagick/policy.xml",
-        "/usr/local/etc/ImageMagick-7/policy.xml"
-    ]
-    found_policy = next((path for path in policy_paths if os.path.exists(path)), None)
-    if not found_policy:
-        print("ImageMagick policy.xml not found in common locations. Skipping policy fix.")
-        return False

+# Maximum number of script segments to display for editing
+MAX_SEGMENTS_FOR_EDITING = 15

-    print(f"Attempting to modify ImageMagick policy at: {found_policy}")
-    print("NOTE: This may require administrative privileges (sudo).")
-    # Use simpler patterns that are more likely to work across versions
-    commands = [
-        f"sudo sed -i.bak 's/rights=\"none\" pattern=\"PS\"/rights=\"read|write\" pattern=\"PS\"/' {found_policy}",
-        f"sudo sed -i 's/rights=\"none\" pattern=\"EPS\"/rights=\"read|write\" pattern=\"EPS\"/' {found_policy}",
-        f"sudo sed -i 's/rights=\"none\" pattern=\"PDF\"/rights=\"read|write\" pattern=\"PDF\"/' {found_policy}",
-        f"sudo sed -i 's/rights=\"none\" pattern=\"XPS\"/rights=\"read|write\" pattern=\"XPS\"/' {found_policy}",
-        # Allow reading/writing paths - adjust pattern if needed
-        f"sudo sed -i 's/rights=\"none\" pattern=\"@\\*\"/rights=\"read|write\" pattern=\"@\\*\"/' {found_policy}",
-        # Less aggressive version for path, allows reading system paths if needed by fonts etc.
-        f"sudo sed -i 's/rights=\"none\" pattern=\"@\\*\"/rights=\"read\" pattern=\"@\\*\"/' {found_policy}"
-    ]
-    success = True
-    for cmd in commands:
-        print(f"Executing: {cmd}")
-        try:
-            # Use os.system - requires user interaction for sudo password if needed
-            # Consider subprocess for better control if running non-interactively
-            exit_code = os.system(cmd)
-            if exit_code != 0:
-                print(f"Command failed with exit code {exit_code}. 
Policy might not be fully updated.") - # Don't necessarily set success to False, some might fail harmlessly - except Exception as e: - print(f"Error executing command: {e}") - success = False # Mark as failed if command execution raises error +# Global placeholder for the temporary folder, will be created per run +TEMP_FOLDER = None - if success: - print("ImageMagick policy modifications attempted. Restart application if issues persist.") - else: - print("Some policy modification commands failed. Text rendering might be affected.") - return success +# ---------------- Helper Functions ---------------- # +# Most helper functions remain the same, but some might need minor adjustments +# to use passed parameters instead of global state (e.g., resolution). +# We'll update create_clip and add_background_music accordingly. + +def generate_script(user_input, api_key, model): + """Generate documentary script with proper OpenRouter handling.""" + if not api_key or api_key == 'YOUR_OPENROUTER_API_KEY': + print("OpenRouter API key not set. Skipping script generation.") + return "[Error] API key not configured." -def generate_script(user_input): - """Generate documentary script using OpenRouter API.""" - print("[Function] generate_script called") headers = { - 'Authorization': f'Bearer {OPENROUTER_API_KEY}', - 'HTTP-Referer': 'https://your-app-domain.com', # Replace with your actual domain/app name - 'X-Title': 'AI Documentary Maker - Dynamic' + 'Authorization': f'Bearer {api_key}', + 'HTTP-Referer': 'https://your-domain.com', # Replace with your app's domain + 'X-Title': 'AI Documentary Maker' } - # Use the detailed prompt provided by the user - prompt = f"""Short Documentary Script Generator Instructions: -If I say "use this," just output the script exactly as I gave it. + # Updated prompt based on user's specific requirements for tone and style + prompt = f"""Short Documentary Script Generator + +Instructions: +If I say "use this," output the script exactly as given. If I only give topics, generate a script based on them. -If I provide a full script, rewrite it without any changes. Make everything short simple and humorous funny and act as serious but humorous. And don't say anything off topic. Also always say a funny statement to subscribe based on the video topic at the end. Use normal conversational text like a normal person talking and avoid AI phase make the statements humanize and normal conversational -And always talk negative views don't really want any positive statement and the full script will be one video about one topic so keep in mind -Formatting Rules: +If I provide a full script, rewrite it without any changes. +Make everything short, simple, humorous, funny, and act serious but be humorous. +Don't say anything off topic. +Always end with a funny call to subscribe based on the video topic. +Use normal conversational text, avoid AI phrases, humanize the statements. +Always focus on negative or cynical views, avoid positive statements. +The full script should be for a single video about one topic. +Formatting Rules: Title in Square Brackets: Each section starts with a one-word title inside [ ] (max two words if necessary). This title will be used as a search term for Pexels footage. -Casual & Funny Narration: Each section has 5-10 words of narration. Keep it natural, funny, and unpredictable (not robotic, poetic, or rhythmic). -No Special Formatting: No bold, italics, or special characters. You are an assistant AI your task is to create script. You aren't a chatbot. 
So, don't write extra text +Casual & Funny Narration: Each section has 5-15 words of narration. Keep it natural, funny, and unpredictable. +No Special Formatting: No bold, italics, or special characters. Generalized Search Terms: If a term is too specific, make it more general for Pexels search. Scene-Specific Writing: Each section describes only what should be shown in the video. -Output Only the Script, and also make it funny and humorous and hilarious and also add to subscribe with a funny statement like subscribe now or ..... +Output Only the Script, make it funny/humorous/hilarious, and add a funny subscribe statement. No extra text, just the script. Example Output: -[North Korea] -Top 5 unknown facts about North Korea. -[Invisibility] -North Korea’s internet speed is so fast… it doesn’t exist. -[Leadership] -Kim Jong-un once won an election with 100% votes… against himself. -[Magic] -North Korea discovered time travel. That’s why their news is always from the past. +[Cats] +They plot world domination while napping. +[Dogs] +Loyalty is just a bribe for snacks. +[Humans] +The only species that pays to live on a planet they destroy. +[Future] +It looks suspiciously like the present, but with more screens. [Warning] -Subscribe now, or Kim Jong-un will send you a free one-way ticket… to North Korea. -[Freedom] -North Korean citizens can do anything… as long as it's government-approved. +Subscribe or a cat will steal your bandwidth. Now here is the Topic/script: {user_input} """ + data = { - 'model': OPENROUTER_MODEL, + 'model': model, 'messages': [{'role': 'user', 'content': prompt}], - 'temperature': 0.5, # Slightly increased for more creative/funny results - 'max_tokens': 1024 # Increased max tokens for potentially longer scripts + 'temperature': 0.7, # Increased temperature slightly for more unpredictable humor + 'max_tokens': 500 # Limit token response to keep scripts short } + try: response = requests.post( 'https://openrouter.ai/api/v1/chat/completions', - headers=headers, json=data, timeout=60 # Increased timeout + headers=headers, + json=data, + timeout=45 # Increased timeout ) + response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) response_data = response.json() if 'choices' in response_data and len(response_data['choices']) > 0: - script_content = response_data['choices'][0]['message']['content'] - # Basic cleanup: remove potential leading/trailing whitespace/newlines - script_content = script_content.strip() - print("--- Generated Script ---") - print(script_content) - print("-----------------------") - # Check if script seems empty or just contains formatting noise - if not script_content or len(script_content) < 10 or script_content.count('[') == 0: - print("Warning: Generated script seems empty or invalid.") - return f"Error: Generated script was empty or invalid. Raw response: {script_content}" - return script_content + script_text = response_data['choices'][0]['message']['content'] + # Basic post-processing to remove potential markdown code blocks + if script_text.startswith("```") and script_text.endswith("```"): + script_text = script_text[script_text.find('\n')+1:script_text.rfind('\n')].strip() + return script_text else: - print("API Error: Unexpected response format:", response_data) - return "Error: Could not generate script (unexpected format)." + print("Unexpected response format:", response_data) + return "[Error] Unexpected API response format." 
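+    # For reference: a successful OpenRouter payload follows the OpenAI-style chat
+    # completions schema, i.e. {"choices": [{"message": {"content": "..."}}]},
+    # which is what the 'choices' check above relies on.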
except requests.exceptions.RequestException as e: print(f"API Request failed: {str(e)}") - # Provide more specific error if possible - error_message = f"Error: Could not generate script (Request failed: {e})." - if isinstance(e, requests.exceptions.Timeout): - error_message = "Error: Could not generate script (API request timed out)." - elif isinstance(e, requests.exceptions.HTTPError): - error_message = f"Error: Could not generate script (API Error {e.response.status_code}: {e.response.text})." - return error_message + return f"[Error] API request failed: {str(e)}" except Exception as e: - print(f"Unexpected error during script generation: {e}") - print(traceback.format_exc()) - return f"Error: An unexpected error occurred during script generation: {e}" + print(f"An unexpected error occurred during script generation: {e}") + return f"[Error] An unexpected error occurred: {str(e)}" + def parse_script(script_text): - """Parse the generated script into media and TTS elements with segment IDs.""" - print("[Function] parse_script called") - elements = [] - segment_id_counter = 0 + """ + Parse the generated script into a list of segment dictionaries. + Each dictionary includes original prompt, narration text, estimated duration, and placeholder for uploaded media. + Handles potential API errors returned as strings. + """ + if script_text.startswith("[Error]"): + print(f"Skipping parse due to script generation error: {script_text}") + return [] + + segments = [] current_title = None - current_narration = "" - - lines = script_text.splitlines() - - for i, line in enumerate(lines): - line = line.strip() - if not line: # Skip empty lines - continue - - match = re.match(r'^\[(.*?)\](.*)', line) # Match [Title] Optional Text - - if match: - # If we have a pending title/narration, save it before starting new one - if current_title is not None and current_narration: - segment_id = f"seg_{segment_id_counter}" - media_element = {"type": "media", "prompt": current_title, "segment_id": segment_id} - # Estimate duration based on narration words - words = current_narration.split() - duration = max(2.0, min(15.0, len(words) * 0.45 + 0.5)) # Base duration + per word, capped - tts_element = {"type": "tts", "text": current_narration, "duration": duration, "segment_id": segment_id} - elements.append(media_element) - elements.append(tts_element) - segment_id_counter += 1 - print(f" -> Parsed segment {segment_id}: '{current_title}' / '{current_narration[:30]}...'") - - # Start the new segment - current_title = match.group(1).strip() - current_narration = match.group(2).strip() # Text on the same line - - elif current_title is not None: - # This line is part of the narration for the current title - current_narration += (" " + line) if current_narration else line # Add space only if needed - - # Add the very last segment after the loop finishes - if current_title is not None and current_narration: - segment_id = f"seg_{segment_id_counter}" - media_element = {"type": "media", "prompt": current_title, "segment_id": segment_id} - words = current_narration.split() - duration = max(2.0, min(15.0, len(words) * 0.45 + 0.5)) - tts_element = {"type": "tts", "text": current_narration, "duration": duration, "segment_id": segment_id} - elements.append(media_element) - elements.append(tts_element) - print(f" -> Parsed segment {segment_id}: '{current_title}' / '{current_narration[:30]}...'") - - if not elements: - print("Warning: Script parsing resulted in zero elements.") - else: - print(f"Parsed into {len(elements)} elements 
({len(elements)//2} segments)") - return elements - -def search_pexels(query, api_key, search_type="videos", per_page=10, orientation="landscape"): - """Search Pexels API for videos or photos.""" - base_url = f"https://api.pexels.com/{search_type}/search" - headers = {'Authorization': api_key} - params = {"query": query, "per_page": per_page, "orientation": orientation} - max_retries = 3 + current_text = "" + + try: + lines = script_text.strip().splitlines() + if not lines: + print("Script text is empty.") + return [] + + for line in lines: + line = line.strip() + if line.startswith("[") and "]" in line: + bracket_start = line.find("[") + bracket_end = line.find("]", bracket_start) + if bracket_start != -1 and bracket_end != -1: + if current_title is not None and current_text.strip(): + # Estimate duration based on word count (adjust factor as needed) + duration = max(2.0, len(current_text.split()) * 0.4) # Minimum 2s, approx 0.4s per word + segments.append({ + "original_prompt": current_title.strip(), + "text": current_text.strip(), + "duration": duration, + "uploaded_media": None # Placeholder for user uploaded file path + }) + current_title = line[bracket_start+1:bracket_end].strip() + current_text = line[bracket_end+1:].strip() + elif current_title: # Append text if no new title found but currently parsing + current_text += line + " " + elif current_title: # Append text to the current segment + current_text += line + " " + + # Add the last segment + if current_title is not None and current_text.strip(): + duration = max(2.0, len(current_text.split()) * 0.4) + segments.append({ + "original_prompt": current_title.strip(), + "text": current_text.strip(), + "duration": duration, + "uploaded_media": None + }) + + # Limit segments to MAX_SEGMENTS_FOR_EDITING + if len(segments) > MAX_SEGMENTS_FOR_EDITING: + print(f"Warning: Script generated {len(segments)} segments, limiting to {MAX_SEGMENTS_FOR_EDITING} for editing.") + segments = segments[:MAX_SEGMENTS_FOR_EDITING] + + print(f"Parsed {len(segments)} segments.") + return segments + except Exception as e: + print(f"Error parsing script: {e}") + return [] + +# Pexels and Google Image search and download functions remain unchanged +def search_pexels_videos(query, pexels_api_key): + """Search for a video on Pexels by query and return a random HD video.""" + if not pexels_api_key or pexels_api_key == 'YOUR_PEXELS_API_KEY': + print("Pexels API key not set. 
Skipping video search.")
+        return None
+    headers = {'Authorization': pexels_api_key}
+    base_url = "https://api.pexels.com/videos/search"
+    num_pages = 3
+    videos_per_page = 15
+    max_retries = 2 # Reduced retries for faster failure
+    retry_delay = 1
+
+    search_query = query
+    all_videos = []
+
+    for page in range(1, num_pages + 1):
+        for attempt in range(max_retries):
+            try:
+                params = {"query": search_query, "per_page": videos_per_page, "page": page}
+                response = requests.get(base_url, headers=headers, params=params, timeout=10)
+
+                if response.status_code == 200:
+                    data = response.json()
+                    videos = data.get("videos", [])
+                    if not videos: break # No videos on this page
+                    for video in videos:
+                        video_files = video.get("video_files", [])
+                        for file in video_files:
+                            # Prioritize HD, fall back to SD if no HD found
+                            if file.get("quality") == "hd":
+                                all_videos.append(file.get("link"))
+                                break # Found HD, move to next video
+                            elif file.get("quality") == "sd": # Add SD as fallback
+                                all_videos.append(file.get("link")) # Don't break, keep looking for HD
+
+                    # If any HD link has been collected, stop retrying this page
+                    if any(link for link in all_videos if 'hd' in link.lower()): # Simple check if HD was added
+                        break # Found some HD videos, move to next page or finish
+
+                elif response.status_code == 429:
+                    print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s...")
+                    time.sleep(retry_delay)
+                    retry_delay *= 2
+                else:
+                    print(f"Pexels video search error {response.status_code}: {response.text}")
+                    break # Non-recoverable error or too many retries
+
+            except requests.exceptions.RequestException as e:
+                print(f"Pexels video request exception (attempt {attempt+1}/{max_retries}): {e}")
+                if attempt < max_retries - 1:
+                    time.sleep(retry_delay)
+                    retry_delay *= 2
+                else:
+                    break # Too many retries
+
+        if page > 1 and not all_videos: break # Stop paging when nothing has been found (also avoids referencing a possibly undefined local when every request fails)
+
+
+    if all_videos:
+        # Try to pick an HD video if available, otherwise pick any
+        hd_videos = [link for link in all_videos if 'hd' in link.lower()]
+        if hd_videos:
+            random_video = random.choice(hd_videos)
+            print(f"Selected random HD video from {len(hd_videos)} options.")
+        else:
+            random_video = random.choice(all_videos)
+            print(f"Selected random SD video from {len(all_videos)} options (no HD found).")
+        return random_video
+    else:
+        print("No suitable videos found after searching all pages.")
+        return None
+
+
+def search_pexels_images(query, pexels_api_key):
+    """Search for an image on Pexels by query."""
+    if not pexels_api_key or pexels_api_key == 'YOUR_PEXELS_API_KEY':
+        print("Pexels API key not set. 
Skipping image search.") + return None + headers = {'Authorization': pexels_api_key} + url = "https://api.pexels.com/v1/search" + params = {"query": query, "per_page": 15, "orientation": "landscape"} # Increased per_page + + max_retries = 2 + retry_delay = 1 for attempt in range(max_retries): try: - response = requests.get(base_url, headers=headers, params=params, timeout=15) - response.raise_for_status() # Check for HTTP errors - - data = response.json() - results = data.get("videos" if search_type == "videos" else "photos", []) - - if not results: - print(f"No Pexels {search_type} found for '{query}' on attempt {attempt+1}.") - # Optionally try modifying query slightly on retries (e.g., remove pluralization) - # if attempt == 0 and query.endswith('s'): params['query'] = query[:-1] - # else: break # Stop if no results after modification or first try - break # Keep it simple: stop if no results - - print(f"Found {len(results)} Pexels {search_type} results.") - return results # Return the list of results - - except requests.exceptions.HTTPError as e: - if e.response.status_code == 429: # Rate limit - print(f"Pexels API rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s...") - time.sleep(retry_delay) - retry_delay *= 2 # Exponential backoff - elif e.response.status_code == 400 and 'invalid query' in e.response.text.lower(): - print(f"Pexels API Error: Invalid query '{query}'. Skipping.") - return [] # Return empty list for invalid query + response = requests.get(url, headers=headers, params=params, timeout=10) + + if response.status_code == 200: + data = response.json() + photos = data.get("photos", []) + if photos: + # Choose from the top results + photo = random.choice(photos[:min(10, len(photos))]) + img_url = photo.get("src", {}).get("original") + print(f"Found {len(photos)} images, selected one.") + return img_url + else: + print(f"No images found for query: {query} on Pexels.") + return None + + elif response.status_code == 429: + print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). 
Retrying in {retry_delay}s...") + time.sleep(retry_delay) + retry_delay *= 2 else: - print(f"Pexels API HTTP Error {e.response.status_code}: {e.response.text} (attempt {attempt+1}/{max_retries})") - if attempt < max_retries - 1: - time.sleep(retry_delay) - retry_delay *= 2 - else: - print("Max retries reached for HTTP error.") - return [] # Failed after retries + print(f"Pexels image search error {response.status_code}: {response.text}") + break # Non-recoverable error or too many retries + except requests.exceptions.RequestException as e: - print(f"Pexels API Request Exception: {e} (attempt {attempt+1}/{max_retries})") + print(f"Pexels image request exception (attempt {attempt+1}/{max_retries}): {e}") if attempt < max_retries - 1: time.sleep(retry_delay) retry_delay *= 2 else: - print("Max retries reached for request exception.") - return [] # Failed after retries + break # Too many retries - print(f"Pexels search failed for '{query}' after {max_retries} attempts.") - return [] # Return empty list if search fails completely - -def search_google_images(query, temp_dir): - """Search Google Images and attempt to download the first few valid results.""" - print(f"Searching Google Images for: '{query}'") - search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch&safe=active" # Added safe search - headers = {"User-Agent": USER_AGENT} - downloaded_path = None + print(f"No Pexels images found for query: {query} after all attempts") + return None +def search_google_images(query): + """Search for images on Google Images (fallback/news)""" try: - response = requests.get(search_url, headers=headers, timeout=10) + # Using a simple text search method; dedicated Google Image Search APIs are better but may require setup. + # This is prone to breaking if Google changes its HTML structure. 
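+        # Example: quote("fall of rome") -> "fall%20of%20rome", so the request URL is
+        # https://www.google.com/search?q=fall%20of%20rome&tbm=isch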
+ search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch" + headers = {"User-Agent": USER_AGENT} + print(f"Searching Google Images for: {query}") + response = requests.get(search_url, headers=headers, timeout=15) response.raise_for_status() soup = BeautifulSoup(response.text, "html.parser") + # Find img tags, look for src attributes + # This is a very fragile parsing method, might need adjustment img_tags = soup.find_all("img") image_urls = [] + # Look for src attributes that start with http and aren't data URIs or specific gstatic patterns + # This is a heuristic and might grab incorrect URLs for img in img_tags: - src = img.get("src") or img.get("data-src") # Try both src and data-src - if src and src.startswith("http") and "gstatic.com/images" not in src: # Filter out base64/gstatic - image_urls.append(src) - - print(f"Found {len(image_urls)} potential image URLs from Google.") - - # Try downloading the first few valid URLs - for i, url in enumerate(image_urls[:5]): # Try top 5 - safe_prompt = re.sub(r'[^\w\s-]', '', query).strip().replace(' ', '_') - filename = os.path.join(temp_dir, f"gimg_{safe_prompt}_{uuid.uuid4().hex[:6]}.jpg") - print(f"Attempting download from Google Images URL #{i+1}: {url[:80]}...") - downloaded_path = download_image(url, filename) - if downloaded_path: - print(f"Successfully downloaded Google Image to: {os.path.basename(downloaded_path)}") - return downloaded_path # Return the first one that works - else: - print("Download/validation failed for this URL.") - time.sleep(0.2) # Small delay before next attempt + src = img.get("src", "") + if src.startswith("http") and "encrypted" not in src and "base64" not in src: # Basic filtering + image_urls.append(src) + elif img.get("data-src", "").startswith("http"): # Some sites use data-src + image_urls.append(img.get("data-src", "")) - except requests.exceptions.RequestException as e: - print(f"Error during Google Images search request: {e}") + + # Filter out potential tiny icons or invalid URLs + valid_image_urls = [url for url in image_urls if url and "gstatic" not in url and url.split('.')[-1].lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp']] + + if valid_image_urls: + print(f"Found {len(valid_image_urls)} potential Google Images, picking one.") + return random.choice(valid_image_urls[:min(10, len(valid_image_urls))]) + else: + print(f"No valid Google Images found for query: {query}") + return None except Exception as e: - print(f"Error parsing Google Images results or downloading: {e}") - print(traceback.format_exc()) + print(f"Error in Google Images search: {e}") + return None - print(f"Google Images search/download failed for query: {query}") - return None -def download_media(url, filename, media_type="image"): - """Download image or video from URL with error handling and validation.""" - print(f"Downloading {media_type}: {url[:80]}... 
-> {os.path.basename(filename)}") - headers = {"User-Agent": USER_AGENT} +def download_image(image_url, filename): + """Download an image from a URL to a local file with enhanced error handling.""" + if not image_url: + print("No image URL provided for download.") + return None + try: - response = requests.get(url, headers=headers, stream=True, timeout=30) # Increased timeout for videos + headers = {"User-Agent": USER_AGENT} + print(f"Attempting to download image from: {image_url}") + response = requests.get(image_url, headers=headers, stream=True, timeout=20) # Increased timeout response.raise_for_status() + # Check content type before saving + content_type = response.headers.get('Content-Type', '') + if not content_type.startswith('image/'): + print(f"URL did not return an image Content-Type ({content_type}). Skipping download.") + return None + + # Ensure the directory exists + os.makedirs(os.path.dirname(filename), exist_ok=True) + with open(filename, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) - print(f"{media_type.capitalize()} downloaded successfully.") - # Validate image files - if media_type == "image": - try: - with Image.open(filename) as img: - img.verify() # Check if Pillow can read metadata - # Re-open to check format and convert if necessary - with Image.open(filename) as img: - if img.format in ['JPEG', 'PNG', 'WEBP']: # Common formats - if img.mode != 'RGB': - print(f"Converting image {os.path.basename(filename)} to RGB") - # Create a new filename for the converted image - name, ext = os.path.splitext(filename) - rgb_filename = f"{name}_rgb{ext}" - img.convert('RGB').save(rgb_filename) - # Optionally remove the original non-RGB file - # try: os.remove(filename) except OSError: pass - filename = rgb_filename # Use the new RGB file path - print(f"Image validated ({img.format}, {img.mode}). Path: {os.path.basename(filename)}") - return filename - else: - print(f"Warning: Downloaded image format ({img.format}) might not be ideal. Attempting conversion.") - name, ext = os.path.splitext(filename) - jpg_filename = f"{name}_converted.jpg" - try: - img.convert('RGB').save(jpg_filename) - # try: os.remove(filename) except OSError: pass - filename = jpg_filename - print(f"Image converted to JPG. Path: {os.path.basename(filename)}") - return filename - except Exception as conv_err: - print(f"Error converting image to JPG: {conv_err}. Keeping original.") - # Fallback: Try returning original if conversion failed but it opened - return filename - - except (IOError, SyntaxError, Exception) as e_validate: - print(f"Downloaded file is not a valid image or processing failed: {e_validate}") - try: os.remove(filename) # Clean up invalid file - except OSError: pass - return None - elif media_type == "video": - # Basic video validation (can be expanded with ffprobe if needed) - if os.path.getsize(filename) < 1024: # Check if file size is suspiciously small - print("Warning: Downloaded video file is very small. May be invalid.") - # Keep it for now, moviepy will likely fail later if invalid - print(f"Video downloaded. 
Path: {os.path.basename(filename)}")
-            return filename # Assume valid for now
+        print(f"Potential image downloaded to: {filename}")
+
+        # Validate and process the image
+        try:
+            img = Image.open(filename)
+            img.verify() # Verify it's an image file
+            img = Image.open(filename) # Re-open after verify
+            if img.mode != 'RGB':
+                img = img.convert('RGB')
+                img.save(filename)
+            print(f"Image validated and converted to RGB: {filename}")
+            return filename
+        except Exception as e_validate:
+            print(f"Downloaded file is not a valid image or processing failed: {e_validate}")
+            if os.path.exists(filename):
+                os.remove(filename) # Clean up invalid file
+            return None
     except requests.exceptions.RequestException as e_download:
-        print(f"{media_type.capitalize()} download error: {e_download}")
-        if os.path.exists(filename): try: os.remove(filename)
-        except OSError: pass
+        print(f"Image download error for {image_url}: {e_download}")
+        if os.path.exists(filename):
+            os.remove(filename) # Clean up partially downloaded file
         return None
     except Exception as e_general:
-        print(f"General error during {media_type} download/processing: {e_general}")
-        print(traceback.format_exc())
-        if os.path.exists(filename): try: os.remove(filename)
-        except OSError: pass
+        print(f"General error during image download/processing: {e_general}")
+        if os.path.exists(filename):
+            os.remove(filename) # Clean up if needed
         return None
-    return None # Should not be reached ideally
 
+def download_video(video_url, filename):
+    """Download a video from a URL to a local file."""
+    if not video_url:
+        print("No video URL provided for download.")
+        return None
+    try:
+        headers = {"User-Agent": USER_AGENT} # Some sites block direct downloads
+        print(f"Attempting to download video from: {video_url}")
+        response = requests.get(video_url, headers=headers, stream=True, timeout=45) # Pass headers so the User-Agent is actually sent; increased timeout for videos
+        response.raise_for_status()
-# Keep original download_image/video for compatibility if needed, but prefer download_media
-def download_image(url, filename):
-    return download_media(url, filename, media_type="image")
+        # Check content type
+        content_type = response.headers.get('Content-Type', '')
+        if not content_type.startswith('video/'):
+            print(f"URL did not return a video Content-Type ({content_type}). Skipping download.")
+            return None
-def download_video(url, filename):
-    return download_media(url, filename, media_type="video")
+        os.makedirs(os.path.dirname(filename), exist_ok=True)
+        with open(filename, 'wb') as f:
+            for chunk in response.iter_content(chunk_size=8192):
+                f.write(chunk)
-def generate_media(prompt, temp_dir, video_preference_ratio=0.3, is_news=False):
-    """Generate a visual asset (video or image) based on prompt and preferences."""
-    safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
-    print(f"\nGenerating media for prompt: '{prompt}', Video Pref: {video_preference_ratio}, News: {is_news}")
-
-    # --- Strategy ---
-    # 1. If News: Prioritize Google Images.
-    # 2. If Video Preferred: Try Pexels Video first.
-    # 3. Try Pexels Image.
-    # 4. If Image failed or Video not preferred initially: Try Pexels Video.
-    # 5. Fallback: Generic Pexels Image search.
-
-    # 1. 
News Strategy - if is_news: - print("News strategy: Trying Google Images first.") - gimg_path = search_google_images(prompt, temp_dir) - if gimg_path: - return {"path": gimg_path, "asset_type": "image", "source": "google"} + print(f"Video downloaded successfully to: {filename}") + # Basic check if the file seems valid (not just 0 bytes) + if os.path.exists(filename) and os.path.getsize(filename) > 1024: # Check for > 1KB + return filename else: - print("Google Images failed for news prompt, continuing with Pexels...") - - # 2. Video Preferred Strategy - if random.random() < video_preference_ratio: - print("Video preference: Trying Pexels Video first.") - pexel_videos = search_pexels(prompt, PEXELS_API_KEY, search_type="videos") - if pexel_videos: - selected_video_info = random.choice(pexel_videos) - # Find HD link if possible, otherwise take highest quality available - hd_link = next((f['link'] for f in selected_video_info.get('video_files', []) if f.get('quality') == 'hd'), None) - if not hd_link: - best_link = max(selected_video_info.get('video_files', []), key=lambda x: x.get('width', 0) * x.get('height', 0), default=None) - hd_link = best_link['link'] if best_link else None - - if hd_link: - video_file = os.path.join(temp_dir, f"vid_{safe_prompt}_{uuid.uuid4().hex[:6]}.mp4") - downloaded_video = download_video(hd_link, video_file) - if downloaded_video: - return {"path": downloaded_video, "asset_type": "video", "source": "pexels"} - print("Pexels Video (first attempt) failed or no suitable link found.") - - - # 3. Pexels Image Strategy - print("Trying Pexels Image search.") - pexel_images = search_pexels(prompt, PEXELS_API_KEY, search_type="photos") - if pexel_images: - selected_photo_info = random.choice(pexel_images) - # Prefer 'large' or 'original' size - img_url = selected_photo_info.get('src', {}).get('large', selected_photo_info.get('src', {}).get('original')) - if img_url: - image_file = os.path.join(temp_dir, f"img_{safe_prompt}_{uuid.uuid4().hex[:6]}.jpg") - downloaded_image = download_image(img_url, image_file) - if downloaded_image: - return {"path": downloaded_image, "asset_type": "image", "source": "pexels"} - print("Pexels Image search failed.") - - - # 4. Pexels Video (Second Attempt if not tried first or if Image failed) - if not (random.random() < video_preference_ratio): # If video wasn't tried first - print("Trying Pexels Video (second attempt).") - pexel_videos = search_pexels(prompt, PEXELS_API_KEY, search_type="videos") - if pexel_videos: - selected_video_info = random.choice(pexel_videos) - hd_link = next((f['link'] for f in selected_video_info.get('video_files', []) if f.get('quality') == 'hd'), None) - if not hd_link: - best_link = max(selected_video_info.get('video_files', []), key=lambda x: x.get('width', 0) * x.get('height', 0), default=None) - hd_link = best_link['link'] if best_link else None - - if hd_link: - video_file = os.path.join(temp_dir, f"vid_{safe_prompt}_{uuid.uuid4().hex[:6]}.mp4") - downloaded_video = download_video(hd_link, video_file) - if downloaded_video: - return {"path": downloaded_video, "asset_type": "video", "source": "pexels"} - print("Pexels Video (second attempt) failed.") - - - # 5. Fallback Image Strategy - print("All primary searches failed. 
Trying fallback Pexels image search...")
-    fallback_terms = ["abstract", "texture", "background", "technology", "nature"]
-    random.shuffle(fallback_terms) # Try in random order
-    for term in fallback_terms:
-        print(f"  Fallback search term: '{term}'")
-        fallback_images = search_pexels(term, PEXELS_API_KEY, search_type="photos", per_page=5)
-        if fallback_images:
-            selected_photo_info = random.choice(fallback_images)
-            img_url = selected_photo_info.get('src', {}).get('large', selected_photo_info.get('src', {}).get('original'))
-            if img_url:
-                fallback_file = os.path.join(temp_dir, f"fallback_{term}_{uuid.uuid4().hex[:6]}.jpg")
-                downloaded_fallback = download_image(img_url, fallback_file)
-                if downloaded_fallback:
-                    print(f"Using fallback image '{term}'.")
-                    return {"path": downloaded_fallback, "asset_type": "image", "source": "pexels_fallback"}
-        time.sleep(0.5) # Avoid hitting rate limits rapidly on fallback
-
-    print(f"ERROR: Failed to generate any visual asset for prompt: {prompt}")
-    # Create a placeholder black image as a last resort?
-    try:
-        placeholder_path = os.path.join(temp_dir, f"placeholder_{uuid.uuid4().hex[:6]}.png")
-        img = Image.new('RGB', (640, 360), color = 'black') # Small black image
-        draw = ImageDraw.Draw(img)
-        draw.text((10, 10), f"Media Failed\n'{prompt[:50]}...'", fill="white")
-        img.save(placeholder_path)
-        print("Using placeholder black image.")
-        return {"path": placeholder_path, "asset_type": "image", "source": "placeholder"}
-    except Exception as placeholder_err:
-        print(f"Failed to create placeholder image: {placeholder_err}")
-        return None # Absolute failure
-
-def generate_tts(text, temp_dir, voice='en', use_kokoro=True):
-    """Generate TTS using Kokoro, falling back to gTTS or silence."""
-    safe_text = re.sub(r'[^\w\s-]', '', text[:20]).strip().replace(' ', '_')
-    file_path = os.path.join(temp_dir, f"tts_{safe_text}_{uuid.uuid4().hex[:6]}.wav")
-    print(f"Generating TTS for: '{text[:40]}...'")
-
-    # Try Kokoro first if enabled and available
-    if use_kokoro and pipeline:
+            print(f"Downloaded video file {filename} is too small or empty.")
+            if os.path.exists(filename):
+                os.remove(filename)
+            return None
+
+    except requests.exceptions.RequestException as e:
+        print(f"Video download error for {video_url}: {e}")
+        if os.path.exists(filename):
+            os.remove(filename)
+        return None
+    except Exception as e_general:
+        print(f"General error during video download: {e_general}")
+        if os.path.exists(filename):
+            os.remove(filename)
+        return None
+
+
+def generate_media_asset(prompt, uploaded_media_path):
+    """
+    Generate a visual asset (video or image). Prioritizes user upload,
+    then searches Pexels video, then Pexels image, then Google Image.
+    Returns a dict: {'path': <file_path>, 'asset_type': 'video' or 'image'}.
+    """
+    safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
+    os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists
+
+    # 1. 
Use user uploaded media if provided
+    if uploaded_media_path and os.path.exists(uploaded_media_path):
+        print(f"Using user uploaded media: {uploaded_media_path}")
+        file_ext = os.path.splitext(uploaded_media_path)[1].lower()
+        asset_type = 'video' if file_ext in ['.mp4', '.mov', '.avi', '.webm'] else 'image'
+        # Copy the user file to temp folder to manage cleanup
+        temp_user_path = os.path.join(TEMP_FOLDER, f"user_upload_{os.path.basename(uploaded_media_path)}")
         try:
-            # Assuming 'af_heart' or similar is the desired American English voice from Kokoro
-            kokoro_voice_code = 'af_heart' # Adjust if your Kokoro setup uses different codes
-            generator = pipeline(text, voice=kokoro_voice_code, speed=0.95, split_pattern=r'\n+') # Adjust speed as needed
-            audio_segments = [audio for _, _, audio in generator]
-
-            if not audio_segments:
-                raise ValueError("Kokoro TTS returned no audio segments.")
-
-            # Concatenate segments if multiple, ensure numpy array
-            full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0]
-            if not isinstance(full_audio, np.ndarray):
-                raise ValueError("Kokoro output is not a numpy array.")
-
-            # Ensure audio is float32 for soundfile writing if needed
-            if full_audio.dtype != np.float32:
-                full_audio = full_audio.astype(np.float32)
-                # Normalize if necessary after type conversion (e.g., if it was int16)
-                max_val = np.max(np.abs(full_audio))
-                if max_val > 1.0: full_audio /= max_val
-
-
-            sf.write(file_path, full_audio, 24000) # Kokoro default sample rate is often 24000
-            print(f"TTS audio saved to {os.path.basename(file_path)} (Kokoro)")
-            return file_path
+            shutil.copy2(uploaded_media_path, temp_user_path)
+            print(f"Copied user upload to temp: {temp_user_path}")
+            return {"path": temp_user_path, "asset_type": asset_type}
         except Exception as e:
-            print(f"Error with Kokoro TTS: {e}. Trying gTTS fallback.")
-            print(traceback.format_exc()) # Print full traceback for Kokoro errors
-            # Fall through to gTTS
+            print(f"Error copying user file {uploaded_media_path}: {e}. Falling back to search.")
+
+
+    # 2. Search Pexels Videos (40% chance if no user upload)
+    # Let's slightly increase video search preference when available
+    if random.random() < 0.4: # Increase video search chance
+        video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4")
+        print(f"Attempting Pexels video search for: {prompt}")
+        video_url = search_pexels_videos(prompt, PEXELS_API_KEY)
+        if video_url:
+            downloaded_video = download_video(video_url, video_file)
+            if downloaded_video:
+                print(f"Pexels video asset saved to {downloaded_video}")
+                return {"path": downloaded_video, "asset_type": "video"}
+        else:
+            print(f"Pexels video search failed or found no video for: {prompt}")
+
+    # 3. Search Pexels Images
+    image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}.jpg")
+    print(f"Attempting Pexels image search for: {prompt}")
+    image_url = search_pexels_images(prompt, PEXELS_API_KEY)
+    if image_url:
+        downloaded_image = download_image(image_url, image_file)
+        if downloaded_image:
+            print(f"Pexels image asset saved to {downloaded_image}")
+            return {"path": downloaded_image, "asset_type": "image"}
+    else:
+        print(f"Pexels image search failed or found no image for: {prompt}")
+
+    # 4. 
Fallback: Search Google Images (especially useful for news/specific things Pexels might not have) + print(f"Attempting Google Images fallback for: {prompt}") + google_image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google.jpg") + google_image_url = search_google_images(prompt) + if google_image_url: + downloaded_google_image = download_image(google_image_url, google_image_file) + if downloaded_google_image: + print(f"Google Image asset saved to {downloaded_google_image}") + return {"path": downloaded_google_image, "asset_type": "image"} + else: + print(f"Google Images fallback failed for: {prompt}") - # Fallback to gTTS - try: - print("Using gTTS fallback...") - tts = gTTS(text=text, lang='en', slow=False) - # Save as mp3 first, then convert to wav - mp3_path = os.path.join(temp_dir, f"tts_{safe_text}_{uuid.uuid4().hex[:6]}.mp3") - tts.save(mp3_path) - # Convert mp3 to wav using pydub - audio = AudioSegment.from_mp3(mp3_path) - # Set sample rate to match Kokoro if possible, otherwise use a standard rate - audio = audio.set_frame_rate(24000) - # Export as WAV - audio.export(file_path, format="wav") - os.remove(mp3_path) # Clean up the temporary mp3 file - print(f"Fallback TTS saved to {os.path.basename(file_path)} (gTTS)") - return file_path - except ImportError as ie: - print(f"Error: gTTS or its dependency (pydub/ffmpeg) not installed? {ie}") - print("Skipping gTTS fallback.") - # Fall through to silence - except Exception as fallback_error: - print(f"gTTS fallback also failed: {fallback_error}") - print(traceback.format_exc()) - # Fall through to silence + # 5. Final Fallback: Generic Images if specific search failed + fallback_terms = ["nature", "city", "abstract", "background"] # More generic fallbacks + for term in fallback_terms: + print(f"Trying generic fallback image search with term: {term}") + fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}.jpg") + fallback_url = search_pexels_images(term, PEXELS_API_KEY) # Use Pexels for fallbacks + if fallback_url: + downloaded_fallback = download_image(fallback_url, fallback_file) + if downloaded_fallback: + print(f"Generic fallback image saved to {downloaded_fallback}") + return {"path": downloaded_fallback, "asset_type": "image"} + else: + print(f"Generic fallback image download failed for term: {term}") + else: + print(f"Generic fallback image search failed for term: {term}") - # Fallback to silent audio if all TTS methods fail - print("All TTS methods failed. 
Generating silent audio.") - duration = max(1.0, len(text.split()) * 0.4) # Estimate duration for silence - return generate_silent_audio(duration, temp_dir, sample_rate=24000) -def generate_silent_audio(duration, temp_dir, sample_rate=24000): - """Generate a silent WAV audio file.""" + print(f"Failed to generate any visual asset for prompt: {prompt} after all attempts.") + return None + +def generate_silent_audio(duration, sample_rate=24000): + """Generate a silent WAV audio file lasting 'duration' seconds.""" + print(f"Generating {duration:.2f}s of silent audio.") + num_samples = int(duration * sample_rate) + silence = np.zeros(num_samples, dtype=np.float32) + # Use unique filename to avoid conflicts + silent_path = os.path.join(TEMP_FOLDER, f"silent_{abs(hash(duration)) % (10**8)}_{int(time.time())}.wav") try: - num_samples = int(duration * sample_rate) - silence = np.zeros(num_samples, dtype=np.float32) - silent_path = os.path.join(temp_dir, f"silent_{uuid.uuid4().hex[:6]}.wav") sf.write(silent_path, silence, sample_rate) - print(f"Silent audio generated: {os.path.basename(silent_path)} for {duration:.2f}s") + print(f"Silent audio generated: {silent_path}") return silent_path except Exception as e: print(f"Error generating silent audio: {e}") return None -def apply_kenburns_effect(clip, target_resolution, effect_type="random"): - """Apply Ken Burns effect to an ImageClip.""" - print(f"Applying Ken Burns effect: {effect_type} to image clip") - target_w, target_h = target_resolution - # Ensure clip has dimensions, default if not (shouldn't happen with ImageClip) - clip_w = getattr(clip, 'w', target_w) - clip_h = getattr(clip, 'h', target_h) - if clip_w <= 0 or clip_h <= 0: - print(f"Warning: Invalid clip dimensions ({clip_w}x{clip_h}) for Ken Burns. Using target.") - clip_w, clip_h = target_w, target_h +def generate_tts(text, voice='en'): + """ + Generate TTS audio using Kokoro, falling back to gTTS or silent audio if needed. + Ensures temp folder exists. 
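+    Results are cached under a hash of the input text, so repeated narration
+    lines reuse the same WAV instead of being re-synthesized.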
+ """ + os.makedirs(TEMP_FOLDER, exist_ok=True) # Ensure temp folder exists + safe_text_hash = str(abs(hash(text)) % (10**10)) # Use a hash for potentially long text + file_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.wav") - clip_aspect = clip_w / clip_h - target_aspect = target_w / target_h + if os.path.exists(file_path): + print(f"Using cached TTS for text hash '{safe_text_hash}'") + return file_path + + target_duration = max(2.0, len(text.split()) * 0.4) # Estimate duration if TTS fails + + if pipeline: + try: + print(f"Attempting Kokoro TTS for text: '{text[:50]}...'") + kokoro_voice = 'af_heart' if voice == 'en' else voice # Kokoro default American English voice + # Kokoro pipeline might return multiple segments for long text + generator = pipeline(text, voice=kokoro_voice, speed=1.0, split_pattern=r'\n+') # Use speed 1.0 + audio_segments = [] + total_duration = 0 + for i, (gs, ps, audio) in enumerate(generator): + audio_segments.append(audio) + total_duration += len(audio) / 24000.0 # Assuming 24000 Hz sample rate + if audio_segments: + full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0] + sf.write(file_path, full_audio, 24000) # Use 24000Hz standard + print(f"TTS audio saved to {file_path} (Kokoro, {total_duration:.2f}s)") + return file_path + else: + print("Kokoro pipeline returned no audio segments.") + + except Exception as e: + print(f"Error with Kokoro TTS: {e}") + # Continue to gTTS fallback - # --- Resize to cover target aspect ratio --- - if clip_aspect > target_aspect: # Image wider than target - new_h = target_h - new_w = int(new_h * clip_aspect) - else: # Image taller than target - new_w = target_w - new_h = int(new_w / clip_aspect) - # Ensure dimensions are at least target size - new_w = max(new_w, target_w) - new_h = max(new_h, target_h) - clip = clip.resize(newsize=(new_w, new_h)) - - # --- Further scale up for movement room --- - scale_factor = 1.15 # How much bigger to make it for panning/zooming - scaled_w = int(new_w * scale_factor) - scaled_h = int(new_h * scale_factor) - # Use ANTIALIAS for potentially better quality on downscale during resize try: - clip = clip.resize(newsize=(scaled_w, scaled_h)) #, resample=Image.Resampling.LANCZOS) # Check Pillow version for LANCZOS - except Exception as resize_err: - print(f"Warning: High-quality resize failed ({resize_err}). Using default.") - clip = clip.resize(newsize=(scaled_w, scaled_h)) + print(f"Falling back to gTTS for text: '{text[:50]}...'") + tts = gTTS(text=text, lang='en', slow=False) # Use standard speed + mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.mp3") + tts.save(mp3_path) + audio = AudioSegment.from_mp3(mp3_path) + audio.export(file_path, format="wav") + os.remove(mp3_path) + print(f"Fallback TTS saved to {file_path} (gTTS, {audio.duration_seconds:.2f}s)") + return file_path + except Exception as fallback_error: + print(f"Both TTS methods failed for text: '{text[:50]}...'. 
Error: {fallback_error}") + # Use the estimated duration for silent audio + return generate_silent_audio(duration=target_duration) +def apply_kenburns_effect(clip, target_resolution, effect_type=None): + """Apply a smooth Ken Burns effect with a single movement pattern.""" + target_w, target_h = target_resolution + clip_aspect = clip.w / clip.h + target_aspect = target_w / target_h - # --- Define effect parameters --- - available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "diag-tl-br", "diag-tr-bl", "static"] - if effect_type == "random" or effect_type not in available_effects: + # Resize clip to fill target resolution while maintaining aspect ratio, then scale up + if clip_aspect > target_aspect: + # Wider than target: match height, scale width + clip = clip.resize(height=target_h) + initial_w, initial_h = clip.size + scale_factor = 1.15 + new_width = int(initial_w * scale_factor) + new_height = int(initial_h * scale_factor) + clip = clip.resize(newsize=(new_width, new_height)) + else: + # Taller than target: match width, scale height + clip = clip.resize(width=target_w) + initial_w, initial_h = clip.size + scale_factor = 1.15 + new_width = int(initial_w * scale_factor) + new_height = int(initial_h * scale_factor) + clip = clip.resize(newsize=(new_width, new_height)) + + max_offset_x = new_width - target_w + max_offset_y = new_height - target_h + + available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "up-left", "down-right"] + if effect_type is None or effect_type == "random": effect_type = random.choice(available_effects) - print(f" -> Selected effect: {effect_type}") - - if effect_type == "static": # Option for no movement - return clip.resize((target_w, target_h)) # Just resize and center + # Define start and end scale factors and positions relative to the scaled image size + # Position is the top-left corner of the target resolution frame within the scaled image + start_scale = 1.0 / (1.15 * 1.0) # Scale is relative to the final cropped size. Let's use position instead. 
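+    # Note: start_scale/end_scale (the lines immediately above and below) are unused
+    # leftovers; the movement is driven by the start/end positions and zoom factors below.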
+ end_scale = 1.0 / (1.15 * 1.0) - zoom_amount = 0.10 # Percentage zoom - start_zoom, end_zoom = 1.0, 1.0 - # Start/end centers relative to the scaled image - start_cx, start_cy = scaled_w / 2, scaled_h / 2 - end_cx, end_cy = start_cx, start_cy + # Start and end positions of the top-left corner of the target_resolution window + start_x, start_y = 0, 0 + end_x, end_y = 0, 0 + start_zoom_factor = 1.0 + end_zoom_factor = 1.0 if effect_type == "zoom-in": - start_zoom = 1.0 - end_zoom = 1.0 + zoom_amount + start_zoom_factor = 1.0 + end_zoom_factor = 1.15 + # Stay centered + start_x = max_offset_x / 2 + start_y = max_offset_y / 2 + end_x = max_offset_x / 2 + end_y = max_offset_y / 2 elif effect_type == "zoom-out": - start_zoom = 1.0 + zoom_amount - end_zoom = 1.0 + start_zoom_factor = 1.15 + end_zoom_factor = 1.0 + # Stay centered + start_x = max_offset_x / 2 + start_y = max_offset_y / 2 + end_x = max_offset_x / 2 + end_y = max_offset_y / 2 elif effect_type == "pan-left": - start_cx = scaled_w - target_w / 2 # Start right edge - end_cx = target_w / 2 # End left edge + start_x = max_offset_x + start_y = max_offset_y / 2 + end_x = 0 + end_y = max_offset_y / 2 elif effect_type == "pan-right": - start_cx = target_w / 2 # Start left edge - end_cx = scaled_w - target_w / 2 # End right edge + start_x = 0 + start_y = max_offset_y / 2 + end_x = max_offset_x + end_y = max_offset_y / 2 elif effect_type == "pan-up": - start_cy = scaled_h - target_h / 2 # Start bottom edge - end_cy = target_h / 2 # End top edge + start_x = max_offset_x / 2 + start_y = max_offset_y + end_x = max_offset_x / 2 + end_y = 0 elif effect_type == "pan-down": - start_cy = target_h / 2 # Start top edge - end_cy = scaled_h - target_h / 2 # End bottom edge - elif effect_type == "diag-tl-br": # Top-Left to Bottom-Right - start_cx, start_cy = target_w / 2, target_h / 2 - end_cx, end_cy = scaled_w - target_w / 2, scaled_h - target_h / 2 - elif effect_type == "diag-tr-bl": # Top-Right to Bottom-Left - start_cx, start_cy = scaled_w - target_w / 2, target_h / 2 - end_cx, end_cy = target_w / 2, scaled_h - target_h / 2 - - # --- Define the frame transformation function --- + start_x = max_offset_x / 2 + start_y = 0 + end_x = max_offset_x / 2 + end_y = max_offset_y + elif effect_type == "up-left": + start_x = max_offset_x + start_y = max_offset_y + end_x = 0 + end_y = 0 + elif effect_type == "down-right": + start_x = 0 + start_y = 0 + end_x = max_offset_x + end_y = max_offset_y + else: + # Default to pan-right if type is random but somehow invalid + effect_type = 'pan-right' + start_x = 0 + start_y = max_offset_y / 2 + end_x = max_offset_x + end_y = max_offset_y / 2 + + def transform_frame(get_frame, t): - frame = get_frame(t) # Get the frame (full scaled image for ImageClip) - if not isinstance(frame, np.ndarray): frame = np.array(frame) # Ensure numpy array - if frame.ndim == 2: frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR) # Ensure 3 channels - - # Smooth interpolation (ease-in-out) - ratio = t / clip.duration if clip.duration > 0 else 0 - smooth_ratio = 0.5 - 0.5 * math.cos(math.pi * ratio) - - current_zoom = start_zoom + (end_zoom - start_zoom) * smooth_ratio - current_zoom = max(0.1, current_zoom) # Prevent zero/negative zoom - - # Calculate crop size based on target resolution and current zoom - crop_w = int(target_w / current_zoom) - crop_h = int(target_h / current_zoom) - - # Ensure crop dimensions are valid and within the scaled image bounds - if crop_w <= 0 or crop_h <= 0 or crop_w > scaled_w or crop_h > scaled_h: - # Fallback: 
Center crop with no zoom if calculated size is invalid - print(f"Warning: Invalid Ken Burns crop size ({crop_w}x{crop_h}) at t={t:.2f}. Using fallback.") - crop_w = min(target_w, scaled_w) - crop_h = min(target_h, scaled_h) - center_x = scaled_w / 2 - center_y = scaled_h / 2 - else: - # Interpolate center position - current_cx = start_cx + (end_cx - start_cx) * smooth_ratio - current_cy = start_cy + (end_cy - start_cy) * smooth_ratio + frame = get_frame(t) + # Use a smooth ease-in/ease-out function + progress = t / clip.duration if clip.duration > 0 else 0 + eased_progress = 0.5 - 0.5 * math.cos(math.pi * progress) # Cosine easing + + # Interpolate position + current_x = start_x + (end_x - start_x) * eased_progress + current_y = start_y + (end_y - start_y) * eased_progress - # Clamp center position to keep the crop box within the scaled image bounds - min_cx, max_cx = crop_w / 2, scaled_w - crop_w / 2 - min_cy, max_cy = crop_h / 2, scaled_h - crop_h / 2 - center_x = max(min_cx, min(current_cx, max_cx)) - center_y = max(min_cy, min(current_cy, max_cy)) + # Interpolate zoom (relative to the scaled-up size) + current_zoom_factor = start_zoom_factor + (end_zoom_factor - start_zoom_factor) * eased_progress - # Extract the sub-pixel rectangle and resize + # Calculate crop size based on current zoom + crop_w = int(target_w / current_zoom_factor) + crop_h = int(target_h / current_zoom_factor) + + # Calculate the center point of the crop window + center_x = current_x + crop_w / 2 + center_y = current_y + crop_h / 2 + + # Ensure center stays within the bounds of the scaled image + center_x = max(crop_w / 2, min(center_x, new_width - crop_w / 2)) + center_y = max(crop_h / 2, min(center_y, new_height - crop_h / 2)) + + # Perform the crop using cv2.getRectSubPix (expects floating point center) + # Ensure frame is a numpy array (moviepy returns numpy arrays) try: - # Use cv2.getRectSubPix for potentially smoother results - cropped_frame = cv2.getRectSubPix(frame, (int(round(crop_w)), int(round(crop_h))), (center_x, center_y)) - # Resize to the final target resolution using high-quality interpolation + cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (center_x, center_y)) + # Resize the cropped frame back to the target resolution resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) return resized_frame - except cv2.error as cv_err: - print(f"OpenCV Error during Ken Burns transform: {cv_err}") - print(f" Frame shape: {frame.shape}, crop_w/h: {crop_w}/{crop_h}, center: {center_x},{center_y}") - # Fallback: Simple resize of the original frame (might look jumpy) - return cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LINEAR) except Exception as e: - print(f"Unexpected error during Ken Burns transform: {e}") - print(traceback.format_exc()) - return cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LINEAR) # Fallback + print(f"Error applying Ken Burns transform at t={t:.2f}s: {e}") + # Return a black frame or placeholder in case of error + return np.zeros((target_h, target_w, 3), dtype=np.uint8) + - # Apply the transformation + # Need to return a new clip instance with the effect applied return clip.fl(transform_frame) def resize_to_fill(clip, target_resolution): - """Resize and crop a video clip to fill the target resolution.""" + """Resize and crop a clip to fill the target resolution while maintaining aspect ratio.""" target_w, target_h = target_resolution - print(f"Resizing/cropping video clip to fill 
{target_w}x{target_h}") - - # Ensure clip has dimensions - if not hasattr(clip, 'w') or not hasattr(clip, 'h') or clip.w <= 0 or clip.h <= 0: - print("Error: Cannot resize video clip without valid dimensions.") - # Attempt to get dimensions from the first frame - try: - frame = clip.get_frame(0) - h, w = frame.shape[:2] - clip.w, clip.h = w, h - print(f"Manually set video clip dimensions to {w}x{h} for resize.") - if w <= 0 or h <= 0: raise ValueError("Invalid dimensions from frame.") - except Exception as e: - print(f"Failed to get/set dimensions: {e}. Returning original clip.") - return clip - - clip_w, clip_h = clip.w, clip.h - clip_aspect = clip_w / clip_h + clip_aspect = clip.w / clip.h target_aspect = target_w / target_h - if abs(clip_aspect - target_aspect) < 0.01: # If aspect ratios are very close - print("Aspect ratios match. Resizing directly.") - return clip.resize((target_w, target_h)) - - elif clip_aspect > target_aspect: # Clip is wider than target - print("Clip is wider. Resizing height and cropping width.") - resized_clip = clip.resize(height=target_h) - # Calculate crop amount (total pixels to remove, divided by 2 for each side) - crop_x = (resized_clip.w - target_w) / 2 - if crop_x < 0: crop_x = 0 # Safety check - print(f"Cropping width: x1={crop_x:.1f}, x2={resized_clip.w - crop_x:.1f}") - final_clip = resized_clip.crop(x1=crop_x, x2=resized_clip.w - crop_x) - else: # Clip is taller than target - print("Clip is taller. Resizing width and cropping height.") - resized_clip = clip.resize(width=target_w) - # Calculate crop amount - crop_y = (resized_clip.h - target_h) / 2 - if crop_y < 0: crop_y = 0 # Safety check - print(f"Cropping height: y1={crop_y:.1f}, y2={resized_clip.h - crop_y:.1f}") - final_clip = resized_clip.crop(y1=crop_y, y2=resized_clip.h - crop_y) - - # Final check on dimensions (floating point issues might cause slight differences) - if final_clip.w != target_w or final_clip.h != target_h: - print(f"Warning: Final clip dimensions ({final_clip.w}x{final_clip.h}) after crop don't exactly match target ({target_w}x{target_h}). Forcing resize.") - final_clip = final_clip.resize((target_w, target_h)) - - return final_clip - -def find_mp3_files(start_dir='.'): - """Search for MP3 files recursively.""" + if clip_aspect > target_aspect: # Clip is wider than target + clip = clip.resize(height=target_h) + # Calculate crop amount to make width match target_w + crop_amount_x = (clip.w - target_w) / 2 + clip = clip.crop(x1=crop_amount_x, x2=clip.w - crop_amount_x, y1=0, y2=clip.h) + else: # Clip is taller than target or same aspect + clip = clip.resize(width=target_w) + # Calculate crop amount to make height match target_h + crop_amount_y = (clip.h - target_h) / 2 + clip = clip.crop(x1=0, x2=clip.w, y1=crop_amount_y, y2=clip.h - crop_amount_y) + + # Ensure dimensions are exactly target_resolution after crop + if clip.size != target_resolution: + print(f"Warning: Clip size {clip.size} after resize_to_fill does not match target {target_resolution}. 
Resizing again.") + clip = clip.resize(newsize=target_resolution) + + + return clip + +def find_mp3_files(): + """Search for any MP3 files in the current directory and subdirectories.""" mp3_files = [] - print(f"Searching for MP3 files in '{start_dir}' and subdirectories...") - for root, dirs, files in os.walk(start_dir): + # Check relative paths first + for root, dirs, files in os.walk('.'): for file in files: if file.lower().endswith('.mp3'): mp3_path = os.path.join(root, file) mp3_files.append(mp3_path) - print(f"Found MP3: {mp3_path}") - return mp3_files[0] if mp3_files else None + print(f"Found MP3 file: {mp3_path}") + + if mp3_files: + return mp3_files[0] # Return the first one found + else: + print("No MP3 files found in the current directory or subdirectories.") + return None -def add_background_music(final_video, bg_music_path=None, bg_music_volume=0.08): + +def add_background_music(final_video, bg_music_path, bg_music_volume=0.08): """Add background music to the final video.""" - print("Attempting to add background music...") - music_to_use = bg_music_path + if not bg_music_path or not os.path.exists(bg_music_path): + print("No valid background music path provided or file not found. Skipping background music.") + return final_video - # If no path provided via upload, search for local MP3s - if not music_to_use or not os.path.exists(music_to_use): - print(f"BG music path '{bg_music_path}' not found or not given. Searching for local MP3s...") - music_to_use = find_mp3_files() # Search in current dir and subdirs + try: + print(f"Adding background music from: {bg_music_path}") + bg_music = AudioFileClip(bg_music_path) - if music_to_use and os.path.exists(music_to_use): - print(f"Using background music: {os.path.basename(music_to_use)}") - try: - bg_clip = AudioFileClip(music_to_use) - video_duration = final_video.duration - - # Ensure video has an audio track to mix with, create silent if not - if final_video.audio is None: - print("Warning: Input video has no audio track. Creating silent track for mixing.") - # Create a silent audio clip matching video duration - silent_audio = AudioSegment.silent(duration=int(video_duration * 1000), frame_rate=44100) - silent_path = os.path.join(os.path.dirname(music_to_use), f"temp_silent_{uuid.uuid4().hex[:6]}.wav") # Save near music - silent_audio.export(silent_path, format="wav") - video_audio_clip = AudioFileClip(silent_path) - final_video = final_video.set_audio(video_audio_clip) - # Clean up temporary silent file? Or leave it in temp dir. 
- # os.remove(silent_path) # Be careful if temp dir is cleaned later - else: - video_audio_clip = final_video.audio + # Loop background music if shorter than video + if bg_music.duration < final_video.duration: + loops_needed = math.ceil(final_video.duration / bg_music.duration) + bg_segments = [bg_music] * loops_needed + bg_music = concatenate_audioclips(bg_segments) + # Subclip background music to match video duration + bg_music = bg_music.subclip(0, final_video.duration) - # Loop or trim BG music to match video duration - if bg_clip.duration < video_duration: - loops_needed = math.ceil(video_duration / bg_clip.duration) - print(f"Looping background music {loops_needed} times.") - bg_clip = concatenate_audioclips([bg_clip] * loops_needed) + # Adjust volume + bg_music = bg_music.volumex(bg_music_volume) - # Trim precisely to video duration - bg_clip = bg_clip.subclip(0, video_duration) + # Composite audio + video_audio = final_video.audio + if video_audio: + mixed_audio = CompositeAudioClip([video_audio, bg_music]) + else: + # Handle case where video might not have audio track initially + mixed_audio = bg_music + print("Warning: Video had no audio track, only adding background music.") - # Apply volume adjustment - bg_clip = bg_clip.volumex(bg_music_volume) + final_video = final_video.set_audio(mixed_audio) + print("Background music added successfully.") + return final_video + except Exception as e: + print(f"Error adding background music: {e}") + print("Continuing without background music.") + return final_video - # Mix audio tracks - mixed_audio = CompositeAudioClip([video_audio_clip, bg_clip]) - final_video = final_video.set_audio(mixed_audio) - print(f"Background music added successfully (Volume: {bg_music_volume:.2f})") - # Close the audio file clip resources - bg_clip.close() - if video_audio_clip != final_video.audio: # Close original video audio if replaced - video_audio_clip.close() +def create_clip(media_asset, tts_path, duration, target_resolution, + caption_enabled, caption_color, caption_size, caption_position, + caption_bg_color, caption_stroke_color, caption_stroke_width, + narration_text, segment_index): + """Create a video clip with synchronized subtitles and narration.""" + try: + print(f"Creating clip #{segment_index} from asset: {media_asset.get('path')}, type: {media_asset.get('asset_type')}") + media_path = media_asset.get('path') + asset_type = media_asset.get('asset_type') + + if not media_path or not os.path.exists(media_path): + print(f"Skipping clip {segment_index}: Missing media file {media_path}") + # Create a black clip with silent audio for this segment duration + black_clip = ColorClip(size=target_resolution, color=(0,0,0), duration=duration) + silent_audio_path = generate_silent_audio(duration) + if silent_audio_path and os.path.exists(silent_audio_path): + silent_audio_clip = AudioFileClip(silent_audio_path) + if silent_audio_clip.duration < duration: # Should not happen if silent_audio is correct + silent_audio_clip = silent_audio_clip.loop(duration=duration) + black_clip = black_clip.set_audio(silent_audio_clip.subclip(0, duration)) + print(f"Created placeholder black clip for segment {segment_index}") + # Add placeholder text if captions are enabled + if caption_enabled and narration_text and caption_color != "transparent": + txt_clip = TextClip( + "[Missing Media]\n" + narration_text, # Indicate missing media + fontsize=caption_size, + font='Arial-Bold', + color=caption_color, + bg_color=caption_bg_color, + method='caption', + align='center', + 
stroke_width=caption_stroke_width, + stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(duration) # Duration matches black clip + black_clip = CompositeVideoClip([black_clip, txt_clip]) + + return black_clip + + # Determine actual audio duration + audio_clip = None + audio_duration = duration # Default to estimated duration + if tts_path and os.path.exists(tts_path): + try: + audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2) + audio_duration = audio_clip.duration + # Ensure clip duration is slightly longer than audio for transitions/padding + target_clip_duration = audio_duration + 0.3 # Add a small buffer + print(f"TTS audio duration: {audio_duration:.2f}s. Target clip duration: {target_clip_duration:.2f}s") + except Exception as e: + print(f"Error loading TTS audio clip {tts_path}: {e}. Using estimated duration {duration:.2f}s.") + audio_clip = None # Ensure audio_clip is None if loading fails + target_clip_duration = duration # Fallback to estimated duration - except Exception as e: - print(f"Error processing or adding background music: {e}") - print(traceback.format_exc()) - print("Continuing without background music due to error.") - # Return the original video if mixing failed - return final_video - else: - print("No suitable background music file found or provided. Skipping.") + if asset_type == "video": + try: + clip = VideoFileClip(media_path) + print(f"Loaded video clip with duration {clip.duration:.2f}s") + clip = resize_to_fill(clip, target_resolution) + if clip.duration < target_clip_duration: + print("Looping video clip") + clip = clip.loop(duration=target_clip_duration) + else: + clip = clip.subclip(0, target_clip_duration) + clip = clip.fadein(0.2).fadeout(0.2) # Add simple transitions + except Exception as e: + print(f"Error processing video clip {media_path}: {e}") + # Fallback to a black clip if video processing fails + print(f"Creating placeholder black clip instead for segment {segment_index}") + clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) - return final_video + elif asset_type == "image": + try: + img = Image.open(media_path) + # Ensure image is in RGB format before passing to ImageClip + if img.mode != 'RGB': + print("Converting image to RGB") + img = img.convert('RGB') + # Save back to a temp file or pass numpy array directly if ImageClip supports it + # ImageClip accepts numpy arrays, let's convert + img_array = np.array(img) + img.close() # Close the PIL image + clip = ImageClip(img_array).set_duration(target_clip_duration) + else: + img.close() # Close the PIL image + clip = ImageClip(media_path).set_duration(target_clip_duration) -def create_caption_clip(text, duration, target_resolution, options): - """Creates a moviepy TextClip for captions with styling.""" - target_w, target_h = target_resolution - settings = { - "fontsize": 40, - "font": 'Arial-Bold', # Ensure font is available on the system - "color": 'white', - "bg_color": 'rgba(0, 0, 0, 0.5)', # Semi-transparent black background - "stroke_color": 'black', - "stroke_width": 1.5, - "align": 'center', - "method": 'caption', # Use caption for automatic line breaking - "size": (target_w * 0.85, None), # Limit width to 85% of screen - "position": "bottom", # Default position keyword - **(options or {}) # Override defaults with provided options - } + print(f"Loaded image clip with duration {clip.duration:.2f}s") + clip = apply_kenburns_effect(clip, target_resolution, effect_type="random") # Random 
Ken Burns + clip = clip.fadein(0.3).fadeout(0.3) # Add simple transitions - # Convert position keyword to coordinates - pos_keyword = settings["position"] - if pos_keyword == 'bottom': - # Position slightly above the bottom edge - y_pos = target_h * 0.90 - settings["fontsize"] # Adjust based on font size? - settings["position"] = ('center', y_pos) - elif pos_keyword == 'center': - settings["position"] = ('center', 'center') - elif pos_keyword == 'top': - y_pos = target_h * 0.10 - settings["position"] = ('center', y_pos) - # Allow specific tuple coordinates too - elif not isinstance(pos_keyword, (tuple, list)): - print(f"Warning: Unknown caption position keyword '{pos_keyword}'. Defaulting to bottom.") - settings["position"] = ('center', target_h * 0.90 - settings["fontsize"]) - - - print(f"Creating caption: '{text[:30]}...', Size: {settings['fontsize']}, Color: {settings['color']}, Pos: {settings['position']}") - - # Attempt to create the TextClip - try: - # Remove position from settings dict before passing to TextClip if it's handled separately or method='caption' handles it - text_clip_args = {k: v for k, v in settings.items() if k != 'position'} + except Exception as e: + print(f"Error processing image clip {media_path}: {e}") + # Fallback to a black clip if image processing fails + print(f"Creating placeholder black clip instead for segment {segment_index}") + clip = ColorClip(size=target_resolution, color=(0,0,0), duration=target_clip_duration) - txt_clip = TextClip(text, **text_clip_args) - txt_clip = txt_clip.set_position(settings["position"]) - txt_clip = txt_clip.set_duration(duration) - print("Caption TextClip created.") - return txt_clip + else: + print(f"Unknown asset type {asset_type} for segment {segment_index}. Skipping.") + return None - except Exception as e: - print(f"ERROR creating TextClip: {e}") - # This often relates to ImageMagick issues (policy, installation, font availability) - print(" >> Check ImageMagick installation and policy configuration.") - print(" >> Ensure the specified font ('{settings['font']}') is installed and accessible.") - print(traceback.format_exc()) - # Return a dummy/empty clip or None to indicate failure - return None + # Set the audio for the clip if audio_clip was loaded successfully + if audio_clip: + # Ensure audio clip duration matches video clip duration after processing + if abs(audio_clip.duration - clip.duration) > 0.1: # Allow slight difference + print(f"Adjusting audio duration ({audio_clip.duration:.2f}s) to match video duration ({clip.duration:.2f}s)") + audio_clip = audio_clip.fx(vfx.speedx, factor=audio_clip.duration / clip.duration) + clip = clip.set_audio(audio_clip) + else: + # If TTS failed or audio loading failed, ensure video clip has no audio or silent audio + print(f"No valid audio for clip {segment_index}. 
Setting silent audio.") + silent_audio_path = generate_silent_audio(clip.duration) + if silent_audio_path and os.path.exists(silent_audio_path): + silent_audio_clip = AudioFileClip(silent_audio_path) + if abs(silent_audio_clip.duration - clip.duration) > 0.1: + silent_audio_clip = silent_audio_clip.fx(vfx.speedx, factor=silent_audio_clip.duration / clip.duration) + clip = clip.set_audio(silent_audio_clip) + else: + clip = clip.set_audio(None) # Set audio to None if silent audio fails -def create_clip(segment_data, temp_dir, target_resolution, caption_options=None, kenburns_effect="random"): - """Create a single video clip segment from state data.""" - idx = segment_data["index"] - media_path = segment_data["media_path"] - asset_type = segment_data["media_type"] - narration = segment_data["narration"] - segment_id = segment_data["segment_id"] + # Add subtitles if enabled + if caption_enabled and narration_text and caption_color != "transparent": + try: + # Simple word-based chunking for subtitles + words = narration_text.split() + # Calculate word timings based on total audio duration and word count + # This is a simple approach; for better sync, use a forced aligner or whisper + words_per_second = len(words) / audio_duration if audio_duration > 0 else len(words) + word_duration = 1.0 / words_per_second if words_per_second > 0 else 0.5 # Default if 0 + + subtitle_clips = [] + current_time = 0 + chunk_size = 6 # Words per caption chunk (adjust as needed) + + for i in range(0, len(words), chunk_size): + chunk_words = words[i:i+chunk_size] + chunk_text = ' '.join(chunk_words) + # Estimate chunk duration based on word count * average word duration + estimated_chunk_duration = len(chunk_words) * word_duration + + start_time = current_time + end_time = min(current_time + estimated_chunk_duration, clip.duration) # Ensure end time doesn't exceed clip duration + if start_time >= end_time: break # Avoid 0 or negative duration clips + + # Determine vertical position + if caption_position == "Top": + subtitle_y_position = int(target_resolution[1] * 0.1) + elif caption_position == "Middle": + subtitle_y_position = int(target_resolution[1] * 0.5) + else: # Default to Bottom + subtitle_y_position = int(target_resolution[1] * 0.85) # Closer to bottom + + + txt_clip = TextClip( + chunk_text, + fontsize=caption_size, + font='Arial-Bold', # Ensure this font is available or use a common system font + color=caption_color, + bg_color=caption_bg_color, # Use background color + method='caption', # Enables text wrapping + align='center', + stroke_width=caption_stroke_width, # Use stroke + stroke_color=caption_stroke_color, # Use stroke color + size=(target_resolution[0] * 0.9, None) # Caption width max 90% of video width + ).set_start(start_time).set_end(end_time) + txt_clip = txt_clip.set_position(('center', subtitle_y_position)) + subtitle_clips.append(txt_clip) + current_time = end_time # Move to the end of the current chunk + + if subtitle_clips: + clip = CompositeVideoClip([clip] + subtitle_clips) + print(f"Added {len(subtitle_clips)} subtitle chunks to clip {segment_index}.") + else: + print(f"No subtitle clips generated for segment {segment_index}.") + + + except Exception as sub_error: + print(f"Error adding subtitles for segment {segment_index}: {sub_error}") + # Fallback to a single centered text overlay if detailed subtitling fails + try: + txt_clip = TextClip( + narration_text, + fontsize=caption_size, + font='Arial-Bold', + color=caption_color, + bg_color=caption_bg_color, + method='caption', + 
align='center', + stroke_width=caption_stroke_width, + stroke_color=caption_stroke_color, + size=(target_resolution[0] * 0.8, None) + ).set_position(('center', int(target_resolution[1] * 0.75))).set_duration(clip.duration) + clip = CompositeVideoClip([clip, txt_clip]) + print(f"Added simple fallback subtitle for segment {segment_index}.") + except Exception as fallback_sub_error: + print(f"Simple fallback subtitle failed for segment {segment_index}: {fallback_sub_error}") + + + # Ensure final clip duration is set + clip = clip.set_duration(clip.duration) # This might seem redundant but can help fix issues + + print(f"Clip {segment_index} created: {clip.duration:.2f}s") + return clip + except Exception as e: + print(f"Critical error in create_clip for segment {segment_index}: {str(e)}") + # Create a black clip with error message if anything goes wrong + error_duration = duration if duration else 3 # Use estimated duration or default + black_clip = ColorClip(size=target_resolution, color=(0,0,0), duration=error_duration) + error_text = f"Error in segment {segment_index}" + if narration_text: error_text += f":\n{narration_text[:50]}..." + error_txt_clip = TextClip( + error_text, + fontsize=30, + color="red", + align='center', + size=(target_resolution[0] * 0.9, None) + ).set_position('center').set_duration(error_duration) + clip = CompositeVideoClip([black_clip, error_txt_clip]) + silent_audio_path = generate_silent_audio(error_duration) + if silent_audio_path and os.path.exists(silent_audio_path): + clip = clip.set_audio(AudioFileClip(silent_audio_path)) + print(f"Created error placeholder clip for segment {segment_index}.") + return clip - print(f"\n--- Creating Clip {idx+1} ({segment_id}) ---") - print(f" Media: {os.path.basename(media_path)} ({asset_type})") - print(f" Narration: '{narration[:50]}...'") - # Validate inputs - if not media_path or not os.path.exists(media_path): - print(f"Error: Media file not found for segment {idx+1}: {media_path}") - return None - if not narration: - print(f"Warning: Empty narration for segment {idx+1}. Generating silent TTS.") - # Fall through, generate_tts will handle empty string - - # 1. Generate TTS Audio - tts_path = generate_tts(narration, temp_dir) - if not tts_path: - print(f"Error: Failed to generate TTS for segment {idx+1}. Skipping clip.") - return None +def fix_imagemagick_policy(): + """Attempt to fix ImageMagick security policies required by TextClip.""" + print("Attempting to fix ImageMagick security policies...") + policy_paths = [ + "/etc/ImageMagick-6/policy.xml", + "/etc/ImageMagick-7/policy.xml", + "/etc/ImageMagick/policy.xml", # Common symlink path + "/usr/local/etc/ImageMagick-7/policy.xml", # macports/homebrew path + "/usr/share/ImageMagick/policy.xml", # Another common path + "/usr/share/ImageMagick-6/policy.xml", + "/usr/share/ImageMagick-7/policy.xml", + # Add more paths if needed based on typical installations + ] + found_policy = None + for path in policy_paths: + if os.path.exists(path): + found_policy = path + break + if not found_policy: + print("No policy.xml found in common locations. 
TextClip may fail.")
+        print("Consider installing ImageMagick and checking its installation path.")
+        return False
+
+    print(f"Attempting to modify policy file at {found_policy}")
+    try:
+        # Create a backup
+        backup_path = f"{found_policy}.bak_aivgen_{int(time.time())}"
+        shutil.copy2(found_policy, backup_path)
+        print(f"Created backup at {backup_path}")
+
+        # Read the original policy file
+        with open(found_policy, 'r') as f:
+            policy_content = f.read()
+
+        # Use regex to find and replace the specific policy lines
+        # Allow read and write rights for PDF, EPS, PS, etc. potentially restricted formats
+        # Also ensure path policies allow reading/writing files
+        modified_content = re.sub(
+            r'<policy domain="path"[^>]*>',
+            r'<policy domain="path" rights="read|write" pattern="@*" />', # Ensure path rights are read|write
+            policy_content
+        )
+        # More general rights="none" replacement, be careful with this one
+        modified_content = re.sub(
+            r'<policy\s+[^>]*rights="none"[^>]*>',
+            lambda m: m.group(0).replace('rights="none"', 'rights="read|write"'),
+            modified_content
+        )
+
+        # Write the modified content back
+        # Use sudo if running as a non-root user in a typical Linux install
+        temp_policy_file = None # Defined up front so the finally block below can check it safely
+        try:
+            with open(found_policy, 'w') as f:
+                f.write(modified_content)
+            print("ImageMagick policies updated successfully (direct write).")
+            return True
+        except IOError as e:
+            print(f"Direct write failed: {e}. Attempting with sudo...")
+            # Fallback to using os.system with sudo if direct write fails
+            # This requires the user to be able to run sudo commands without a password prompt for the script's execution
+            temp_policy_file = os.path.join(TEMP_FOLDER, "temp_policy.xml")
+            with open(temp_policy_file, 'w') as f:
+                f.write(modified_content)
+
+            cmd = f"sudo cp {temp_policy_file} {found_policy}"
+            print(f"Executing: {cmd}")
+            result = os.system(cmd) # Returns 0 on success
+
+            if result == 0:
+                print("ImageMagick policies updated successfully using sudo.")
+                return True
            else:
-                start_time = 0
-                clip = clip.subclip(start_time, start_time + target_duration)
+                print(f"Failed to update ImageMagick policies using sudo. Result code: {result}.")
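+                # For reference, a blocking entry in policy.xml looks like
+                #   <policy domain="path" rights="none" pattern="@*" />
+                # and needs rights="read|write" for TextClip rendering to work.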
+                print("Please manually edit your policy.xml to grant read/write rights for coder and path domains.")
+                print('Example: Change <policy domain="coder" rights="none" pattern="PDF" /> to <policy domain="coder" rights="read|write" pattern="PDF" />')
+                return False
+        finally:
+            if temp_policy_file and os.path.exists(temp_policy_file):
+                os.remove(temp_policy_file)

-            # Apply fade in/out for smoother transitions between video clips
-            base_clip = clip.fadein(0.2).fadeout(0.2)

-        elif asset_type == "image":
-            print("  Processing image asset...")
-            # ImageClip needs RGB, download_media should have handled conversion
-            clip = ImageClip(media_path).set_duration(target_duration)
-            # Apply Ken Burns effect
-            clip = apply_kenburns_effect(clip, target_resolution, effect_type=kenburns_effect)
-            # Apply fade in/out (can be longer for images)
-            base_clip = clip.fadein(0.4).fadeout(0.4)
-        else:
-            print(f"Error: Unknown asset type '{asset_type}' for segment {idx+1}")
-            audio_clip.close() # Close audio resource
-            return None
+    except Exception as e:
+        print(f"Error during ImageMagick policy modification: {e}")
+        print("Manual intervention may be required.")
+        return False

-        if base_clip is None: # Should not happen if logic is correct
-            raise ValueError("Base clip creation failed unexpectedly.")

-        # Ensure base_clip has the target dimensions after processing
-        if base_clip.w != target_resolution[0] or base_clip.h != target_resolution[1]:
-            print(f"Warning: Base clip dimensions ({base_clip.w}x{base_clip.h}) don't match target after processing. Forcing resize.")
-            base_clip = base_clip.resize(target_resolution)

+# ---------------- Gradio Interface Functions ---------------- #
+
+def generate_script_and_show_editor(user_input, resolution_choice,
+                                    caption_enabled_choice, caption_color,
+                                    caption_size, caption_position, caption_bg_color,
+                                    caption_stroke_color, caption_stroke_width):
+    """
+    Generates the script, parses it, stores segments in state,
+    and prepares the UI updates to show the editing interface.
+    """
+    global TEMP_FOLDER
+    # Clean up previous run's temp folder if it exists
+    if TEMP_FOLDER and os.path.exists(TEMP_FOLDER):
+        print(f"Cleaning up previous temp folder: {TEMP_FOLDER}")
+        try:
+            shutil.rmtree(TEMP_FOLDER)
+        except Exception as e:
+            print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}")
+
+    # Create a new unique temporary folder for this run
+    TEMP_FOLDER = tempfile.mkdtemp()
+    print(f"Created new temp folder: {TEMP_FOLDER}")
+
+    # Store global style choices in state or use them directly (let's store in state)
+    # Gradio State can hold a single object. Let's use a dict.
+    run_config = {
+        # Radio values are "Full (1920x1080)" / "Short (1080x1920)", so match by prefix
+        "resolution": (1920, 1080) if resolution_choice.startswith("Full") else (1080, 1920),
+        "caption_enabled": caption_enabled_choice == "Yes",
+        "caption_color": caption_color,
+        "caption_size": caption_size,
+        "caption_position": caption_position,
+        "caption_bg_color": caption_bg_color,
+        "caption_stroke_color": caption_stroke_color,
+        "caption_stroke_width": caption_stroke_width,
+        "temp_folder": TEMP_FOLDER # Store temp folder path
+    }
+    yield run_config, gr.update(value="Generating script...", visible=True), gr.update(visible=False) # Update status

-        except Exception as visual_err:
-            print(f"Error processing visual media {os.path.basename(media_path)}: {visual_err}")
-            print(traceback.format_exc())
-            audio_clip.close()
-            if base_clip: base_clip.close() # Close if partially created
-            return None

+    script_text = generate_script(user_input, OPENROUTER_API_KEY, OPENROUTER_MODEL)

+    if not script_text or script_text.startswith("[Error]"):
+        yield run_config, gr.update(value=f"Script generation failed: {script_text}", visible=True), gr.update(visible=False)
+        return run_config, gr.update(visible=True), gr.update(visible=False), [], [], [], [] # Clear segment components

-    # 3.
Add Captions (if enabled and text exists) - final_clip = base_clip - if caption_options and caption_options.get("enabled", "No") == "Yes" and narration: - print(" Adding captions...") - caption_clip = create_caption_clip(narration, base_clip.duration, target_resolution, caption_options) - if caption_clip: - # Composite the base video/image and the caption text - final_clip = CompositeVideoClip([base_clip, caption_clip]) - print(" Captions added successfully.") - else: - print(" Warning: Failed to create caption clip. Proceeding without captions for this segment.") - final_clip = base_clip # Use the base clip without captions + if not script_text or script_text.startswith("[Error]"): + yield run_config, gr.update(value=f"Script generation failed: {script_text}", visible=True), gr.update(visible=False) + return run_config, gr.update(visible=True), gr.update(visible=False), [], [], [], [] # Clear segment components - # 4. Set Audio - try: - final_clip = final_clip.set_audio(audio_clip) - print(f"Clip {idx+1} created successfully. Duration: {final_clip.duration:.2f}s") - return final_clip - except Exception as set_audio_err: - print(f"Error setting audio for clip {idx+1}: {set_audio_err}") - # Clean up resources - final_clip.close() - audio_clip.close() - return None - - -# --- Gradio UI and State Management --- - -# Helper to create the UI row for editing a single segment -def create_segment_editor_row(segment_data, temp_dir): - """Creates Gradio components for one segment editor row.""" - idx = segment_data["index"] - segment_id = segment_data["segment_id"] - is_video = segment_data["media_type"] == "video" - media_label = f"Segment {idx+1}: Media ({segment_data['media_type']})" - - with gr.Blocks(): # Use Blocks to encapsulate the row structure - with gr.Row(variant="panel", elem_id=f"segment-row-{segment_id}"): - with gr.Column(scale=2): # Media preview and upload - # Use Video component for videos, Image for images - if is_video: - media_preview = gr.Video(label=media_label, value=segment_data["media_path"], interactive=False, height=200) - else: - media_preview = gr.Image(label=media_label, value=segment_data["media_path"], interactive=False, height=200, type="filepath") - - upload_btn = gr.UploadButton("Change Media", file_types=["image", "video"], scale=1) - - with gr.Column(scale=3): # Narration editor - narration_editor = gr.Textbox( - label=f"Segment {idx+1}: Narration (Prompt: '{segment_data['prompt']}')", - value=segment_data["narration"], - lines=5, - interactive=True, - elem_id=f"narration-edit-{segment_id}" # Unique ID for updates - ) - # Display original prompt for reference - # gr.Markdown(f"Original Prompt: `{segment_data['prompt']}`") - - # Return the interactive components and preview component for potential updates - return narration_editor, media_preview, upload_btn - - -# --- Main Gradio App Definition --- -with gr.Blocks(theme=gr.themes.Soft(), title="AI Documentary Generator") as demo: - # --- State Variables --- - # app_state: Holds the list of segment dictionaries - # [ { "segment_id": "...", "index": ..., "prompt": ..., "narration": ..., ... }, ... ] - app_state = gr.State([]) - # temp_dir_state: Holds the path to the temporary directory for the current run - temp_dir_state = gr.State(None) - # ui_state: Holds references to dynamic UI components if needed for updates (Advanced) - # ui_state = gr.State({}) # { segment_id: {"narration_comp": ..., "media_comp": ...}, ... 
} - - # --- UI Layout --- - gr.Markdown("# AI Documentary Video Generator (Enhanced)") - gr.Markdown("Create humorous documentary-style videos. Enter a concept, edit the AI's script & visuals, customize, and generate!") - - with gr.Row(): - # --- Left Column: Inputs & Controls --- - with gr.Column(scale=1): - gr.Markdown("## 1. Concept & Script") - concept_input = gr.Textbox( - label="Video Concept / Topic", - placeholder="e.g., funny facts about cats, the history of pizza, why squirrels are plotting world domination", - lines=2 - ) - video_ratio_slider = gr.Slider( - 0, 1, value=0.3, step=0.05, - label="Video Clip Preference", - info="Influences % of clips attempted as video vs. image (0=images only, 1=videos only)" - ) - generate_script_btn = gr.Button("Generate Script & Visuals", variant="primary", icon="✨") - - gr.Markdown("## 3. Customization") - with gr.Accordion("Video & Audio Settings", open=False): - resolution_input = gr.Radio( - ["Full HD (1920x1080)", "Vertical Short (1080x1920)"], - label="Target Resolution", value="Full HD (1920x1080)" - ) - kenburns_select = gr.Dropdown( - ["random", "zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "diag-tl-br", "diag-tr-bl", "static"], - label="Image Movement (Ken Burns)", value="random" - ) - bg_music_upload = gr.File(label="Optional Background Music (MP3/WAV)", file_types=[".mp3", ".wav", ".aac", ".ogg"]) - bg_music_volume = gr.Slider(0, 0.5, value=0.08, step=0.01, label="BG Music Volume") - - with gr.Accordion("Caption Settings", open=False): - caption_enable_radio = gr.Radio(["Yes", "No"], label="Enable Captions", value="Yes") - caption_font_size = gr.Slider(12, 80, value=44, step=1, label="Font Size") - caption_font_color = gr.ColorPicker(label="Font Color", value="#FFFFFF") # White - caption_bg_color = gr.ColorPicker(label="Background Color (RGBA)", value="#00000080") # Black 50% alpha - caption_position = gr.Dropdown(["bottom", "center", "top"], label="Vertical Position", value="bottom") - # Advanced: Font selection (requires knowing available fonts) - # caption_font = gr.Dropdown(["Arial-Bold", "Impact", "Comic Sans MS"], label="Font", value="Arial-Bold") # Example - - generate_video_btn = gr.Button("Generate Final Video", variant="primary", icon="🎬", interactive=False) # Disabled initially - - - # --- Right Column: Status, Editors, Output --- - with gr.Column(scale=2): - status_update = gr.Markdown("Status: Waiting for concept...") - - gr.Markdown("## 2. Edit Segments") - gr.Markdown("Review the AI-generated narration and visuals below. Edit text directly and use 'Change Media' to upload your own image or video for any segment.") - # This column will be populated dynamically with segment editors - segment_editors_area = gr.Column(elem_id="segment-editors-area") - - gr.Markdown("## 4. 
Output") - final_video_output = gr.Video(label="Generated Video", interactive=False) - cleanup_message = gr.Markdown("") + yield run_config, gr.update(value="Parsing script...", visible=True), gr.update(visible=False) + segments = parse_script(script_text) - # --- Event Handlers --- + if not segments: + yield run_config, gr.update(value="Failed to parse script or script is empty.", visible=True), gr.update(visible=False) + return run_config, gr.update(visible=True), gr.update(visible=False), [], [], [], [] # Clear segment components - # Function triggered by "Generate Script & Visuals" button - def handle_script_generation(concept, video_ratio, current_temp_dir): - print("\n--- Step 1: Generating Script & Initial Visuals ---") - if not concept: - return { status_update: gr.update(value="Status: Please enter a video concept.") } - - # Clean up previous run's temp dir if it exists - if current_temp_dir and os.path.isdir(current_temp_dir): - print(f"Cleaning up previous temporary directory: {current_temp_dir}") - shutil.rmtree(current_temp_dir, ignore_errors=True) - - # Create a new unique temporary directory for this run - temp_dir = tempfile.mkdtemp(prefix="aivideo_") - print(f"Created temporary directory: {temp_dir}") - - status_msg = "Status: Generating script..." - yield { - status_update: gr.update(value=status_msg), - segment_editors_area: gr.update(value=None), # Clear previous editors - final_video_output: gr.update(value=None), # Clear previous video - cleanup_message: gr.update(value=""), - generate_video_btn: gr.update(interactive=False) # Disable final generate btn - } - script_text = generate_script(concept) - if not script_text or script_text.startswith("Error:"): - shutil.rmtree(temp_dir, ignore_errors=True) # Clean up failed run temp dir - yield { - status_update: gr.update(value=f"Status: Script Generation Failed. {script_text}"), - temp_dir_state: None - } - return - - status_msg = "Status: Script generated. Parsing segments..." - yield { status_update: gr.update(value=status_msg) } - - elements = parse_script(script_text) - if not elements: - shutil.rmtree(temp_dir, ignore_errors=True) - yield { - status_update: gr.update(value="Status: Error parsing script. No segments found."), - temp_dir_state: None - } - return - - num_segments = len(elements) // 2 - status_msg = f"Status: Parsed {num_segments} segments. Generating initial media previews (this may take a while)..." 
- yield { status_update: gr.update(value=status_msg) } - - # --- Create Initial State (Generate media for each segment) --- - initial_state = [] - segment_map = {} - for elem in elements: # Group by segment_id - s_id = elem.get("segment_id") - if s_id: - if s_id not in segment_map: segment_map[s_id] = {} - segment_map[s_id][elem["type"]] = elem - - processed_segments = 0 - for idx, (s_id, types) in enumerate(segment_map.items()): - if "media" in types and "tts" in types: - media_elem = types["media"] - tts_elem = types["tts"] - prompt = media_elem['prompt'] - - # Simple check for news-related prompts - is_news_prompt = any(kw in prompt.lower() for kw in ["news", "breaking", "report", "update"]) - - # Generate initial media suggestion - media_asset = generate_media(prompt, temp_dir, video_ratio, is_news=is_news_prompt) - - if media_asset: - segment_data = { - "segment_id": s_id, - "index": idx, - "prompt": prompt, - "narration": tts_elem["text"], - "original_narration": tts_elem["text"], - "duration": tts_elem["duration"], # Keep initial estimate, recalculate if needed - "media_path": media_asset["path"], - "media_type": media_asset["asset_type"], - "original_media_path": media_asset["path"], - "user_uploaded": False, - "source": media_asset.get("source", "unknown") # Track where media came from - } - initial_state.append(segment_data) - processed_segments += 1 - status_msg = f"Status: Generated media for segment {processed_segments}/{num_segments} ('{prompt[:20]}...')" - yield { status_update: gr.update(value=status_msg) } - else: - print(f"Warning: Failed to get initial media for segment {idx+1} (Prompt: {prompt}). Skipping segment.") - status_msg = f"Status: Failed media for segment {idx+1}/{num_segments}. Skipping." - yield { status_update: gr.update(value=status_msg) } - time.sleep(0.5) # Pause briefly on failure + # Prepare updates for dynamic editing components + # We need to return lists of gr.update() calls for the visibility and content + # of each textbox and file component in the editing groups. + textbox_updates = [] + file_updates = [] + group_visibility_updates = [] - else: - print(f"Warning: Incomplete segment data for {s_id}. Skipping.") - - - if not initial_state: - shutil.rmtree(temp_dir, ignore_errors=True) - yield { - status_update: gr.update(value="Status: Error generating initial media or no valid segments found. Please try a different concept."), - temp_dir_state: None - } - return - - - # --- Dynamically Create Editor UI --- - print(f"Creating UI for {len(initial_state)} segments...") - # We need to build the UI components *within* this function context - # so we can wire them up correctly. 
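+    # This function is a generator: it yields (status_message, video_path)
+    # pairs so the Gradio UI can stream progress. video_path stays None until
+    # the final export succeeds.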
- ui_components = {} # To store references if needed, e.g., for updates - with gr.Blocks() as editor_ui_block: # Create a temporary Blocks context to build the UI - for segment_data in initial_state: - s_id = segment_data["segment_id"] - narration_comp, media_comp, upload_comp = create_segment_editor_row(segment_data, temp_dir) - ui_components[s_id] = {"narration": narration_comp, "media": media_comp, "upload": upload_comp} - - # --- Wire up event handlers --- - # Use partial to pass segment_id and potentially component references - # Narration Change: - narration_comp.change( - fn=handle_narration_change, - inputs=[narration_comp, app_state], # Pass the component itself and state - outputs=[app_state], # Output updated state - # Pass segment_id using _js trick or find another way if needed - # This might require restructuring state or using elem_id lookup - # Let's assume handle_narration_change can find the segment by component ref or value for now - # A cleaner way might be adding segment_id as a hidden component in the row - ) - # Media Upload: - upload_comp.upload( - fn=partial(handle_media_upload, segment_id=s_id, temp_dir=temp_dir), # Pass s_id and temp_dir - inputs=[upload_comp, app_state], # Pass upload component and state - outputs=[app_state, media_comp], # Update state AND the preview component - ) - - status_msg = f"Status: Ready for editing. {len(initial_state)} segments loaded." - print("Segment editors created and events wired.") - - # Return the updates: new state, temp_dir, status, the dynamically created UI, and enable final button - yield { - app_state: initial_state, - temp_dir_state: temp_dir, - status_update: gr.update(value=status_msg), - segment_editors_area: gr.update(value=editor_ui_block), # Replace area content with new UI - generate_video_btn: gr.update(interactive=True) # Enable final generation button - } + for i in range(MAX_SEGMENTS_FOR_EDITING): + if i < len(segments): + # Show group, populate text, clear file upload + textbox_updates.append(gr.update(value=segments[i]['text'], visible=True)) + file_updates.append(gr.update(value=None, visible=True)) # Clear previous uploads + group_visibility_updates.append(gr.update(visible=True)) + else: + # Hide unused groups + textbox_updates.append(gr.update(value="", visible=False)) + file_updates.append(gr.update(value=None, visible=False)) + group_visibility_updates.append(gr.update(visible=False)) + + yield (run_config, + gr.update(value="Script generated. Edit segments below.", visible=True), + gr.update(visible=True), # Show Generate Video button + group_visibility_updates, # Update visibility of groups + textbox_updates, # Update textboxes + file_updates, # Update file uploads + segments) # Update the state with parsed segments + +def generate_video_from_edited(run_config, segments_data, segment_texts, segment_uploads): + """ + Takes the edited segment data (text, uploaded files) and configuration, + and generates the final video. + """ + if not segments_data: + yield "No segments to process. Generate script first.", None + return + + global TEMP_FOLDER + # Ensure TEMP_FOLDER is correctly set from run_config + TEMP_FOLDER = run_config.get("temp_folder") + if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): + yield "Error: Temporary folder not found. 
Please regenerate script.", None + return + + # Extract config from run_config + TARGET_RESOLUTION = run_config["resolution"] + CAPTION_ENABLED = run_config["caption_enabled"] + CAPTION_COLOR = run_config["caption_color"] + CAPTION_SIZE = run_config["caption_size"] + CAPTION_POSITION = run_config["caption_position"] + CAPTION_BG_COLOR = run_config["caption_bg_color"] + CAPTION_STROKE_COLOR = run_config["caption_stroke_color"] + CAPTION_STROKE_WIDTH = run_config["caption_stroke_width"] + + + # Update segments_data with potentially edited text and uploaded file paths + # segment_texts and segment_uploads are lists of values from the Gradio components + processed_segments = [] + for i, segment in enumerate(segments_data): + if i < len(segment_texts): # Ensure we have corresponding input values + processed_segment = segment.copy() # Make a copy + processed_segment['text'] = segment_texts[i] # Use the edited text + processed_segment['uploaded_media'] = segment_uploads[i] # Use the uploaded file path (None if not uploaded) + processed_segments.append(processed_segment) + else: + # This shouldn't happen if state and UI updates are in sync, but as a safeguard + print(f"Warning: Missing input value for segment index {i}. Skipping segment.") + # Or perhaps use the original segment data if no edited input? Let's skip for safety. + # processed_segments.append(segment) # Append original if no input? Depends on desired behavior. + + if not processed_segments: + yield "No valid segments to process after editing.", None + return + + + yield "Fixing ImageMagick policy...", None + fix_imagemagick_policy() # Attempt policy fix before creating clips + + clips = [] + yield "Generating media and audio for clips...", None + + total_segments = len(processed_segments) + for idx, segment in enumerate(processed_segments): + yield f"Processing segment {idx+1}/{total_segments}...", None + print(f"\nProcessing segment {idx+1}/{total_segments}...") + + # Determine media source: uploaded or generated + media_asset = None + if segment.get('uploaded_media') and os.path.exists(segment['uploaded_media']): + print(f"Using uploaded media for segment {idx+1}: {segment['uploaded_media']}") + file_ext = os.path.splitext(segment['uploaded_media'])[1].lower() + asset_type = 'video' if file_ext in ['.mp4', '.mov', '.avi', '.webm'] else 'image' + # Need to copy the uploaded file to the temp folder if it's not already there + try: + temp_upload_path = os.path.join(TEMP_FOLDER, f"user_upload_{idx}{file_ext}") + shutil.copy2(segment['uploaded_media'], temp_upload_path) + media_asset = {"path": temp_upload_path, "asset_type": asset_type} + except Exception as e: + print(f"Error copying user upload {segment['uploaded_media']}: {e}. Attempting to generate media instead.") + media_asset = generate_media_asset(segment['original_prompt'], None) # Pass None for uploaded_media + else: + print(f"No user upload for segment {idx+1}. Generating media from prompt: '{segment['original_prompt']}'") + media_asset = generate_media_asset(segment['original_prompt'], None) # Pass None for uploaded_media + + + if not media_asset: + print(f"Failed to generate or use media asset for segment {idx+1}. 
Creating placeholder.") + # Create a dummy asset dict pointing to a non-existent path so create_clip makes a black clip + media_asset = {"path": os.path.join(TEMP_FOLDER, f"dummy_missing_media_{idx}.txt"), "asset_type": "image"} # Use image as dummy type + + # Generate TTS audio + tts_path = generate_tts(segment['text'], voice='en') # Using 'en' voice + + # Create the video clip for this segment + clip = create_clip( + media_asset=media_asset, + tts_path=tts_path, + duration=segment['duration'], # Use estimated duration as a fallback reference + target_resolution=TARGET_RESOLUTION, + caption_enabled=CAPTION_ENABLED, + caption_color=CAPTION_COLOR, + caption_size=CAPTION_SIZE, + caption_position=CAPTION_POSITION, + caption_bg_color=CAPTION_BG_COLOR, + caption_stroke_color=CAPTION_STROKE_COLOR, + caption_stroke_width=CAPTION_STROKE_WIDTH, + narration_text=segment['text'], + segment_index=idx+1 + ) + if clip: + clips.append(clip) + else: + print(f"Skipping segment {idx+1} due to clip creation failure.") + # Create a placeholder black clip if create_clip returned None + placeholder_duration = segment.get('duration', 3.0) # Use estimated duration or default + placeholder_clip = ColorClip(size=TARGET_RESOLUTION, color=(0,0,0), duration=placeholder_duration) + silent_audio_path = generate_silent_audio(placeholder_duration) + if silent_audio_path and os.path.exists(silent_audio_path): + placeholder_clip = placeholder_clip.set_audio(AudioFileClip(silent_audio_path)) + error_text = f"Segment {idx+1} Failed" + if segment.get('text'): error_text += f":\n{segment['text'][:50]}..." + error_txt_clip = TextClip(error_text, fontsize=30, color="red", align='center', size=(TARGET_RESOLUTION[0] * 0.9, None)).set_position('center').set_duration(placeholder_duration) + placeholder_clip = CompositeVideoClip([placeholder_clip, error_txt_clip]) + clips.append(placeholder_clip) + + + if not clips: + yield "No clips were successfully created. 
Video generation failed.", None + # Clean up + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): + try: + shutil.rmtree(TEMP_FOLDER) + print(f"Cleaned up temp folder: {TEMP_FOLDER}") + except Exception as e: + print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}") + TEMP_FOLDER = None # Reset global + return + + yield "Concatenating clips...", None + print("\nConcatenating clips...") + final_video = concatenate_videoclips(clips, method="compose") + + yield "Adding background music...", None + bg_music_path = find_mp3_files() # Find background music + final_video = add_background_music(final_video, bg_music_path, bg_music_volume=0.08) # Use default volume + + yield f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...", None + print(f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...") + try: + # Use a temporary output file first for safety + temp_output_filename = os.path.join(TEMP_FOLDER, f"temp_{OUTPUT_VIDEO_FILENAME}") + final_video.write_videofile(temp_output_filename, codec='libx264', fps=24, preset='veryfast') + # Move the final file to the intended location after successful export + shutil.move(temp_output_filename, OUTPUT_VIDEO_FILENAME) + print(f"Final video saved as {OUTPUT_VIDEO_FILENAME}") + output_path = OUTPUT_VIDEO_FILENAME + except Exception as e: + print(f"Error exporting video: {e}") + output_path = None + yield f"Video export failed: {e}", None # Provide error message in status - # Handler for narration textbox change - def handle_narration_change(new_narration, current_app_state, evt: gr.EventData): - # --- Find the segment associated with the changed component --- - # This is tricky. Gradio's event data might not easily give the segment_id. - # Option 1: Use elem_id if accessible via evt.target? (Needs verification) - # Option 2: Iterate state and match original narration? (Brittle if user edits slightly) - # Option 3: Add a hidden gr.Textbox(value=segment_id) in the row and include it in inputs. (Most robust) - - # --- Simplified Approach (Less Robust): Assume order or match text --- - # This needs improvement for robustness, using a hidden ID is better. - # Let's *assume* we can get the index or ID somehow. For now, print warning. - print(f"Narration changed to: '{new_narration[:30]}...'") - print("Warning: Linking narration change to specific segment state needs a robust ID mechanism (e.g., hidden component).") - # Placeholder: Find segment by original text (prone to errors) - found_segment = None - for segment in current_app_state: - # This matching is weak. Need a better way. 
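+    # Hedged note: write_videofile falls back to moviepy's default audio codec
+    # for .mp4 here; passing audio_codec='aac' explicitly is a common tweak if
+    # the exported file plays without sound in some players.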
- if segment["original_narration"] == new_narration: # Unlikely to work well - found_segment = segment - break - - if found_segment: - print(f"Updating narration for segment {found_segment['segment_id']}") - found_segment["narration"] = new_narration - else: - print("Could not reliably link narration change to state segment.") - - - # Return the potentially modified state - return current_app_state - - - # Handler for media upload button - def handle_media_upload(uploaded_file, current_app_state, segment_id, temp_dir): - print(f"\nMedia uploaded for segment: {segment_id}") - if uploaded_file is None: - print("Upload event triggered but file is None.") - # Need to return original state and original media preview value - target_segment = next((s for s in current_app_state if s["segment_id"] == segment_id), None) - original_media_path = target_segment["original_media_path"] if target_segment else None - # Determine if original was video or image to return correct update type - is_video = target_segment["media_type"] == "video" if target_segment else False - return current_app_state, gr.Video.update(value=original_media_path) if is_video else gr.Image.update(value=original_media_path) - - - # Find the segment in the state - target_segment = None - for segment in current_app_state: - if segment["segment_id"] == segment_id: - target_segment = segment - break - - if not target_segment: - print(f"Error: Could not find segment {segment_id} in state to update media.") - # Return original state and no change to media preview (or handle error state) - # This requires knowing the original preview value, which is complex here. - # Simplification: Return state, let UI potentially be out of sync on error. - return current_app_state, gr.update() # No change update - - - # Process the uploaded file - original_file_path = uploaded_file.name # Gradio provides a temp path - file_name = os.path.basename(original_file_path) - file_ext = os.path.splitext(file_name)[1].lower() - save_path = os.path.join(temp_dir, f"user_upload_{segment_id}{file_ext}") - - print(f"Copying uploaded file '{file_name}' to '{os.path.basename(save_path)}'") + # Clean up temporary folder + yield "Cleaning up temporary files...", output_path # Update status before cleanup + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): try: - shutil.copy(original_file_path, save_path) + shutil.rmtree(TEMP_FOLDER) + print(f"Cleaned up temp folder: {TEMP_FOLDER}") except Exception as e: - print(f"Error copying uploaded file: {e}") - # Return original state and no preview update - is_video = target_segment["media_type"] == "video" - return current_app_state, gr.Video.update(value=target_segment["media_path"]) if is_video else gr.Image.update(value=target_segment["media_path"]) - - - # Update the segment state - target_segment["media_path"] = save_path - target_segment["user_uploaded"] = True - if file_ext in ['.mp4', '.mov', '.avi', '.webm', '.mkv']: - target_segment["media_type"] = "video" - print("Media type set to VIDEO") - media_update = gr.Video.update(value=save_path) # Update Video component - elif file_ext in ['.jpg', '.jpeg', '.png', '.webp', '.bmp', '.gif']: - target_segment["media_type"] = "image" - print("Media type set to IMAGE") - # Validate/convert uploaded image if necessary - validated_path = download_image(save_path, save_path) # Use download_image for validation/conversion - if validated_path: - target_segment["media_path"] = validated_path # Update path if converted - media_update = gr.Image.update(value=validated_path) # Update Image 
-            else:
-                print("Uploaded image failed validation/conversion. Reverting state.")
-                # Revert state changes
-                target_segment["media_path"] = target_segment["original_media_path"] # Or previous path if edits allowed
-                target_segment["user_uploaded"] = False
-                target_segment["media_type"] = "video" if target_segment["original_media_path"].lower().endswith(('.mp4', '.mov')) else "image"
-                # Return original preview value
-                is_video = target_segment["media_type"] == "video"
-                media_update = gr.Video.update(value=target_segment["media_path"]) if is_video else gr.Image.update(value=target_segment["media_path"])
-        else:
-            print(f"Warning: Unknown uploaded file type '{file_ext}'. Assuming image.")
-            target_segment["media_type"] = "image" # Default assumption
-            media_update = gr.Image.update(value=save_path)
+            print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}")
+    TEMP_FOLDER = None  # Reset global
+    yield "Done!", output_path  # Final status update (each yielded (status, video) pair streams to the two outputs)

-        print(f"Segment {segment_id} state updated with new media: {os.path.basename(target_segment['media_path'])}")
-        # Return the updated state and the update for the media preview component
-        return current_app_state, media_update

+# ---------------- Gradio Interface Definition (Blocks) ---------------- #
+# Lists to hold the dynamically shown/hidden UI components for each segment
+segment_editing_groups = []
+segment_text_inputs = []
+segment_file_inputs = []

-    # Handler for "Generate Final Video" button
-    def handle_final_generation(current_app_state, temp_dir,
-                                resolution_str, kenburns_style,
-                                bg_music_file, bg_volume,
-                                caption_enabled, cap_font_size, cap_font_color, cap_bg_color, cap_position):
-        print("\n--- Step 3: Generating Final Video ---")
-        start_time = time.time()

+with gr.Blocks() as demo:
+    gr.Markdown("# 🤖 AI Documentary Video Generator 🎬")
+    gr.Markdown("Enter a concept to generate a funny documentary script. You can then edit the script text and replace the suggested media for each segment before generating the final video.")

-        if not current_app_state:
-            yield { status_update: gr.update(value="Status: No script data loaded. Please generate script first.") }
-            return
-        if not temp_dir or not os.path.isdir(temp_dir):
-            yield { status_update: gr.update(value="Status: Error - Temporary directory missing or invalid.") }
-            return

+    # --- Global Settings ---
+    with gr.Accordion("Global Settings", open=True):
+        user_concept_input = gr.Textbox(label="Video Concept", placeholder="e.g., The secret life of pigeons, Why socks disappear in the laundry, The futility of alarm clocks...")
+        with gr.Row():
+            resolution_radio = gr.Radio(["Full (1920x1080)", "Short (1080x1920)"], label="Video Resolution", value="Full (1920x1080)")
+            bg_music_volume_slider = gr.Slider(minimum=0, maximum=1.0, value=0.08, step=0.01, label="Background Music Volume")

-        status_msg = f"Status: Starting final video generation for {len(current_app_state)} segments..."
-        yield { status_update: gr.update(value=status_msg), final_video_output: gr.update(value=None) }

+    # --- Caption Settings ---
+    with gr.Accordion("Caption Settings", open=False):
+        caption_enabled_radio = gr.Radio(["Yes", "No"], label="Show Captions?", value="Yes")
+        caption_color_picker = gr.ColorPicker(label="Caption Text Color", value="#FFFFFF")  # Default white
+        # NOTE: the browser's native color widget cannot display alpha, but the rgba
+        # string below is passed through to the caption renderer unchanged.
+        caption_bg_color_picker = gr.ColorPicker(label="Caption Background Color (with transparency)", value="rgba(0, 0, 0, 0.25)")  # Default semi-transparent black
+        caption_size_slider = gr.Slider(minimum=20, maximum=100, value=45, step=1, label="Caption Font Size")
+        caption_position_radio = gr.Radio(["Top", "Middle", "Bottom"], label="Caption Position", value="Bottom")
+        caption_stroke_color_picker = gr.ColorPicker(label="Caption Stroke Color", value="#000000")  # Default black stroke
+        caption_stroke_width_slider = gr.Slider(minimum=0, maximum=5, value=2, step=0.5, label="Caption Stroke Width")

-        # --- Prepare Configuration ---
-        if "Vertical Short" in resolution_str:
-            target_resolution = (1080, 1920)
-        else:
-            target_resolution = (1920, 1080) # Default Full HD
-
-        caption_options = {
-            "enabled": caption_enabled, # "Yes" or "No"
-            "fontsize": cap_font_size,
-            "color": cap_font_color,
-            "bg_color": cap_bg_color, # Pass RGBA string
-            "position": cap_position,
-            # Add other fixed options if needed
-            "font": 'Arial-Bold', # Or make this configurable
-            "stroke_color": '#000000', # Black stroke
-            "stroke_width": 1.5,
-        }
-
-        bg_music_path = bg_music_file.name if bg_music_file else None # Get path from Gradio file object
-
-        # --- Process Clips ---
-        clips = []
-        total_segments = len(current_app_state)
-        for i, segment_data in enumerate(current_app_state):
-            status_msg = f"Status: Processing segment {i+1}/{total_segments} ('{segment_data['prompt'][:25]}...')"
-            yield { status_update: gr.update(value=status_msg) }
-
-            # Pass all necessary data to create_clip
-            clip = create_clip(
-                segment_data=segment_data,
-                temp_dir=temp_dir,
-                target_resolution=target_resolution,
-                caption_options=caption_options,
-                kenburns_effect=kenburns_style
-            )
-
-            if clip:
-                clips.append(clip)
-                print(f"Segment {i+1} clip added.")
-            else:
-                print(f"Warning: Failed to create clip for segment {i+1}. Skipping.")
-                # Attempt to continue without the failed clip

+    generate_script_btn = gr.Button("Generate Script", variant="primary")

-        if not clips:
-            yield { status_update: gr.update(value="Status: Error - No valid clips were created. Video generation failed.") }
-            # Consider cleanup?
-            # if temp_dir and os.path.isdir(temp_dir): shutil.rmtree(temp_dir, ignore_errors=True)
-            # yield { temp_dir_state: None }
-            return

+    # --- Status and Script Output ---
+    status_output = gr.Label(label="Status", value="")
+    script_preview_markdown = gr.Markdown("### Generated Script Preview\n\nScript will appear here...")  # Optional raw script preview

-        # --- Concatenate & Finalize ---
-        status_msg = f"Status: Concatenating {len(clips)} clips..."
-        yield { status_update: gr.update(value=status_msg) }

+    # --- State to hold parsed segments data and run config ---
+    segments_state = gr.State([])  # List of segment dictionaries
+    run_config_state = gr.State({})  # Dictionary for run configuration

-        final_video = None
-        try:
-            # Use compose method for potentially better handling of varying clip sizes/fps? Check docs.
-            final_video = concatenate_videoclips(clips, method="compose")
-            print("Clips concatenated successfully.")
-        except Exception as concat_err:
-            print(f"Error during video concatenation: {concat_err}")
-            print(traceback.format_exc())
-            # Attempt cleanup of individual clips before erroring
-            for c in clips:
-                try: c.close()
-                except: pass
-            yield { status_update: gr.update(value=f"Status: Error during video concatenation: {concat_err}") }
-            return # Stop generation
-
-        # Add background music (if provided and concatenation succeeded)
-        if final_video:
-            status_msg = "Status: Adding background music (if provided)..."
-            yield { status_update: gr.update(value=status_msg) }
-            final_video = add_background_music(final_video, bg_music_path, bg_volume)
-
-        # Export Final Video
-        timestamp = time.strftime("%Y%m%d_%H%M%S")
-        output_filename = f"{OUTPUT_VIDEO_FILENAME_BASE}_{timestamp}.mp4"
-        # Save outside the temp dir for easier user access
-        final_output_path = os.path.abspath(output_filename) # Save in script's directory
-
-        status_msg = f"Status: Exporting final video to {output_filename} (this may take time)..."
-        yield { status_update: gr.update(value=status_msg) }

+    # --- Dynamic Editing Area (initially hidden) ---
+    # We create MAX_SEGMENTS_FOR_EDITING groups up front and show/hide them dynamically
+    with gr.Column(visible=False) as editing_area:
+        gr.Markdown("### Edit Script Segments")
+        gr.Markdown("Review the AI-generated text and media suggestions below. Edit the text and/or upload your own image/video for any segment. If no file is uploaded, AI will fetch media based on the original prompt.")
+        for i in range(MAX_SEGMENTS_FOR_EDITING):
+            with gr.Group(visible=False) as segment_group:  # Each group represents one segment
+                segment_editing_groups.append(segment_group)
+                # The span ID below is the hook that the JS at the bottom of this
+                # file uses to fill in the prompt text after script generation.
+                gr.Markdown(f"**Segment {i+1}** (Prompt: <span id='segment-prompt-{i}'></span>)")
+                # JS is used to update the prompt text because the Textbox is reserved for narration.
+                # Alternatively, a non-editable gr.Label or gr.Textbox could hold the prompt.

-        try:
-            print(f"Writing final video to: {final_output_path}")
-            # Use recommended settings for web compatibility & performance
-            final_video.write_videofile(
-                final_output_path,
-                codec='libx264', # Good quality/compatibility codec
-                audio_codec='aac', # Standard audio codec
-                temp_audiofile=os.path.join(temp_dir, f'temp_audio_{timestamp}.aac'), # Explicit temp audio file
-                preset='medium', # 'medium' or 'fast' for balance, 'ultrafast' for speed
-                ffmpeg_params=[ # Ensure compatibility
-                    '-pix_fmt', 'yuv420p',
-                    '-profile:v', 'high',
-                    '-level', '4.0', # Broad compatibility level
-                    # '-tune', 'fastdecode', # Optional: optimize for playback speed
-                    '-movflags', '+faststart' # Important for web streaming
-                ],
-                threads=max(1, os.cpu_count() // 2), # Use multiple threads
-                logger='bar', # Show progress bar
-                fps=24 # Standard FPS
-            )
-            final_duration = final_video.duration
-            print(f"Final video exported successfully ({final_duration:.1f}s).")
-
-        except Exception as write_err:
-            print(f"Error writing final video file: {write_err}")
-            print(traceback.format_exc())
-            yield { status_update: gr.update(value=f"Status: Error writing video file: {write_err}") }
-            # Don't delete temp dir on write error, user might want intermediate files
-            return # Stop generation
-        finally:
-            # --- Resource Cleanup ---
-            print("Closing video clips...")
-            if final_video:
-                try: final_video.close()
-                except: pass
-            for c in clips:
-                try: c.close()
-                except: pass
-            # Close any opened audio files explicitly if needed (AudioFileClip handles this mostly)
-
-        end_time = time.time()
-        total_time = end_time - start_time
-        status_msg = f"Status: Video generation complete! Saved as {output_filename} ({final_duration:.1f}s). Total time: {total_time:.1f}s."
-        cleanup_msg_text = f"Temporary files are in: {temp_dir}\n(You can manually delete this folder later)"
-
-        # --- Optional: Auto-cleanup ---
-        # print(f"Cleaning up temporary directory: {temp_dir}")
-        # shutil.rmtree(temp_dir, ignore_errors=True)
-        # cleanup_msg_text = "Temporary files automatically cleaned up."
-        # temp_dir_state_update = None # Clear state if cleaned up
-
-        yield {
-            status_update: gr.update(value=status_msg),
-            final_video_output: gr.update(value=final_output_path), # Show the final video
-            cleanup_message: gr.update(value=cleanup_msg_text),
-            # temp_dir_state: temp_dir_state_update # Update temp dir state if cleaned
-        }

+                segment_text = gr.Textbox(label="Narration Text", lines=2, interactive=True)
+                segment_text_inputs.append(segment_text)
+
+                segment_file = gr.File(label="Upload Custom Media (Image or Video)", type="filepath", interactive=True)
+                segment_file_inputs.append(segment_file)
+
+        generate_video_btn = gr.Button("Generate Video", variant="primary")

-    # --- Wire UI component events to handler functions ---

+    # --- Final Video Output ---
+    final_video_output = gr.Video(label="Generated Video")

-    # Generate Script Button

+    # --- Event Handlers ---
+
+    # Generate Script Button Click
    generate_script_btn.click(
-        fn=handle_script_generation,
-        inputs=[concept_input, video_ratio_slider, temp_dir_state], # Pass current temp_dir for cleanup
-        outputs=[app_state, temp_dir_state, status_update, segment_editors_area, generate_video_btn] # Update state, temp_dir, status, editor UI, final button interactivity
+        fn=generate_script_and_show_editor,
+        inputs=[
+            user_concept_input,
+            resolution_radio,
+            caption_enabled_radio,
+            caption_color_picker,
+            caption_size_slider,
+            caption_position_radio,
+            caption_bg_color_picker,
+            caption_stroke_color_picker,
+            caption_stroke_width_slider
+        ],
+        outputs=[
+            run_config_state,
+            status_output,
+            editing_area,  # Show the editing area
+            # Outputs to update visibility of segment groups
+            *segment_editing_groups,
+            # Outputs to update values of segment textboxes
+            *segment_text_inputs,
+            # Outputs to update values (clear) of segment file uploads
+            *segment_file_inputs,
+            # Output to update the segments_state
+            segments_state
+        ]
    )

-    # Final Generate Button

+    # Generate Video Button Click
    generate_video_btn.click(
-        fn=handle_final_generation,
+        fn=generate_video_from_edited,
        inputs=[
-            app_state, temp_dir_state,
-            resolution_input, kenburns_select,
-            bg_music_upload, bg_music_volume,
-            caption_enable_radio, caption_font_size, caption_font_color, cap_bg_color, caption_position
+            run_config_state,  # Pass run config
+            segments_state,  # Pass the original parsed segments data
+            *segment_text_inputs,  # Pass list of edited text values
+            *segment_file_inputs  # Pass list of uploaded file paths
        ],
-        outputs=[status_update, final_video_output, cleanup_message] # Update status, output video, cleanup msg
+        outputs=[status_output, final_video_output]  # Yield status updates and final video
    )

-    # Note: Event handlers for dynamic components (narration change, media upload)
-    # are wired inside the `handle_script_generation` function when the components are created.
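+    # A note on the wiring above: Gradio flattens *segment_text_inputs and
+    # *segment_file_inputs into one positional argument per component, so
+    # generate_video_from_edited (defined earlier in this file) has to re-split
+    # the flat argument list. A minimal sketch of that unpacking, where the
+    # parameter names and segment keys are illustrative assumptions rather than
+    # the actual handler:
+    #
+    #   def generate_video_from_edited(run_config, segments, *edited_values):
+    #       texts = edited_values[:MAX_SEGMENTS_FOR_EDITING]
+    #       files = edited_values[MAX_SEGMENTS_FOR_EDITING:]
+    #       for segment, text, file in zip(segments, texts, files):
+    #           segment["narration"] = text    # apply the edited narration
+    #           if file:                       # prefer user-uploaded media
+    #               segment["media_path"] = file
+    #       ...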
+    # Add JS to update segment prompt labels after script generation.
+    # This relies on the <span id='segment-prompt-i'> placeholders rendered in
+    # the segment Markdown above.
+    demo.load(
+        None,
+        None,
+        None,
+        _js=f"""
+        () => {{
+            // Define the helper once, on window, so the segments_state.change
+            // callback below can actually reach it.
+            window.updateSegmentPrompts = (segments_data) => {{
+                if (!segments_data) return;
+                for (let i = 0; i < segments_data.length; i++) {{
+                    const promptSpan = document.getElementById('segment-prompt-' + i);
+                    if (promptSpan) {{
+                        promptSpan.textContent = segments_data[i].original_prompt;
+                    }}
+                }}
+                // Clear the prompt spans of any unused segment slots
+                for (let i = segments_data.length; i < {MAX_SEGMENTS_FOR_EDITING}; i++) {{
+                    const promptSpan = document.getElementById('segment-prompt-' + i);
+                    if (promptSpan) {{
+                        promptSpan.textContent = '';
+                    }}
+                }}
+            }};
+        }}
+        """
+    )
+    # Trigger the JS function whenever segments_state changes
+    segments_state.change(
+        None,
+        segments_state,
+        None,
+        _js="""
+        (segments_data) => {
+            window.updateSegmentPrompts(segments_data);
+        }
+        """
+    )

-# --- Optional: Attempt ImageMagick Policy Fix on Startup ---
-# Run this only if you consistently have caption rendering issues.
-# May require running the script with sudo or manual execution of the commands.
-# fix_imagemagick_policy()
+# Launch the interface
+if __name__ == "__main__":
+    # Attempt the ImageMagick policy fix on script startup.
+    # This helps, but may still require manual sudo depending on system config.
+    fix_imagemagick_policy()

+    print("Launching Gradio interface...")
+    # Make sure to set the PEXELS_API_KEY and OPENROUTER_API_KEY environment variables,
+    # or replace 'YOUR_PEXELS_API_KEY' and 'YOUR_OPENROUTER_API_KEY' above.
+    if PEXELS_API_KEY == 'YOUR_PEXELS_API_KEY':
+        print("Warning: PEXELS_API_KEY is not configured. Media search may fail.")
+    if OPENROUTER_API_KEY == 'YOUR_OPENROUTER_API_KEY':
+        print("Warning: OPENROUTER_API_KEY is not configured. Script generation will fail.")

-# --- Launch the Gradio App ---
-if __name__ == "__main__":
-    print("Launching Gradio App...")
-    # share=True exposes it publicly via Gradio's tunneling service. Remove if running locally only.
-    # debug=True provides more detailed error messages in the console.
-    demo.launch(share=True, debug=True)
\ No newline at end of file
+
+    demo.launch(share=True)  # Set share=True to get a public link
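+
+# Example of supplying the API keys without editing this file (assumed shell
+# usage; adapt the values to your environment):
+#
+#   export PEXELS_API_KEY="your-pexels-key"
+#   export OPENROUTER_API_KEY="your-openrouter-key"
+#   python app.py
\ No newline at end of file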