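"""Gradio app that generates YouTube Shorts end to end:
topic -> script -> image prompts -> images -> voiceover -> subtitles -> final video."""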
import os
import re
import json
import time
import random
import tempfile
import requests
from PIL import Image
from datetime import datetime

import gradio as gr
from dotenv import load_dotenv
from moviepy.editor import *
from moviepy.audio.fx.all import volumex
from moviepy.video.fx.all import crop
# Load environment variables from .env file if present
load_dotenv()

# Constants
CACHE_DIR = os.path.join(tempfile.gettempdir(), "yt_shorts_generator")
ASSETS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets")
MUSIC_DIR = os.path.join(ASSETS_DIR, "background_music")
FONTS_DIR = os.path.join(ASSETS_DIR, "fonts")

# Create necessary directories
os.makedirs(CACHE_DIR, exist_ok=True)
os.makedirs(MUSIC_DIR, exist_ok=True)
os.makedirs(FONTS_DIR, exist_ok=True)
# Helper functions for timestamped console logging
def _log_message(level, message):
    timestamp = datetime.now().strftime("%H:%M:%S")
    formatted_message = f"[{timestamp}] [{level}] {message}"
    print(formatted_message)
    return formatted_message

def info(message):
    return _log_message("INFO", message)

def success(message):
    return _log_message("SUCCESS", message)

def warning(message):
    return _log_message("WARNING", message)

def error(message):
    return _log_message("ERROR", message)
def get_music_files():
    """Get list of available music files in the music directory."""
    if not os.path.exists(MUSIC_DIR):
        return ["none"]
    music_files = [f for f in os.listdir(MUSIC_DIR) if f.endswith(('.mp3', '.wav'))]
    if not music_files:
        return ["none"]
    return ["random"] + music_files

def get_font_files():
    """Get list of available font files in the fonts directory."""
    if not os.path.exists(FONTS_DIR):
        return ["default"]
    # Use splitext so font names containing dots are not truncated
    font_files = [os.path.splitext(f)[0] for f in os.listdir(FONTS_DIR) if f.endswith(('.ttf', '.otf'))]
    if not font_files:
        return ["default"]
    return ["default"] + font_files

def choose_random_music():
    """Select a random music file from the music directory."""
    if not os.path.exists(MUSIC_DIR):
        error(f"Music directory {MUSIC_DIR} does not exist")
        return None
    music_files = [f for f in os.listdir(MUSIC_DIR) if f.endswith(('.mp3', '.wav'))]
    if not music_files:
        warning(f"No music files found in {MUSIC_DIR}")
        return None
    return os.path.join(MUSIC_DIR, random.choice(music_files))
class YouTube:
    def __init__(self, niche: str, language: str,
                 text_gen="g4f", text_model="gpt-4",
                 image_gen="g4f", image_model="flux",
                 tts_engine="edge", tts_voice="en-US-AriaNeural",
                 subtitle_font="default", font_size=80,
                 text_color="white", highlight_color="blue",
                 subtitles_enabled=True, highlighting_enabled=True,
                 subtitle_position="bottom", music_file="random",
                 api_keys=None, progress=gr.Progress()) -> None:
        """Initialize the YouTube Shorts Generator."""
        self.progress = progress
        self.progress(0, desc="Initializing")

        # Store basic parameters
        info("Initializing YouTube class")
        self._niche = niche
        self._language = language
        self.text_gen = text_gen
        self.text_model = text_model
        self.image_gen = image_gen
        self.image_model = image_model
        self.tts_engine = tts_engine
        self.tts_voice = tts_voice
        self.subtitle_font = subtitle_font
        self.font_size = font_size
        self.text_color = text_color
        self.highlight_color = highlight_color
        self.subtitles_enabled = subtitles_enabled
        self.highlighting_enabled = highlighting_enabled
        self.subtitle_position = subtitle_position
        self.music_file = music_file
        self.api_keys = api_keys or {}
        self.images = []
        self.logs = []

        # Export API keys passed as parameters so downstream SDKs can read
        # them from the environment
        env_var_names = {
            'gemini': "GEMINI_API_KEY",
            'assemblyai': "ASSEMBLYAI_API_KEY",
            'elevenlabs': "ELEVENLABS_API_KEY",
            'segmind': "SEGMIND_API_KEY",
            'openai': "OPENAI_API_KEY",
        }
        for key_name, env_var in env_var_names.items():
            if self.api_keys.get(key_name):
                os.environ[env_var] = self.api_keys[key_name]

        info(f"Niche: {niche}, Language: {language}")
        self.log(f"Initialized with niche: {niche}, language: {language}")
        self.log(f"Text generator: {text_gen} - Model: {text_model}")
        self.log(f"Image generator: {image_gen} - Model: {image_model}")
        self.log(f"TTS engine: {tts_engine} - Voice: {tts_voice}")
        self.log(f"Subtitles: {'Enabled' if subtitles_enabled else 'Disabled'} - Highlighting: {'Enabled' if highlighting_enabled else 'Disabled'}")
        self.log(f"Music: {music_file}")
    def log(self, message):
        """Add a log message to the logs list."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        log_entry = f"[{timestamp}] {message}"
        self.logs.append(log_entry)
        return log_entry

    @property
    def niche(self) -> str:
        return self._niche

    @property
    def language(self) -> str:
        return self._language
    def generate_response(self, prompt: str, model: str = None) -> str:
        """Generate a response using the selected text generation model."""
        self.log(f"Generating response for prompt: {prompt[:50]}...")
        try:
            if self.text_gen == "gemini":
                self.log("Using Google's Gemini model")
                # Check if API key is set
                gemini_api_key = os.environ.get("GEMINI_API_KEY", "")
                if not gemini_api_key:
                    raise ValueError("Gemini API key is not set. Please provide a valid API key.")
                import google.generativeai as genai
                genai.configure(api_key=gemini_api_key)
                model_to_use = model if model else self.text_model
                genai_model = genai.GenerativeModel(model_to_use)
                response = genai_model.generate_content(prompt).text
            elif self.text_gen == "g4f":
                self.log("Using G4F for text generation")
                import g4f
                model_to_use = model if model else self.text_model
                self.log(f"Using G4F model: {model_to_use}")
                response = g4f.ChatCompletion.create(
                    model=model_to_use,
                    messages=[{"role": "user", "content": prompt}]
                )
            elif self.text_gen == "openai":
                self.log("Using OpenAI for text generation")
                openai_api_key = os.environ.get("OPENAI_API_KEY", "")
                if not openai_api_key:
                    raise ValueError("OpenAI API key is not set. Please provide a valid API key.")
                from openai import OpenAI
                client = OpenAI(api_key=openai_api_key)
                model_to_use = model if model else "gpt-3.5-turbo"
                response = client.chat.completions.create(
                    model=model_to_use,
                    messages=[{"role": "user", "content": prompt}]
                ).choices[0].message.content
            else:
                # Default to g4f if other methods aren't available
                self.log("Using default G4F model as fallback")
                import g4f
                response = g4f.ChatCompletion.create(
                    model="gpt-3.5-turbo",
                    messages=[{"role": "user", "content": prompt}]
                )
            self.log(f"Response generated successfully, length: {len(response)} characters")
            return response
        except Exception as e:
            error_msg = f"Error generating response: {str(e)}"
            self.log(error_msg)
            raise Exception(error_msg)
    def generate_topic(self) -> str:
        """Generate a topic based on the YouTube channel niche."""
        self.progress(0.05, desc="Generating topic")
        self.log("Generating topic based on niche")
        completion = self.generate_response(
            f"Please generate a specific video idea about the following topic: {self.niche}. "
            f"Make it exactly one sentence. Only return the topic, nothing else."
        )
        if not completion:
            self.log(error("Failed to generate topic."))
            raise Exception("Failed to generate a topic. Please try again with a different niche.")
        self.subject = completion
        self.log(success(f"Generated topic: {completion}"))
        return completion
    def generate_script(self) -> str:
        """Generate a script for a video, based on the subject and language."""
        self.progress(0.1, desc="Creating script")
        self.log("Generating script for video")
        prompt = f"""
        Generate a script for a YouTube Shorts video, based on the subject of the video.
        The script is to be returned as a plain string.
        Here is an example of a string:
        "This is an example string."
        Do not under any circumstance reference this prompt in your response.
        Get straight to the point, don't start with unnecessary things like "welcome to this video".
        Obviously, the script should be related to the subject of the video.
        YOU MUST NOT INCLUDE ANY TYPE OF MARKDOWN OR FORMATTING IN THE SCRIPT, NEVER USE A TITLE.
        YOU MUST WRITE THE SCRIPT IN THE LANGUAGE SPECIFIED IN [LANGUAGE].
        ONLY RETURN THE RAW CONTENT OF THE SCRIPT. DO NOT INCLUDE "VOICEOVER", "NARRATOR" OR SIMILAR INDICATORS.
        Subject: {self.subject}
        Language: {self.language}
        """
        completion = self.generate_response(prompt)
        # Strip any markdown emphasis the model may have added
        completion = re.sub(r"\*", "", completion)
        if not completion:
            self.log(error("The generated script is empty."))
            raise Exception("Failed to generate a script. Please try again.")
        if len(completion) > 5000:
            self.log(warning("Generated script is too long. Retrying..."))
            return self.generate_script()
        self.script = completion
        self.log(success(f"Generated script ({len(completion)} chars)"))
        return completion
    def generate_metadata(self) -> dict:
        """Generate video metadata (title, description)."""
        self.progress(0.15, desc="Creating title and description")
        self.log("Generating metadata (title and description)")
        title = self.generate_response(
            f"Please generate a YouTube Video Title for the following subject, including hashtags: "
            f"{self.subject}. Only return the title, nothing else. Keep the title under 100 characters."
        )
        if len(title) > 100:
            self.log(warning("Generated title is too long. Retrying..."))
            return self.generate_metadata()
        description = self.generate_response(
            f"Please generate a YouTube Video Description for the following script: {self.script}. "
            f"Only return the description, nothing else."
        )
        self.metadata = {
            "title": title,
            "description": description
        }
        self.log(success(f"Generated title: {title}"))
        self.log(success(f"Generated description: {description[:50]}..."))
        return self.metadata
    def generate_prompts(self, count=5) -> list:
        """Generate AI image prompts based on the provided video script."""
        self.progress(0.2, desc="Creating image prompts")
        self.log(f"Generating {count} image prompts")
        prompt = f"""
        Generate {count} Image Prompts for AI Image Generation,
        depending on the subject of a video.
        Subject: {self.subject}
        The image prompts are to be returned as
        a JSON-Array of strings.
        Each image prompt should consist of a full sentence,
        always add the main subject of the video.
        Be emotional and use interesting adjectives to make the
        Image Prompt as detailed as possible.
        YOU MUST ONLY RETURN THE JSON-ARRAY OF STRINGS.
        YOU MUST NOT RETURN ANYTHING ELSE.
        YOU MUST NOT RETURN THE SCRIPT.
        The image prompts must be related to the subject of the video.
        Here is an example of a JSON-Array of strings:
        ["image prompt 1", "image prompt 2", "image prompt 3"]
        For context, here is the full text:
        {self.script}
        """
        completion = str(self.generate_response(prompt)) \
            .replace("```json", "") \
            .replace("```", "")
        image_prompts = []
        if "image_prompts" in completion:
            try:
                image_prompts = json.loads(completion)["image_prompts"]
            except Exception:
                self.log(warning("Failed to parse 'image_prompts' from JSON response."))
        if not image_prompts:
            try:
                image_prompts = json.loads(completion)
                self.log("Parsed image prompts from JSON response.")
            except Exception:
                self.log(warning("JSON parsing failed. Attempting to extract array using regex..."))
                # Get everything between [ and ], and turn it into a list
                r = re.compile(r"\[.*\]", re.DOTALL)
                matches = r.findall(completion)
                if len(matches) == 0:
                    self.log(warning("Failed to extract array. Creating generic image prompts."))
                    # Create generic prompts based on the subject
                    image_prompts = [
                        f"A beautiful image showing {self.subject}, photorealistic",
                        f"A detailed visualization of {self.subject}, high quality",
                        f"An artistic representation of {self.subject}, vibrant colors",
                        f"A photorealistic image about {self.subject}, high resolution",
                        f"A dramatic scene related to {self.subject}, cinema quality"
                    ]
                else:
                    try:
                        image_prompts = json.loads(matches[0])
                    except Exception:
                        self.log(error("Failed to parse array from regex match."))
                        # Use regex to extract individual strings
                        string_pattern = r'"([^"]*)"'
                        strings = re.findall(string_pattern, matches[0])
                        if strings:
                            image_prompts = strings
                        else:
                            # Last resort - split by commas and clean up
                            image_prompts = [
                                s.strip().strip('"').strip("'")
                                for s in matches[0].strip('[]').split(',')
                            ]
        # Ensure we have the requested number of prompts
        while len(image_prompts) < count:
            image_prompts.append(f"A high-quality image about {self.subject}")
        # Limit to the requested count
        image_prompts = image_prompts[:count]
        self.image_prompts = image_prompts
        self.log(success(f"Generated {len(self.image_prompts)} image prompts"))
        for i, prompt in enumerate(self.image_prompts):
            self.log(f"Image prompt {i+1}: {prompt}")
        return image_prompts
    def generate_image(self, prompt) -> str:
        """Generate an image using the selected image generation model."""
        self.log(f"Generating image for prompt: {prompt[:50]}...")
        try:
            image_path = os.path.join(CACHE_DIR, f"img_{len(self.images)}_{int(time.time())}.png")
            if self.image_gen == "prodia":
                self.log("Using Prodia provider for image generation")
                s = requests.Session()
                headers = {
                    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                }
                # Generate job
                self.log("Sending generation request to Prodia API")
                resp = s.get(
                    "https://api.prodia.com/generate",
                    params={
                        "new": "true",
                        "prompt": prompt,
                        "model": self.image_model,
                        "negative_prompt": "verybadimagenegative_v1.3",
                        "steps": "20",
                        "cfg": "7",
                        "seed": random.randint(1, 10000),
                        "sample": "DPM++ 2M Karras",
                        "aspect_ratio": "square"
                    },
                    headers=headers
                )
                if resp.status_code != 200:
                    raise Exception(f"Prodia API error: {resp.text}")
                job_id = resp.json()['job']
                self.log(f"Job created with ID: {job_id}")
                # Poll until the job succeeds, fails, or times out
                max_attempts = 30
                attempts = 0
                while attempts < max_attempts:
                    attempts += 1
                    time.sleep(2)
                    status = s.get(f"https://api.prodia.com/job/{job_id}", headers=headers).json()
                    if status["status"] == "succeeded":
                        self.log("Image generation successful, downloading result")
                        img_data = s.get(f"https://images.prodia.xyz/{job_id}.png?download=1", headers=headers).content
                        with open(image_path, "wb") as f:
                            f.write(img_data)
                        self.images.append(image_path)
                        self.log(success(f"Image saved to: {image_path}"))
                        return image_path
                    elif status["status"] == "failed":
                        raise Exception(f"Prodia job failed: {status.get('error', 'Unknown error')}")
                    # Still processing
                    self.log(f"Still processing, attempt {attempts}/{max_attempts}...")
                raise Exception("Prodia job timed out")
            elif self.image_gen == "hercai":
                self.log("Using Hercai provider for image generation")
                # Pass the prompt via params so it is URL-encoded correctly
                r = requests.get(
                    f"https://hercai.onrender.com/{self.image_model}/text2image",
                    params={"prompt": prompt}
                )
                if r.status_code != 200:
                    raise Exception(f"Hercai API error: {r.text}")
                parsed = r.json()
                if "url" in parsed and parsed["url"]:
                    self.log("Image URL received from Hercai")
                    image_url = parsed["url"]
                    img_data = requests.get(image_url).content
                    with open(image_path, "wb") as f:
                        f.write(img_data)
                    self.images.append(image_path)
                    self.log(success(f"Image saved to: {image_path}"))
                    return image_path
                else:
                    raise Exception("No image URL in Hercai response")
            elif self.image_gen == "g4f":
                self.log("Using G4F provider for image generation")
                try:
                    from g4f.client import Client
                    client = Client()
                    response = client.images.generate(
                        model=self.image_model,
                        prompt=prompt,
                        response_format="url"
                    )
                    if response and response.data and len(response.data) > 0:
                        image_url = response.data[0].url
                        image_response = requests.get(image_url)
                        if image_response.status_code == 200:
                            with open(image_path, "wb") as f:
                                f.write(image_response.content)
                            self.images.append(image_path)
                            self.log(success(f"Image saved to: {image_path}"))
                            return image_path
                        else:
                            raise Exception(f"Failed to download image from {image_url}")
                    else:
                        raise Exception("No image URL received from G4F")
                except Exception as e:
                    raise Exception(f"G4F image generation failed: {str(e)}")
            elif self.image_gen == "segmind":
                self.log("Using Segmind provider for image generation")
                api_key = os.environ.get("SEGMIND_API_KEY", "")
                if not api_key:
                    raise ValueError("Segmind API key is not set. Please provide a valid API key.")
                headers = {
                    "x-api-key": api_key,
                    "Content-Type": "application/json"
                }
                response = requests.post(
                    "https://api.segmind.com/v1/sdxl-turbo",
                    json={
                        "prompt": prompt,
                        "negative_prompt": "blurry, low quality, distorted face, text, watermark",
                        "samples": 1,
                        "size": "1024x1024",
                        "guidance_scale": 1.0
                    },
                    headers=headers
                )
                if response.status_code == 200:
                    with open(image_path, "wb") as f:
                        f.write(response.content)
                    self.images.append(image_path)
                    self.log(success(f"Image saved to: {image_path}"))
                    return image_path
                else:
                    raise Exception(f"Segmind request failed: {response.status_code} {response.text}")
            elif self.image_gen == "pollinations":
                self.log("Using Pollinations provider for image generation")
                # URL-encode the prompt and pass a random seed query parameter
                # so repeated prompts yield varied images
                from urllib.parse import quote
                response = requests.get(
                    f"https://image.pollinations.ai/prompt/{quote(prompt)}",
                    params={"seed": random.randint(1, 10000)}
                )
                if response.status_code == 200:
                    self.log("Image received from Pollinations")
                    with open(image_path, "wb") as f:
                        f.write(response.content)
                    self.images.append(image_path)
                    self.log(success(f"Image saved to: {image_path}"))
                    return image_path
                else:
                    raise Exception(f"Pollinations request failed with status code: {response.status_code}")
            else:
                self.log(f"Unknown provider '{self.image_gen}'. Generating placeholder image.")
                # Create a solid-color placeholder image
                img = Image.new('RGB', (800, 800), color=(random.randint(0, 255),
                                                          random.randint(0, 255),
                                                          random.randint(0, 255)))
                img.save(image_path)
                self.images.append(image_path)
                self.log(warning(f"Created placeholder image at: {image_path}"))
                return image_path
        except Exception as e:
            error_msg = f"Image generation failed: {str(e)}"
            self.log(error(error_msg))
            # Create a fallback image
            try:
                img = Image.new('RGB', (800, 800), color=(200, 200, 200))
                image_path = os.path.join(CACHE_DIR, f"error_img_{len(self.images)}_{int(time.time())}.png")
                img.save(image_path)
                self.images.append(image_path)
                self.log(warning(f"Created error placeholder image at: {image_path}"))
                return image_path
            except Exception:
                # If all else fails, return None and handle it gracefully
                return None
    def generate_speech(self, text, output_format='mp3') -> str:
        """Generate speech from text using the selected TTS engine."""
        self.progress(0.6, desc="Creating voiceover")
        self.log("Generating speech from text")
        # Clean text
        text = re.sub(r'[^\w\s.?!,;:\'"-]', '', text)
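        # (keeps word characters, whitespace, and basic punctuation; anything
        # else tends to be read aloud or mispronounced by some TTS engines)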
self.log(f"Using TTS Engine: {self.tts_engine}, Voice: {self.tts_voice}") | |
audio_path = os.path.join(CACHE_DIR, f"speech_{int(time.time())}.{output_format}") | |
try: | |
if self.tts_engine == "elevenlabs": | |
self.log("Using ElevenLabs provider for speech generation") | |
elevenlabs_api_key = os.environ.get("ELEVENLABS_API_KEY", "") | |
if not elevenlabs_api_key: | |
raise ValueError("ElevenLabs API key is not set. Please provide a valid API key.") | |
headers = { | |
"Accept": "audio/mpeg", | |
"Content-Type": "application/json", | |
"xi-api-key": elevenlabs_api_key | |
} | |
payload = { | |
"text": text, | |
"model_id": "eleven_monolingual_v1", | |
"voice_settings": { | |
"stability": 0.5, | |
"similarity_boost": 0.5, | |
"style": 0.0, | |
"use_speaker_boost": True | |
} | |
} | |
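                # "21m00Tcm4TlvDq8ikWAM" is the public voice ID of ElevenLabs'
                # stock "Rachel" voice, used when no explicit voice ID is given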
                voice_id = self.tts_voice if self.tts_voice not in ["Sarah", "default"] else "21m00Tcm4TlvDq8ikWAM"
                response = requests.post(
                    url=f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}",
                    json=payload,
                    headers=headers
                )
                if response.status_code == 200:
                    with open(audio_path, 'wb') as f:
                        f.write(response.content)
                    self.log(success(f"Speech generated successfully using ElevenLabs at {audio_path}"))
                else:
                    raise Exception(f"ElevenLabs API error: {response.text}")
            elif self.tts_engine == "gtts":
                self.log("Using Google TTS provider for speech generation")
                from gtts import gTTS
                tts = gTTS(text=text, lang=self.language[:2].lower(), slow=False)
                tts.save(audio_path)
            elif self.tts_engine == "openai":
                self.log("Using OpenAI provider for speech generation")
                openai_api_key = os.environ.get("OPENAI_API_KEY", "")
                if not openai_api_key:
                    raise ValueError("OpenAI API key is not set. Please provide a valid API key.")
                from openai import OpenAI
                client = OpenAI(api_key=openai_api_key)
                voice = self.tts_voice if self.tts_voice else "alloy"
                response = client.audio.speech.create(
                    model="tts-1",
                    voice=voice,
                    input=text
                )
                response.stream_to_file(audio_path)
            elif self.tts_engine == "edge":
                self.log("Using Edge TTS provider for speech generation")
                import edge_tts
                import asyncio
                voice = self.tts_voice if self.tts_voice else "en-US-AriaNeural"

                async def generate():
                    communicate = edge_tts.Communicate(text, voice)
                    await communicate.save(audio_path)
                asyncio.run(generate())
            else:
                # Fall back to gTTS
                self.log(f"Unknown TTS engine '{self.tts_engine}'. Falling back to gTTS.")
                from gtts import gTTS
                tts = gTTS(text=text, lang=self.language[:2].lower(), slow=False)
                tts.save(audio_path)
            self.log(success(f"Speech generated and saved to: {audio_path}"))
            self.tts_path = audio_path
            return audio_path
        except Exception as e:
            error_msg = f"Speech generation failed: {str(e)}"
            self.log(error(error_msg))
            # Create a silent audio file as a fallback
            try:
                from pydub import AudioSegment
                # Generate 30 seconds of silence
                silence = AudioSegment.silent(duration=30000)
                silence.export(audio_path, format=output_format)
                self.log(warning(f"Created silent audio fallback at: {audio_path}"))
                self.tts_path = audio_path
                return audio_path
            except Exception:
                self.log(error("Failed to create silent audio fallback"))
                return None
    def generate_subtitles(self, audio_path):
        """Generate word-level subtitles for the video."""
        if not self.subtitles_enabled:
            self.log("Subtitles are disabled. Skipping subtitle generation.")
            return None
        self.progress(0.65, desc="Creating subtitles")
        self.log("Starting subtitle generation process")
        try:
            assemblyai_api_key = os.environ.get("ASSEMBLYAI_API_KEY", "")
            if not assemblyai_api_key:
                self.log(warning("AssemblyAI API key not set. Generating simulated subtitles."))
                return self._generate_simulated_subtitles()
            import assemblyai as aai
            aai.settings.api_key = assemblyai_api_key
            config = aai.TranscriptionConfig(speaker_labels=False, word_boost=[], format_text=True)
            transcriber = aai.Transcriber(config=config)
            self.log("Submitting audio for transcription")
            transcript = transcriber.transcribe(audio_path)
            if not transcript or not transcript.words:
                self.log(warning("Transcription returned no words. Using simulated subtitles."))
                return self._generate_simulated_subtitles()
            # Process word-level information
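            # AssemblyAI reports word timestamps in milliseconds; convert to seconds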
            wordlevel_info = []
            for word in transcript.words:
                word_data = {
                    "word": word.text.strip(),
                    "start": word.start / 1000.0,
                    "end": word.end / 1000.0
                }
                wordlevel_info.append(word_data)
            self.log(success(f"Transcription successful. Got {len(wordlevel_info)} words."))

            # Constants for subtitle generation
            FONT = self.subtitle_font
            FONTSIZE = self.font_size
            COLOR = self.text_color
            BG_COLOR = self.highlight_color if self.highlighting_enabled else None
            MAX_CHARS = 30
            MAX_DURATION = 3.0
            MAX_GAP = 2.5

            # Split text into lines: a line ends when it exceeds MAX_CHARS
            # characters, MAX_DURATION seconds of speech, or a silence gap
            # longer than MAX_GAP seconds
            subtitles = []
            line = []
            line_duration = 0
            for idx, word_data in enumerate(wordlevel_info):
                line.append(word_data)
                line_duration += word_data["end"] - word_data["start"]
                temp = " ".join(item["word"] for item in line)
                new_line_chars = len(temp)
                duration_exceeded = line_duration > MAX_DURATION
                chars_exceeded = new_line_chars > MAX_CHARS
                if idx > 0:
                    gap = word_data['start'] - wordlevel_info[idx - 1]['end']
                    maxgap_exceeded = gap > MAX_GAP
                else:
                    maxgap_exceeded = False
                # Finalize the current line if any limit is exceeded
                if duration_exceeded or chars_exceeded or maxgap_exceeded:
                    if line:
                        subtitle_line = {
                            "text": " ".join(item["word"] for item in line),
                            "start": line[0]["start"],
                            "end": line[-1]["end"],
                            "words": line
                        }
                        subtitles.append(subtitle_line)
                        line = []
                        line_duration = 0
            # Add the remaining words as the last subtitle line if any
            if line:
                subtitle_line = {
                    "text": " ".join(item["word"] for item in line),
                    "start": line[0]["start"],
                    "end": line[-1]["end"],
                    "words": line
                }
                subtitles.append(subtitle_line)
            self.log(success(f"Generated {len(subtitles)} subtitle lines"))
            return {
                "wordlevel": wordlevel_info,
                "linelevel": subtitles,
                "settings": {
                    "font": FONT,
                    "fontsize": FONTSIZE,
                    "color": COLOR,
                    "bg_color": BG_COLOR,
                    "position": self.subtitle_position,
                    "highlighting_enabled": self.highlighting_enabled
                }
            }
        except Exception as e:
            error_msg = f"Subtitle generation failed: {str(e)}"
            self.log(error(error_msg))
            return self._generate_simulated_subtitles()
    def _generate_simulated_subtitles(self):
        """Generate simulated subtitles when AssemblyAI is not available."""
        self.log("Generating simulated subtitles")
        # Split script into words
        words = self.script.split()
        # Generate word-level timings from an average speaking rate
        wordlevel_info = []
        current_time = 0
        for word in words:
            # Adjust duration based on word length (between 0.2 and 0.5 seconds)
            word_duration = 0.2 + min(0.05 * len(word), 0.3)
            word_data = {
                "word": word,
                "start": current_time,
                "end": current_time + word_duration
            }
            wordlevel_info.append(word_data)
            # Add a small gap between words
            current_time += word_duration + 0.05
        # Generate line-level subtitles
        subtitles = []
        line = []
        line_start = 0
        line_text = ""
        for word_data in wordlevel_info:
            # Check if adding this word would exceed the character limit
            if len(line_text + " " + word_data["word"]) > 30 and line:
                # Finalize current line
                subtitle_line = {
                    "text": line_text,
                    "start": line_start,
                    "end": line[-1]["end"],
                    "words": line.copy()
                }
                subtitles.append(subtitle_line)
                # Start new line
                line = [word_data]
                line_start = word_data["start"]
                line_text = word_data["word"]
            else:
                # Add word to current line
                line.append(word_data)
                line_text = (line_text + " " + word_data["word"]).strip()
                if len(line) == 1:
                    line_start = word_data["start"]
        # Add final line if not empty
        if line:
            subtitle_line = {
                "text": line_text,
                "start": line_start,
                "end": line[-1]["end"],
                "words": line
            }
            subtitles.append(subtitle_line)
        self.log(success(f"Generated {len(wordlevel_info)} simulated word timings and {len(subtitles)} subtitle lines"))
        # Define settings for subtitle display
        settings = {
            "font": self.subtitle_font,
            "fontsize": self.font_size,
            "color": self.text_color,
            "bg_color": self.highlight_color if self.highlighting_enabled else None,
            "position": self.subtitle_position,
            "highlighting_enabled": self.highlighting_enabled
        }
        return {
            "wordlevel": wordlevel_info,
            "linelevel": subtitles,
            "settings": settings
        }
    def combine(self) -> str:
        """Combine images, audio, and subtitles into a final video."""
        self.progress(0.8, desc="Creating final video")
        self.log("Combining images and audio into final video")
        try:
            output_path = os.path.join(CACHE_DIR, f"output_{int(time.time())}.mp4")
            # Check for required files
            if not self.images:
                raise ValueError("No images available for video creation")
            if not hasattr(self, 'tts_path') or not self.tts_path or not os.path.exists(self.tts_path):
                raise ValueError("No TTS audio file available")
            # Load audio
            tts_clip = AudioFileClip(self.tts_path)
            max_duration = tts_clip.duration
            # Give each image an equal share of the narration
            num_images = len(self.images)
            req_dur = max_duration / num_images
            # Create video clips from images
            clips = []
            tot_dur = 0
            # Loop through images, repeating if necessary to fill the audio duration
            while tot_dur < max_duration:
                clips_added = 0
                for image_path in self.images:
                    # Check if image exists and is valid
                    if not os.path.exists(image_path):
                        self.log(warning(f"Image not found: {image_path}, skipping"))
                        continue
                    try:
                        clip = ImageClip(image_path)
                        clip = clip.set_duration(req_dur)
                        clip = clip.set_fps(30)
                        # Crop to the 9:16 vertical aspect ratio used by Shorts
                        aspect_ratio = 9 / 16
                        if clip.w / clip.h < aspect_ratio:
                            # Image is too tall, crop height
                            clip = crop(
                                clip,
                                width=clip.w,
                                height=round(clip.w / aspect_ratio),
                                x_center=clip.w / 2,
                                y_center=clip.h / 2
                            )
                        else:
                            # Image is too wide, crop width
                            clip = crop(
                                clip,
                                width=round(aspect_ratio * clip.h),
                                height=clip.h,
                                x_center=clip.w / 2,
                                y_center=clip.h / 2
                            )
                        # Resize to standard size for Shorts
                        clip = clip.resize((1080, 1920))
                        clips.append(clip)
                        tot_dur += clip.duration
                        clips_added += 1
                        # If we've covered the audio duration, stop
                        if tot_dur >= max_duration:
                            break
                    except Exception as e:
                        self.log(warning(f"Error processing image {image_path}: {str(e)}"))
                # Guard against an infinite loop when no image can be processed
                if clips_added == 0:
                    raise ValueError("None of the generated images could be processed")
            # Create video from clips
            self.log(f"Creating video from {len(clips)} clips")
            final_clip = concatenate_videoclips(clips)
            final_clip = final_clip.set_fps(30)
            # Add background music if available
            music_path = None
            if self.music_file == "random":
                music_path = choose_random_music()
            elif self.music_file != "none" and os.path.exists(os.path.join(MUSIC_DIR, self.music_file)):
                music_path = os.path.join(MUSIC_DIR, self.music_file)
            if music_path and os.path.exists(music_path):
                self.log(f"Adding background music: {music_path}")
                try:
                    music_clip = AudioFileClip(music_path)
                    # Loop music if it's shorter than the video
                    if music_clip.duration < max_duration:
                        repeats = int(max_duration / music_clip.duration) + 1
                        music_clip = concatenate_audioclips([music_clip] * repeats)
                    # Trim if it's longer
                    music_clip = music_clip.subclip(0, max_duration)
                    # Reduce volume so the voiceover stays audible
                    music_clip = music_clip.fx(volumex, 0.1)
                    # Combine audio tracks
                    comp_audio = CompositeAudioClip([tts_clip, music_clip])
                    final_clip = final_clip.set_audio(comp_audio)
                except Exception as e:
                    self.log(warning(f"Error adding background music: {str(e)}"))
                    final_clip = final_clip.set_audio(tts_clip)
            else:
                self.log("No background music found, using TTS audio only")
                final_clip = final_clip.set_audio(tts_clip)
            # Set final duration
            final_clip = final_clip.set_duration(tts_clip.duration)
            # Generate subtitles if enabled
            subtitle_clips = []
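            # Two rendering modes: one TextClip per word, timed to the
            # transcript, when highlighting is on; otherwise a single caption
            # clip per subtitle line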
            if self.subtitles_enabled:
                subtitles = self.generate_subtitles(self.tts_path)
                if subtitles and 'wordlevel' in subtitles:
                    self.log("Adding word-level subtitles")
                    # TextClip is provided by the moviepy.editor star-import above.
                    # Define subtitle styles; pass the full font path so custom
                    # fonts in FONTS_DIR are actually found
                    font_name = subtitles['settings']['font']
                    font_path = os.path.join(FONTS_DIR, f"{font_name}.ttf")
                    font = font_path if font_name != "default" and os.path.exists(font_path) else None
                    fontsize = subtitles['settings']['fontsize']
                    color = subtitles['settings']['color']
                    bg_color = subtitles['settings']['bg_color'] if subtitles['settings']['highlighting_enabled'] else None
                    # Calculate position based on the subtitle_position setting
                    frame_width, frame_height = 1080, 1920
                    if self.subtitle_position == "top":
                        y_pos = frame_height * 0.1  # 10% from the top
                    elif self.subtitle_position == "middle":
                        y_pos = frame_height * 0.5  # centered vertically
                    else:  # bottom (default)
                        y_pos = frame_height * 0.85  # 85% from the top
                    for subtitle in subtitles['linelevel']:
                        full_duration = subtitle['end'] - subtitle['start']
                        # Lay out each subtitle line from the configured baseline;
                        # local coordinates keep one line's wrapping from pushing
                        # later lines further down the frame
                        x_pos = 0
                        line_y = y_pos
                        x_buffer = frame_width * 1 / 10
                        if self.highlighting_enabled:
                            # Add each word with proper timing and highlighting
                            for word_data in subtitle['words']:
                                word = word_data['word']
                                start = word_data['start']
                                end = word_data['end']
                                # Create text clip for the word
                                try:
                                    word_clip = TextClip(
                                        txt=word,
                                        font=font,
                                        fontsize=fontsize,
                                        color=color,
                                        bg_color=bg_color,
                                        stroke_color='black',
                                        stroke_width=1
                                    ).set_position((x_pos + x_buffer, line_y)).set_start(start).set_duration(end - start)
                                    subtitle_clips.append(word_clip)
                                    x_pos += word_clip.w + 10  # spacing between words
                                    # Wrap to the next line if needed
                                    if x_pos + word_clip.w > frame_width - 2 * x_buffer:
                                        x_pos = 0
                                        line_y += word_clip.h + 10
                                except Exception as e:
                                    self.log(warning(f"Error creating subtitle for word '{word}': {str(e)}"))
                        else:
                            # Show the entire line without word-level highlighting
                            try:
                                line_clip = TextClip(
                                    txt=subtitle['text'],
                                    font=font,
                                    fontsize=fontsize,
                                    color=color,
                                    bg_color=None,
                                    stroke_color='black',
                                    stroke_width=1,
                                    method='caption',
                                    size=(frame_width - 2 * x_buffer, None),
                                    align='center'
                                ).set_position(('center', y_pos)).set_start(subtitle['start']).set_duration(full_duration)
                                subtitle_clips.append(line_clip)
                            except Exception as e:
                                self.log(warning(f"Error creating subtitle line: {str(e)}"))
            # Overlay subtitles on the video if any were created
            if subtitle_clips:
                self.log(f"Adding {len(subtitle_clips)} subtitle clips to video")
                final_clip = CompositeVideoClip([final_clip] + subtitle_clips)
            # Write final video
            self.log("Writing final video file")
            final_clip.write_videofile(output_path, threads=4, codec='libx264', audio_codec='aac')
            success_msg = f"Video successfully created at: {output_path}"
            self.log(success(success_msg))
            self.video_path = output_path
            return output_path
        except Exception as e:
            error_msg = f"Error combining video: {str(e)}"
            self.log(error(error_msg))
            # Create a minimal fallback video if possible
            try:
                # Try a simple video with just the first image and audio
                fallback_path = os.path.join(CACHE_DIR, f"fallback_{int(time.time())}.mp4")
                if self.images and os.path.exists(self.images[0]) and hasattr(self, 'tts_path') and os.path.exists(self.tts_path):
                    img_clip = ImageClip(self.images[0]).set_duration(10)
                    img_clip = img_clip.resize((1080, 1920))
                    audio_clip = AudioFileClip(self.tts_path)
                    audio_clip = audio_clip.subclip(0, min(10, audio_clip.duration))
                    video_clip = img_clip.set_audio(audio_clip)
                    video_clip.write_videofile(fallback_path, threads=2, codec='libx264', audio_codec='aac')
                    self.log(warning(f"Created fallback video at: {fallback_path}"))
                    self.video_path = fallback_path
                    return fallback_path
                else:
                    raise Exception("Cannot create fallback video: missing images or audio")
            except Exception as fallback_error:
                self.log(error(f"Failed to create fallback video: {str(fallback_error)}"))
                return None
    def generate_video(self) -> dict:
        """Generate the complete video with all components."""
        try:
            self.log("Starting video generation process")
            # Step 1: Generate topic
            self.log("Generating topic")
            self.generate_topic()
            # Step 2: Generate script
            self.progress(0.1, desc="Creating script")
            self.log("Generating script")
            self.generate_script()
            # Step 3: Generate metadata
            self.progress(0.2, desc="Creating metadata")
            self.log("Generating metadata")
            self.generate_metadata()
            # Step 4: Generate image prompts
            self.progress(0.3, desc="Creating image prompts")
            self.log("Generating image prompts")
            self.generate_prompts()
            # Step 5: Generate images
            self.progress(0.4, desc="Generating images")
            self.log("Generating images")
            for i, prompt in enumerate(self.image_prompts, 1):
                self.progress(0.4 + 0.2 * (i / len(self.image_prompts)),
                              desc=f"Generating image {i}/{len(self.image_prompts)}")
                self.log(f"Generating image {i}/{len(self.image_prompts)}")
                self.generate_image(prompt)
            # Step 6: Generate speech
            self.progress(0.6, desc="Creating speech")
            self.log("Generating speech")
            self.generate_speech(self.script)
            # Step 7: Combine all elements into the final video
            self.progress(0.8, desc="Creating final video")
            self.log("Combining all elements into final video")
            path = self.combine()
            self.progress(0.95, desc="Finalizing")
            self.log(f"Video generation complete. File saved at: {path}")
            # Return the result
            return {
                'video_path': path,
                'title': self.metadata['title'],
                'description': self.metadata['description'],
                'subject': self.subject,
                'script': self.script,
                'logs': self.logs
            }
        except Exception as e:
            error_msg = f"Error during video generation: {str(e)}"
            self.log(error(error_msg))
            raise Exception(error_msg)
# Data for dynamic dropdowns
def get_text_generator_models(generator):
    """Get available models for the selected text generator."""
    models = {
        "gemini": [
            "gemini-2.0-flash",
            "gemini-2.0-flash-lite",
            "gemini-1.5-flash",
            "gemini-1.5-flash-8b",
            "gemini-1.5-pro"
        ],
        "g4f": [
            "gpt-4",
            "gpt-4o",
            "gpt-3.5-turbo",
            "llama-3-70b-chat",
            "claude-3-opus-20240229",
            "claude-3-sonnet-20240229",
            "claude-3-haiku-20240307"
        ],
        "openai": [
            "gpt-4o",
            "gpt-4-turbo",
            "gpt-3.5-turbo"
        ]
    }
    return models.get(generator, ["default"])

def get_image_generator_models(generator):
    """Get available models for the selected image generator."""
    models = {
        "prodia": [
            "sdxl",
            "realvisxl",
            "juggernaut",
            "dreamshaper",
            "dalle"
        ],
        "hercai": [
            "v1",
            "v2",
            "v3",
            "lexica"
        ],
        "g4f": [
            "flux",
            "dall-e-3",
            "dall-e-2",
            "midjourney"
        ],
        "segmind": [
            "sdxl-turbo",
            "realistic-vision",
            "sd3"
        ],
        "pollinations": [
            "default"
        ]
    }
    return models.get(generator, ["default"])

def get_tts_voices(engine):
    """Get available voices for the selected TTS engine."""
    voices = {
        "elevenlabs": [
            "Sarah",
            "Brian",
            "Lily",
            "Monika Sogam",
            "George",
            "River",
            "Matilda",
            "Will",
            "Jessica"
        ],
        "openai": [
            "alloy",
            "echo",
            "fable",
            "onyx",
            "nova",
            "shimmer"
        ],
        "edge": [
            "en-US-AriaNeural",
            "en-US-GuyNeural",
            "en-GB-SoniaNeural",
            "en-AU-NatashaNeural"
        ],
        "gtts": [
            "en",
            "es",
            "fr",
            "de",
            "it",
            "pt",
            "ru",
            "ja",
            "zh",
            "hi"
        ]
    }
    return voices.get(engine, ["default"])
# Create the Gradio interface
def create_interface():
    with gr.Blocks(theme=gr.themes.Soft(primary_hue="indigo"), title="YouTube Shorts Generator") as demo:
        with gr.Row():
            gr.Markdown(
                """
                # 📱 YouTube Shorts Generator
                Generate engaging YouTube Shorts videos with AI. Just provide a niche and language to get started!
                """
            )
        with gr.Row(equal_height=True):
            # Left panel: content and generator settings
            with gr.Column(scale=1, min_width=400):
                with gr.Group():
                    gr.Markdown("### 📝 Content")
                    niche = gr.Textbox(
                        label="Niche/Topic",
                        placeholder="What's your video about?",
                        value="Historical Facts"
                    )
                    language = gr.Dropdown(
                        choices=["English", "Spanish", "French", "German", "Italian", "Portuguese",
                                 "Russian", "Japanese", "Chinese", "Hindi"],
                        label="Language",
                        value="English"
                    )
                # Generator settings
                with gr.Group():
                    gr.Markdown("### 🔧 Generator Settings")
                    with gr.Tabs():
                        with gr.TabItem("Text"):
                            text_gen = gr.Dropdown(
                                choices=["g4f", "gemini", "openai"],
                                label="Text Generator",
                                value="g4f"
                            )
                            text_model = gr.Dropdown(
                                choices=get_text_generator_models("g4f"),
                                label="Text Model",
                                value="gpt-4"
                            )
                        with gr.TabItem("Image"):
                            image_gen = gr.Dropdown(
                                choices=["g4f", "prodia", "hercai", "segmind", "pollinations"],
                                label="Image Generator",
                                value="g4f"
                            )
                            image_model = gr.Dropdown(
                                choices=get_image_generator_models("g4f"),
                                label="Image Model",
                                value="flux"
                            )
                        with gr.TabItem("Audio"):
                            tts_engine = gr.Dropdown(
                                choices=["edge", "elevenlabs", "gtts", "openai"],
                                label="Speech Engine",
                                value="edge"
                            )
                            tts_voice = gr.Dropdown(
                                choices=get_tts_voices("edge"),
                                label="Voice",
                                value="en-US-AriaNeural"
                            )
                            music_file = gr.Dropdown(
                                choices=get_music_files(),
                                label="Background Music",
                                value="random"
                            )
                        with gr.TabItem("Subtitles"):
                            subtitles_enabled = gr.Checkbox(label="Enable Subtitles", value=True)
                            highlighting_enabled = gr.Checkbox(label="Enable Word Highlighting", value=True)
                            subtitle_font = gr.Dropdown(
                                choices=get_font_files(),
                                label="Font",
                                value="default"
                            )
                            with gr.Row():
                                font_size = gr.Slider(
                                    minimum=40,
                                    maximum=120,
                                    value=80,
                                    step=5,
                                    label="Font Size"
                                )
                                subtitle_position = gr.Dropdown(
                                    choices=["bottom", "middle", "top"],
                                    label="Position",
                                    value="bottom"
                                )
                            with gr.Row():
                                text_color = gr.ColorPicker(label="Text Color", value="#FFFFFF")
                                highlight_color = gr.ColorPicker(label="Highlight Color", value="#0000FF")
                # API Keys section
                with gr.Accordion("🔑 API Keys", open=False):
                    gemini_api_key = gr.Textbox(
                        label="Gemini API Key",
                        type="password",
                        value=os.environ.get("GEMINI_API_KEY", "")
                    )
                    assemblyai_api_key = gr.Textbox(
                        label="AssemblyAI API Key",
                        type="password",
                        value=os.environ.get("ASSEMBLYAI_API_KEY", "")
                    )
                    elevenlabs_api_key = gr.Textbox(
                        label="ElevenLabs API Key",
                        type="password",
                        value=os.environ.get("ELEVENLABS_API_KEY", "")
                    )
                    segmind_api_key = gr.Textbox(
                        label="Segmind API Key",
                        type="password",
                        value=os.environ.get("SEGMIND_API_KEY", "")
                    )
                    openai_api_key = gr.Textbox(
                        label="OpenAI API Key",
                        type="password",
                        value=os.environ.get("OPENAI_API_KEY", "")
                    )
                # Generate button
                generate_btn = gr.Button("🎬 Generate Video", variant="primary", size="lg")
            # Right panel: output display
            with gr.Column(scale=1, min_width=400):
                with gr.Tabs():
                    with gr.TabItem("Video"):
                        video_output = gr.Video(label="Generated Video", height=600)
                    with gr.TabItem("Metadata"):
                        title_output = gr.Textbox(label="Title", lines=2)
                        description_output = gr.Textbox(label="Description", lines=4)
                        script_output = gr.Textbox(label="Script", lines=8)
                    with gr.TabItem("Log"):
                        log_output = gr.Textbox(label="Process Log", lines=20, max_lines=100)
        # Dynamic dropdown updates
        def update_text_models(generator):
            return gr.Dropdown(choices=get_text_generator_models(generator))

        def update_image_models(generator):
            return gr.Dropdown(choices=get_image_generator_models(generator))

        def update_tts_voices(engine):
            return gr.Dropdown(choices=get_tts_voices(engine))

        # Connect the change events
        text_gen.change(fn=update_text_models, inputs=text_gen, outputs=text_model)
        image_gen.change(fn=update_image_models, inputs=image_gen, outputs=image_model)
        tts_engine.change(fn=update_tts_voices, inputs=tts_engine, outputs=tts_voice)
        # Main generation function
        def generate_youtube_short(niche, language, gemini_api_key, assemblyai_api_key,
                                   elevenlabs_api_key, segmind_api_key, openai_api_key,
                                   text_gen, text_model, image_gen, image_model,
                                   tts_engine, tts_voice, subtitles_enabled, highlighting_enabled,
                                   subtitle_font, font_size, subtitle_position,
                                   text_color, highlight_color, music_file, progress=gr.Progress()):
            if not niche.strip():
                return {
                    video_output: None,
                    title_output: "ERROR: Please enter a niche/topic",
                    description_output: "",
                    script_output: "",
                    log_output: "Error: Niche/Topic is required. Please enter a valid topic and try again."
                }
            # Create API keys dictionary
            api_keys = {
                'gemini': gemini_api_key,
                'assemblyai': assemblyai_api_key,
                'elevenlabs': elevenlabs_api_key,
                'segmind': segmind_api_key,
                'openai': openai_api_key
            }
            try:
                # Initialize the YouTube class
                yt = YouTube(
                    niche=niche,
                    language=language,
                    text_gen=text_gen,
                    text_model=text_model,
                    image_gen=image_gen,
                    image_model=image_model,
                    tts_engine=tts_engine,
                    tts_voice=tts_voice,
                    subtitle_font=subtitle_font,
                    font_size=font_size,
                    text_color=text_color,
                    highlight_color=highlight_color,
                    subtitles_enabled=subtitles_enabled,
                    highlighting_enabled=highlighting_enabled,
                    subtitle_position=subtitle_position,
                    music_file=music_file,
                    api_keys=api_keys,
                    progress=progress
                )
                # Generate video
                result = yt.generate_video()
                # Check if the video was successfully created
                if not result or not result.get('video_path') or not os.path.exists(result.get('video_path', '')):
                    return {
                        video_output: None,
                        title_output: "ERROR: Video generation failed",
                        description_output: "",
                        script_output: "",
                        log_output: "\n".join(yt.logs)
                    }
                return {
                    video_output: result['video_path'],
                    title_output: result['title'],
                    description_output: result['description'],
                    script_output: result['script'],
                    log_output: "\n".join(result['logs'])
                }
            except Exception as e:
                import traceback
                error_details = f"Error: {str(e)}\n\n{traceback.format_exc()}"
                return {
                    video_output: None,
                    title_output: f"ERROR: {str(e)}",
                    description_output: "",
                    script_output: "",
                    log_output: error_details
                }

        # Connect the button click event
        generate_btn.click(
            fn=generate_youtube_short,
            inputs=[
                niche, language, gemini_api_key, assemblyai_api_key, elevenlabs_api_key,
                segmind_api_key, openai_api_key, text_gen, text_model, image_gen, image_model,
                tts_engine, tts_voice, subtitles_enabled, highlighting_enabled,
                subtitle_font, font_size, subtitle_position, text_color, highlight_color, music_file
            ],
            outputs=[video_output, title_output, description_output, script_output, log_output]
        )

        # Add examples
        gr.Examples(
            [
                ["Historical Facts", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-AriaNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#0000FF", "random"],
                ["Cooking Tips", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-AriaNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#FF0000", "random"],
                ["Technology News", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-GuyNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#00FF00", "random"],
            ],
            [niche, language, text_gen, text_model, image_gen, image_model, tts_engine, tts_voice,
             subtitles_enabled, highlighting_enabled, subtitle_font, font_size,
             subtitle_position, text_color, highlight_color, music_file],
            label="Quick Start Templates"
        )
    return demo
# Create and launch the interface
if __name__ == "__main__":
    demo = create_interface()
    demo.launch()
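
# Headless usage sketch (hypothetical; assumes network access for the default
# g4f text/image and Edge TTS backends):
#   yt = YouTube(niche="Historical Facts", language="English")
#   result = yt.generate_video()
#   print(result["video_path"], result["title"])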