diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,12 +1,3 @@
-# Install necessary packages (assuming these are already run in your environment)
-# !pip install transformers==4.49.0
-# !pip install moviepy gTTS requests pydub pillow
-# !pip cache purge
-# !apt-get install imagemagick -y
-# !pip install kokoro>=0.3.4 soundfile
-# !apt-get-qq -y install espeak-ng > /dev/null 2>&1
-# !pip install pysrt
-# !pip install gradio
# Import necessary libraries
from kokoro import KPipeline
@@ -40,33 +31,59 @@ from urllib.parse import quote
import pysrt
from gtts import gTTS
# Removed duplicate import of gradio as gr
+import traceback # For detailed error printing
# Initialize Kokoro TTS pipeline (using American English)
-pipeline = KPipeline(lang_code='a') # Use voice 'af_heart' for American English
+try:
+ pipeline = KPipeline(lang_code='a') # Use voice 'af_heart' for American English
+ print("Kokoro TTS pipeline initialized.")
+except Exception as e:
+ print(f"FATAL ERROR: Could not initialize Kokoro TTS pipeline: {e}")
+ pipeline = None # Set pipeline to None if initialization fails
+
# Ensure ImageMagick binary is set
try:
- mpy_config.change_settings({"IMAGEMAGICK_BINARY": "/usr/bin/convert"})
- print("ImageMagick binary set successfully.")
+ # Common paths, adjust if necessary for your environment
+ imagemagick_paths = ["/usr/bin/convert", "/usr/local/bin/convert", "/opt/homebrew/bin/convert"]
+ found_path = None
+ for path in imagemagick_paths:
+ if os.path.exists(path):
+ found_path = path
+ break
+ if found_path:
+ mpy_config.change_settings({"IMAGEMAGICK_BINARY": found_path})
+ print(f"ImageMagick binary set successfully to: {found_path}")
+ else:
+ print("Warning: Could not find ImageMagick 'convert' binary in common paths.")
+ print("TextClip functionality might be limited if ImageMagick is not found or configured.")
except Exception as e:
print(f"Warning: Could not set ImageMagick binary automatically: {e}")
print("TextClip functionality might be limited if ImageMagick is not found.")
# ---------------- Global Configuration ---------------- #
-PEXELS_API_KEY = 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna' # Replace with your key
-OPENROUTER_API_KEY = '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' # Replace with your key
-OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free"
+# !!! IMPORTANT: The values below must be placeholders in committed code; supply your actual API keys at runtime and rotate any real key that was ever committed !!!
+PEXELS_API_KEY = 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna'
+OPENROUTER_API_KEY = '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'
+OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free" # Or choose another model
OUTPUT_VIDEO_FILENAME = "final_video.mp4"
-USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
-
-# Additional global variables needed for the Gradio interface
-selected_voice = 'af_heart' # Default voice
-voice_speed = 0.9 # Default voice speed
-font_size = 45 # Default font size
-video_clip_probability = 0.25 # Default probability for video clips
-bg_music_volume = 0.08 # Default background music volume
-fps = 30 # Default FPS
-preset = "veryfast" # Default preset
+USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36" # Updated User Agent
+
+# --- Check if API keys are set ---
+if PEXELS_API_KEY == "YOUR_PEXELS_API_KEY_HERE":
+ print("WARNING: PEXELS_API_KEY is not set. Please set the environment variable or replace the placeholder in the script.")
+if OPENROUTER_API_KEY == "YOUR_OPENROUTER_API_KEY_HERE":
+ print("WARNING: OPENROUTER_API_KEY is not set. Please set the environment variable or replace the placeholder in the script.")
+
+
+# Additional global variables needed for the Gradio interface (defaults)
+selected_voice = 'af_heart'
+voice_speed = 0.9
+font_size = 45
+video_clip_probability = 0.25
+bg_music_volume = 0.08
+fps = 30
+preset = "veryfast"
TARGET_RESOLUTION = None
CAPTION_COLOR = None
TEMP_FOLDER = None
@@ -76,9 +93,13 @@ TEMP_FOLDER = None
def generate_script(user_input):
"""Generate documentary script with proper OpenRouter handling."""
+ if not OPENROUTER_API_KEY or OPENROUTER_API_KEY == "YOUR_OPENROUTER_API_KEY_HERE":
+ print("ERROR: OpenRouter API Key is missing or still a placeholder.")
+ return None
+
headers = {
'Authorization': f'Bearer {OPENROUTER_API_KEY}',
- 'HTTP-Referer': 'https://your-domain.com', # Optional: Replace with your actual domain if needed
+ 'HTTP-Referer': 'https://github.com/your-repo', # Optional: Replace with your repo/domain
'X-Title': 'AI Documentary Maker' # Optional
}
@@ -166,7 +187,7 @@ Now here is the Topic/scrip: {user_input}
'model': OPENROUTER_MODEL,
'messages': [{'role': 'user', 'content': prompt}],
'temperature': 0.4,
- 'max_tokens': 5000
+ 'max_tokens': 1000 # Reduced from 5000
}
try:
@@ -177,20 +198,38 @@ Now here is the Topic/scrip: {user_input}
timeout=60 # Increased timeout
)
- response.raise_for_status() # Raise an exception for bad status codes
+ response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx)
response_data = response.json()
if 'choices' in response_data and len(response_data['choices']) > 0 and 'message' in response_data['choices'][0] and 'content' in response_data['choices'][0]['message']:
- return response_data['choices'][0]['message']['content'].strip()
+ script_content = response_data['choices'][0]['message']['content'].strip()
+ if not script_content:
+ print("Warning: API returned an empty script.")
+ return None
+ # Basic format check
+ if '[' not in script_content or ']' not in script_content:
+ print(f"Warning: Generated script might lack proper formatting (missing '[' or ']'):\n{script_content[:200]}...")
+ return script_content
else:
print("Unexpected API response format:", response_data)
return None
+ except requests.exceptions.Timeout:
+ print("API request timed out.")
+ return None
except requests.exceptions.RequestException as e:
print(f"API request failed: {e}")
+ # Print detailed error if available (e.g., from response text)
+ if hasattr(e, 'response') and e.response is not None:
+ print(f"Response status: {e.response.status_code}")
+ try:
+ print(f"Response body: {e.response.json()}")
+ except ValueError: # If response is not JSON
+ print(f"Response body: {e.response.text}")
return None
except Exception as e:
print(f"An unexpected error occurred during script generation: {e}")
+ traceback.print_exc()
return None
def parse_script(script_text):
@@ -215,13 +254,21 @@ def parse_script(script_text):
if not line: # Skip empty lines
continue
- match = re.match(r'^\[([^\]]+)\](.*)', line)
+ # Regex to capture title in brackets and the following text on the same line
+ match = re.match(r'^\s*\[([^\]]+)\](.*)', line)
if match:
# If we were processing a previous title, save it
if current_title is not None and current_text:
sections[current_title] = current_text.strip()
current_title = match.group(1).strip()
+ # Ensure title is not empty
+ if not current_title:
+ print(f"Warning: Found empty title '[]' in script line: '{line}'. Skipping.")
+ current_title = None # Reset title
+ current_text = ""
+ continue
+
current_text = match.group(2).strip() + " " # Start text for the new title
elif current_title is not None:
# Append line to the current text if it doesn't start a new section
@@ -238,11 +285,11 @@ def parse_script(script_text):
print(f"Warning: Skipping empty title ('{title}') or narration ('{narration}')")
continue
- media_element = {"type": "media", "prompt": title, "effects": "fade-in"}
+ media_element = {"type": "media", "prompt": title, "effects": "random"} # Use random effect
words = narration.split()
- # Simple duration estimate: 0.5 seconds per word, minimum 3 seconds
+ # Simple duration estimate: ~0.5 seconds per word, minimum 3 seconds
duration = max(3.0, len(words) * 0.5)
- tts_element = {"type": "tts", "text": narration, "voice": "en", "duration": duration}
+ tts_element = {"type": "tts", "text": narration, "voice": "en", "duration": duration} # 'en' is placeholder, actual voice set globally
elements.append(media_element)
elements.append(tts_element)
@@ -252,20 +299,21 @@ def parse_script(script_text):
except Exception as e:
print(f"Error parsing script: {e}")
print(f"Problematic script text snippet: {script_text[:200]}") # Log part of the script
+ traceback.print_exc()
return []
def search_pexels_videos(query, pexels_api_key):
- """Search for a video on Pexels by query and return a random HD video."""
- if not pexels_api_key:
- print("Pexels API key is missing. Cannot search for videos.")
+ """Search for a video on Pexels by query and return a random HD/SD video."""
+ if not pexels_api_key or pexels_api_key == "YOUR_PEXELS_API_KEY_HERE":
+ print("ERROR: Pexels API key is missing or still a placeholder. Cannot search for videos.")
return None
headers = {'Authorization': pexels_api_key}
base_url = "https://api.pexels.com/videos/search"
- num_pages = 3 # Search first 3 pages
+ num_pages = 2 # Search first 2 pages is usually enough
videos_per_page = 15
- max_retries = 3
+ max_retries = 2 # Fewer retries
retry_delay = 2 # Start with 2 seconds delay
search_query = query
@@ -273,54 +321,56 @@ def search_pexels_videos(query, pexels_api_key):
print(f"Searching Pexels videos for: '{query}'")
for page in range(1, num_pages + 1):
- params = {"query": search_query, "per_page": videos_per_page, "page": page, "orientation": "landscape"} # Added orientation
+ # Match orientation to the target resolution: landscape when width > height, otherwise portrait (also when TARGET_RESOLUTION is unset)
+ orient = "landscape" if TARGET_RESOLUTION and TARGET_RESOLUTION[0] > TARGET_RESOLUTION[1] else "portrait"
+ params = {"query": search_query, "per_page": videos_per_page, "page": page, "orientation": orient}
for attempt in range(max_retries):
try:
- response = requests.get(base_url, headers=headers, params=params, timeout=15) # Increased timeout
-
- if response.status_code == 200:
- data = response.json()
- videos = data.get("videos", [])
-
- if not videos:
- # print(f"No videos found on page {page} for '{query}'.") # Less verbose
- break # Stop searching pages if one is empty
-
- for video in videos:
- video_files = video.get("video_files", [])
- # Prefer HD, then SD if HD not found
- hd_link = None
- sd_link = None
- for file in video_files:
- if file.get("quality") == "hd" and file.get("link"):
- hd_link = file.get("link")
- break # Found HD, use it
- elif file.get("quality") == "sd" and file.get("link"):
- sd_link = file.get("link") # Keep SD as fallback
-
- link_to_add = hd_link if hd_link else sd_link
- if link_to_add:
- all_videos.append(link_to_add)
-
- break # Success for this page, move to next page
-
- elif response.status_code == 429:
- print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...")
- time.sleep(retry_delay)
- retry_delay *= 2
- elif response.status_code == 400: # Bad request often means invalid query
- print(f"Pexels API bad request (400) for query '{query}'. Skipping.")
- return None # Don't retry bad requests
- else:
- print(f"Error fetching Pexels videos: {response.status_code} {response.text}")
- if attempt < max_retries - 1:
- print(f"Retrying Pexels video search in {retry_delay} seconds...")
- time.sleep(retry_delay)
- retry_delay *= 2
- else:
- print("Max retries reached for Pexels video search.")
- break # Max retries for this page
+ response = requests.get(base_url, headers=headers, params=params, timeout=15) # Reasonable timeout
+
+ response.raise_for_status() # Check for 4xx/5xx errors
+ data = response.json()
+ videos = data.get("videos", [])
+
+ if not videos:
+ # print(f"No videos found on page {page} for '{query}'.") # Less verbose
+ break # Stop searching pages if one is empty
+
+ for video in videos:
+ video_files = video.get("video_files", [])
+ # Prefer HD, then SD if HD not found
+ hd_link = None
+ sd_link = None
+ for file in video_files:
+ if file.get("quality") == "hd" and file.get("link") and file.get('width', 0) > 1000: # Basic check for decent HD
+ hd_link = file.get("link")
+ break # Found HD, use it
+ elif file.get("quality") == "sd" and file.get("link") and file.get('width', 0) > 500: # Basic check for decent SD
+ sd_link = file.get("link") # Keep SD as fallback
+
+ link_to_add = hd_link if hd_link else sd_link
+ if link_to_add:
+ all_videos.append(link_to_add)
+
+ break # Success for this page, move to next page
+
+ except requests.exceptions.HTTPError as e:
+ print(f"HTTP Error fetching Pexels videos: {e.response.status_code} {e.response.text}")
+ if e.response.status_code == 429: # Rate limit
+ print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+ retry_delay *= 2
+ elif e.response.status_code == 400: # Bad request often means invalid query
+ print(f"Pexels API bad request (400) for query '{query}'. Skipping video search.")
+ return None # Don't retry bad requests
+ elif attempt < max_retries - 1:
+ print(f"Retrying Pexels video search in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+ retry_delay *= 2
+ else:
+ print("Max retries reached for Pexels video search after HTTP error.")
+ break # Max retries for this page
except requests.exceptions.Timeout:
print(f"Pexels video search timed out (attempt {attempt+1}/{max_retries}).")
if attempt < max_retries - 1:
@@ -332,8 +382,11 @@ def search_pexels_videos(query, pexels_api_key):
break # Max retries for this page
except requests.exceptions.RequestException as e:
print(f"Pexels video search request exception: {e}")
- # Don't retry general request exceptions unless specifically needed
- break # Stop trying for this page
+ break # Stop trying for this page on general network errors
+ except Exception as e:
+ print(f"Unexpected error during Pexels video search: {e}")
+ traceback.print_exc()
+ break # Stop trying for this page
# Reset retry delay for the next page
retry_delay = 2
@@ -348,61 +401,63 @@ def search_pexels_videos(query, pexels_api_key):
def search_pexels_images(query, pexels_api_key):
"""Search for an image on Pexels by query."""
- if not pexels_api_key:
- print("Pexels API key is missing. Cannot search for images.")
+ if not pexels_api_key or pexels_api_key == "YOUR_PEXELS_API_KEY_HERE":
+ print("ERROR: Pexels API key is missing or still a placeholder. Cannot search for images.")
return None
headers = {'Authorization': pexels_api_key}
url = "https://api.pexels.com/v1/search"
- params = {"query": query, "per_page": 10, "orientation": "landscape"} # Get more results, landscape only
+ # Match orientation to target video resolution
+ orient = "landscape" if TARGET_RESOLUTION and TARGET_RESOLUTION[0] > TARGET_RESOLUTION[1] else "portrait"
+ params = {"query": query, "per_page": 10, "orientation": orient}
- max_retries = 3
+ max_retries = 2
retry_delay = 2
- print(f"Searching Pexels images for: '{query}'")
+ print(f"Searching Pexels images for: '{query}' (Orientation: {orient})")
for attempt in range(max_retries):
try:
response = requests.get(url, headers=headers, params=params, timeout=15)
-
- if response.status_code == 200:
- data = response.json()
- photos = data.get("photos", [])
- if photos:
- # Select from 'original', 'large2x', 'large' in order of preference
- valid_photos = []
- for photo in photos:
- src = photo.get("src", {})
- img_url = src.get("original") or src.get("large2x") or src.get("large")
- if img_url:
- valid_photos.append(img_url)
-
- if valid_photos:
- chosen_url = random.choice(valid_photos)
- print(f"Found {len(valid_photos)} Pexels images for '{query}', selected one.")
- return chosen_url
- else:
- print(f"No valid image URLs found in Pexels response for '{query}'.")
- return None
+ response.raise_for_status() # Check for 4xx/5xx errors
+
+ data = response.json()
+ photos = data.get("photos", [])
+ if photos:
+ # Select from 'original', 'large2x', 'large' in order of preference
+ valid_photos = []
+ for photo in photos:
+ src = photo.get("src", {})
+ # Prefer larger sizes but fall back
+ img_url = src.get("original") or src.get("large2x") or src.get("large") or src.get("medium")
+ if img_url:
+ valid_photos.append(img_url)
+
+ if valid_photos:
+ chosen_url = random.choice(valid_photos)
+ print(f"Found {len(valid_photos)} Pexels images for '{query}', selected one.")
+ return chosen_url
else:
- # print(f"No Pexels images found for query: {query}") # Less verbose
+ print(f"No valid image URLs found in Pexels response for '{query}'.")
return None
+ else:
+ # print(f"No Pexels images found for query: {query}") # Less verbose
+ return None
- elif response.status_code == 429:
- print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...")
- time.sleep(retry_delay)
- retry_delay *= 2
- elif response.status_code == 400:
- print(f"Pexels API bad request (400) for query '{query}'. Skipping.")
+ except requests.exceptions.HTTPError as e:
+ print(f"HTTP Error fetching Pexels images: {e.response.status_code} {e.response.text}")
+ if e.response.status_code == 429: # Rate limit
+ print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+ retry_delay *= 2
+ elif e.response.status_code == 400: # Bad request
+ print(f"Pexels API bad request (400) for query '{query}'. Skipping image search.")
return None
+ elif attempt < max_retries - 1:
+ print(f"Retrying Pexels image search in {retry_delay} seconds...")
+ time.sleep(retry_delay)
+ retry_delay *= 2
else:
- print(f"Error fetching Pexels images: {response.status_code} {response.text}")
- if attempt < max_retries - 1:
- print(f"Retrying Pexels image search in {retry_delay} seconds...")
- time.sleep(retry_delay)
- retry_delay *= 2
- else:
- print("Max retries reached for Pexels image search.")
- return None # Max retries failed
-
+ print("Max retries reached for Pexels image search after HTTP error.")
+ return None # Max retries failed
except requests.exceptions.Timeout:
print(f"Pexels image search timed out (attempt {attempt+1}/{max_retries}).")
if attempt < max_retries - 1:
@@ -414,7 +469,11 @@ def search_pexels_images(query, pexels_api_key):
return None # Max retries failed
except requests.exceptions.RequestException as e:
print(f"Pexels image search request exception: {e}")
- return None # Don't retry
+ return None # Don't retry general network errors
+ except Exception as e:
+ print(f"Unexpected error during Pexels image search: {e}")
+ traceback.print_exc()
+ return None
print(f"No Pexels images found for query: '{query}' after all attempts.")
return None
@@ -431,39 +490,35 @@ def search_google_images(query):
soup = BeautifulSoup(response.text, "html.parser")
- # Google changes its HTML structure often. This is a common pattern, but might need updates.
- # Look for image data embedded in script tags or specific img tags.
image_urls = []
- # Try finding JSON data first (often more reliable if present)
- scripts = soup.find_all("script")
- for script in scripts:
- if script.string and 'AF_initDataCallback' in script.string:
- # This requires more complex parsing of the JS data structure
- # For simplicity, we'll stick to img tags for now.
- pass # Placeholder for potential future JSON parsing
-
- # Fallback to finding img tags (less reliable for direct source URLs)
- img_tags = soup.find_all("img")
- for img in img_tags:
- src = img.get("src") or img.get("data-src") # Check both src and data-src
- if src and src.startswith("http") and not "gstatic.com" in src:
- # Basic filtering, might need refinement
- image_urls.append(src)
- elif src and src.startswith('data:image'):
- # Handle base64 encoded images (less common for main results now)
- try:
- # Extract base64 data (simplistic extraction)
- header, encoded = src.split(",", 1)
- # You could save this, but it's often just thumbnails
- # print("Found base64 image data (skipping for now)")
- except ValueError:
- pass # Ignore malformed data URIs
+ # Google changes its HTML structure often. This targets common patterns.
+ # Pattern 1: Images directly in <img> tags (often thumbnails or requires JS)
+ for img in soup.find_all("img"):
+ src = img.get("src") or img.get("data-src")
+ if src and src.startswith("http") and "gstatic.com" not in src and "google.com" not in src:
+ image_urls.append(src)
+ elif src and src.startswith('data:image'):
+ # Skip base64 images as they are usually small thumbnails
+ pass
+
+ # Pattern 2: Look for JSON data embedded in script tags (more reliable if found)
+ # This requires more complex parsing and adapting to Google's changing structure.
+ # Example (might need adjustment):
+ # scripts = soup.find_all("script")
+ # for script in scripts:
+ # if script.string and 'var AF_data' in script.string: # Example marker
+ # # Complex parsing logic here to extract URLs from the JS object
+ # pass
if image_urls:
+ # Filter out potential low-quality results (e.g., very short URLs)
+ filtered_urls = [url for url in image_urls if len(url) > 50 and ('.jpg' in url or '.png' in url or '.jpeg' in url)]
+ if not filtered_urls: filtered_urls = image_urls # Use original if filter removed everything
+
# Return a random one from the first few potentially relevant results
- num_to_consider = min(len(image_urls), 10)
- chosen_url = random.choice(image_urls[:num_to_consider])
- print(f"Found {len(image_urls)} potential Google images, selected one.")
+ num_to_consider = min(len(filtered_urls), 10)
+ chosen_url = random.choice(filtered_urls[:num_to_consider])
+ print(f"Found {len(filtered_urls)} potential Google images, selected one.")
return chosen_url
else:
print(f"No suitable Google Images found for query: '{query}' with current parsing method.")
@@ -473,46 +528,66 @@ def search_google_images(query):
return None
except Exception as e:
print(f"Error parsing Google Images HTML: {e}")
+ # traceback.print_exc() # Uncomment for detailed parsing errors
return None
def download_image(image_url, filename):
"""Download an image from a URL to a local file with enhanced error handling."""
- if not image_url:
- print("Error: No image URL provided for download.")
+ if not image_url or not isinstance(image_url, str) or not image_url.startswith('http'):
+ print(f"Error: Invalid image URL provided for download: {image_url}")
return None
try:
- headers = {"User-Agent": USER_AGENT, "Accept": "image/*"} # Be more specific about accepted content
- print(f"Downloading image from: {image_url} to {filename}")
+ headers = {"User-Agent": USER_AGENT, "Accept": "image/jpeg,image/png,image/*"} # Be more specific
+ print(f"Downloading image: {image_url} \n to: {filename}")
response = requests.get(image_url, headers=headers, stream=True, timeout=20) # Increased timeout
- response.raise_for_status() # Check for download errors
+ response.raise_for_status() # Check for download errors (4xx, 5xx)
# Check content type if possible
content_type = response.headers.get('Content-Type', '').lower()
- if 'image' not in content_type:
- print(f"Warning: URL content type ({content_type}) might not be an image. Proceeding anyway.")
+ if content_type and 'image' not in content_type:
+ print(f"Warning: URL content type ('{content_type}') might not be an image. Proceeding anyway.")
+ # Download the content
with open(filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
+ # Basic file size check
+ if os.path.getsize(filename) < 1024: # Less than 1KB is suspicious
+ print(f"Warning: Downloaded image file '{filename}' is very small. It might be invalid.")
+ # Optionally remove it here, but validation below is better
+ # os.remove(filename); return None
+
print(f"Image downloaded successfully to: {filename}")
- # Validate the downloaded image
+ # Validate the downloaded image using Pillow
try:
img = Image.open(filename)
- img.verify() # Check if Pillow can read the header
- # Re-open after verify
+ img.verify() # Check if Pillow can read the header and format
+ # Re-open after verify to load image data
img = Image.open(filename)
- if img.mode != 'RGB':
+ # Check for minimum dimensions (optional)
+ # min_dim = 100
+ # if img.width < min_dim or img.height < min_dim:
+ # print(f"Warning: Image {filename} is very small ({img.width}x{img.height}).")
+
+ # Convert to RGB if necessary (common requirement for video processing)
+ if img.mode not in ['RGB', 'RGBA']: # RGBA is handled by the branch below; both branches end up converting to RGB
print(f"Converting image {filename} from {img.mode} to RGB.")
img = img.convert('RGB')
img.save(filename, quality=90) # Save with decent quality
+ elif img.mode == 'RGBA':
+ # If RGBA, consider converting to RGB or handling alpha channel appropriately
+ print(f"Image {filename} has alpha channel (RGBA). Converting to RGB.")
+ img = img.convert('RGB')
+ img.save(filename, quality=90)
+
img.close() # Close the image file handle
print(f"Image validated and processed: {filename}")
return filename
except (IOError, SyntaxError, Image.UnidentifiedImageError) as e_validate:
- print(f"Downloaded file '{filename}' is not a valid image or is corrupted: {e_validate}")
+ print(f"ERROR: Downloaded file '{filename}' is not a valid image or is corrupted: {e_validate}")
if os.path.exists(filename):
try:
os.remove(filename)
@@ -522,55 +597,64 @@ def download_image(image_url, filename):
return None
except requests.exceptions.RequestException as e_download:
- print(f"Image download error from {image_url}: {e_download}")
+ print(f"ERROR: Image download failed for {image_url}: {e_download}")
# Clean up potentially incomplete file
if os.path.exists(filename):
- try:
- os.remove(filename)
+ try: os.remove(filename)
except OSError: pass
return None
except Exception as e_general:
- print(f"General error during image processing for {image_url}: {e_general}")
+ print(f"ERROR: General error during image processing for {image_url}: {e_general}")
+ traceback.print_exc()
if os.path.exists(filename):
- try:
- os.remove(filename)
+ try: os.remove(filename)
except OSError: pass
return None
def download_video(video_url, filename):
"""Download a video from a URL to a local file."""
- if not video_url:
- print("Error: No video URL provided for download.")
+ if not video_url or not isinstance(video_url, str) or not video_url.startswith('http'):
+ print(f"Error: Invalid video URL provided for download: {video_url}")
return None
try:
headers = {"User-Agent": USER_AGENT} # Pexels might not require this, but good practice
- print(f"Downloading video from: {video_url} to {filename}")
- response = requests.get(video_url, headers=headers, stream=True, timeout=60) # Generous timeout for videos
- response.raise_for_status()
+ print(f"Downloading video: {video_url} \n to: {filename}")
+ response = requests.get(video_url, headers=headers, stream=True, timeout=90) # Generous timeout for videos
+ response.raise_for_status() # Check for download errors (4xx, 5xx)
+
+ # Optional: Check content type
+ content_type = response.headers.get('Content-Type', '').lower()
+ if content_type and 'video' not in content_type:
+ print(f"Warning: URL content type ('{content_type}') might not be a video. Proceeding.")
with open(filename, 'wb') as f:
- for chunk in response.iter_content(chunk_size=1024*1024): # Larger chunks for video
- f.write(chunk)
- print(f"Video downloaded successfully to: {filename}")
+ total_downloaded = 0
+ start_time = time.time()
+ for chunk in response.iter_content(chunk_size=1024*1024): # Larger chunks (1MB) for video
+ if chunk: # filter out keep-alive new chunks
+ f.write(chunk)
+ total_downloaded += len(chunk)
+ end_time = time.time()
+ download_speed = (total_downloaded / (1024*1024)) / (end_time - start_time + 1e-6) # MB/s
+ print(f"Video downloaded successfully to: {filename} ({total_downloaded / (1024*1024):.2f} MB at {download_speed:.2f} MB/s)")
# Basic validation: check file size
- if os.path.getsize(filename) < 1024: # Check if file is suspiciously small (e.g., < 1KB)
+ if os.path.getsize(filename) < 10 * 1024: # Check if file is suspiciously small (e.g., < 10KB)
print(f"Warning: Downloaded video file '{filename}' is very small. It might be invalid.")
# Keep the file for now, let moviepy handle potential errors later
return filename
except requests.exceptions.RequestException as e:
- print(f"Video download error from {video_url}: {e}")
+ print(f"ERROR: Video download failed for {video_url}: {e}")
if os.path.exists(filename):
- try:
- os.remove(filename) # Clean up failed download
+ try: os.remove(filename) # Clean up failed download
except OSError: pass
return None
except Exception as e_general:
- print(f"General error during video download for {video_url}: {e_general}")
+ print(f"ERROR: General error during video download for {video_url}: {e_general}")
+ traceback.print_exc()
if os.path.exists(filename):
- try:
- os.remove(filename)
+ try: os.remove(filename)
except OSError: pass
return None
@@ -580,35 +664,50 @@ def generate_media(prompt, user_image=None, current_index=0, total_segments=1):
Generate a visual asset: Try video (based on probability), then Pexels image, then Google (news), then fallback Pexels image.
Returns a dict: {'path': <local file path>, 'asset_type': 'video' or 'image'}.
"""
+ # Sanitize prompt for use in filenames
safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
if not safe_prompt: # Handle cases where prompt becomes empty after sanitizing
safe_prompt = f"media_{current_index}"
+ safe_prompt = safe_prompt[:50] # Limit filename part length
+
print(f"\n--- Generating Media for Prompt: '{prompt}' (Segment {current_index+1}/{total_segments}) ---")
+ # --- Strategy ---
+ # 1. Video? (Based on probability) -> Pexels Video Search -> Download
+ # 2. Image? -> Pexels Image Search -> Download
+ # 3. News? -> Google Image Search -> Download
+ # 4. Fallback? -> Generic Pexels Image Search -> Download
+ # 5. Absolute Fallback? -> Generate Color Background
+
# 1. Try Video first based on probability
if random.random() < video_clip_probability:
- print(f"Attempting video search (Probability: {video_clip_probability*100}%)")
+ print(f"Attempting video search (Probability: {video_clip_probability*100:.0f}%)")
video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video_{current_index}.mp4")
video_url = search_pexels_videos(prompt, PEXELS_API_KEY)
if video_url:
downloaded_video = download_video(video_url, video_file)
if downloaded_video and os.path.exists(downloaded_video):
- # Further check if video is usable by moviepy (optional, adds overhead)
- try:
- with VideoFileClip(downloaded_video) as test_clip:
- if test_clip.duration > 0:
- print(f"Video asset usable: {downloaded_video}")
- return {"path": downloaded_video, "asset_type": "video"}
- else:
- print(f"Downloaded video file seems invalid (duration 0): {downloaded_video}")
- os.remove(downloaded_video) # Clean up invalid video
- except Exception as e:
- print(f"Error testing downloaded video {downloaded_video}: {e}")
- if os.path.exists(downloaded_video): os.remove(downloaded_video) # Clean up invalid video
- else:
- print(f"Pexels video download failed for prompt: '{prompt}'")
- else:
- print(f"Pexels video search failed for prompt: '{prompt}'")
+ # Basic check: File size > 10KB?
+ if os.path.getsize(downloaded_video) > 10 * 1024:
+ print(f"Video asset downloaded: {downloaded_video}")
+ # Optional: Deeper check with moviepy (adds overhead)
+ # try:
+ # with VideoFileClip(downloaded_video) as test_clip:
+ # if test_clip.duration > 0:
+ # print(f"Video asset usable: {downloaded_video}")
+ # return {"path": downloaded_video, "asset_type": "video"}
+ # else: print(f"Downloaded video file seems invalid (duration 0): {downloaded_video}")
+ # except Exception as e: print(f"Error testing downloaded video {downloaded_video}: {e}")
+ # If basic check passed, return it and let create_clip handle errors
+ return {"path": downloaded_video, "asset_type": "video"}
+ else:
+ print(f"Downloaded video file is too small, likely invalid: {downloaded_video}")
+ try: os.remove(downloaded_video)
+ except OSError: pass
+ # else: print(f"Pexels video download failed for prompt: '{prompt}'") # Covered by download_video logs
+ # else: print(f"Pexels video search failed for prompt: '{prompt}'") # Covered by search_pexels_videos logs
+ else:
+ print("Skipping video search based on probability.")
# 2. Try Pexels Image
print("Attempting Pexels image search...")
@@ -619,12 +718,14 @@ def generate_media(prompt, user_image=None, current_index=0, total_segments=1):
if downloaded_image_pexels and os.path.exists(downloaded_image_pexels):
print(f"Pexels image asset saved: {downloaded_image_pexels}")
return {"path": downloaded_image_pexels, "asset_type": "image"}
- else:
- print(f"Pexels image download failed for prompt: '{prompt}'")
+ # else: print(f"Pexels image download failed for prompt: '{prompt}'") # Covered by download_image logs
+ # else: print(f"Pexels image search failed for prompt: '{prompt}'") # Covered by search_pexels_images logs
+
- # 3. If "news" in prompt, try Google Images as a secondary option
- if "news" in prompt.lower():
- print(f"News-related query: '{prompt}'. Trying Google Images as secondary...")
+ # 3. If the prompt contains a news-related keyword (case-insensitive), try Google Images as a secondary option
+ news_keywords = ["news", "report", "breaking", "headline", "current event"] # Expand if needed
+ if any(keyword in prompt.lower() for keyword in news_keywords):
+ print(f"News-related query detected: '{prompt}'. Trying Google Images...")
image_file_google = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google_{current_index}.jpg")
image_url_google = search_google_images(prompt)
if image_url_google:
@@ -632,14 +733,12 @@ def generate_media(prompt, user_image=None, current_index=0, total_segments=1):
if downloaded_image_google and os.path.exists(downloaded_image_google):
print(f"Google image asset saved: {downloaded_image_google}")
return {"path": downloaded_image_google, "asset_type": "image"}
- else:
- print(f"Google Images download failed for prompt: '{prompt}'")
- else:
- print(f"Google Images search failed for prompt: '{prompt}'")
+ # else: print(f"Google Images download failed for prompt: '{prompt}'") # Covered by download_image logs
+ # else: print(f"Google Images search failed for prompt: '{prompt}'") # Covered by search_google_images logs
# 4. Fallback to generic Pexels image search if everything else failed
- print("Primary searches failed. Attempting fallback Pexels image search...")
- fallback_terms = ["abstract", "texture", "technology", "nature", "background"]
+ print("Primary searches failed or skipped. Attempting fallback Pexels image search...")
+ fallback_terms = ["abstract", "texture", "technology", "nature", "background", "cityscape", "pattern"]
fallback_term = random.choice(fallback_terms)
print(f"Using fallback term: '{fallback_term}'")
fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{fallback_term}_{current_index}.jpg")
@@ -649,35 +748,39 @@ def generate_media(prompt, user_image=None, current_index=0, total_segments=1):
if downloaded_fallback and os.path.exists(downloaded_fallback):
print(f"Fallback image asset saved: {downloaded_fallback}")
return {"path": downloaded_fallback, "asset_type": "image"}
- else:
- print(f"Fallback image download failed for term: '{fallback_term}'")
- else:
- print(f"Fallback image search failed for term: '{fallback_term}'")
+ # else: print(f"Fallback image download failed for term: '{fallback_term}'")
+ # else: print(f"Fallback image search failed for term: '{fallback_term}'")
# 5. Absolute fallback: Generate a simple color background (if ImageMagick is available)
try:
- print("All media generation failed. Creating a simple color background.")
+ print("All media generation failed. Creating a simple color background as last resort.")
color_bg_path = os.path.join(TEMP_FOLDER, f"color_bg_{current_index}.png")
# Ensure TARGET_RESOLUTION is set before calling this
if TARGET_RESOLUTION:
w, h = TARGET_RESOLUTION
# Pick a random dark color
- r, g, b = random.randint(0, 50), random.randint(0, 50), random.randint(0, 50)
+ r, g, b = random.randint(10, 60), random.randint(10, 60), random.randint(10, 60)
color = f"rgb({r},{g},{b})"
- # Use ImageMagick 'convert' command - requires it to be installed and accessible
- cmd = f"convert -size {w}x{h} xc:'{color}' {color_bg_path}"
- os.system(cmd)
- if os.path.exists(color_bg_path):
- print(f"Generated color background: {color_bg_path}")
- return {"path": color_bg_path, "asset_type": "image"}
+ # Use ImageMagick 'convert' command - requires it to be installed and accessible via mpy_config
+ if mpy_config.get("IMAGEMAGICK_BINARY") != "auto-detect":
+ cmd = f"{mpy_config.get('IMAGEMAGICK_BINARY')} -size {w}x{h} xc:'{color}' '{color_bg_path}'"
+ print(f"Executing: {cmd}")
+ exit_code = os.system(cmd)
+ if exit_code == 0 and os.path.exists(color_bg_path) and os.path.getsize(color_bg_path) > 100:
+ print(f"Generated color background: {color_bg_path}")
+ return {"path": color_bg_path, "asset_type": "image"}
+ else:
+ print(f"Failed to generate color background using ImageMagick (Exit code: {exit_code}).")
+ return None
else:
- print("Failed to generate color background using ImageMagick.")
+ print("Cannot generate color background: ImageMagick binary not configured in moviepy.")
return None
else:
print("Cannot generate color background: TARGET_RESOLUTION not set.")
return None
except Exception as e:
print(f"Error generating color background: {e}")
+ traceback.print_exc()
return None
# Should not be reached if color background works, but as a final safety net:
@@ -688,25 +791,38 @@ def generate_media(prompt, user_image=None, current_index=0, total_segments=1):
def generate_silent_audio(duration, sample_rate=24000):
"""Generate a silent WAV audio file lasting 'duration' seconds."""
try:
+ # Enforce a minimum duration of 0.1 s (also guards against zero or negative input)
+ duration = max(0.1, duration)
num_samples = int(duration * sample_rate)
silence = np.zeros(num_samples, dtype=np.float32)
+
# Ensure TEMP_FOLDER exists and is writable
if not TEMP_FOLDER or not os.path.isdir(TEMP_FOLDER):
print("Error: TEMP_FOLDER not set or invalid for silent audio.")
- # Create a fallback temporary file
- silent_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
- silent_path = silent_file.name
- silent_file.close() # Close handle immediately after getting name
+ # Create a fallback temporary file (less ideal as it might not be cleaned up)
+ try:
+ silent_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
+ silent_path = silent_file.name
+ silent_file.close() # Close handle immediately after getting name
+ except Exception as temp_err:
+ print(f"Error creating fallback temp file for silence: {temp_err}")
+ return None
else:
- silent_path = os.path.join(TEMP_FOLDER, f"silent_{int(time.time()*1000)}.wav")
+ # Use microsecond timestamp for uniqueness
+ timestamp = int(time.time() * 1_000_000)
+ silent_path = os.path.join(TEMP_FOLDER, f"silent_{timestamp}.wav")
sf.write(silent_path, silence, sample_rate)
- print(f"Silent audio generated: {silent_path} ({duration:.2f}s)")
- return silent_path
+ # Verify file creation and size
+ if os.path.exists(silent_path) and os.path.getsize(silent_path) > 0:
+ print(f"Silent audio generated: {silent_path} ({duration:.2f}s)")
+ return silent_path
+ else:
+ print(f"Error: Failed to write silent audio file to {silent_path}")
+ return None
except Exception as e:
print(f"Error generating silent audio: {e}")
- # Return None or raise exception? Returning None might hide issues.
- # Let's return None and let the calling function handle it.
+ traceback.print_exc()
return None
@@ -716,124 +832,160 @@ def generate_tts(text, voice):
Uses global `selected_voice` and `voice_speed`.
"""
if not text:
- print("Warning: Empty text received for TTS. Generating silence.")
- # Estimate a short duration for empty text, e.g., 1 second
+ print("Warning: Empty text received for TTS. Generating 1s silence.")
return generate_silent_audio(duration=1.0)
# Sanitize text slightly for filename (limit length, basic chars)
- safe_text_part = re.sub(r'[^\w-]', '', text[:15]).strip().replace(' ', '_')
- if not safe_text_part: safe_text_part = f"tts_{int(time.time()*1000)}"
+ safe_text_part = re.sub(r'[^\w-]', '', text[:20]).strip().replace(' ', '_')
+ timestamp = int(time.time() * 1_000_000) # More unique timestamp
+ if not safe_text_part: safe_text_part = f"tts_{timestamp}"
+ else: safe_text_part = f"{safe_text_part}_{timestamp}"
+
+ # Ensure TEMP_FOLDER is valid
+ if not TEMP_FOLDER or not os.path.isdir(TEMP_FOLDER):
+ print("ERROR: TEMP_FOLDER not set or invalid for TTS generation.")
+ return generate_silent_audio(duration=max(1.0, len(text.split()) * 0.5)) # Fallback silence
+
file_path = os.path.join(TEMP_FOLDER, f"{safe_text_part}.wav")
# Decide voice: Use global `selected_voice` if `voice` is the default 'en'
kokoro_voice_to_use = selected_voice if voice == 'en' else voice
- print(f"Generating TTS for: '{text[:50]}...' (Voice: {kokoro_voice_to_use}, Speed: {voice_speed})")
+ print(f"Generating TTS for: '{text[:60]}...' (Voice: {kokoro_voice_to_use}, Speed: {voice_speed})")
# --- Try Kokoro TTS ---
- try:
- # Ensure pipeline is initialized
- if pipeline is None:
- raise ValueError("Kokoro pipeline is not initialized.")
-
- generator = pipeline(text, voice=kokoro_voice_to_use, speed=voice_speed, split_pattern=r'\n+') # Split on newlines if any
- audio_segments = []
- output_sample_rate = 24000 # Kokoro's default rate
-
- for i, (gs, ps, audio) in enumerate(generator):
- if audio is not None and audio.ndim > 0 and audio.size > 0: # Check if audio data is valid
- # Ensure audio is float32, Kokoro might return different types
- if audio.dtype != np.float32:
- # Attempt conversion (e.g., from int16)
- if audio.dtype == np.int16:
- audio = audio.astype(np.float32) / 32768.0
- else:
- print(f"Warning: Unexpected audio dtype {audio.dtype} from Kokoro. Trying direct use.")
- # If unsure how to convert, might need to skip or handle specific cases
- audio_segments.append(audio)
+ if pipeline is not None: # Check if Kokoro was initialized successfully
+ try:
+ generator = pipeline(text, voice=kokoro_voice_to_use, speed=voice_speed, split_pattern=r'\n+') # Split on newlines if any
+ audio_segments = []
+ output_sample_rate = 24000 # Kokoro's default rate
+
+ for i, (gs, ps, audio) in enumerate(generator):
+ if audio is not None and isinstance(audio, np.ndarray) and audio.ndim > 0 and audio.size > 0:
+ # Ensure audio is float32
+ if audio.dtype != np.float32:
+ if audio.dtype == np.int16:
+ audio = audio.astype(np.float32) / 32768.0
+ else:
+ print(f"Warning: Unexpected audio dtype {audio.dtype} from Kokoro. Attempting conversion.")
+ try: audio = audio.astype(np.float32) # Generic attempt
+ except Exception: print("Conversion failed."); continue # Skip segment if conversion fails
+ audio_segments.append(audio)
+ else:
+ print(f"Warning: Kokoro returned empty or invalid audio segment {i} for text.")
+
+ if not audio_segments:
+ print("Error: Kokoro generated no valid audio segments.")
+ raise ValueError("No audio data from Kokoro")
+
+ # Concatenate segments if needed
+ full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0]
+
+ # Check final audio shape and content
+ if full_audio is None or full_audio.ndim == 0 or full_audio.size == 0:
+ print("Error: Final concatenated audio from Kokoro is invalid.")
+ raise ValueError("Invalid final audio data from Kokoro")
+
+ # Check for NaN or Inf values
+ if np.isnan(full_audio).any() or np.isinf(full_audio).any():
+ print("Warning: Kokoro audio contains NaN or Inf values. Cleaning.")
+ full_audio = np.nan_to_num(full_audio) # Replace NaN with 0, Inf with large numbers
+
+ # Normalize audio slightly to prevent clipping
+ max_abs_val = np.max(np.abs(full_audio))
+ if max_abs_val > 0: # Avoid division by zero
+ if max_abs_val > 1.0:
+ print("Normalizing Kokoro audio to prevent clipping.")
+ full_audio = full_audio / max_abs_val * 0.98
else:
- print(f"Warning: Kokoro returned empty or invalid audio segment {i} for text.")
-
- if not audio_segments:
- print("Error: Kokoro generated no valid audio segments.")
- raise ValueError("No audio data from Kokoro")
+ print("Warning: Kokoro generated silent audio.")
- # Concatenate segments if needed
- full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0]
- # Check final audio shape and content
- if full_audio is None or full_audio.ndim == 0 or full_audio.size == 0:
- print("Error: Final concatenated audio from Kokoro is invalid.")
- raise ValueError("Invalid final audio data from Kokoro")
+ sf.write(file_path, full_audio, output_sample_rate)
- # Check for NaN or Inf values
- if np.isnan(full_audio).any() or np.isinf(full_audio).any():
- print("Error: Kokoro audio contains NaN or Inf values. Attempting to clean.")
- full_audio = np.nan_to_num(full_audio) # Replace NaN with 0, Inf with large numbers
+ # Verify file write
+ if os.path.exists(file_path) and os.path.getsize(file_path) > 100: # Check size > 100 bytes
+ print(f"TTS audio saved: {file_path} (Kokoro)")
+ return file_path
+ else:
+ print(f"Error: Failed to write Kokoro TTS file or file is too small: {file_path}")
+ raise ValueError("Kokoro file write failed")
- # Normalize audio slightly to prevent clipping (optional)
- max_val = np.max(np.abs(full_audio))
- if max_val > 1.0:
- full_audio = full_audio / max_val * 0.98
+ except Exception as e_kokoro:
+ print(f"Error with Kokoro TTS: {e_kokoro}. Trying gTTS fallback...")
+ # traceback.print_exc() # Uncomment for detailed Kokoro errors
+ else:
+ print("Kokoro pipeline not available. Skipping Kokoro TTS attempt.")
- sf.write(file_path, full_audio, output_sample_rate)
- print(f"TTS audio saved: {file_path} (Kokoro)")
- return file_path
- except Exception as e_kokoro:
- print(f"Error with Kokoro TTS: {e_kokoro}. Trying gTTS fallback...")
+ # --- Try gTTS Fallback ---
+ try:
+ print("Attempting gTTS fallback...")
+ tts = gTTS(text=text, lang='en', slow= (voice_speed < 0.8) ) # Basic speed control approximation
+ # Save MP3 temporarily
+ mp3_path = os.path.join(TEMP_FOLDER, f"{safe_text_part}_gtts.mp3")
+ tts.save(mp3_path)
+
+ # Convert MP3 to WAV using pydub
+ audio = AudioSegment.from_mp3(mp3_path)
+ # Export as WAV (pydub handles sample rate conversion if needed, defaults reasonable)
+ # Ensure export path is the same WAV path we intended originally
+ audio.export(file_path, format="wav")
+
+ # Clean up temporary MP3
+ if os.path.exists(mp3_path):
+ try: os.remove(mp3_path)
+ except OSError: pass
- # --- Try gTTS Fallback ---
- try:
- tts = gTTS(text=text, lang='en', slow= (voice_speed < 0.8) ) # Basic speed control approximation
- # Save MP3 temporarily
- mp3_path = os.path.join(TEMP_FOLDER, f"{safe_text_part}_gtts.mp3")
- tts.save(mp3_path)
-
- # Convert MP3 to WAV using pydub
- audio = AudioSegment.from_mp3(mp3_path)
- # Export as WAV (pydub handles sample rate conversion if needed, defaults reasonable)
- audio.export(file_path, format="wav")
-
- # Clean up temporary MP3
- if os.path.exists(mp3_path):
- try:
- os.remove(mp3_path)
- except OSError: pass
+ # Check if the generated WAV file is valid
+ if os.path.exists(file_path) and os.path.getsize(file_path) > 100: # Basic size check
+ print(f"Fallback TTS saved: {file_path} (gTTS)")
+ return file_path
+ else:
+ print(f"Error: gTTS generated an invalid or empty WAV file: {file_path}")
+ if os.path.exists(file_path):
+ try: os.remove(file_path)
+ except OSError: pass
+ raise ValueError("gTTS output file invalid")
- print(f"Fallback TTS saved: {file_path} (gTTS)")
- # Check if the generated WAV file is valid
- if os.path.exists(file_path) and os.path.getsize(file_path) > 100: # Basic size check
- return file_path
- else:
- print(f"Error: gTTS generated an invalid or empty WAV file: {file_path}")
- if os.path.exists(file_path): os.remove(file_path)
- raise ValueError("gTTS output file invalid")
+ except ImportError:
+ print("Error: gTTS or pydub might not be installed. Cannot use gTTS fallback.")
+ # Skip to silence generation
+ except Exception as e_gtts:
+ print(f"Error with gTTS fallback: {e_gtts}. Generating silence.")
+ # traceback.print_exc() # Uncomment for detailed gTTS errors
- except Exception as e_gtts:
- print(f"Error with gTTS fallback: {e_gtts}. Generating silence.")
- # --- Generate Silence as final fallback ---
- # Estimate duration based on text length if possible
- estimated_duration = max(1.0, len(text.split()) * (0.6 / voice_speed)) # Rough estimate
- return generate_silent_audio(duration=estimated_duration)
+ # --- Generate Silence as final fallback ---
+ print("Generating silence as final TTS fallback.")
+ # Estimate duration based on text length if possible
+ estimated_duration = max(1.0, len(text.split()) * (0.6 / voice_speed)) # Rough estimate adjusted by speed
+ return generate_silent_audio(duration=estimated_duration)
def apply_kenburns_effect(clip, target_resolution, effect_type="random"):
"""Apply a smooth Ken Burns effect (zoom/pan) to an image clip."""
try:
target_w, target_h = target_resolution
- # Ensure clip has dimensions (might be None if error occurred)
- if not hasattr(clip, 'w') or not hasattr(clip, 'h') or clip.w is None or clip.h is None or clip.w == 0 or clip.h == 0:
- print("Error applying Ken Burns: Invalid clip dimensions.")
- return clip # Return original clip
+ if not isinstance(clip, ImageClip):
+ print("Warning: Ken Burns effect applied to non-ImageClip. Results may vary.")
+ # Attempt to get dimensions anyway
+ if not hasattr(clip, 'w') or clip.w is None or clip.h is None or clip.w == 0 or clip.h == 0:
+ print("Error applying Ken Burns: Invalid clip dimensions.")
+ return clip # Return original clip
+
+ # Ensure clip has a duration set
+ if clip.duration is None or clip.duration <= 0:
+ print("Error applying Ken Burns: Clip duration is not set or is zero.")
+ # Set a default duration? Might cause issues later. Return unmodified for now.
+ return clip
clip_w, clip_h = clip.w, clip.h
clip_aspect = clip_w / clip_h
target_aspect = target_w / target_h
# --- Resize to cover target area ---
- if clip_aspect > target_aspect:
- # Image is wider than target: Resize based on height
+ if clip_aspect >= target_aspect: # Includes case where aspect ratios are equal
+ # Image is wider than or equal to target: Resize based on height
scale_factor = target_h / clip_h
resized_w = int(clip_w * scale_factor)
resized_h = target_h
@@ -844,43 +996,41 @@ def apply_kenburns_effect(clip, target_resolution, effect_type="random"):
resized_h = int(clip_h * scale_factor)
# Use LANCZOS for resizing images - better quality
- clip = clip.resize(newsize=(resized_w, resized_h))
+ # Need to handle potential mask resizing as well
+ resized_clip = clip.resize(newsize=(resized_w, resized_h))
# --- Apply scale for zoom effect ---
- # Scale slightly larger to allow for movement without showing edges
zoom_scale = 1.15 # How much larger the image is than the frame initially
zoomed_w = int(resized_w * zoom_scale)
zoomed_h = int(resized_h * zoom_scale)
- clip = clip.resize(newsize=(zoomed_w, zoomed_h))
+ zoomed_clip = resized_clip.resize(newsize=(zoomed_w, zoomed_h))
# --- Determine movement parameters ---
max_offset_x = max(0, zoomed_w - target_w)
max_offset_y = max(0, zoomed_h - target_h)
available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "diag-tl-br", "diag-tr-bl"]
- if effect_type == "random":
+ if effect_type == "random" or effect_type not in available_effects:
effect_type = random.choice(available_effects)
- elif effect_type not in available_effects:
- print(f"Warning: Unknown Ken Burns effect '{effect_type}'. Defaulting to zoom-in.")
- effect_type = "zoom-in"
+ if effect_type not in available_effects: # Should not happen, but safety check
+ effect_type = "zoom-in"
print(f"Applying Ken Burns effect: {effect_type}")
# Define start and end positions/zooms based on effect type
- # Position is the center of the crop window relative to the zoomed image
center_x = zoomed_w / 2
center_y = zoomed_h / 2
start_pos = (center_x, center_y)
end_pos = (center_x, center_y)
- start_zoom_factor = 1.0 # Relative to the base zoomed size
- end_zoom_factor = 1.0
+ start_visual_zoom = 1.0 # 1.0 = fits target, >1.0 = zoomed in
+ end_visual_zoom = 1.0
if effect_type == "zoom-in":
- start_zoom_factor = 1.0
- end_zoom_factor = 1.0 / zoom_scale # Zoom in to fill the original zoomed size
+ start_visual_zoom = 1.0
+ end_visual_zoom = zoom_scale # Zoom in to the max pre-zoom
elif effect_type == "zoom-out":
- start_zoom_factor = 1.0 / zoom_scale
- end_zoom_factor = 1.0
+ start_visual_zoom = zoom_scale
+ end_visual_zoom = 1.0
elif effect_type == "pan-left":
start_pos = (center_x + max_offset_x / 2, center_y)
end_pos = (center_x - max_offset_x / 2, center_y)
@@ -900,177 +1050,164 @@ def apply_kenburns_effect(clip, target_resolution, effect_type="random"):
start_pos = (center_x + max_offset_x / 2, center_y - max_offset_y / 2)
end_pos = (center_x - max_offset_x / 2, center_y + max_offset_y / 2)
-
# --- Define the transformation function for moviepy's fl ---
+ # This function operates on each frame (as a numpy array)
def transform_frame(get_frame, t):
- frame = get_frame(t) # Get the frame from the *zoomed* clip at time t
+ # get_frame(t) returns the frame of the *input* clip (zoomed_clip) at time t
+ frame = get_frame(t)
# Smooth interpolation (cosine ease-in-out)
- if clip.duration is None or clip.duration <= 0:
- ratio = 0
- else:
- ratio = t / clip.duration
+ ratio = t / zoomed_clip.duration if zoomed_clip.duration > 0 else 0
ratio = 0.5 - 0.5 * math.cos(math.pi * ratio) # Ease in/out
# Interpolate zoom and position
- current_zoom_factor = start_zoom_factor + (end_zoom_factor - start_zoom_factor) * ratio
+ current_visual_zoom = start_visual_zoom + (end_visual_zoom - start_visual_zoom) * ratio
current_center_x = start_pos[0] + (end_pos[0] - start_pos[0]) * ratio
current_center_y = start_pos[1] + (end_pos[1] - start_pos[1]) * ratio
- # Calculate the size of the crop window in the zoomed image coordinates
- # This needs to be target_w/h divided by the current zoom factor relative to the *original* target size
- # The base zoom is `zoom_scale`, current relative zoom is `current_zoom_factor`
- effective_zoom = zoom_scale * current_zoom_factor # This isn't quite right. Let's rethink.
-
- # --- Simpler approach: Define crop window size based on target ---
- # The frame we get *is* the zoomed frame. We need to crop *from* it.
- # The size of the window we cut *from the zoomed frame* needs to scale inversely with zoom? No.
-
- # Let's define the zoom based on the *final output size* relative to the *zoomed clip size*.
- # If zoom_factor is 1.0, we crop target_w x target_h.
- # If zoom_factor is < 1.0 (zoomed out), we crop a larger area and scale down.
- # If zoom_factor is > 1.0 (zoomed in), we crop a smaller area and scale up.
-
- # Let's redefine start/end zoom based on the final *visual* zoom level.
- # zoom_level = 1.0 means the final image fills the target resolution exactly.
- # zoom_level = 1.1 means the final image is zoomed in by 10%.
-
- start_visual_zoom = 1.0
- end_visual_zoom = 1.0
-
- if effect_type == "zoom-in":
- start_visual_zoom = 1.0
- end_visual_zoom = zoom_scale # Zoom in to the max pre-zoom
- elif effect_type == "zoom-out":
- start_visual_zoom = zoom_scale
- end_visual_zoom = 1.0
- # For pans, visual zoom stays constant at 1.0
-
- current_visual_zoom = start_visual_zoom + (end_visual_zoom - start_visual_zoom) * ratio
-
# Calculate crop window size based on the current visual zoom needed
+ # The crop window size should be the target size divided by the zoom factor
crop_w = int(target_w / current_visual_zoom)
crop_h = int(target_h / current_visual_zoom)
# Ensure the crop window isn't larger than the actual frame dimensions
crop_w = min(crop_w, zoomed_w)
crop_h = min(crop_h, zoomed_h)
+ # Ensure crop dimensions are positive
+ if crop_w <= 0 or crop_h <= 0:
+ print(f"Warning: Invalid crop dimensions ({crop_w}x{crop_h}) calculated in Ken Burns. Using target size.")
+ crop_w = min(target_w, zoomed_w)
+ crop_h = min(target_h, zoomed_h)
+
# Clamp the center position to prevent cropping outside the image bounds
+ # The center point is relative to the zoomed frame (zoomed_w, zoomed_h)
min_center_x = crop_w / 2
max_center_x = zoomed_w - crop_w / 2
min_center_y = crop_h / 2
max_center_y = zoomed_h - crop_h / 2
+ # Ensure max >= min before clamping
+ if max_center_x < min_center_x: max_center_x = min_center_x
+ if max_center_y < min_center_y: max_center_y = min_center_y
+
clamped_center_x = max(min_center_x, min(current_center_x, max_center_x))
clamped_center_y = max(min_center_y, min(current_center_y, max_center_y))
# Use cv2.getRectSubPix for subpixel accuracy cropping
- # Input frame should be numpy array
if not isinstance(frame, np.ndarray):
- # This shouldn't happen if using ImageClip, but good check
print("Warning: Frame is not numpy array in Ken Burns transform.")
- return frame # Or handle conversion
+ # Try to convert? Risky. Return frame resized to target.
+ try: return cv2.resize(np.array(frame), (target_w, target_h)) # Attempt conversion
+ except: return np.zeros((target_h, target_w, 3), dtype=np.uint8) # Return black frame on failure
- # Ensure frame is contiguous C-style array if needed by cv2
- frame_contiguous = np.ascontiguousarray(frame)
+ # Ensure frame is contiguous C-style array if needed by cv2 (often helps)
+ if not frame.flags['C_CONTIGUOUS']:
+ frame = np.ascontiguousarray(frame)
try:
- cropped_frame = cv2.getRectSubPix(frame_contiguous, (crop_w, crop_h), (clamped_center_x, clamped_center_y))
- except cv2.error as e:
+ # Ensure crop dimensions are integers
+ crop_w_int, crop_h_int = int(round(crop_w)), int(round(crop_h))
+ if crop_w_int <= 0 or crop_h_int <= 0: raise ValueError("Crop dimensions must be positive")
+
+ cropped_frame = cv2.getRectSubPix(frame, (crop_w_int, crop_h_int), (clamped_center_x, clamped_center_y))
+
+ except (cv2.error, ValueError) as e:
print(f"Error during cv2.getRectSubPix: {e}")
- print(f" Frame shape: {frame_contiguous.shape}, dtype: {frame_contiguous.dtype}")
- print(f" Crop size: ({crop_w}, {crop_h})")
- print(f" Center: ({clamped_center_x}, {clamped_center_y})")
- # Fallback: return uncropped frame, maybe resized
- return cv2.resize(frame_contiguous, (target_w, target_h), interpolation=cv2.INTER_LINEAR)
+ print(f" Frame shape: {frame.shape}, dtype: {frame.dtype}, flags: {frame.flags}")
+ print(f" Req Crop size: ({crop_w:.2f}, {crop_h:.2f}), Int Crop: ({crop_w_int}, {crop_h_int})")
+ print(f" Req Center: ({current_center_x:.2f}, {current_center_y:.2f}), Clamped: ({clamped_center_x:.2f}, {clamped_center_y:.2f})")
+ # Fallback: return the original frame, resized to target
+ return cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LINEAR)
- # Resize the cropped frame to the target resolution
+ # Resize the cropped frame to the final target resolution
# Use LANCZOS4 for high quality resize
resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)
return resized_frame
# Apply the transformation using moviepy's fl method
- # ismask=False indicates we are transforming the color channels
- # apply_to=['mask'] would apply only to mask if needed
- return clip.fl(transform_frame, apply_to='mask') if clip.ismask else clip.fl(transform_frame)
+ # NOTE(review): ismask flags a clip that IS a mask, not one that HAS a mask - confirm apply_to is chosen as intended
+ final_clip = zoomed_clip.fl(transform_frame, apply_to=['mask'] if zoomed_clip.ismask else [])
+ # Set the duration explicitly as fl might mess it up
+ final_clip = final_clip.set_duration(clip.duration)
+ return final_clip
except Exception as e:
print(f"Error applying Ken Burns effect: {e}")
- # Return the original clip (possibly resized to fill initially) if effect fails
- return resize_to_fill(clip, target_resolution) # Fallback to simple resize/crop
+ traceback.print_exc()
+ # Fallback: Return the original clip, resized to fill target
+ print("Falling back to simple resize_to_fill.")
+ try:
+ return resize_to_fill(clip, target_resolution)
+ except Exception as e_resize:
+ print(f"Fallback resize_to_fill also failed: {e_resize}")
+ return clip # Return original as last resort
def resize_to_fill(clip, target_resolution):
"""Resize and crop a clip (video or image) to fill the target resolution, maintaining aspect ratio."""
try:
target_w, target_h = target_resolution
- if not hasattr(clip, 'size') or clip.size is None or clip.w is None or clip.h is None or clip.w == 0 or clip.h == 0:
+ if not hasattr(clip, 'size') or clip.size is None or not all(isinstance(d, (int, float)) and d > 0 for d in clip.size):
print(f"Error: Cannot resize clip with invalid dimensions: size={getattr(clip, 'size', 'N/A')}")
- # Return a dummy clip or raise error? Let's return None to signal failure.
- # Or maybe return a black clip of target size?
- # For now, let's try to return the original clip, maybe it recovers later.
- print("Returning original clip due to resize error.")
- return clip
+ return clip # Return original clip if dimensions are bad
clip_w, clip_h = clip.w, clip.h
+ if clip_w == 0 or clip_h == 0:
+ print(f"Error: Cannot resize clip with zero dimensions: {clip_w}x{clip_h}")
+ return clip
+
clip_aspect = clip_w / clip_h
target_aspect = target_w / target_h
- if clip_aspect > target_aspect:
+ if abs(clip_aspect - target_aspect) < 0.01:
+ # Aspect ratios are close enough, just resize
+ print(f"Resizing clip directly to {target_w}x{target_h}")
+ resized_clip = clip.resize(newsize=(target_w, target_h))
+ elif clip_aspect > target_aspect:
# Clip is wider than target: Resize based on height, crop width
- new_h = target_h
- scale_factor = new_h / clip_h
- new_w = int(clip_w * scale_factor)
- clip_resized = clip.resize(height=new_h) # Moviepy handles width automatically if height is set
+ print(f"Resizing clip to height {target_h}, cropping width.")
+ resized_clip = clip.resize(height=target_h) # Moviepy calculates width
+ # Ensure dimensions are updated after resize
+ if resized_clip.w is None or resized_clip.h is None: raise ValueError("Resize failed to update dimensions")
- # Calculate cropping amounts (ensure they are integers)
- crop_amount = int((new_w - target_w) / 2)
+ crop_amount = (resized_clip.w - target_w) / 2
if crop_amount < 0: crop_amount = 0 # Avoid negative crop
- # Ensure crop doesn't exceed bounds
+ # Use moviepy's crop method (x1, y1, x2, y2)
x1 = crop_amount
- x2 = new_w - crop_amount
- # Adjust if rounding caused issues
- if x2 > new_w: x2 = new_w
- if x1 >= x2: x1 = 0; x2 = target_w # Fallback if crop is invalid
+ x2 = resized_clip.w - crop_amount
+ resized_clip = resized_clip.crop(x1=x1, y1=0, x2=x2, y2=resized_clip.h)
- clip_cropped = clip_resized.crop(x1=x1, width=target_w, y1=0, height=target_h) # Use width/height args for crop
-
- elif clip_aspect < target_aspect:
+ else: # clip_aspect < target_aspect
# Clip is taller than target: Resize based on width, crop height
- new_w = target_w
- scale_factor = new_w / clip_w
- new_h = int(clip_h * scale_factor)
- clip_resized = clip.resize(width=new_w) # Moviepy handles height automatically
+ print(f"Resizing clip to width {target_w}, cropping height.")
+ resized_clip = clip.resize(width=target_w) # Moviepy calculates height
+ if resized_clip.w is None or resized_clip.h is None: raise ValueError("Resize failed to update dimensions")
- crop_amount = int((new_h - target_h) / 2)
+ crop_amount = (resized_clip.h - target_h) / 2
if crop_amount < 0: crop_amount = 0
y1 = crop_amount
- y2 = new_h - crop_amount
- if y2 > new_h: y2 = new_h
- if y1 >= y2: y1 = 0; y2 = target_h
-
- clip_cropped = clip_resized.crop(y1=y1, height=target_h, x1=0, width=target_w) # Use width/height args for crop
- else:
- # Aspect ratios match: Just resize
- clip_cropped = clip.resize(newsize=(target_w, target_h))
+ y2 = resized_clip.h - crop_amount
+ resized_clip = resized_clip.crop(x1=0, y1=y1, x2=resized_clip.w, y2=y2)
- # Final check on dimensions
- if clip_cropped.w != target_w or clip_cropped.h != target_h:
- print(f"Warning: resize_to_fill resulted in unexpected dimensions ({clip_cropped.w}x{clip_cropped.h}). Attempting final resize.")
- return clip_cropped.resize(newsize=(target_w, target_h))
+ # Final check and resize if dimensions are slightly off due to rounding
+ if resized_clip.w != target_w or resized_clip.h != target_h:
+ print(f"Warning: resize_to_fill resulted in dimensions {resized_clip.w}x{resized_clip.h}. Forcing final resize to {target_w}x{target_h}.")
+ resized_clip = resized_clip.resize(newsize=(target_w, target_h))
- return clip_cropped
+ return resized_clip
except Exception as e:
print(f"Error in resize_to_fill: {e}")
print(f"Clip info: duration={getattr(clip, 'duration', 'N/A')}, size={getattr(clip, 'size', 'N/A')}")
+ traceback.print_exc()
# Fallback: Try a simple resize without cropping if complex logic failed
try:
+ print("Attempting simple fallback resize.")
return clip.resize(newsize=target_resolution)
except Exception as e_resize:
print(f"Fallback resize also failed: {e_resize}")
@@ -1079,141 +1216,193 @@ def resize_to_fill(clip, target_resolution):
def find_mp3_files():
- """Search for any MP3 files in the current directory and subdirectories."""
+ """Deprecated: no longer searches for MP3 files; prints a warning and always returns None."""
# This function is no longer used as music is uploaded via Gradio and copied to "music.mp3"
- # Keeping it here for potential future use or reference.
- mp3_files = []
- try:
- for root, dirs, files in os.walk('.'):
- for file in files:
- if file.lower().endswith('.mp3'):
- mp3_path = os.path.join(root, file)
- mp3_files.append(mp3_path)
- print(f"Found MP3 file: {mp3_path}")
- return mp3_files[0] if mp3_files else None
- except Exception as e:
- print(f"Error searching for MP3 files: {e}")
- return None
+ print("Warning: find_mp3_files() is deprecated. Music should be uploaded via interface.")
+ return None
def add_background_music(final_video, bg_music_volume=0.10):
"""Add background music using 'music.mp3' if it exists."""
try:
# Expect the music file to be named 'music.mp3' in the current directory
bg_music_path = "music.mp3"
- if os.path.exists(bg_music_path) and os.path.getsize(bg_music_path) > 100:
+ if os.path.exists(bg_music_path) and os.path.getsize(bg_music_path) > 1000: # Check > 1KB
print(f"Adding background music from: {bg_music_path}")
- bg_music = AudioFileClip(bg_music_path)
- # Ensure video has audio track to mix with
+ # Load background music
+ try:
+ bg_music = AudioFileClip(bg_music_path)
+ except Exception as e_load:
+ print(f"Error loading background music file '{bg_music_path}': {e_load}")
+ print("Skipping background music.")
+ return final_video # Return original video
+
+ # Ensure video has audio track to mix with, or create silent track
if final_video.audio is None:
- print("Warning: Video has no primary audio track. Adding only background music.")
- # Create silent audio matching video duration if needed
- if bg_music.duration < final_video.duration:
- loops_needed = math.ceil(final_video.duration / bg_music.duration)
- bg_music = concatenate_audioclips([bg_music] * loops_needed)
- final_audio = bg_music.subclip(0, final_video.duration).volumex(bg_music_volume)
+ print("Video has no primary audio track. Creating silent track.")
+ # Create silent audio matching video duration
+ silent_audio = AudioSegment.silent(duration=int(final_video.duration * 1000)) # pydub uses ms
+ silent_audio_path = os.path.join(TEMP_FOLDER, "silent_for_bg.wav")
+ silent_audio.export(silent_audio_path, format="wav")
+ video_audio = AudioFileClip(silent_audio_path)
+ final_video = final_video.set_audio(video_audio) # Add silent track
else:
- # Loop or trim background music to match video duration
- if bg_music.duration < final_video.duration:
- loops_needed = math.ceil(final_video.duration / bg_music.duration)
- # Check if looping is feasible
- if loops_needed > 100: # Avoid excessive looping
- print(f"Warning: Background music is very short ({bg_music.duration:.1f}s) compared to video ({final_video.duration:.1f}s). Looping capped.")
- loops_needed = 100
- bg_segments = [bg_music] * int(loops_needed)
- try:
- bg_music_looped = concatenate_audioclips(bg_segments)
- except Exception as e_concat:
- print(f"Error concatenating audio for looping: {e_concat}. Using single instance.")
- bg_music_looped = bg_music # Fallback to single instance
- bg_music = bg_music_looped
-
- # Trim precisely to video duration
- bg_music = bg_music.subclip(0, final_video.duration)
-
- # Apply volume adjustment
- bg_music = bg_music.volumex(bg_music_volume)
-
- # Mix audio tracks
video_audio = final_video.audio
- # Ensure both clips have the same duration before compositing
- if abs(video_audio.duration - bg_music.duration) > 0.1:
- print(f"Warning: Audio duration mismatch before mixing (Vid: {video_audio.duration:.2f}s, BG: {bg_music.duration:.2f}s). Adjusting BG music.")
- bg_music = bg_music.set_duration(video_audio.duration)
- mixed_audio = CompositeAudioClip([video_audio, bg_music])
- final_audio = mixed_audio
+
+ # Loop or trim background music to match video duration
+ if bg_music.duration < final_video.duration:
+ loops_needed = math.ceil(final_video.duration / bg_music.duration)
+ print(f"Looping background music {loops_needed} times.")
+ # Avoid excessive looping for very short music
+ if loops_needed > 50:
+ print(f"Warning: Background music is very short ({bg_music.duration:.1f}s) compared to video ({final_video.duration:.1f}s). Looping capped at 50.")
+ loops_needed = 50
+
+ # Check if concatenate_audioclips is available and works
+ try:
+ bg_segments = [bg_music] * int(loops_needed)
+ bg_music_looped = concatenate_audioclips(bg_segments)
+ bg_music = bg_music_looped
+ except Exception as e_concat:
+ print(f"Error concatenating audio for looping: {e_concat}. Using single instance of BG music.")
+ # Fallback: use only the first instance, trimmed later
+
+ # Trim precisely to video duration
+ bg_music = bg_music.subclip(0, final_video.duration)
+
+ # Apply volume adjustment
+ bg_music = bg_music.volumex(bg_music_volume)
+
+ # Mix audio tracks using CompositeAudioClip
+ # Ensure both clips have the same duration before compositing for safety
+ if abs(video_audio.duration - bg_music.duration) > 0.1:
+ print(f"Warning: Audio duration mismatch before mixing (Vid: {video_audio.duration:.2f}s, BG: {bg_music.duration:.2f}s). Adjusting BG music duration.")
+ bg_music = bg_music.set_duration(video_audio.duration)
+
+ # Create the composite audio clip
+ mixed_audio = CompositeAudioClip([video_audio, bg_music])
# Set the composite audio to the video
- final_video = final_video.set_audio(final_audio)
+ final_video = final_video.set_audio(mixed_audio)
print(f"Background music added successfully (Volume: {bg_music_volume:.2f})")
+
+ # Do NOT close bg_music or video_audio here: mixed_audio (now attached to
+ # final_video) still reads from their underlying ffmpeg readers, and moviepy
+ # documents that close() on one copy can break the other copies. The caller
+ # should close final_video only after it has been written out.
+
+
else:
- print("Background music file 'music.mp3' not found or is empty. Skipping background music.")
+ if not os.path.exists(bg_music_path):
+ print("Background music file 'music.mp3' not found. Skipping background music.")
+ else:
+ print(f"Background music file 'music.mp3' found but is too small ({os.path.getsize(bg_music_path)} bytes). Skipping.")
return final_video
except Exception as e:
print(f"Error adding background music: {e}")
+ traceback.print_exc()
print("Continuing without background music.")
- # Return the video without the potentially failed audio modification
- return final_video.set_audio(final_video.audio) # Ensure audio is reset if it failed mid-process
+ # Return the video, ensuring it has *some* audio track if possible
+ if final_video.audio is None:
+ # Try adding silence if no audio track exists
+ try:
+ print("Adding silent track as fallback after BG music error.")
+ silent_audio = AudioSegment.silent(duration=int(final_video.duration * 1000))
+ silent_audio_path = os.path.join(TEMP_FOLDER, "silent_fallback.wav")
+ silent_audio.export(silent_audio_path, format="wav")
+ final_video = final_video.set_audio(AudioFileClip(silent_audio_path))
+ except Exception as silent_err:
+ print(f"Failed to add silent fallback audio: {silent_err}")
+ return final_video
-# --- NEW create_clip Function ---
+# --- create_clip Function (Incorporating Subtitle Logic) ---
def create_clip(media_path, asset_type, tts_path, duration=None, effects=None, narration_text=None, segment_index=0):
"""Create a video clip with synchronized subtitles and narration."""
+ clip_start_time = time.time()
+ print(f"--- Creating Clip #{segment_index+1} ---")
+ print(f" Media: {asset_type} at {os.path.basename(media_path or '')}")
+ print(f" TTS: {os.path.basename(tts_path or '')}")
+ print(f" Narration: '{(narration_text or '')[:60]}...'")
+
+ # Input validation
+ if not media_path or not os.path.exists(media_path) or os.path.getsize(media_path) < 100:
+ print(f"ERROR: Invalid or missing media file: {media_path}")
+ return None
+ if not tts_path or not os.path.exists(tts_path) or os.path.getsize(tts_path) < 100:
+ print(f"ERROR: Invalid or missing TTS file: {tts_path}")
+ # Attempt to use silent audio as fallback?
+ print("Attempting to generate silent audio as fallback for missing TTS.")
+ # Use the estimated duration from parse_script if available
+ fallback_duration = duration if duration and duration > 0 else 3.0
+ tts_path = generate_silent_audio(fallback_duration)
+ if not tts_path:
+ print("ERROR: Failed to generate fallback silent audio. Cannot create clip.")
+ return None # Critical failure if no audio
+
+ # Load audio first to get accurate duration
+ audio_clip = None
+ audio_duration = 0.0
try:
- print(f"--- Creating Clip #{segment_index+1} ---")
- print(f" Media: {asset_type} at {os.path.basename(media_path)}")
- print(f" TTS: {os.path.basename(tts_path)}")
- print(f" Narration: '{narration_text[:50]}...'")
+ audio_clip = AudioFileClip(tts_path)
+ # Apply slight fade out to prevent abrupt cuts
+ audio_clip = audio_clip.audio_fadeout(0.1)
+ audio_duration = audio_clip.duration
+ if audio_duration <= 0.1: # Check for very short/empty audio
+ print(f"Warning: Audio duration is very short ({audio_duration:.2f}s). Using minimum 1s.")
+ audio_duration = 1.0 # Ensure at least 1s duration for visuals
+ # Adjust audio clip duration if needed (might not be necessary if silence was generated)
+ if audio_clip.duration < 1.0:
+ audio_clip = audio_clip.set_duration(1.0) # Stretch silence? Risky.
+ # Better: Regenerate silence if original was too short
+ print("Regenerating 1s silent audio.")
+ new_silent_path = generate_silent_audio(1.0)
+ if new_silent_path:
+ audio_clip.close() # Close old clip
+ audio_clip = AudioFileClip(new_silent_path)
+ else:
+ print("Error: Failed to regenerate silent audio. Clip might be very short.")
- if not media_path or not os.path.exists(media_path) or os.path.getsize(media_path) < 100:
- print(f"Error: Invalid or missing media file: {media_path}")
- return None
- if not tts_path or not os.path.exists(tts_path) or os.path.getsize(tts_path) < 100:
- print(f"Error: Invalid or missing TTS file: {tts_path}")
- # Attempt to use silent audio as fallback?
- print("Attempting to generate silent audio as fallback.")
- # Use the estimated duration from parse_script if available
- fallback_duration = duration if duration else 3.0
- tts_path = generate_silent_audio(fallback_duration)
- if not tts_path:
- print("Error: Failed to generate fallback silent audio. Cannot create clip.")
- return None # Critical failure if no audio
-
- # Load audio first to get accurate duration
+
+ except Exception as e:
+ print(f"Error loading audio file {tts_path}: {e}")
+ print("Using estimated duration and generating silence.")
+ audio_duration = duration if duration and duration > 0 else 3.0
+ silent_audio_path = generate_silent_audio(audio_duration)
+ if not silent_audio_path:
+ print("Error: Failed to generate fallback silent audio after load error.")
+ return None # Cannot proceed without audio
try:
- audio_clip = AudioFileClip(tts_path)
- # Apply slight fade out to prevent abrupt cuts
- audio_clip = audio_clip.audio_fadeout(0.1)
- audio_duration = audio_clip.duration
- if audio_duration <= 0.1: # Check for very short/empty audio
- print(f"Warning: Audio duration is very short ({audio_duration:.2f}s). Adjusting target duration.")
- audio_duration = max(audio_duration, 1.0) # Ensure at least 1s duration
- except Exception as e:
- print(f"Error loading audio file {tts_path}: {e}")
- print("Using estimated duration and generating silence.")
- audio_duration = duration if duration else 3.0
- silent_audio_path = generate_silent_audio(audio_duration)
- if not silent_audio_path: return None # Cannot proceed without audio
- audio_clip = AudioFileClip(silent_audio_path)
+ audio_clip = AudioFileClip(silent_audio_path)
+ audio_duration = audio_clip.duration # Get duration from generated silence
+ except Exception as e_silent:
+ print(f"Error loading generated silent audio {silent_audio_path}: {e_silent}")
+ return None
- # Add a small buffer to the target duration for visuals
- target_duration = audio_duration + 0.2 # e.g., 0.2s buffer
+ # Add a small buffer to the target duration for visuals
+ target_duration = audio_duration + 0.2 # e.g., 0.2s buffer
- print(f" Audio Duration: {audio_duration:.2f}s, Target Visual Duration: {target_duration:.2f}s")
+ print(f" Audio Duration: {audio_duration:.2f}s, Target Visual Duration: {target_duration:.2f}s")
- # Create base visual clip (video or image)
- clip = None
+ # Create base visual clip (video or image)
+ clip = None
+ temp_img_path = None # To track temporary converted images
+ try:
if asset_type == "video":
try:
- clip = VideoFileClip(media_path)
+ clip = VideoFileClip(media_path, target_resolution=TARGET_RESOLUTION[:2]) # Request target size on load if possible
# Ensure video duration is sufficient, loop/subclip as needed
if clip.duration < target_duration:
print(f" Looping video (duration {clip.duration:.2f}s) to match target {target_duration:.2f}s")
- # Use loop method carefully, might cause issues if duration is very short
- # Alternative: freeze last frame? For now, loop.
- clip = clip.loop(duration=target_duration)
+ # Check if loop is feasible
+ if clip.duration > 0.1: # Avoid looping near-zero duration clips
+ clip = clip.loop(duration=target_duration)
+ else:
+ print("Warning: Video duration too short to loop effectively. Freezing last frame.")
+ clip = clip.to_ImageClip(t=clip.duration - 0.01 if clip.duration > 0.01 else 0).set_duration(target_duration)
+ # Keep asset_type == "video": the image branch below re-opens media_path with PIL, which fails on a video file.
else:
# Start from beginning, take required duration
clip = clip.subclip(0, target_duration)
@@ -1225,26 +1414,47 @@ def create_clip(media_path, asset_type, tts_path, duration=None, effects=None, n
except Exception as e:
print(f"Error processing video file {media_path}: {e}")
+ traceback.print_exc()
# Fallback to generating a color background if video fails
- fallback_media = generate_media("abstract", current_index=segment_index, total_segments=0) # Use a simple fallback
- if fallback_media and fallback_media['asset_type'] == 'image':
+ fallback_media = generate_media("abstract color", current_index=segment_index, total_segments=0) # Use a simple fallback
+ if fallback_media and fallback_media.get('path'):
print("Falling back to generated image due to video error.")
asset_type = 'image'
media_path = fallback_media['path']
+ # Now process this as an image in the next block
else:
print("ERROR: Video processing failed, and fallback media generation failed.")
+ if audio_clip: audio_clip.close()
return None # Cannot proceed
# This needs to handle the case where video processing failed and fell back to image
+ # Or if it was an image from the start
if asset_type == "image":
try:
- # Check image validity again before creating ImageClip
- img = Image.open(media_path)
- img.verify()
- img.close() # Close after verify
+ # Validate image before creating ImageClip
+ try:
+ img = Image.open(media_path)
+ img.verify()
+ # Reopen after verify
+ img = Image.open(media_path)
+ # Convert to RGB if needed (ensure compatibility)
+ if img.mode != 'RGB':
+ print(f"Converting image {os.path.basename(media_path)} from {img.mode} to RGB.")
+ # Save to a new temp file to avoid modifying original download
+ temp_img_path = os.path.join(TEMP_FOLDER, f"converted_{segment_index}.jpg")
+ img.convert('RGB').save(temp_img_path, quality=90)
+ img.close()
+ media_path_for_clip = temp_img_path
+ else:
+ img.close()
+ media_path_for_clip = media_path # Use original if already RGB
+ except Exception as img_err:
+ print(f"Error validating/converting image {media_path}: {img_err}")
+ if audio_clip: audio_clip.close()
+ return None # Fail if image is invalid
# Create ImageClip and set duration
- clip = ImageClip(media_path).set_duration(target_duration)
+ clip = ImageClip(media_path_for_clip).set_duration(target_duration)
# Apply Ken Burns effect (which includes resizing)
clip = apply_kenburns_effect(clip, TARGET_RESOLUTION, effect_type=effects or "random") # Use specified or random effect
@@ -1254,10 +1464,13 @@ def create_clip(media_path, asset_type, tts_path, duration=None, effects=None, n
except Exception as e:
print(f"Error processing image file {media_path}: {e}")
+ traceback.print_exc()
+ if audio_clip: audio_clip.close()
return None # Fail if image processing has critical error
if clip is None:
print("Error: Visual clip (video or image) could not be created.")
+ if audio_clip: audio_clip.close()
return None
# --- SUBTITLE GENERATION START ---
@@ -1266,89 +1479,100 @@ def create_clip(media_path, asset_type, tts_path, duration=None, effects=None, n
subtitle_clips = []
try:
words = narration_text.split()
- # Dynamic chunking: aim for ~3-7 words per chunk, max ~3 seconds per chunk
- max_words_per_chunk = 6
- min_words_per_chunk = 3
- max_duration_per_chunk = 3.0 # seconds
-
- chunks_data = [] # Stores (text, start_time, end_time)
- current_chunk_words = []
- current_chunk_start_time = 0.0
- approx_time_per_word = audio_duration / len(words) if words else 0
-
- for i, word in enumerate(words):
- current_chunk_words.append(word)
- current_word_end_time = current_chunk_start_time + len(current_chunk_words) * approx_time_per_word
-
- # Check if chunk should end
- time_condition = (current_word_end_time - current_chunk_start_time) >= max_duration_per_chunk
- word_count_condition = len(current_chunk_words) >= max_words_per_chunk
- is_last_word = (i == len(words) - 1)
-
- # End chunk if time/word limit reached, or if it's the last word
- # Ensure minimum word count unless it's the last segment
- if ( (time_condition or word_count_condition) and len(current_chunk_words) >= min_words_per_chunk ) or is_last_word:
- chunk_text = ' '.join(current_chunk_words)
- # Ensure end time doesn't exceed total audio duration
- chunk_end_time = min(current_word_end_time, audio_duration)
- # Prevent zero-duration chunks
- if chunk_end_time > current_chunk_start_time:
- chunks_data.append((chunk_text, current_chunk_start_time, chunk_end_time))
- # Prepare for next chunk
- current_chunk_start_time = chunk_end_time
- current_chunk_words = []
-
- # If loop finished but last chunk wasn't added (e.g., few words left)
- if current_chunk_words:
- chunk_text = ' '.join(current_chunk_words)
- chunk_end_time = audio_duration # Last chunk goes to the end
- if chunk_end_time > current_chunk_start_time:
- chunks_data.append((chunk_text, current_chunk_start_time, chunk_end_time))
-
-
- # Calculate subtitle position (e.g., 80% down the screen)
- subtitle_y_position = int(TARGET_RESOLUTION[1] * 0.80) # Lower position
-
- # Create TextClip for each chunk
- for chunk_text, start_time, end_time in chunks_data:
- # Ensure duration is positive
- chunk_duration = end_time - start_time
- if chunk_duration <= 0.05: # Skip tiny duration chunks
- continue
-
- try:
- # Use global font_size here
- txt_clip = TextClip(
- txt=chunk_text,
- fontsize=font_size, # Use global variable
- font='Arial-Bold', # Consider making font configurable?
- color=CAPTION_COLOR,
- bg_color='rgba(0, 0, 0, 0.4)', # Slightly darker background
- method='caption', # Handles word wrapping
- align='center',
- stroke_color='black', # Black stroke for better contrast
- stroke_width=1.5,
- # Adjust size: 85% of width, height automatic
- size=(TARGET_RESOLUTION[0] * 0.85, None)
- ).set_start(start_time).set_duration(chunk_duration) # Use duration
-
- # Position the text clip
- txt_clip = txt_clip.set_position(('center', subtitle_y_position))
- subtitle_clips.append(txt_clip)
- except Exception as e_textclip:
- # Handle potential errors from TextClip generation (e.g., font not found)
- print(f"Error creating TextClip for chunk '{chunk_text}': {e_textclip}")
- # Optionally add a fallback simple text clip here if needed
-
- # Overlay the list of subtitle clips onto the main video/image clip
- if subtitle_clips:
- clip = CompositeVideoClip([clip] + subtitle_clips)
+ if not words:
+ print("Warning: Narration text has no words, skipping captions.")
else:
- print("Warning: No subtitle clips were generated despite text being present.")
+ # Dynamic chunking: aim for ~3-7 words per chunk, max ~3 seconds per chunk
+ max_words_per_chunk = 6
+ min_words_per_chunk = 2 # Allow slightly shorter chunks
+ max_duration_per_chunk = 3.5 # Allow slightly longer chunks
+
+ chunks_data = [] # Stores (text, start_time, end_time)
+ current_chunk_words = []
+ current_chunk_start_time = 0.0
+ approx_time_per_word = audio_duration / len(words) if len(words) > 0 else 0
+
+ for i, word in enumerate(words):
+ current_chunk_words.append(word)
+ # Estimate end time based on word count and average time per word
+ # This is approximate; actual speech timing varies.
+ current_word_end_time = current_chunk_start_time + len(current_chunk_words) * approx_time_per_word
+
+ # Check conditions to end the current chunk
+ time_limit_reached = (current_word_end_time - current_chunk_start_time) >= max_duration_per_chunk
+ word_limit_reached = len(current_chunk_words) >= max_words_per_chunk
+ is_last_word = (i == len(words) - 1)
+ min_words_met = len(current_chunk_words) >= min_words_per_chunk
+
+ # End chunk if:
+ # - It's the last word OR
+ # - Word/time limit reached AND minimum words met
+ if is_last_word or ((time_limit_reached or word_limit_reached) and min_words_met):
+ chunk_text = ' '.join(current_chunk_words)
+ # Ensure end time doesn't exceed total audio duration
+ chunk_end_time = min(current_word_end_time, audio_duration)
+ # Prevent zero-duration or overlapping chunks
+ if chunk_end_time > current_chunk_start_time + 0.05: # Min duration 50ms
+ chunks_data.append((chunk_text, current_chunk_start_time, chunk_end_time))
+ # Prepare for next chunk
+ current_chunk_start_time = chunk_end_time
+ current_chunk_words = []
+ else:
+ # If chunk is too short, try adding the next word (unless it was the last)
+ if not is_last_word:
+ print(f"Skipping very short subtitle chunk: '{chunk_text}'")
+ # Reset start time for next chunk if we skipped
+ current_chunk_start_time = chunk_end_time
+ current_chunk_words = []
+
+
+ # Calculate subtitle position (e.g., 85% down the screen)
+ subtitle_y_position = int(TARGET_RESOLUTION[1] * 0.85) # Lower position
+
+ # Create TextClip for each valid chunk
+ for chunk_text, start_time, end_time in chunks_data:
+ chunk_duration = end_time - start_time
+ if chunk_duration <= 0.05: continue # Skip tiny duration chunks
+
+ try:
+ # Use global font_size here
+ # Ensure font is available or handle gracefully
+ font_name = 'Arial-Bold' # Check if this font exists on the system
+ # font_name = 'Liberation-Sans-Bold' # Common Linux alternative
+ txt_clip = TextClip(
+ txt=chunk_text,
+ fontsize=font_size, # Use global variable
+ font=font_name,
+ color=CAPTION_COLOR,
+ bg_color='rgba(0, 0, 0, 0.5)', # Slightly darker background
+ method='caption', # Handles word wrapping
+ align='center',
+ stroke_color='black', # Black stroke for better contrast
+ stroke_width=1.5,
+ # Adjust size: 85% of width, height automatic
+ size=(TARGET_RESOLUTION[0] * 0.85, None)
+ ).set_start(start_time).set_duration(chunk_duration) # Use duration
+
+ # Position the text clip
+ txt_clip = txt_clip.set_position(('center', subtitle_y_position))
+ subtitle_clips.append(txt_clip)
+ except Exception as e_textclip:
+ # Handle potential errors from TextClip generation (e.g., font not found)
+ print(f"ERROR creating TextClip for chunk '{chunk_text}': {e_textclip}")
+ print("Check if ImageMagick is installed and configured, and if the font is available.")
+ # Fallback to simple text? Or skip this chunk? Skipping for now.
+
+ # Overlay the list of subtitle clips onto the main video/image clip
+ if subtitle_clips:
+ clip = CompositeVideoClip([clip] + subtitle_clips)
+ print(f" Added {len(subtitle_clips)} subtitle chunks.")
+ else:
+ print("Warning: No subtitle clips were generated despite text being present.")
except Exception as sub_error:
# Fallback: If complex chunking/styling fails, display the whole text simply
- print(f"Subtitle generation error: {sub_error}. Using fallback simple text.")
+ print(f"ERROR during subtitle generation: {sub_error}. Using fallback simple text.")
+ traceback.print_exc()
try:
txt_clip = TextClip(
narration_text,
@@ -1357,38 +1581,55 @@ def create_clip(media_path, asset_type, tts_path, duration=None, effects=None, n
font='Arial', # Simpler font for fallback
align='center',
method='caption',
- bg_color='rgba(0, 0, 0, 0.4)',
+ bg_color='rgba(0, 0, 0, 0.5)',
size=(TARGET_RESOLUTION[0] * 0.8, None) # Max width
).set_position(('center', subtitle_y_position)).set_duration(clip.duration) # Show for full clip duration
# Overlay the single fallback text clip
clip = CompositeVideoClip([clip, txt_clip])
except Exception as e_fallback_text:
- print(f"Error creating fallback TextClip: {e_fallback_text}")
+ print(f"ERROR creating fallback TextClip: {e_fallback_text}")
# Proceed without captions if fallback also fails
# --- SUBTITLE GENERATION END ---
# Set the audio track to the final clip
- clip = clip.set_audio(audio_clip)
+ if audio_clip:
+ clip = clip.set_audio(audio_clip)
+ else:
+ print("Error: No valid audio_clip available to set.")
+ # Clip will be silent, which might be acceptable if TTS failed utterly.
+
- # Final duration check/adjustment (optional but good practice)
+ # Final duration check/adjustment (important after compositing)
+ # Set duration based on the *audio* clip's duration + buffer, as visuals might be longer
if abs(clip.duration - target_duration) > 0.1:
- print(f"Warning: Final clip duration ({clip.duration:.2f}s) differs from target ({target_duration:.2f}s). Adjusting.")
+ print(f"Warning: Final clip duration ({clip.duration:.2f}s) differs significantly from target ({target_duration:.2f}s). Forcing duration.")
clip = clip.set_duration(target_duration)
+ clip_creation_duration = time.time() - clip_start_time
+ print(f"--- Clip #{segment_index+1} created successfully (Duration: {clip.duration:.2f}s) [Took {clip_creation_duration:.1f}s] ---")
+
+ # Clean up temporary converted image file if created
+ if temp_img_path and os.path.exists(temp_img_path):
+ try: os.remove(temp_img_path)
+ except OSError: pass
- print(f"--- Clip #{segment_index+1} created successfully (Duration: {clip.duration:.2f}s) ---")
return clip
except Exception as e:
print(f"*************** FATAL ERROR in create_clip (Segment {segment_index+1}) ***************")
- import traceback
traceback.print_exc() # Print detailed traceback
print(f"Error details: {str(e)}")
print(f" Media Path: {media_path}")
print(f" TTS Path: {tts_path}")
print(f" Asset Type: {asset_type}")
print("**************************************************************************")
+ # Clean up resources if possible
+ if 'clip' in locals() and clip is not None and hasattr(clip, 'close'): clip.close()
+ if audio_clip is not None and hasattr(audio_clip, 'close'): audio_clip.close()
+ if temp_img_path and os.path.exists(temp_img_path):
+ try: os.remove(temp_img_path)
+ except OSError: pass
return None # Return None on failure
@@ -1397,6 +1638,10 @@ def fix_imagemagick_policy():
# This is often needed for TextClip with complex features (backgrounds, strokes) on Colab/Linux.
# It might require sudo privileges.
policy_fixed = False
+ if os.name != 'posix': # Only run on Linux/macOS etc.
+ print("Skipping ImageMagick policy fix (not on POSIX system).")
+ return False
+
try:
print("Attempting to fix ImageMagick security policies (may require sudo)...")
# Common paths for ImageMagick policy files
@@ -1405,7 +1650,7 @@ def fix_imagemagick_policy():
"/etc/ImageMagick-7/policy.xml",
"/etc/ImageMagick/policy.xml",
"/usr/local/etc/ImageMagick-7/policy.xml",
- # Add other potential paths if needed
+ "/opt/homebrew/etc/ImageMagick-7/policy.xml", # macOS Homebrew path
]
found_policy = None
for path in policy_paths:
@@ -1418,37 +1663,70 @@ def fix_imagemagick_policy():
print("TextClip features might be limited if default policies are restrictive.")
return False # Indicate policy wasn't found/fixed
+ # Check if modification is needed (simple check for common restriction)
+ needs_fix = False
+ try:
+ with open(found_policy, 'r') as f:
+ content = f.read()
+ # Look for common restrictive patterns that TextClip might hit
+ if 'rights="none" pattern="LABEL"' in content or \
+ 'rights="none" pattern="caption"' in content or \
+ 'rights="none" pattern="TEXT"' in content or \
+ '' in content:
+ needs_fix = True
+ except Exception as read_err:
+ print(f"Could not read policy file {found_policy} to check if fix is needed: {read_err}")
+ needs_fix = True # Assume fix is needed if we can't read it
+
+ if not needs_fix:
+ print(f"Policy file {found_policy} seems okay or already modified. Skipping modification.")
+ return True # Assume it's okay
+
print(f"Found policy file: {found_policy}. Attempting to modify...")
# Commands to relax restrictions (use with caution)
# Backup the original file first
- backup_cmd = f"sudo cp {found_policy} {found_policy}.bak"
- # Allow read/write for common formats (including text/caption)
- sed_cmd_rights = f"sudo sed -i 's/rights=\"none\" pattern=\"PS\"/rights=\"read|write\" pattern=\"PS\"/' {found_policy}; " \
- f"sudo sed -i 's/rights=\"none\" pattern=\"EPS\"/rights=\"read|write\" pattern=\"EPS\"/' {found_policy}; " \
- f"sudo sed -i 's/rights=\"none\" pattern=\"PDF\"/rights=\"read|write\" pattern=\"PDF\"/' {found_policy}; " \
- f"sudo sed -i 's/rights=\"none\" pattern=\"XPS\"/rights=\"read|write\" pattern=\"XPS\"/' {found_policy}; " \
- f"sudo sed -i 's/rights=\"none\" pattern=\"LABEL\"/rights=\"read|write\" pattern=\"LABEL\"/' {found_policy}; " \
- f"sudo sed -i 's/rights=\"none\" pattern=\"caption\"/rights=\"read|write\" pattern=\"caption\"/' {found_policy}; " \
- f"sudo sed -i 's/rights=\"none\" pattern=\"TEXT\"/rights=\"read|write\" pattern=\"TEXT\"/' {found_policy}"
- # Allow read/write for path operations (needed for fonts, temp files)
- sed_cmd_path = f"sudo sed -i 's///' {found_policy}"
-
+ backup_cmd = f"sudo cp '{found_policy}' '{found_policy}.bak'"
+ # Allow read/write for formats used by TextClip and path operations
+ # Use simpler sed commands that are less likely to fail on different sed versions
+ sed_commands = [
+ f"sudo sed -i.bak 's/rights=\"none\" pattern=\"PS\"/rights=\"read|write\" pattern=\"PS\"/g' '{found_policy}'",
+ f"sudo sed -i.bak 's/rights=\"none\" pattern=\"EPS\"/rights=\"read|write\" pattern=\"EPS\"/g' '{found_policy}'",
+ f"sudo sed -i.bak 's/rights=\"none\" pattern=\"PDF\"/rights=\"read|write\" pattern=\"PDF\"/g' '{found_policy}'",
+ f"sudo sed -i.bak 's/rights=\"none\" pattern=\"XPS\"/rights=\"read|write\" pattern=\"XPS\"/g' '{found_policy}'",
+ f"sudo sed -i.bak 's/rights=\"none\" pattern=\"LABEL\"/rights=\"read|write\" pattern=\"LABEL\"/g' '{found_policy}'",
+ f"sudo sed -i.bak 's/rights=\"none\" pattern=\"caption\"/rights=\"read|write\" pattern=\"caption\"/g' '{found_policy}'",
+ f"sudo sed -i.bak 's/rights=\"none\" pattern=\"TEXT\"/rights=\"read|write\" pattern=\"TEXT\"/g' '{found_policy}'",
+ f"sudo sed -i.bak 's///g' '{found_policy}'" # Handle path policy
+ ]
print("Executing policy modification commands (requires sudo)...")
+ # Try backup first
+ print(f"Executing: {backup_cmd}")
backup_status = os.system(backup_cmd)
- if backup_status == 0:
- print("Policy file backed up.")
- rights_status = os.system(sed_cmd_rights)
- path_status = os.system(sed_cmd_path)
-
- if rights_status == 0 and path_status == 0:
- print("ImageMagick policies potentially updated successfully.")
- policy_fixed = True
- else:
- print("Error executing policy modification commands. Check sudo permissions and sed syntax.")
- else:
- print("Error backing up policy file. Aborting modifications.")
+
+ if backup_status != 0:
+ print(f"Warning: Failed to backup policy file (Exit code: {backup_status}). Proceeding without backup...")
+ # Modify sed commands to not create individual backups if main backup failed
+ sed_commands = [cmd.replace("-i.bak", "-i") for cmd in sed_commands]
+
+
+ # Execute sed commands one by one
+ all_sed_ok = True
+ for cmd in sed_commands:
+ print(f"Executing: {cmd}")
+ status = os.system(cmd)
+ if status != 0:
+ print(f"Warning: Sed command failed (Exit code: {status}). Policy might not be fully fixed.")
+ # Don't necessarily stop, maybe other commands worked
+ # all_sed_ok = False # Uncomment if any failure should mark the fix as failed
+
+ # Check the outcome loosely
+ # We can't be certain without parsing, but if commands ran without error codes, assume it worked.
+ # A more robust check would re-read the file.
+ print("ImageMagick policy modification commands executed.")
+ policy_fixed = True # Assume success if commands ran
+
# Optional: Restart services if needed (usually not required just for policy changes)
# os.system("sudo systemctl restart imagemagick") # Example
@@ -1457,6 +1735,7 @@ def fix_imagemagick_policy():
except Exception as e:
print(f"Error occurred during ImageMagick policy fix: {e}")
+ traceback.print_exc()
return False
@@ -1468,22 +1747,29 @@ def generate_video(user_input, resolution, caption_option):
start_time = time.time()
print("\n=============================================")
print("======= STARTING VIDEO GENERATION =======")
+ print(f" Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f" Concept: '{user_input[:100]}...'")
print(f" Resolution: {resolution}")
print(f" Captions: {caption_option}")
print(f" Voice: {selected_voice} (Speed: {voice_speed})")
print(f" BG Music Vol: {bg_music_volume}, FPS: {fps}, Preset: {preset}")
- print(f" Video Clip Prob: {video_clip_probability*100}%, Caption Size: {font_size}")
+ print(f" Video Clip Prob: {video_clip_probability*100:.0f}%, Caption Size: {font_size}")
print("=============================================\n")
- # --- Setup ---
- if not OPENROUTER_API_KEY or not PEXELS_API_KEY:
- print("ERROR: API keys (OpenRouter or Pexels) are missing!")
- # Gradio doesn't handle exceptions well, return None or error message?
- # For now, print and return None. Consider adding gr.Error later.
- return None
+ # --- Pre-checks ---
+ if not OPENROUTER_API_KEY or OPENROUTER_API_KEY == "YOUR_OPENROUTER_API_KEY_HERE":
+ print("FATAL ERROR: OpenRouter API Key is missing or still a placeholder!")
+ raise gr.Error("OpenRouter API Key is not configured. Please set it in the script or environment.")
+ if not PEXELS_API_KEY or PEXELS_API_KEY == "YOUR_PEXELS_API_KEY_HERE":
+ print("FATAL ERROR: Pexels API Key is missing or still a placeholder!")
+ raise gr.Error("Pexels API Key is not configured. Please set it in the script or environment.")
+ if pipeline is None:
+ print("FATAL ERROR: Kokoro TTS pipeline failed to initialize.")
+ raise gr.Error("TTS engine (Kokoro) failed to initialize. Cannot proceed.")
+
+ # --- Setup ---
# Set resolution
if resolution == "Full": # 16:9 Landscape
TARGET_RESOLUTION = (1920, 1080)
@@ -1498,11 +1784,17 @@ def generate_video(user_input, resolution, caption_option):
# Create a unique temporary folder for this run
try:
- TEMP_FOLDER = tempfile.mkdtemp()
+ # Use a more descriptive temp dir name if possible
+ base_temp_dir = os.path.join(os.getcwd(), "temp_video_gen")
+ os.makedirs(base_temp_dir, exist_ok=True)
+ TEMP_FOLDER = tempfile.mkdtemp(prefix=f"{time.strftime('%Y%m%d_%H%M%S')}_", dir=base_temp_dir)
print(f"Temporary folder created: {TEMP_FOLDER}")
except Exception as e:
print(f"FATAL ERROR: Could not create temporary folder: {e}")
- return None # Cannot proceed without temp folder
+ traceback.print_exc()
+ # Cannot proceed without temp folder
+ raise gr.Error(f"Failed to create temporary directory: {e}")
+
# Fix ImageMagick policy (important for captions)
fix_success = fix_imagemagick_policy()
@@ -1516,9 +1808,9 @@ def generate_video(user_input, resolution, caption_option):
script = generate_script(user_input)
if not script:
print("FATAL ERROR: Failed to generate script from API.")
- shutil.rmtree(TEMP_FOLDER) # Clean up temp folder on failure
- return None
- print("Generated Script:\n", script) # Print the full script for debugging
+ if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER) # Clean up
+ raise gr.Error("Failed to generate script from API. Check logs and API key.")
+ print(f"Generated Script:\n{'-'*20}\n{script}\n{'-'*20}") # Print the full script for debugging
# --- End Script Generation ---
@@ -1527,37 +1819,44 @@ def generate_video(user_input, resolution, caption_option):
elements = parse_script(script)
if not elements:
print("FATAL ERROR: Failed to parse script into elements. Check script format and parsing logic.")
- shutil.rmtree(TEMP_FOLDER)
- return None
+ if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER)
+ raise gr.Error("Failed to parse the generated script. Check script format and logs.")
num_segments = len(elements) // 2
print(f"Parsed {num_segments} script segments.")
if num_segments == 0:
print("Warning: Script parsed into 0 segments. No video will be generated.")
- shutil.rmtree(TEMP_FOLDER)
- return None
+ if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER)
+ # Return None instead of raising error? Or show message?
+ return None # Indicate no video was created
# --- End Script Parsing ---
# --- Pair Elements (Media + TTS) ---
paired_elements = []
- for i in range(0, len(elements), 2):
- if i + 1 < len(elements) and elements[i]['type'] == 'media' and elements[i+1]['type'] == 'tts':
+ if len(elements) % 2 != 0:
+ print(f"Warning: Odd number of elements ({len(elements)}) after parsing. Last element might be ignored.")
+
+ for i in range(0, len(elements) - 1, 2): # Iterate up to second-to-last element
+ if elements[i]['type'] == 'media' and elements[i+1]['type'] == 'tts':
paired_elements.append((elements[i], elements[i + 1]))
else:
- print(f"Warning: Skipping invalid element pair at index {i}. Expected media then tts.")
+ print(f"Warning: Skipping invalid element pair at index {i}. Expected media then tts, got {elements[i]['type']} then {elements[i+1]['type']}.")
if not paired_elements:
print("FATAL ERROR: No valid media-tts pairs found after parsing.")
- shutil.rmtree(TEMP_FOLDER)
- return None
+ if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER)
+ raise gr.Error("Script parsed, but no valid [Scene]-Narration pairs found.")
# --- End Pairing ---
# --- Clip Creation Loop ---
- print("\n--- Creating Individual Clips ---")
+ print(f"\n--- Creating {len(paired_elements)} Individual Clips ---")
clips = []
successful_clips = 0
+ clip_paths_to_clean = [] # Keep track of intermediate files for cleanup if needed
+
for idx, (media_elem, tts_elem) in enumerate(paired_elements):
+ segment_start_time = time.time()
print(f"\n>>> Processing Segment {idx+1}/{len(paired_elements)}: Prompt '{media_elem.get('prompt', 'N/A')}'")
# 1. Generate Media Asset
@@ -1569,16 +1868,14 @@ def generate_video(user_input, resolution, caption_option):
if not media_asset or not media_asset.get('path'):
print(f"ERROR: Failed to generate media for segment {idx+1}. Skipping segment.")
continue # Skip this segment
+ clip_paths_to_clean.append(media_asset['path']) # Add for potential cleanup
# 2. Generate TTS
tts_path = generate_tts(tts_elem['text'], tts_elem['voice'])
if not tts_path:
print(f"ERROR: Failed to generate TTS for segment {idx+1}. Skipping segment.")
- # Clean up the potentially downloaded media asset if TTS failed
- if media_asset and os.path.exists(media_asset['path']):
- try: os.remove(media_asset['path'])
- except OSError: pass
continue # Skip this segment
+ clip_paths_to_clean.append(tts_path) # Add for potential cleanup
# 3. Create the Clip (Visual + Audio + Subtitles)
clip = create_clip(
@@ -1593,18 +1890,17 @@ def generate_video(user_input, resolution, caption_option):
if clip:
# Validate clip duration and dimensions before adding
- if clip.duration > 0 and clip.w == TARGET_RESOLUTION[0] and clip.h == TARGET_RESOLUTION[1]:
+ if clip.duration > 0.1 and clip.w == TARGET_RESOLUTION[0] and clip.h == TARGET_RESOLUTION[1]:
clips.append(clip)
successful_clips += 1
- print(f">>> Segment {idx+1} processed successfully.")
+ segment_duration = time.time() - segment_start_time
+ print(f">>> Segment {idx+1} processed successfully. [Took {segment_duration:.1f}s]")
else:
- print(f"ERROR: Clip for segment {idx+1} has invalid duration ({clip.duration}) or dimensions ({clip.w}x{clip.h}). Skipping.")
+ print(f"ERROR: Clip for segment {idx+1} has invalid duration ({clip.duration:.2f}s) or dimensions ({clip.w}x{clip.h}). Expected {TARGET_RESOLUTION[0]}x{TARGET_RESOLUTION[1]}. Skipping.")
# Clean up resources associated with the failed clip
- clip.close() # Close moviepy resources if possible
- # Files in TEMP_FOLDER will be cleaned later, no need to delete individually here unless necessary
+ if hasattr(clip, 'close'): clip.close()
else:
print(f"ERROR: Clip creation failed for segment {idx+1}. See errors above.")
- # Files in TEMP_FOLDER will be cleaned later
# --- End Clip Creation Loop ---
@@ -1612,53 +1908,80 @@ def generate_video(user_input, resolution, caption_option):
# --- Final Video Assembly ---
if not clips:
print("\nFATAL ERROR: No clips were successfully created. Cannot generate video.")
- shutil.rmtree(TEMP_FOLDER)
- return None
+ if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER)
+ raise gr.Error("Failed to create any video clips from the script segments.")
print(f"\n--- Assembling Final Video ({len(clips)} clips) ---")
+ final_video = None # Initialize to ensure cleanup happens
try:
# Concatenate clips
+ print("Concatenating clips...")
final_video = concatenate_videoclips(clips, method="compose") # 'compose' is generally safer
print(f"Clips concatenated. Total duration before music: {final_video.duration:.2f}s")
# Add background music
+ print("Adding background music (if provided)...")
final_video = add_background_music(final_video, bg_music_volume=bg_music_volume)
# Write the final video file
print(f"Exporting final video to '{OUTPUT_VIDEO_FILENAME}' (FPS: {fps}, Preset: {preset})...")
# Use threads based on CPU count? Maybe default is fine. logger='bar' for progress bar
+ # Ensure output directory exists if OUTPUT_VIDEO_FILENAME includes a path
+ output_dir = os.path.dirname(OUTPUT_VIDEO_FILENAME)
+ if output_dir and not os.path.exists(output_dir):
+ os.makedirs(output_dir)
+
final_video.write_videofile(
OUTPUT_VIDEO_FILENAME,
codec='libx264', # Common, good quality codec
audio_codec='aac', # Common audio codec
fps=fps,
preset=preset, # Controls encoding speed vs compression
- threads=4, # Use multiple threads if available
- logger='bar' # Show progress bar
+ threads=os.cpu_count() or 4, # Use available cores or default to 4
+ logger='bar', # Show progress bar in console
+ ffmpeg_params=["-movflags", "+faststart"] # Optimize for web streaming
)
print(f"Final video saved successfully as {OUTPUT_VIDEO_FILENAME}")
- # Close clips to release resources
- for clip in clips:
- clip.close()
- final_video.close()
except Exception as e:
print(f"FATAL ERROR during final video assembly or writing: {e}")
- import traceback
traceback.print_exc()
- shutil.rmtree(TEMP_FOLDER)
- return None
- # --- End Final Video Assembly ---
+ if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER): shutil.rmtree(TEMP_FOLDER) # Clean up on error
+ raise gr.Error(f"Error during final video creation: {e}")
+ finally:
+ # --- Resource Cleanup (Clips) ---
+ # Crucial to close clips to release file handles, especially on Windows
+ print("Closing individual clip resources...")
+ for i, clip in enumerate(clips):
+ try:
+ if hasattr(clip, 'close'):
+ clip.close()
+ # Also close audio if it's separate and hasn't been closed yet
+ if hasattr(clip, 'audio') and clip.audio is not None and hasattr(clip.audio, 'close'):
+ clip.audio.close()
+ except Exception as e_close:
+ print(f"Warning: Error closing clip {i}: {e_close}")
+ if final_video is not None and hasattr(final_video, 'close'):
+ try:
+ final_video.close()
+ print("Closed final video resource.")
+ except Exception as e_final_close:
+ print(f"Warning: Error closing final video resource: {e_final_close}")
+ # --- End Resource Cleanup ---
- # --- Cleanup ---
+ # --- Temp Folder Cleanup ---
print("\n--- Cleaning Up Temporary Files ---")
try:
- shutil.rmtree(TEMP_FOLDER)
- print(f"Temporary folder removed: {TEMP_FOLDER}")
+ if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER):
+ shutil.rmtree(TEMP_FOLDER)
+ print(f"Temporary folder removed: {TEMP_FOLDER}")
+ else:
+ print("Temporary folder not found or already removed.")
except Exception as e:
print(f"Warning: Could not remove temporary folder {TEMP_FOLDER}: {e}")
+ print("Manual cleanup might be required.")
# --- End Cleanup ---
end_time = time.time()
@@ -1666,6 +1989,7 @@ def generate_video(user_input, resolution, caption_option):
print("\n=============================================")
print("======= VIDEO GENERATION COMPLETE =======")
print(f" Total time: {total_time:.2f} seconds")
+ print(f" Output file: {OUTPUT_VIDEO_FILENAME}")
print("=============================================\n")
# Return the path to the generated video file
@@ -1688,8 +2012,11 @@ VOICE_CHOICES = {
'Lewis đŦđ§ (Male)': 'bm_lewis', 'Daniel đŦđ§ (Male)': 'bm_daniel'
}
-def generate_video_with_options(user_input, resolution, caption_option, music_file, voice, vclip_prob, bg_vol, video_fps, video_preset, v_speed, caption_size):
+def generate_video_with_options(user_input, resolution, caption_option, music_file, voice, vclip_prob, bg_vol, video_fps, video_preset, v_speed, caption_size, progress=gr.Progress(track_tqdm=True)):
"""Wrapper function for Gradio to set global options before calling generate_video."""
+ # Use Gradio progress tracker if needed (though detailed logs are in console)
+ progress(0, desc="Initializing...")
+
global selected_voice, voice_speed, font_size, video_clip_probability, bg_music_volume, fps, preset
print("--- Updating Settings from Gradio ---")
@@ -1713,112 +2040,175 @@ def generate_video_with_options(user_input, resolution, caption_option, music_fi
print(f"Warning: Could not remove previous music file: {e}")
if music_file is not None:
- try:
- # music_file is a TemporaryFileWrapper object in Gradio >= 3.0
- shutil.copy(music_file.name, target_music_path)
- print(f"Uploaded music '{os.path.basename(music_file.name)}' copied to '{target_music_path}'")
- except Exception as e:
- print(f"Error copying uploaded music file: {e}")
- # Continue without background music if copy fails
+ # music_file is the path to the temporary uploaded file when type='filepath'
+ if isinstance(music_file, str) and os.path.exists(music_file):
+ try:
+ shutil.copy(music_file, target_music_path)
+ print(f"Uploaded music '{os.path.basename(music_file)}' copied to '{target_music_path}'")
+ except Exception as e:
+ print(f"Error copying uploaded music file: {e}")
+ # Continue without background music if copy fails
+ gr.Warning("Failed to copy background music file.") # Show warning in UI
+ else:
+ print(f"Invalid music file object received: {music_file}")
+ gr.Warning("Received invalid background music file.")
else:
print("No background music file uploaded.")
- # Call the main video generation function with the core inputs
- # The function will use the global variables updated above
+ # --- Call the main video generation function ---
+ # Wrap in try...except to catch errors and report them via Gradio
try:
+ # Update progress description
+ progress(0.1, desc="Generating script...") # Example progress update
+
+ # Note: generate_video itself prints detailed logs to console.
+ # Adding more progress steps here requires modifying generate_video
+ # to accept the progress object and call progress.update() internally.
+ # For simplicity, we rely on console logs for detailed progress.
+
video_path = generate_video(user_input, resolution, caption_option)
+
# Check if video generation failed (returned None)
- if video_path is None:
- # Raise a Gradio error to display it in the interface
- raise gr.Error("Video generation failed. Please check the console logs for details.")
+ if video_path is None or not os.path.exists(video_path):
+ print("Video generation function returned None or file does not exist.")
+ raise gr.Error("Video generation process completed, but the final video file was not created successfully. Please check the console logs for errors.")
+
+ progress(1.0, desc="Video Ready!")
+ gr.Info("Video generation complete!")
return video_path
+
except gr.Error as e:
- # Re-raise Gradio errors to show them in the UI
+ # Re-raise Gradio errors directly to show them in the UI
+ print(f"Gradio Error: {e}")
raise e
except Exception as e:
- # Catch unexpected errors during generation
+ # Catch any other unexpected errors during generation
print(f"An unexpected error occurred in generate_video_with_options: {e}")
- import traceback
traceback.print_exc()
# Raise a Gradio error for unexpected issues
- raise gr.Error(f"An unexpected error occurred: {e}. Check logs.")
-
-
-# Create the Gradio interface definition
-with gr.Blocks(theme=gr.themes.Soft()) as iface: # Using Blocks for better layout control
- gr.Markdown("# đ¤ AI Documentary Video Generator")
- gr.Markdown("Create short, funny documentary-style videos with AI narration and stock footage. Customize voice, music, captions, and more.")
-
- with gr.Row():
- with gr.Column(scale=2):
- user_input = gr.Textbox(label="đŦ Video Concept / Script", placeholder="Enter your video topic (e.g., 'Top 5 facts about cats') or paste a full script formatted like the example...", lines=4)
- with gr.Accordion("Example Script Format", open=False):
- gr.Markdown("""
- ```
- [Cats]
- Cats: tiny ninjas plotting world domination.
- [Sleeping]
- They sleep 23 hours a day, planning.
- [Boxes]
- Their mortal enemy? The empty box. It must be contained.
- [Zoomies]
- Suddenly, zoomies! Because reasons.
- [Subscribe]
- Subscribe now, or a cat will judge you silently. Forever.
- ```
- **Rules:**
- - Start each scene with `[Search Term]` (1-2 words for Pexels).
- - Follow with 5-15 words of narration.
- - Keep it funny and conversational.
- - End with a subscribe line related to the topic.
- """)
- with gr.Column(scale=1):
- resolution = gr.Radio(["Full", "Short"], label="đ Resolution", value="Full", info="Full=16:9 (YouTube), Short=9:16 (TikTok/Shorts)")
- caption_option = gr.Radio(["Yes", "No"], label="âī¸ Add Captions?", value="Yes")
- music_file = gr.File(label="đĩ Upload Background Music (Optional MP3)", file_types=[".mp3"], type="file") # Use type="file"
-
- with gr.Accordion("âī¸ Advanced Settings", open=False):
- with gr.Row():
- voice = gr.Dropdown(choices=list(VOICE_CHOICES.keys()), label="đŖī¸ Choose Voice", value="Emma (Female)")
- v_speed = gr.Slider(minimum=0.5, maximum=1.5, value=0.9, step=0.05, label="đ¨ Voice Speed", info="0.5=Slow, 1.0=Normal, 1.5=Fast")
- with gr.Row():
- caption_size = gr.Slider(minimum=20, maximum=100, value=45, step=1, label="đĄ Caption Font Size")
- vclip_prob = gr.Slider(minimum=0, maximum=100, value=25, step=5, label="đī¸ Video Clip %", info="Chance of using a video clip instead of an image for a scene.")
- with gr.Row():
- bg_vol = gr.Slider(minimum=0.0, maximum=1.0, value=0.08, step=0.01, label="đ BG Music Volume", info="0.0=Silent, 1.0=Full Volume")
- video_fps = gr.Slider(minimum=15, maximum=60, value=30, step=1, label="đŦ Video FPS")
- video_preset = gr.Dropdown(
- choices=["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow"],
- value="veryfast", label="âī¸ Export Quality/Speed", info="Faster presets = lower quality/size, Slower presets = higher quality/size."
- )
-
- submit_button = gr.Button("⨠Generate Video â¨", variant="primary")
- output_video = gr.Video(label="Generated Video")
-
- # Define the action when the button is clicked
- submit_button.click(
- fn=generate_video_with_options,
- inputs=[
- user_input, resolution, caption_option, music_file, voice,
- vclip_prob, bg_vol, video_fps, video_preset, v_speed, caption_size
- ],
- outputs=output_video
+ raise gr.Error(f"An unexpected error occurred during video generation: {str(e)}. Check console logs for details.")
+
+
+# Create the Gradio interface definition using Blocks API
+with gr.Blocks(theme=gr.themes.Soft(primary_hue=gr.themes.colors.blue)) as iface:
+ gr.Markdown(
+ """
+ # đ¤ AI Documentary Video Generator đŦ
+ Create short, funny documentary-style videos with AI narration and stock footage.
+ Customize voice, music, captions, and more!
+ """
)
- gr.Markdown("---")
- gr.Markdown("â ī¸ **Note:** Video generation can take several minutes, especially on CPU. Check console logs for progress.")
+ with gr.Tab("đŦ Create Video"):
+ with gr.Row():
+ with gr.Column(scale=2):
+ user_input = gr.Textbox(
+ label="đ Video Concept / Script",
+ placeholder="Enter your video topic (e.g., 'Top 5 facts about cats') or paste a full script formatted like the example...",
+ lines=5,
+ info="Provide a topic for AI script generation or paste your own formatted script."
+ )
+ with gr.Accordion("Example Script Format", open=False):
+ gr.Markdown(
+ """
+ ```
+ [Cats]
+ Cats: tiny ninjas plotting world domination from fluffy pillows.
+ [Sleeping]
+ They sleep 23 hours a day, conserving energy for midnight zoomies.
+ [Boxes]
+ Their mortal enemy? The empty box. It must be investigated and sat in.
+ [Judgement]
+ Silently judging your life choices from atop the bookshelf.
+ [Subscribe]
+ Subscribe now, or a cat will knock something off your table. Purrhaps.
+ ```
+ **Rules:**
+ - Start each scene with `[Search Term]` (1-2 words for Pexels).
+ - Follow with 5-15 words of narration.
+ - Keep it funny and conversational.
+ - End with a subscribe line related to the topic.
+ """
+ )
+ with gr.Column(scale=1):
+ resolution = gr.Radio(["Full", "Short"], label="đ Resolution", value="Full", info="Full=16:9 (YouTube), Short=9:16 (TikTok/Reels)")
+ caption_option = gr.Radio(["Yes", "No"], label="âī¸ Add Captions?", value="Yes")
+ music_file = gr.File(
+ label="đĩ Upload Background Music (Optional MP3)",
+ file_types=[".mp3"],
+ type="filepath" # Corrected type
+ )
+
+ with gr.Accordion("âī¸ Advanced Settings", open=False):
+ with gr.Row():
+ voice = gr.Dropdown(choices=list(VOICE_CHOICES.keys()), label="đŖī¸ Choose Voice", value="Emma (Female)")
+ v_speed = gr.Slider(minimum=0.5, maximum=1.5, value=0.9, step=0.05, label="đ¨ Voice Speed", info="0.5=Slow, 1.0=Normal, 1.5=Fast")
+ with gr.Row():
+ caption_size = gr.Slider(minimum=20, maximum=100, value=45, step=1, label="đĄ Caption Font Size")
+ vclip_prob = gr.Slider(minimum=0, maximum=100, value=25, step=5, label="đī¸ Video Clip %", info="Chance of using a video clip instead of an image for a scene.")
+ with gr.Row():
+ bg_vol = gr.Slider(minimum=0.0, maximum=1.0, value=0.08, step=0.01, label="đ BG Music Volume", info="0.0=Silent, 1.0=Full Volume")
+ video_fps = gr.Slider(minimum=15, maximum=60, value=30, step=1, label="đŦ Video FPS")
+ video_preset = gr.Dropdown(
+ choices=["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow"],
+ value="veryfast", label="âī¸ Export Quality/Speed", info="Faster presets = lower quality/size, Slower presets = higher quality/size."
+ )
+
+ submit_button = gr.Button("⨠Generate Video â¨", variant="primary", scale=1)
+ output_video = gr.Video(label="Generated Video", scale=3) # Make video output larger
+
+ # Define the action when the button is clicked
+ submit_button.click(
+ fn=generate_video_with_options,
+ inputs=[
+ user_input, resolution, caption_option, music_file, voice,
+ vclip_prob, bg_vol, video_fps, video_preset, v_speed, caption_size
+ ],
+ outputs=output_video
+ )
+
+ with gr.Tab("âšī¸ Notes & Tips"):
+ gr.Markdown(
+ """
+ ### Important Notes:
+ * **API Keys:** Ensure your Pexels and OpenRouter API keys are correctly set at the top of the script or as environment variables.
+ * **Dependencies:** Make sure all required libraries (`moviepy`, `kokoro`, `gTTS`, `requests`, `pydub`, `pillow`, `gradio`, `numpy`, `soundfile`, `bs4`, `pysrt`) are installed. You might also need `ffmpeg` and `imagemagick` installed on your system.
+ * **ImageMagick:** For captions with backgrounds/strokes to work reliably (especially on Linux/Colab), ImageMagick needs to be installed and its policy file might need adjustment (the script attempts this with `sudo`).
+ * **Performance:** Video generation can be CPU and time-intensive. Generation times of several minutes are normal. Check the console/terminal output for detailed progress and potential errors.
+ * **Stock Footage:** The quality and relevance of stock footage depend on the search terms in your script (`[Search Term]`) and Pexels/Google search results. Keep search terms general but descriptive.
+ * **Error Handling:** If generation fails, check the console output for specific error messages from API calls, file downloads, or video processing steps.
+
+ ### Tips:
+ * Start with simple topics to test the workflow.
+ * Use the "Example Script Format" as a guide for your own scripts.
+ * Experiment with different voices and speeds.
+ * Adjust the "Video Clip %" slider to control the mix of video vs. images.
+ * If captions look wrong, ensure ImageMagick is working and try a standard font like 'Arial'.
+ """
+ )
# Launch the interface
if __name__ == "__main__":
- # Ensure API keys are set before launching
- if not PEXELS_API_KEY or not OPENROUTER_API_KEY:
+ # Final check for API keys before launching
+ keys_ok = True
+ if PEXELS_API_KEY == "YOUR_PEXELS_API_KEY_HERE":
print("####################################################################")
- print("ERROR: PEXELS_API_KEY or OPENROUTER_API_KEY is not set!")
- print("Please set these variables at the top of the script before running.")
+ print("ERROR: PEXELS_API_KEY is not set!")
+ print("Please set it at the top of the script or as an environment variable.")
print("####################################################################")
- # Optionally exit if keys are missing
- # exit(1)
+ keys_ok = False
+ if OPENROUTER_API_KEY == "YOUR_OPENROUTER_API_KEY_HERE":
+ print("####################################################################")
+ print("ERROR: OPENROUTER_API_KEY is not set!")
+ print("Please set it at the top of the script or as an environment variable.")
+ print("####################################################################")
+ keys_ok = False
+
+ if keys_ok:
+ print("\nAPI Keys seem to be set. Launching Gradio interface...")
+ print("Access the interface at the URL provided below (usually http://127.0.0.1:7860 or a public URL if share=True).")
+ iface.launch(share=True, debug=True) # Enable share=True for public link, debug=True for more logs
else:
- print("API Keys seem to be set. Launching Gradio interface...")
- iface.launch(share=True, debug=True) # Enable share=True for public link, debug=True for more logs
\ No newline at end of file
+ print("\nCannot launch Gradio interface due to missing API keys.")
+
\ No newline at end of file