import base64
import io
import math
import os
import random
import re
import shutil  # Needed for temp folder cleanup
import subprocess  # Used by fix_imagemagick_policy for sudo fallbacks
import tempfile
import time

import cv2
import numpy as np
import requests
import soundfile as sf
import torch
from bs4 import BeautifulSoup
from gtts import gTTS
from kokoro import KPipeline
from moviepy.editor import (
    VideoFileClip, concatenate_videoclips, AudioFileClip, ImageClip,
    CompositeVideoClip, TextClip, CompositeAudioClip, ColorClip,
    concatenate_audioclips,
)
import moviepy.video.fx.all as vfx
import moviepy.config as mpy_config
from PIL import Image, ImageDraw, ImageFont
from pydub import AudioSegment
from pydub.generators import Sine
from urllib.parse import quote
import gradio as gr  # Gradio UI
# pysrt is imported but not used in the provided code snippets, keeping for completeness
# import pysrt
# Initialize the Kokoro TTS pipeline (American English).
# Make sure the required Kokoro voice models are downloaded if needed;
# otherwise the code falls back to gTTS. lang_code 'a' (American English)
# uses the voice 'af_heart'.
try:
    pipeline = KPipeline(lang_code='a')
    print("Kokoro TTS pipeline initialized.")
except Exception as e:
    print(f"Warning: Could not initialize Kokoro TTS pipeline: {e}. Will rely on gTTS.")
    pipeline = None  # Fall back to gTTS when Kokoro is unavailable
# Point MoviePy at the ImageMagick binary (adjust the path for your system).
# TextClip requires ImageMagick; if TextClip fails, check the installation
# and policy.xml (handled by fix_imagemagick_policy below).
# Common paths: "/usr/bin/convert", "/usr/local/bin/convert",
# "C:\\Program Files\\ImageMagick-X.Y.Z-Q16\\convert.exe"
IMAGEMAGICK_BINARY_PATH = "/usr/bin/convert"  # Default path, check your system
if not os.path.exists(IMAGEMAGICK_BINARY_PATH):
    print(f"Warning: ImageMagick binary not found at {IMAGEMAGICK_BINARY_PATH}. TextClip may not work.")
    print("Please install ImageMagick or update IMAGEMAGICK_BINARY_PATH.")
mpy_config.change_settings({"IMAGEMAGICK_BINARY": IMAGEMAGICK_BINARY_PATH})
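# Quick sanity check (hypothetical snippet): if the binary and policy are set
# up correctly, this should render without raising:
#   TextClip("test", fontsize=24, color='white').close()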
# ---------------- Global Configuration ---------------- #
# Using the user's provided API keys (consider moving these to environment variables)
PEXELS_API_KEY = 'BhJqbcdm9Vi90KqzXKAhnEHGsuFNv4irXuOjWtT761U49lRzo03qBGna'
OPENROUTER_API_KEY = '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'
OPENROUTER_MODEL = "mistralai/mistral-small-3.1-24b-instruct:free"  # Or another preferred model
OUTPUT_VIDEO_FILENAME = "final_video.mp4"
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
# Maximum number of script segments exposed for editing
MAX_SEGMENTS_FOR_EDITING = 15
# Placeholder for the temporary folder, created fresh per run
TEMP_FOLDER = None
# ---------------- Helper Functions ---------------- #
# Most helpers are self-contained; a few (create_clip, add_background_music)
# take parameters such as resolution instead of relying on global state.
def generate_script(user_input, api_key, model):
    """Generate a documentary script via the OpenRouter chat API."""
    if not api_key or api_key.startswith('YOUR_OPENROUTER_API_KEY'):
        print("OpenRouter API key not set or is a placeholder. Skipping script generation.")
        return "[Error] API key not configured."
    headers = {
        'Authorization': f'Bearer {api_key}',
        'HTTP-Referer': 'https://your-domain.com',  # Replace with your app's domain
        'X-Title': 'AI Documentary Maker'
    }
    # Prompt tuned to the required tone and style
    prompt = f"""Short Documentary Script Generator
Instructions:
If I say "use this," output the script exactly as given.
If I only give topics, generate a script based on them.
If I provide a full script, rewrite it without any changes.
Make everything short, simple, humorous, funny, and act serious but be humorous.
Don't say anything off topic.
Always end with a funny call to subscribe based on the video topic.
Use normal conversational text, avoid AI phrases, humanize the statements.
Always focus on negative or cynical views, avoid positive statements.
The full script should be for a single video about one topic.
Formatting Rules:
Title in Square Brackets: Each section starts with a one-word title inside [ ] (max two words if necessary). This title will be used as a search term for Pexels footage.
Casual & Funny Narration: Each section has 5-15 words of narration. Keep it natural, funny, and unpredictable.
No Special Formatting: No bold, italics, or special characters.
Generalized Search Terms: If a term is too specific, make it more general for Pexels search.
Scene-Specific Writing: Each section describes only what should be shown in the video.
Output Only the Script, make it funny/humorous/hilarious, and add a funny subscribe statement.
No extra text, just the script.
Example Output:
[Cats]
They plot world domination while napping.
[Dogs]
Loyalty is just a bribe for snacks.
[Humans]
The only species that pays to live on a planet they destroy.
[Future]
It looks suspiciously like the present, but with more screens.
[Warning]
Subscribe or a cat will steal your bandwidth.
Now here is the Topic/script: {user_input}
"""
    data = {
        'model': model,
        'messages': [{'role': 'user', 'content': prompt}],
        'temperature': 0.7,  # Slightly higher temperature for more unpredictable humor
        'max_tokens': 500    # Cap the response to keep scripts short
    }
    try:
        response = requests.post(
            'https://openrouter.ai/api/v1/chat/completions',
            headers=headers,
            json=data,
            timeout=45
        )
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx responses
        response_data = response.json()
        if 'choices' in response_data and len(response_data['choices']) > 0:
            script_text = response_data['choices'][0]['message']['content']
            # Strip a surrounding markdown code block if the model added one
            if script_text.startswith("```") and script_text.endswith("```"):
                first_code_block = script_text.find("```")
                last_code_block = script_text.rfind("```")
                if first_code_block != -1 and last_code_block != -1 and first_code_block < last_code_block:
                    # Extract the content between the markers, skipping the language specifier line if present
                    content_start = script_text.find('\n', first_code_block) + 1
                    content_end = last_code_block
                    script_text = script_text[content_start:content_end].strip()
                else:  # Simple case: trim from start and end
                    script_text = script_text.strip("` \n")
            return script_text
        else:
            print("Unexpected response format:", response_data)
            return "[Error] Unexpected API response format."
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {str(e)}")
        return f"[Error] API request failed: {str(e)}"
    except Exception as e:
        print(f"An unexpected error occurred during script generation: {e}")
        return f"[Error] An unexpected error occurred: {str(e)}"
def parse_script(script_text):
    """
    Parse the generated script into a list of segment dictionaries.
    Each dictionary holds the original prompt (the [Title]), the narration
    text, an estimated duration, and a placeholder for user-uploaded media.
    Handles API errors returned as "[Error] ..." strings.
    """
    if script_text.startswith("[Error]"):
        print(f"Skipping parse due to script generation error: {script_text}")
        return []
    segments = []
    current_title = None
    current_text = ""
    try:
        lines = script_text.strip().splitlines()
        if not lines:
            print("Script text is empty.")
            return []
        for line in lines:
            line = line.strip()
            if line.startswith("[") and "]" in line:
                bracket_start = line.find("[")
                bracket_end = line.find("]", bracket_start)
                # Close out the previous segment if it has a title and text
                if current_title is not None and current_text.strip():
                    # Estimate duration from word count: 0.4 s per word, 2 s minimum
                    duration = max(2.0, len(current_text.split()) * 0.4)
                    segments.append({
                        "original_prompt": current_title.strip(),
                        "text": current_text.strip(),
                        "duration": duration,
                        "uploaded_media": None  # Placeholder for a user-uploaded file path
                    })
                current_title = line[bracket_start+1:bracket_end].strip()
                current_text = line[bracket_end+1:].strip()
            elif current_title:
                # Continuation line: append to the current segment's text
                current_text += line + " "
            # Lines before the first [Title] are ignored
        # Add the last segment
        if current_title is not None and current_text.strip():
            duration = max(2.0, len(current_text.split()) * 0.4)
            segments.append({
                "original_prompt": current_title.strip(),
                "text": current_text.strip(),
                "duration": duration,
                "uploaded_media": None
            })
        # Cap the number of segments shown in the editor
        if len(segments) > MAX_SEGMENTS_FOR_EDITING:
            print(f"Warning: Script generated {len(segments)} segments, limiting to {MAX_SEGMENTS_FOR_EDITING} for editing.")
            segments = segments[:MAX_SEGMENTS_FOR_EDITING]
        print(f"Parsed {len(segments)} segments.")
        return segments
    except Exception as e:
        print(f"Error parsing script: {e}")
        return []
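# Worked example of the duration heuristic above (0.4 s per word, 2.0 s floor):
#   "They plot world domination while napping." -> 6 words -> max(2.0, 6 * 0.4) = 2.4 s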
# Pexels and Google Image search/download functions use the global PEXELS_API_KEY directly.
def search_pexels_videos(query):
    """Search Pexels videos for a query and return a random link, preferring HD."""
    if not PEXELS_API_KEY or PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'):
        print("Pexels API key not set or is a placeholder. Skipping video search.")
        return None
    headers = {'Authorization': PEXELS_API_KEY}
    base_url = "https://api.pexels.com/videos/search"
    num_pages = 3
    videos_per_page = 15
    max_retries = 2  # Few retries so failures are fast
    retry_delay = 1
    search_query = query
    all_videos = []
    for page in range(1, num_pages + 1):
        videos = []  # Reset per page so the post-loop check below is always defined
        for attempt in range(max_retries):
            try:
                params = {"query": search_query, "per_page": videos_per_page, "page": page}
                response = requests.get(base_url, headers=headers, params=params, timeout=10)
                if response.status_code == 200:
                    data = response.json()
                    videos = data.get("videos", [])
                    # Prefer HD files; collect other qualities as a fallback
                    hd_videos_on_page = []
                    other_videos_on_page = []
                    for video in videos:
                        video_files = video.get("video_files", [])
                        for file in video_files:
                            if file.get("quality") == "hd":
                                hd_videos_on_page.append(file.get("link"))
                                break  # Found HD, move on to the next video entry
                            # Collect other qualities in case no HD is found
                            other_videos_on_page.append(file.get("link"))
                    all_videos.extend(hd_videos_on_page)
                    if not hd_videos_on_page:  # No HD on this page, keep the other qualities
                        all_videos.extend(other_videos_on_page)
                    if not videos:
                        print(f"No videos found on page {page} for query '{query}'.")
                    break  # Success for this page attempt
                elif response.status_code == 429:
                    print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s for query '{query}'...")
                    time.sleep(retry_delay)
                    retry_delay *= 2
                else:
                    print(f"Pexels video search error {response.status_code}: {response.text} for query '{query}'")
                    break  # Non-recoverable error
            except requests.exceptions.RequestException as e:
                print(f"Pexels video request exception (attempt {attempt+1}/{max_retries}) for query '{query}': {e}")
                if attempt < max_retries - 1:
                    time.sleep(retry_delay)
                    retry_delay *= 2
                else:
                    break  # Too many retries
        # Stop paging once a page comes back empty
        if not videos and page > 1:
            print(f"Stopping Pexels video search for '{query}' as no videos were found on page {page}.")
            break
    if all_videos:
        # Prefer an HD link if any were collected (simple substring heuristic, not exact)
        hd_options = [link for link in all_videos if link and 'hd' in link.lower()]
        if hd_options:
            random_video = random.choice(hd_options)
            print(f"Selected random HD video from {len(hd_options)} options for query '{query}'.")
        else:
            # No HD options: pick from the entire list (SD and other qualities)
            random_video = random.choice(all_videos)
            print(f"Selected random video (likely SD or other quality) from {len(all_videos)} options for query '{query}' (no HD found).")
        return random_video
    else:
        print(f"No suitable videos found after searching all pages for query '{query}'.")
        return None
def search_pexels_images(query):
    """Search Pexels for an image by query and return its URL."""
    if not PEXELS_API_KEY or PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'):
        print("Pexels API key not set or is a placeholder. Skipping image search.")
        return None
    headers = {'Authorization': PEXELS_API_KEY}
    url = "https://api.pexels.com/v1/search"
    params = {"query": query, "per_page": 15, "orientation": "landscape"}
    max_retries = 2
    retry_delay = 1
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, params=params, timeout=10)
            if response.status_code == 200:
                data = response.json()
                photos = data.get("photos", [])
                if photos:
                    # Choose from the top results
                    photo = random.choice(photos[:min(10, len(photos))])
                    img_url = photo.get("src", {}).get("original")
                    print(f"Found {len(photos)} images on Pexels for query '{query}', selected one.")
                    return img_url
                else:
                    print(f"No images found for query: {query} on Pexels.")
                    return None
            elif response.status_code == 429:
                print(f"Pexels rate limit hit (attempt {attempt+1}/{max_retries}). Retrying in {retry_delay}s for query '{query}'...")
                time.sleep(retry_delay)
                retry_delay *= 2
            else:
                print(f"Pexels image search error {response.status_code}: {response.text} for query '{query}'")
                break  # Non-recoverable error
        except requests.exceptions.RequestException as e:
            print(f"Pexels image request exception (attempt {attempt+1}/{max_retries}) for query '{query}': {e}")
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
                retry_delay *= 2
            else:
                break  # Too many retries
    print(f"No Pexels images found for query: {query} after all attempts.")
    return None
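# Backoff note: on HTTP 429 the retry delay doubles each attempt (1 s, then
# 2 s with max_retries=2), so a rate-limited query gives up after a few
# seconds instead of stalling the whole render.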
def search_google_images(query):
    """Search Google Images (fallback, useful for news-style queries)."""
    try:
        # Simple HTML scrape; a dedicated image-search API would be more robust
        # but needs extra setup. This is prone to breaking if Google changes
        # its HTML structure.
        search_url = f"https://www.google.com/search?q={quote(query)}&tbm=isch"
        headers = {"User-Agent": USER_AGENT}
        print(f"Searching Google Images for: {query}")
        response = requests.get(search_url, headers=headers, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        # Collect img src attributes that are plain http(s) URLs, skipping data
        # URIs and obvious encrypted/thumbnail patterns. This is a heuristic
        # and may grab incorrect URLs.
        img_tags = soup.find_all("img")
        image_urls = []
        for img in img_tags:
            src = img.get("src", "")
            if src.startswith("http") and "encrypted" not in src and "base64" not in src:
                image_urls.append(src)
            elif img.get("data-src", "").startswith("http"):  # Some pages use data-src
                image_urls.append(img.get("data-src", ""))
        # Filter out tiny icons and URLs without an image file extension
        valid_image_urls = [url for url in image_urls if url and "gstatic" not in url and url.split('.')[-1].lower() in ['jpg', 'jpeg', 'png', 'gif', 'bmp']]
        if valid_image_urls:
            print(f"Found {len(valid_image_urls)} potential Google Images for query '{query}', picking one.")
            return random.choice(valid_image_urls[:min(10, len(valid_image_urls))])
        else:
            print(f"No valid Google Images found for query: {query}")
            return None
    except Exception as e:
        print(f"Error in Google Images search for query '{query}': {e}")
        return None
def download_image(image_url, filename):
    """Download an image from a URL to a local file, validating it with PIL."""
    if not image_url:
        print("No image URL provided for download.")
        return None
    try:
        headers = {"User-Agent": USER_AGENT}
        print(f"Attempting to download image from: {image_url}")
        response = requests.get(image_url, headers=headers, stream=True, timeout=20)
        response.raise_for_status()
        # Check the content type before saving
        content_type = response.headers.get('Content-Type', '')
        if not content_type.startswith('image/'):
            print(f"URL did not return an image Content-Type ({content_type}). Skipping download.")
            return None
        # Ensure the target directory exists
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        # Validate and normalize the image
        try:
            img = Image.open(filename)
            img.verify()  # Verify it is an image file
            img = Image.open(filename)  # Re-open after verify
            if img.mode != 'RGB':
                img = img.convert('RGB')
                img.save(filename)
            return filename
        except Exception as e_validate:
            print(f"Downloaded file is not a valid image or processing failed for {filename}: {e_validate}")
            if os.path.exists(filename):
                os.remove(filename)  # Clean up the invalid file
            return None
    except requests.exceptions.RequestException as e_download:
        print(f"Image download error for {image_url}: {e_download}")
        if os.path.exists(filename):
            os.remove(filename)  # Clean up a partially downloaded file
        return None
    except Exception as e_general:
        print(f"General error during image download/processing for {filename}: {e_general}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
def download_video(video_url, filename):
    """Download a video from a URL to a local file."""
    if not video_url:
        print("No video URL provided for download.")
        return None
    try:
        headers = {"User-Agent": USER_AGENT}  # Some hosts block requests without a UA
        print(f"Attempting to download video from: {video_url}")
        response = requests.get(video_url, headers=headers, stream=True, timeout=45)  # Longer timeout for videos
        response.raise_for_status()
        # Check the content type
        content_type = response.headers.get('Content-Type', '')
        if not content_type.startswith('video/'):
            print(f"URL did not return a video Content-Type ({content_type}). Skipping download.")
            return None
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        # Use a smaller chunk size for potentially large files
        chunk_size = 4096
        downloaded_size = 0
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                f.write(chunk)
                downloaded_size += len(chunk)
        print(f"Video downloaded successfully to: {filename} ({downloaded_size} bytes)")
        # Basic sanity check: reject empty or tiny files
        if os.path.exists(filename) and os.path.getsize(filename) > 1024:  # > 1 KB
            return filename
        else:
            print(f"Downloaded video file {filename} is too small or empty ({os.path.getsize(filename)} bytes).")
            if os.path.exists(filename):
                os.remove(filename)
            return None
    except requests.exceptions.RequestException as e:
        print(f"Video download error for {video_url}: {e}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
    except Exception as e_general:
        print(f"General error during video download for {filename}: {e_general}")
        if os.path.exists(filename):
            os.remove(filename)
        return None
def generate_media_asset(prompt, uploaded_media_path):
    """
    Generate a visual asset (video or image). Prioritizes a user upload,
    then a Pexels video, then a Pexels image, then Google Images.
    Returns a dict: {'path': <file_path>, 'asset_type': 'video' or 'image'}.
    The returned path always lives inside TEMP_FOLDER.
    """
    safe_prompt = re.sub(r'[^\w\s-]', '', prompt).strip().replace(' ', '_')
    os.makedirs(TEMP_FOLDER, exist_ok=True)  # Ensure the temp folder exists
    # 1. Use user-uploaded media if provided
    if uploaded_media_path and os.path.exists(uploaded_media_path):
        print(f"Using user uploaded media: {uploaded_media_path}")
        file_ext = os.path.splitext(uploaded_media_path)[1].lower()
        asset_type = 'video' if file_ext in ['.mp4', '.mov', '.avi', '.webm', '.mkv'] else 'image'
        # Copy the user file into the temp folder so cleanup stays centralized
        temp_user_path = os.path.join(TEMP_FOLDER, f"user_upload_{os.path.basename(uploaded_media_path)}")
        try:
            shutil.copy2(uploaded_media_path, temp_user_path)  # copy2 preserves metadata
            print(f"Copied user upload to temp: {temp_user_path}")
            return {"path": temp_user_path, "asset_type": asset_type}
        except shutil.SameFileError:
            # The upload already lives in the temp folder
            print(f"User upload is already in temp folder: {uploaded_media_path}")
            return {"path": uploaded_media_path, "asset_type": asset_type}
        except Exception as e:
            print(f"Error copying user file {uploaded_media_path}: {e}. Falling back to search.")
    # 2. Search Pexels videos some of the time, to mix footage with stills
    if random.random() < 0.4:  # 40% chance of trying a video first
        video_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_video.mp4")
        print(f"Attempting Pexels video search for: '{prompt}'")
        video_url = search_pexels_videos(prompt)  # Uses the global API key
        if video_url:
            downloaded_video = download_video(video_url, video_file)
            if downloaded_video:
                print(f"Pexels video asset saved to {downloaded_video}")
                return {"path": downloaded_video, "asset_type": "video"}
        else:
            print(f"Pexels video search failed or found no video for: '{prompt}'")
    # 3. Search Pexels images
    image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}.jpg")
    print(f"Attempting Pexels image search for: '{prompt}'")
    image_url = search_pexels_images(prompt)  # Uses the global API key
    if image_url:
        downloaded_image = download_image(image_url, image_file)
        if downloaded_image:
            print(f"Pexels image asset saved to {downloaded_image}")
            return {"path": downloaded_image, "asset_type": "image"}
    else:
        print(f"Pexels image search failed or found no image for: '{prompt}'")
    # 4. Fallback: Google Images (useful for news or topics Pexels lacks)
    print(f"Attempting Google Images fallback for: '{prompt}'")
    google_image_file = os.path.join(TEMP_FOLDER, f"{safe_prompt}_google.jpg")
    google_image_url = search_google_images(prompt)
    if google_image_url:
        downloaded_google_image = download_image(google_image_url, google_image_file)
        if downloaded_google_image:
            print(f"Google Image asset saved to {downloaded_google_image}")
            return {"path": downloaded_google_image, "asset_type": "image"}
    else:
        print(f"Google Images fallback failed for: '{prompt}'")
    # 5. Final fallback: generic stock images
    fallback_terms = ["nature", "city", "abstract", "background"]
    for term in fallback_terms:
        print(f"Trying generic fallback image search with term: '{term}'")
        fallback_file = os.path.join(TEMP_FOLDER, f"fallback_{term}.jpg")
        fallback_url = search_pexels_images(term)  # Use Pexels for fallbacks
        if fallback_url:
            downloaded_fallback = download_image(fallback_url, fallback_file)
            if downloaded_fallback:
                print(f"Generic fallback image saved to {downloaded_fallback}")
                return {"path": downloaded_fallback, "asset_type": "image"}
            else:
                print(f"Generic fallback image download failed for term: '{term}'")
        else:
            print(f"Generic fallback image search failed for term: '{term}'")
    print(f"Failed to generate any visual asset for prompt: '{prompt}' after all attempts.")
    return None
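# Example flow (hypothetical prompt "Cats", no upload): roughly 40% of calls
# try a Pexels video first; on a miss the chain continues Pexels image ->
# Google Images -> generic terms ("nature", "city", ...), returning the first
# asset that downloads and validates, or None if everything fails.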
def generate_silent_audio(duration, sample_rate=24000):
    """Generate a silent WAV file lasting 'duration' seconds."""
    print(f"Generating {duration:.2f}s of silent audio.")
    num_samples = int(duration * sample_rate)
    silence = np.zeros(num_samples, dtype=np.float32)
    # Unique filename to avoid collisions between segments
    silent_path = os.path.join(TEMP_FOLDER, f"silent_{abs(hash(duration)) % (10**8)}_{int(time.time())}.wav")
    try:
        sf.write(silent_path, silence, sample_rate)
        print(f"Silent audio generated: {silent_path}")
        return silent_path
    except Exception as e:
        print(f"Error generating silent audio to {silent_path}: {e}")
        return None
def generate_tts(text, voice='en'):
    """
    Generate TTS audio using Kokoro, falling back to gTTS and then to
    silent audio if both fail. Results are cached per narration text.
    """
    if not text or not text.strip():
        print("TTS text is empty. Generating silent audio.")
        return generate_silent_audio(duration=2.0)  # Default silence for empty text
    os.makedirs(TEMP_FOLDER, exist_ok=True)  # Ensure the temp folder exists
    safe_text_hash = str(abs(hash(text)) % (10**10))  # Hash handles arbitrarily long text
    file_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.wav")
    if os.path.exists(file_path):
        return file_path  # Cached TTS for this text
    # Estimated duration (0.4 s per word, 2 s minimum), used if TTS fails
    target_duration_fallback = max(2.0, len(text.split()) * 0.4)
    if pipeline:
        try:
            print(f"Attempting Kokoro TTS for text: '{text[:50]}...'")
            kokoro_voice = 'af_heart' if voice == 'en' else voice  # Kokoro's default American English voice
            # The Kokoro pipeline may yield multiple segments for long text
            generator = pipeline(text, voice=kokoro_voice, speed=1.0, split_pattern=r'\n+')
            audio_segments = []
            total_duration = 0
            for i, (gs, ps, audio) in enumerate(generator):
                audio_segments.append(audio)
                total_duration += len(audio) / 24000.0  # Kokoro outputs 24 kHz audio
            if audio_segments:
                full_audio = np.concatenate(audio_segments) if len(audio_segments) > 1 else audio_segments[0]
                sf.write(file_path, full_audio, 24000)  # Write at 24 kHz
                return file_path
            else:
                print("Kokoro pipeline returned no audio segments.")
        except Exception as e:
            print(f"Error with Kokoro TTS: {e}")
            # Fall through to the gTTS fallback
    try:
        print(f"Falling back to gTTS for text: '{text[:50]}...'")
        tts = gTTS(text=text, lang='en', slow=False)  # Standard speed
        mp3_path = os.path.join(TEMP_FOLDER, f"tts_{safe_text_hash}.mp3")
        tts.save(mp3_path)
        audio = AudioSegment.from_mp3(mp3_path)
        audio.export(file_path, format="wav")
        if os.path.exists(mp3_path):
            os.remove(mp3_path)  # Clean up the intermediate mp3
        return file_path
    except Exception as fallback_error:
        print(f"Both TTS methods failed for text: '{text[:50]}...'. Error: {fallback_error}")
        print(f"Generating silent audio of estimated duration {target_duration_fallback:.2f}s.")
        return generate_silent_audio(duration=target_duration_fallback)
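# Caching caveat: Python's built-in hash() of a string is randomized per
# process (PYTHONHASHSEED), so the tts_<hash>.wav cache only holds within a
# single run. That is fine here because TEMP_FOLDER is recreated per run;
# a hashlib digest would be needed for a cache that survives restarts.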
def apply_kenburns_effect(clip, target_resolution, effect_type=None):
    """Apply a smooth Ken Burns effect with a single movement pattern."""
    target_w, target_h = target_resolution
    clip_aspect = clip.w / clip.h
    target_aspect = target_w / target_h
    # Resize the clip to cover the target resolution while keeping its aspect
    # ratio, so the frame stays filled even after scaling and panning.
    if clip_aspect > target_aspect:
        # Wider than target: match height, width overflows
        clip = clip.resize(height=target_h)
    else:
        # Taller than target: match width, height overflows
        clip = clip.resize(width=target_w)
    # Scale up to create margin for the Ken Burns movement
    initial_w, initial_h = clip.size
    scale_factor = 1.15  # 15% oversize
    new_width = int(initial_w * scale_factor)
    new_height = int(initial_h * scale_factor)
    clip = clip.resize(newsize=(new_width, new_height))
    max_offset_x = new_width - target_w
    max_offset_y = new_height - target_h
    available_effects = ["zoom-in", "zoom-out", "pan-left", "pan-right", "pan-up", "pan-down", "up-left", "down-right"]
    if effect_type is None or effect_type == "random":
        effect_type = random.choice(available_effects)
    # Zoom effects: the crop window stays centered while its size changes.
    # Zoom factors are relative to target_resolution: a factor of 1.0 means the
    # crop is exactly target-sized; a factor of scale_factor means the crop is
    # target_resolution / scale_factor (i.e. zoomed in).
    if effect_type in ("zoom-in", "zoom-out"):
        if effect_type == "zoom-in":
            start_zoom_relative, end_zoom_relative = 1.0, scale_factor
        else:
            start_zoom_relative, end_zoom_relative = scale_factor, 1.0

        def get_crop_size(zoom_relative):
            return int(target_w / zoom_relative), int(target_h / zoom_relative)

        def get_current_center(t):
            progress = t / clip.duration if clip.duration > 0 else 0
            eased_progress = 0.5 - 0.5 * math.cos(math.pi * progress)  # Cosine easing
            current_zoom_relative = start_zoom_relative + (end_zoom_relative - start_zoom_relative) * eased_progress
            current_crop_w, current_crop_h = get_crop_size(current_zoom_relative)
            # Keep the crop centered in the scaled image
            center_x = new_width / 2
            center_y = new_height / 2
            return center_x, center_y, current_crop_w, current_crop_h

        def transform_frame_zoom(get_frame, t):
            frame = get_frame(t)
            center_x, center_y, crop_w, crop_h = get_current_center(t)
            # Clamp the center so the crop stays inside the frame
            center_x = max(crop_w / 2, min(center_x, new_width - crop_w / 2))
            center_y = max(crop_h / 2, min(center_y, new_height - crop_h / 2))
            cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (center_x, center_y))
            resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)
            return resized_frame

        return clip.fl(transform_frame_zoom)
    # Pan effects: the crop size is constant (target_resolution); only the
    # top-left corner of the crop window is interpolated.
    crop_w, crop_h = target_w, target_h
    if effect_type == "pan-left":
        start_x, start_y = max_offset_x, max_offset_y / 2
        end_x, end_y = 0, max_offset_y / 2
    elif effect_type == "pan-right":
        start_x, start_y = 0, max_offset_y / 2
        end_x, end_y = max_offset_x, max_offset_y / 2
    elif effect_type == "pan-up":
        start_x, start_y = max_offset_x / 2, max_offset_y
        end_x, end_y = max_offset_x / 2, 0
    elif effect_type == "pan-down":
        start_x, start_y = max_offset_x / 2, 0
        end_x, end_y = max_offset_x / 2, max_offset_y
    elif effect_type == "up-left":
        start_x, start_y = max_offset_x, max_offset_y
        end_x, end_y = 0, 0
    elif effect_type == "down-right":
        start_x, start_y = 0, 0
        end_x, end_y = max_offset_x, max_offset_y
    else:
        # Should not happen with random.choice, but default to pan-right
        print(f"Warning: Unexpected effect type '{effect_type}'. Defaulting to 'pan-right'.")
        effect_type = 'pan-right'
        start_x, start_y = 0, max_offset_y / 2
        end_x, end_y = max_offset_x, max_offset_y / 2

    def transform_frame_pan(get_frame, t):
        frame = get_frame(t)
        # Cosine ease-in/ease-out
        progress = t / clip.duration if clip.duration > 0 else 0
        eased_progress = 0.5 - 0.5 * math.cos(math.pi * progress)
        # Interpolate the top-left corner of the crop window
        current_x = start_x + (end_x - start_x) * eased_progress
        current_y = start_y + (end_y - start_y) * eased_progress
        # cv2.getRectSubPix expects the crop center (floating point is fine)
        center_x = current_x + crop_w / 2
        center_y = current_y + crop_h / 2
        # Clamp the center so the crop stays inside the scaled image
        center_x = max(crop_w / 2, min(center_x, new_width - crop_w / 2))
        center_y = max(crop_h / 2, min(center_y, new_height - crop_h / 2))
        try:
            # Extra clip to guard against NaNs at the edges
            center_x = np.clip(center_x, 0, new_width)
            center_y = np.clip(center_y, 0, new_height)
            cropped_frame = cv2.getRectSubPix(frame, (crop_w, crop_h), (center_x, center_y))
            # The crop already matches target_resolution; this resize is a
            # cheap safety net against rounding differences.
            resized_frame = cv2.resize(cropped_frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)
            return resized_frame
        except Exception as e:
            print(f"Error applying Ken Burns transform at t={t:.2f}s: {e}")
            # Return a black frame in case of error
            return np.zeros((target_h, target_w, 3), dtype=np.uint8)

    # Apply the panning transform
    return clip.fl(transform_frame_pan)
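# Easing check: eased = 0.5 - 0.5*cos(pi*p) maps p=0 -> 0, p=0.5 -> 0.5,
# p=1 -> 1, with zero velocity at both endpoints, which is what makes the
# pan/zoom start and stop gently instead of snapping.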
def resize_to_fill(clip, target_resolution):
    """Resize and center-crop a clip to fill the target resolution while keeping its aspect ratio."""
    target_w, target_h = target_resolution
    clip_aspect = clip.w / clip.h
    target_aspect = target_w / target_h
    if clip_aspect > target_aspect:  # Clip is wider than target
        clip = clip.resize(height=target_h)
        # Crop the excess width symmetrically (integer coordinates)
        crop_amount_x = max(0, (clip.w - target_w) / 2)
        x1 = int(crop_amount_x)
        x2 = int(clip.w - crop_amount_x)
        clip = clip.crop(x1=x1, x2=x2, y1=0, y2=clip.h)
    else:  # Clip is taller than target (or same aspect)
        clip = clip.resize(width=target_w)
        # Crop the excess height symmetrically (integer coordinates)
        crop_amount_y = max(0, (clip.h - target_h) / 2)
        y1 = int(crop_amount_y)
        y2 = int(clip.h - crop_amount_y)
        clip = clip.crop(x1=0, x2=clip.w, y1=y1, y2=y2)
    # Final check: rounding can leave the size off by a pixel
    # (compare as tuples, since MoviePy may report size as a list)
    if tuple(clip.size) != tuple(target_resolution):
        print(f"Warning: Clip size {clip.size} after resize_to_fill does not match target {target_resolution}. Resizing again.")
        clip = clip.resize(newsize=target_resolution)
    return clip
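# Worked example: filling a 1080x1920 (9:16) target with a 1280x720 (16:9)
# clip: 16:9 is wider than 9:16, so the clip is resized to height 1920
# (width ~3413) and ~1166 px are cropped from each side, leaving a centered
# 1080x1920 frame.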
def find_mp3_files():
    """Return the first MP3 found in the current directory tree, or None."""
    mp3_files = []
    for root, dirs, files in os.walk('.'):
        for file in files:
            if file.lower().endswith('.mp3'):
                mp3_path = os.path.join(root, file)
                mp3_files.append(mp3_path)
                print(f"Found MP3 file: {mp3_path}")
    if mp3_files:
        return mp3_files[0]  # Return the first one found
    else:
        return None
def add_background_music(final_video, bg_music_path, bg_music_volume=0.08):
    """Mix background music under the final video's narration track."""
    if not bg_music_path or not os.path.exists(bg_music_path):
        print("No valid background music path provided or file not found. Skipping background music.")
        return final_video
    try:
        print(f"Adding background music from: {bg_music_path}")
        bg_music = AudioFileClip(bg_music_path)
        # Loop the music if it is shorter than the video
        if bg_music.duration < final_video.duration:
            loops_needed = math.ceil(final_video.duration / bg_music.duration)
            bg_segments = [bg_music.copy() for _ in range(loops_needed)]  # copy avoids shared-reader issues
            bg_music = concatenate_audioclips(bg_segments)
        # Trim the music to the video duration
        bg_music = bg_music.subclip(0, final_video.duration)
        # Lower the music volume
        bg_music = bg_music.volumex(bg_music_volume)
        # Composite with the existing narration audio
        video_audio = final_video.audio
        if video_audio:
            # Make sure the narration track matches the video duration first
            if abs(video_audio.duration - final_video.duration) > 0.1:
                print(f"Adjusting video audio duration ({video_audio.duration:.2f}s) to match video duration ({final_video.duration:.2f}s)")
                video_audio = video_audio.fx(vfx.speedx, factor=video_audio.duration / final_video.duration)
            mixed_audio = CompositeAudioClip([video_audio, bg_music])
        else:
            # The video may have no audio track at all
            mixed_audio = bg_music
            print("Warning: Video had no original audio track, only adding background music.")
        final_video = final_video.set_audio(mixed_audio)
        print("Background music added successfully.")
        return final_video
    except Exception as e:
        print(f"Error adding background music: {e}")
        print("Continuing without background music.")
        return final_video
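# Volume note: volumex scales amplitude linearly, so bg_music_volume=0.08 is
# about 20*log10(0.08) ≈ -22 dB relative to the source track — quiet enough
# to sit under the narration without drowning it out.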
def create_clip(media_asset, tts_path, estimated_duration, target_resolution,
                caption_enabled, caption_color, caption_size, caption_position,
                caption_bg_color, caption_stroke_color, caption_stroke_width,
                narration_text, segment_index):
    """Create a video clip with synchronized subtitles and narration."""
    try:
        print(f"Creating clip #{segment_index} from asset: {media_asset.get('path')}, type: {media_asset.get('asset_type')}")
        media_path = media_asset.get('path')
        asset_type = media_asset.get('asset_type')
        # Determine the actual audio duration
        audio_clip = None
        audio_duration = estimated_duration        # Default to the estimate
        target_clip_duration = estimated_duration  # Default target duration
        if tts_path and os.path.exists(tts_path):
            try:
                audio_clip = AudioFileClip(tts_path).audio_fadeout(0.2)  # Slight fade-out on TTS
                audio_duration = audio_clip.duration
                # Make the clip slightly longer than the audio for transitions/padding
                target_clip_duration = audio_duration + 0.3
                print(f"TTS audio duration: {audio_duration:.2f}s. Target clip duration: {target_clip_duration:.2f}s (estimated {estimated_duration:.2f}s)")
            except Exception as e:
                print(f"Error loading TTS audio clip {tts_path}: {e}. Using estimated duration {estimated_duration:.2f}s for clip.")
                audio_clip = None
                target_clip_duration = estimated_duration
        # Handle missing media first
        if not media_path or not os.path.exists(media_path):
            print(f"Skipping clip {segment_index}: Missing media file {media_path}")
            # Create a black clip with silent audio for the target duration
            clip = ColorClip(size=target_resolution, color=(0, 0, 0), duration=target_clip_duration)
            print(f"Created placeholder black clip for segment {segment_index}")
            # Overlay placeholder text if captions are enabled
            if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip():
                txt_clip = TextClip(
                    "[Missing Media]\n" + narration_text,  # Flag the missing media
                    fontsize=caption_size,
                    font='Arial-Bold',  # Must be available to ImageMagick
                    color=caption_color,
                    bg_color=caption_bg_color,
                    method='caption',
                    align='center',
                    stroke_width=caption_stroke_width,
                    stroke_color=caption_stroke_color,
                    size=(target_resolution[0] * 0.9, None)
                ).set_position('center').set_duration(target_clip_duration)
                clip = CompositeVideoClip([clip, txt_clip])
            # Attach silent audio to the placeholder clip
            silent_audio_path = generate_silent_audio(target_clip_duration)
            if silent_audio_path and os.path.exists(silent_audio_path):
                try:
                    silent_audio_clip = AudioFileClip(silent_audio_path)
                    # Match the silent audio to the video duration
                    if abs(silent_audio_clip.duration - clip.duration) > 0.1:
                        silent_audio_clip = silent_audio_clip.fx(vfx.speedx, factor=silent_audio_clip.duration / clip.duration)
                    clip = clip.set_audio(silent_audio_clip)
                except Exception as e:
                    print(f"Error adding silent audio to placeholder clip {segment_index}: {e}")
                    clip = clip.set_audio(None)
            else:
                clip = clip.set_audio(None)
            return clip  # Return the placeholder clip
        # Process the media now that the path is known to be valid
        if asset_type == "video":
            try:
                clip = VideoFileClip(media_path)
                print(f"Loaded video clip from {media_path} with duration {clip.duration:.2f}s")
                clip = resize_to_fill(clip, target_resolution)
                if clip.duration < target_clip_duration:
                    print("Looping video clip")
                    clip = clip.loop(duration=target_clip_duration)
                else:
                    clip = clip.subclip(0, target_clip_duration)
                clip = clip.fadein(0.2).fadeout(0.2)  # Simple transitions
                print(f"Video clip processed to duration {clip.duration:.2f}s")
            except Exception as e:
                print(f"Error processing video clip {media_path} for segment {segment_index}: {e}")
                print(f"Creating placeholder black clip instead for segment {segment_index}")
                clip = ColorClip(size=target_resolution, color=(0, 0, 0), duration=target_clip_duration)
                if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip():
                    txt_clip = TextClip(
                        "[Video Error]\n" + narration_text,
                        fontsize=caption_size, color=caption_color, bg_color=caption_bg_color, method='caption', align='center',
                        stroke_width=caption_stroke_width, stroke_color=caption_stroke_color,
                        size=(target_resolution[0] * 0.9, None)
                    ).set_position('center').set_duration(target_clip_duration)
                    clip = CompositeVideoClip([clip, txt_clip])
        elif asset_type == "image":
            try:
                img = Image.open(media_path)
                # Convert to RGB before handing the pixels to ImageClip
                if img.mode != 'RGB':
                    print("Converting image to RGB")
                    img = img.convert('RGB')
                    img_array = np.array(img)  # ImageClip accepts numpy arrays
                    img.close()
                    clip = ImageClip(img_array).set_duration(target_clip_duration)
                else:
                    img.close()
                    clip = ImageClip(media_path).set_duration(target_clip_duration)
                clip = apply_kenburns_effect(clip, target_resolution)  # Random Ken Burns effect
                clip = clip.fadein(0.3).fadeout(0.3)  # Simple transitions
            except Exception as e:
                print(f"Error processing image clip {media_path} for segment {segment_index}: {e}")
                print(f"Creating placeholder black clip instead for segment {segment_index}")
                clip = ColorClip(size=target_resolution, color=(0, 0, 0), duration=target_clip_duration)
                if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip():
                    txt_clip = TextClip(
                        "[Image Error]\n" + narration_text,
                        fontsize=caption_size, color=caption_color, bg_color=caption_bg_color, method='caption', align='center',
                        stroke_width=caption_stroke_width, stroke_color=caption_stroke_color,
                        size=(target_resolution[0] * 0.9, None)
                    ).set_position('center').set_duration(target_clip_duration)
                    clip = CompositeVideoClip([clip, txt_clip])
        else:
            print(f"Unknown asset type '{asset_type}' for segment {segment_index}. Creating placeholder.")
            clip = ColorClip(size=target_resolution, color=(0, 0, 0), duration=target_clip_duration)
            if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip():
                txt_clip = TextClip(
                    "[Unknown Media Type Error]\n" + narration_text,
                    fontsize=caption_size, color=caption_color, bg_color=caption_bg_color, method='caption', align='center',
                    stroke_width=caption_stroke_width, stroke_color=caption_stroke_color,
                    size=(target_resolution[0] * 0.9, None)
                ).set_position('center').set_duration(target_clip_duration)
                clip = CompositeVideoClip([clip, txt_clip])
        # Attach the narration audio
        if audio_clip:
            # Nudge the audio speed if it drifted from the clip duration (>100 ms)
            if abs(audio_clip.duration - clip.duration) > 0.1:
                print(f"Adjusting audio duration ({audio_clip.duration:.2f}s) to match video duration ({clip.duration:.2f}s) for segment {segment_index}")
                try:
                    audio_clip = audio_clip.fx(vfx.speedx, factor=audio_clip.duration / clip.duration)
                except Exception as e:
                    print(f"Error adjusting audio speed for segment {segment_index}: {e}. Using original audio duration.")
                    # If speedx fails, keep the original audio and accept the timing drift
            clip = clip.set_audio(audio_clip)
        else:
            # TTS failed or could not be loaded: attach silent audio instead
            print(f"No valid audio for clip {segment_index}. Setting silent audio.")
            silent_audio_path = generate_silent_audio(clip.duration)
            if silent_audio_path and os.path.exists(silent_audio_path):
                try:
                    silent_audio_clip = AudioFileClip(silent_audio_path)
                    # Should already match, but double-check the duration
                    if abs(silent_audio_clip.duration - clip.duration) > 0.1:
                        silent_audio_clip = silent_audio_clip.fx(vfx.speedx, factor=silent_audio_clip.duration / clip.duration)
                    clip = clip.set_audio(silent_audio_clip)
                except Exception as e:
                    print(f"Error setting silent audio for segment {segment_index}: {e}")
                    clip = clip.set_audio(None)
            else:
                clip = clip.set_audio(None)
        # Add subtitles if enabled and there is text to show
        if caption_enabled and narration_text and caption_color.lower() != "transparent" and narration_text.strip():
            try:
                # Total audio duration: actual if available, else the clip target
                actual_audio_duration_for_subtitles = audio_duration if audio_clip else target_clip_duration
                # Simple word-count chunking; a forced aligner would sync better
                # but needs much more setup
                words = narration_text.split()
                total_words = len(words)
                average_word_duration = actual_audio_duration_for_subtitles / total_words if total_words > 0 else 0.5
                subtitle_clips = []
                current_time = 0
                chunk_size = 6  # Words per caption chunk (tune for readability)
                for i in range(0, total_words, chunk_size):
                    chunk_words = words[i:i+chunk_size]
                    chunk_text = ' '.join(chunk_words)
                    # Chunk duration = word count * average word duration
                    estimated_chunk_duration = len(chunk_words) * average_word_duration
                    start_time = current_time
                    # The chunk must not outlive the clip
                    end_time = min(current_time + estimated_chunk_duration, clip.duration)
                    if start_time >= end_time:
                        break  # Avoid zero or negative duration chunks
                    # Vertical placement
                    if caption_position == "Top":
                        subtitle_y_position = int(target_resolution[1] * 0.05)  # Just below the top edge
                    elif caption_position == "Middle":
                        subtitle_y_position = int(target_resolution[1] * 0.5) - int(caption_size * 1.2 / 2)  # Centered, adjusted for text height
                    else:  # Default: Bottom
                        subtitle_y_position = int(target_resolution[1] * 0.9) - int(caption_size * 1.2)  # Above the bottom edge, allowing for wrapped lines
                    txt_clip = TextClip(
                        chunk_text,
                        fontsize=caption_size,
                        font='Arial-Bold',  # Must be available to ImageMagick, or use a common system font
                        color=caption_color,
                        bg_color=caption_bg_color,
                        method='caption',  # Enables text wrapping
                        align='center',
                        stroke_width=caption_stroke_width,
                        stroke_color=caption_stroke_color,
                        size=(target_resolution[0] * 0.9, None)  # Caption width capped at 90% of the video width
                    ).set_start(start_time).set_end(end_time)
                    txt_clip = txt_clip.set_position(('center', subtitle_y_position))
                    subtitle_clips.append(txt_clip)
                    current_time = end_time  # Advance to the end of this chunk
                if subtitle_clips:
                    clip = CompositeVideoClip([clip] + subtitle_clips)
            except Exception as sub_error:
                print(f"Error adding subtitles for segment {segment_index}: {sub_error}")
                # Fall back to a single static text overlay
                try:
                    txt_clip = TextClip(
                        narration_text,
                        fontsize=caption_size,
                        font='Arial-Bold',
                        color=caption_color,
                        bg_color=caption_bg_color,
                        method='caption',
                        align='center',
                        stroke_width=caption_stroke_width,
                        stroke_color=caption_stroke_color,
                        size=(target_resolution[0] * 0.8, None)
                    ).set_position(('center', int(target_resolution[1] * 0.75))).set_duration(clip.duration)
                    clip = CompositeVideoClip([clip, txt_clip])
                    print(f"Added simple fallback subtitle for segment {segment_index}.")
                except Exception as fallback_sub_error:
                    print(f"Simple fallback subtitle failed for segment {segment_index}: {fallback_sub_error}")
        return clip
    except Exception as e:
        print(f"Critical error in create_clip for segment {segment_index}: {str(e)}")
        # Build a black error clip if anything above failed
        error_duration = target_clip_duration if 'target_clip_duration' in locals() else (estimated_duration if estimated_duration else 3.0)
        print(f"Creating error placeholder black clip for segment {segment_index} with duration {error_duration:.2f}s.")
        black_clip = ColorClip(size=target_resolution, color=(0, 0, 0), duration=error_duration)
        error_text = f"Error in segment {segment_index}"
        if narration_text:
            error_text += f":\n{narration_text[:50]}..."
        error_txt_clip = TextClip(
            error_text,
            fontsize=30,
            color="red",
            align='center',
            size=(target_resolution[0] * 0.9, None)
        ).set_position('center').set_duration(error_duration)
        clip = CompositeVideoClip([black_clip, error_txt_clip])
        silent_audio_path = generate_silent_audio(error_duration)
        if silent_audio_path and os.path.exists(silent_audio_path):
            try:
                clip = clip.set_audio(AudioFileClip(silent_audio_path))
            except Exception as audio_e:
                print(f"Error setting silent audio for error clip {segment_index}: {audio_e}")
                clip = clip.set_audio(None)
        else:
            clip = clip.set_audio(None)
        return clip
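# Subtitle chunking example: a 24-word narration over 9.6 s of TTS gives an
# average word duration of 0.4 s; with chunk_size=6 that produces four caption
# chunks of ~2.4 s each (0.0-2.4, 2.4-4.8, 4.8-7.2, 7.2-9.6 s).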
def fix_imagemagick_policy(): | |
"""Attempt to fix ImageMagick security policies required by TextClip.""" | |
print("Attempting to fix ImageMagick security policies...") | |
policy_paths = [ | |
"/etc/ImageMagick-6/policy.xml", | |
"/etc/ImageMagick-7/policy.xml", | |
"/etc/ImageMagick/policy.xml", # Common symlink path | |
"/usr/local/etc/ImageMagick-7/policy.xml", # macports/homebrew path | |
"/usr/share/ImageMagick/policy.xml", # Another common path | |
"/usr/share/ImageMagick-6/policy.xml", | |
"/usr/share/ImageMagick-7/policy.xml", | |
os.path.join(os.environ.get('MAGICK_HOME', ''), 'policy.xml') if os.environ.get('MAGICK_HOME') else '', # Check MAGICK_HOME | |
# Add more paths if needed based on typical installations | |
] | |
    # Drop empty entries and keep only paths that actually exist on this system
    policy_paths = [path for path in policy_paths if path and os.path.exists(path)]
    found_policy = policy_paths[0] if policy_paths else None  # Use the first match found
if not found_policy: | |
print("No policy.xml found in common locations. TextClip may fail.") | |
print("Consider installing ImageMagick and checking its installation path and policy.xml location.") | |
return False | |
print(f"Attempting to modify policy file at {found_policy}") | |
try: | |
        # Create a uniquely named backup before modifying the file.
        # found_policy comes from the existence-filtered list above, so no
        # separate existence check is needed here.
        backup_path = f"{found_policy}.bak_aivgen_{int(time.time())}"
        shutil.copy2(found_policy, backup_path)
        print(f"Created backup at {backup_path}")
# Read the original policy file (handle potential permission issues) | |
try: | |
with open(found_policy, 'r') as f: | |
policy_content = f.read() | |
except Exception as e: | |
print(f"Error reading policy file {found_policy}: {e}. Attempting with sudo cat...") | |
try: | |
# Use sudo cat to read if direct read fails | |
process = subprocess.Popen(['sudo', 'cat', found_policy], stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
stdout, stderr = process.communicate() | |
if process.returncode == 0: | |
policy_content = stdout.decode('utf-8') | |
print("Read policy file content using sudo.") | |
else: | |
print(f"Failed to read policy file using sudo cat. Error: {stderr.decode('utf-8')}") | |
print("Manual intervention may be required.") | |
return False | |
except Exception as e_sudo_read: | |
print(f"Error executing sudo cat: {e_sudo_read}") | |
print("Manual intervention may be required.") | |
return False | |
# Use regex to find and replace the specific policy lines | |
# Allow read and write rights for PDF, EPS, PS, etc. potentially restricted formats | |
# Also ensure path policies allow reading/writing files | |
# Be more specific with replacements to avoid unintended side effects | |
modified_content = re.sub( | |
r'<policy domain="coder" rights="none" pattern="(PDF|EPS|PS|XPS|MSL|SVG|FILTER)"', # Added common restricted patterns | |
r'<policy domain="coder" rights="read|write" pattern="\1"', # Changed rights to read|write | |
policy_content | |
) | |
# Ensure path rights are read/write, especially important for temporary files | |
modified_content = re.sub( | |
r'<policy domain="path" pattern="@\*" rights="none"', | |
r'<policy domain="path" pattern="@*" rights="read|write"', # Ensure path rights are read|write | |
modified_content | |
) | |
# Catch any other "rights=none" for coder or path domains, but be cautious | |
modified_content = re.sub( | |
r'<policy domain="(coder|path)" rights="none"(.*?)/>', | |
r'<policy domain="\1" rights="read|write"\2/>', | |
modified_content | |
) | |
# Write the modified content back (handle potential permission issues) | |
try: | |
with open(found_policy, 'w') as f: | |
f.write(modified_content) | |
print("ImageMagick policies updated successfully (direct write).") | |
return True | |
except IOError as e: | |
print(f"Direct write failed: {e}. Attempting with sudo tee...") | |
            # Fall back to piping the content through `sudo tee`. This requires
            # that sudo can run non-interactively for this user. tee overwrites
            # the policy file with whatever arrives on stdin, equivalent to:
            #   echo "<modified policy xml>" | sudo tee /path/to/policy.xml > /dev/null
            # The content is piped directly via stdin, so no intermediate temp
            # file is needed (and none is safe here, since TEMP_FOLDER is not
            # yet created when this runs at startup).
            try:
                print(f"Executing: sudo tee {found_policy}")
                # An explicit argument list via subprocess is safer than os.system
                process = subprocess.Popen(
                    ['sudo', 'tee', found_policy],
                    stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE
                )
                _, stderr = process.communicate(input=modified_content.encode('utf-8'))
                if process.returncode == 0:
                    print("ImageMagick policies updated successfully using sudo tee.")
                    return True
                else:
                    print(f"Failed to update ImageMagick policies using sudo tee. Return code: {process.returncode}. Error: {stderr.decode('utf-8')}")
                    print("Please manually edit your policy.xml to grant read/write rights for the coder and path domains.")
                    print('Example: change <policy domain="coder" rights="none" pattern="PDF"/> to <policy domain="coder" rights="read|write" pattern="PDF"/>')
                    return False
            except Exception as e_sudo_write:
                print(f"Error executing sudo tee process: {e_sudo_write}")
                print("Manual intervention may be required.")
                return False
except Exception as e_general: | |
print(f"General error during ImageMagick policy modification: {e_general}") | |
print("Manual intervention may be required.") | |
return False | |
# ---------------- Gradio Interface Functions ---------------- # | |
def generate_script_and_show_editor(user_input, resolution_choice, | |
caption_enabled_choice, caption_color, | |
caption_size, caption_position, caption_bg_color, | |
caption_stroke_color, caption_stroke_width): | |
""" | |
Generates the script, parses it, stores segments in state, | |
and prepares the UI updates to show the editing interface. | |
Uses yield to update status. | |
""" | |
global TEMP_FOLDER | |
# Clean up previous run's temp folder if it exists | |
if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): | |
print(f"Cleaning up previous temp folder: {TEMP_FOLDER}") | |
try: | |
# Use onerror to log errors during cleanup | |
def onerror(func, path, exc_info): | |
print(f"Error cleaning up {path}: {exc_info[1]}") | |
shutil.rmtree(TEMP_FOLDER, onerror=onerror) | |
except Exception as e: | |
print(f"Error starting cleanup of temp folder {TEMP_FOLDER}: {e}") | |
# Create a new unique temporary folder for this run | |
TEMP_FOLDER = tempfile.mkdtemp(prefix="aivgen_") | |
print(f"Created new temp folder: {TEMP_FOLDER}") | |
# Store global style choices in state or use them directly (let's store in state) | |
# Gradio State can hold a single object. Let's use a dict. | |
run_config = { | |
"resolution": (1920, 1080) if resolution_choice == "Full (1920x1080)" else (1080, 1920), | |
"caption_enabled": caption_enabled_choice == "Yes", | |
"caption_color": caption_color, | |
"caption_size": caption_size, | |
"caption_position": caption_position, | |
"caption_bg_color": caption_bg_color, | |
"caption_stroke_color": caption_stroke_color, | |
"caption_stroke_width": caption_stroke_width, | |
"temp_folder": TEMP_FOLDER # Store temp folder path | |
} | |
    # Initial status update and hide editing/video areas.
    # NOTE: the per-segment update lists are unpacked with * so each yielded value
    # lines up one-to-one with the flattened component list in the click handler's
    # `outputs`; yielding raw lists would cause an output-count mismatch.
    yield (run_config,
           gr.update(value="Generating script...", visible=True),
           gr.update(visible=False),  # Hide editing area
           gr.update(value=None, visible=False),  # Hide video output and clear value
           *[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)],  # Hide textboxes
           *[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)],  # Hide file uploads
           *[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)],  # Hide segment groups
           [],  # Clear segments_state
           gr.update(visible=False))  # Keep the script preview hidden until a script exists
script_text = generate_script(user_input, OPENROUTER_API_KEY, OPENROUTER_MODEL) | |
# Update raw script preview | |
raw_script_preview = f"### Generated Script Preview\n\n```\n{script_text}\n```" if script_text else "### Generated Script Preview\n\nFailed to generate script." | |
if not script_text or script_text.startswith("[Error]"): | |
# Update status and keep editing/video areas hidden | |
yield (run_config, | |
gr.update(value=f"Script generation failed: {script_text}", visible=True), | |
gr.update(visible=False), | |
gr.update(value=None, visible=False), | |
# Updates for dynamic components (all hidden) | |
[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)], | |
[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)], | |
[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)], | |
[], # segments_state remains empty | |
raw_script_preview) # Update raw script preview | |
return # Stop execution | |
    yield (run_config,
           gr.update(value="Parsing script...", visible=True),
           gr.update(visible=False),
           gr.update(value=None, visible=False),
           *[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)],
           *[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)],
           *[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)],
           [],  # segments_state will be updated next
           gr.update(value=raw_script_preview, visible=True))
segments = parse_script(script_text) | |
    if not segments:
        yield (run_config,
               gr.update(value="Failed to parse script, or the script was empty after parsing.", visible=True),
               gr.update(visible=False),
               gr.update(value=None, visible=False),
               *[gr.update(visible=False, value="") for _ in range(MAX_SEGMENTS_FOR_EDITING)],
               *[gr.update(visible=False, value=None) for _ in range(MAX_SEGMENTS_FOR_EDITING)],
               *[gr.update(visible=False) for _ in range(MAX_SEGMENTS_FOR_EDITING)],
               [],  # segments_state remains empty
               gr.update(value=raw_script_preview, visible=True))  # Show raw script preview
        return  # Stop execution
# Prepare updates for dynamic editing components | |
textbox_updates = [] | |
file_updates = [] | |
group_visibility_updates = [] | |
for i in range(MAX_SEGMENTS_FOR_EDITING): | |
if i < len(segments): | |
# Show group, populate text, clear file upload | |
textbox_updates.append(gr.update(value=segments[i]['text'], visible=True)) | |
file_updates.append(gr.update(value=None, visible=True)) # Clear previous uploads | |
group_visibility_updates.append(gr.update(visible=True)) | |
else: | |
# Hide unused groups and clear their values | |
textbox_updates.append(gr.update(value="", visible=False)) | |
file_updates.append(gr.update(value=None, visible=False)) | |
group_visibility_updates.append(gr.update(visible=False)) | |
    # Final yield: show the editing area, populate the per-segment fields, update state
    yield (run_config,
           gr.update(value=f"Script generated with {len(segments)} segments. Edit the segments below.", visible=True),
           gr.update(visible=True),  # Show editing area
           gr.update(value=None, visible=False),  # Ensure video output is hidden and cleared
           *textbox_updates,  # Per-textbox visibility and value
           *file_updates,  # Per-file-upload visibility and value
           *group_visibility_updates,  # Per-group visibility
           segments,  # Update the state with the parsed segments
           gr.update(value=raw_script_preview, visible=True))  # Show raw script preview
def generate_video_from_edited(run_config, segments_data, *edit_inputs):
    """
    Takes the edited segment data (texts, uploaded files) plus the run
    configuration and generates the final video. Uses yield to update status.
    Gradio flattens the MAX_SEGMENTS_FOR_EDITING textbox values, the
    MAX_SEGMENTS_FOR_EDITING file-upload paths and the background music volume
    into positional arguments, so they are split back out here.
    """
    segment_texts = list(edit_inputs[:MAX_SEGMENTS_FOR_EDITING])
    segment_uploads = list(edit_inputs[MAX_SEGMENTS_FOR_EDITING:2 * MAX_SEGMENTS_FOR_EDITING])
    bg_music_volume = edit_inputs[2 * MAX_SEGMENTS_FOR_EDITING]
if not segments_data: | |
yield "No segments to process. Generate script first.", None | |
return | |
global TEMP_FOLDER | |
# Ensure TEMP_FOLDER is correctly set from run_config | |
TEMP_FOLDER = run_config.get("temp_folder") | |
if not TEMP_FOLDER or not os.path.exists(TEMP_FOLDER): | |
yield "Error: Temporary folder not found from run config. Please regenerate script.", None | |
        # Nothing to clean up in this branch: either TEMP_FOLDER was never set or
        # the directory no longer exists, so just reset the global and bail out.
        TEMP_FOLDER = None  # Reset global
        return
    # Extract config from run_config, with sensible defaults if keys are missing
    TARGET_RESOLUTION = run_config.get("resolution", (1920, 1080))
    CAPTION_ENABLED = run_config.get("caption_enabled", True)
    CAPTION_COLOR = run_config.get("caption_color", "#FFFFFF")
    CAPTION_SIZE = run_config.get("caption_size", 45)
    CAPTION_POSITION = run_config.get("caption_position", "Bottom")
    CAPTION_BG_COLOR = run_config.get("caption_bg_color", "rgba(0, 0, 0, 0.25)")
    CAPTION_STROKE_COLOR = run_config.get("caption_stroke_color", "#000000")
    CAPTION_STROKE_WIDTH = run_config.get("caption_stroke_width", 2)
# Update segments_data with potentially edited text and uploaded file paths | |
# segment_texts and segment_uploads are lists of values from the Gradio components | |
processed_segments = [] | |
for i, segment in enumerate(segments_data): | |
if i < len(segment_texts) and i < len(segment_uploads): # Ensure we have corresponding input values | |
processed_segment = segment.copy() # Make a copy | |
# Use edited text, strip whitespace | |
processed_segment['text'] = segment_texts[i].strip() if segment_texts[i] is not None else segment.get('text', '').strip() | |
# Use uploaded media path (will be None if nothing uploaded) | |
processed_segment['uploaded_media'] = segment_uploads[i] | |
processed_segments.append(processed_segment) | |
else: | |
# This shouldn't happen if state and UI updates are in sync, but as a safeguard | |
print(f"Warning: Missing input value(s) for segment index {i}. Using original segment data.") | |
processed_segments.append(segment) # Append original if inputs are missing | |
if not processed_segments: | |
yield "No valid segments to process after editing.", None | |
# Clean up | |
if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): | |
try: | |
shutil.rmtree(TEMP_FOLDER) | |
print(f"Cleaned up temp folder: {TEMP_FOLDER}") | |
except Exception as e: | |
print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}") | |
TEMP_FOLDER = None # Reset global | |
return | |
yield "Fixing ImageMagick policy...", None | |
fix_imagemagick_policy() # Attempt policy fix before creating clips | |
clips = [] | |
yield "Generating media and audio for clips...", None | |
total_segments = len(processed_segments) | |
for idx, segment in enumerate(processed_segments): | |
yield f"Processing segment {idx+1}/{total_segments}...", None | |
print(f"\nProcessing segment {idx+1}/{total_segments} (Prompt: '{segment.get('original_prompt', 'N/A')[:30]}...')") | |
# Determine media source: uploaded or generated | |
media_asset = generate_media_asset( | |
segment.get('original_prompt', 'background'), # Use original prompt for search if available, else a generic term | |
segment.get('uploaded_media') # Pass uploaded media path | |
) | |
# Generate TTS audio | |
tts_path = generate_tts(segment.get('text', '')) # Use edited text, default to empty string if None/missing | |
# Create the video clip for this segment | |
clip = create_clip( | |
media_asset=media_asset if media_asset else {"path": None, "asset_type": None}, # Pass dummy if generate_media_asset failed | |
tts_path=tts_path, | |
estimated_duration=segment.get('duration', 3.0), # Use estimated duration as a fallback reference | |
target_resolution=TARGET_RESOLUTION, | |
caption_enabled=CAPTION_ENABLED, | |
caption_color=CAPTION_COLOR, | |
caption_size=CAPTION_SIZE, | |
caption_position=CAPTION_POSITION, | |
caption_bg_color=CAPTION_BG_COLOR, | |
caption_stroke_color=CAPTION_STROKE_COLOR, | |
caption_stroke_width=CAPTION_STROKE_WIDTH, | |
narration_text=segment.get('text', ''), # Pass narration text for captions | |
segment_index=idx+1 | |
) | |
if clip: | |
clips.append(clip) | |
else: | |
print(f"Skipping segment {idx+1} due to clip creation failure.") | |
# If create_clip returns None (shouldn't happen with fallback logic, but as safety) | |
# Add a placeholder black clip | |
placeholder_duration = segment.get('duration', 3.0) # Use estimated duration or default | |
placeholder_clip = ColorClip(size=TARGET_RESOLUTION, color=(0,0,0), duration=placeholder_duration) | |
silent_audio_path = generate_silent_audio(placeholder_duration) | |
if silent_audio_path and os.path.exists(silent_audio_path): | |
placeholder_clip = placeholder_clip.set_audio(AudioFileClip(silent_audio_path)) | |
error_text = f"Segment {idx+1} Failed" | |
if segment.get('text'): error_text += f":\n{segment['text'][:50]}..." | |
            error_txt_clip = TextClip(
                error_text, fontsize=30, color="red", align='center',
                size=(TARGET_RESOLUTION[0] * 0.9, None)
            ).set_position('center').set_duration(placeholder_duration)
placeholder_clip = CompositeVideoClip([placeholder_clip, error_txt_clip]) | |
clips.append(placeholder_clip) | |
if not clips: | |
yield "No clips were successfully created. Video generation failed.", None | |
# Clean up | |
if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): | |
try: | |
shutil.rmtree(TEMP_FOLDER) | |
print(f"Cleaned up temp folder: {TEMP_FOLDER}") | |
except Exception as e: | |
print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}") | |
TEMP_FOLDER = None # Reset global | |
return | |
yield "Concatenating clips...", None | |
print("\nConcatenating clips...") | |
try: | |
final_video = concatenate_videoclips(clips, method="compose") | |
except Exception as e: | |
print(f"Error concatenating clips: {e}") | |
yield f"Error concatenating clips: {e}", None | |
# Clean up | |
if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): | |
try: | |
shutil.rmtree(TEMP_FOLDER) | |
print(f"Cleaned up temp folder: {TEMP_FOLDER}") | |
except Exception as e: | |
print(f"Error cleaning up temp folder {TEMP_FOLDER}: {e}") | |
TEMP_FOLDER = None # Reset global | |
return | |
yield "Adding background music...", None | |
bg_music_path = find_mp3_files() # Find background music | |
final_video = add_background_music(final_video, bg_music_path, bg_music_volume=bg_music_volume) # Use volume from input | |
yield f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...", None | |
print(f"Exporting final video to {OUTPUT_VIDEO_FILENAME}...") | |
output_path = None | |
try: | |
# Use a temporary output file first for safety, within TEMP_FOLDER | |
temp_output_filename = os.path.join(TEMP_FOLDER, f"temp_final_video_{int(time.time())}.mp4") | |
final_video.write_videofile(temp_output_filename, codec='libx264', fps=24, preset='veryfast') | |
# Ensure the destination directory for the final output exists (current dir) | |
os.makedirs(os.path.dirname(OUTPUT_VIDEO_FILENAME) or '.', exist_ok=True) | |
# Move the final file to the intended location after successful export | |
final_output_path = OUTPUT_VIDEO_FILENAME | |
try: | |
shutil.move(temp_output_filename, final_output_path) | |
print(f"Final video saved as {final_output_path}") | |
output_path = final_output_path | |
except Exception as e: | |
print(f"Error moving temporary file {temp_output_filename} to final destination {final_output_path}: {e}") | |
# If move fails, return the temp file path or None | |
output_path = temp_output_filename # Return temp path so user can access it | |
print(f"Returning video from temporary path: {output_path}") | |
except Exception as e: | |
print(f"Error exporting video: {e}") | |
output_path = None | |
yield f"Video export failed: {e}", None # Provide error message in status | |
    # Clean up temporary folder
    yield "Cleaning up temporary files...", gr.update()  # gr.update() with no args leaves the video output untouched
if TEMP_FOLDER and os.path.exists(TEMP_FOLDER): | |
try: | |
# Use onerror to log errors during cleanup | |
def onerror(func, path, exc_info): | |
print(f"Error cleaning up {path}: {exc_info[1]}") | |
shutil.rmtree(TEMP_FOLDER, onerror=onerror) | |
print(f"Cleaned up temp folder: {TEMP_FOLDER}") | |
except Exception as e: | |
print(f"Error starting cleanup of temp folder {TEMP_FOLDER}: {e}") | |
TEMP_FOLDER = None # Reset global | |
yield "Done!", output_path # Final status update | |
# ---------------- Gradio Interface Definition (Blocks) ---------------- # | |
# Need lists to hold the dynamic UI components for segments | |
segment_editing_groups = [] | |
segment_text_inputs = [] | |
segment_file_inputs = [] | |
with gr.Blocks() as demo: | |
gr.Markdown("# 🤖 AI Documentary Video Generator 🎬") | |
gr.Markdown("Enter a concept to generate a funny documentary script. You can then edit the script text and replace the suggested media for each segment before generating the final video.") | |
# --- Global Settings --- | |
with gr.Accordion("Global Settings", open=True): | |
user_concept_input = gr.Textbox(label="Video Concept", placeholder="e.g., The secret life of pigeons, Why socks disappear in the laundry, The futility of alarm clocks...") | |
with gr.Row(): | |
resolution_radio = gr.Radio(["Full (1920x1080)", "Short (1080x1920)"], label="Video Resolution", value="Full (1920x1080)") | |
bg_music_volume_slider = gr.Slider(minimum=0, maximum=0.5, value=0.08, step=0.01, label="Background Music Volume", info="Lower volume keeps narration clear.") # Adjusted max volume | |
# --- Caption Settings --- | |
with gr.Accordion("Caption Settings", open=False): | |
caption_enabled_radio = gr.Radio(["Yes", "No"], label="Show Captions?", value="Yes") | |
with gr.Row(): | |
caption_color_picker = gr.ColorPicker(label="Caption Text Color", value="#FFFFFF") # Default white | |
caption_bg_color_picker = gr.ColorPicker(label="Caption Background Color (with transparency)", value="rgba(0, 0, 0, 0.4)") # Default semi-transparent black, slightly more opaque | |
with gr.Row(): | |
caption_size_slider = gr.Slider(minimum=20, maximum=80, value=45, step=1, label="Caption Font Size") # Adjusted max size | |
caption_stroke_width_slider = gr.Slider(minimum=0, maximum=5, value=2, step=0.5, label="Caption Stroke Width") | |
with gr.Row(): | |
caption_position_radio = gr.Radio(["Top", "Middle", "Bottom"], label="Caption Position", value="Bottom") | |
caption_stroke_color_picker = gr.ColorPicker(label="Caption Stroke Color", value="#000000") # Default black stroke | |
generate_script_btn = gr.Button("Generate Script", variant="primary") | |
# --- Status and Script Output --- | |
status_output = gr.Label(label="Status", value="", visible=True) # Always visible | |
# Using Markdown to show raw script content | |
script_preview_markdown = gr.Markdown("### Generated Script Preview\n\nScript will appear here...", visible=False) # Initially hidden | |
# --- State to hold parsed segments data and run config --- | |
segments_state = gr.State([]) # List of segment dictionaries | |
run_config_state = gr.State({}) # Dictionary for run configuration | |
# --- Dynamic Editing Area (Initially hidden) --- | |
# We create MAX_SEGMENTS_FOR_EDITING groups, and show/hide them dynamically | |
with gr.Column(visible=False) as editing_area: | |
gr.Markdown("### Edit Script Segments") | |
gr.Markdown("Review the AI-generated text and media suggestions below. Edit the text and/or upload your own image/video for any segment. If no file is uploaded, AI will fetch media based on the original prompt.") | |
for i in range(MAX_SEGMENTS_FOR_EDITING): | |
            # Use gr.Box for visual grouping; the elem_classes tag lets the
            # client-side JS below locate each segment group (elem_classes
            # assumes a reasonably recent Gradio 3.x)
            with gr.Box(visible=False, elem_classes=["segment_group_box"]) as segment_group:  # One group per segment
segment_editing_groups.append(segment_group) | |
# Use a Label to display the original prompt - it's non-interactive text | |
segment_prompt_label = gr.Label(f"Segment {i+1} Prompt:", show_label=False) # Label will be set by JS | |
# We'll update the value of this label using JS/state change | |
segment_text = gr.Textbox(label="Narration Text", lines=2, interactive=True) | |
segment_text_inputs.append(segment_text) | |
segment_file = gr.File(label="Upload Custom Media (Image or Video)", type="filepath", interactive=True) | |
segment_file_inputs.append(segment_file) | |
generate_video_btn = gr.Button("Generate Video", variant="primary") | |
# --- Final Video Output --- | |
final_video_output = gr.Video(label="Generated Video", visible=False) # Initially hidden | |
# --- Event Handlers --- | |
# Generate Script Button Click | |
generate_script_btn.click( | |
fn=generate_script_and_show_editor, | |
inputs=[ | |
user_concept_input, | |
resolution_radio, | |
caption_enabled_radio, | |
caption_color_picker, | |
caption_size_slider, | |
caption_position_radio, | |
caption_bg_color_picker, | |
caption_stroke_color_picker, | |
caption_stroke_width_slider | |
], | |
outputs=[ | |
run_config_state, # Update run config state | |
status_output, # Update status label | |
editing_area, # Show/hide editing area column | |
final_video_output, # Hide and clear video output | |
# Outputs for dynamic components (visibility and value updates) | |
*segment_text_inputs, | |
*segment_file_inputs, | |
*segment_editing_groups, | |
segments_state, # Update segments state | |
script_preview_markdown # Update raw script preview | |
] | |
) | |
# Generate Video Button Click | |
generate_video_btn.click( | |
fn=generate_video_from_edited, | |
inputs=[ | |
run_config_state, # Pass run config | |
segments_state, # Pass the original parsed segments data (needed for original_prompt and duration) | |
*segment_text_inputs, # Pass list of edited text values | |
*segment_file_inputs, # Pass list of uploaded file paths | |
bg_music_volume_slider # Pass background music volume | |
], | |
outputs=[status_output, final_video_output] # Yield status updates and final video | |
) | |
    # Client-side JS to update the per-segment prompt labels after script
    # generation. Note this is a Python f-string: literal JS braces are doubled
    # and JS template placeholders are written as ${{...}}; only
    # {MAX_SEGMENTS_FOR_EDITING} is interpolated by Python.
demo.load( | |
None, | |
None, | |
None, | |
_js=f""" | |
        // Define the helper on window so it is reachable from the
        // segments_state.change handler below, regardless of eval scope
        window.updateSegmentPromptLabels = function(segments_data) {{
console.log("updateSegmentPromptLabels called", segments_data); | |
            // Locate each segment group via the segment_group_box class set on
            // the gr.Box components above, then take the first <label> inside
            // each group as that segment's prompt label. This avoids relying on
            // build-specific Svelte class names, which change between Gradio
            // releases (best-effort DOM poking either way).
            const segment_boxes = document.querySelectorAll('.segment_group_box');
            if (!segments_data || segments_data.length === 0) {{
                // Clear any existing labels if script generation failed or returned nothing
                segment_boxes.forEach(box => {{
                    const label = box.querySelector('label');
                    if (label) label.textContent = '';
                }});
                return;
            }}
            for (let i = 0; i < {MAX_SEGMENTS_FOR_EDITING}; i++) {{
                const box = segment_boxes[i];
                const promptLabel = box ? box.querySelector('label') : null;
                if (promptLabel) {{
                    if (i < segments_data.length) {{
                        // Update the label text with the original prompt
                        promptLabel.textContent = `Segment ${{i+1}} (Prompt: ${{segments_data[i].original_prompt}})`;
                    }} else {{
                        // Clear the label for unused segments (group visibility
                        // itself is handled by the Python-side gr.update calls)
                        promptLabel.textContent = '';
                    }}
                }} else {{
                    console.warn(`Prompt label element not found for segment index ${{i}}`);
                }}
}} | |
}} | |
""" | |
) | |
# Trigger the JS function whenever segments_state changes | |
segments_state.change( | |
None, # No Python function to call | |
segments_state, # The state variable that changed | |
None, # No output components to update via Python | |
_js=""" | |
(segments_data) => { | |
// Call the JS function defined in demo.load | |
updateSegmentPromptLabels(segments_data); | |
// Return the segments_data itself if needed for chaining, but here it's not. | |
// This function just updates the UI client-side. | |
return arguments[0]; // Return original arguments to avoid state getting cleared | |
} | |
""" | |
) | |
# Launch the interface | |
if __name__ == "__main__": | |
# Attempt ImageMagick policy fix on script startup | |
# This helps but might still require manual sudo depending on system config | |
fix_imagemagick_policy() | |
print("Launching Gradio interface...") | |
# Check if API keys are still placeholders (unlikely with hardcoded keys, but good practice) | |
if PEXELS_API_KEY.startswith('YOUR_PEXELS_API_KEY'): | |
print("Warning: PEXELS_API_KEY is not configured. Media search may fail.") | |
if OPENROUTER_API_KEY.startswith('YOUR_OPENROUTER_API_KEY'): | |
print("Warning: OPENROUTER_API_KEY is not configured. Script generation will fail.") | |
demo.launch(share=True) # Set share=True to get a public link |