# Source: Hugging Face Space (status at capture time: Running)
# Install necessary packages (assuming these are already run in your environment)
# !pip install transformers==4.49.0
# !pip install moviepy gTTS requests pydub pillow
# !pip cache purge
# !apt-get install imagemagick -y
# !pip install "kokoro>=0.3.4" soundfile
# !apt-get -qq -y install espeak-ng > /dev/null 2>&1
# !pip install pysrt
# !pip install gradio
# Import necessary libraries
# --- Standard library ---
import base64
import hashlib
import io
import math
import os
import random
import re
import shutil
import tempfile
import time
from urllib.parse import quote

# --- Third-party ---
import cv2
import gradio as gr
import moviepy.config as mpy_config
import moviepy.video.fx.all as vfx
import numpy as np
import pysrt
import requests
import soundfile as sf
import torch
from bs4 import BeautifulSoup
from gtts import gTTS
from kokoro import KPipeline
from moviepy.editor import (
    AudioFileClip,
    CompositeAudioClip,
    CompositeVideoClip,
    ImageClip,
    TextClip,
    VideoFileClip,
    concatenate_videoclips,
)
from PIL import Image
from pydub import AudioSegment
from pydub.generators import Sine
# Create the shared Kokoro TTS pipeline once at import time.
# lang_code 'a' is the American English setting (used with voice 'af_heart').
pipeline = KPipeline(lang_code='a')

# Point MoviePy at the ImageMagick `convert` binary. This is best-effort:
# a failure is only reported, because the rest of the module can still be
# imported (TextClip rendering may not work without ImageMagick).
_imagemagick_settings = {"IMAGEMAGICK_BINARY": "/usr/bin/convert"}
try:
    mpy_config.change_settings(_imagemagick_settings)
    print("ImageMagick binary set successfully.")
except Exception as config_error:
    print(f"Warning: Could not set ImageMagick binary automatically: {config_error}")
    print("TextClip functionality might be limited if ImageMagick is not found.")
# ---------------- Global Configuration ---------------- #
# SECURITY NOTE(review): credentials should not live in source control.
# Each key is now taken from the environment variable of the same name when
# it is set to a non-empty value; the literals below are kept only as a
# backward-compatible fallback and should be rotated and removed.
PEXELS_API_KEY = (
    os.environ.get('PEXELS_API_KEY')
    or 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna'  # Replace with your key
)
OPENROUTER_API_KEY = (
    os.environ.get('OPENROUTER_API_KEY')
    or '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'  # Replace with your key
)
OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free"
OUTPUT_VIDEO_FILENAME = "final_video.mp4"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"

# Additional global variables needed for the Gradio interface
selected_voice = 'af_heart'     # Default voice
voice_speed = 0.9               # Default voice speed
font_size = 45                  # Default font size
video_clip_probability = 0.25   # Default probability for video clips
bg_music_volume = 0.08          # Default background music volume
fps = 30                        # Default FPS
preset = "veryfast"             # Default preset

# Placeholders that start as None; functions in this file read them as
# module globals (e.g. TEMP_FOLDER is joined into every temp-file path).
TARGET_RESOLUTION = None
CAPTION_COLOR = None
TEMP_FOLDER = None
# ---------------- Helper Functions ---------------- # | |
def generate_script(user_input):
    """Ask the OpenRouter chat-completions API for a short documentary script.

    The user's topic (or ready-made script) is embedded at the end of a fixed
    instruction prompt and sent as a single user message to OPENROUTER_MODEL.

    Args:
        user_input: Topic or script text supplied by the user.

    Returns:
        The stripped text of the first completion choice, or None when the
        request fails, the HTTP status is an error, or the response does not
        have the expected ``choices[0].message.content`` shape.
    """
    prompt = f"""Short Documentary Script GeneratorInstructions:
If I say "use this," just output the script exactly as I gave it.
If I only give topics, generate a script based on them.
If I provide a full script, rewrite it without any changes. Make everything short simple and humarous funny and act as serious but humarous. And don't say anything off topic. Also alway say a funny statement to subscribe based on the video topic at the end. Use normal conversational text like a normal person talking and avoid AI phase make the statements humanize and normal conversational
And always talk negative views don't really wants any positive statement and the full script will be one video about one topic so keep in mind
Formatting Rules:
Title in Square Brackets:
Each section starts with a one-word title inside [ ] (max two words if necessary).
This title will be used as a search term for Pexels footage.
Casual & Funny Narration:
Each section has 5-10 words of narration.
Keep it natural, funny, and unpredictable (not robotic, poetic, or rhythmic).
No Special Formatting:
No bold, italics, or special characters. You are a assistant AI your task is to create script. You aren't a chatbot. So, don't write extra text
Generalized Search Terms:
If a term is too specific, make it more general for Pexels search.
Scene-Specific Writing:
Each section describes only what should be shown in the video.
Output Only the Script, and also make it funny and humarous and helirous and also add to subscribe with a funny statement like subscribe now or .....
No extra text, just the script.
Example Output:
[North Korea]
Top 5 unknown facts about North Korea.
[Invisibility]
North Korea’s internet speed is so fast… it doesn’t exist.
[Leadership]
Kim Jong-un once won an election with 100% votes… against himself.
[Magic]
North Korea discovered time travel. That’s why their news is always from the past.
[Warning]
Subscribe now, or Kim Jong-un will send you a free one-way ticket… to North Korea.
[Freedom]
North Korean citizens can do anything… as long as it's government-approved.
Now here is the Topic/scrip: {user_input}
"""
    request_headers = {
        'Authorization': f'Bearer {OPENROUTER_API_KEY}',
        'HTTP-Referer': 'https://your-domain.com',  # Optional: replace with your actual domain
        'X-Title': 'AI Documentary Maker',  # Optional
    }
    payload = {
        'model': OPENROUTER_MODEL,
        'messages': [{'role': 'user', 'content': prompt}],
        'temperature': 0.4,
        'max_tokens': 5000,
    }

    try:
        response = requests.post(
            'https://openrouter.ai/api/v1/chat/completions',
            headers=request_headers,
            json=payload,
            timeout=60,
        )
        response.raise_for_status()  # HTTP error statuses become RequestException
        response_data = response.json()

        # Validate the nesting step by step before indexing into it.
        has_content = (
            'choices' in response_data
            and len(response_data['choices']) > 0
            and 'message' in response_data['choices'][0]
            and 'content' in response_data['choices'][0]['message']
        )
        if not has_content:
            print("Unexpected API response format:", response_data)
            return None
        return response_data['choices'][0]['message']['content'].strip()
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred during script generation: {e}")
        return None
def parse_script(script_text):
    """Parse a generated script into an ordered list of media/TTS elements.

    A section starts on a line beginning with ``[Title]``; any text after the
    closing bracket and all following non-empty lines (up to the next title)
    form that section's narration. Lines before the first title are ignored.

    Sections are kept in script order, and sections that share the same title
    are all preserved (earlier versions stored sections in a dict keyed by
    title, so a repeated title silently replaced the earlier section).

    Args:
        script_text: The raw script text; may be empty or None.

    Returns:
        A list with two dicts per usable section, in order:
          - ``{"type": "media", "prompt": <title>, "effects": "fade-in"}``
          - ``{"type": "tts", "text": <narration>, "voice": "en",
            "duration": <seconds>}`` where duration is 0.5 s per word with a
            3.0 s minimum.
        An empty list is returned for empty input or on any parsing error.
    """
    if not script_text:
        print("Error: Received empty script text for parsing.")
        return []

    try:
        sections = []  # ordered (title, narration) pairs; duplicates allowed
        current_title = None
        current_text = ""

        for raw_line in script_text.strip().splitlines():
            line = raw_line.strip()
            if not line:  # Skip empty lines
                continue
            match = re.match(r'^\[([^\]]+)\](.*)', line)
            if match:
                # A new title closes the section that was being collected.
                if current_title is not None and current_text:
                    sections.append((current_title, current_text.strip()))
                current_title = match.group(1).strip()
                current_text = match.group(2).strip() + " "
            elif current_title is not None:
                current_text += line + " "

        # Flush the final section once the input is exhausted.
        if current_title is not None and current_text:
            sections.append((current_title, current_text.strip()))

        elements = []
        for title, narration in sections:
            narration = narration.strip()
            if not title or not narration:
                print(f"Warning: Skipping empty title ('{title}') or narration ('{narration}')")
                continue
            # Simple duration estimate: 0.5 seconds per word, minimum 3 seconds
            duration = max(3.0, len(narration.split()) * 0.5)
            elements.append({"type": "media", "prompt": title, "effects": "fade-in"})
            elements.append({"type": "tts", "text": narration, "voice": "en", "duration": duration})

        if not elements:
            print("Warning: Script parsing resulted in no elements. Check script format.")
        return elements
    except Exception as e:
        print(f"Error parsing script: {e}")
        # str() keeps this log line from raising when the input is not sliceable.
        print(f"Problematic script text snippet: {str(script_text)[:200]}")
        return []
def _pick_pexels_video_link(video_files):
    """Return the preferred download link from one video's ``video_files``.

    The first entry with quality "hd" and a link wins; otherwise the last
    "sd" entry with a link is used; None when neither exists.
    """
    sd_link = None
    for file_info in video_files:
        link = file_info.get("link")
        if not link:
            continue
        quality = file_info.get("quality")
        if quality == "hd":
            return link
        if quality == "sd":
            sd_link = link  # Keep SD as fallback
    return sd_link


def search_pexels_videos(query, pexels_api_key):
    """Search Pexels for landscape videos and return one random download link.

    Up to three result pages (15 videos each) are requested. Each page is
    tried up to three times with a doubling delay that restarts at 2 seconds
    for every page. Paging stops as soon as a page comes back empty, since
    further pages cannot contain results either.

    Args:
        query: Search term sent to the Pexels video search endpoint.
        pexels_api_key: API key placed in the Authorization header.

    Returns:
        The URL of a randomly chosen video file (HD preferred, SD fallback),
        or None when the key is missing, Pexels answers 400, or no usable
        video link was collected.
    """
    if not pexels_api_key:
        print("Pexels API key is missing. Cannot search for videos.")
        return None

    headers = {'Authorization': pexels_api_key}
    base_url = "https://api.pexels.com/videos/search"
    num_pages = 3  # Search first 3 pages
    videos_per_page = 15
    max_retries = 3
    all_videos = []

    print(f"Searching Pexels videos for: '{query}'")
    for page in range(1, num_pages + 1):
        params = {"query": query, "per_page": videos_per_page, "page": page, "orientation": "landscape"}
        retry_delay = 2  # back-off restarts for every page
        results_exhausted = False

        for attempt in range(max_retries):
            try:
                response = requests.get(base_url, headers=headers, params=params, timeout=15)
                if response.status_code == 200:
                    videos = response.json().get("videos", [])
                    if not videos:
                        # An empty page means there is nothing further to fetch.
                        results_exhausted = True
                        break
                    for video in videos:
                        link_to_add = _pick_pexels_video_link(video.get("video_files", []))
                        if link_to_add:
                            all_videos.append(link_to_add)
                    break  # Success for this page, move to next page
                elif response.status_code == 429:
                    print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    retry_delay *= 2
                elif response.status_code == 400:  # Bad request often means invalid query
                    print(f"Pexels API bad request (400) for query '{query}'. Skipping.")
                    return None  # Don't retry bad requests
                else:
                    print(f"Error fetching Pexels videos: {response.status_code} {response.text}")
                    if attempt < max_retries - 1:
                        print(f"Retrying Pexels video search in {retry_delay} seconds...")
                        time.sleep(retry_delay)
                        retry_delay *= 2
                    else:
                        print("Max retries reached for Pexels video search.")
                        break  # Max retries for this page
            except requests.exceptions.Timeout:
                print(f"Pexels video search timed out (attempt {attempt+1}/{max_retries}).")
                if attempt < max_retries - 1:
                    print(f"Retrying Pexels video search in {retry_delay} seconds...")
                    time.sleep(retry_delay)
                    retry_delay *= 2
                else:
                    print("Max retries reached for Pexels video search due to timeout.")
                    break  # Max retries for this page
            except requests.exceptions.RequestException as e:
                print(f"Pexels video search request exception: {e}")
                break  # Other request errors are not retried for this page

        if results_exhausted:
            break

    if not all_videos:
        print(f"No suitable Pexels videos found for query: '{query}'")
        return None

    random_video = random.choice(all_videos)
    print(f"Selected random video from {len(all_videos)} found for '{query}'")
    return random_video
def search_pexels_images(query, pexels_api_key):
    """Search Pexels for landscape photos and return one random image URL.

    A single request for 10 results is attempted up to three times, doubling
    the wait (starting at 2 seconds) after rate limits, timeouts and
    unexpected statuses.

    Args:
        query: Search term sent to the Pexels photo search endpoint.
        pexels_api_key: API key placed in the Authorization header.

    Returns:
        The "original", "large2x" or "large" URL (first one present) of a
        randomly chosen photo, or None when the key is missing, nothing was
        found, or the request could not be completed.
    """
    if not pexels_api_key:
        print("Pexels API key is missing. Cannot search for images.")
        return None

    endpoint = "https://api.pexels.com/v1/search"
    auth_headers = {'Authorization': pexels_api_key}
    search_params = {"query": query, "per_page": 10, "orientation": "landscape"}
    max_retries = 3
    backoff = 2

    print(f"Searching Pexels images for: '{query}'")
    for attempt in range(max_retries):
        retries_left = attempt < max_retries - 1
        try:
            response = requests.get(endpoint, headers=auth_headers, params=search_params, timeout=15)
            status = response.status_code

            if status == 200:
                photos = response.json().get("photos", [])
                if not photos:
                    return None
                # Per photo, take the best available size; drop photos with none.
                sources = [photo.get("src", {}) for photo in photos]
                candidates = [
                    src.get("original") or src.get("large2x") or src.get("large")
                    for src in sources
                ]
                valid_photos = [url for url in candidates if url]
                if not valid_photos:
                    print(f"No valid image URLs found in Pexels response for '{query}'.")
                    return None
                chosen_url = random.choice(valid_photos)
                print(f"Found {len(valid_photos)} Pexels images for '{query}', selected one.")
                return chosen_url

            if status == 429:
                print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {backoff} seconds...")
                time.sleep(backoff)
                backoff *= 2
                continue

            if status == 400:
                print(f"Pexels API bad request (400) for query '{query}'. Skipping.")
                return None

            print(f"Error fetching Pexels images: {response.status_code} {response.text}")
            if not retries_left:
                print("Max retries reached for Pexels image search.")
                return None
            print(f"Retrying Pexels image search in {backoff} seconds...")
            time.sleep(backoff)
            backoff *= 2
        except requests.exceptions.Timeout:
            print(f"Pexels image search timed out (attempt {attempt+1}/{max_retries}).")
            if not retries_left:
                print("Max retries reached for Pexels image search due to timeout.")
                return None
            print(f"Retrying Pexels image search in {backoff} seconds...")
            time.sleep(backoff)
            backoff *= 2
        except requests.exceptions.RequestException as e:
            print(f"Pexels image search request exception: {e}")
            return None  # Other request errors are not retried

    print(f"No Pexels images found for query: '{query}' after all attempts.")
    return None
def search_google_images(query):
    """Scrape Google Images for *query* and return one candidate image URL.

    This relies on the HTML of the Google results page, which is not a stable
    interface; treat it as best-effort. Only ``<img>`` tags are inspected:
    their ``src`` (or ``data-src``) is kept when it is an http(s) URL that
    does not contain "gstatic.com". Inline ``data:`` images are ignored.

    Args:
        query: Search term; URL-quoted before being placed in the query string.

    Returns:
        A URL chosen at random from the first ten candidates, or None when
        the request fails or no candidate is found.
    """
    print(f"Attempting Google Image search for (use with caution): '{query}'")
    try:
        search_url = f"https://www.google.com/search?q={quote(query)}&source=lnms&tbm=isch"
        headers = {"User-Agent": USER_AGENT}
        response = requests.get(search_url, headers=headers, timeout=10)
        response.raise_for_status()  # Check for HTTP errors

        soup = BeautifulSoup(response.text, "html.parser")
        image_urls = []
        for img in soup.find_all("img"):
            src = img.get("src") or img.get("data-src")  # Check both src and data-src
            if src and src.startswith("http") and "gstatic.com" not in src:
                image_urls.append(src)

        if not image_urls:
            print(f"No suitable Google Images found for query: '{query}' with current parsing method.")
            return None

        # Only the first few results are considered.
        chosen_url = random.choice(image_urls[:10])
        print(f"Found {len(image_urls)} potential Google images, selected one.")
        return chosen_url
    except requests.exceptions.RequestException as e:
        print(f"Error during Google Images request: {e}")
        return None
    except Exception as e:
        print(f"Error parsing Google Images HTML: {e}")
        return None
def download_image(image_url, filename):
    """Download an image to *filename*, validate it and normalise it to RGB.

    The response is streamed to disk in 8 KiB chunks. The file is then
    checked with Pillow's ``verify()``; if its mode is not RGB it is converted
    and re-saved over the same path. The HTTP response and the Pillow file
    handles are closed deterministically via context managers.

    Args:
        image_url: URL to fetch; a falsy value is rejected immediately.
        filename: Destination path for the downloaded file.

    Returns:
        *filename* on success, or None when the URL is missing, the download
        fails, or the file is not a readable image (in which case the file is
        removed again).
    """
    if not image_url:
        print("Error: No image URL provided for download.")
        return None

    def _discard_partial_file():
        # Best-effort removal of an incomplete or unusable download.
        if os.path.exists(filename):
            try:
                os.remove(filename)
            except OSError as e_remove:
                print(f"Error removing file '{filename}': {e_remove}")

    try:
        headers = {"User-Agent": USER_AGENT, "Accept": "image/*"}
        print(f"Downloading image from: {image_url} to {filename}")
        with requests.get(image_url, headers=headers, stream=True, timeout=20) as response:
            response.raise_for_status()  # Check for download errors
            content_type = response.headers.get('Content-Type', '').lower()
            if 'image' not in content_type:
                print(f"Warning: URL content type ({content_type}) might not be an image. Proceeding anyway.")
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        print(f"Image downloaded successfully to: {filename}")

        # Validate the downloaded image
        try:
            with Image.open(filename) as probe:
                probe.verify()
            # The image object cannot be used after verify(), so open it again.
            rgb_image = None
            with Image.open(filename) as img:
                if img.mode != 'RGB':
                    print(f"Converting image {filename} from {img.mode} to RGB.")
                    rgb_image = img.convert('RGB')
            if rgb_image is not None:
                # Save only after the source handle is closed.
                rgb_image.save(filename, quality=90)
                rgb_image.close()
            print(f"Image validated and processed: {filename}")
            return filename
        except (IOError, SyntaxError, Image.UnidentifiedImageError) as e_validate:
            print(f"Downloaded file '{filename}' is not a valid image or is corrupted: {e_validate}")
            if os.path.exists(filename):
                try:
                    os.remove(filename)
                    print(f"Removed invalid image file: {filename}")
                except OSError as e_remove:
                    print(f"Error removing invalid image file '{filename}': {e_remove}")
            return None
    except requests.exceptions.RequestException as e_download:
        print(f"Image download error from {image_url}: {e_download}")
        _discard_partial_file()
        return None
    except Exception as e_general:
        print(f"General error during image processing for {image_url}: {e_general}")
        _discard_partial_file()
        return None
def download_video(video_url, filename):
    """Download a video to *filename* by streaming it in 1 MiB chunks.

    The HTTP response is used as a context manager so the connection is
    released even when the transfer fails part-way. A file smaller than
    1 KiB only triggers a warning; the file is still returned so that the
    caller can decide whether it is usable.

    Args:
        video_url: URL to fetch; a falsy value is rejected immediately.
        filename: Destination path for the downloaded file.

    Returns:
        *filename* on success, or None when the URL is missing or the
        download fails (any partial file is removed).
    """
    if not video_url:
        print("Error: No video URL provided for download.")
        return None

    def _discard_partial_file():
        # Best-effort removal of an incomplete download.
        if os.path.exists(filename):
            try:
                os.remove(filename)
            except OSError as e_remove:
                print(f"Error removing file '{filename}': {e_remove}")

    try:
        headers = {"User-Agent": USER_AGENT}
        print(f"Downloading video from: {video_url} to {filename}")
        with requests.get(video_url, headers=headers, stream=True, timeout=60) as response:
            response.raise_for_status()
            with open(filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
        print(f"Video downloaded successfully to: {filename}")

        # Basic validation: check file size
        if os.path.getsize(filename) < 1024:
            print(f"Warning: Downloaded video file '{filename}' is very small. It might be invalid.")
        return filename
    except requests.exceptions.RequestException as e:
        print(f"Video download error from {video_url}: {e}")
        _discard_partial_file()
        return None
    except Exception as e_general:
        print(f"General error during video download for {video_url}: {e_general}")
        _discard_partial_file()
        return None
def generate_media(prompt, user_image=None, current_index=0, total_segments=1):
    """Produce one visual asset (video or image) for a script section.

    Sources are tried in this order, returning on the first success:
      1. Pexels video, attempted with probability ``video_clip_probability``.
      2. Pexels image for the prompt.
      3. Google Images, only when the prompt contains "news".
      4. Pexels image for a random generic fallback term.
      5. A solid dark colour image of size TARGET_RESOLUTION, drawn with
         Pillow (no external ImageMagick call is needed).

    All files are written into the global TEMP_FOLDER.

    Args:
        prompt: Section title used as the search term.
        user_image: Accepted for interface compatibility; not used here.
        current_index: Zero-based segment index, used in file names and logs.
        total_segments: Total number of segments, used in logs only.

    Returns:
        ``{"path": <file path>, "asset_type": "video" | "image"}``, or None
        when every source failed.
    """
    safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
    if not safe_prompt:  # The prompt may be empty after sanitising
        safe_prompt = f"media_{current_index}"
    print(f"\n--- Generating Media for Prompt: '{prompt}' (Segment {current_index+1}/{total_segments}) ---")

    # 1. Try video first, based on probability
    if random.random() < video_clip_probability:
        print(f"Attempting video search (Probability: {video_clip_probability*100}%)")
        video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video_{current_index}.mp4")
        video_url = search_pexels_videos(prompt, PEXELS_API_KEY)
        if video_url:
            downloaded_video = download_video(video_url, video_file)
            if downloaded_video and os.path.exists(downloaded_video):
                video_usable = False
                try:
                    # Only the duration is read while the clip is open; the
                    # file is deleted (if needed) after it has been closed.
                    with VideoFileClip(downloaded_video) as test_clip:
                        video_usable = test_clip.duration > 0
                    if not video_usable:
                        print(f"Downloaded video file seems invalid (duration 0): {downloaded_video}")
                except Exception as e:
                    print(f"Error testing downloaded video {downloaded_video}: {e}")
                    video_usable = False
                if video_usable:
                    print(f"Video asset usable: {downloaded_video}")
                    return {"path": downloaded_video, "asset_type": "video"}
                if os.path.exists(downloaded_video):
                    try:
                        os.remove(downloaded_video)  # Clean up invalid video
                    except OSError as e_remove:
                        print(f"Error removing invalid video '{downloaded_video}': {e_remove}")
            else:
                print(f"Pexels video download failed for prompt: '{prompt}'")
        else:
            print(f"Pexels video search failed for prompt: '{prompt}'")

    # 2. Try Pexels image
    print("Attempting Pexels image search...")
    image_file_pexels = os.path.join(TEMP_FOLDER, f"{safe_prompt}_pexels_{current_index}.jpg")
    image_url_pexels = search_pexels_images(prompt, PEXELS_API_KEY)
    if image_url_pexels:
        downloaded_image_pexels = download_image(image_url_pexels, image_file_pexels)
        if downloaded_image_pexels and os.path.exists(downloaded_image_pexels):
            print(f"Pexels image asset saved: {downloaded_image_pexels}")
            return {"path": downloaded_image_pexels, "asset_type": "image"}
        print(f"Pexels image download failed for prompt: '{prompt}'")

    # 3. If "news" is in the prompt, try Google Images as a secondary option
    if "news" in prompt.lower():
        print(f"News-related query: '{prompt}'. Trying Google Images as secondary...")
        image_file_google = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google_{current_index}.jpg")
        image_url_google = search_google_images(prompt)
        if image_url_google:
            downloaded_image_google = download_image(image_url_google, image_file_google)
            if downloaded_image_google and os.path.exists(downloaded_image_google):
                print(f"Google image asset saved: {downloaded_image_google}")
                return {"path": downloaded_image_google, "asset_type": "image"}
            print(f"Google Images download failed for prompt: '{prompt}'")
        else:
            print(f"Google Images search failed for prompt: '{prompt}'")

    # 4. Fall back to a generic Pexels image search
    print("Primary searches failed. Attempting fallback Pexels image search...")
    fallback_terms = ["abstract", "texture", "technology", "nature", "background"]
    fallback_term = random.choice(fallback_terms)
    print(f"Using fallback term: '{fallback_term}'")
    fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{fallback_term}_{current_index}.jpg")
    fallback_url = search_pexels_images(fallback_term, PEXELS_API_KEY)
    if fallback_url:
        downloaded_fallback = download_image(fallback_url, fallback_file)
        if downloaded_fallback and os.path.exists(downloaded_fallback):
            print(f"Fallback image asset saved: {downloaded_fallback}")
            return {"path": downloaded_fallback, "asset_type": "image"}
        print(f"Fallback image download failed for term: '{fallback_term}'")
    else:
        print(f"Fallback image search failed for term: '{fallback_term}'")

    # 5. Absolute fallback: draw a plain dark background with Pillow
    print("All media generation failed. Creating a simple color background.")
    if not TARGET_RESOLUTION:
        print("Cannot generate color background: TARGET_RESOLUTION not set.")
        print(f"ERROR: Failed to generate *any* visual asset for prompt: '{prompt}'")
        return None
    try:
        color_bg_path = os.path.join(TEMP_FOLDER, f"color_bg_{current_index}.png")
        w, h = TARGET_RESOLUTION
        # Pick a random dark colour (each channel in 0..50)
        color = (random.randint(0, 50), random.randint(0, 50), random.randint(0, 50))
        background = Image.new("RGB", (int(w), int(h)), color)
        background.save(color_bg_path)
        background.close()
    except Exception as e:
        print(f"Error generating color background: {e}")
        print(f"ERROR: Failed to generate *any* visual asset for prompt: '{prompt}'")
        return None
    print(f"Generated color background: {color_bg_path}")
    return {"path": color_bg_path, "asset_type": "image"}
def generate_silent_audio(duration, sample_rate=24000):
    """Write a WAV file containing *duration* seconds of silence.

    The file goes into the global TEMP_FOLDER when that is an existing
    directory; otherwise a system temporary file is used instead.

    Args:
        duration: Length of the silence in seconds.
        sample_rate: Samples per second of the generated file.

    Returns:
        The path of the written WAV file, or None when anything fails (the
        caller is expected to handle that).
    """
    try:
        silence = np.zeros(int(duration * sample_rate), dtype=np.float32)

        if TEMP_FOLDER and os.path.isdir(TEMP_FOLDER):
            silent_path = os.path.join(TEMP_FOLDER, f"silent_{int(time.time()*1000)}.wav")
        else:
            print("Error: TEMP_FOLDER not set or invalid for silent audio.")
            # Reserve a temp file name; the handle is closed right away so
            # that soundfile can write to the path itself.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as placeholder:
                silent_path = placeholder.name

        sf.write(silent_path, silence, sample_rate)
        print(f"Silent audio generated: {silent_path} ({duration:.2f}s)")
        return silent_path
    except Exception as e:
        print(f"Error generating silent audio: {e}")
        return None
def generate_tts(text, voice):
    """Synthesize narration audio for *text* and return the WAV file path.

    Engines are tried in order: Kokoro (module-level ``pipeline``), then
    gTTS (MP3 converted to WAV through pydub), then generated silence.
    The globals ``selected_voice`` and ``voice_speed`` are used; *voice*
    equal to 'en' means "use ``selected_voice``".

    The output file name combines a short readable prefix of the text with a
    hash of voice, speed and the full text, so two narrations that merely
    start with the same characters no longer overwrite each other's file.

    Args:
        text: Narration text. Empty text yields one second of silence.
        voice: Kokoro voice name, or 'en' for the globally selected voice.

    Returns:
        Path to a WAV file in TEMP_FOLDER, or whatever
        ``generate_silent_audio`` returns when both engines fail.
    """
    if not text:
        print("Warning: Empty text received for TTS. Generating silence.")
        return generate_silent_audio(duration=1.0)

    # Decide voice: use global `selected_voice` if `voice` is the default 'en'
    kokoro_voice_to_use = selected_voice if voice == 'en' else voice

    # Readable prefix (word characters and '-' only) plus a content hash.
    safe_text_part = re.sub(r'[^\w-]', '', text[:15])
    if not safe_text_part:
        safe_text_part = f"tts_{int(time.time()*1000)}"
    digest = hashlib.sha1(
        f"{kokoro_voice_to_use}|{voice_speed}|{text}".encode("utf-8")
    ).hexdigest()[:10]
    file_stem = f"{safe_text_part}_{digest}"
    file_path = os.path.join(TEMP_FOLDER, f"{file_stem}.wav")

    print(f"Generating TTS for: '{text[:50]}...' (Voice: {kokoro_voice_to_use}, Speed: {voice_speed})")

    # --- Try Kokoro TTS ---
    try:
        if pipeline is None:
            raise ValueError("Kokoro pipeline is not initialized.")
        generator = pipeline(text, voice=kokoro_voice_to_use, speed=voice_speed, split_pattern=r'\n+')
        audio_segments = []
        output_sample_rate = 24000  # NOTE(review): assumed Kokoro output rate; confirm
        for i, (gs, ps, audio) in enumerate(generator):
            # NOTE(review): some Kokoro releases appear to yield torch tensors
            # rather than NumPy arrays; convert those so the checks below work.
            if audio is not None and hasattr(audio, "detach"):
                audio = audio.detach().cpu().numpy()
            if audio is not None and audio.ndim > 0 and audio.size > 0:
                if audio.dtype == np.int16:
                    # Scale 16-bit PCM into the [-1, 1) float range.
                    audio = audio.astype(np.float32) / 32768.0
                elif audio.dtype != np.float32:
                    print(f"Warning: Unexpected audio dtype {audio.dtype} from Kokoro. Trying direct use.")
                audio_segments.append(audio)
            else:
                print(f"Warning: Kokoro returned empty or invalid audio segment {i} for text.")

        if not audio_segments:
            print("Error: Kokoro generated no valid audio segments.")
            raise ValueError("No audio data from Kokoro")

        full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0]
        if full_audio is None or full_audio.ndim == 0 or full_audio.size == 0:
            print("Error: Final concatenated audio from Kokoro is invalid.")
            raise ValueError("Invalid final audio data from Kokoro")

        if np.isnan(full_audio).any() or np.isinf(full_audio).any():
            print("Error: Kokoro audio contains NaN or Inf values. Attempting to clean.")
            # Clamp infinities to full scale; the default replacement (largest
            # float) would make the normalisation below silence the signal.
            full_audio = np.nan_to_num(full_audio, nan=0.0, posinf=1.0, neginf=-1.0)

        # Scale down only when samples exceed full scale, to prevent clipping.
        max_val = np.max(np.abs(full_audio))
        if max_val > 1.0:
            full_audio = full_audio / max_val * 0.98

        sf.write(file_path, full_audio, output_sample_rate)
        print(f"TTS audio saved: {file_path} (Kokoro)")
        return file_path
    except Exception as e_kokoro:
        print(f"Error with Kokoro TTS: {e_kokoro}. Trying gTTS fallback...")

    # --- Try gTTS fallback ---
    try:
        mp3_path = os.path.join(TEMP_FOLDER, f"{file_stem}_gtts.mp3")
        try:
            tts = gTTS(text=text, lang='en', slow=(voice_speed < 0.8))  # Rough speed approximation
            tts.save(mp3_path)
            # Convert MP3 to WAV using pydub
            AudioSegment.from_mp3(mp3_path).export(file_path, format="wav")
        finally:
            # Remove the temporary MP3 whether or not the conversion worked.
            if os.path.exists(mp3_path):
                try:
                    os.remove(mp3_path)
                except OSError as e_cleanup:
                    print(f"Warning: could not remove temporary file '{mp3_path}': {e_cleanup}")

        if os.path.exists(file_path) and os.path.getsize(file_path) > 100:  # Basic size check
            print(f"Fallback TTS saved: {file_path} (gTTS)")
            return file_path
        print(f"Error: gTTS generated an invalid or empty WAV file: {file_path}")
        if os.path.exists(file_path):
            os.remove(file_path)
        raise ValueError("gTTS output file invalid")
    except Exception as e_gtts:
        print(f"Error with gTTS fallback: {e_gtts}. Generating silence.")

    # --- Generate silence as final fallback ---
    # Rough duration estimate from the word count and speaking speed.
    estimated_duration = max(1.0, len(text.split()) * (0.6 / voice_speed))
    return generate_silent_audio(duration=estimated_duration)
def apply_kenburns_effect(clip, target_resolution, effect_type="random"):
    """Apply a smooth Ken Burns effect (zoom/pan) to an image clip.

    The clip is scaled once so that it covers ``target_resolution`` with a
    15% margin. For every frame a crop window is then taken from that
    enlarged image (moving for pans/diagonals, shrinking or growing for
    zooms, with cosine ease-in/out over the clip duration) and resized to the
    target resolution.

    Args:
        clip: Clip exposing ``w``, ``h``, ``duration``, ``ismask``,
            ``resize`` and ``fl`` (an ImageClip in this file).
        target_resolution: ``(width, height)`` of the output frames.
        effect_type: ``"random"`` or one of ``"zoom-in"``, ``"zoom-out"``,
            ``"pan-left"``, ``"pan-right"``, ``"pan-up"``, ``"pan-down"``,
            ``"diag-tl-br"``, ``"diag-tr-bl"``. Unknown values fall back to
            ``"zoom-in"``.

    Returns:
        The transformed clip. A clip without usable dimensions is returned
        unchanged; on any other error the result of ``resize_to_fill`` is
        returned instead.
    """
    try:
        target_w, target_h = target_resolution

        # Missing, None or zero dimensions all mean there is nothing to animate.
        clip_w = getattr(clip, 'w', None)
        clip_h = getattr(clip, 'h', None)
        if not clip_w or not clip_h:
            print("Error applying Ken Burns: Invalid clip dimensions.")
            return clip  # Return original clip

        # --- Size needed to cover the target area ---
        if clip_w / clip_h > target_w / target_h:
            # Image is wider than target: fit the height, overflow the width.
            cover_w = int(clip_w * (target_h / clip_h))
            cover_h = target_h
        else:
            # Image is taller than (or matches) target: fit the width.
            cover_w = target_w
            cover_h = int(clip_h * (target_w / clip_w))

        # --- Pre-zoom so the crop window has room to move ---
        zoom_scale = 1.15  # How much larger the image is than the frame
        zoomed_w = int(cover_w * zoom_scale)
        zoomed_h = int(cover_h * zoom_scale)
        # Single resize straight to the final working size: the dimensions are
        # the same as resizing twice, but the image is only interpolated once.
        clip = clip.resize(newsize=(zoomed_w, zoomed_h))

        # --- Determine movement parameters ---
        max_offset_x = max(0, zoomed_w - target_w)
        max_offset_y = max(0, zoomed_h - target_h)

        available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right",
                             "pan-up", "pan-down", "diag-tl-br", "diag-tr-bl"]
        if effect_type == "random":
            effect_type = random.choice(available_effects)
        elif effect_type not in available_effects:
            print(f"Warning: Unknown Ken Burns effect '{effect_type}'. Defaulting to zoom-in.")
            effect_type = "zoom-in"
        print(f"Applying Ken Burns effect: {effect_type}")

        # Direction in which the crop-window centre travels, as (dx, dy)
        # multipliers of half the available offset. Zooms keep the centre fixed.
        travel = {
            "pan-left": (-1, 0),
            "pan-right": (1, 0),
            "pan-up": (0, -1),
            "pan-down": (0, 1),
            "diag-tl-br": (1, 1),    # Top-Left to Bottom-Right
            "diag-tr-bl": (-1, 1),   # Top-Right to Bottom-Left
        }
        dir_x, dir_y = travel.get(effect_type, (0, 0))
        center_x = zoomed_w / 2
        center_y = zoomed_h / 2
        start_pos = (center_x - dir_x * max_offset_x / 2, center_y - dir_y * max_offset_y / 2)
        end_pos = (center_x + dir_x * max_offset_x / 2, center_y + dir_y * max_offset_y / 2)

        # Visual zoom: 1.0 crops exactly target_w x target_h from the zoomed
        # image; zoom_scale crops a smaller window that is then scaled up.
        if effect_type == "zoom-in":
            start_visual_zoom, end_visual_zoom = 1.0, zoom_scale
        elif effect_type == "zoom-out":
            start_visual_zoom, end_visual_zoom = zoom_scale, 1.0
        else:
            # For pans, visual zoom stays constant at 1.0
            start_visual_zoom = end_visual_zoom = 1.0

        clip_duration = clip.duration

        # --- Define the transformation function for moviepy's fl ---
        def transform_frame(get_frame, t):
            frame = get_frame(t)  # Frame from the *zoomed* clip at time t

            if not isinstance(frame, np.ndarray):
                # This shouldn't happen if using ImageClip, but good check
                print("Warning: Frame is not numpy array in Ken Burns transform.")
                return frame

            # Smooth interpolation (cosine ease-in-out)
            if clip_duration is None or clip_duration <= 0:
                ratio = 0
            else:
                ratio = 0.5 - 0.5 * math.cos(math.pi * (t / clip_duration))

            current_visual_zoom = start_visual_zoom + (end_visual_zoom - start_visual_zoom) * ratio
            current_center_x = start_pos[0] + (end_pos[0] - start_pos[0]) * ratio
            current_center_y = start_pos[1] + (end_pos[1] - start_pos[1]) * ratio

            # Crop window size for the current zoom, never larger than the frame
            crop_w = min(int(target_w / current_visual_zoom), zoomed_w)
            crop_h = min(int(target_h / current_visual_zoom), zoomed_h)

            # Clamp the centre so the window stays inside the image bounds
            clamped_center_x = max(crop_w / 2, min(current_center_x, zoomed_w - crop_w / 2))
            clamped_center_y = max(crop_h / 2, min(current_center_y, zoomed_h - crop_h / 2))

            # cv2 needs a contiguous array; getRectSubPix crops with subpixel accuracy
            frame_contiguous = np.ascontiguousarray(frame)
            try:
                cropped_frame = cv2.getRectSubPix(
                    frame_contiguous, (crop_w, crop_h), (clamped_center_x, clamped_center_y)
                )
            except cv2.error as e:
                print(f"Error during cv2.getRectSubPix: {e}")
                print(f" Frame shape: {frame_contiguous.shape}, dtype: {frame_contiguous.dtype}")
                print(f" Crop size: ({crop_w}, {crop_h})")
                print(f" Center: ({clamped_center_x}, {clamped_center_y})")
                # Fallback: return the uncropped frame scaled to the target size
                return cv2.resize(frame_contiguous, (target_w, target_h), interpolation=cv2.INTER_LINEAR)

            # Resize the cropped frame to the target resolution (high quality)
            return cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)

        # Apply the transformation using moviepy's fl method
        return clip.fl(transform_frame, apply_to='mask') if clip.ismask else clip.fl(transform_frame)
    except Exception as e:
        print(f"Error applying Ken Burns effect: {e}")
        # Fallback to a simple resize/crop if the effect fails
        return resize_to_fill(clip, target_resolution)
def resize_to_fill(clip, target_resolution):
    """Resize and centre-crop a clip (video or image) to fill the target resolution.

    The aspect ratio is preserved: the clip is scaled until it covers the
    target area, and the overflowing dimension is cropped equally on both
    sides.

    Args:
        clip: Clip exposing ``size``, ``w``, ``h``, ``resize`` and ``crop``.
        target_resolution: ``(width, height)`` to fill.

    Returns:
        A clip of exactly the target size when possible. A clip with invalid
        dimensions is returned unchanged. If resizing/cropping raises, a plain
        (non aspect-preserving) resize is attempted, and the original clip is
        returned as a last resort.
    """
    try:
        target_w, target_h = target_resolution

        if getattr(clip, 'size', None) is None or not clip.w or not clip.h:
            print(f"Error: Cannot resize clip with invalid dimensions: size={getattr(clip, 'size', 'N/A')}")
            # Hand the clip back untouched; the caller may still be able to use it.
            print("Returning original clip due to resize error.")
            return clip

        clip_aspect = clip.w / clip.h
        target_aspect = target_w / target_h

        if clip_aspect > target_aspect:
            # Clip is wider than target: match the height, crop the width.
            clip_resized = clip.resize(height=target_h)
            # Use the width moviepy actually produced, not a recomputed estimate,
            # so the crop stays centred even when rounding differs by a pixel.
            x1 = max(0, (clip_resized.w - target_w) // 2)
            clip_cropped = clip_resized.crop(x1=x1, width=target_w, y1=0, height=target_h)
        elif clip_aspect < target_aspect:
            # Clip is taller than target: match the width, crop the height.
            clip_resized = clip.resize(width=target_w)
            y1 = max(0, (clip_resized.h - target_h) // 2)
            clip_cropped = clip_resized.crop(y1=y1, height=target_h, x1=0, width=target_w)
        else:
            # Aspect ratios match: just resize.
            clip_cropped = clip.resize(newsize=(target_w, target_h))

        # Final check on dimensions
        if clip_cropped.w != target_w or clip_cropped.h != target_h:
            print(f"Warning: resize_to_fill resulted in unexpected dimensions ({clip_cropped.w}x{clip_cropped.h}). Attempting final resize.")
            return clip_cropped.resize(newsize=(target_w, target_h))
        return clip_cropped
    except Exception as e:
        print(f"Error in resize_to_fill: {e}")
        print(f"Clip info: duration={getattr(clip, 'duration', 'N/A')}, size={getattr(clip, 'size', 'N/A')}")
        # Fallback: try a simple resize without cropping
        try:
            return clip.resize(newsize=target_resolution)
        except Exception as e_resize:
            print(f"Fallback resize also failed: {e_resize}")
            # Return original clip as last resort
            return clip
def find_mp3_files():
    """Return the path of the first MP3 file under the current directory.

    The tree rooted at ``'.'`` is walked top-down, visiting directories and
    files in sorted order so the result does not depend on filesystem listing
    order. The search stops at the first file whose name ends in ``.mp3``
    (case-insensitive).

    NOTE: Per the original comment this helper is no longer used (music is
    uploaded via Gradio and copied to "music.mp3"); it is kept for reference.

    Returns:
        The path (relative, starting with ``'.'``) of the first MP3 found, or
        ``None`` if there is none or the search fails.
    """
    try:
        for root, dirs, files in os.walk('.'):
            dirs.sort()  # In-place sort steers os.walk's traversal order.
            for file_name in sorted(files):
                if file_name.lower().endswith('.mp3'):
                    mp3_path = os.path.join(root, file_name)
                    print(f"Found MP3 file: {mp3_path}")
                    return mp3_path
        return None
    except Exception as e:
        # Deliberately best-effort: a failed search is reported as "not found".
        print(f"Error searching for MP3 files: {e}")
        return None
def _fit_music_to_duration(bg_music, target_duration, max_loops=100):
    """Loop (up to ``max_loops`` times) and trim ``bg_music`` to at most ``target_duration`` seconds."""
    if bg_music.duration < target_duration:
        loops_needed = math.ceil(target_duration / bg_music.duration)
        if loops_needed > max_loops:  # Avoid excessive looping
            print(f"Warning: Background music is very short ({bg_music.duration:.1f}s) compared to video ({target_duration:.1f}s). Looping capped.")
            loops_needed = max_loops
        try:
            # Imported here because the file's top-level moviepy import does not
            # bring concatenate_audioclips into scope.
            from moviepy.editor import concatenate_audioclips
            bg_music = concatenate_audioclips([bg_music] * int(loops_needed))
        except Exception as e_concat:
            print(f"Error concatenating audio for looping: {e_concat}. Using single instance.")
    # Trim to the video duration, but never past the end of the music itself.
    return bg_music.subclip(0, min(target_duration, bg_music.duration))


def add_background_music(final_video, bg_music_volume=0.10):
    """Add background music using 'music.mp3' if it exists.

    The file ``music.mp3`` in the current directory is looped and/or trimmed
    to the video duration, scaled by ``bg_music_volume`` and mixed with the
    video's existing audio. If the video has no audio, the music becomes the
    only audio track.

    Args:
        final_video: Clip exposing ``audio``, ``duration`` and ``set_audio``.
        bg_music_volume: Volume factor applied to the music track.

    Returns:
        A clip with the music mixed in, or ``final_video`` unchanged when the
        music file is missing/too small or when any step fails.
    """
    # Expect the music file to be named 'music.mp3' in the current directory
    bg_music_path = "music.mp3"
    try:
        if not (os.path.exists(bg_music_path) and os.path.getsize(bg_music_path) > 100):
            print("Background music file 'music.mp3' not found or is empty. Skipping background music.")
            return final_video

        print(f"Adding background music from: {bg_music_path}")
        bg_music = AudioFileClip(bg_music_path)
        bg_music = _fit_music_to_duration(bg_music, final_video.duration)
        bg_music = bg_music.volumex(bg_music_volume)

        video_audio = final_video.audio
        if video_audio is None:
            print("Warning: Video has no primary audio track. Adding only background music.")
            final_audio = bg_music
        else:
            # Ensure both tracks have the same duration before compositing
            if abs(video_audio.duration - bg_music.duration) > 0.1:
                print(f"Warning: Audio duration mismatch before mixing (Vid: {video_audio.duration:.2f}s, BG: {bg_music.duration:.2f}s). Adjusting BG music.")
                bg_music = bg_music.set_duration(video_audio.duration)
            final_audio = CompositeAudioClip([video_audio, bg_music])

        mixed_video = final_video.set_audio(final_audio)
        print(f"Background music added successfully (Volume: {bg_music_volume:.2f})")
        return mixed_video
    except Exception as e:
        print(f"Error adding background music: {e}")
        print("Continuing without background music.")
        # final_video is never rebound above, so it still carries its original audio.
        return final_video
# --- NEW create_clip Function --- | |
def _plan_caption_chunks(words, audio_duration, max_words=6, min_words=3, max_seconds=3.0):
    """Split ``words`` into caption chunks, returning ``(text, start, end)`` tuples.

    Timing assumes every word takes ``audio_duration / len(words)`` seconds.
    A chunk is closed once it reaches ``max_words`` words or ``max_seconds``
    seconds (and holds at least ``min_words`` words), or at the last word.
    """
    if not words or audio_duration <= 0:
        return []
    per_word = audio_duration / len(words)
    last_index = len(words) - 1
    chunks = []
    pending = []
    chunk_start = 0.0
    for index, word in enumerate(words):
        pending.append(word)
        span = len(pending) * per_word
        is_last = index == last_index
        limit_hit = span >= max_seconds or len(pending) >= max_words
        if not (is_last or (limit_hit and len(pending) >= min_words)):
            continue
        # Pin the final chunk to the end of the audio so float drift cannot
        # leave a gap; never let any chunk run past the audio.
        chunk_end = audio_duration if is_last else min(chunk_start + span, audio_duration)
        if chunk_end > chunk_start:  # Prevent zero-duration chunks
            chunks.append((' '.join(pending), chunk_start, chunk_end))
        chunk_start = chunk_end
        pending = []
    return chunks


def _overlay_captions(clip, narration_text, audio_duration):
    """Composite timed caption chunks onto ``clip``; return ``clip`` unchanged if captioning fails."""
    caption_width = int(TARGET_RESOLUTION[0] * 0.85)
    # Captions sit 80% down the frame. Computed up front so the fallback path
    # below can always use it.
    subtitle_y_position = int(TARGET_RESOLUTION[1] * 0.80)
    try:
        subtitle_clips = []
        for chunk_text, start_time, end_time in _plan_caption_chunks(narration_text.split(), audio_duration):
            chunk_duration = end_time - start_time
            if chunk_duration <= 0.05:  # Skip tiny duration chunks
                continue
            try:
                txt_clip = TextClip(
                    txt=chunk_text,
                    fontsize=font_size,  # Global caption size
                    font='Arial-Bold',
                    color=CAPTION_COLOR,
                    bg_color='rgba(0, 0, 0, 0.4)',
                    method='caption',  # Handles word wrapping
                    align='center',
                    stroke_color='black',  # Black stroke for better contrast
                    stroke_width=1.5,
                    size=(caption_width, None),  # Fixed width, automatic height
                ).set_start(start_time).set_duration(chunk_duration)
                subtitle_clips.append(txt_clip.set_position(('center', subtitle_y_position)))
            except Exception as e_textclip:
                # e.g. font not found or ImageMagick policy restrictions
                print(f"Error creating TextClip for chunk '{chunk_text}': {e_textclip}")
        if subtitle_clips:
            return CompositeVideoClip([clip] + subtitle_clips)
        print("Warning: No subtitle clips were generated despite text being present.")
        return clip
    except Exception as sub_error:
        # Fallback: show the whole narration as one simple caption
        print(f"Subtitle generation error: {sub_error}. Using fallback simple text.")
        try:
            txt_clip = TextClip(
                narration_text,
                fontsize=int(font_size * 0.8),  # Slightly smaller for full text
                color=CAPTION_COLOR,
                font='Arial',  # Simpler font for fallback
                align='center',
                method='caption',
                bg_color='rgba(0, 0, 0, 0.4)',
                size=(int(TARGET_RESOLUTION[0] * 0.8), None),
            ).set_position(('center', subtitle_y_position)).set_duration(clip.duration)
            return CompositeVideoClip([clip, txt_clip])
        except Exception as e_fallback_text:
            print(f"Error creating fallback TextClip: {e_fallback_text}")
            # Proceed without captions if the fallback also fails
            return clip


def create_clip(media_path, asset_type, tts_path, duration=None, effects=None, narration_text=None, segment_index=0):
    """Create a video clip with synchronized subtitles and narration.

    Args:
        media_path: Path to the visual asset (video or image file).
        asset_type: ``"video"`` or ``"image"``.
        tts_path: Path to the narration audio; replaced by generated silence
            when missing or unreadable.
        duration: Estimated duration in seconds, used only for the silence
            fallback (defaults to 3.0 when falsy).
        effects: Ken Burns effect name for images; ``None`` selects "random".
        narration_text: Text shown as captions; ``None``/empty disables them.
        segment_index: Zero-based segment number, used for logging and for the
            fallback media request.

    Returns:
        The finished clip (visuals plus audio, lasting the audio duration plus
        a 0.2 s buffer), or ``None`` if the clip could not be built.
    """
    try:
        print(f"--- Creating Clip #{segment_index+1} ---")
        # Logging must not crash on None arguments; real validation follows.
        print(f" Media: {asset_type} at {os.path.basename(media_path or '')}")
        print(f" TTS: {os.path.basename(tts_path or '')}")
        print(f" Narration: '{(narration_text or '')[:50]}...'")

        if not media_path or not os.path.exists(media_path) or os.path.getsize(media_path) < 100:
            print(f"Error: Invalid or missing media file: {media_path}")
            return None

        if not tts_path or not os.path.exists(tts_path) or os.path.getsize(tts_path) < 100:
            print(f"Error: Invalid or missing TTS file: {tts_path}")
            print("Attempting to generate silent audio as fallback.")
            fallback_duration = duration if duration else 3.0
            tts_path = generate_silent_audio(fallback_duration)
            if not tts_path:
                print("Error: Failed to generate fallback silent audio. Cannot create clip.")
                return None  # Critical failure if no audio

        # Load audio first to get accurate duration
        try:
            audio_clip = AudioFileClip(tts_path)
            # Slight fade out to prevent abrupt cuts
            audio_clip = audio_clip.audio_fadeout(0.1)
            audio_duration = audio_clip.duration
            if audio_duration <= 0.1:  # Very short/empty audio
                print(f"Warning: Audio duration is very short ({audio_duration:.2f}s). Adjusting target duration.")
                audio_duration = max(audio_duration, 1.0)  # Ensure at least 1s duration
        except Exception as e:
            print(f"Error loading audio file {tts_path}: {e}")
            print("Using estimated duration and generating silence.")
            audio_duration = duration if duration else 3.0
            silent_audio_path = generate_silent_audio(audio_duration)
            if not silent_audio_path:
                return None  # Cannot proceed without audio
            audio_clip = AudioFileClip(silent_audio_path)

        # Small buffer so the visuals outlast the narration slightly
        target_duration = audio_duration + 0.2
        print(f" Audio Duration: {audio_duration:.2f}s, Target Visual Duration: {target_duration:.2f}s")

        # Create base visual clip (video or image)
        clip = None
        if asset_type == "video":
            try:
                clip = VideoFileClip(media_path)
                if clip.duration < target_duration:
                    print(f" Looping video (duration {clip.duration:.2f}s) to match target {target_duration:.2f}s")
                    clip = clip.loop(duration=target_duration)
                else:
                    # Start from beginning, take required duration
                    clip = clip.subclip(0, target_duration)
                # Resize/crop to fill target resolution *after* duration adjustment
                clip = resize_to_fill(clip, TARGET_RESOLUTION)
                clip = clip.fadein(0.3).fadeout(0.3)
            except Exception as e:
                print(f"Error processing video file {media_path}: {e}")
                # Release the half-built video clip before falling back.
                if clip is not None:
                    try:
                        clip.close()
                    except Exception as e_close:
                        print(f"Warning: could not close failed video clip: {e_close}")
                    clip = None
                fallback_media = generate_media("abstract", current_index=segment_index, total_segments=0)
                if fallback_media and fallback_media['asset_type'] == 'image':
                    print("Falling back to generated image due to video error.")
                    asset_type = 'image'
                    media_path = fallback_media['path']
                else:
                    print("ERROR: Video processing failed, and fallback media generation failed.")
                    return None  # Cannot proceed

        # Also reached when video processing failed and fell back to an image
        if asset_type == "image":
            try:
                # Validate the image before handing it to moviepy
                with Image.open(media_path) as img:
                    img.verify()
                clip = ImageClip(media_path).set_duration(target_duration)
                # Ken Burns effect (includes resizing to the target resolution)
                clip = apply_kenburns_effect(clip, TARGET_RESOLUTION, effect_type=effects or "random")
                # Ken Burns does not add fades itself
                clip = clip.fadein(0.3).fadeout(0.3)
            except Exception as e:
                print(f"Error processing image file {media_path}: {e}")
                return None  # Fail if image processing has critical error

        if clip is None:
            print("Error: Visual clip (video or image) could not be created.")
            return None

        # --- SUBTITLES ---
        if narration_text and CAPTION_COLOR != "transparent" and audio_duration > 0.1:
            print(f" Adding Captions (Color: {CAPTION_COLOR}, Size: {font_size})")
            clip = _overlay_captions(clip, narration_text, audio_duration)

        # Attach the narration to the finished visuals
        clip = clip.set_audio(audio_clip)

        # Final duration check/adjustment
        if abs(clip.duration - target_duration) > 0.1:
            print(f"Warning: Final clip duration ({clip.duration:.2f}s) differs from target ({target_duration:.2f}s). Adjusting.")
            clip = clip.set_duration(target_duration)

        print(f"--- Clip #{segment_index+1} created successfully (Duration: {clip.duration:.2f}s) ---")
        return clip
    except Exception as e:
        print(f"*************** FATAL ERROR in create_clip (Segment {segment_index+1}) ***************")
        import traceback
        traceback.print_exc()  # Print detailed traceback
        print(f"Error details: {str(e)}")
        print(f" Media Path: {media_path}")
        print(f" TTS Path: {tts_path}")
        print(f" Asset Type: {asset_type}")
        print("**************************************************************************")
        return None  # Return None on failure
def fix_imagemagick_policy():
    """Attempts to fix ImageMagick security policies on Linux systems.

    Looks for ``policy.xml`` in a few common locations, backs it up once to
    ``policy.xml.bak`` and uses ``sudo sed -i`` to change ``rights="none"``
    to ``rights="read|write"`` for the PS, EPS, PDF, XPS, LABEL, caption and
    TEXT patterns and for the ``@*`` path policy. This is often needed for
    TextClip with backgrounds/strokes on Colab/Linux.

    NOTE(review): this loosens system-wide ImageMagick restrictions and
    requires sudo; use with caution.

    Returns:
        ``True`` if the sed command exited successfully, ``False`` if no policy
        file was found, the backup failed, sed failed, or an error occurred.
    """
    import subprocess  # Local import: only this function needs it.

    # Common paths for ImageMagick policy files
    policy_paths = [
        "/etc/ImageMagick-6/policy.xml",
        "/etc/ImageMagick-7/policy.xml",
        "/etc/ImageMagick/policy.xml",
        "/usr/local/etc/ImageMagick-7/policy.xml",
    ]
    try:
        print("Attempting to fix ImageMagick security policies (may require sudo)...")
        found_policy = next((path for path in policy_paths if os.path.exists(path)), None)
        if not found_policy:
            print("ImageMagick policy.xml not found in common locations. Skipping policy fix.")
            print("TextClip features might be limited if default policies are restrictive.")
            return False  # Indicate policy wasn't found/fixed

        print(f"Found policy file: {found_policy}. Attempting to modify...")
        print("Executing policy modification commands (requires sudo)...")

        # Back up only once, so a later run cannot replace the pristine
        # backup with an already-modified policy file.
        backup_path = f"{found_policy}.bak"
        if os.path.exists(backup_path):
            print("Existing policy backup found; keeping it.")
        else:
            backup = subprocess.run(["sudo", "cp", found_policy, backup_path])
            if backup.returncode != 0:
                print("Error backing up policy file. Aborting modifications.")
                return False
            print("Policy file backed up.")

        # One sed invocation with an expression per restricted pattern.
        # Arguments are passed as a list, so no shell quoting is involved.
        sed_args = ["sudo", "sed", "-i"]
        for pattern in ("PS", "EPS", "PDF", "XPS", "LABEL", "caption", "TEXT"):
            sed_args += ["-e", f's/rights="none" pattern="{pattern}"/rights="read|write" pattern="{pattern}"/']
        # The path rule contains "/>", so '#' is used as the sed delimiter;
        # with '/' the expression is cut short and sed rejects it.
        sed_args += [
            "-e",
            r's#<policy domain="path" rights="none" pattern="@\*"/>#<policy domain="path" rights="read|write" pattern="@*"/>#',
        ]
        sed_args.append(found_policy)

        if subprocess.run(sed_args).returncode == 0:
            print("ImageMagick policies potentially updated successfully.")
            return True
        print("Error executing policy modification commands. Check sudo permissions and sed syntax.")
        return False
    except Exception as e:
        print(f"Error occurred during ImageMagick policy fix: {e}")
        return False
# ---------------- Main Video Generation Function ---------------- # | |
def _close_quietly(clip):
    """Close a moviepy clip, downgrading any close failure to a warning."""
    if clip is None:
        return
    try:
        clip.close()
    except Exception as e:
        print(f"Warning: Could not close clip: {e}")


def _remove_temp_folder(folder):
    """Delete a run's temporary folder; report (never raise) on failure."""
    try:
        shutil.rmtree(folder)
        print(f"Temporary folder removed: {folder}")
    except Exception as e:
        print(f"Warning: Could not remove temporary folder {folder}: {e}")


def generate_video(user_input, resolution, caption_option):
    """Generate a video based on user input via Gradio. Uses global settings.

    Args:
        user_input: Video concept or script text handed to ``generate_script``.
        resolution: ``"Full"`` (1920x1080) or ``"Short"`` (1080x1920); any
            other value falls back to 1920x1080 with a warning.
        caption_option: ``"Yes"`` renders white captions; anything else sets
            the caption colour to ``"transparent"``.

    Returns:
        ``OUTPUT_VIDEO_FILENAME`` on success, or ``None`` when any stage fails
        (missing API keys, script generation/parsing, no usable clips, export).

    Side effects:
        Rebinds the globals ``TARGET_RESOLUTION``, ``CAPTION_COLOR`` and
        ``TEMP_FOLDER``. The temporary folder is removed and all created
        clips are closed on every exit path, including unexpected exceptions.
    """
    global TARGET_RESOLUTION, CAPTION_COLOR, TEMP_FOLDER
    start_time = time.time()
    print("\n=============================================")
    print("======= STARTING VIDEO GENERATION =======")
    print(f" Concept: '{user_input[:100]}...'")
    print(f" Resolution: {resolution}")
    print(f" Captions: {caption_option}")
    print(f" Voice: {selected_voice} (Speed: {voice_speed})")
    print(f" BG Music Vol: {bg_music_volume}, FPS: {fps}, Preset: {preset}")
    print(f" Video Clip Prob: {video_clip_probability*100}%, Caption Size: {font_size}")
    print("=============================================\n")

    # --- Setup ---
    if not OPENROUTER_API_KEY or not PEXELS_API_KEY:
        # Failure is signalled with None; the caller decides how to surface it.
        print("ERROR: API keys (OpenRouter or Pexels) are missing!")
        return None

    # Set resolution
    if resolution == "Full":  # 16:9 Landscape
        TARGET_RESOLUTION = (1920, 1080)
    elif resolution == "Short":  # 9:16 Portrait
        TARGET_RESOLUTION = (1080, 1920)
    else:
        print(f"Warning: Unknown resolution '{resolution}'. Defaulting to Full HD (1920x1080).")
        TARGET_RESOLUTION = (1920, 1080)

    # Set caption color based on user choice
    CAPTION_COLOR = "white" if caption_option == "Yes" else "transparent"

    # Create a unique temporary folder for this run
    try:
        TEMP_FOLDER = tempfile.mkdtemp()
        print(f"Temporary folder created: {TEMP_FOLDER}")
    except Exception as e:
        print(f"FATAL ERROR: Could not create temporary folder: {e}")
        return None  # Cannot proceed without temp folder
    # --- End Setup ---

    clips = []
    final_video = None
    # Everything below owns TEMP_FOLDER and the clips; the single `finally`
    # releases them whether we return early, succeed, or raise.
    try:
        # Fix ImageMagick policy (important for captions)
        if not fix_imagemagick_policy():
            print("Continuing without guaranteed ImageMagick policy fix. Captions might have issues.")

        # --- Script Generation ---
        print("\n--- Generating Script ---")
        script = generate_script(user_input)
        if not script:
            print("FATAL ERROR: Failed to generate script from API.")
            return None
        print("Generated Script:\n", script)  # Print the full script for debugging

        # --- Script Parsing ---
        print("\n--- Parsing Script ---")
        elements = parse_script(script)
        if not elements:
            print("FATAL ERROR: Failed to parse script into elements. Check script format and parsing logic.")
            return None
        num_segments = len(elements) // 2
        print(f"Parsed {num_segments} script segments.")
        if num_segments == 0:
            print("Warning: Script parsed into 0 segments. No video will be generated.")
            return None

        # --- Pair Elements (Media + TTS) ---
        paired_elements = []
        for i in range(0, len(elements), 2):
            if i + 1 < len(elements) and elements[i]['type'] == 'media' and elements[i + 1]['type'] == 'tts':
                paired_elements.append((elements[i], elements[i + 1]))
            else:
                print(f"Warning: Skipping invalid element pair at index {i}. Expected media then tts.")
        if not paired_elements:
            print("FATAL ERROR: No valid media-tts pairs found after parsing.")
            return None

        # --- Clip Creation Loop ---
        print("\n--- Creating Individual Clips ---")
        total_pairs = len(paired_elements)
        for idx, (media_elem, tts_elem) in enumerate(paired_elements):
            print(f"\n>>> Processing Segment {idx+1}/{total_pairs}: Prompt '{media_elem.get('prompt', 'N/A')}'")

            # 1. Generate Media Asset
            media_asset = generate_media(
                media_elem['prompt'],
                current_index=idx,
                total_segments=total_pairs
            )
            if not media_asset or not media_asset.get('path'):
                print(f"ERROR: Failed to generate media for segment {idx+1}. Skipping segment.")
                continue

            # 2. Generate TTS
            tts_path = generate_tts(tts_elem['text'], tts_elem['voice'])
            if not tts_path:
                print(f"ERROR: Failed to generate TTS for segment {idx+1}. Skipping segment.")
                # Best-effort removal of the now-unused media file; a missing
                # or locked file is not worth failing the run over.
                try:
                    os.remove(media_asset['path'])
                except OSError:
                    pass
                continue

            # 3. Create the Clip (Visual + Audio + Subtitles)
            clip = create_clip(
                media_path=media_asset['path'],
                asset_type=media_asset['asset_type'],
                tts_path=tts_path,
                duration=tts_elem.get('duration'),  # Estimated duration for potential fallback
                effects=media_elem.get('effects'),
                narration_text=tts_elem['text'],
                segment_index=idx
            )
            if not clip:
                print(f"ERROR: Clip creation failed for segment {idx+1}. See errors above.")
                continue

            # Validate clip duration and dimensions before adding
            if clip.duration > 0 and clip.w == TARGET_RESOLUTION[0] and clip.h == TARGET_RESOLUTION[1]:
                clips.append(clip)
                print(f">>> Segment {idx+1} processed successfully.")
            else:
                print(f"ERROR: Clip for segment {idx+1} has invalid duration ({clip.duration}) or dimensions ({clip.w}x{clip.h}). Skipping.")
                _close_quietly(clip)  # Rejected clip is not tracked in `clips`
        # --- End Clip Creation Loop ---

        # --- Final Video Assembly ---
        if not clips:
            print("\nFATAL ERROR: No clips were successfully created. Cannot generate video.")
            return None
        print(f"\n--- Assembling Final Video ({len(clips)} clips) ---")
        try:
            final_video = concatenate_videoclips(clips, method="compose")  # 'compose' is generally safer
            print(f"Clips concatenated. Total duration before music: {final_video.duration:.2f}s")
            final_video = add_background_music(final_video, bg_music_volume=bg_music_volume)
            print(f"Exporting final video to '{OUTPUT_VIDEO_FILENAME}' (FPS: {fps}, Preset: {preset})...")
            final_video.write_videofile(
                OUTPUT_VIDEO_FILENAME,
                codec='libx264',    # Common, good quality codec
                audio_codec='aac',  # Common audio codec
                fps=fps,
                preset=preset,      # Controls encoding speed vs compression
                threads=4,          # Use multiple threads if available
                logger='bar'        # Show progress bar
            )
            print(f"Final video saved successfully as {OUTPUT_VIDEO_FILENAME}")
        except Exception as e:
            print(f"FATAL ERROR during final video assembly or writing: {e}")
            import traceback
            traceback.print_exc()
            return None
        # --- End Final Video Assembly ---
    finally:
        # --- Cleanup ---
        print("\n--- Cleaning Up Temporary Files ---")
        # Close clips before deleting the folder so no open handles point
        # into it while it is being removed.
        for clip in clips:
            _close_quietly(clip)
        _close_quietly(final_video)
        _remove_temp_folder(TEMP_FOLDER)
        # --- End Cleanup ---

    total_time = time.time() - start_time
    print("\n=============================================")
    print("======= VIDEO GENERATION COMPLETE =======")
    print(f" Total time: {total_time:.2f} seconds")
    print("=============================================\n")
    # Return the path to the generated video file
    return OUTPUT_VIDEO_FILENAME
# ---------------- Gradio Interface Setup ---------------- # | |
# Dictionary mapping user-friendly names to Kokoro voice IDs | |
VOICE_CHOICES = {
    # 'af_' voice IDs
    'Emma (Female)': 'af_heart',
    'Bella (Female)': 'af_bella',
    'Nicole (Female)': 'af_nicole',
    'Aoede (Female)': 'af_aoede',
    'Kore (Female)': 'af_kore',
    'Sarah (Female)': 'af_sarah',
    'Nova (Female)': 'af_nova',
    'Sky (Female)': 'af_sky',
    'Alloy (Female)': 'af_alloy',
    'Jessica (Female)': 'af_jessica',
    'River (Female)': 'af_river',
    # 'am_' voice IDs
    'Michael (Male)': 'am_michael',
    'Fenrir (Male)': 'am_fenrir',
    'Puck (Male)': 'am_puck',
    'Echo (Male)': 'am_echo',
    'Eric (Male)': 'am_eric',
    'Liam (Male)': 'am_liam',
    'Onyx (Male)': 'am_onyx',
    'Santa (Male)': 'am_santa',
    'Adam (Male)': 'am_adam',
    # 'bf_' voice IDs (labelled with the 🇬🇧 flag)
    'Emma 🇬🇧 (Female)': 'bf_emma',
    'Isabella 🇬🇧 (Female)': 'bf_isabella',
    'Alice 🇬🇧 (Female)': 'bf_alice',
    'Lily 🇬🇧 (Female)': 'bf_lily',
    # 'bm_' voice IDs (labelled with the 🇬🇧 flag)
    'George 🇬🇧 (Male)': 'bm_george',
    'Fable 🇬🇧 (Male)': 'bm_fable',
    'Lewis 🇬🇧 (Male)': 'bm_lewis',
    'Daniel 🇬🇧 (Male)': 'bm_daniel',
}
def generate_video_with_options(user_input, resolution, caption_option, music_file, voice, vclip_prob, bg_vol, video_fps, video_preset, v_speed, caption_size):
    """Wrapper function for Gradio to set global options before calling generate_video.

    Args:
        user_input: Video concept or script text.
        resolution: Resolution choice forwarded to ``generate_video``.
        caption_option: Caption choice forwarded to ``generate_video``.
        music_file: Uploaded background music, or ``None``. Accepts either an
            object exposing the file path as ``.name`` or a plain path string.
        voice: Display name looked up in ``VOICE_CHOICES`` (falls back to
            ``'af_heart'`` when unknown).
        vclip_prob: Video-clip probability as a percentage (0-100).
        bg_vol: Background music volume.
        video_fps: Output frames per second.
        video_preset: Encoder preset name.
        v_speed: Narration speed.
        caption_size: Caption font size.

    Returns:
        Path of the generated video file.

    Raises:
        gr.Error: When generation returns ``None`` or raises unexpectedly, so
            the failure is shown in the Gradio UI.
    """
    global selected_voice, voice_speed, font_size, video_clip_probability, bg_music_volume, fps, preset
    print("--- Updating Settings from Gradio ---")
    # Update global variables with user selections from Gradio interface
    selected_voice = VOICE_CHOICES.get(voice, 'af_heart')  # Default if key not found
    voice_speed = v_speed
    font_size = caption_size
    video_clip_probability = vclip_prob / 100.0  # Convert percentage to decimal
    bg_music_volume = bg_vol
    fps = video_fps
    preset = video_preset

    # Handle music upload: copy uploaded file to a standard name 'music.mp3'
    target_music_path = "music.mp3"
    # Remove previous music file so a run without an upload has no leftover track
    try:
        os.remove(target_music_path)
        print(f"Removed previous '{target_music_path}'")
    except FileNotFoundError:
        pass  # Nothing to remove
    except OSError as e:
        print(f"Warning: Could not remove previous music file: {e}")

    if music_file is not None:
        # The upload may arrive as a file wrapper (path in `.name`) or as a
        # bare path string depending on the Gradio version/component type.
        source_music_path = getattr(music_file, "name", music_file)
        try:
            shutil.copy(source_music_path, target_music_path)
            print(f"Uploaded music '{os.path.basename(source_music_path)}' copied to '{target_music_path}'")
        except Exception as e:
            # Continue without background music if copy fails
            print(f"Error copying uploaded music file: {e}")
    else:
        print("No background music file uploaded.")

    # Call the main video generation function with the core inputs;
    # it reads the global variables updated above.
    try:
        video_path = generate_video(user_input, resolution, caption_option)
    except gr.Error:
        # Let Gradio errors through untouched so the UI shows them
        raise
    except Exception as e:
        print(f"An unexpected error occurred in generate_video_with_options: {e}")
        import traceback
        traceback.print_exc()
        raise gr.Error(f"An unexpected error occurred: {e}. Check logs.") from e

    # generate_video signals failure by returning None
    if video_path is None:
        raise gr.Error("Video generation failed. Please check the console logs for details.")
    return video_path
# Create the Gradio interface definition | |
# NOTE(review): the original indentation was lost in this copy of the file;
# the nesting below (two-column top row, advanced accordion with three rows)
# is reconstructed from statement order — confirm against the deployed layout.
with gr.Blocks(theme=gr.themes.Soft()) as iface:  # Using Blocks for better layout control
    gr.Markdown("# 🤖 AI Documentary Video Generator")
    gr.Markdown("Create short, funny documentary-style videos with AI narration and stock footage. Customize voice, music, captions, and more.")
    with gr.Row():
        with gr.Column(scale=2):
            user_input = gr.Textbox(label="🎬 Video Concept / Script", placeholder="Enter your video topic (e.g., 'Top 5 facts about cats') or paste a full script formatted like the example...", lines=4)
            with gr.Accordion("Example Script Format", open=False):
                # Markdown body is kept flush-left so the fence and list render
                # as intended rather than as an indented code block.
                gr.Markdown("""
```
[Cats]
Cats: tiny ninjas plotting world domination.
[Sleeping]
They sleep 23 hours a day, planning.
[Boxes]
Their mortal enemy? The empty box. It must be contained.
[Zoomies]
Suddenly, zoomies! Because reasons.
[Subscribe]
Subscribe now, or a cat will judge you silently. Forever.
```
**Rules:**
- Start each scene with `[Search Term]` (1-2 words for Pexels).
- Follow with 5-15 words of narration.
- Keep it funny and conversational.
- End with a subscribe line related to the topic.
""")
        with gr.Column(scale=1):
            resolution = gr.Radio(["Full", "Short"], label="📐 Resolution", value="Full", info="Full=16:9 (YouTube), Short=9:16 (TikTok/Shorts)")
            caption_option = gr.Radio(["Yes", "No"], label="✍️ Add Captions?", value="Yes")
            # `type` is left at the component default: the explicit value
            # "file" is rejected by Gradio 4+, while each major version's
            # default still hands the callback a value exposing the path.
            music_file = gr.File(label="🎵 Upload Background Music (Optional MP3)", file_types=[".mp3"])
    with gr.Accordion("⚙️ Advanced Settings", open=False):
        with gr.Row():
            voice = gr.Dropdown(choices=list(VOICE_CHOICES.keys()), label="🗣️ Choose Voice", value="Emma (Female)")
            v_speed = gr.Slider(minimum=0.5, maximum=1.5, value=0.9, step=0.05, label="💨 Voice Speed", info="0.5=Slow, 1.0=Normal, 1.5=Fast")
        with gr.Row():
            caption_size = gr.Slider(minimum=20, maximum=100, value=45, step=1, label="🔡 Caption Font Size")
            vclip_prob = gr.Slider(minimum=0, maximum=100, value=25, step=5, label="🎞️ Video Clip %", info="Chance of using a video clip instead of an image for a scene.")
        with gr.Row():
            bg_vol = gr.Slider(minimum=0.0, maximum=1.0, value=0.08, step=0.01, label="🔉 BG Music Volume", info="0.0=Silent, 1.0=Full Volume")
            video_fps = gr.Slider(minimum=15, maximum=60, value=30, step=1, label="🎬 Video FPS")
            video_preset = gr.Dropdown(
                choices=["ultrafast", "superfast", "veryfast", "faster", "fast", "medium", "slow", "slower", "veryslow"],
                value="veryfast", label="⚙️ Export Quality/Speed", info="Faster presets = lower quality/size, Slower presets = higher quality/size."
            )
    submit_button = gr.Button("✨ Generate Video ✨", variant="primary")
    output_video = gr.Video(label="Generated Video")
    # Define the action when the button is clicked; the input order must
    # match the parameter order of generate_video_with_options.
    submit_button.click(
        fn=generate_video_with_options,
        inputs=[
            user_input, resolution, caption_option, music_file, voice,
            vclip_prob, bg_vol, video_fps, video_preset, v_speed, caption_size
        ],
        outputs=output_video
    )
    gr.Markdown("---")
    gr.Markdown("⚠️ **Note:** Video generation can take several minutes, especially on CPU. Check console logs for progress.")
# Launch the interface | |
if __name__ == "__main__":
    # Report the API-key state before starting the UI.
    # NOTE(review): original indentation was lost; the launch is assumed to
    # run in both branches because the exit on missing keys is commented out
    # — confirm against the deployed file.
    if PEXELS_API_KEY and OPENROUTER_API_KEY:
        print("API Keys seem to be set. Launching Gradio interface...")
    else:
        for banner_line in (
            "####################################################################",
            "ERROR: PEXELS_API_KEY or OPENROUTER_API_KEY is not set!",
            "Please set these variables at the top of the script before running.",
            "####################################################################",
        ):
            print(banner_line)
        # Optionally exit if keys are missing
        # exit(1)
    # share=True requests a public link; debug=True gives more logs
    iface.launch(share=True, debug=True)