diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,9 +1,9 @@ # -*- coding: utf-8 -*- from kokoro import KPipeline import soundfile as sf -import torch +import torch # Keep torch import if Kokoro needs it implicitly import os -from moviepy.editor import ( # Added CompositeAudioClip here +from moviepy.editor import ( VideoFileClip, AudioFileClip, ImageClip, concatenate_videoclips, CompositeVideoClip, CompositeAudioClip, TextClip ) @@ -17,70 +17,92 @@ import math import requests import re import time -from pydub import AudioSegment +# from pydub import AudioSegment # pydub wasn't used, removed import import numpy as np from bs4 import BeautifulSoup from urllib.parse import quote import gradio as gr import shutil +import traceback # For detailed error logging + +# --- Initialize Kokoro TTS pipeline --- +# Moved initialization into a function to handle potential errors more gracefully +# and allow retrying or lazy loading if needed. +pipeline = None +def initialize_kokoro(): + global pipeline + if pipeline is None: + try: + print("Initializing Kokoro TTS pipeline (lang_code='a')...") + # Using 'a' for American English as per original code + pipeline = KPipeline(lang_code='a') + print("Kokoro TTS pipeline initialized successfully.") + except Exception as e: + print(f"FATAL ERROR initializing Kokoro pipeline: {e}") + print("TTS generation will not be available.") + pipeline = None # Ensure pipeline is None if init fails + return pipeline -# Initialize Kokoro TTS pipeline (American English) -# Consider moving this inside a function if memory is a concern or if it needs lazy loading -try: - pipeline = KPipeline(lang_code='a') -except Exception as e: - print(f"Error initializing Kokoro pipeline: {e}. TTS might not work.") - pipeline = None # Set pipeline to None if initialization fails +# Attempt initialization once at the start +initialize_kokoro() + +# --- Configure ImageMagick --- # Ensure ImageMagick path is correct for your environment -# If running in a container or different OS, this path might need adjustment. -# It's often better to rely on MoviePy finding it automatically if it's in the system PATH. try: - # Check if convert exists at the specified path - if os.path.exists("/usr/bin/convert"): - mpy_config.change_settings({"IMAGEMAGICK_BINARY": r"/usr/bin/convert"}) - print("ImageMagick path set to /usr/bin/convert") - else: - print("Warning: ImageMagick not found at /usr/bin/convert. TextClip captions might fail.") - # Optionally, try to find it automatically or raise a more specific error - # For example, MoviePy might find it if it's in the PATH. - # If it's known to be elsewhere, set that path. - # mpy_config.change_settings({"IMAGEMAGICK_BINARY": r"/path/to/your/convert"}) + # Check common paths or rely on system PATH + imagemagick_path = None + common_paths = ["/usr/bin/convert", "/usr/local/bin/convert", "/opt/homebrew/bin/convert"] + for path in common_paths: + if os.path.exists(path): + imagemagick_path = path + break + + if imagemagick_path: + mpy_config.change_settings({"IMAGEMAGICK_BINARY": imagemagick_path}) + print(f"ImageMagick path set to: {imagemagick_path}") + # If not found in common paths, MoviePy might still find it if it's in the system PATH. + # We only print a warning if we couldn't find it in common places. 
+ elif not any(os.path.exists(p) for p in ["convert", "magick"]): # Check PATH roughly + print("Warning: ImageMagick 'convert' command not found in common paths or system PATH.") + print(" TextClip captions requiring ImageMagick may fail.") except Exception as e: - print(f"Error configuring ImageMagick: {e}. TextClip captions might fail.") + print(f"Warning: Error configuring ImageMagick: {e}. TextClip captions might fail.") # --- Global Configuration --- -# It's safer to get API keys from environment variables or a config file -PEXELS_API_KEY = os.getenv('PEXELS_API_KEY', 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna') # Replace with your key or use env var -OPENROUTER_API_KEY = os.getenv('OPENROUTER_API_KEY', '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') # Replace or use env var -OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" # Or choose another model -OUTPUT_VIDEO_FILENAME = "final_video.mp4" # Default output name -USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" - -# These will be set by the Gradio inputs +PEXELS_API_KEY = os.getenv('PEXELS_API_KEY', 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna') # Replace/Use Env Var +OPENROUTER_API_KEY = os.getenv('OPENROUTER_API_KEY', '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') # Replace/Use Env Var +# Check if API keys are set properly +if not PEXELS_API_KEY or 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna' in PEXELS_API_KEY: + print("WARNING: Pexels API Key seems to be the default or not set. Media fetching might fail.") +if not OPENROUTER_API_KEY or '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' in OPENROUTER_API_KEY: + print("WARNING: OpenRouter API Key seems to be the default or not set. Script generation might fail.") + +OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" +OUTPUT_VIDEO_FILENAME = "final_video.mp4" +USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36" # Updated UA + TARGET_RESOLUTION = None CAPTION_COLOR = None TEMP_FOLDER = None # --- Helper Functions (generate_script, parse_script, search_*, download_*) --- -# (Keep these functions as they were in your original code, they seem okay, -# but ensure error handling is robust, especially for API calls and downloads) -# ... (generate_script function) ... +# Keep these functions as previously refined, ensuring robust error handling. +# (Code for these functions omitted for brevity - assume they are the robust versions from the previous response) +# ... (generate_script function - with API key checks) ... def generate_script(user_input): # Ensure API key is available - if not OPENROUTER_API_KEY or OPENROUTER_API_KEY == 'YOUR_API_KEY_HERE': - print("Error: OpenRouter API Key not set.") + if not OPENROUTER_API_KEY or 'sk-or-v1-' not in OPENROUTER_API_KEY: # Basic check + print("Error: OpenRouter API Key not set or looks invalid.") return "Error: OpenRouter API Key not configured." headers = { 'Authorization': f'Bearer {OPENROUTER_API_KEY}', - # Optional but recommended by OpenRouter 'HTTP-Referer': 'http://localhost:7860', # Or your app's URL 'X-Title': 'AI Documentary Maker' } - # Your existing prompt structure prompt = f"""Short Documentary Script GeneratorInstructions: If I say "use this," just output the script exactly as I gave it. 
If I only give topics, generate a script based on them. @@ -119,34 +141,29 @@ Now here is the Topic/scrip: {user_input} data = { 'model': OPENROUTER_MODEL, 'messages': [{'role': 'user', 'content': prompt}], - 'temperature': 0.4, # Adjust creativity vs. predictability - 'max_tokens': 1024 # Set a reasonable limit + 'temperature': 0.4, + 'max_tokens': 1024 } try: - # Increased timeout for potentially longer generations response = requests.post('https://openrouter.ai/api/v1/chat/completions', headers=headers, json=data, timeout=60) - response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) - + response.raise_for_status() response_data = response.json() if 'choices' in response_data and len(response_data['choices']) > 0: script_content = response_data['choices'][0]['message']['content'] - # Basic validation: Check if it contains the expected format if '[' in script_content and ']' in script_content: print("Script generated successfully.") return script_content.strip() else: print("Warning: Generated script might not be in the expected format.") - return script_content.strip() # Return anyway, maybe user can fix it + return script_content.strip() else: print("API Error: Unexpected response format from OpenRouter:", response_data) return "Error: Could not parse script from API response." - except requests.exceptions.Timeout: print("API Error: Request to OpenRouter timed out.") return "Error: Script generation timed out." except requests.exceptions.RequestException as e: print(f"API Error: Request failed: {e}") - # Log more details if possible, e.g., response.text for non-200 status error_details = f"Status Code: {e.response.status_code}, Response: {e.response.text}" if e.response else str(e) print(error_details) return f"Error: Failed to connect to script generation service ({error_details})." @@ -154,7 +171,7 @@ Now here is the Topic/scrip: {user_input} print(f"Error during script generation: {e}") return f"Error: An unexpected error occurred during script generation: {e}" -# ... (parse_script function) ... +# ... (parse_script function - robust version) ... def parse_script(script_text): sections = {} current_title = None @@ -166,157 +183,98 @@ def parse_script(script_text): lines = script_text.strip().splitlines() for line in lines: line = line.strip() - if not line: # Skip empty lines - continue - - # Improved regex to handle potential extra spaces + if not line: continue match = re.match(r'^\s*\[([^\]]+)\]\s*(.*)', line) if match: - # If a new title is found, save the previous section (if any) if current_title is not None and current_text: sections[current_title] = current_text.strip() - current_title = match.group(1).strip() - current_text = match.group(2).strip() + " " # Add space for potential multi-line narration + current_text = match.group(2).strip() + " " elif current_title: - # Append to the current narration if it's not a title line current_text += line + " " - - # Save the last section after the loop finishes if current_title is not None and current_text: sections[current_title] = current_text.strip() - if not sections: - print("Warning: No sections found in the script. Check formatting (e.g., [Title] Narration).") + print("Warning: No sections found in the script. Check formatting ([Title] Narration).") return [] - elements = [] for title, narration in sections.items(): - # Basic validation if not title or not narration: - print(f"Warning: Skipping section with empty title or narration. 
Title: '{title}', Narration: '{narration}'") + print(f"Warning: Skipping section with empty title or narration. Title: '{title}'") continue - - # Media element uses the title as prompt - media_element = {"type": "media", "prompt": title, "effects": "fade-in"} # Default effect - - # TTS element uses the narration + media_element = {"type": "media", "prompt": title, "effects": "fade-in"} words = narration.split() - # Simple duration estimate: base 2s + 0.4s per word, capped maybe? - # Needs refinement based on TTS speed. Kokoro might provide duration. - duration = max(2.0, len(words) * 0.4) - tts_element = {"type": "tts", "text": narration, "voice": "en", "duration": duration} # Duration is estimate - + duration = max(2.0, len(words) * 0.4) # Simple estimate + tts_element = {"type": "tts", "text": narration, "voice": "en", "duration": duration} elements.append(media_element) elements.append(tts_element) - print(f"Script parsed into {len(elements)//2} sections.") return elements - except Exception as e: print(f"Error parsing script: {e}") - # Consider logging the script_text that caused the error for debugging - # print(f"Script content causing error:\n{script_text}") + traceback.print_exc() return [] -# ... (search_pexels_videos function - check API key handling) ... +# ... (search_pexels_videos - robust version with API key check) ... def search_pexels_videos(query, pexels_api_key): - if not pexels_api_key or pexels_api_key == 'YOUR_PEXELS_API_KEY': # Basic check + if not pexels_api_key or 'YOUR_PEXELS_API_KEY' in pexels_api_key: # Basic check print("Error: Pexels API Key not configured.") return None headers = {'Authorization': pexels_api_key} base_url = "https://api.pexels.com/videos/search" num_pages = 3 - videos_per_page = 15 # Pexels default, max is 80 + videos_per_page = 15 max_retries = 3 - retry_delay = 2 # Start with slightly longer delay - search_query = query + retry_delay = 2 all_videos = [] - print(f"Searching Pexels videos for: '{query}'...") for page in range(1, num_pages + 1): print(f" Fetching page {page}...") - params = {"query": search_query, "per_page": videos_per_page, "page": page, "orientation": "landscape"} # Added orientation + params = {"query": query, "per_page": videos_per_page, "page": page, "orientation": "landscape"} for attempt in range(max_retries): try: - response = requests.get(base_url, headers=headers, params=params, timeout=20) # Increased timeout - response.raise_for_status() # Will raise HTTPError for bad responses - + response = requests.get(base_url, headers=headers, params=params, timeout=20) + response.raise_for_status() data = response.json() videos = data.get("videos", []) if not videos: print(f" No videos found on page {page} for '{query}'.") - # If no videos on page 1, maybe stop searching? Depends on desired behavior. 
- # if page == 1: return None - break # Stop trying this page - + break found_on_page = 0 for video in videos: video_files = video.get("video_files", []) - # Prefer HD or FHD if available, fall back otherwise - hd_link = None - fhd_link = None - best_link = None + hd_link, fhd_link, best_link = None, None, None for file in video_files: - quality = file.get("quality") - link = file.get("link") - width = file.get("width", 0) + quality, link, width = file.get("quality"), file.get("link"), file.get("width", 0) if not link: continue - - # Basic check for MP4 format if possible (sometimes link implies it) - if not best_link: best_link = link # Fallback - - # Prioritize based on quality and width (prefer wider for landscape) - if quality == "hd" and width >= 1280: - hd_link = link - if quality == "fhd" and width >= 1920: - fhd_link = link # Prefer FHD if available - - chosen_link = fhd_link or hd_link or best_link # Prioritize FHD > HD > Best available + if not best_link: best_link = link + if quality == "hd" and width >= 1280: hd_link = link + if quality == "fhd" and width >= 1920: fhd_link = link + chosen_link = fhd_link or hd_link or best_link if chosen_link: all_videos.append(chosen_link) found_on_page += 1 - print(f" Found {found_on_page} suitable video links on page {page}.") - break # Success for this page, move to next page - + break # Success for this page except requests.exceptions.Timeout: print(f" Timeout occurred (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s...") - time.sleep(retry_delay) - retry_delay *= 1.5 # Less aggressive backoff + time.sleep(retry_delay); retry_delay *= 1.5 except requests.exceptions.HTTPError as e: - if e.response.status_code == 429: # Rate limit + if e.response.status_code == 429: print(f" Rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s...") - time.sleep(retry_delay) - retry_delay *= 1.5 - elif e.response.status_code == 400 and "Invalid parameters" in e.response.text: - print(f" Pexels API Error: Invalid parameters for query '{query}'. Skipping video search.") - return None # Don't retry if params are bad - elif e.response.status_code == 404: # Not Found (might indicate bad query) - print(f" Pexels API Error: Query '{query}' not found (404). Skipping video search.") - return None + time.sleep(retry_delay); retry_delay *= 1.5 + elif e.response.status_code in [400, 404]: + print(f" Pexels API Error {e.response.status_code} for query '{query}'. Skipping video search. 
Response: {e.response.text}") + return None else: print(f" HTTP Error fetching videos: {e.response.status_code} {e.response.text} (attempt {attempt+1}/{max_retries})") - if attempt < max_retries - 1: - print(f" Retrying in {retry_delay}s...") - time.sleep(retry_delay) - retry_delay *= 1.5 - else: - print(" Max retries reached for HTTP error.") - return None # Failed after retries + if attempt < max_retries - 1: time.sleep(retry_delay); retry_delay *= 1.5 + else: return None except requests.exceptions.RequestException as e: print(f" Network error: {e} (attempt {attempt+1}/{max_retries})") - if attempt < max_retries - 1: - print(f" Retrying in {retry_delay}s...") - time.sleep(retry_delay) - retry_delay *= 1.5 - else: - print(" Max retries reached for network error.") - return None # Failed after retries - # Optional: Add a small delay between page requests to be nice to the API - time.sleep(0.5) - - + if attempt < max_retries - 1: time.sleep(retry_delay); retry_delay *= 1.5 + else: return None + time.sleep(0.5) # Delay between pages if all_videos: random_video = random.choice(all_videos) print(f"Selected random video from {len(all_videos)} found links for '{query}'.") @@ -325,110 +283,73 @@ def search_pexels_videos(query, pexels_api_key): print(f"No suitable Pexels videos found for query: '{query}' after searching {num_pages} pages.") return None -# ... (search_pexels_images function - check API key handling) ... +# ... (search_pexels_images - robust version with API key check) ... def search_pexels_images(query, pexels_api_key): - if not pexels_api_key or pexels_api_key == 'YOUR_PEXELS_API_KEY': + if not pexels_api_key or 'YOUR_PEXELS_API_KEY' in pexels_api_key: print("Error: Pexels API Key not configured.") return None headers = {'Authorization': pexels_api_key} url = "https://api.pexels.com/v1/search" - # Request more images to increase chance of finding a good one params = {"query": query, "per_page": 15, "orientation": "landscape"} max_retries = 3 retry_delay = 2 - print(f"Searching Pexels images for: '{query}'...") for attempt in range(max_retries): try: response = requests.get(url, headers=headers, params=params, timeout=15) response.raise_for_status() - data = response.json() photos = data.get("photos", []) if photos: - # Select from top results, maybe filter by size if needed later - potential_photos = photos[:min(10, len(photos))] # Consider top 10 + potential_photos = photos[:min(10, len(photos))] chosen_photo = random.choice(potential_photos) - # Prefer 'large' or 'original' for better quality img_url = chosen_photo.get("src", {}).get("large2x") or \ chosen_photo.get("src", {}).get("original") or \ chosen_photo.get("src", {}).get("large") if img_url: print(f"Found {len(photos)} images, selected one for '{query}'.") return img_url - else: - print(f" Warning: Found photo for '{query}' but couldn't extract a suitable URL.") - # Try next photo? For now, we'll fail if the chosen one has no URL - break # Stop trying this attempt, move to retry or fail - + else: break # Chosen photo has no URL, retry or fail else: print(f" No Pexels images found for query: '{query}' on attempt {attempt+1}.") - # If no images found, don't retry immediately unless it was a temporary error - return None # No images found for this query - + return None except requests.exceptions.Timeout: print(f" Timeout occurred (attempt {attempt+1}/{max_retries}). 
Retrying in {retry_delay}s...") - time.sleep(retry_delay) - retry_delay *= 1.5 + time.sleep(retry_delay); retry_delay *= 1.5 except requests.exceptions.HTTPError as e: - if e.response.status_code == 429: # Rate limit + if e.response.status_code == 429: print(f" Rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s...") - time.sleep(retry_delay) - retry_delay *= 1.5 - elif e.response.status_code == 400 and "Invalid parameters" in e.response.text: - print(f" Pexels API Error: Invalid parameters for query '{query}'. Skipping image search.") + time.sleep(retry_delay); retry_delay *= 1.5 + elif e.response.status_code in [400, 404]: + print(f" Pexels API Error {e.response.status_code} for query '{query}'. Skipping image search. Response: {e.response.text}") return None - elif e.response.status_code == 404: - print(f" Pexels API Error: Query '{query}' not found (404). Skipping image search.") - return None else: print(f" HTTP Error fetching images: {e.response.status_code} {e.response.text} (attempt {attempt+1}/{max_retries})") - if attempt < max_retries - 1: - print(f" Retrying in {retry_delay}s...") - time.sleep(retry_delay) - retry_delay *= 1.5 - else: - print(" Max retries reached for HTTP error.") - return None + if attempt < max_retries - 1: time.sleep(retry_delay); retry_delay *= 1.5 + else: return None except requests.exceptions.RequestException as e: print(f" Network error: {e} (attempt {attempt+1}/{max_retries})") - if attempt < max_retries - 1: - print(f" Retrying in {retry_delay}s...") - time.sleep(retry_delay) - retry_delay *= 1.5 - else: - print(" Max retries reached for network error.") - return None - + if attempt < max_retries - 1: time.sleep(retry_delay); retry_delay *= 1.5 + else: return None print(f"Failed to find Pexels images for query: '{query}' after all attempts.") return None -# ... (search_google_images function - Be mindful of Google's terms of service and potential blocks) ... +# ... (search_google_images - robust version, use with caution) ... def search_google_images(query): - # Warning: Scraping Google Images can be unreliable and may violate their ToS. - # Consider using official APIs if available (like Google Custom Search JSON API, which has costs). print(f"Attempting Google Images search for: '{query}' (use with caution)...") try: - # Encode query properly for URL - search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch&hl=en&safe=active" # Added params + search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch&hl=en&safe=active" headers = {"User-Agent": USER_AGENT} response = requests.get(search_url, headers=headers, timeout=15) response.raise_for_status() - soup = BeautifulSoup(response.text, "html.parser") img_tags = soup.find_all("img") - image_urls = [] for img in img_tags: - src = img.get("src") or img.get("data-src") # Check both src and data-src - if src and src.startswith("http") and "gstatic.com/images" not in src: # Filter out some common Google logos/icons - # Basic check for common image extensions (optional, might exclude valid URLs) - # if any(ext in src.lower() for ext in ['.jpg', '.jpeg', '.png', '.webp']): + src = img.get("src") or img.get("data-src") + if src and src.startswith("http") and "gstatic.com/images" not in src: image_urls.append(src) - if image_urls: - # Maybe take more than just the first few? Google often shows related images first. - # Select randomly from a larger sample if available. 
sample_size = min(len(image_urls), 10) selected_url = random.choice(image_urls[:sample_size]) print(f"Found {len(image_urls)} potential Google Images, selected one.") @@ -443,173 +364,96 @@ def search_google_images(query): print(f"Error parsing Google Images search results: {e}") return None - -# ... (download_image function - added more robust validation) ... +# ... (download_image - robust version with validation) ... def download_image(image_url, filename): - # Ensure TEMP_FOLDER exists - if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): - print(f"Error: Temporary folder '{TEMP_FOLDER}' not set or does not exist.") - return None - # Construct full path + if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): return None full_path = os.path.join(TEMP_FOLDER, os.path.basename(filename)) - try: headers = {"User-Agent": USER_AGENT} print(f"Downloading image from: {image_url} to {full_path}") - # Use stream=True and verify=True (default, but good practice) response = requests.get(image_url, headers=headers, stream=True, timeout=20, verify=True) - response.raise_for_status() # Check for HTTP errors - - # Check content type if possible + response.raise_for_status() content_type = response.headers.get('Content-Type', '').lower() if not content_type.startswith('image/'): print(f"Warning: URL content type ({content_type}) doesn't look like an image. Trying anyway.") - # Decide whether to proceed or return None here - with open(full_path, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) + for chunk in response.iter_content(chunk_size=8192): f.write(chunk) print(f"Image downloaded to: {full_path}") - - # --- Validation Step --- + if os.path.getsize(full_path) < 1024: + print(f"Warning: Downloaded file {full_path} is very small ({os.path.getsize(full_path)} bytes). May be invalid.") try: - # Check if file size is reasonable (e.g., > 1KB) - if os.path.getsize(full_path) < 1024: - print(f"Warning: Downloaded file {full_path} is very small ({os.path.getsize(full_path)} bytes). 
May be invalid.") - # Decide if this is an error or just a warning - - # Use Pillow to verify and convert if necessary img = Image.open(full_path) - img.verify() # Verify that it's a valid image file - img.close() # Close after verify, reopen to load data - - # Re-open to load data and check/convert format + img.verify() + img.close() img = Image.open(full_path) if img.mode != 'RGB': print(f"Converting image {full_path} from {img.mode} to RGB.") img = img.convert('RGB') - # Overwrite the original file with the converted version - img.save(full_path, format='JPEG') # Save as JPEG for consistency + img.save(full_path, format='JPEG') img.close() - print(f"Image validated and ensured RGB format: {full_path}") - return full_path # Return the full path - + return full_path except (IOError, SyntaxError, Image.UnidentifiedImageError) as e_validate: print(f"Error: Downloaded file is not a valid image or is corrupted: {e_validate}") - # Clean up the invalid downloaded file - if os.path.exists(full_path): - os.remove(full_path) + if os.path.exists(full_path): os.remove(full_path) return None except Exception as e_img_proc: print(f"Error processing image after download: {e_img_proc}") - if os.path.exists(full_path): - os.remove(full_path) + if os.path.exists(full_path): os.remove(full_path) return None - - except requests.exceptions.Timeout: print(f"Error: Timeout while downloading image from {image_url}") return None except requests.exceptions.RequestException as e_download: print(f"Error: Image download failed: {e_download}") - # Clean up potentially incomplete file - if os.path.exists(full_path): - try: - os.remove(full_path) - except OSError: - pass # Ignore if removal fails + if os.path.exists(full_path): + try: os.remove(full_path) + except OSError: pass return None except Exception as e_general: print(f"Error: General error during image download/processing: {e_general}") - if os.path.exists(full_path): - try: - os.remove(full_path) - except OSError: - pass + if os.path.exists(full_path): + try: os.remove(full_path) + except OSError: pass return None
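+# NOTE (editor sketch, not part of the original patch): download_image and download_video repeat
+# the same best-effort cleanup of partially written files. If a shared helper is acceptable,
+# something like the following (hypothetical name) could replace the repeated pattern:
+#   def _remove_partial_download(path):
+#       if path and os.path.exists(path):
+#           try: os.remove(path)
+#           except OSError: pass
+# The error handlers above and below would then simply call _remove_partial_download(full_path).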
-# ... (download_video function - added basic validation possibility) ... +# ... (download_video - robust version) ... def download_video(video_url, filename): - if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): - print(f"Error: Temporary folder '{TEMP_FOLDER}' not set or does not exist.") - return None + if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): return None full_path = os.path.join(TEMP_FOLDER, os.path.basename(filename)) - try: - headers = {"User-Agent": USER_AGENT} # Some servers might require this + headers = {"User-Agent": USER_AGENT} print(f"Downloading video from: {video_url} to {full_path}") - response = requests.get(video_url, stream=True, timeout=60, verify=True) # Longer timeout for videos + response = requests.get(video_url, stream=True, timeout=60, verify=True) response.raise_for_status() - - # Optional: Check content type content_type = response.headers.get('Content-Type', '').lower() if 'video/' not in content_type: print(f"Warning: URL content type ({content_type}) doesn't look like a video. Trying anyway.") - with open(full_path, 'wb') as f: - for chunk in response.iter_content(chunk_size=8192 * 10): # Larger chunk size for videos - f.write(chunk) + with open(full_path, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192 * 10): f.write(chunk) print(f"Video downloaded successfully to: {full_path}") - - # --- Basic Validation --- - # Check file size (e.g., > 100KB for a video) - min_video_size = 100 * 1024 # 100 KB + min_video_size = 100 * 1024 if os.path.getsize(full_path) < min_video_size: print(f"Warning: Downloaded video file {full_path} is very small ({os.path.getsize(full_path)} bytes). May be invalid.") - # Optionally remove and return None if size is too small - # os.remove(full_path) - # return None - - # Optional: Try opening with moviepy to see if it's readable (can be slow) + # Optional: Test read with moviepy (can be slow) # try: - # with VideoFileClip(full_path) as test_clip: - # print(f"Video file {full_path} seems readable by MoviePy (duration: {test_clip.duration:.2f}s).") + # with VideoFileClip(full_path) as test_clip: pass # except Exception as e_read: - # print(f"Error: Downloaded file {full_path} could not be opened as a video: {e_read}") - # os.remove(full_path) - # return None - + # print(f"Error: Downloaded file {full_path} could not be opened as a video: {e_read}"); os.remove(full_path); return None return full_path - except requests.exceptions.Timeout: print(f"Error: Timeout while downloading video from {video_url}") return None except requests.exceptions.RequestException as e: print(f"Error: Video download failed: {e}") - if os.path.exists(full_path): - try: - os.remove(full_path) - except OSError: pass + if os.path.exists(full_path): + try: os.remove(full_path) + except OSError: pass return None except Exception as e: print(f"Error during video download: {e}") - if os.path.exists(full_path): - try: - os.remove(full_path) - except OSError: pass + if os.path.exists(full_path): + try: os.remove(full_path) + except OSError: pass return None -# --- Media Generation Logic --- -# Updated to accept video_probability -def generate_media(prompt, video_probability, current_index=0, total_segments=1): - """ - Generates media (video or image) for a given prompt. - Args: - prompt (str): The search term for the media. - video_probability (float): Chance (0.0 to 1.0) to prioritize video search. - current_index (int): Index of the current segment. - total_segments (int): Total number of segments in the video. - Returns: - dict or None: {"path": path_to_media, "asset_type": "video|image"} or None if failed.
- """ - if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): - print(f"Error: Temporary folder '{TEMP_FOLDER}' not set or does not exist.") - return None - # Sanitize prompt for use in filenames - safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_') - safe_prompt = safe_prompt[:50] # Limit filename length +# --- Media Generation Logic (using video_probability) --- +def generate_media(prompt, video_probability, current_index=0, total_segments=1): + if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): return None + safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')[:50] # --- Attempt 1: Pexels Video (based on probability) --- if random.random() < video_probability: @@ -619,15 +463,11 @@ def generate_media(prompt, video_probability, current_index=0, total_segments=1) if video_url: downloaded_video = download_video(video_url, video_file) if downloaded_video and os.path.exists(downloaded_video): - print(f"Using Pexels video: {downloaded_video}") + print(f"Using Pexels video: {os.path.basename(downloaded_video)}") return {"path": downloaded_video, "asset_type": "video"} - else: - print(f"Pexels video download failed for prompt: '{prompt}'") - else: - print(f"Pexels video search failed for prompt: '{prompt}'. Trying images next.") - else: - print(f"Skipping initial Pexels video search based on probability ({video_probability:.2f}). Trying images first.") - + else: print(f"Pexels video download failed for prompt: '{prompt}'") + else: print(f"Pexels video search failed for prompt: '{prompt}'. Trying images next.") + else: print(f"Skipping initial Pexels video search based on probability ({video_probability:.2f}). Trying images first.") # --- Attempt 2: Pexels Image --- print(f"Attempting Pexels image search for '{prompt}'...") @@ -636,102 +476,85 @@ def generate_media(prompt, video_probability, current_index=0, total_segments=1) if image_url_pexels: downloaded_image_pexels = download_image(image_url_pexels, image_file_pexels) if downloaded_image_pexels and os.path.exists(downloaded_image_pexels): - print(f"Using Pexels image: {downloaded_image_pexels}") + print(f"Using Pexels image: {os.path.basename(downloaded_image_pexels)}") return {"path": downloaded_image_pexels, "asset_type": "image"} - else: - print(f"Pexels image download failed for prompt: '{prompt}'") - else: - print(f"Pexels image search failed for prompt: '{prompt}'.") - + else: print(f"Pexels image download failed for prompt: '{prompt}'") + else: print(f"Pexels image search failed for prompt: '{prompt}'.") - # --- Attempt 3: Google Image (if Pexels failed) --- - # Use this cautiously due to reliability/ToS issues - if "news" in prompt.lower() or not image_url_pexels: # Prioritize Google for news or if Pexels failed + # --- Attempt 3: Google Image (if Pexels failed or 'news') --- + if "news" in prompt.lower() or not image_url_pexels: print(f"Attempting Google image search for '{prompt}'...") image_file_google = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google_{current_index}.jpg") image_url_google = search_google_images(prompt) if image_url_google: downloaded_image_google = download_image(image_url_google, image_file_google) if downloaded_image_google and os.path.exists(downloaded_image_google): - print(f"Using Google image: {downloaded_image_google}") + print(f"Using Google image: {os.path.basename(downloaded_image_google)}") return {"path": downloaded_image_google, "asset_type": "image"} - else: - print(f"Google image download failed for prompt: '{prompt}'") - else: - print(f"Google image 
search failed for prompt: '{prompt}'.") - + else: print(f"Google image download failed for prompt: '{prompt}'") + else: print(f"Google image search failed for prompt: '{prompt}'.") - # --- Attempt 4: Fallback Image Search (if everything else failed) --- + # --- Attempt 4: Fallback Image Search --- print(f"All specific searches failed for '{prompt}'. Trying fallback terms...") fallback_terms = ["abstract background", "technology", "nature landscape", "cityscape", "texture"] - random.shuffle(fallback_terms) # Try in random order + random.shuffle(fallback_terms) for term in fallback_terms: print(f" Trying fallback image search with term: '{term}'") fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term.replace(' ','_')}_{current_index}.jpg") - # Avoid re-downloading same fallback if already exists (simple caching) if os.path.exists(fallback_file): - print(f" Using cached fallback image: {fallback_file}") + print(f" Using cached fallback image: {os.path.basename(fallback_file)}") return {"path": fallback_file, "asset_type": "image"} - fallback_url = search_pexels_images(term, PEXELS_API_KEY) if fallback_url: downloaded_fallback = download_image(fallback_url, fallback_file) if downloaded_fallback and os.path.exists(downloaded_fallback): - print(f"Using fallback image: {downloaded_fallback}") + print(f"Using fallback image: {os.path.basename(downloaded_fallback)}") return {"path": downloaded_fallback, "asset_type": "image"} - else: - print(f" Fallback image download failed for term: '{term}'") - else: - print(f" Fallback image search failed for term: '{term}'") - time.sleep(0.5) # Small delay between fallback attempts + else: print(f" Fallback image download failed for term: '{term}'") + else: print(f" Fallback image search failed for term: '{term}'") + time.sleep(0.5) # --- Final Failure --- print(f"FATAL: Failed to generate any visual asset for prompt: '{prompt}' after all attempts.") - # Optionally create a placeholder black screen or text clip here? - # For now, return None, which will cause the segment to be skipped. return None -# --- TTS Generation --- -def generate_tts(text, voice): - """Generates Text-to-Speech audio using Kokoro.""" - if not pipeline: - print("Error: Kokoro TTS pipeline not initialized.") - return None +# --- TTS Generation (Kokoro Fix) --- +def generate_tts(text, voice): # voice parameter kept for potential future use, but ignored for now + """Generates Text-to-Speech audio using Kokoro default voice for lang_code='a'.""" + global pipeline + # Ensure pipeline is initialized + if pipeline is None: + print("Error: Kokoro TTS pipeline is not initialized. Trying to initialize now...") + if not initialize_kokoro(): # Try initializing again + print("Error: Failed to initialize Kokoro TTS pipeline. Cannot generate TTS.") + return None + if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): print(f"Error: Temporary folder '{TEMP_FOLDER}' not set or does not exist.") return None - # Sanitize text for filename (use first few words) safe_text_prefix = re.sub(r'[^\w\s-]', '', text[:20]).strip().replace(' ', '_') file_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_prefix}.wav") - # Simple caching: Check if the exact file already exists - # A more robust cache would hash the text content. 
if os.path.exists(file_path): - print(f"Using cached TTS for text starting with '{text[:20]}...'") - # Verify the cached file is valid (e.g., non-zero size) - if os.path.getsize(file_path) > 100: # Basic check for > 100 bytes + if os.path.getsize(file_path) > 100: + print(f"Using cached TTS for text starting with '{text[:20]}...'") return file_path else: - print(f"Cached TTS file {file_path} seems invalid (too small). Regenerating.") - try: - os.remove(file_path) - except OSError: pass # Ignore if removal fails - + print(f"Cached TTS file {file_path} seems invalid. Regenerating.") + try: os.remove(file_path) + except OSError: pass print(f"Generating TTS for: '{text[:50]}...'") try: - # Map 'en' to a specific Kokoro voice if needed, or handle other languages - kokoro_voice = 'en-us-heart' # Example: Using a specific English voice from Kokoro - # Adjust parameters as needed - generator = pipeline(text, voice=kokoro_voice, speed=1.0) # Adjust speed (0.5 to 2.0) + # --- Kokoro Fix --- + # Rely on the default voice for the initialized lang_code='a' + # Do NOT specify voice='en-us-heart' explicitly here. + generator = pipeline(text, speed=1.0) # Adjust speed if needed (0.5 to 2.0) audio_segments = [] - # Iterate through the generator to get audio chunks for audio_chunk in generator: - # The structure might depend on Kokoro version; adapt if needed - # Assuming 'audio_chunk' is the numpy array directly or within a tuple + # Adapt based on actual generator output structure if isinstance(audio_chunk, tuple) and len(audio_chunk) > 0 and isinstance(audio_chunk[-1], np.ndarray): audio_segments.append(audio_chunk[-1]) elif isinstance(audio_chunk, np.ndarray): @@ -739,86 +562,55 @@ def generate_tts(text, voice): else: print(f"Warning: Unexpected item from TTS generator: {type(audio_chunk)}") - if not audio_segments: print("Error: TTS generation produced no audio segments.") return None - # Concatenate audio segments full_audio = np.concatenate(audio_segments) + sf.write(file_path, full_audio, 24000) # Kokoro typically uses 24kHz + print(f"TTS audio saved to {os.path.basename(file_path)}") - # Write using soundfile (Kokoro typically uses 24000 Hz) - sf.write(file_path, full_audio, 24000) - print(f"TTS audio saved to {file_path}") - - # Final check if file exists and has size if os.path.exists(file_path) and os.path.getsize(file_path) > 100: return file_path else: print(f"Error: TTS file generation failed or file is invalid: {file_path}") + if os.path.exists(file_path): + try: os.remove(file_path) + except OSError: pass return None - except NameError as ne: - if 'KPipeline' in str(ne): - print("Error: Kokoro KPipeline is not defined. Was initialization successful?") - return None - else: - print(f"Error during TTS generation (NameError): {ne}") - return None + # Catch potential download/model errors specifically if possible + except requests.exceptions.RequestException as req_err: + print(f"Error during Kokoro TTS generation (Network/Download): {req_err}") + print(" Check internet connection and Hugging Face Hub status (https://status.huggingface.co/).") + traceback.print_exc() + if os.path.exists(file_path): + try: os.remove(file_path) + except OSError: pass + return None except Exception as e: print(f"Error during Kokoro TTS generation: {e}") - # Log the text that caused the error if helpful - # print(f"TTS text causing error: {text}") - # Clean up potentially corrupted file - if os.path.exists(file_path): - try: - os.remove(file_path) - except OSError: pass + traceback.print_exc() # Print detailed traceback + if os.path.exists(file_path): + try: os.remove(file_path) + except OSError: pass return None
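+# NOTE (editor sketch, not part of the original patch): the TTS cache key above only uses the
+# first 20 characters of the narration, so two narrations sharing a prefix would collide. A more
+# robust key could hash the full text (this would require `import hashlib` at the top of the file):
+#   text_hash = hashlib.md5(text.encode('utf-8')).hexdigest()[:16]
+#   file_path = os.path.join(TEMP_FOLDER, f"tts_{text_hash}.wav")
+# If an explicit Kokoro voice is ever needed again, the voice names documented for Kokoro-82M
+# look like 'af_heart' rather than 'en-us-heart'; treat that as an assumption to verify against
+# the installed kokoro version.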
# --- Video Effects and Processing --- - +# ... (apply_kenburns_effect - robust version) ... def apply_kenburns_effect(clip, target_resolution, effect_type=None): - """Applies a Ken Burns effect (zoom/pan) to an ImageClip.""" try: target_w, target_h = target_resolution - # Ensure clip is an ImageClip with duration if not isinstance(clip, ImageClip) or clip.duration is None or clip.duration <= 0: print("Warning: Ken Burns effect requires an ImageClip with positive duration.") - # Fallback: just resize and center crop? - return resize_to_fill(clip.set_duration(3.0), target_resolution) # Give it a default duration + return resize_to_fill(clip.set_duration(3.0), target_resolution) - # --- Calculate Resize needed to cover target resolution after zoom/pan --- - # We need the image to be larger than the target frame initially.
- # Base scale determines how much larger (e.g., 1.15 means 15% larger) - base_scale = 1.20 # Increased scale for more noticeable effect - initial_w = int(target_w * base_scale) - initial_h = int(target_h * base_scale) - - # Resize the original image clip to this larger initial size, maintaining aspect ratio + base_scale = 1.20 + initial_w, initial_h = int(target_w * base_scale), int(target_h * base_scale) img_aspect = clip.w / clip.h initial_aspect = initial_w / initial_h - - if img_aspect > initial_aspect: # Image wider than needed scale - scaled_h = initial_h - scaled_w = int(scaled_h * img_aspect) - else: # Image taller than needed scale - scaled_w = initial_w - scaled_h = int(scaled_w / img_aspect) - - # Ensure the scaled size is at least the initial target size - scaled_w = max(initial_w, scaled_w) - scaled_h = max(initial_h, scaled_h) - + if img_aspect > initial_aspect: scaled_h, scaled_w = initial_h, int(initial_h * img_aspect) + else: scaled_w, scaled_h = initial_w, int(initial_w / img_aspect) + scaled_w, scaled_h = max(initial_w, scaled_w), max(initial_h, scaled_h) resized_clip = clip.resize(newsize=(scaled_w, scaled_h)) - nw, nh = scaled_w, scaled_h # New dimensions after resize - - # --- Define Ken Burns Parameters --- - max_offset_x = max(0, nw - target_w) - max_offset_y = max(0, nh - target_h) + nw, nh = scaled_w, scaled_h + max_offset_x, max_offset_y = max(0, nw - target_w), max(0, nh - target_h) - # Define effect types and their start/end parameters (positions as fractions of max offset) - effects = { + effects = { # Positions as fractions of max offset "zoom-in": {"start_zoom": 1.0, "end_zoom": base_scale, "start_pos": (0.5, 0.5), "end_pos": (0.5, 0.5)}, "zoom-out": {"start_zoom": base_scale, "end_zoom": 1.0, "start_pos": (0.5, 0.5), "end_pos": (0.5, 0.5)}, "pan-left": {"start_zoom": base_scale, "end_zoom": base_scale, "start_pos": (1.0, 0.5), "end_pos": (0.0, 0.5)}, @@ -828,755 +620,495 @@ def apply_kenburns_effect(clip, target_resolution, effect_type=None): "diag-tl-br": {"start_zoom": base_scale, "end_zoom": base_scale, "start_pos": (0.0, 0.0), "end_pos": (1.0, 1.0)}, "diag-tr-bl": {"start_zoom": base_scale, "end_zoom": base_scale, "start_pos": (1.0, 0.0), "end_pos": (0.0, 1.0)}, } - if effect_type is None or effect_type == "random" or effect_type not in effects: effect_type = random.choice(list(effects.keys())) print(f"Applying Ken Burns effect: {effect_type}") - params = effects[effect_type] - # --- Define the transformation function for fl_image --- def transform_frame(get_frame, t): - # get_frame() might not be needed if using ImageClip directly? - # Let's try operating on the resized_clip's frame directly. 
- # frame = get_frame(t) # This might get the *original* small frame - # Instead, use the frame from the resized clip at time 0 (it's static) - img_frame = resized_clip.get_frame(0) # Get the numpy array of the resized image - - # Calculate progress ratio (linear for now, can add easing) + img_frame = resized_clip.get_frame(0) ratio = t / clip.duration if clip.duration > 0 else 0 - # Simple easing (optional) - # ratio = 0.5 - 0.5 * math.cos(math.pi * ratio) - - # Interpolate zoom and position current_zoom = params["start_zoom"] + (params["end_zoom"] - params["start_zoom"]) * ratio pos_x = params["start_pos"][0] + (params["end_pos"][0] - params["start_pos"][0]) * ratio pos_y = params["start_pos"][1] + (params["end_pos"][1] - params["start_pos"][1]) * ratio - - # Calculate the size of the crop window based on current zoom - crop_w = int(target_w / current_zoom) - crop_h = int(target_h / current_zoom) - - # Ensure crop window isn't larger than the image itself - crop_w = min(crop_w, nw) - crop_h = min(crop_h, nh) - if crop_w <= 0 or crop_h <= 0: # Avoid division by zero or invalid crops - return cv2.resize(img_frame, (target_w, target_h), interpolation=cv2.INTER_AREA) # Fallback - - # Calculate the center of the crop window based on interpolated position - # Position (pos_x, pos_y) is relative to the maximum possible offset + crop_w, crop_h = int(target_w / current_zoom), int(target_h / current_zoom) + crop_w, crop_h = min(crop_w, nw), min(crop_h, nh) + if crop_w <= 0 or crop_h <= 0: return cv2.resize(img_frame, (target_w, target_h), interpolation=cv2.INTER_AREA) center_x = (nw / 2) + (pos_x - 0.5) * max_offset_x center_y = (nh / 2) + (pos_y - 0.5) * max_offset_y - - # Clamp center position to ensure the crop window stays within bounds - min_center_x = crop_w / 2 - max_center_x = nw - crop_w / 2 - min_center_y = crop_h / 2 - max_center_y = nh - crop_h / 2 - + min_center_x, max_center_x = crop_w / 2, nw - crop_w / 2 + min_center_y, max_center_y = crop_h / 2, nh - crop_h / 2 center_x = max(min_center_x, min(center_x, max_center_x)) center_y = max(min_center_y, min(center_y, max_center_y)) - - # Perform the crop using OpenCV's getRectSubPix - # Needs BGR format if using OpenCV directly on MoviePy frame (which is RGB) - # MoviePy's fl/fl_image handles this conversion internally usually. - # Let's try using the numpy frame directly. cropped_frame = cv2.getRectSubPix(img_frame, (crop_w, crop_h), (center_x, center_y)) - - # Resize the cropped frame to the final target resolution - # Use INTER_LANCZOS4 for potentially better quality resizing resized_output_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) - return resized_output_frame - # Apply the transformation using fl_image (operates on numpy arrays) - # Need to ensure the input clip to .fl has the correct duration set. - # We apply it to the original clip, but the function uses the resized_clip internally. - return clip.fl(transform_frame, apply_to=['mask']) # Apply to mask if needed + # Apply the transformation using fl (which implicitly uses fl_image) + # Ensure the clip has its duration set BEFORE applying .fl + final_effect_clip = clip.set_duration(clip.duration).fl(transform_frame) + resized_clip.close() # Close the intermediate resized clip + return final_effect_clip except Exception as e: print(f"Error applying Ken Burns effect: {e}. 
Falling back to simple resize.") - # Fallback to simple resize and crop + traceback.print_exc() + if 'resized_clip' in locals() and resized_clip: resized_clip.close() return resize_to_fill(clip, target_resolution) +# ... (resize_to_fill - robust version) ... def resize_to_fill(clip, target_resolution): - """Resizes and crops a clip to fill the target resolution.""" try: target_w, target_h = target_resolution target_aspect = target_w / target_h - # Check if clip has size attributes - if not hasattr(clip, 'w') or not hasattr(clip, 'h') or clip.w is None or clip.h is None or clip.w == 0 or clip.h == 0: - # Attempt to get size from a frame if possible (for ImageClips without explicit size?) - try: + # Ensure clip has valid dimensions + clip_w, clip_h = clip.w, clip.h + if clip_w is None or clip_h is None or clip_w == 0 or clip_h == 0: + try: # Try getting from frame for ImageClips frame = clip.get_frame(0) - clip.w, clip.h = frame.shape[1], frame.shape[0] - print(f"Clip size obtained from frame: {clip.w}x{clip.h}") - if clip.w == 0 or clip.h == 0: raise ValueError("Clip has zero dimension") + clip_h, clip_w, _ = frame.shape + if clip_w == 0 or clip_h == 0: raise ValueError("Clip has zero dimension from frame") except Exception as size_err: print(f"Error: Cannot determine size of the clip for resize_to_fill. Skipping resize. Error: {size_err}") - # Return the clip as is, or maybe a placeholder? - # For now, return original clip, but this might cause issues later. - return clip - - - clip_aspect = clip.w / clip.h + return clip # Return original, might cause errors later - if clip_aspect > target_aspect: - # Clip is wider than target: Resize height to match target, then crop width - new_h = target_h - new_w = int(new_h * clip_aspect) - resized_clip = clip.resize(height=new_h) - # Calculate crop amounts for width + clip_aspect = clip_w / clip_h + if clip_aspect > target_aspect: # Wider + resized_clip = clip.resize(height=target_h) + new_w = resized_clip.w crop_amount = (new_w - target_w) / 2 - # Ensure crop amounts are not negative and within bounds x1 = max(0, math.floor(crop_amount)) x2 = min(new_w, new_w - math.ceil(crop_amount)) - if x1 >= x2: # Avoid invalid crop - print("Warning: Crop dimensions invalid in resize_to_fill (width). Using center.") - x1 = 0; x2 = new_w - final_clip = resized_clip.crop(x1=x1, y1=0, x2=x2, y2=new_h) - - elif clip_aspect < target_aspect: - # Clip is taller than target: Resize width to match target, then crop height - new_w = target_w - new_h = int(new_w / clip_aspect) - resized_clip = clip.resize(width=new_w) - # Calculate crop amounts for height + final_clip = resized_clip.crop(x1=x1, y1=0, x2=x2, y2=target_h) + # Note: do not close resized_clip here - final_clip is a shallow copy that still reads + # frames through the same reader, so closing it would break later rendering. + elif clip_aspect < target_aspect: # Taller + resized_clip = clip.resize(width=target_w) + new_h = resized_clip.h crop_amount = (new_h - target_h) / 2 y1 = max(0, math.floor(crop_amount)) y2 = min(new_h, new_h - math.ceil(crop_amount)) - if y1 >= y2: # Avoid invalid crop - print("Warning: Crop dimensions invalid in resize_to_fill (height).
Using center.") - y1 = 0; y2 = new_h - final_clip = resized_clip.crop(x1=0, y1=y1, x2=new_w, y2=y2) - - else: - # Clip aspect ratio matches target: Just resize + final_clip = resized_clip.crop(x1=0, y1=y1, x2=target_w, y2=y2) + # As above, leave resized_clip open - final_clip still depends on its reader. + else: # Same aspect ratio final_clip = clip.resize(newsize=target_resolution) + if final_clip is clip: # If resize didn't return a new object + final_clip = clip.copy() # Avoid modifying original if it was returned + final_clip = final_clip.resize(newsize=target_resolution) - # Ensure the final clip has the exact target size - # This might be needed if rounding caused slight deviations - final_clip = final_clip.resize(newsize=target_resolution) + # Final check and resize to exact target in case of rounding issues + if final_clip.w != target_w or final_clip.h != target_h: + print(f"Final size adjustment needed: ({final_clip.w}x{final_clip.h}) -> ({target_w}x{target_h})") + final_clip = final_clip.resize(newsize=target_resolution) return final_clip except Exception as e: print(f"Error during resize_to_fill: {e}. Returning original clip.") - return clip # Return original clip if resizing fails - + traceback.print_exc() + # Do not close resized_clip here: it is a shallow copy of `clip`, and closing it would + # also close the reader of the clip being returned. + return clip +# ... (add_background_music - robust version) ... def add_background_music(final_video, music_file, bg_music_volume=0.08): - """Adds background music to the final video clip.""" if not music_file or not os.path.exists(music_file): - print("No valid background music file provided or file not found. Skipping.") - return final_video # Return video without added music - - print(f"Adding background music from: {music_file}") + print("No valid background music file provided. Skipping.") + return final_video + print(f"Adding background music from: {os.path.basename(music_file)}") + bg_music = None try: - # Load background music bg_music = AudioFileClip(music_file) - - # Ensure video has an audio track to composite with - if final_video.audio is None: - print("Warning: Video has no audio track. Background music will be the only audio.") - # Create a silent track of the same duration? Or just set bg_music? - # For simplicity, let's just set the bg_music, but adjust volume first. - bg_music = bg_music.volumex(bg_music_volume).subclip(0, final_video.duration) - looped_bg_music = bg_music.loop(duration=final_video.duration) if bg_music.duration < final_video.duration else bg_music.subclip(0, final_video.duration) - final_video = final_video.set_audio(looped_bg_music) - + video_duration = final_video.duration + bg_duration = bg_music.duration + + # Loop or trim bg music (the looped/trimmed clips are shallow copies that share the + # original clip's reader, so the original must not be closed here) + if bg_duration < video_duration: + bg_music = bg_music.loop(duration=video_duration) + print(f"Background music looped to {video_duration:.2f}s") + elif bg_duration > video_duration: + bg_music = bg_music.subclip(0, video_duration) + print(f"Background music trimmed to {video_duration:.2f}s") + + # Adjust volume + bg_music = bg_music.volumex(bg_music_volume) + + # Composite with video audio + video_audio = final_video.audio + if video_audio: + mixed_audio = CompositeAudioClip([video_audio, bg_music]) + # video_audio.close() # Close original video audio? Maybe not needed. + final_video = final_video.set_audio(mixed_audio) + # mixed_audio.close() # Close composite?
+ print("Background music mixed with existing audio.") else: - # Loop or trim background music to match video duration - if bg_music.duration < final_video.duration: - # Use loop method for cleaner looping - bg_music = bg_music.loop(duration=final_video.duration) - print(f"Background music looped to match video duration ({final_video.duration:.2f}s)") - else: - bg_music = bg_music.subclip(0, final_video.duration) - print(f"Background music trimmed to match video duration ({final_video.duration:.2f}s)") - - - # Adjust background music volume - bg_music = bg_music.volumex(bg_music_volume) - - # Composite original audio and background music - video_audio = final_video.audio - # Ensure video_audio is not None again just in case - if video_audio: - mixed_audio = CompositeAudioClip([video_audio, bg_music]) - final_video = final_video.set_audio(mixed_audio) - print("Background music mixed with existing audio.") - else: - # Should not happen if checked above, but as a fallback: - print("Warning: Video audio became None unexpectedly. Setting background music as audio.") - final_video = final_video.set_audio(bg_music) - - - # Clean up the temporary bg_music object? MoviePy usually handles this. - # bg_music.close() # Might be needed if issues arise + print("Warning: Video has no audio track. Setting background music as the only audio.") + final_video = final_video.set_audio(bg_music) print("Background music added successfully.") return final_video except Exception as e: print(f"Error adding background music: {e}") + traceback.print_exc() print("Proceeding without background music.") - # Ensure the original audio (if any) is still attached - # final_video = final_video.set_audio(final_video.audio) # Redundant if no error occurred before set_audio - return final_video # Return the original video + # Close bg_music only on this failure path: on success it backs the returned video's + # audio track and must stay open until the final video has been written. + if bg_music: + try: bg_music.close() + except Exception as close_err: print(f"Minor error closing bg_music: {close_err}") + return final_video # Return original video
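+# NOTE (editor sketch, not part of the original patch): clips that end up attached to the
+# returned video (TTS audio, background music, source VideoFileClips) share file readers with
+# the clips derived from them, so they should only be closed after the final render. A typical
+# pattern at the call site that writes the video (illustrative only):
+#   try:
+#       final_video.write_videofile(OUTPUT_VIDEO_FILENAME, fps=24, codec='libx264')
+#   finally:
+#       final_video.close()  # releases the composite's readers once writing is done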
slight fade out to avoid abrupt cuts - audio_clip = audio_clip.audio_fadeout(0.1) - audio_duration = audio_clip.duration - if audio_duration <= 0: - print(f"Warning: TTS audio clip has zero or negative duration ({audio_duration}). Using default 3s.") - audio_duration = 3.0 - # We might need to create a silent clip of this duration if audio_clip is unusable - # For now, just override duration variable. - print(f" Audio duration: {audio_duration:.2f}s") - except Exception as audio_err: - print(f"Error loading audio clip {tts_path}: {audio_err}") - return None - - - # Determine target duration for the visual part (audio + small buffer) - target_duration = audio_duration + 0.2 # Add buffer for transitions/fades - + audio_clip = AudioFileClip(tts_path) + audio_clip = audio_clip.audio_fadeout(0.1) + audio_duration = audio_clip.duration + if audio_duration <= 0: + print(f"Warning: TTS audio clip has zero duration. Using default 3s.") + audio_duration = 3.0 + # Replace audio_clip with silence? Or just adjust target_duration? + # Let's adjust target_duration for now. The silent audio might cause issues. + # audio_clip.close() # Close the invalid audio clip + # audio_clip = # Need a silent clip here if we want audio track + print(f" Audio duration: {audio_duration:.2f}s") + target_duration = audio_duration + 0.2 # Buffer # --- Load Visual Media --- - clip = None if asset_type == "video": - try: - temp_clip = VideoFileClip(media_path, target_resolution=TARGET_RESOLUTION[::-1]) # Provide target res hint - # Resize/Crop video to fill the target resolution - clip = resize_to_fill(temp_clip, TARGET_RESOLUTION) - - # Ensure video duration matches target_duration (loop or cut) - if clip.duration < target_duration: - print(f" Looping video (duration {clip.duration:.2f}s) to match target duration {target_duration:.2f}s") - clip = clip.loop(duration=target_duration) - else: - # Cut video to match target duration (take from the start) - print(f" Subclipping video (duration {clip.duration:.2f}s) to target duration {target_duration:.2f}s") - clip = clip.subclip(0, target_duration) - - # Apply fade in/out to video clip - clip = clip.fadein(0.3).fadeout(0.3) - - # Close the temporary clip to release file handle - temp_clip.close() - - except Exception as video_err: - print(f"Error processing video file {media_path}: {video_err}") - if 'temp_clip' in locals() and temp_clip: temp_clip.close() - return None - + temp_clip_visual = VideoFileClip(media_path, target_resolution=TARGET_RESOLUTION[::-1]) + clip_resized = resize_to_fill(temp_clip_visual, TARGET_RESOLUTION) + temp_clip_visual.close() # Close original video file ASAP + temp_clip_visual = clip_resized # Now temp_clip_visual holds the resized version + + if temp_clip_visual.duration < target_duration: + print(f" Looping video ({temp_clip_visual.duration:.2f}s) -> {target_duration:.2f}s") + clip = temp_clip_visual.loop(duration=target_duration) + temp_clip_visual.close() # Close the clip that was looped + else: + print(f" Subclipping video ({temp_clip_visual.duration:.2f}s) -> {target_duration:.2f}s") + clip = temp_clip_visual.subclip(0, target_duration) + temp_clip_visual.close() # Close the clip that was subclipped + clip = clip.fadein(0.3).fadeout(0.3) elif asset_type == "image": - try: - # ImageClip needs duration set explicitly - temp_clip = ImageClip(media_path).set_duration(target_duration) - # Apply Ken Burns effect (which includes resizing) - clip = apply_kenburns_effect(temp_clip, TARGET_RESOLUTION, effect_type="random") # Use random effect - - # Fades 
are often handled within Ken Burns or applied after - # clip = clip.fadein(0.3).fadeout(0.3) # Already included in apply_kenburns? Check. If not, add here. - - # No need to close ImageClip explicitly usually - - except Exception as image_err: - print(f"Error processing image file {media_path}: {image_err}") - return None - - else: - print(f"Error: Unsupported asset_type '{asset_type}' for clip {segment_index+1}") - return None - - # Ensure clip was created - if clip is None: - print(f"Error: Visual clip creation failed for {media_path}") - return None + # Create ImageClip first + temp_clip_visual = ImageClip(media_path).set_duration(target_duration) + # Apply Ken Burns (includes resize) + clip = apply_kenburns_effect(temp_clip_visual, TARGET_RESOLUTION, effect_type="random") + # Ken Burns returns a new clip, close the original ImageClip + temp_clip_visual.close() + # Add fades if not included in Ken Burns effect function + # clip = clip.fadein(0.3).fadeout(0.3) # Check if needed - # Set the clip's duration definitively - clip = clip.set_duration(target_duration) + else: raise ValueError(f"Unsupported asset_type '{asset_type}'") + if clip is None: raise ValueError("Visual clip creation failed.") + clip = clip.set_duration(target_duration) # Ensure duration is exact # --- Add Captions (if enabled) --- if narration_text and CAPTION_COLOR != "transparent": print(" Adding captions...") + # (Caption generation code remains the same as the robust version previously provided) + # ... (caption generation logic) ... try: - # Simple word splitting for chunking (adjust words_per_chunk as needed) - words = narration_text.split() - words_per_chunk = 4 # Show fewer words per chunk - max_chars_per_line = 35 # Limit line width roughly - - chunks = [] - current_chunk_words = [] - current_chunk_chars = 0 + words = narration_text.split(); words_per_chunk = 4; max_chars_per_line = 35 + chunks = []; current_chunk_words = []; current_chunk_chars = 0 for word in words: - # Check if adding word exceeds limits if len(current_chunk_words) >= words_per_chunk or \ current_chunk_chars + len(word) + (1 if current_chunk_words else 0) > max_chars_per_line: - if current_chunk_words: # Add previous chunk if not empty - chunks.append(' '.join(current_chunk_words)) - current_chunk_words = [word] # Start new chunk - current_chunk_chars = len(word) + if current_chunk_words: chunks.append(' '.join(current_chunk_words)) + current_chunk_words = [word]; current_chunk_chars = len(word) else: current_chunk_words.append(word) current_chunk_chars += len(word) + (1 if current_chunk_words else 0) + if current_chunk_words: chunks.append(' '.join(current_chunk_words)) - # Add the last chunk - if current_chunk_words: - chunks.append(' '.join(current_chunk_words)) - - if not chunks: - print(" Warning: No caption chunks generated.") - - else: - # Calculate timing for each chunk + if chunks: total_chunks = len(chunks) chunk_duration = audio_duration / total_chunks if total_chunks > 0 else audio_duration subtitle_clips = [] - - # Caption styling (adjust as needed) - fontsize = 40 if TARGET_RESOLUTION[1] >= 1080 else 30 # Adjust based on resolution height - y_position_ratio = 0.80 # Place captions lower (80% down) - caption_width_ratio = 0.85 # Max width relative to screen - + fontsize = 40 if TARGET_RESOLUTION[1] >= 1080 else 30 + y_position_ratio = 0.80; caption_width_ratio = 0.85 subtitle_y_position = int(TARGET_RESOLUTION[1] * y_position_ratio) max_caption_width = int(TARGET_RESOLUTION[0] * caption_width_ratio) for i, chunk_text in 
enumerate(chunks): start_time = i * chunk_duration - # Ensure end time doesn't exceed the clip's visual duration end_time = min((i + 1) * chunk_duration, target_duration) - # Ensure start time is not after end time if start_time >= end_time: continue - - # Create TextClip for the chunk - # Use 'caption' method for auto-wrapping within size - # Added stroke for better visibility txt_clip = TextClip( - chunk_text, - fontsize=fontsize, - font='Arial-Bold', # Ensure this font is available or choose another - color=CAPTION_COLOR, - bg_color='rgba(0, 0, 0, 0.4)', # Semi-transparent background - method='caption', # Auto-wrap text - align='center', - stroke_color='black', # Black stroke - stroke_width=1.5, - size=(max_caption_width, None) # Set width, height adjusts automatically - ).set_start(start_time).set_duration(end_time - start_time) # Use duration instead of end - - # Position the caption chunk + chunk_text, fontsize=fontsize, font='Arial-Bold', color=CAPTION_COLOR, + bg_color='rgba(0, 0, 0, 0.4)', method='caption', align='center', + stroke_color='black', stroke_width=1.5, size=(max_caption_width, None) + ).set_start(start_time).set_duration(end_time - start_time) txt_clip = txt_clip.set_position(('center', subtitle_y_position)) subtitle_clips.append(txt_clip) - - # Composite the main clip with all subtitle clips if subtitle_clips: - clip = CompositeVideoClip([clip] + subtitle_clips, size=TARGET_RESOLUTION) + # Create the composite clip + composite_clip = CompositeVideoClip([clip] + subtitle_clips, size=TARGET_RESOLUTION) + # Close the original visual clip and the text clips + clip.close() + for txt in subtitle_clips: txt.close() + # Assign the new composite clip back to 'clip' + clip = composite_clip print(f" Added {len(subtitle_clips)} caption chunks.") - else: - print(" No subtitle clips were generated.") - + else: print(" No subtitle clips were generated.") + else: print(" Warning: No caption chunks generated.") except ImportError as ie: - # Specific check for ImageMagick issue if 'Decorator requires' in str(ie) or 'imagemagick' in str(ie).lower(): print("ERROR: TextClip generation failed. ImageMagick might not be configured correctly.") - print(" Try installing ImageMagick and ensuring MoviePy knows its path.") - print(" Skipping captions for this clip.") - else: - print(f"Error during caption generation (ImportError): {ie}. Skipping captions.") + else: print(f"Error during caption generation (ImportError): {ie}.") + print(" Skipping captions for this clip.") except Exception as sub_error: print(f"Error during caption generation: {sub_error}. Skipping captions.") - # Optionally log the traceback here for debugging - import traceback traceback.print_exc() # --- Combine Video and Audio --- - # Ensure audio clip duration doesn't exceed video clip duration if audio_clip.duration > clip.duration: print(f" Warning: Audio duration ({audio_clip.duration:.2f}s) > Video duration ({clip.duration:.2f}s). Trimming audio.") - audio_clip = audio_clip.subclip(0, clip.duration) + trimmed_audio = audio_clip.subclip(0, clip.duration) + audio_clip.close() # Close original audio + audio_clip = trimmed_audio - clip = clip.set_audio(audio_clip) + # Set the final audio + final_clip = clip.set_audio(audio_clip) + # Check if set_audio returned a new object or modified in place + if final_clip is not clip: + clip.close() # Close the visual-only clip if a new one was returned - print(f"Clip #{segment_index+1} created successfully. 
Duration: {clip.duration:.2f}s") - return clip + print(f"Clip #{segment_index+1} created successfully. Duration: {final_clip.duration:.2f}s") + return final_clip # Return the combined clip except Exception as e: print(f"FATAL Error in create_clip for segment {segment_index+1}: {str(e)}") - # Log traceback for detailed debugging - import traceback traceback.print_exc() - # Clean up partially created clips if possible? - if 'clip' in locals() and clip: - try: clip.close() - except: pass - if 'audio_clip' in locals() and audio_clip: - try: audio_clip.close() - except: pass - + # Ensure resources are released on error + if clip: try: clip.close() catch Exception: pass + if audio_clip: try: audio_clip.close() catch Exception: pass + if temp_clip_visual: try: temp_clip_visual.close() catch Exception: pass + # Re-raise or return None? Returning None to allow skipping the segment. return None + # No finally block needed here as cleanup is handled in except or after successful return -# --- ImageMagick Policy Fix (Attempt) --- +# --- ImageMagick Policy Fix (Attempt - keep as is, it's best-effort) --- def fix_imagemagick_policy(): - # This is OS-dependent and requires sudo. Might not work in all environments (like containers). - # It's generally better to configure ImageMagick properly outside the script if possible. - policy_paths = [ - "/etc/ImageMagick-6/policy.xml", - "/etc/ImageMagick-7/policy.xml", - "/etc/ImageMagick/policy.xml", - "/usr/local/etc/ImageMagick-6/policy.xml", - "/usr/local/etc/ImageMagick-7/policy.xml", - # Add other common paths if needed - ] + # (Keep the previous robust version of this function) + policy_paths = [ "/etc/ImageMagick-6/policy.xml", "/etc/ImageMagick-7/policy.xml", "/etc/ImageMagick/policy.xml", "/usr/local/etc/ImageMagick-6/policy.xml", "/usr/local/etc/ImageMagick-7/policy.xml" ] found_policy = next((path for path in policy_paths if os.path.exists(path)), None) - - if not found_policy: - print("ImageMagick policy.xml not found in common locations. Cannot apply automatic fix.") - print("Text captions might fail if ImageMagick default policy is restrictive.") - return False - - print(f"Found ImageMagick policy file at: {found_policy}") - print("Attempting to modify policy to allow text rendering (requires permissions)...") - - # Check if we can even read the file before trying sudo commands - if not os.access(found_policy, os.R_OK): - print(f"Warning: Cannot read policy file {found_policy}. Check permissions.") - # Don't attempt sudo if we can't even read it. - return False - - # --- Prepare commands --- - # Create backup - backup_cmd = f"sudo cp {found_policy} {found_policy}.bak" - # Allow read/write for MVG/TEXT formats (often restricted) - rights_cmd = f"sudo sed -i.bak 's/rights=\"none\" pattern=\"(MVG|TEXT)\"/rights=\"read|write\" pattern=\"\\1\"/g' {found_policy}" - # Allow read/write for HTTPS coder (sometimes needed for fonts/remote resources) - Use cautiously - # https_cmd = f"sudo sed -i.bak 's/rights=\"none\" pattern=\"HTTPS\"/rights=\"read|write\" pattern=\"HTTPS\"/g' {found_policy}" - # Allow read/write for @* path (often restricted) - Use cautiously - path_cmd = f"sudo sed -i.bak 's///g' {found_policy}" - path_cmd2 = f"sudo sed -i.bak 's/rights=\"none\" pattern=\"@\"/rights=\"read|write\" pattern=\"@\"/g' {found_policy}" # Another variation - - - # --- Execute commands --- - # It's risky to run sudo commands directly from a script. - # Inform the user what needs to be done manually if this fails. 
- print("\n---") - print("INFO: The script needs to modify ImageMagick's policy.xml for text captions.") - print(f" Attempting to run the following commands with sudo:") - # print(f" 1. {backup_cmd}") # Backup is done by sed -i.bak now - print(f" 1. {rights_cmd}") - # print(f" 3. {https_cmd}") # Maybe skip https unless needed - print(f" 2. {path_cmd}") - print(f" 3. {path_cmd2}") - print(" You might be prompted for your password.") - print(" If this fails, captions may not work. You might need to edit") - print(f" {found_policy} manually to grant read/write rights for TEXT, MVG, and path '@*'.") - print("---\n") - + if not found_policy: print("ImageMagick policy.xml not found. Cannot apply automatic fix."); return False + print(f"Found ImageMagick policy file at: {found_policy}. Attempting modification (requires sudo)...") + if not os.access(found_policy, os.R_OK): print(f"Warning: Cannot read policy file {found_policy}. Check permissions."); return False + + commands = [ + f"sudo sed -i.bak 's/rights=\"none\" pattern=\"(MVG|TEXT)\"/rights=\"read|write\" pattern=\"\\1\"/g' {found_policy}", + f"sudo sed -i.bak 's///g' {found_policy}", + f"sudo sed -i.bak 's/rights=\"none\" pattern=\"@\"/rights=\"read|write\" pattern=\"@\"/g' {found_policy}" + ] + print("\n--- INFO: Attempting to run sudo commands to allow text rendering: ---") + for cmd in commands: print(f" {cmd}") + print("--- You may be prompted for your password. If this fails, captions might not work. ---\n") success = True - # Try executing the commands try: - # print("Running backup command...") - # backup_status = os.system(backup_cmd) - # if backup_status != 0: print("Warning: Failed to create policy backup.") # Don't stop yet - - print("Running rights modification...") - rights_status = os.system(rights_cmd) - if rights_status != 0: - print("ERROR: Failed to modify coder rights in policy.xml.") - success = False - - # print("Running HTTPS coder modification...") - # https_status = os.system(https_cmd) - # if https_status != 0: print("Warning: Failed to modify HTTPS coder rights.") - - print("Running path modification 1...") - path_status = os.system(path_cmd) - if path_status != 0: print("Warning: Failed to modify path rights (attempt 1).") # Try second one - - print("Running path modification 2...") - path_status2 = os.system(path_status2) - if path_status2 != 0: print("Warning: Failed to modify path rights (attempt 2).") - - # Check if at least rights command succeeded - if success: - print("ImageMagick policy modification attempted.") - print("Please restart the application if captions still fail.") - return True - else: - print("Failed to apply necessary ImageMagick policy changes.") - return False - + for i, cmd in enumerate(commands): + print(f"Running command {i+1}...") + status = os.system(cmd) + if status != 0: + print(f"ERROR: Command failed with status {status}.") + # Mark as failed but continue trying others? Or stop? Let's mark and continue. 
+                success = False
+        if success: print("ImageMagick policy modification attempted successfully.")
+        else: print("Warning: One or more ImageMagick policy modifications failed.")
+        return success
     except Exception as e:
-        print(f"Error attempting to modify ImageMagick policies: {e}")
-        print("Please check permissions and try modifying the policy file manually.")
-        return False
+        print(f"Error attempting to modify ImageMagick policies: {e}"); return False
+
-# --- Main Video Generation Function ---
-# Updated signature to accept new parameters
+# --- Main Video Generation Function (Added warning for partial success) ---
 def generate_video_from_script(script, resolution, caption_option, music_file, fps, preset, video_probability):
     global TARGET_RESOLUTION, CAPTION_COLOR, TEMP_FOLDER
-
-    # --- Setup ---
     start_time = time.time()
     print("\n--- Starting Video Generation ---")
-    print(f"Resolution: {resolution}, Captions: {caption_option}, FPS: {fps}, Preset: {preset}, Video Prob: {video_probability:.2f}")
-    if music_file: print(f"Music File: {os.path.basename(music_file)}")
-
-    # Set Resolution
-    if resolution == "Full": # 16:9 Landscape
-        TARGET_RESOLUTION = (1920, 1080)
-    elif resolution == "Short": # 9:16 Portrait
-        TARGET_RESOLUTION = (1080, 1920)
-    else: # Default to Full HD
-        TARGET_RESOLUTION = (1920, 1080)
-    print(f"Target Resolution set to: {TARGET_RESOLUTION[0]}x{TARGET_RESOLUTION[1]}")
-
-    # Set Caption Color
+    # ... (Setup: Resolution, Caption Color, Temp Folder - same as before) ...
+    if resolution == "Full (1920x1080)": TARGET_RESOLUTION = (1920, 1080)
+    elif resolution == "Short (1080x1920)": TARGET_RESOLUTION = (1080, 1920)
+    else: TARGET_RESOLUTION = (1080, 1920) # Default to short
     CAPTION_COLOR = "white" if caption_option == "Yes" else "transparent"
-
-    # Create Temporary Folder
     try:
         TEMP_FOLDER = tempfile.mkdtemp(prefix="aivideo_")
         print(f"Temporary folder created: {TEMP_FOLDER}")
-    except Exception as e:
-        print(f"Error creating temporary folder: {e}")
-        return None # Cannot proceed without temp folder
+    except Exception as e: print(f"Error creating temp folder: {e}"); return None
-    # Attempt ImageMagick fix (optional, based on caption need)
-    if CAPTION_COLOR != "transparent":
-        fix_success = fix_imagemagick_policy()
-        if not fix_success:
-            print("Warning: ImageMagick policy fix failed or was skipped. Captions might not render correctly.")
-            # Proceed anyway, TextClip might work or fail gracefully
+    if CAPTION_COLOR != "transparent": fix_imagemagick_policy()
-    # --- Script Parsing ---
     print("Parsing script...")
     elements = parse_script(script)
-    if not elements:
-        print("Error: Failed to parse script into elements. Cannot generate video.")
-        shutil.rmtree(TEMP_FOLDER)
-        return None
-
-    # Pair media prompts with TTS elements
+    if not elements: print("Error: Failed to parse script."); shutil.rmtree(TEMP_FOLDER); return None
     paired_elements = []
     for i in range(0, len(elements), 2):
         if i + 1 < len(elements) and elements[i]['type'] == 'media' and elements[i+1]['type'] == 'tts':
             paired_elements.append((elements[i], elements[i + 1]))
-        else:
-            print(f"Warning: Skipping mismatched or incomplete element pair at index {i}.")
-
-
-    if not paired_elements:
-        print("Error: No valid media/TTS pairs found after parsing. Cannot generate video.")
-        shutil.rmtree(TEMP_FOLDER)
-        return None
+        else: print(f"Warning: Skipping mismatched element pair at index {i}.")
+    if not paired_elements: print("Error: No valid media/TTS pairs found."); shutil.rmtree(TEMP_FOLDER); return None
     print(f"Successfully parsed {len(paired_elements)} segments.")
-
-    # --- Generate Clips for Each Segment ---
     clips = []
+    processed_clips_R = [] # Keep track of successfully processed clips for cleanup
     total_segments = len(paired_elements)
+    successful_segments = 0
+
     for idx, (media_elem, tts_elem) in enumerate(paired_elements):
         segment_start_time = time.time()
         print(f"\n--- Processing Segment {idx+1}/{total_segments} ---")
-        print(f" Prompt: '{media_elem['prompt']}'")
-        print(f" Narration: '{tts_elem['text'][:60]}...'")
-
-        # 1. Generate Media (Video or Image)
-        # Pass the video_probability from the UI
-        media_asset = generate_media(
-            media_elem['prompt'],
-            video_probability=video_probability,
-            current_index=idx,
-            total_segments=total_segments
-        )
-        if not media_asset or not media_asset.get('path'):
-            print(f"Error: Failed to generate media for segment {idx+1}. Skipping segment.")
-            continue # Skip this segment
+        # ... (1. Generate Media - same as before) ...
+        media_asset = generate_media(media_elem['prompt'], video_probability, idx, total_segments)
+        if not media_asset or not media_asset.get('path'): print(f"Error: Failed media for segment {idx+1}. Skipping."); continue
-        # 2. Generate TTS Audio
-        tts_path = generate_tts(tts_elem['text'], tts_elem['voice'])
+        # ... (2. Generate TTS - uses fixed function) ...
+        tts_path = generate_tts(tts_elem['text'], tts_elem['voice']) # voice ignored now
         if not tts_path:
-            print(f"Error: Failed to generate TTS for segment {idx+1}. Skipping segment.")
-            # Clean up downloaded media if TTS failed?
-            if media_asset and os.path.exists(media_asset['path']):
-                try: os.remove(media_asset['path'])
-                except OSError: pass
-            continue # Skip this segment
-
-        # 3. Create the MoviePy Clip (combines media, tts, captions)
+            print(f"Error: Failed TTS for segment {idx+1}. Skipping.")
+            if media_asset and os.path.exists(media_asset['path']):
+                try: os.remove(media_asset['path'])
+                except OSError: pass
+            continue
+
+        # ... (3. Create Clip - uses robust function) ...
         clip = create_clip(
-            media_path=media_asset['path'],
-            asset_type=media_asset['asset_type'],
-            tts_path=tts_path,
-            # Duration is determined by TTS inside create_clip now
-            # duration=tts_elem['duration'], # Pass estimated duration? Or let create_clip handle it?
-            effects=media_elem.get('effects'), # Pass effects if used later
-            narration_text=tts_elem['text'], # Pass text for captions
-            segment_index=idx
+            media_path=media_asset['path'], asset_type=media_asset['asset_type'], tts_path=tts_path,
+            narration_text=tts_elem['text'], segment_index=idx
         )
         if clip:
             clips.append(clip)
+            processed_clips_R.append(clip) # Add to list for later closing
+            successful_segments += 1
             segment_end_time = time.time()
             print(f"Segment {idx+1} processed in {segment_end_time - segment_start_time:.2f} seconds.")
         else:
-            print(f"Error: Clip creation failed for segment {idx+1}. Skipping segment.")
-            # Clean up generated files for this segment if clip creation failed
-            if media_asset and os.path.exists(media_asset['path']):
-                try: os.remove(media_asset['path'])
-                except OSError: pass
-            if tts_path and os.path.exists(tts_path):
-                try: os.remove(tts_path)
-                except OSError: pass
-            continue # Skip failed segment
+            print(f"Error: Clip creation failed for segment {idx+1}. Skipping.")
+            # Clean up files for this segment
+            if media_asset and os.path.exists(media_asset['path']):
+                try: os.remove(media_asset['path'])
+                except OSError: pass
+            if tts_path and os.path.exists(tts_path):
+                try: os.remove(tts_path)
+                except OSError: pass
+            continue
     # --- Final Video Assembly ---
+    final_video = None # Initialize final_video
     if not clips:
         print("Error: No clips were successfully created. Cannot generate final video.")
+        # No clips, but TEMP_FOLDER might have remnants if media/TTS downloaded but clip failed
         if os.path.exists(TEMP_FOLDER):
             shutil.rmtree(TEMP_FOLDER)
-        return None
+        return None # Return None if no clips
+
+    # --- Add Warning if segments were skipped ---
+    if successful_segments < total_segments:
+        print(f"\nWARNING: Only {successful_segments} out of {total_segments} segments were successfully processed.")
+        # Also send this warning to Gradio interface later
     print(f"\nSuccessfully created {len(clips)} clips. Concatenating...")
     try:
-        # Concatenate all the generated clips
-        final_video = concatenate_videoclips(clips, method="compose") # 'compose' is often safer
+        final_video = concatenate_videoclips(clips, method="compose")
         print("Clips concatenated.")
-        # Add Background Music (if provided)
-        if music_file:
-            final_video = add_background_music(final_video, music_file, bg_music_volume=0.08) # Fixed volume for now
+        # Close individual clips after concatenation
+        print("Closing individual segment clips...")
+        for c in processed_clips_R: # Use the list of successfully created clips
+            try: c.close()
+            except Exception as e: print(f"Minor error closing a segment clip: {e}")
-        # --- Export Final Video ---
-        output_path = OUTPUT_VIDEO_FILENAME # Use the global default or customize
-        print(f"Exporting final video to '{output_path}'...")
-        print(f" Codec: libx264, FPS: {fps}, Preset: {preset}")
+        # Add Background Music
+        if music_file:
+            final_video = add_background_music(final_video, music_file, bg_music_volume=0.08)
-        # Write the video file with user-defined fps and preset
-        # Add audio_codec, threads, ffmpeg_params if needed
+        # Export Final Video
+        output_path = OUTPUT_VIDEO_FILENAME
+        print(f"Exporting final video to '{output_path}' (FPS: {fps}, Preset: {preset})...")
         final_video.write_videofile(
-            output_path,
-            codec='libx264',
-            audio_codec='aac', # Common audio codec
-            fps=fps,
-            preset=preset,
-            threads=os.cpu_count() or 4, # Use available cores
-            logger='bar' # Show progress bar
-            # ffmpeg_params=['-crf', '23'] # Example: Constant Rate Factor for quality/size tradeoff
+            output_path, codec='libx264', audio_codec='aac',
+            fps=fps, preset=preset, threads=os.cpu_count() or 4, logger='bar'
         )
         export_end_time = time.time()
         print(f"\nFinal video saved successfully as '{output_path}'")
         print(f"Total generation time: {export_end_time - start_time:.2f} seconds.")
-        # --- Cleanup ---
-        # Close clips to release resources (important!)
- print("Closing clips...") - for clip in clips: - try: clip.close() - except Exception as e: print(f"Minor error closing a clip: {e}") - try: - if final_video: final_video.close() - except Exception as e: print(f"Minor error closing final video object: {e}") - - # Remove temporary folder - print(f"Removing temporary folder: {TEMP_FOLDER}") - try: - shutil.rmtree(TEMP_FOLDER) - print("Temporary files removed.") - except Exception as e: - print(f"Warning: Could not remove temporary folder {TEMP_FOLDER}: {e}") - - - return output_path # Return the path to the generated video + # Return path AFTER cleanup + return output_path except Exception as e: print(f"FATAL Error during final video assembly or export: {e}") - # Log traceback - import traceback traceback.print_exc() - # Cleanup attempt - print("Attempting cleanup after error...") - for clip in clips: # Close any clips created - try: clip.close() - except: pass - if 'final_video' in locals() and final_video: - try: final_video.close() - except: pass - if os.path.exists(TEMP_FOLDER): + return None # Indicate failure + finally: + # --- Cleanup --- + print("Final cleanup...") + # Close final video object if it exists + if final_video: + try: final_video.close() + except Exception as e: print(f"Minor error closing final video object: {e}") + # Ensure individual clips are closed (redundant if done above, but safe) + for c in processed_clips_R: + if hasattr(c, 'close') and callable(c.close): + try: c.close() + except Exception: pass # Ignore errors here + # Remove temporary folder + if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): + print(f"Removing temporary folder: {TEMP_FOLDER}") try: shutil.rmtree(TEMP_FOLDER) - print("Temporary folder removed after error.") - except Exception as clean_e: - print(f"Warning: Could not remove temp folder {TEMP_FOLDER} after error: {clean_e}") - return None + print("Temporary files removed.") + except Exception as e: + print(f"Warning: Could not remove temporary folder {TEMP_FOLDER}: {e}") -# --- Gradio Blocks Interface --- +# --- Gradio Blocks Interface (Added Warning for partial success) --- with gr.Blocks(title="AI Documentary Video Generator", theme=gr.themes.Soft()) as demo: gr.Markdown("# Create a Funny AI Documentary Video") - gr.Markdown("Enter a concept -> Generate Script -> Edit Script (Optional) -> Configure Options -> Generate Video!") - gr.Markdown("**Note**: Generation can take several minutes, especially with many segments or high resolution.") + gr.Markdown("Enter concept -> Generate Script -> Edit (Optional) -> Configure -> Generate Video!") + gr.Markdown("**Note**: Generation can take several minutes. Check console for progress & errors.") with gr.Row(): with gr.Column(scale=1): gr.Markdown("### 1. Concept & Script") - concept = gr.Textbox(label="Video Concept/Topic", placeholder="e.g., 'The secret life of squirrels', 'Why Mondays exist', 'Historical events reimagined by cats'") + concept = gr.Textbox(label="Video Concept/Topic", placeholder="e.g., 'The secret life of squirrels'") generate_script_btn = gr.Button("📝 Generate Script", variant="secondary") - script = gr.Textbox(label="Script (Edit if needed)", lines=15, placeholder="Generated script will appear here. Follow the [Title] Narration format per line.", interactive=True) + script = gr.Textbox(label="Script (Edit if needed)", lines=15, placeholder="Generated script...", interactive=True) with gr.Column(scale=1): gr.Markdown("### 2. 
Video Options") + # Clearer labels for resolution resolution = gr.Radio(["Full (1920x1080)", "Short (1080x1920)"], label="Resolution", value="Short (1080x1920)") - captions = gr.Radio(["Yes", "No"], label="Add Captions?", value="Yes") # Default to Yes + captions = gr.Radio(["Yes", "No"], label="Add Captions?", value="Yes") music = gr.Audio(label="Background Music (Optional)", type="filepath") gr.Markdown("### 3. Advanced Settings") - # Added FPS Slider - fps_slider = gr.Slider(minimum=15, maximum=60, step=1, value=30, label="Output FPS", info="Frames per second for the video.") - # Added Preset Dropdown + fps_slider = gr.Slider(minimum=15, maximum=60, step=1, value=30, label="Output FPS") preset_dropdown = gr.Dropdown( ["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow"], - value="veryfast", # Faster preset suitable for previews - label="Encoding Preset", - info="Faster presets = quicker encoding, larger file size, lower quality. Slower presets = vice versa." + value="veryfast", label="Encoding Preset", info="Faster=Quicker, Larger File; Slower=Vice Versa" ) - # Added Video Probability Slider video_prob_slider = gr.Slider( minimum=0.0, maximum=1.0, step=0.05, value=0.45, - label="Video Clip Probability", - info="Chance (0% to 100%) to use a video clip instead of an image for each segment." + label="Video Clip Probability", info="Chance (0-1) to use video per segment" ) gr.Markdown("### 4. Generate") @@ -1584,80 +1116,76 @@ with gr.Blocks(title="AI Documentary Video Generator", theme=gr.themes.Soft()) a with gr.Row(): video_output = gr.Video(label="Generated Video", interactive=False) - + status_message = gr.Markdown("") # To display warnings/errors # --- Gradio Event Handlers --- - - # Function to call when 'Generate Script' is clicked def on_generate_script(concept_text): - if not concept_text: - gr.Warning("Please enter a video concept.") - return "" - gr.Info("Generating script... This may take a moment.") + if not concept_text: return gr.update(value="", placeholder="Please enter a concept first."), gr.Markdown("⚠️ Please enter a video concept.") + status_update = gr.Markdown("⏳ Generating script...") script_text = generate_script(concept_text) if script_text and "Error:" not in script_text: - gr.Info("Script generated successfully!") + final_status = gr.Markdown("✅ Script generated!") + return gr.update(value=script_text), final_status elif script_text and "Error:" in script_text: - gr.Error(f"Script generation failed: {script_text}") + final_status = gr.Markdown(f"❌ Script Error: {script_text}") + return gr.update(value=""), final_status # Clear script box on error else: - gr.Error("Script generation failed. Check logs or API keys.") - return script_text if script_text else "Failed to generate script. Check console logs." + final_status = gr.Markdown("❌ Script generation failed. Check logs.") + return gr.update(value=""), final_status - # Function to call when 'Generate Video' is clicked - # Updated signature to accept new inputs def on_generate_video(script_text, resolution_choice, captions_choice, music_path, fps, preset, video_probability): - if not script_text or "[Error]" in script_text or "Failed to generate script" in script_text: - gr.Error("Cannot generate video. Please generate a valid script first.") - return None - if not PEXELS_API_KEY or 'YOUR_PEXELS_API_KEY' in PEXELS_API_KEY: - gr.Error("Pexels API Key is not configured correctly. 
Cannot fetch media.") - return None - if not OPENROUTER_API_KEY or 'YOUR_API_KEY' in OPENROUTER_API_KEY: - gr.Error("OpenRouter API Key is not configured correctly (needed for script generation fallback/checks).") - # Allow proceeding but warn? Or stop? Let's stop for now. - return None + if not script_text or "Error:" in script_text or "Failed to generate script" in script_text: + return None, gr.Markdown("❌ Cannot generate video. Please generate a valid script first.") + # Basic API Key check before starting + if not PEXELS_API_KEY or 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna' in PEXELS_API_KEY: + return None, gr.Markdown("❌ Pexels API Key missing/invalid. Cannot fetch media.") + # Kokoro check + if pipeline is None: + return None, gr.Markdown("❌ Kokoro TTS failed to initialize. Cannot generate audio. Check console.") - gr.Info("Starting video generation... This may take several minutes. Please wait.") - print(f"Received parameters: Res='{resolution_choice}', Caps='{captions_choice}', Music='{music_path}', FPS={fps}, Preset='{preset}', VidProb={video_probability}") + status_update = gr.Markdown("⏳ Starting video generation... This may take several minutes. Please wait and check console logs.") + yield None, status_update # Update status immediately - # Extract resolution format (e.g., "Full" or "Short") - resolution_format = resolution_choice.split(" ")[0] + # Store original segment count + original_segment_count = len(parse_script(script_text)) // 2 if parse_script(script_text) else 0 video_path = generate_video_from_script( - script_text, - resolution_format, - captions_choice, - music_path, - fps, # Pass FPS - preset, # Pass preset - video_probability # Pass video probability + script_text, resolution_choice, captions_choice, music_path, + fps, preset, video_probability ) + final_status = "" if video_path and os.path.exists(video_path): - gr.Info("Video generated successfully!") - return video_path + final_status = "✅ Video generated successfully!" + # Check if segments were skipped by comparing final clip count to original estimate + # (This requires parsing the script again or getting the count from generate_video_from_script) + # For simplicity, we'll rely on console warnings for now, but could add check here. + # Example check (needs refinement): + # final_clip_count = ... # Need way to get this back from generate_video_from_script + # if final_clip_count < original_segment_count: + # final_status += f" (Warning: {original_segment_count - final_clip_count} segments failed)" + + yield video_path, gr.Markdown(final_status) else: - gr.Error("Video generation failed. Please check the console logs for errors.") - return None + final_status = "❌ Video generation failed. Please check the console logs for errors." 
+            yield None, gr.Markdown(final_status)
+
     # Connect buttons to functions
     generate_script_btn.click(
         fn=on_generate_script,
         inputs=[concept],
-        outputs=[script],
-        api_name="generate_script" # Optional: for API access
+        outputs=[script, status_message], # Update script box and status message
+        api_name="generate_script"
     )
     generate_video_btn.click(
         fn=on_generate_video,
-        # Add the new sliders/dropdowns to the inputs list
         inputs=[script, resolution, captions, music, fps_slider, preset_dropdown, video_prob_slider],
-        outputs=[video_output],
-        api_name="generate_video" # Optional: for API access
+        outputs=[video_output, status_message], # Update video output and status message
+        api_name="generate_video"
     )
# Launch the Gradio app
-# share=True creates a public link (use with caution regarding API keys if hardcoded)
-# debug=True provides more detailed logs in the console
demo.queue().launch(share=True, debug=True)
\ No newline at end of file
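# Illustrative sketch (not part of the patch): the generator-style handler
# pattern on_generate_video relies on. Gradio streams each `yield` to the
# outputs, so a long job can post a status update before the final result.
# `do_heavy_work`, `run_btn`, `prompt_box`, `video_out` and `status_md` are
# hypothetical names.
import gradio as gr

def long_job(prompt):
    yield None, gr.Markdown("⏳ Working...")        # interim update: no video yet
    result_path = do_heavy_work(prompt)              # hypothetical helper
    if result_path:
        yield result_path, gr.Markdown("✅ Done!")
    else:
        yield None, gr.Markdown("❌ Failed, see console logs.")

# Wiring, mirroring the .click() calls above:
# run_btn.click(fn=long_job, inputs=[prompt_box], outputs=[video_out, status_md])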