diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,106 +1,86 @@
+
# Import necessary libraries
from kokoro import KPipeline
+
import soundfile as sf
import torch
-# Removed duplicate import of soundfile as sf
+
+import soundfile as sf
import os
-from moviepy.editor import (
- VideoFileClip, AudioFileClip, ImageClip, concatenate_videoclips,
- CompositeVideoClip, TextClip, CompositeAudioClip # Added CompositeAudioClip
-)
+from moviepy.editor import VideoFileClip, AudioFileClip, ImageClip
from PIL import Image
import tempfile
import random
import cv2
import math
-import requests, io, time, re
-# Removed duplicate import of random
+import os, requests, io, time, re, random
+from moviepy.editor import (
+ VideoFileClip, concatenate_videoclips, AudioFileClip, ImageClip,
+ CompositeVideoClip, TextClip, CompositeAudioClip
+)
import gradio as gr
import shutil
-# Removed duplicate import of os
+import os
import moviepy.video.fx.all as vfx
import moviepy.config as mpy_config
from pydub import AudioSegment
from pydub.generators import Sine
-# Removed duplicate import of Image, ImageDraw, ImageFont
+
+from PIL import Image, ImageDraw, ImageFont
import numpy as np
from bs4 import BeautifulSoup
import base64
from urllib.parse import quote
import pysrt
from gtts import gTTS
-# Removed duplicate import of gradio as gr
-import traceback # For detailed error printing
+import gradio as gr # Import Gradio
# Initialize Kokoro TTS pipeline (using American English)
-try:
- pipeline = KPipeline(lang_code='a') # Use voice 'af_heart' for American English
- print("Kokoro TTS pipeline initialized.")
-except Exception as e:
- print(f"FATAL ERROR: Could not initialize Kokoro TTS pipeline: {e}")
- pipeline = None # Set pipeline to None if initialization fails
-
+pipeline = KPipeline(lang_code='a') # Use voice 'af_heart' for American English
# Ensure ImageMagick binary is set
-try:
- # Common paths, adjust if necessary for your environment
- imagemagick_paths = ["/usr/bin/convert", "/usr/local/bin/convert", "/opt/homebrew/bin/convert"]
- found_path = None
- for path in imagemagick_paths:
- if os.path.exists(path):
- found_path = path
- break
- if found_path:
- mpy_config.change_settings({"IMAGEMAGICK_BINARY": found_path})
- print(f"ImageMagick binary set successfully to: {found_path}")
- else:
- print("Warning: Could not find ImageMagick 'convert' binary in common paths.")
- print("TextClip functionality might be limited if ImageMagick is not found or configured.")
-except Exception as e:
- print(f"Warning: Could not set ImageMagick binary automatically: {e}")
- print("TextClip functionality might be limited if ImageMagick is not found.")
-
+mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"})
# ---------------- Global Configuration ---------------- #
-# !!! IMPORTANT: Replace placeholders with your actual API keys !!!
PEXELS_API_KEY = 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna'
OPENROUTER_API_KEY = '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'
-OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" # Or choose another model
+OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free"
OUTPUT_VIDEO_FILENAME = "final_video.mp4"
-USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36" # Updated User Agent
-
-# --- Check if API keys are set ---
-if PEXELS_API_KEY == "YOUR_PEXELS_API_KEY_HERE":
- print("WARNING: PEXELS_API_KEY is not set. Please set the environment variable or replace the placeholder in the script.")
-if OPENROUTER_API_KEY == "YOUR_OPENROUTER_API_KEY_HERE":
- print("WARNING: OPENROUTER_API_KEY is not set. Please set the environment variable or replace the placeholder in the script.")
-
-
-# Additional global variables needed for the Gradio interface (defaults)
-selected_voice = 'af_heart'
-voice_speed = 0.9
-font_size = 45
-video_clip_probability = 0.25
-bg_music_volume = 0.08
-fps = 30
-preset = "veryfast"
+USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
+
+
+
+# Additional global variables needed for the Gradio interface
+selected_voice = 'af_heart' # Default voice
+voice_speed = 0.9 # Default voice speed
+font_size = 45 # Default font size
+video_clip_probability = 0.25 # Default probability for video clips
+bg_music_volume = 0.08 # Default background music volume
+fps = 30 # Default FPS
+preset = "veryfast" # Default preset
TARGET_RESOLUTION = None
CAPTION_COLOR = None
TEMP_FOLDER = None
# ---------------- Helper Functions ---------------- #
+# Helper functions in this section: generate_script, parse_script,
+# search_pexels_videos, search_pexels_images, search_google_images, download_image,
+# download_video, generate_media, generate_tts, apply_kenburns_effect,
+# resize_to_fill, find_mp3_files, add_background_music, create_clip,
+# fix_imagemagick_policy.
+
+# Module-level placeholders (repeating the assignments above); the actual values are set per run
+TARGET_RESOLUTION = None
+CAPTION_COLOR = None
+TEMP_FOLDER = None
def generate_script(user_input):
"""Generate documentary script with proper OpenRouter handling."""
- if not OPENROUTER_API_KEY or OPENROUTER_API_KEY == "YOUR_OPENROUTER_API_KEY_HERE":
- print("ERROR: OpenRouter API Key is missing or still a placeholder.")
- return None
-
headers = {
'Authorization': f'Bearer {OPENROUTER_API_KEY}',
- 'HTTP-Referer': 'https://github.com/your-repo', # Optional: Replace with your repo/domain
- 'X-Title': 'AI Documentary Maker' # Optional
+ 'HTTP-Referer': 'https://your-domain.com',
+ 'X-Title': 'AI Documentary Maker'
}
prompt = f"""Short Documentary Script GeneratorInstructions:
@@ -187,7 +167,7 @@ Now here is the Topic/scrip: {user_input}
'model': OPENROUTER_MODEL,
'messages': [{'role': 'user', 'content': prompt}],
'temperature': 0.4,
- 'max_tokens': 1000 # Reduced max tokens slightly
+ 'max_tokens': 5000
}
try:
@@ -195,41 +175,22 @@ Now here is the Topic/scrip: {user_input}
'https://openrouter.ai/api/v1/chat/completions',
headers=headers,
json=data,
- timeout=60 # Increased timeout
+ timeout=30
)
- response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
-
- response_data = response.json()
- if 'choices' in response_data and len(response_data['choices']) > 0 and 'message' in response_data['choices'][0] and 'content' in response_data['choices'][0]['message']:
- script_content = response_data['choices'][0]['message']['content'].strip()
- if not script_content:
- print("Warning: API returned an empty script.")
- return None
- # Basic format check
- if '[' not in script_content or ']' not in script_content:
- print(f"Warning: Generated script might lack proper formatting (missing '[' or ']'):\n{script_content[:200]}...")
- return script_content
+ if response.status_code == 200:
+ response_data = response.json()
+ if 'choices' in response_data and len(response_data['choices']) > 0:
+ return response_data['choices'][0]['message']['content']
+ else:
+ print("Unexpected response format:", response_data)
+ return None
else:
- print("Unexpected API response format:", response_data)
+ print(f"API Error {response.status_code}: {response.text}")
return None
- except requests.exceptions.Timeout:
- print("API request timed out.")
- return None
- except requests.exceptions.RequestException as e:
- print(f"API request failed: {e}")
- # Print detailed error if available (e.g., from response text)
- if hasattr(e, 'response') and e.response is not None:
- print(f"Response status: {e.response.status_code}")
- try:
- print(f"Response body: {e.response.json()}")
- except ValueError: # If response is not JSON
- print(f"Response body: {e.response.text}")
- return None
except Exception as e:
- print(f"An unexpected error occurred during script generation: {e}")
- traceback.print_exc()
+ print(f"Request failed: {str(e)}")
return None
def parse_script(script_text):
@@ -243,1972 +204,769 @@ def parse_script(script_text):
current_title = None
current_text = ""
- if not script_text:
- print("Error: Received empty script text for parsing.")
- return []
-
try:
- lines = script_text.strip().splitlines()
- for line in lines:
+ for line in script_text.splitlines():
line = line.strip()
- if not line: # Skip empty lines
- continue
-
- # Regex to capture title in brackets and the following text on the same line
- match = re.match(r'^\s*\[([^\]]+)\](.*)', line)
- if match:
- # If we were processing a previous title, save it
- if current_title is not None and current_text:
- sections[current_title] = current_text.strip()
-
- current_title = match.group(1).strip()
- # Ensure title is not empty
- if not current_title:
- print(f"Warning: Found empty title '[]' in script line: '{line}'. Skipping.")
- current_title = None # Reset title
- current_text = ""
- continue
-
- current_text = match.group(2).strip() + " " # Start text for the new title
- elif current_title is not None:
- # Append line to the current text if it doesn't start a new section
- current_text += line + " "
-
- # Add the last section after the loop ends
- if current_title is not None and current_text:
+ if line.startswith("[") and "]" in line:
+ bracket_start = line.find("[")
+ bracket_end = line.find("]", bracket_start)
+ if bracket_start != -1 and bracket_end != -1:
+ if current_title is not None:
+ sections[current_title] = current_text.strip()
+ current_title = line[bracket_start+1:bracket_end]
+ current_text = line[bracket_end+1:].strip()
+ elif current_title:
+ current_text += line + " "
+
+ if current_title:
sections[current_title] = current_text.strip()
elements = []
for title, narration in sections.items():
- narration = narration.strip() # Ensure no leading/trailing whitespace
if not title or not narration:
- print(f"Warning: Skipping empty title ('{title}') or narration ('{narration}')")
continue
- media_element = {"type": "media", "prompt": title, "effects": "random"} # Use random effect
+ media_element = {"type": "media", "prompt": title, "effects": "fade-in"}
words = narration.split()
- # Simple duration estimate: ~0.5 seconds per word, minimum 3 seconds
- duration = max(3.0, len(words) * 0.5)
- tts_element = {"type": "tts", "text": narration, "voice": "en", "duration": duration} # 'en' is placeholder, actual voice set globally
+ duration = max(3, len(words) * 0.5)
+ tts_element = {"type": "tts", "text": narration, "voice": "en", "duration": duration}
elements.append(media_element)
elements.append(tts_element)
- if not elements:
- print("Warning: Script parsing resulted in no elements. Check script format.")
return elements
except Exception as e:
print(f"Error parsing script: {e}")
- print(f"Problematic script text snippet: {script_text[:200]}") # Log part of the script
- traceback.print_exc()
return []
-
def search_pexels_videos(query, pexels_api_key):
- """Search for a video on Pexels by query and return a random HD/SD video."""
- if not pexels_api_key or pexels_api_key == "YOUR_PEXELS_API_KEY_HERE":
- print("ERROR: Pexels API key is missing or still a placeholder. Cannot search for videos.")
- return None
+ """Search for a video on Pexels by query and return a random HD video."""
headers = {'Authorization': pexels_api_key}
base_url = "https://api.pexels.com/videos/search"
- num_pages = 2 # Search first 2 pages is usually enough
+ num_pages = 3
videos_per_page = 15
- max_retries = 2 # Fewer retries
- retry_delay = 2 # Start with 2 seconds delay
+ max_retries = 3
+ retry_delay = 1
search_query = query
all_videos = []
- print(f"Searching Pexels videos for: '{query}'")
for page in range(1, num_pages + 1):
- # Prefer landscape orientation for standard video
- orient = "landscape" if TARGET_RESOLUTION and TARGET_RESOLUTION[0] > TARGET_RESOLUTION[1] else "portrait"
- params = {"query": search_query, "per_page": videos_per_page, "page": page, "orientation": orient}
for attempt in range(max_retries):
try:
- response = requests.get(base_url, headers=headers, params=params, timeout=15) # Reasonable timeout
-
- response.raise_for_status() # Check for 4xx/5xx errors
+ params = {"query": search_query, "per_page": videos_per_page, "page": page}
+ response = requests.get(base_url, headers=headers, params=params, timeout=10)
+
+ if response.status_code == 200:
+ data = response.json()
+ videos = data.get("videos", [])
+
+ if not videos:
+ print(f"No videos found on page {page}.")
+ break
+
+ for video in videos:
+ video_files = video.get("video_files", [])
+ for file in video_files:
+ if file.get("quality") == "hd":
+ all_videos.append(file.get("link"))
+ break
+
+ break
+
+ elif response.status_code == 429:
+ print(f"Rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+ retry_delay *= 2
+ else:
+ print(f"Error fetching videos: {response.status_code} {response.text}")
+ if attempt < max_retries - 1:
+ print(f"Retrying in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+ retry_delay *= 2
+ else:
+ break
- data = response.json()
- videos = data.get("videos", [])
-
- if not videos:
- # print(f"No videos found on page {page} for '{query}'.") # Less verbose
- break # Stop searching pages if one is empty
-
- for video in videos:
- video_files = video.get("video_files", [])
- # Prefer HD, then SD if HD not found
- hd_link = None
- sd_link = None
- for file in video_files:
- if file.get("quality") == "hd" and file.get("link") and file.get('width', 0) > 1000: # Basic check for decent HD
- hd_link = file.get("link")
- break # Found HD, use it
- elif file.get("quality") == "sd" and file.get("link") and file.get('width', 0) > 500: # Basic check for decent SD
- sd_link = file.get("link") # Keep SD as fallback
-
- link_to_add = hd_link if hd_link else sd_link
- if link_to_add:
- all_videos.append(link_to_add)
-
- break # Success for this page, move to next page
-
- except requests.exceptions.HTTPError as e:
- print(f"HTTP Error fetching Pexels videos: {e.response.status_code} {e.response.text}")
- if e.response.status_code == 429: # Rate limit
- print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...")
- time.sleep(retry_delay)
- retry_delay *= 2
- elif e.response.status_code == 400: # Bad request often means invalid query
- print(f"Pexels API bad request (400) for query '{query}'. Skipping video search.")
- return None # Don't retry bad requests
- elif attempt < max_retries - 1:
- print(f"Retrying Pexels video search in {retry_delay} seconds...")
- time.sleep(retry_delay)
- retry_delay *= 2
- else:
- print("Max retries reached for Pexels video search after HTTP error.")
- break # Max retries for this page
- except requests.exceptions.Timeout:
- print(f"Pexels video search timed out (attempt {attempt+1}/{max_retries}).")
+ except requests.exceptions.RequestException as e:
+ print(f"Request exception: {e}")
if attempt < max_retries - 1:
- print(f"Retrying Pexels video search in {retry_delay} seconds...")
- time.sleep(retry_delay)
- retry_delay *= 2
+ print(f"Retrying in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+ retry_delay *= 2
else:
- print("Max retries reached for Pexels video search due to timeout.")
- break # Max retries for this page
- except requests.exceptions.RequestException as e:
- print(f"Pexels video search request exception: {e}")
- break # Stop trying for this page on general network errors
- except Exception as e:
- print(f"Unexpected error during Pexels video search: {e}")
- traceback.print_exc()
- break # Stop trying for this page
-
- # Reset retry delay for the next page
- retry_delay = 2
+ break
if all_videos:
random_video = random.choice(all_videos)
- print(f"Selected random video from {len(all_videos)} found for '{query}'")
+ print(f"Selected random video from {len(all_videos)} HD videos")
return random_video
else:
- print(f"No suitable Pexels videos found for query: '{query}'")
+ print("No suitable videos found after searching all pages.")
return None
def search_pexels_images(query, pexels_api_key):
"""Search for an image on Pexels by query."""
- if not pexels_api_key or pexels_api_key == "YOUR_PEXELS_API_KEY_HERE":
- print("ERROR: Pexels API key is missing or still a placeholder. Cannot search for images.")
- return None
headers = {'Authorization': pexels_api_key}
url = "https://api.pexels.com/v1/search"
- # Match orientation to target video resolution
- orient = "landscape" if TARGET_RESOLUTION and TARGET_RESOLUTION[0] > TARGET_RESOLUTION[1] else "portrait"
- params = {"query": query, "per_page": 10, "orientation": orient}
+ params = {"query": query, "per_page": 5, "orientation": "landscape"}
- max_retries = 2
- retry_delay = 2
- print(f"Searching Pexels images for: '{query}' (Orientation: {orient})")
+ max_retries = 3
+ retry_delay = 1
for attempt in range(max_retries):
try:
- response = requests.get(url, headers=headers, params=params, timeout=15)
- response.raise_for_status() # Check for 4xx/5xx errors
-
- data = response.json()
- photos = data.get("photos", [])
- if photos:
- # Select from 'original', 'large2x', 'large' in order of preference
- valid_photos = []
- for photo in photos:
- src = photo.get("src", {})
- # Prefer larger sizes but fall back
- img_url = src.get("original") or src.get("large2x") or src.get("large") or src.get("medium")
- if img_url:
- valid_photos.append(img_url)
-
- if valid_photos:
- chosen_url = random.choice(valid_photos)
- print(f"Found {len(valid_photos)} Pexels images for '{query}', selected one.")
- return chosen_url
+ response = requests.get(url, headers=headers, params=params, timeout=10)
+
+ if response.status_code == 200:
+ data = response.json()
+ photos = data.get("photos", [])
+ if photos:
+ photo = random.choice(photos[:min(5, len(photos))])
+ img_url = photo.get("src", {}).get("original")
+ return img_url
else:
- print(f"No valid image URLs found in Pexels response for '{query}'.")
+ print(f"No images found for query: {query}")
return None
- else:
- # print(f"No Pexels images found for query: {query}") # Less verbose
- return None
- except requests.exceptions.HTTPError as e:
- print(f"HTTP Error fetching Pexels images: {e.response.status_code} {e.response.text}")
- if e.response.status_code == 429: # Rate limit
- print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...")
- time.sleep(retry_delay)
- retry_delay *= 2
- elif e.response.status_code == 400: # Bad request
- print(f"Pexels API bad request (400) for query '{query}'. Skipping image search.")
- return None
- elif attempt < max_retries - 1:
- print(f"Retrying Pexels image search in {retry_delay} seconds...")
- time.sleep(retry_delay)
- retry_delay *= 2
+ elif response.status_code == 429:
+ print(f"Rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+ retry_delay *= 2
else:
- print("Max retries reached for Pexels image search after HTTP error.")
- return None # Max retries failed
- except requests.exceptions.Timeout:
- print(f"Pexels image search timed out (attempt {attempt+1}/{max_retries}).")
- if attempt < max_retries - 1:
- print(f"Retrying Pexels image search in {retry_delay} seconds...")
- time.sleep(retry_delay)
- retry_delay *= 2
- else:
- print("Max retries reached for Pexels image search due to timeout.")
- return None # Max retries failed
+ print(f"Error fetching images: {response.status_code} {response.text}")
+ if attempt < max_retries - 1:
+ print(f"Retrying in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+ retry_delay *= 2
+
except requests.exceptions.RequestException as e:
- print(f"Pexels image search request exception: {e}")
- return None # Don't retry general network errors
- except Exception as e:
- print(f"Unexpected error during Pexels image search: {e}")
- traceback.print_exc()
- return None
-
- print(f"No Pexels images found for query: '{query}' after all attempts.")
+ print(f"Request exception: {e}")
+ if attempt < max_retries - 1:
+ print(f"Retrying in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+ retry_delay *= 2
+
+ print(f"No Pexels images found for query: {query} after all attempts")
return None
def search_google_images(query):
- """Search for images on Google Images (use cautiously, might break)."""
- print(f"Attempting Google Image search for (use with caution): '{query}'")
+ """Search for images on Google Images (for news-related queries)"""
try:
- # Using a simpler, potentially more stable URL structure
- search_url = f"https://www.google.com/search?q={quote(query)}&source=lnms&tbm=isch"
+ search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch"
headers = {"User-Agent": USER_AGENT}
response = requests.get(search_url, headers=headers, timeout=10)
- response.raise_for_status() # Check for HTTP errors
-
soup = BeautifulSoup(response.text, "html.parser")
+ img_tags = soup.find_all("img")
image_urls = []
- # Google changes its HTML structure often. This targets common patterns.
- # Pattern 1: Images directly in <img> tags (often thumbnails or requires JS)
- for img in soup.find_all("img"):
- src = img.get("src") or img.get("data-src")
- if src and src.startswith("http") and "gstatic.com" not in src and "google.com" not in src:
- image_urls.append(src)
- elif src and src.startswith('data:image'):
- # Skip base64 images as they are usually small thumbnails
- pass
-
- # Pattern 2: Look for JSON data embedded in script tags (more reliable if found)
- # This requires more complex parsing and adapting to Google's changing structure.
- # Example (might need adjustment):
- # scripts = soup.find_all("script")
- # for script in scripts:
- # if script.string and 'var AF_data' in script.string: # Example marker
- # # Complex parsing logic here to extract URLs from the JS object
- # pass
+ for img in img_tags:
+ src = img.get("src", "")
+ if src.startswith("http") and "gstatic" not in src:
+ image_urls.append(src)
if image_urls:
- # Filter out potential low-quality results (e.g., very short URLs)
- filtered_urls = [url for url in image_urls if len(url) > 50 and ('.jpg' in url or '.png' in url or '.jpeg' in url)]
- if not filtered_urls: filtered_urls = image_urls # Use original if filter removed everything
-
- # Return a random one from the first few potentially relevant results
- num_to_consider = min(len(filtered_urls), 10)
- chosen_url = random.choice(filtered_urls[:num_to_consider])
- print(f"Found {len(filtered_urls)} potential Google images, selected one.")
- return chosen_url
+ return random.choice(image_urls[:5]) if len(image_urls) >= 5 else image_urls[0]
else:
- print(f"No suitable Google Images found for query: '{query}' with current parsing method.")
+ print(f"No Google Images found for query: {query}")
return None
- except requests.exceptions.RequestException as e:
- print(f"Error during Google Images request: {e}")
- return None
except Exception as e:
- print(f"Error parsing Google Images HTML: {e}")
- # traceback.print_exc() # Uncomment for detailed parsing errors
+ print(f"Error in Google Images search: {e}")
return None
-
def download_image(image_url, filename):
"""Download an image from a URL to a local file with enhanced error handling."""
- if not image_url or not isinstance(image_url, str) or not image_url.startswith('http'):
- print(f"Error: Invalid image URL provided for download: {image_url}")
- return None
try:
- headers = {"User-Agent": USER_AGENT, "Accept": "image/jpeg,image/png,image/*"} # Be more specific
- print(f"Downloading image: {image_url} \n to: {filename}")
- response = requests.get(image_url, headers=headers, stream=True, timeout=20) # Increased timeout
- response.raise_for_status() # Check for download errors (4xx, 5xx)
-
- # Check content type if possible
- content_type = response.headers.get('Content-Type', '').lower()
- if content_type and 'image' not in content_type:
- print(f"Warning: URL content type ('{content_type}') might not be an image. Proceeding anyway.")
+ headers = {"User-Agent": USER_AGENT}
+ print(f"Downloading image from: {image_url} to {filename}")
+ response = requests.get(image_url, headers=headers, stream=True, timeout=15)
+ response.raise_for_status()
- # Download the content
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
- # Basic file size check
- if os.path.getsize(filename) < 1024: # Less than 1KB is suspicious
- print(f"Warning: Downloaded image file '{filename}' is very small. It might be invalid.")
- # Optionally remove it here, but validation below is better
- # os.remove(filename); return None
-
print(f"Image downloaded successfully to: {filename}")
- # Validate the downloaded image using Pillow
try:
img = Image.open(filename)
- img.verify() # Check if Pillow can read the header and format
- # Re-open after verify to load image data
+ img.verify()
img = Image.open(filename)
- # Check for minimum dimensions (optional)
- # min_dim = 100
- # if img.width < min_dim or img.height < min_dim:
- # print(f"Warning: Image {filename} is very small ({img.width}x{img.height}).")
-
- # Convert to RGB if necessary (common requirement for video processing)
- if img.mode not in ['RGB', 'RGBA']: # Allow RGBA for transparency if needed later, but RGB is safer
- print(f"Converting image {filename} from {img.mode} to RGB.")
+ if img.mode != 'RGB':
img = img.convert('RGB')
- img.save(filename, quality=90) # Save with decent quality
- elif img.mode == 'RGBA':
- # If RGBA, consider converting to RGB or handling alpha channel appropriately
- print(f"Image {filename} has alpha channel (RGBA). Converting to RGB.")
- img = img.convert('RGB')
- img.save(filename, quality=90)
-
- img.close() # Close the image file handle
+ img.save(filename)
print(f"Image validated and processed: {filename}")
return filename
- except (IOError, SyntaxError, Image.UnidentifiedImageError) as e_validate:
- print(f"ERROR: Downloaded file '{filename}' is not a valid image or is corrupted: {e_validate}")
+ except Exception as e_validate:
+ print(f"Downloaded file is not a valid image: {e_validate}")
if os.path.exists(filename):
- try:
- os.remove(filename)
- print(f"Removed invalid image file: {filename}")
- except OSError as e_remove:
- print(f"Error removing invalid image file '{filename}': {e_remove}")
+ os.remove(filename)
return None
except requests.exceptions.RequestException as e_download:
- print(f"ERROR: Image download failed for {image_url}: {e_download}")
- # Clean up potentially incomplete file
+ print(f"Image download error: {e_download}")
if os.path.exists(filename):
- try: os.remove(filename)
- except OSError: pass
+ os.remove(filename)
return None
except Exception as e_general:
- print(f"ERROR: General error during image processing for {image_url}: {e_general}")
- traceback.print_exc()
+ print(f"General error during image processing: {e_general}")
if os.path.exists(filename):
- try: os.remove(filename)
- except OSError: pass
+ os.remove(filename)
return None
def download_video(video_url, filename):
"""Download a video from a URL to a local file."""
- if not video_url or not isinstance(video_url, str) or not video_url.startswith('http'):
- print(f"Error: Invalid video URL provided for download: {video_url}")
- return None
try:
- headers = {"User-Agent": USER_AGENT} # Pexels might not require this, but good practice
- print(f"Downloading video: {video_url} \n to: {filename}")
- response = requests.get(video_url, headers=headers, stream=True, timeout=90) # Generous timeout for videos
- response.raise_for_status() # Check for download errors (4xx, 5xx)
-
- # Optional: Check content type
- content_type = response.headers.get('Content-Type', '').lower()
- if content_type and 'video' not in content_type:
- print(f"Warning: URL content type ('{content_type}') might not be a video. Proceeding.")
-
+ response = requests.get(video_url, stream=True, timeout=30)
+ response.raise_for_status()
with open(filename, 'wb') as f:
- total_downloaded = 0
- start_time = time.time()
- for chunk in response.iter_content(chunk_size=1024*1024): # Larger chunks (1MB) for video
- if chunk: # filter out keep-alive new chunks
- f.write(chunk)
- total_downloaded += len(chunk)
- end_time = time.time()
- download_speed = (total_downloaded / (1024*1024)) / (end_time - start_time + 1e-6) # MB/s
- print(f"Video downloaded successfully to: {filename} ({total_downloaded / (1024*1024):.2f} MB at {download_speed:.2f} MB/s)")
-
- # Basic validation: check file size
- if os.path.getsize(filename) < 10 * 1024: # Check if file is suspiciously small (e.g., < 10KB)
- print(f"Warning: Downloaded video file '{filename}' is very small. It might be invalid.")
- # Keep the file for now, let moviepy handle potential errors later
-
+ for chunk in response.iter_content(chunk_size=8192):
+ f.write(chunk)
+ print(f"Video downloaded successfully to: {filename}")
return filename
- except requests.exceptions.RequestException as e:
- print(f"ERROR: Video download failed for {video_url}: {e}")
- if os.path.exists(filename):
- try: os.remove(filename) # Clean up failed download
- except OSError: pass
- return None
- except Exception as e_general:
- print(f"ERROR: General error during video download for {video_url}: {e_general}")
- traceback.print_exc()
+ except Exception as e:
+ print(f"Video download error: {e}")
if os.path.exists(filename):
- try: os.remove(filename)
- except OSError: pass
+ os.remove(filename)
return None
-
def generate_media(prompt, user_image=None, current_index=0, total_segments=1):
"""
- Generate a visual asset: Try video (based on probability), then Pexels image, then Google (news), then fallback Pexels image.
+ Generate a visual asset by first searching for a video or using a specific search strategy.
+ For news-related queries, use Google Images.
Returns a dict: {'path': , 'asset_type': 'video' or 'image'}.
"""
- # Sanitize prompt for use in filenames
safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
- if not safe_prompt: # Handle cases where prompt becomes empty after sanitizing
- safe_prompt = f"media_{current_index}"
- safe_prompt = safe_prompt[:50] # Limit filename part length
- print(f"\n--- Generating Media for Prompt: '{prompt}' (Segment {current_index+1}/{total_segments}) ---")
-
- # --- Strategy ---
- # 1. Video? (Based on probability) -> Pexels Video Search -> Download
- # 2. Image? -> Pexels Image Search -> Download
- # 3. News? -> Google Image Search -> Download
- # 4. Fallback? -> Generic Pexels Image Search -> Download
- # 5. Absolute Fallback? -> Generate Color Background
+ if "news" in prompt.lower():
+ print(f"News-related query detected: {prompt}. Using Google Images...")
+ image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_news.jpg")
+ image_url = search_google_images(prompt)
+ if image_url:
+ downloaded_image = download_image(image_url, image_file)
+ if downloaded_image:
+ print(f"News image saved to {downloaded_image}")
+ return {"path": downloaded_image, "asset_type": "image"}
+ else:
+ print(f"Google Images search failed for prompt: {prompt}")
- # 1. Try Video first based on probability
if random.random() < video_clip_probability:
- print(f"Attempting video search (Probability: {video_clip_probability*100:.0f}%)")
- video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video_{current_index}.mp4")
+ video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4")
video_url = search_pexels_videos(prompt, PEXELS_API_KEY)
if video_url:
downloaded_video = download_video(video_url, video_file)
- if downloaded_video and os.path.exists(downloaded_video):
- # Basic check: File size > 10KB?
- if os.path.getsize(downloaded_video) > 10 * 1024:
- print(f"Video asset downloaded: {downloaded_video}")
- # Optional: Deeper check with moviepy (adds overhead)
- # try:
- # with VideoFileClip(downloaded_video) as test_clip:
- # if test_clip.duration > 0:
- # print(f"Video asset usable: {downloaded_video}")
- # return {"path": downloaded_video, "asset_type": "video"}
- # else: print(f"Downloaded video file seems invalid (duration 0): {downloaded_video}")
- # except Exception as e: print(f"Error testing downloaded video {downloaded_video}: {e}")
- # If basic check passed, return it and let create_clip handle errors
- return {"path": downloaded_video, "asset_type": "video"}
- else:
- print(f"Downloaded video file is too small, likely invalid: {downloaded_video}")
- try: os.remove(downloaded_video)
- except OSError: pass
- # else: print(f"Pexels video download failed for prompt: '{prompt}'") # Covered by download_video logs
- # else: print(f"Pexels video search failed for prompt: '{prompt}'") # Covered by search_pexels_videos logs
- else:
- print("Skipping video search based on probability.")
-
- # 2. Try Pexels Image
- print("Attempting Pexels image search...")
- image_file_pexels = os.path.join(TEMP_FOLDER, f"{safe_prompt}_pexels_{current_index}.jpg")
- image_url_pexels = search_pexels_images(prompt, PEXELS_API_KEY)
- if image_url_pexels:
- downloaded_image_pexels = download_image(image_url_pexels, image_file_pexels)
- if downloaded_image_pexels and os.path.exists(downloaded_image_pexels):
- print(f"Pexels image asset saved: {downloaded_image_pexels}")
- return {"path": downloaded_image_pexels, "asset_type": "image"}
- # else: print(f"Pexels image download failed for prompt: '{prompt}'") # Covered by download_image logs
- # else: print(f"Pexels image search failed for prompt: '{prompt}'") # Covered by search_pexels_images logs
-
-
- # 3. If "news" in prompt (case-insensitive), try Google Images as a secondary option
- news_keywords = ["news", "report", "breaking", "headline", "current event"] # Expand if needed
- if any(keyword in prompt.lower() for keyword in news_keywords):
- print(f"News-related query detected: '{prompt}'. Trying Google Images...")
- image_file_google = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google_{current_index}.jpg")
- image_url_google = search_google_images(prompt)
- if image_url_google:
- downloaded_image_google = download_image(image_url_google, image_file_google)
- if downloaded_image_google and os.path.exists(downloaded_image_google):
- print(f"Google image asset saved: {downloaded_image_google}")
- return {"path": downloaded_image_google, "asset_type": "image"}
- # else: print(f"Google Images download failed for prompt: '{prompt}'") # Covered by download_image logs
- # else: print(f"Google Images search failed for prompt: '{prompt}'") # Covered by search_google_images logs
-
- # 4. Fallback to generic Pexels image search if everything else failed
- print("Primary searches failed or skipped. Attempting fallback Pexels image search...")
- fallback_terms = ["abstract", "texture", "technology", "nature", "background", "cityscape", "pattern"]
- fallback_term = random.choice(fallback_terms)
- print(f"Using fallback term: '{fallback_term}'")
- fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{fallback_term}_{current_index}.jpg")
- fallback_url = search_pexels_images(fallback_term, PEXELS_API_KEY)
- if fallback_url:
- downloaded_fallback = download_image(fallback_url, fallback_file)
- if downloaded_fallback and os.path.exists(downloaded_fallback):
- print(f"Fallback image asset saved: {downloaded_fallback}")
- return {"path": downloaded_fallback, "asset_type": "image"}
- # else: print(f"Fallback image download failed for term: '{fallback_term}'")
- # else: print(f"Fallback image search failed for term: '{fallback_term}'")
-
- # 5. Absolute fallback: Generate a simple color background (if ImageMagick is available)
- try:
- print("All media generation failed. Creating a simple color background as last resort.")
- color_bg_path = os.path.join(TEMP_FOLDER, f"color_bg_{current_index}.png")
- # Ensure TARGET_RESOLUTION is set before calling this
- if TARGET_RESOLUTION:
- w, h = TARGET_RESOLUTION
- # Pick a random dark color
- r, g, b = random.randint(10, 60), random.randint(10, 60), random.randint(10, 60)
- color = f"rgb({r},{g},{b})"
- # Use ImageMagick 'convert' command - requires it to be installed and accessible via mpy_config
- if mpy_config.get("IMAGEMAGICK_BINARY") != "auto-detect":
- cmd = f"{mpy_config.get('IMAGEMAGICK_BINARY')} -size {w}x{h} xc:'{color}' '{color_bg_path}'"
- print(f"Executing: {cmd}")
- exit_code = os.system(cmd)
- if exit_code == 0 and os.path.exists(color_bg_path) and os.path.getsize(color_bg_path) > 100:
- print(f"Generated color background: {color_bg_path}")
- return {"path": color_bg_path, "asset_type": "image"}
- else:
- print(f"Failed to generate color background using ImageMagick (Exit code: {exit_code}).")
- return None
- else:
- print("Cannot generate color background: ImageMagick binary not configured in moviepy.")
- return None
+ if downloaded_video:
+ print(f"Video asset saved to {downloaded_video}")
+ return {"path": downloaded_video, "asset_type": "video"}
else:
- print("Cannot generate color background: TARGET_RESOLUTION not set.")
- return None
- except Exception as e:
- print(f"Error generating color background: {e}")
- traceback.print_exc()
- return None
+ print(f"Pexels video search failed for prompt: {prompt}")
+
+ image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}.jpg")
+ image_url = search_pexels_images(prompt, PEXELS_API_KEY)
+ if image_url:
+ downloaded_image = download_image(image_url, image_file)
+ if downloaded_image:
+ print(f"Image asset saved to {downloaded_image}")
+ return {"path": downloaded_image, "asset_type": "image"}
+ else:
+ print(f"Pexels image download failed for prompt: {prompt}")
+
+ fallback_terms = ["nature", "people", "landscape", "technology", "business"]
+ for term in fallback_terms:
+ print(f"Trying fallback image search with term: {term}")
+ fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}.jpg")
+ fallback_url = search_pexels_images(term, PEXELS_API_KEY)
+ if fallback_url:
+ downloaded_fallback = download_image(fallback_url, fallback_file)
+ if downloaded_fallback:
+ print(f"Fallback image saved to {downloaded_fallback}")
+ return {"path": downloaded_fallback, "asset_type": "image"}
+ else:
+ print(f"Fallback image download failed for term: {term}")
+ else:
+ print(f"Fallback image search failed for term: {term}")
- # Should not be reached if color background works, but as a final safety net:
- print(f"ERROR: Failed to generate *any* visual asset for prompt: '{prompt}'")
+ print(f"Failed to generate visual asset for prompt: {prompt}")
return None
-
def generate_silent_audio(duration, sample_rate=24000):
"""Generate a silent WAV audio file lasting 'duration' seconds."""
- try:
- # Ensure duration is positive and reasonable
- duration = max(0.1, duration)
- num_samples = int(duration * sample_rate)
- silence = np.zeros(num_samples, dtype=np.float32)
-
- # Ensure TEMP_FOLDER exists and is writable
- if not TEMP_FOLDER or not os.path.isdir(TEMP_FOLDER):
- print("Error: TEMP_FOLDER not set or invalid for silent audio.")
- # Create a fallback temporary file (less ideal as it might not be cleaned up)
- try:
- silent_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
- silent_path = silent_file.name
- silent_file.close() # Close handle immediately after getting name
- except Exception as temp_err:
- print(f"Error creating fallback temp file for silence: {temp_err}")
- return None
- else:
- # Use microsecond timestamp for uniqueness
- timestamp = int(time.time() * 1_000_000)
- silent_path = os.path.join(TEMP_FOLDER, f"silent_{timestamp}.wav")
-
- sf.write(silent_path, silence, sample_rate)
- # Verify file creation and size
- if os.path.exists(silent_path) and os.path.getsize(silent_path) > 0:
- print(f"Silent audio generated: {silent_path} ({duration:.2f}s)")
- return silent_path
- else:
- print(f"Error: Failed to write silent audio file to {silent_path}")
- return None
- except Exception as e:
- print(f"Error generating silent audio: {e}")
- traceback.print_exc()
- return None
-
+ num_samples = int(duration * sample_rate)
+ silence = np.zeros(num_samples, dtype=np.float32)
+ silent_path = os.path.join(TEMP_FOLDER, f"silent_{int(time.time())}.wav")
+ sf.write(silent_path, silence, sample_rate)
+ print(f"Silent audio generated: {silent_path}")
+ return silent_path
def generate_tts(text, voice):
"""
Generate TTS audio using Kokoro, falling back to gTTS or silent audio if needed.
- Uses global `selected_voice` and `voice_speed`.
"""
- if not text:
- print("Warning: Empty text received for TTS. Generating 1s silence.")
- return generate_silent_audio(duration=1.0)
-
- # Sanitize text slightly for filename (limit length, basic chars)
- safe_text_part = re.sub(r'[^\w-]', '', text[:20]).strip().replace(' ', '_')
- timestamp = int(time.time() * 1_000_000) # More unique timestamp
- if not safe_text_part: safe_text_part = f"tts_{timestamp}"
- else: safe_text_part = f"{safe_text_part}_{timestamp}"
-
- # Ensure TEMP_FOLDER is valid
- if not TEMP_FOLDER or not os.path.isdir(TEMP_FOLDER):
- print("ERROR: TEMP_FOLDER not set or invalid for TTS generation.")
- return generate_silent_audio(duration=max(1.0, len(text.split()) * 0.5)) # Fallback silence
-
- file_path = os.path.join(TEMP_FOLDER, f"{safe_text_part}.wav")
-
- # Decide voice: Use global `selected_voice` if `voice` is the default 'en'
- kokoro_voice_to_use = selected_voice if voice == 'en' else voice
- print(f"Generating TTS for: '{text[:60]}...' (Voice: {kokoro_voice_to_use}, Speed: {voice_speed})")
-
- # --- Try Kokoro TTS ---
- if pipeline is not None: # Check if Kokoro was initialized successfully
- try:
- generator = pipeline(text, voice=kokoro_voice_to_use, speed=voice_speed, split_pattern=r'\n+') # Split on newlines if any
- audio_segments = []
- output_sample_rate = 24000 # Kokoro's default rate
-
- for i, (gs, ps, audio) in enumerate(generator):
- if audio is not None and isinstance(audio, np.ndarray) and audio.ndim > 0 and audio.size > 0:
- # Ensure audio is float32
- if audio.dtype != np.float32:
- if audio.dtype == np.int16:
- audio = audio.astype(np.float32) / 32768.0
- else:
- print(f"Warning: Unexpected audio dtype {audio.dtype} from Kokoro. Attempting conversion.")
- try: audio = audio.astype(np.float32) # Generic attempt
- except Exception: print("Conversion failed."); continue # Skip segment if conversion fails
- audio_segments.append(audio)
- else:
- print(f"Warning: Kokoro returned empty or invalid audio segment {i} for text.")
-
- if not audio_segments:
- print("Error: Kokoro generated no valid audio segments.")
- raise ValueError("No audio data from Kokoro")
-
- # Concatenate segments if needed
- full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0]
-
- # Check final audio shape and content
- if full_audio is None or full_audio.ndim == 0 or full_audio.size == 0:
- print("Error: Final concatenated audio from Kokoro is invalid.")
- raise ValueError("Invalid final audio data from Kokoro")
-
- # Check for NaN or Inf values
- if np.isnan(full_audio).any() or np.isinf(full_audio).any():
- print("Warning: Kokoro audio contains NaN or Inf values. Cleaning.")
- full_audio = np.nan_to_num(full_audio) # Replace NaN with 0, Inf with large numbers
-
- # Normalize audio slightly to prevent clipping
- max_abs_val = np.max(np.abs(full_audio))
- if max_abs_val > 0: # Avoid division by zero
- if max_abs_val > 1.0:
- print("Normalizing Kokoro audio to prevent clipping.")
- full_audio = full_audio / max_abs_val * 0.98
- else:
- print("Warning: Kokoro generated silent audio.")
-
-
- sf.write(file_path, full_audio, output_sample_rate)
-
- # Verify file write
- if os.path.exists(file_path) and os.path.getsize(file_path) > 100: # Check size > 100 bytes
- print(f"TTS audio saved: {file_path} (Kokoro)")
- return file_path
- else:
- print(f"Error: Failed to write Kokoro TTS file or file is too small: {file_path}")
- raise ValueError("Kokoro file write failed")
-
- except Exception as e_kokoro:
- print(f"Error with Kokoro TTS: {e_kokoro}. Trying gTTS fallback...")
- # traceback.print_exc() # Uncomment for detailed Kokoro errors
- else:
- print("Kokoro pipeline not available. Skipping Kokoro TTS attempt.")
+ safe_text = re.sub(r'[^\w\s-]', '', text[:10]).strip().replace(' ', '_')
+ file_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text}.wav")
+ if os.path.exists(file_path):
+ print(f"Using cached TTS for text '{text[:10]}...'")
+ return file_path
- # --- Try gTTS Fallback ---
try:
- print("Attempting gTTS fallback...")
- tts = gTTS(text=text, lang='en', slow= (voice_speed < 0.8) ) # Basic speed control approximation
- # Save MP3 temporarily
- mp3_path = os.path.join(TEMP_FOLDER, f"{safe_text_part}_gtts.mp3")
- tts.save(mp3_path)
-
- # Convert MP3 to WAV using pydub
- audio = AudioSegment.from_mp3(mp3_path)
- # Export as WAV (pydub handles sample rate conversion if needed, defaults reasonable)
- # Ensure export path is the same WAV path we intended originally
- audio.export(file_path, format="wav")
-
- # Clean up temporary MP3
- if os.path.exists(mp3_path):
- try: os.remove(mp3_path)
- except OSError: pass
-
- # Check if the generated WAV file is valid
- if os.path.exists(file_path) and os.path.getsize(file_path) > 100: # Basic size check
- print(f"Fallback TTS saved: {file_path} (gTTS)")
- return file_path
- else:
- print(f"Error: gTTS generated an invalid or empty WAV file: {file_path}")
- if os.path.exists(file_path):
- try: os.remove(file_path)
- except OSError: pass
- raise ValueError("gTTS output file invalid")
-
- except ImportError:
- print("Error: gTTS or pydub might not be installed. Cannot use gTTS fallback.")
- # Skip to silence generation
- except Exception as e_gtts:
- print(f"Error with gTTS fallback: {e_gtts}. Generating silence.")
- # traceback.print_exc() # Uncomment for detailed gTTS errors
-
-
- # --- Generate Silence as final fallback ---
- print("Generating silence as final TTS fallback.")
- # Estimate duration based on text length if possible
- estimated_duration = max(1.0, len(text.split()) * (0.6 / voice_speed)) # Rough estimate adjusted by speed
- return generate_silent_audio(duration=estimated_duration)
-
-
-def apply_kenburns_effect(clip, target_resolution, effect_type="random"):
- """Apply a smooth Ken Burns effect (zoom/pan) to an image clip."""
- try:
- target_w, target_h = target_resolution
- if not isinstance(clip, ImageClip):
- print("Warning: Ken Burns effect applied to non-ImageClip. Results may vary.")
- # Attempt to get dimensions anyway
- if not hasattr(clip, 'w') or clip.w is None or clip.h is None or clip.w == 0 or clip.h == 0:
- print("Error applying Ken Burns: Invalid clip dimensions.")
- return clip # Return original clip
-
- # Ensure clip has a duration set
- if clip.duration is None or clip.duration <= 0:
- print("Error applying Ken Burns: Clip duration is not set or is zero.")
- # Set a default duration? Might cause issues later. Return unmodified for now.
- return clip
-
- clip_w, clip_h = clip.w, clip.h
- clip_aspect = clip_w / clip_h
- target_aspect = target_w / target_h
-
- # --- Resize to cover target area ---
- if clip_aspect >= target_aspect: # Includes case where aspect ratios are equal
- # Image is wider than or equal to target: Resize based on height
- scale_factor = target_h / clip_h
- resized_w = int(clip_w * scale_factor)
- resized_h = target_h
- else:
- # Image is taller than target: Resize based on width
- scale_factor = target_w / clip_w
- resized_w = target_w
- resized_h = int(clip_h * scale_factor)
-
- # Use LANCZOS for resizing images - better quality
- # Need to handle potential mask resizing as well
- resized_clip = clip.resize(newsize=(resized_w, resized_h))
-
- # --- Apply scale for zoom effect ---
- zoom_scale = 1.15 # How much larger the image is than the frame initially
- zoomed_w = int(resized_w * zoom_scale)
- zoomed_h = int(resized_h * zoom_scale)
- zoomed_clip = resized_clip.resize(newsize=(zoomed_w, zoomed_h))
-
- # --- Determine movement parameters ---
- max_offset_x = max(0, zoomed_w - target_w)
- max_offset_y = max(0, zoomed_h - target_h)
-
- available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "diag-tl-br", "diag-tr-bl"]
- if effect_type == "random" or effect_type not in available_effects:
- effect_type = random.choice(available_effects)
- if effect_type not in available_effects: # Should not happen, but safety check
- effect_type = "zoom-in"
-
- print(f"Applying Ken Burns effect: {effect_type}")
-
- # Define start and end positions/zooms based on effect type
- center_x = zoomed_w / 2
- center_y = zoomed_h / 2
- start_pos = (center_x, center_y)
- end_pos = (center_x, center_y)
- start_visual_zoom = 1.0 # 1.0 = fits target, >1.0 = zoomed in
- end_visual_zoom = 1.0
-
- if effect_type == "zoom-in":
- start_visual_zoom = 1.0
- end_visual_zoom = zoom_scale # Zoom in to the max pre-zoom
- elif effect_type == "zoom-out":
- start_visual_zoom = zoom_scale
- end_visual_zoom = 1.0
- elif effect_type == "pan-left":
- start_pos = (center_x + max_offset_x / 2, center_y)
- end_pos = (center_x - max_offset_x / 2, center_y)
- elif effect_type == "pan-right":
- start_pos = (center_x - max_offset_x / 2, center_y)
- end_pos = (center_x + max_offset_x / 2, center_y)
- elif effect_type == "pan-up":
- start_pos = (center_x, center_y + max_offset_y / 2)
- end_pos = (center_x, center_y - max_offset_y / 2)
- elif effect_type == "pan-down":
- start_pos = (center_x, center_y - max_offset_y / 2)
- end_pos = (center_x, center_y + max_offset_y / 2)
- elif effect_type == "diag-tl-br": # Top-Left to Bottom-Right
- start_pos = (center_x - max_offset_x / 2, center_y - max_offset_y / 2)
- end_pos = (center_x + max_offset_x / 2, center_y + max_offset_y / 2)
- elif effect_type == "diag-tr-bl": # Top-Right to Bottom-Left
- start_pos = (center_x + max_offset_x / 2, center_y - max_offset_y / 2)
- end_pos = (center_x - max_offset_x / 2, center_y + max_offset_y / 2)
-
- # --- Define the transformation function for moviepy's fl ---
- # This function operates on each frame (as a numpy array)
- def transform_frame(get_frame, t):
- # get_frame(t) returns the frame of the *input* clip (zoomed_clip) at time t
- frame = get_frame(t)
-
- # Smooth interpolation (cosine ease-in-out)
- ratio = t / zoomed_clip.duration if zoomed_clip.duration > 0 else 0
- ratio = 0.5 - 0.5 * math.cos(math.pi * ratio) # Ease in/out
-
- # Interpolate zoom and position
- current_visual_zoom = start_visual_zoom + (end_visual_zoom - start_visual_zoom) * ratio
- current_center_x = start_pos[0] + (end_pos[0] - start_pos[0]) * ratio
- current_center_y = start_pos[1] + (end_pos[1] - start_pos[1]) * ratio
-
- # Calculate crop window size based on the current visual zoom needed
- # The crop window size should be the target size divided by the zoom factor
- crop_w = int(target_w / current_visual_zoom)
- crop_h = int(target_h / current_visual_zoom)
-
- # Ensure the crop window isn't larger than the actual frame dimensions
- crop_w = min(crop_w, zoomed_w)
- crop_h = min(crop_h, zoomed_h)
- # Ensure crop dimensions are positive
- if crop_w <= 0 or crop_h <= 0:
- print(f"Warning: Invalid crop dimensions ({crop_w}x{crop_h}) calculated in Ken Burns. Using target size.")
- crop_w = min(target_w, zoomed_w)
- crop_h = min(target_h, zoomed_h)
-
-
- # Clamp the center position to prevent cropping outside the image bounds
- # The center point is relative to the zoomed frame (zoomed_w, zoomed_h)
- min_center_x = crop_w / 2
- max_center_x = zoomed_w - crop_w / 2
- min_center_y = crop_h / 2
- max_center_y = zoomed_h - crop_h / 2
-
- # Ensure max > min before clamping
- if max_center_x < min_center_x: max_center_x = min_center_x
- if max_center_y < min_center_y: max_center_y = min_center_y
-
- clamped_center_x = max(min_center_x, min(current_center_x, max_center_x))
- clamped_center_y = max(min_center_y, min(current_center_y, max_center_y))
-
- # Use cv2.getRectSubPix for subpixel accuracy cropping
- if not isinstance(frame, np.ndarray):
- print("Warning: Frame is not numpy array in Ken Burns transform.")
- # Try to convert? Risky. Return frame resized to target.
- try: return cv2.resize(np.array(frame), (target_w, target_h)) # Attempt conversion
- except: return np.zeros((target_h, target_w, 3), dtype=np.uint8) # Return black frame on failure
-
- # Ensure frame is contiguous C-style array if needed by cv2 (often helps)
- if not frame.flags['C_CONTIGUOUS']:
- frame = np.ascontiguousarray(frame)
-
- try:
- # Ensure crop dimensions are integers
- crop_w_int, crop_h_int = int(round(crop_w)), int(round(crop_h))
- if crop_w_int <= 0 or crop_h_int <= 0: raise ValueError("Crop dimensions must be positive")
-
- cropped_frame = cv2.getRectSubPix(frame, (crop_w_int, crop_h_int), (clamped_center_x, clamped_center_y))
-
- except (cv2.error, ValueError) as e:
- print(f"Error during cv2.getRectSubPix: {e}")
- print(f" Frame shape: {frame.shape}, dtype: {frame.dtype}, flags: {frame.flags}")
- print(f" Req Crop size: ({crop_w:.2f}, {crop_h:.2f}), Int Crop: ({crop_w_int}, {crop_h_int})")
- print(f" Req Center: ({current_center_x:.2f}, {current_center_y:.2f}), Clamped: ({clamped_center_x:.2f}, {clamped_center_y:.2f})")
- # Fallback: return the original frame, resized to target
- return cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LINEAR)
-
-
- # Resize the cropped frame to the final target resolution
- # Use LANCZOS4 for high quality resize
- resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)
-
- return resized_frame
-
- # Apply the transformation using moviepy's fl method
- # Apply to mask if the original clip had one
- final_clip = zoomed_clip.fl(transform_frame, apply_to=['mask'] if zoomed_clip.ismask else [])
- # Set the duration explicitly as fl might mess it up
- final_clip = final_clip.set_duration(clip.duration)
- return final_clip
-
+ kokoro_voice = selected_voice if voice == 'en' else voice
+ generator = pipeline(text, voice=kokoro_voice, speed=voice_speed, split_pattern=r'\n+')
+ audio_segments = []
+ for i, (gs, ps, audio) in enumerate(generator):
+ audio_segments.append(audio)
+ full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0]
+ sf.write(file_path, full_audio, 24000)
+ print(f"TTS audio saved to {file_path} (Kokoro)")
+ return file_path
except Exception as e:
- print(f"Error applying Ken Burns effect: {e}")
- traceback.print_exc()
- # Fallback: Return the original clip, resized to fill target
- print("Falling back to simple resize_to_fill.")
+ print(f"Error with Kokoro TTS: {e}")
try:
- return resize_to_fill(clip, target_resolution)
- except Exception as e_resize:
- print(f"Fallback resize_to_fill also failed: {e_resize}")
- return clip # Return original as last resort
-
+ print("Falling back to gTTS...")
+ tts = gTTS(text=text, lang='en')
+ mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text}.mp3")
+ tts.save(mp3_path)
+ audio = AudioSegment.from_mp3(mp3_path)
+ audio.export(file_path, format="wav")
+ os.remove(mp3_path)
+ print(f"Fallback TTS saved to {file_path} (gTTS)")
+ return file_path
+ except Exception as fallback_error:
+ print(f"Both TTS methods failed: {fallback_error}")
+ return generate_silent_audio(duration=max(3, len(text.split()) * 0.5))
+
+def apply_kenburns_effect(clip, target_resolution, effect_type=None):
+ """Apply a smooth Ken Burns effect with a single movement pattern."""
+ target_w, target_h = target_resolution
+ clip_aspect = clip.w / clip.h
+ target_aspect = target_w / target_h
+
+ if clip_aspect > target_aspect:
+ new_height = target_h
+ new_width = int(new_height * clip_aspect)
+ else:
+ new_width = target_w
+ new_height = int(new_width / clip_aspect)
+
+ clip = clip.resize(newsize=(new_width, new_height))
+ base_scale = 1.15
+ new_width = int(new_width * base_scale)
+ new_height = int(new_height * base_scale)
+ clip = clip.resize(newsize=(new_width, new_height))
+
+ max_offset_x = new_width - target_w
+ max_offset_y = new_height - target_h
+
+ available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "up-left"]
+ if effect_type is None or effect_type == "random":
+ effect_type = random.choice(available_effects)
+
+ if effect_type == "zoom-in":
+ start_zoom = 0.9
+ end_zoom = 1.1
+ start_center = (new_width / 2, new_height / 2)
+ end_center = start_center
+ elif effect_type == "zoom-out":
+ start_zoom = 1.1
+ end_zoom = 0.9
+ start_center = (new_width / 2, new_height / 2)
+ end_center = start_center
+ elif effect_type == "pan-left":
+ start_zoom = 1.0
+ end_zoom = 1.0
+ start_center = (max_offset_x + target_w / 2, (max_offset_y // 2) + target_h / 2)
+ end_center = (target_w / 2, (max_offset_y // 2) + target_h / 2)
+ elif effect_type == "pan-right":
+ start_zoom = 1.0
+ end_zoom = 1.0
+ start_center = (target_w / 2, (max_offset_y // 2) + target_h / 2)
+ end_center = (max_offset_x + target_w / 2, (max_offset_y // 2) + target_h / 2)
+ elif effect_type == "up-left":
+ start_zoom = 1.0
+ end_zoom = 1.0
+ start_center = (max_offset_x + target_w / 2, max_offset_y + target_h / 2)
+ end_center = (target_w / 2, target_h / 2)
+ else:
+ raise ValueError(f"Unsupported effect_type: {effect_type}")
+
+ def transform_frame(get_frame, t):
+ frame = get_frame(t)
+ ratio = t / clip.duration if clip.duration > 0 else 0
+ ratio = 0.5 - 0.5 * math.cos(math.pi * ratio)
+ current_zoom = start_zoom + (end_zoom - start_zoom) * ratio
+ crop_w = int(target_w / current_zoom)
+ crop_h = int(target_h / current_zoom)
+ current_center_x = start_center[0] + (end_center[0] - start_center[0]) * ratio
+ current_center_y = start_center[1] + (end_center[1] - start_center[1]) * ratio
+ min_center_x = crop_w / 2
+ max_center_x = new_width - crop_w / 2
+ min_center_y = crop_h / 2
+ max_center_y = new_height - crop_h / 2
+ current_center_x = max(min_center_x, min(current_center_x, max_center_x))
+ current_center_y = max(min_center_y, min(current_center_y, max_center_y))
+ cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (current_center_x, current_center_y))
+ resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)
+ return resized_frame
+
+ return clip.fl(transform_frame)
def resize_to_fill(clip, target_resolution):
- """Resize and crop a clip (video or image) to fill the target resolution, maintaining aspect ratio."""
- try:
- target_w, target_h = target_resolution
- if not hasattr(clip, 'size') or clip.size is None or not all(isinstance(d, (int, float)) and d > 0 for d in clip.size):
- print(f"Error: Cannot resize clip with invalid dimensions: size={getattr(clip, 'size', 'N/A')}")
- return clip # Return original clip if dimensions are bad
-
- clip_w, clip_h = clip.w, clip.h
- if clip_w == 0 or clip_h == 0:
- print(f"Error: Cannot resize clip with zero dimensions: {clip_w}x{clip_h}")
- return clip
-
- clip_aspect = clip_w / clip_h
- target_aspect = target_w / target_h
-
- if abs(clip_aspect - target_aspect) < 0.01:
- # Aspect ratios are close enough, just resize
- print(f"Resizing clip directly to {target_w}x{target_h}")
- resized_clip = clip.resize(newsize=(target_w, target_h))
- elif clip_aspect > target_aspect:
- # Clip is wider than target: Resize based on height, crop width
- print(f"Resizing clip to height {target_h}, cropping width.")
- resized_clip = clip.resize(height=target_h) # Moviepy calculates width
- # Ensure dimensions are updated after resize
- if resized_clip.w is None or resized_clip.h is None: raise ValueError("Resize failed to update dimensions")
-
- crop_amount = (resized_clip.w - target_w) / 2
- if crop_amount < 0: crop_amount = 0 # Avoid negative crop
-
- # Use moviepy's crop method (x1, y1, x2, y2)
- x1 = crop_amount
- x2 = resized_clip.w - crop_amount
- resized_clip = resized_clip.crop(x1=x1, y1=0, x2=x2, y2=resized_clip.h)
-
- else: # clip_aspect < target_aspect
- # Clip is taller than target: Resize based on width, crop height
- print(f"Resizing clip to width {target_w}, cropping height.")
- resized_clip = clip.resize(width=target_w) # Moviepy calculates height
- if resized_clip.w is None or resized_clip.h is None: raise ValueError("Resize failed to update dimensions")
-
- crop_amount = (resized_clip.h - target_h) / 2
- if crop_amount < 0: crop_amount = 0
-
- y1 = crop_amount
- y2 = resized_clip.h - crop_amount
- resized_clip = resized_clip.crop(x1=0, y1=y1, x2=resized_clip.w, y2=y2)
-
- # Final check and resize if dimensions are slightly off due to rounding
- if resized_clip.w != target_w or resized_clip.h != target_h:
- print(f"Warning: resize_to_fill resulted in dimensions {resized_clip.w}x{resized_clip.h}. Forcing final resize to {target_w}x{target_h}.")
- resized_clip = resized_clip.resize(newsize=(target_w, target_h))
-
- return resized_clip
-
- except Exception as e:
- print(f"Error in resize_to_fill: {e}")
- print(f"Clip info: duration={getattr(clip, 'duration', 'N/A')}, size={getattr(clip, 'size', 'N/A')}")
- traceback.print_exc()
- # Fallback: Try a simple resize without cropping if complex logic failed
- try:
- print("Attempting simple fallback resize.")
- return clip.resize(newsize=target_resolution)
- except Exception as e_resize:
- print(f"Fallback resize also failed: {e_resize}")
- # Return original clip as last resort
- return clip
+ """Resize and crop a clip to fill the target resolution while maintaining aspect ratio."""
+ target_w, target_h = target_resolution
+ clip_aspect = clip.w / clip.h
+ target_aspect = target_w / target_h
+
+ if clip_aspect > target_aspect:
+ clip = clip.resize(height=target_h)
+ crop_amount = (clip.w - target_w) / 2
+ clip = clip.crop(x1=crop_amount, x2=clip.w - crop_amount, y1=0, y2=clip.h)
+ else:
+ clip = clip.resize(width=target_w)
+ crop_amount = (clip.h - target_h) / 2
+ clip = clip.crop(x1=0, x2=clip.w, y1=crop_amount, y2=clip.h - crop_amount)
+ return clip
def find_mp3_files():
    """Return the first MP3 file found under the current directory, or None.

    Walks ``.`` recursively and prints every MP3 it encounters. Extensions
    are matched case-insensitively (``.mp3``, ``.MP3``, ...), and directories
    and files are visited in sorted order so the "first" file returned is the
    same on every run instead of depending on filesystem listing order.

    Returns:
        str | None: Path (relative to ``.``) of the first MP3 found, or
        ``None`` when the tree contains no MP3 files.
    """
    mp3_files = []
    for root, dirs, files in os.walk('.'):
        # Sorting ``dirs`` in place makes os.walk descend in a stable order.
        dirs.sort()
        for file_name in sorted(files):
            if file_name.lower().endswith('.mp3'):
                mp3_path = os.path.join(root, file_name)
                mp3_files.append(mp3_path)
                print(f"Found MP3 file: {mp3_path}")
    return mp3_files[0] if mp3_files else None
def add_background_music(final_video, bg_music_volume=0.10):
    """Mix 'music.mp3' from the working directory under the video's audio.

    The music is looped when it is shorter than the video, trimmed to the
    video's duration, scaled by ``bg_music_volume`` and combined with the
    video's existing audio track (or used alone when the video is silent).
    This is best-effort: on any failure the video is returned exactly as it
    was received.

    Args:
        final_video: moviepy video clip to add music to.
        bg_music_volume: Volume multiplier applied to the music track.

    Returns:
        The clip with the mixed audio, or ``final_video`` unchanged when
        'music.mp3' is missing, has no duration, or mixing fails.
    """
    bg_music_path = "music.mp3"
    try:
        if not os.path.exists(bg_music_path):
            print("No MP3 files found, skipping background music")
            return final_video

        print(f"Adding background music from: {bg_music_path}")
        bg_music = AudioFileClip(bg_music_path)
        if not bg_music.duration or bg_music.duration <= 0:
            # A zero-length track cannot be looped (division by zero below).
            print("Background music has no duration, skipping background music")
            bg_music.close()
            return final_video

        if bg_music.duration < final_video.duration:
            # Imported locally: the file's visible moviepy imports do not
            # include concatenate_audioclips.
            from moviepy.editor import concatenate_audioclips

            loops_needed = math.ceil(final_video.duration / bg_music.duration)
            bg_music = concatenate_audioclips([bg_music] * loops_needed)

        bg_music = bg_music.subclip(0, final_video.duration)
        bg_music = bg_music.volumex(bg_music_volume)

        video_audio = final_video.audio
        if video_audio is None:
            # Nothing to mix with: the music becomes the only audio track.
            mixed_audio = bg_music
        else:
            mixed_audio = CompositeAudioClip([video_audio, bg_music])

        video_with_music = final_video.set_audio(mixed_audio)
        print("Background music added successfully")
        return video_with_music
    except Exception as e:
        # Deliberately broad: background music is optional, so any moviepy or
        # ffmpeg failure must not abort the render.
        print(f"Error adding background music: {e}")
        print("Continuing without background music")
        return final_video
-
-# --- create_clip Function (Incorporating Subtitle Logic) ---
def create_clip(media_path, asset_type, tts_path, duration=None, effects=None, narration_text=None, segment_index=0):
    """Create a video clip with synchronized subtitles and narration.

    Args:
        media_path: Path to the video or image used as the visual.
        asset_type: ``"video"`` or ``"image"``; anything else yields ``None``.
        tts_path: Path to the narration audio file.
        duration: Unused; kept so existing callers keep working.
        effects: Unused; kept so existing callers keep working.
        narration_text: Text shown as captions. Captions are skipped when this
            is empty or ``CAPTION_COLOR`` is ``"transparent"``.
        segment_index: Segment number, used only in the log line.

    Returns:
        The composed moviepy clip with the narration attached, or ``None``
        when an input file is missing, the asset type is unknown, or building
        the clip fails.
    """
    audio_clip = None
    try:
        print(f"Creating clip #{segment_index} with asset_type: {asset_type}, media_path: {media_path}")
        if not os.path.exists(media_path) or not os.path.exists(tts_path):
            print("Missing media or TTS file")
            return None
        if asset_type not in ("video", "image"):
            # Reject unknown types before opening the audio file so that no
            # reader is left open for a clip that will never be built.
            return None

        audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2)
        audio_duration = audio_clip.duration
        # Visuals run slightly longer than the narration.
        target_duration = audio_duration + 0.2

        if asset_type == "video":
            clip = VideoFileClip(media_path)
            clip = resize_to_fill(clip, TARGET_RESOLUTION)
            if clip.duration < target_duration:
                clip = clip.loop(duration=target_duration)
            else:
                clip = clip.subclip(0, target_duration)
        else:
            clip = _build_image_clip(media_path, target_duration)

        if narration_text and CAPTION_COLOR != "transparent":
            clip = _overlay_captions(clip, narration_text, audio_duration)

        clip = clip.set_audio(audio_clip)
        print(f"Clip created: {clip.duration:.1f}s")
        return clip
    except Exception as e:
        print(f"Error in create_clip: {str(e)}")
        if audio_clip is not None:
            # Release the audio reader; the clip it belonged to is abandoned.
            try:
                audio_clip.close()
            except Exception as close_error:
                print(f"Error closing audio clip: {close_error}")
        return None


def _build_image_clip(media_path, target_duration):
    """Return a Ken Burns image clip of ``target_duration`` seconds with fades."""
    converted_path = None
    try:
        with Image.open(media_path) as img:
            if img.mode != 'RGB':
                # Reserve a temp name, close the handle, then save into it.
                with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp:
                    converted_path = temp.name
                img.convert('RGB').save(converted_path)
        clip = ImageClip(converted_path or media_path).set_duration(target_duration)
    finally:
        # moviepy 1.x ImageClip reads the file into memory when constructed,
        # so the converted copy is no longer needed once the clip exists.
        if converted_path and os.path.exists(converted_path):
            os.remove(converted_path)
    clip = apply_kenburns_effect(clip, TARGET_RESOLUTION)
    return clip.fadein(0.3).fadeout(0.3)


def _split_caption_chunks(narration_text, words_per_chunk=5):
    """Split narration into caption strings of at most ``words_per_chunk`` words."""
    words = narration_text.split()
    return [
        ' '.join(words[start:start + words_per_chunk])
        for start in range(0, len(words), words_per_chunk)
    ]


def _overlay_captions(clip, narration_text, audio_duration):
    """Overlay captions on ``clip``; fall back to one text block, then to none.

    Caption chunks share the narration time equally. If rendering the styled
    chunks fails, the whole text is shown once for the clip's duration; if
    that fails too, the clip is returned without captions.
    """
    chunks = _split_caption_chunks(narration_text)
    if not chunks:
        # Whitespace-only narration: nothing to show, and no division by zero.
        return clip

    try:
        chunk_duration = audio_duration / len(chunks)
        subtitle_y_position = int(TARGET_RESOLUTION[1] * 0.70)
        subtitle_clips = []
        for i, chunk_text in enumerate(chunks):
            txt_clip = TextClip(
                chunk_text,
                fontsize=45,
                font='Arial-Bold',
                color=CAPTION_COLOR,
                bg_color='rgba(0, 0, 0, 0.25)',
                method='caption',
                align='center',
                stroke_width=2,
                stroke_color=CAPTION_COLOR,
                size=(TARGET_RESOLUTION[0] * 0.8, None)
            ).set_start(i * chunk_duration).set_end((i + 1) * chunk_duration)
            subtitle_clips.append(txt_clip.set_position(('center', subtitle_y_position)))
        return CompositeVideoClip([clip] + subtitle_clips)
    except Exception as sub_error:
        print(f"Subtitle error: {sub_error}")

    try:
        txt_clip = TextClip(
            narration_text,
            fontsize=font_size,
            color=CAPTION_COLOR,
            align='center',
            size=(TARGET_RESOLUTION[0] * 0.7, None)
        ).set_position(('center', int(TARGET_RESOLUTION[1] / 3))).set_duration(clip.duration)
        return CompositeVideoClip([clip, txt_clip])
    except Exception as fallback_error:
        # Captions are optional; keep the clip rather than failing the segment.
        print(f"Fallback subtitle error: {fallback_error}")
        return clip
def fix_imagemagick_policy():
    """Relax ImageMagick's security policy so text rendering is permitted.

    Looks for ``policy.xml`` in the usual system locations, backs it up to
    ``policy.xml.bak`` and edits it in place with ``sudo sed``. Requires
    ``sudo`` and ``sed`` on the PATH; nothing is modified if the backup
    cannot be made.

    Returns:
        bool: ``True`` when a policy file was found and every command exited
        with status 0; ``False`` when no policy file exists, a command
        failed, or an error was raised.
    """
    import subprocess  # local import: only this function shells out

    try:
        print("Attempting to fix ImageMagick security policies...")
        policy_paths = [
            "/etc/ImageMagick-6/policy.xml",
            "/etc/ImageMagick-7/policy.xml",
            "/etc/ImageMagick/policy.xml",
            "/usr/local/etc/ImageMagick-7/policy.xml",
        ]
        found_policy = next((path for path in policy_paths if os.path.exists(path)), None)
        if not found_policy:
            print("No policy.xml found. Using alternative subtitle method.")
            return False

        print(f"Modifying policy file at {found_policy}")
        backup_cmd = ["sudo", "cp", found_policy, f"{found_policy}.bak"]
        if subprocess.run(backup_cmd).returncode != 0:
            # Do not edit a system file that could not be backed up first.
            print("Error fixing policies: could not back up policy file")
            return False

        sed_expressions = [
            # Turn every blanket denial into read/write access.
            's/rights="none"/rights="read|write"/g',
            # NOTE(review): the expression in the source was garbled
            # ('s/]*>/]*>//g', which sed rejects). This is a reconstruction
            # that grants read/write on the "@*" path policy; confirm intent.
            r's/<policy domain="path" pattern="@\*"[^>]*>/'
            r'<policy domain="path" pattern="@*" rights="read|write"\/>/g',
        ]
        for expression in sed_expressions:
            sed_cmd = ["sudo", "sed", "-i", expression, found_policy]
            if subprocess.run(sed_cmd).returncode != 0:
                print(f"Error fixing policies: command failed: {' '.join(sed_cmd)}")
                return False

        print("ImageMagick policies updated successfully.")
        return True
    except Exception as e:
        # Covers e.g. sudo/sed missing from PATH (FileNotFoundError).
        print(f"Error fixing policies: {e}")
        return False
# ---------------- Main Video Generation Function ---------------- #
def generate_video(user_input, resolution, caption_option):
- """Generate a video based on user input via Gradio. Uses global settings."""
+ """Generate a video based on user input via Gradio."""
global TARGET_RESOLUTION, CAPTION_COLOR, TEMP_FOLDER
-
- start_time = time.time()
- print("\n=============================================")
- print("======= STARTING VIDEO GENERATION =======")
- print(f" Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")
- print(f" Concept: '{user_input[:100]}...'")
- print(f" Resolution: {resolution}")
- print(f" Captions: {caption_option}")
- print(f" Voice: {selected_voice} (Speed: {voice_speed})")
- print(f" BG Music Vol: {bg_music_volume}, FPS: {fps}, Preset: {preset}")
- print(f" Video Clip Prob: {video_clip_probability*100:.0f}%, Caption Size: {font_size}")
- print("=============================================\n")
-
-
- # --- Pre-checks ---
- if not OPENROUTER_API_KEY or OPENROUTER_API_KEY == "YOUR_OPENROUTER_API_KEY_HERE":
- print("FATAL ERROR: OpenRouter API Key is missing or still a placeholder!")
- raise gr.Error("OpenRouter API Key is not configured. Please set it in the script or environment.")
- if not PEXELS_API_KEY or PEXELS_API_KEY == "YOUR_PEXELS_API_KEY_HERE":
- print("FATAL ERROR: Pexels API Key is missing or still a placeholder!")
- raise gr.Error("Pexels API Key is not configured. Please set it in the script or environment.")
- if pipeline is None:
- print("FATAL ERROR: Kokoro TTS pipeline failed to initialize.")
- raise gr.Error("TTS engine (Kokoro) failed to initialize. Cannot proceed.")
-
-
- # --- Setup ---
+
# Set resolution
- if resolution == "Full": # 16:9 Landscape
+ if resolution == "Full":
TARGET_RESOLUTION = (1920, 1080)
- elif resolution == "Short": # 9:16 Portrait
+ elif resolution == "Short":
TARGET_RESOLUTION = (1080, 1920)
else:
- print(f"Warning: Unknown resolution '{resolution}'. Defaulting to Full HD (1920x1080).")
- TARGET_RESOLUTION = (1920, 1080)
+ TARGET_RESOLUTION = (1920, 1080) # Default
- # Set caption color based on user choice
+ # Set caption color
CAPTION_COLOR = "white" if caption_option == "Yes" else "transparent"
- # Create a unique temporary folder for this run
- try:
- # Use a more descriptive temp dir name if possible
- base_temp_dir = os.path.join(os.getcwd(), "temp_video_gen")
- os.makedirs(base_temp_dir, exist_ok=True)
- TEMP_FOLDER = tempfile.mkdtemp(prefix=f"{time.strftime('%Y%m%d_%H%M%S')}_", dir=base_temp_dir)
- print(f"Temporary folder created: {TEMP_FOLDER}")
- except Exception as e:
- print(f"FATAL ERROR: Could not create temporary folder: {e}")
- traceback.print_exc()
- # Cannot proceed without temp folder
- raise gr.Error(f"Failed to create temporary directory: {e}")
-
+ # Create a unique temporary folder
+ TEMP_FOLDER = tempfile.mkdtemp()
- # Fix ImageMagick policy (important for captions)
+ # Fix ImageMagick policy
fix_success = fix_imagemagick_policy()
if not fix_success:
- print("Continuing without guaranteed ImageMagick policy fix. Captions might have issues.")
- # --- End Setup ---
+ print("Will use alternative methods if needed")
-
- # --- Script Generation ---
- print("\n--- Generating Script ---")
+ print("Generating script from API...")
script = generate_script(user_input)
if not script:
- print("FATAL ERROR: Failed to generate script from API.")
- if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER) # Clean up
- raise gr.Error("Failed to generate script from API. Check logs and API key.")
- print(f"Generated Script:\n{'-'*20}\n{script}\n{'-'*20}") # Print the full script for debugging
- # --- End Script Generation ---
-
-
- # --- Script Parsing ---
- print("\n--- Parsing Script ---")
+ print("Failed to generate script.")
+ shutil.rmtree(TEMP_FOLDER)
+ return None
+ print("Generated Script:\n", script)
elements = parse_script(script)
if not elements:
- print("FATAL ERROR: Failed to parse script into elements. Check script format and parsing logic.")
- if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER)
- raise gr.Error("Failed to parse the generated script. Check script format and logs.")
- num_segments = len(elements) // 2
- print(f"Parsed {num_segments} script segments.")
- if num_segments == 0:
- print("Warning: Script parsed into 0 segments. No video will be generated.")
- if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER)
- # Return None instead of raising error? Or show message?
- return None # Indicate no video was created
- # --- End Script Parsing ---
-
-
- # --- Pair Elements (Media + TTS) ---
- paired_elements = []
- if len(elements) % 2 != 0:
- print(f"Warning: Odd number of elements ({len(elements)}) after parsing. Last element might be ignored.")
+ print("Failed to parse script into elements.")
+ shutil.rmtree(TEMP_FOLDER)
+ return None
+ print(f"Parsed {len(elements)//2} script segments.")
- for i in range(0, len(elements) - 1, 2): # Iterate up to second-to-last element
- if elements[i]['type'] == 'media' and elements[i+1]['type'] == 'tts':
+ paired_elements = []
+ for i in range(0, len(elements), 2):
+ if i + 1 < len(elements):
paired_elements.append((elements[i], elements[i + 1]))
- else:
- print(f"Warning: Skipping invalid element pair at index {i}. Expected media then tts, got {elements[i]['type']} then {elements[i+1]['type']}.")
if not paired_elements:
- print("FATAL ERROR: No valid media-tts pairs found after parsing.")
- if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER)
- raise gr.Error("Script parsed, but no valid [Scene]-Narration pairs found.")
- # --- End Pairing ---
-
+ print("No valid script segments found.")
+ shutil.rmtree(TEMP_FOLDER)
+ return None
- # --- Clip Creation Loop ---
- print(f"\n--- Creating {len(paired_elements)} Individual Clips ---")
clips = []
- successful_clips = 0
- clip_paths_to_clean = [] # Keep track of intermediate files for cleanup if needed
-
for idx, (media_elem, tts_elem) in enumerate(paired_elements):
- segment_start_time = time.time()
- print(f"\n>>> Processing Segment {idx+1}/{len(paired_elements)}: Prompt '{media_elem.get('prompt', 'N/A')}'")
-
- # 1. Generate Media Asset
- media_asset = generate_media(
- media_elem['prompt'],
- current_index=idx,
- total_segments=len(paired_elements)
- )
- if not media_asset or not media_asset.get('path'):
- print(f"ERROR: Failed to generate media for segment {idx+1}. Skipping segment.")
- continue # Skip this segment
- clip_paths_to_clean.append(media_asset['path']) # Add for potential cleanup
-
- # 2. Generate TTS
+ print(f"\nProcessing segment {idx+1}/{len(paired_elements)} with prompt: '{media_elem['prompt']}'")
+ media_asset = generate_media(media_elem['prompt'], current_index=idx, total_segments=len(paired_elements))
+ if not media_asset:
+ print(f"Skipping segment {idx+1} due to missing media asset.")
+ continue
tts_path = generate_tts(tts_elem['text'], tts_elem['voice'])
if not tts_path:
- print(f"ERROR: Failed to generate TTS for segment {idx+1}. Skipping segment.")
- continue # Skip this segment
- clip_paths_to_clean.append(tts_path) # Add for potential cleanup
-
- # 3. Create the Clip (Visual + Audio + Subtitles)
+ print(f"Skipping segment {idx+1} due to TTS generation failure.")
+ continue
clip = create_clip(
media_path=media_asset['path'],
asset_type=media_asset['asset_type'],
tts_path=tts_path,
- duration=tts_elem.get('duration'), # Pass estimated duration for potential fallback
- effects=media_elem.get('effects'),
+ duration=tts_elem['duration'],
+ effects=media_elem.get('effects', 'fade-in'),
narration_text=tts_elem['text'],
segment_index=idx
)
-
if clip:
- # Validate clip duration and dimensions before adding
- if clip.duration > 0.1 and clip.w == TARGET_RESOLUTION[0] and clip.h == TARGET_RESOLUTION[1]:
- clips.append(clip)
- successful_clips += 1
- segment_duration = time.time() - segment_start_time
- print(f">>> Segment {idx+1} processed successfully. [Took {segment_duration:.1f}s]")
- else:
- print(f"ERROR: Clip for segment {idx+1} has invalid duration ({clip.duration:.2f}s) or dimensions ({clip.w}x{clip.h}). Expected {TARGET_RESOLUTION[0]}x{TARGET_RESOLUTION[1]}. Skipping.")
- # Clean up resources associated with the failed clip
- if hasattr(clip, 'close'): clip.close()
+ clips.append(clip)
else:
- print(f"ERROR: Clip creation failed for segment {idx+1}. See errors above.")
-
- # --- End Clip Creation Loop ---
-
+ print(f"Clip creation failed for segment {idx+1}.")
- # --- Final Video Assembly ---
if not clips:
- print("\nFATAL ERROR: No clips were successfully created. Cannot generate video.")
- if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER)
- raise gr.Error("Failed to create any video clips from the script segments.")
+ print("No clips were successfully created.")
+ shutil.rmtree(TEMP_FOLDER)
+ return None
- print(f"\n--- Assembling Final Video ({len(clips)} clips) ---")
- final_video = None # Initialize to ensure cleanup happens
- try:
- # Concatenate clips
- print("Concatenating clips...")
- final_video = concatenate_videoclips(clips, method="compose") # 'compose' is generally safer
- print(f"Clips concatenated. Total duration before music: {final_video.duration:.2f}s")
-
- # Add background music
- print("Adding background music (if provided)...")
- final_video = add_background_music(final_video, bg_music_volume=bg_music_volume)
-
- # Write the final video file
- print(f"Exporting final video to '{OUTPUT_VIDEO_FILENAME}' (FPS: {fps}, Preset: {preset})...")
- # Use threads based on CPU count? Maybe default is fine. logger='bar' for progress bar
- # Ensure output directory exists if OUTPUT_VIDEO_FILENAME includes a path
- output_dir = os.path.dirname(OUTPUT_VIDEO_FILENAME)
- if output_dir and not os.path.exists(output_dir):
- os.makedirs(output_dir)
-
- final_video.write_videofile(
- OUTPUT_VIDEO_FILENAME,
- codec='libx264', # Common, good quality codec
- audio_codec='aac', # Common audio codec
- fps=fps,
- preset=preset, # Controls encoding speed vs compression
- threads=os.cpu_count() or 4, # Use available cores or default to 4
- logger='bar', # Show progress bar in console
- ffmpeg_params=["-movflags", "+faststart"] # Optimize for web streaming
- )
- print(f"Final video saved successfully as {OUTPUT_VIDEO_FILENAME}")
+ print("\nConcatenating clips...")
+ final_video = concatenate_videoclips(clips, method="compose")
+ final_video = add_background_music(final_video, bg_music_volume=bg_music_volume)
+ print(f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...")
+ final_video.write_videofile(OUTPUT_VIDEO_FILENAME, codec='libx264', fps=fps, preset=preset)
+ print(f"Final video saved as {OUTPUT_VIDEO_FILENAME}")
- except Exception as e:
- print(f"FATAL ERROR during final video assembly or writing: {e}")
- traceback.print_exc()
- if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER) # Clean up on error
- raise gr.Error(f"Error during final video creation: {e}")
- finally:
- # --- Resource Cleanup (Clips) ---
- # Crucial to close clips to release file handles, especially on Windows
- print("Closing individual clip resources...")
- for i, clip in enumerate(clips):
- try:
- if hasattr(clip, 'close'):
- clip.close()
- # Also close audio if it's separate and hasn't been closed yet
- if hasattr(clip, 'audio') and clip.audio is not None and hasattr(clip.audio, 'close'):
- clip.audio.close()
- except Exception as e_close:
- print(f"Warning: Error closing clip {i}: {e_close}")
- if final_video is not None and hasattr(final_video, 'close'):
- try:
- final_video.close()
- print("Closed final video resource.")
- except Exception as e_final_close:
- print(f"Warning: Error closing final video resource: {e_final_close}")
- # --- End Resource Cleanup ---
-
+ # Clean up
+ print("Cleaning up temporary files...")
+ shutil.rmtree(TEMP_FOLDER)
+ print("Temporary files removed.")
- # --- Temp Folder Cleanup ---
- print("\n--- Cleaning Up Temporary Files ---")
- try:
- if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER):
- shutil.rmtree(TEMP_FOLDER)
- print(f"Temporary folder removed: {TEMP_FOLDER}")
- else:
- print("Temporary folder not found or already removed.")
- except Exception as e:
- print(f"Warning: Could not remove temporary folder {TEMP_FOLDER}: {e}")
- print("Manual cleanup might be required.")
- # --- End Cleanup ---
-
- end_time = time.time()
- total_time = end_time - start_time
- print("\n=============================================")
- print("======= VIDEO GENERATION COMPLETE =======")
- print(f" Total time: {total_time:.2f} seconds")
- print(f" Output file: {OUTPUT_VIDEO_FILENAME}")
- print("=============================================\n")
-
- # Return the path to the generated video file
return OUTPUT_VIDEO_FILENAME
-
-# ---------------- Gradio Interface Setup ---------------- #
-
-# Dictionary mapping user-friendly names to Kokoro voice IDs
+# ---------------- Gradio Interface ---------------- #
VOICE_CHOICES = {
- 'Emma (Female)': 'af_heart', 'Bella (Female)': 'af_bella', 'Nicole (Female)': 'af_nicole',
- 'Aoede (Female)': 'af_aoede', 'Kore (Female)': 'af_kore', 'Sarah (Female)': 'af_sarah',
- 'Nova (Female)': 'af_nova', 'Sky (Female)': 'af_sky', 'Alloy (Female)': 'af_alloy',
- 'Jessica (Female)': 'af_jessica', 'River (Female)': 'af_river',
- 'Michael (Male)': 'am_michael', 'Fenrir (Male)': 'am_fenrir', 'Puck (Male)': 'am_puck',
- 'Echo (Male)': 'am_echo', 'Eric (Male)': 'am_eric', 'Liam (Male)': 'am_liam',
- 'Onyx (Male)': 'am_onyx', 'Santa (Male)': 'am_santa', 'Adam (Male)': 'am_adam',
- 'Emma 🇬🇧 (Female)': 'bf_emma', 'Isabella 🇬🇧 (Female)': 'bf_isabella', 'Alice 🇬🇧 (Female)': 'bf_alice',
- 'Lily 🇬🇧 (Female)': 'bf_lily', 'George 🇬🇧 (Male)': 'bm_george', 'Fable 🇬🇧 (Male)': 'bm_fable',
- 'Lewis 🇬🇧 (Male)': 'bm_lewis', 'Daniel 🇬🇧 (Male)': 'bm_daniel'
+ 'Emma (Female)': 'af_heart',
+ 'Bella (Female)': 'af_bella',
+ 'Nicole (Female)': 'af_nicole',
+ 'Aoede (Female)': 'af_aoede',
+ 'Kore (Female)': 'af_kore',
+ 'Sarah (Female)': 'af_sarah',
+ 'Nova (Female)': 'af_nova',
+ 'Sky (Female)': 'af_sky',
+ 'Alloy (Female)': 'af_alloy',
+ 'Jessica (Female)': 'af_jessica',
+ 'River (Female)': 'af_river',
+ 'Michael (Male)': 'am_michael',
+ 'Fenrir (Male)': 'am_fenrir',
+ 'Puck (Male)': 'am_puck',
+ 'Echo (Male)': 'am_echo',
+ 'Eric (Male)': 'am_eric',
+ 'Liam (Male)': 'am_liam',
+ 'Onyx (Male)': 'am_onyx',
+ 'Santa (Male)': 'am_santa',
+ 'Adam (Male)': 'am_adam',
+ 'Emma 🇬🇧 (Female)': 'bf_emma',
+ 'Isabella 🇬🇧 (Female)': 'bf_isabella',
+ 'Alice 🇬🇧 (Female)': 'bf_alice',
+ 'Lily 🇬🇧 (Female)': 'bf_lily',
+ 'George 🇬🇧 (Male)': 'bm_george',
+ 'Fable 🇬🇧 (Male)': 'bm_fable',
+ 'Lewis 🇬🇧 (Male)': 'bm_lewis',
+ 'Daniel 🇬🇧 (Male)': 'bm_daniel'
}
-def generate_video_with_options(user_input, resolution, caption_option, music_file, voice, vclip_prob, bg_vol, video_fps, video_preset, v_speed, caption_size, progress=gr.Progress(track_ ĪĪĪÎĩ=True)):
- """Wrapper function for Gradio to set global options before calling generate_video."""
- # Use Gradio progress tracker if needed (though detailed logs are in console)
- progress(0, desc="Initializing...")
-
+def generate_video_with_options(user_input, resolution, caption_option, music_file, voice, vclip_prob, bg_vol, video_fps, video_preset, v_speed, caption_size):
global selected_voice, voice_speed, font_size, video_clip_probability, bg_music_volume, fps, preset
-
- print("--- Updating Settings from Gradio ---")
- # Update global variables with user selections from Gradio interface
- selected_voice = VOICE_CHOICES.get(voice, 'af_heart') # Get voice ID, default if key not found
+
+ # Update global variables with user selections
+ selected_voice = VOICE_CHOICES[voice]
voice_speed = v_speed
font_size = caption_size
- video_clip_probability = vclip_prob / 100.0 # Convert percentage to decimal
+ video_clip_probability = vclip_prob / 100 # Convert from percentage to decimal
bg_music_volume = bg_vol
fps = video_fps
preset = video_preset
-
- # Handle music upload: Copy uploaded file to a standard name 'music.mp3'
- target_music_path = "music.mp3"
- # Remove previous music file if it exists
- if os.path.exists(target_music_path):
- try:
- os.remove(target_music_path)
- print(f"Removed previous '{target_music_path}'")
- except OSError as e:
- print(f"Warning: Could not remove previous music file: {e}")
-
+
+ # Handle music upload
if music_file is not None:
- # music_file is the path to the temporary uploaded file when type='filepath'
- if isinstance(music_file, str) and os.path.exists(music_file):
- try:
- shutil.copy(music_file, target_music_path)
- print(f"Uploaded music '{os.path.basename(music_file)}' copied to '{target_music_path}'")
- except Exception as e:
- print(f"Error copying uploaded music file: {e}")
- # Continue without background music if copy fails
- gr.Warning("Failed to copy background music file.") # Show warning in UI
- else:
- print(f"Invalid music file object received: {music_file}")
- gr.Warning("Received invalid background music file.")
- else:
- print("No background music file uploaded.")
-
- # --- Call the main video generation function ---
- # Wrap in try...except to catch errors and report them via Gradio
- try:
- # Update progress description
- progress(0.1, desc="Generating script...") # Example progress update
-
- # Note: generate_video itself prints detailed logs to console.
- # Adding more progress steps here requires modifying generate_video
- # to accept the progress object and call progress.update() internally.
- # For simplicity, we rely on console logs for detailed progress.
-
- video_path = generate_video(user_input, resolution, caption_option)
-
- # Check if video generation failed (returned None)
- if video_path is None or not os.path.exists(video_path):
- print("Video generation function returned None or file does not exist.")
- raise gr.Error("Video generation process completed, but the final video file was not created successfully. Please check the console logs for errors.")
-
- progress(1.0, desc="Video Ready!")
- gr.Info("Video generation complete!")
- return video_path
-
- except gr.Error as e:
- # Re-raise Gradio errors directly to show them in the UI
- print(f"Gradio Error: {e}")
- raise e
- except Exception as e:
- # Catch any other unexpected errors during generation
- print(f"An unexpected error occurred in generate_video_with_options: {e}")
- traceback.print_exc()
- # Raise a Gradio error for unexpected issues
- raise gr.Error(f"An unexpected error occurred during video generation: {str(e)}. Check console logs for details.")
-
-
-# Create the Gradio interface definition using Blocks API
-with gr.Blocks(theme=gr.themes.Soft(primary_hue=gr.themes.colors.blue)) as iface:
- gr.Markdown(
- """
- # 🤖 AI Documentary Video Generator 🎬
- Create short, funny documentary-style videos with AI narration and stock footage.
- Customize voice, music, captions, and more!
- """
- )
-
- with gr.Tab("🎬 Create Video"):
- with gr.Row():
- with gr.Column(scale=2):
- user_input = gr.Textbox(
- label="đ Video Concept / Script",
- placeholder="Enter your video topic (e.g., 'Top 5 facts about cats') or paste a full script formatted like the example...",
- lines=5,
- info="Provide a topic for AI script generation or paste your own formatted script."
- )
- with gr.Accordion("Example Script Format", open=False):
- gr.Markdown(
- """
- ```
- [Cats]
- Cats: tiny ninjas plotting world domination from fluffy pillows.
- [Sleeping]
- They sleep 23 hours a day, conserving energy for midnight zoomies.
- [Boxes]
- Their mortal enemy? The empty box. It must be investigated and sat in.
- [Judgement]
- Silently judging your life choices from atop the bookshelf.
- [Subscribe]
- Subscribe now, or a cat will knock something off your table. Purrhaps.
- ```
- **Rules:**
- - Start each scene with `[Search Term]` (1-2 words for Pexels).
- - Follow with 5-15 words of narration.
- - Keep it funny and conversational.
- - End with a subscribe line related to the topic.
- """
- )
- with gr.Column(scale=1):
- resolution = gr.Radio(["Full", "Short"], label="đ Resolution", value="Full", info="Full=16:9 (YouTube), Short=9:16 (TikTok/Reels)")
- caption_option = gr.Radio(["Yes", "No"], label="âī¸ Add Captions?", value="Yes")
- music_file = gr.File(
- label="🎵 Upload Background Music (Optional MP3)",
- file_types=[".mp3"],
- type="filepath" # Corrected type
- )
-
- with gr.Accordion("âī¸ Advanced Settings", open=False):
- with gr.Row():
- voice = gr.Dropdown(choices=list(VOICE_CHOICES.keys()), label="🗣️ Choose Voice", value="Emma (Female)")
- v_speed = gr.Slider(minimum=0.5, maximum=1.5, value=0.9, step=0.05, label="💨 Voice Speed", info="0.5=Slow, 1.0=Normal, 1.5=Fast")
- with gr.Row():
- caption_size = gr.Slider(minimum=20, maximum=100, value=45, step=1, label="đĄ Caption Font Size")
- vclip_prob = gr.Slider(minimum=0, maximum=100, value=25, step=5, label="đī¸ Video Clip %", info="Chance of using a video clip instead of an image for a scene.")
- with gr.Row():
- bg_vol = gr.Slider(minimum=0.0, maximum=1.0, value=0.08, step=0.01, label="đ BG Music Volume", info="0.0=Silent, 1.0=Full Volume")
- video_fps = gr.Slider(minimum=15, maximum=60, value=30, step=1, label="🎬 Video FPS")
- video_preset = gr.Dropdown(
- choices=["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow"],
- value="veryfast", label="âī¸ Export Quality/Speed", info="Faster presets = lower quality/size, Slower presets = higher quality/size."
- )
-
- submit_button = gr.Button("✨ Generate Video ✨", variant="primary", scale=1)
- output_video = gr.Video(label="Generated Video", scale=3) # Make video output larger
-
- # Define the action when the button is clicked
- submit_button.click(
- fn=generate_video_with_options,
- inputs=[
- user_input, resolution, caption_option, music_file, voice,
- vclip_prob, bg_vol, video_fps, video_preset, v_speed, caption_size
- ],
- outputs=output_video
- )
-
- with gr.Tab("ℹ️ Notes & Tips"):
- gr.Markdown(
- """
- ### Important Notes:
- * **API Keys:** Ensure your Pexels and OpenRouter API keys are correctly set at the top of the script or as environment variables.
- * **Dependencies:** Make sure all required libraries (`moviepy`, `kokoro`, `gTTS`, `requests`, `pydub`, `pillow`, `gradio`, `numpy`, `soundfile`, `bs4`, `pysrt`) are installed. You might also need `ffmpeg` and `imagemagick` installed on your system.
- * **ImageMagick:** For captions with backgrounds/strokes to work reliably (especially on Linux/Colab), ImageMagick needs to be installed and its policy file might need adjustment (the script attempts this with `sudo`).
- * **Performance:** Video generation can be CPU and time-intensive. Generation times of several minutes are normal. Check the console/terminal output for detailed progress and potential errors.
- * **Stock Footage:** The quality and relevance of stock footage depend on the search terms in your script (`[Search Term]`) and Pexels/Google search results. Keep search terms general but descriptive.
- * **Error Handling:** If generation fails, check the console output for specific error messages from API calls, file downloads, or video processing steps.
-
- ### Tips:
- * Start with simple topics to test the workflow.
- * Use the "Example Script Format" as a guide for your own scripts.
- * Experiment with different voices and speeds.
- * Adjust the "Video Clip %" slider to control the mix of video vs. images.
- * If captions look wrong, ensure ImageMagick is working and try a standard font like 'Arial'.
- """
- )
-
+ target_path = "music.mp3"
+ shutil.copy(music_file.name, target_path)
+ print(f"Uploaded music saved as: {target_path}")
+
+ # Generate the video
+ return generate_video(user_input, resolution, caption_option)
+
+# Create the Gradio interface
+iface = gr.Interface(
+ fn=generate_video_with_options,
+ inputs=[
+ gr.Textbox(label="Video Concept", placeholder="Enter your video concept here..."),
+ gr.Radio(["Full", "Short"], label="Resolution", value="Full"),
+ gr.Radio(["No"], label="Captions (Coming Soon)", value="No"),
+ gr.File(label="Upload Background Music (MP3)", file_types=[".mp3"]),
+ gr.Dropdown(choices=list(VOICE_CHOICES.keys()), label="Choose Voice", value="Emma (Female)"),
+ gr.Slider(0, 100, value=25, step=1, label="Video Clip Usage Probability (%)"),
+ gr.Slider(0.0, 1.0, value=0.08, step=0.01, label="Background Music Volume"),
+ gr.Slider(10, 60, value=30, step=1, label="Video FPS"),
+ gr.Dropdown(choices=["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow"],
+ value="veryfast", label="Export Preset"),
+ gr.Slider(0.5, 1.5, value=0.9, step=0.05, label="Voice Speed"),
+ gr.Slider(20, 100, value=45, step=1, label="Caption Font Size")
+ ],
+ outputs=gr.Video(label="Generated Video"),
+ title="AI Documentary Video Generator",
+ description="Create short documentary videos with AI. Upload music, choose voice, and customize settings."
+)
# Launch the interface
if __name__ == "__main__":
- # Final check for API keys before launching
- keys_ok = True
- if PEXELS_API_KEY == "YOUR_PEXELS_API_KEY_HERE":
- print("####################################################################")
- print("ERROR: PEXELS_API_KEY is not set!")
- print("Please set it at the top of the script or as an environment variable.")
- print("####################################################################")
- keys_ok = False
- if OPENROUTER_API_KEY == "YOUR_OPENROUTER_API_KEY_HERE":
- print("####################################################################")
- print("ERROR: OPENROUTER_API_KEY is not set!")
- print("Please set it at the top of the script or as an environment variable.")
- print("####################################################################")
- keys_ok = False
-
- if keys_ok:
- print("\nAPI Keys seem to be set. Launching Gradio interface...")
- print("Access the interface at the URL provided below (usually http://127.0.0.1:7860 or a public URL if share=True).")
- iface.launch(share=True, debug=True) # Enable share=True for public link, debug=True for more logs
- else:
- print("\nCannot launch Gradio interface due to missing API keys.")
-
\ No newline at end of file
+ iface.launch(share=True)
\ No newline at end of file