Spaces:
Build error
Build error
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
AI Documentary Video Generator | |
Version: 2.0 | |
Author: AI Assistant | |
Description: Creates documentary-style videos from text prompts using TTS, media APIs, and video processing | |
""" | |
import math
import os
import random
import re
import shutil
import sys
import tempfile
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Optional, Tuple, Dict, List, Union
from urllib.parse import quote

# Third-party imports
import cv2
import gradio as gr
import numpy as np
import requests
import soundfile as sf
import torch
from bs4 import BeautifulSoup
from PIL import Image

# MoviePy imports
from moviepy.editor import (
    VideoClip, VideoFileClip, AudioFileClip, ImageClip,
    concatenate_videoclips, CompositeVideoClip,
    CompositeAudioClip, TextClip
)
import moviepy.video.fx.all as vfx
import moviepy.config as mpy_config
# Custom imports | |
# Kokoro is treated as an optional dependency: when the import fails a warning
# is printed and the name ``KPipeline`` is simply never bound in this module.
try:
    from kokoro import KPipeline
except ImportError:
    print("Warning: Kokoro TTS not available. TTS features will be disabled.")
# ====================== | |
# CONSTANTS & CONFIG | |
# ====================== | |
# Default settings used by DocumentaryGenerator when no override is supplied.
DEFAULT_CONFIG = {
    # API keys come from the environment.  The fallback is an empty string (not
    # a placeholder) so that the ``if not config[...]`` guards in the generator
    # actually detect a missing key instead of sending a bogus one.
    'PEXELS_API_KEY': os.getenv('PEXELS_API_KEY', ''),
    'OPENROUTER_API_KEY': os.getenv('OPENROUTER_API_KEY', ''),
    'OUTPUT_VIDEO': "documentary_output.mp4",
    'USER_AGENT': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    # Download retry policy: number of attempts and base delay in seconds.
    'MAX_RETRIES': 3,
    'RETRY_DELAY': 2,
    'MAX_VIDEO_DURATION': 600,  # 10 minutes
    'TTS_SAMPLE_RATE': 24000,
    'DEFAULT_VOICE': 'en',
    'CAPTION_FONT': 'Arial-Bold',
    'CAPTION_FONT_SIZES': {
        '1080p': 40,
        '720p': 32,
        '480p': 24,
    },
    'BACKGROUND_MUSIC_VOLUME': 0.08,
    'DEFAULT_FPS': 30,
    # Chance (0-1) that a segment uses a stock video instead of a still image.
    'VIDEO_PROBABILITY': 0.45,
    # Output frame sizes as (width, height).
    'RESOLUTIONS': {
        '1080p': (1920, 1080),
        '720p': (1280, 720),
        '480p': (854, 480),
        'vertical_1080p': (1080, 1920),
        'vertical_720p': (720, 1280),
    },
}
# ====================== | |
# CORE CLASSES | |
# ====================== | |
@dataclass
class VideoSegment:
    """Everything needed to render one section of the documentary.

    Declared as a dataclass so it can be built with keyword arguments, which
    is how the generator constructs it.
    """

    media_path: str      # local path of the downloaded image or video
    tts_path: str        # local path of the narration audio file
    narration: str       # narration text (also used for captions)
    duration: float      # on-screen length of the segment in seconds
    media_type: str      # 'image' or 'video'
    effects: Dict        # effect options for the segment
    caption_style: Dict  # caption rendering options, including 'enabled'
class DocumentaryGenerator:
    """Build a narrated documentary video from a ``[TITLE] narration`` script.

    The pipeline is: parse the script into sections, fetch one stock image or
    video per section from Pexels, synthesise the narration with the TTS
    pipeline, render each section as a clip, then concatenate and export.
    """

    def __init__(self, config: Optional[Dict] = None):
        """Set up the generator.

        Args:
            config: Optional overrides.  They are layered on top of
                ``DEFAULT_CONFIG`` so a partial dict still has every key.
        """
        self.config = {**DEFAULT_CONFIG, **(config or {})}
        self.tts_pipeline = None
        self.temp_dir = None
        self.current_resolution = None
        self.caption_color = None
        # Initialize subsystems
        self._initialize_tts()
        self._configure_imagemagick()

    # ------------------------------------------------------------------
    # Setup / teardown
    # ------------------------------------------------------------------
    def _initialize_tts(self):
        """Create the Kokoro TTS pipeline if the optional import succeeded."""
        pipeline_cls = globals().get('KPipeline')
        if pipeline_cls is None:
            return
        try:
            print("Initializing Kokoro TTS pipeline...")
            self.tts_pipeline = pipeline_cls(lang_code='a')
            print("TTS pipeline ready")
        except Exception as e:
            print(f"Could not initialize TTS: {str(e)}")
            self.tts_pipeline = None

    def _configure_imagemagick(self):
        """Point MoviePy at the first ImageMagick ``convert`` binary found."""
        try:
            common_paths = [
                "/usr/bin/convert",
                "/usr/local/bin/convert",
                "/opt/homebrew/bin/convert",
            ]
            for path in common_paths:
                if os.path.exists(path):
                    mpy_config.change_settings({"IMAGEMAGICK_BINARY": path})
                    print(f"ImageMagick configured: {path}")
                    break
            else:
                print("ImageMagick not found - text rendering may be limited")
        except Exception as e:
            print(f"ImageMagick config error: {str(e)}")

    def _create_temp_dir(self):
        """Create a fresh temporary working directory and return its path."""
        if self.temp_dir and os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)
        self.temp_dir = tempfile.mkdtemp(prefix="docgen_")
        print(f"Created temp directory: {self.temp_dir}")
        return self.temp_dir

    def _cleanup(self):
        """Remove the temporary working directory (best effort)."""
        if self.temp_dir and os.path.exists(self.temp_dir):
            try:
                shutil.rmtree(self.temp_dir)
                print("Cleaned up temporary files")
            except Exception as e:
                print(f"Cleanup error: {str(e)}")

    # ------------------------------------------------------------------
    # Script generation
    # ------------------------------------------------------------------
    def generate_script(self, topic: str) -> str:
        """Ask the OpenRouter chat API for a documentary script about *topic*.

        Returns:
            The script text, or a string starting with ``"Error"`` when the
            key is missing, the topic is empty, or the request fails.
        """
        api_key = self.config['OPENROUTER_API_KEY']
        if not api_key:
            return "Error: OpenRouter API key not configured"
        if not topic or not topic.strip():
            return "Error: Please enter a topic"
        prompt = "\n".join([
            f"Create a funny, engaging documentary script about {topic}.",
            "Format each section with [TITLE] followed by narration text.",
            "Keep narration concise (1-2 sentences per section).",
            "Include at least 5 sections.",
            "End with a humorous call-to-action.",
        ])
        headers = {
            # Built from a local so the f-string has no nested quotes, which
            # is a syntax error on Python versions before 3.12.
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json',
        }
        data = {
            "model": "mistralai/mistral-small-3.1-24b-instruct:free",
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.7,
            "max_tokens": 1024,
        }
        try:
            response = requests.post(
                'https://openrouter.ai/api/v1/chat/completions',
                headers=headers,
                json=data,
                timeout=30,
            )
            response.raise_for_status()
            return response.json()['choices'][0]['message']['content']
        except Exception as e:
            return f"Error generating script: {str(e)}"

    # ------------------------------------------------------------------
    # Media acquisition
    # ------------------------------------------------------------------
    def _download_media(self, url: str, filename: str) -> Optional[str]:
        """Download *url* into the temp directory as *filename*.

        Retries up to ``MAX_RETRIES`` times with a growing delay.  A download
        of 1 KiB or less is treated as a failed attempt.

        Returns:
            The local path, or ``None`` when every attempt failed.
        """
        if not self.temp_dir:
            print("Download skipped: no temporary directory")
            return None
        local_path = os.path.join(self.temp_dir, filename)
        max_retries = self.config['MAX_RETRIES']
        for attempt in range(max_retries):
            try:
                with requests.get(url, stream=True, timeout=15) as r:
                    r.raise_for_status()
                    with open(local_path, 'wb') as f:
                        for chunk in r.iter_content(chunk_size=8192):
                            f.write(chunk)
                # Validate downloaded file
                if os.path.getsize(local_path) > 1024:
                    return local_path
                print(f"Download attempt {attempt + 1} failed: file too small")
            except Exception as e:
                print(f"Download attempt {attempt + 1} failed: {str(e)}")
            # No point waiting after the final attempt
            if attempt + 1 < max_retries:
                time.sleep(self.config['RETRY_DELAY'] * (attempt + 1))
        return None

    def _orientation(self) -> str:
        """Return the Pexels orientation matching the current resolution."""
        if self.current_resolution and self.current_resolution[1] > self.current_resolution[0]:
            return 'portrait'
        return 'landscape'

    def _pexels_get(self, url: str, query: str) -> Optional[Dict]:
        """Run one Pexels search and return the decoded JSON, or ``None``."""
        if not self.config['PEXELS_API_KEY']:
            return None
        headers = {'Authorization': self.config['PEXELS_API_KEY']}
        params = {
            'query': query,
            'per_page': 15,
            'orientation': self._orientation(),
        }
        try:
            response = requests.get(url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Pexels search error: {str(e)}")
            return None

    def _search_pexels_video(self, query: str) -> Optional[str]:
        """Search Pexels videos and return a download link, or ``None``.

        The first file marked ``'hd'`` wins; if no result has one, the first
        file with a link is used instead of giving up.
        """
        payload = self._pexels_get('https://api.pexels.com/videos/search', query)
        if not payload:
            return None
        fallback = None
        for video in payload.get('videos', []):
            for file in video.get('video_files', []):
                link = file.get('link')
                if not link:
                    continue
                if file.get('quality') == 'hd':
                    return link
                if fallback is None:
                    fallback = link
        return fallback

    def _search_pexels_image(self, query: str) -> Optional[str]:
        """Search Pexels photos and return an image URL, or ``None``."""
        payload = self._pexels_get('https://api.pexels.com/v1/search', query)
        if not payload:
            return None
        for photo in payload.get('photos', []):
            src = photo.get('src') or {}
            link = src.get('large2x') or src.get('original')
            if link:
                return link
        return None

    def _generate_tts(self, text: str) -> Optional[str]:
        """Synthesise *text* to a WAV file in the temp directory.

        Returns:
            The path of the written file, or ``None`` when TTS is unavailable
            or synthesis fails.
        """
        if not self.tts_pipeline or not self.temp_dir:
            return None
        # A random name avoids collisions between segments whose narration
        # starts with the same words (segments are processed concurrently).
        output_path = os.path.join(self.temp_dir, f"tts_{uuid.uuid4().hex}.wav")
        try:
            audio_segments = []
            for chunk in self.tts_pipeline(text, speed=1.0):
                if isinstance(chunk, tuple):
                    chunk = chunk[-1]  # Get audio data from tuple
                elif hasattr(chunk, 'audio'):
                    # NOTE(review): some pipeline versions may yield result
                    # objects exposing ``.audio`` instead of tuples - confirm.
                    chunk = chunk.audio
                if chunk is None:
                    continue
                audio_segments.append(np.asarray(chunk))
            if not audio_segments:
                print("TTS generation error: no audio produced")
                return None
            full_audio = np.concatenate(audio_segments)
            sf.write(output_path, full_audio, self.config['TTS_SAMPLE_RATE'])
            return output_path
        except Exception as e:
            print(f"TTS generation error: {str(e)}")
            return None

    # ------------------------------------------------------------------
    # Clip construction
    # ------------------------------------------------------------------
    def _create_video_segment(self, segment: "VideoSegment") -> Optional["VideoClip"]:
        """Render one prepared segment as a clip with narration and captions.

        Returns:
            The finished clip, or ``None`` when any step fails.
        """
        try:
            # Load media and fit it to the output frame
            if segment.media_type == 'video':
                media_clip = self._resize_clip(VideoFileClip(segment.media_path))
                # Stock footage rarely matches the narration length: trim it,
                # or loop it when it is too short.
                if media_clip.duration and media_clip.duration >= segment.duration:
                    media_clip = media_clip.subclip(0, segment.duration)
                else:
                    media_clip = vfx.loop(media_clip, duration=segment.duration)
            else:
                media_clip = ImageClip(segment.media_path).set_duration(segment.duration)
                media_clip = self._apply_kenburns(media_clip)
            media_clip = media_clip.set_duration(segment.duration)
            # Add audio, trimmed so it cannot spill into the next segment
            audio_clip = AudioFileClip(segment.tts_path)
            if audio_clip.duration and audio_clip.duration > segment.duration:
                audio_clip = audio_clip.subclip(0, segment.duration)
            media_clip = media_clip.set_audio(audio_clip)
            # Add captions if enabled
            if segment.caption_style.get('enabled', False):
                media_clip = self._add_captions(media_clip, segment.narration, segment.caption_style)
            return media_clip
        except Exception as e:
            print(f"Segment creation error: {str(e)}")
            traceback.print_exc()
            return None

    def _apply_kenburns(self, clip: "ImageClip") -> "VideoClip":
        """Apply a random slow zoom or pan to an image clip.

        Every effect defines both a scale and a position range, so the frame
        transform never looks up a missing key.  Positions are fractions (0-1)
        of the free space between the crop window and the image edge, so
        ``0.5`` is centred and the window always stays inside the image.
        The crop window uses the output aspect ratio, so the final resize
        does not distort the picture.  Falls back to a plain resize on error.
        """
        try:
            target_w, target_h = self.current_resolution
            target_aspect = target_w / target_h
            base_scale = 1.2
            centre = (0.5, 0.5)
            effects = {
                'zoom_in': {'start_scale': 1.0, 'end_scale': base_scale,
                            'start_pos': centre, 'end_pos': centre},
                'zoom_out': {'start_scale': base_scale, 'end_scale': 1.0,
                             'start_pos': centre, 'end_pos': centre},
                'pan_left': {'start_scale': base_scale, 'end_scale': base_scale,
                             'start_pos': (0.7, 0.5), 'end_pos': (0.3, 0.5)},
                'pan_right': {'start_scale': base_scale, 'end_scale': base_scale,
                              'start_pos': (0.3, 0.5), 'end_pos': (0.7, 0.5)},
            }
            effect = random.choice(list(effects.values()))
            total = clip.duration or 1.0  # guard against a missing/zero duration

            def transform(get_frame, t):
                ratio = min(max(t / total, 0.0), 1.0)
                scale = effect['start_scale'] + (effect['end_scale'] - effect['start_scale']) * ratio
                pos_x = effect['start_pos'][0] + (effect['end_pos'][0] - effect['start_pos'][0]) * ratio
                pos_y = effect['start_pos'][1] + (effect['end_pos'][1] - effect['start_pos'][1]) * ratio
                frame = get_frame(t)
                h, w = frame.shape[:2]
                # Largest window with the output aspect ratio that fits the image
                if w / h > target_aspect:
                    base_w, base_h = h * target_aspect, h
                else:
                    base_w, base_h = w, w / target_aspect
                crop_w = max(1, min(w, int(base_w / scale)))
                crop_h = max(1, min(h, int(base_h / scale)))
                x = int(pos_x * (w - crop_w))
                y = int(pos_y * (h - crop_h))
                cropped = frame[y:y + crop_h, x:x + crop_w]
                return cv2.resize(cropped, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4)

            return clip.fl(transform)
        except Exception as e:
            print(f"Ken Burns error: {str(e)}")
            return self._resize_clip(clip)

    def _resize_clip(self, clip: Union["VideoClip", "ImageClip"]) -> "VideoClip":
        """Scale and centre-crop *clip* to the current resolution.

        Returns the clip unchanged if resizing raises.
        """
        try:
            target_w, target_h = self.current_resolution
            clip_aspect = clip.w / clip.h
            target_aspect = target_w / target_h
            if abs(clip_aspect - target_aspect) < 0.01:
                return clip.resize((target_w, target_h))
            if clip_aspect > target_aspect:
                # Wider than target: match height, crop the sides
                resized = clip.resize(height=target_h)
                crop = (resized.w - target_w) / 2
                return resized.crop(x1=crop, y1=0, x2=crop + target_w, y2=target_h)
            # Taller than target: match width, crop top and bottom
            resized = clip.resize(width=target_w)
            crop = (resized.h - target_h) / 2
            return resized.crop(x1=0, y1=crop, x2=target_w, y2=crop + target_h)
        except Exception as e:
            print(f"Resize error: {str(e)}")
            return clip

    @staticmethod
    def _split_caption_text(text: str, max_chars: int = 30) -> List[str]:
        """Split *text* on word boundaries into caption lines of about *max_chars*."""
        chunks = []
        current_chunk = []
        char_count = 0
        for word in (text or "").split():
            if current_chunk and char_count + len(word) > max_chars:
                chunks.append(' '.join(current_chunk))
                current_chunk = [word]
                char_count = len(word)
            else:
                current_chunk.append(word)
                char_count += len(word) + 1
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        return chunks

    def _add_captions(self, clip: "VideoClip", text: str, style: Dict) -> "VideoClip":
        """Overlay *text* on *clip* as evenly timed caption chunks.

        Returns the clip unchanged when there is no text or rendering fails.
        """
        try:
            chunks = self._split_caption_text(text)
            if not chunks:
                return clip
            # Create text clips, each shown for an equal share of the clip
            duration_per_chunk = clip.duration / len(chunks)
            text_clips = []
            for i, chunk in enumerate(chunks):
                txt_clip = TextClip(
                    chunk,
                    fontsize=style['font_size'],
                    font=style['font'],
                    color=style['color'],
                    bg_color=style['bg_color'],
                    stroke_color=style['stroke_color'],
                    stroke_width=style['stroke_width'],
                    method='caption',
                    size=(style['max_width'], None),
                ).set_start(i * duration_per_chunk).set_duration(duration_per_chunk)
                txt_clip = txt_clip.set_position(('center', style['y_position']))
                text_clips.append(txt_clip)
            return CompositeVideoClip([clip] + text_clips)
        except Exception as e:
            print(f"Caption error: {str(e)}")
            return clip

    # ------------------------------------------------------------------
    # Pipeline
    # ------------------------------------------------------------------
    def generate_video(self, script: str, resolution: str, captions: bool,
                       music_path: Optional[str] = None) -> Optional[str]:
        """Main video generation pipeline.

        Args:
            script: Script text in ``[TITLE] narration`` form.
            resolution: Key into ``config['RESOLUTIONS']``; unknown keys fall
                back to 1920x1080.
            captions: Whether to burn captions into the video.
            music_path: Optional background-music file.

        Returns:
            Path of the exported video, or ``None`` on failure.
        """
        start_time = time.time()
        opened = []  # every clip we create, so it can be closed before cleanup
        try:
            # Setup
            self._create_temp_dir()
            self.current_resolution = self.config['RESOLUTIONS'].get(resolution, (1920, 1080))
            self.caption_color = 'white' if captions else None
            # Parse script into segments
            segments = self._parse_script(script or "")
            if not segments:
                print("Error: No valid segments found in script")
                return None
            # Fetch media and narration in parallel (network / TTS bound)
            with ThreadPoolExecutor() as executor:
                prepared = list(executor.map(self._process_segment, segments))
            # Turn each prepared segment into an actual clip
            clips = []
            for item in prepared:
                if item is None:
                    continue
                clip = self._create_video_segment(item)
                if clip is not None:
                    clips.append(clip)
            opened.extend(clips)
            if not clips:
                print("Error: No segments could be rendered")
                return None
            # Combine segments
            final_clip = concatenate_videoclips(clips, method="compose")
            opened.append(final_clip)
            # Add background music if provided
            if music_path and os.path.exists(music_path):
                music_clip = AudioFileClip(music_path).volumex(self.config['BACKGROUND_MUSIC_VOLUME'])
                opened.append(music_clip)
                if music_clip.duration < final_clip.duration:
                    # Audio clips loop via audio_loop
                    music_clip = music_clip.audio_loop(duration=final_clip.duration)
                else:
                    # Keep the music from outlasting the picture
                    music_clip = music_clip.subclip(0, final_clip.duration)
                tracks = [track for track in (final_clip.audio, music_clip) if track is not None]
                final_clip = final_clip.set_audio(CompositeAudioClip(tracks))
            # Export final video
            output_path = self.config['OUTPUT_VIDEO']
            final_clip.write_videofile(
                output_path,
                codec='libx264',
                audio_codec='aac',
                fps=self.config['DEFAULT_FPS'],
                threads=os.cpu_count() or 4,
            )
            print(f"Video generated in {time.time() - start_time:.2f} seconds")
            return output_path
        except Exception as e:
            print(f"Video generation failed: {str(e)}")
            traceback.print_exc()
            return None
        finally:
            # Release file handles before deleting the files they point at
            for clip in opened:
                try:
                    clip.close()
                except Exception:
                    pass
            self._cleanup()

    def _parse_script(self, script: str) -> List[Dict]:
        """Parse a script into ``{'title', 'text'}`` sections.

        A section starts at a line beginning with ``[TITLE]``; any text after
        the bracket and all following lines up to the next title form its
        narration.  Lines before the first title and sections without any
        narration are dropped.
        """
        segments = []
        current_title = None
        current_text = ""

        def flush():
            narration = current_text.strip()
            # Strip before testing: a bare "[TITLE]" line leaves only
            # whitespace, which must not become an empty segment.
            if current_title and narration:
                segments.append({'title': current_title, 'text': narration})

        for line in (script or "").split('\n'):
            line = line.strip()
            if not line:
                continue
            # Check for section title
            title_match = re.match(r'^\s*\[([^\]]+)\]\s*(.*)', line)
            if title_match:
                flush()
                current_title = title_match.group(1).strip()
                current_text = title_match.group(2).strip() + " "
            elif current_title:
                current_text += line + " "
        # Add the last segment
        flush()
        return segments

    def _process_segment(self, segment: Dict) -> Optional["VideoSegment"]:
        """Fetch media and narration for one parsed section.

        Returns:
            A ``VideoSegment`` describing the downloaded assets, or ``None``
            when media or narration could not be obtained.
        """
        try:
            # Get media, falling back to the other kind if the first is empty
            use_video = random.random() < self.config['VIDEO_PROBABILITY']
            searches = [('video', self._search_pexels_video),
                        ('image', self._search_pexels_image)]
            if not use_video:
                searches.reverse()
            media_url = None
            media_type = None
            for kind, search in searches:
                media_url = search(segment['title'])
                if media_url:
                    media_type = kind
                    break
            if not media_url:
                print(f"No media found for: {segment['title']}")
                return None
            # Download media under a unique name with the right extension
            media_ext = '.mp4' if media_type == 'video' else '.jpg'
            media_filename = f"media_{uuid.uuid4().hex}{media_ext}"
            media_path = self._download_media(media_url, media_filename)
            if not media_path:
                print(f"Failed to download media for: {segment['title']}")
                return None
            # Generate TTS
            tts_path = self._generate_tts(segment['text'])
            if not tts_path:
                print(f"Failed to generate TTS for: {segment['title']}")
                return None
            # Calculate duration based on TTS; close the probe clip afterwards
            probe = AudioFileClip(tts_path)
            try:
                tts_duration = probe.duration
            finally:
                probe.close()
            duration = max(3.0, min(tts_duration * 1.1, 10.0))  # 3-10 seconds
            # Prepare caption style
            caption_style = {
                'enabled': self.caption_color is not None,
                'font_size': self._get_font_size(),
                'font': self.config['CAPTION_FONT'],
                'color': self.caption_color or 'white',
                'bg_color': 'rgba(0,0,0,0.5)',
                'stroke_color': 'black',
                'stroke_width': 1.5,
                'max_width': int(self.current_resolution[0] * 0.8),
                'y_position': int(self.current_resolution[1] * 0.8),
            }
            return VideoSegment(
                media_path=media_path,
                tts_path=tts_path,
                narration=segment['text'],
                duration=duration,
                media_type=media_type,
                effects={'type': 'random'},
                caption_style=caption_style,
            )
        except Exception as e:
            print(f"Segment processing error: {str(e)}")
            return None

    def _get_font_size(self) -> int:
        """Return the caption font size for the current resolution.

        Uses the shorter side of the frame so portrait and landscape variants
        of the same resolution class get the same size.
        """
        short_side = min(self.current_resolution)
        sizes = self.config['CAPTION_FONT_SIZES']
        if short_side >= 1080:
            return sizes['1080p']
        if short_side >= 720:
            return sizes['720p']
        return sizes['480p']
# ====================== | |
# GRADIO INTERFACE | |
# ====================== | |
def create_gradio_interface():
    """Create the Gradio web interface.

    Builds a two-column Blocks app: script generation and options on the
    left, the rendered video and a status line on the right.

    Returns:
        The ``gr.Blocks`` application (not yet launched).
    """
    generator = DocumentaryGenerator()

    def run_generation(script, resolution_choice, captions_enabled, music_file, video_probability):
        """Apply the UI options, render the video, and report a status line."""
        if not script or not script.strip():
            return None, "Please generate or paste a script first."
        # The slider only takes effect if its value reaches the generator
        generator.config['VIDEO_PROBABILITY'] = float(video_probability)
        output_path = generator.generate_video(
            script, resolution_choice, captions_enabled, music_file
        )
        if output_path:
            return output_path, f"Video ready: {output_path}"
        return None, "Video generation failed - check the server log for details."

    with gr.Blocks(title="AI Documentary Maker", theme="soft") as app:
        gr.Markdown("# AI Documentary Video Generator")
        with gr.Row():
            with gr.Column():
                topic_input = gr.Textbox(label="Documentary Topic", placeholder="Enter your topic...")
                generate_script_btn = gr.Button("Generate Script")
                script_output = gr.Textbox(label="Generated Script", lines=10, interactive=True)
                with gr.Accordion("Advanced Options", open=False):
                    resolution = gr.Dropdown(
                        list(generator.config['RESOLUTIONS'].keys()),
                        value="1080p",
                        label="Resolution",
                    )
                    captions = gr.Checkbox(
                        value=True,
                        label="Enable Captions",
                    )
                    # No ``optional`` keyword: it is not accepted by current
                    # Gradio releases, and inputs may be left empty anyway.
                    music_input = gr.Audio(
                        label="Background Music",
                        type="filepath",
                    )
                    video_prob = gr.Slider(
                        minimum=0.1,
                        maximum=1.0,
                        value=generator.config['VIDEO_PROBABILITY'],
                        label="Video Clip Probability",
                    )
                generate_video_btn = gr.Button("Generate Video", variant="primary")
            with gr.Column():
                video_output = gr.Video(label="Generated Documentary")
                status_output = gr.Textbox(label="Status", interactive=False)
        # Event handlers
        generate_script_btn.click(
            fn=generator.generate_script,
            inputs=[topic_input],
            outputs=[script_output],
        )
        generate_video_btn.click(
            fn=run_generation,
            inputs=[script_output, resolution, captions, music_input, video_prob],
            outputs=[video_output, status_output],
            api_name="generate",
        )
    return app
# ====================== | |
# MAIN EXECUTION | |
# ====================== | |
if __name__ == "__main__":
    # Script entry point: build the UI, then serve it on every interface.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    app = create_gradio_interface()
    app.launch(**launch_options)