diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -1,324 +1,94 @@ - - -
- - -Generate short videos based on a niche and language
- --gradio==3.50.2 -g4f==0.1.9.0 -moviepy==1.0.3 -assemblyai==0.17.0 -requests==2.31.0 -google-generativeai==0.3.1 -python-dotenv==1.0.0 -Pillow==10.0.0 -openai==1.3.5 -edge-tts==6.1.9 -bark==0.0.1 -tensorflow==2.12.0 -soundfile==0.12.1 -TTS==0.21.1 -rvc-engine==0.0.1 -termcolor==2.3.0 --
 import os
 import re
-import g4f
 import json
 import time
 import random
 import tempfile
 import requests
-import assemblyai as aai
-from moviepy.editor import *
+import numpy as np
+from PIL import Image
+from io import BytesIO
 from datetime import datetime
 import gradio as gr
 from dotenv import load_dotenv
+import moviepy.editor as mpy
+from moviepy.editor import *
+from moviepy.audio.fx.all import volumex
+from moviepy.video.fx.all import crop
 
-# Load environment variables from .env file
+# Load environment variables from .env file if present
 load_dotenv()
 
 # Constants
 CACHE_DIR = os.path.join(tempfile.gettempdir(), "yt_shorts_generator")
+ASSETS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "assets")
+MUSIC_DIR = os.path.join(ASSETS_DIR, "background_music")
+FONTS_DIR = os.path.join(ASSETS_DIR, "fonts")
+
+# Map UI language names to ISO-639-1 codes for gTTS (slicing the display
+# name, e.g. "Spanish"[:2] == "sp", produces invalid codes)
+LANGUAGE_CODES = {
+    "english": "en", "spanish": "es", "french": "fr", "german": "de",
+    "italian": "it", "portuguese": "pt", "russian": "ru",
+    "japanese": "ja", "chinese": "zh", "hindi": "hi",
+}
+
+# Create necessary directories
 os.makedirs(CACHE_DIR, exist_ok=True)
+os.makedirs(MUSIC_DIR, exist_ok=True)
+os.makedirs(FONTS_DIR, exist_ok=True)
 
-# Helper functions
+# Helper functions for logging
 def info(message):
-    print(f"[INFO] {message}")
-    return f"[INFO] {message}"
+    timestamp = datetime.now().strftime("%H:%M:%S")
+    formatted_message = f"[{timestamp}] [INFO] {message}"
+    print(formatted_message)
+    return formatted_message
 
 def success(message):
-    print(f"[SUCCESS] {message}")
-    return f"[SUCCESS] {message}"
+    timestamp = datetime.now().strftime("%H:%M:%S")
+    formatted_message = f"[{timestamp}] [SUCCESS] {message}"
+    print(formatted_message)
+    return formatted_message
 
 def warning(message):
-    print(f"[WARNING] {message}")
-    return f"[WARNING] {message}"
+    timestamp = datetime.now().strftime("%H:%M:%S")
+    formatted_message = f"[{timestamp}] [WARNING] {message}"
+    print(formatted_message)
+    return formatted_message
 
 def error(message):
-    print(f"[ERROR] {message}")
-    return f"[ERROR] {message}"
+    timestamp = datetime.now().strftime("%H:%M:%S")
+    formatted_message = f"[{timestamp}] [ERROR] {message}"
+    print(formatted_message)
+    return formatted_message
+
+def choose_random_music():
+    """Selects a random music file from the music directory."""
+    if not os.path.exists(MUSIC_DIR):
+        error(f"Music directory {MUSIC_DIR} does not exist")
+        return None
+
+    music_files = [f for f in os.listdir(MUSIC_DIR) if f.endswith(('.mp3', '.wav'))]
+    if not music_files:
+        warning(f"No music files found in {MUSIC_DIR}")
+        return None
+
+    return os.path.join(MUSIC_DIR, random.choice(music_files))
 
 class YouTube:
-    def __init__(self, niche, language, text_gen="gemini", image_gen="prodia", tts_engine="elevenlabs",
-                 tts_voice="Sarah", subtitle_font="Helvetica-Bold", font_size=80,
-                 text_color="white", highlight_color="blue", api_keys=None):
+    def __init__(self, niche: str, language: str,
+                 text_gen="gemini", text_model="gemini-2.0-flash",
+                 image_gen="prodia", image_model="sdxl",
+                 tts_engine="elevenlabs", tts_voice="Sarah",
+                 subtitle_font="Helvetica-Bold", font_size=80,
+                 text_color="white", highlight_color="blue",
+                 api_keys=None, progress=gr.Progress()) -> None:
+        """Initialize the YouTube Shorts Generator."""
+        self.progress = progress
+        self.progress(0, desc="Initializing")
+
+        # Store basic parameters
         info(f"Initializing YouTube class")
         self._niche = niche
         self._language = language
         self.text_gen = text_gen
+        self.text_model = text_model
         self.image_gen = image_gen
+        self.image_model = image_model
         self.tts_engine = tts_engine
         self.tts_voice = tts_voice
         self.subtitle_font = subtitle_font
@@ -329,55 +99,109 @@ class YouTube:
         self.images = []
         self.logs = []
 
-        # Set API keys
+        # Set API keys from parameters or environment variables
         if 'gemini' in self.api_keys and self.api_keys['gemini']:
             os.environ["GEMINI_API_KEY"] = self.api_keys['gemini']
+
         if 'assemblyai' in self.api_keys and self.api_keys['assemblyai']:
             os.environ["ASSEMBLYAI_API_KEY"] = self.api_keys['assemblyai']
+
         if 'elevenlabs' in self.api_keys and self.api_keys['elevenlabs']:
             os.environ["ELEVENLABS_API_KEY"] = self.api_keys['elevenlabs']
+
         if 'segmind' in self.api_keys and self.api_keys['segmind']:
             os.environ["SEGMIND_API_KEY"] = self.api_keys['segmind']
+
+        if 'openai' in self.api_keys and self.api_keys['openai']:
+            os.environ["OPENAI_API_KEY"] = self.api_keys['openai']
 
         info(f"Niche: {niche}, Language: {language}")
         self.log(f"Initialized with niche: {niche}, language: {language}")
-        self.log(f"Text generator: {text_gen}, Image generator: {image_gen}, TTS engine: {tts_engine}")
+        self.log(f"Text generator: {text_gen} - Model: {text_model}")
+        self.log(f"Image generator: {image_gen} - Model: {image_model}")
+        self.log(f"TTS engine: {tts_engine} - Voice: {tts_voice}")
 
     def log(self, message):
-        """Add a log message to the logs list"""
+        """Add a log message to the logs list."""
         timestamp = datetime.now().strftime("%H:%M:%S")
         log_entry = f"[{timestamp}] {message}"
         self.logs.append(log_entry)
         return log_entry
 
     @property
-    def niche(self):
+    def niche(self) -> str:
         return self._niche
 
     @property
-    def language(self):
+    def language(self) -> str:
        return self._language
 
-    def generate_response(self, prompt, model=None):
+    def generate_response(self, prompt: str, model: str = None) -> str:
+        """Generate a response using the selected text generation model."""
         self.log(f"Generating response for prompt: {prompt[:50]}...")
-        if self.text_gen == "gemini":
-            self.log("Using Google's Gemini model")
-            import google.generativeai as genai
-            genai.configure(api_key=os.environ.get("GEMINI_API_KEY", ""))
-            model = genai.GenerativeModel('gemini-2.0-flash')
-            response = model.generate_content(prompt).text
-        else:
-            model_name = model if model else "gpt-3.5-turbo"
-            self.log(f"Using G4F model: {model_name}")
-            response = g4f.ChatCompletion.create(
-                model=model_name,
-                messages=[{"role": "user", "content": prompt}]
-            )
-        self.log(f"Response generated successfully, length: {len(response)} characters")
-        return response
+
+        try:
+            if self.text_gen == "gemini":
+                self.log("Using Google's Gemini model")
+
+                # Check if API key is set
+                gemini_api_key = os.environ.get("GEMINI_API_KEY", "")
+                if not gemini_api_key:
+                    raise ValueError("Gemini API key is not set. Please provide a valid API key.")
+
+                import google.generativeai as genai
+                genai.configure(api_key=gemini_api_key)
+                model_to_use = model if model else self.text_model
+                genai_model = genai.GenerativeModel(model_to_use)
+                response = genai_model.generate_content(prompt).text
+
+            elif self.text_gen == "g4f":
+                self.log("Using G4F for text generation")
+                import g4f
+                model_to_use = model if model else "gpt-3.5-turbo"
+                self.log(f"Using G4F model: {model_to_use}")
+                response = g4f.ChatCompletion.create(
+                    model=model_to_use,
+                    messages=[{"role": "user", "content": prompt}]
+                )
+
+            elif self.text_gen == "openai":
+                self.log("Using OpenAI for text generation")
+                openai_api_key = os.environ.get("OPENAI_API_KEY", "")
+                if not openai_api_key:
+                    raise ValueError("OpenAI API key is not set. Please provide a valid API key.")
+
+                from openai import OpenAI
+                client = OpenAI(api_key=openai_api_key)
+                model_to_use = model if model else "gpt-3.5-turbo"
+
+                response = client.chat.completions.create(
+                    model=model_to_use,
+                    messages=[{"role": "user", "content": prompt}]
+                ).choices[0].message.content
+
+            else:
+                # Default to g4f if other methods aren't available
+                self.log(f"Using default G4F model as fallback")
+                import g4f
+                response = g4f.ChatCompletion.create(
+                    model="gpt-3.5-turbo",
+                    messages=[{"role": "user", "content": prompt}]
+                )
+
+            self.log(f"Response generated successfully, length: {len(response)} characters")
+            return response
+
+        except Exception as e:
+            error_msg = f"Error generating response: {str(e)}"
+            self.log(error_msg)
+            raise Exception(error_msg)
 
-    def generate_topic(self):
+    def generate_topic(self) -> str:
+        """Generate a topic based on the YouTube Channel niche."""
+        self.progress(0.05, desc="Generating topic")
         self.log("Generating topic based on niche")
+
         completion = self.generate_response(
             f"Please generate a specific video idea that takes about the following topic: {self.niche}. "
             f"Make it exactly one sentence. Only return the topic, nothing else."
@@ -385,14 +209,17 @@ class YouTube:
 
         if not completion:
             self.log(error("Failed to generate Topic."))
-            return None
+            raise Exception("Failed to generate a topic. Please try again with a different niche.")
 
         self.subject = completion
         self.log(success(f"Generated topic: {completion}"))
         return completion
 
-    def generate_script(self):
+    def generate_script(self) -> str:
+        """Generate a script for a video, based on the subject and language."""
+        self.progress(0.1, desc="Creating script")
         self.log("Generating script for video")
+
         prompt = f"""
         Generate a script for youtube shorts video, depending on the subject of the video.
@@ -421,7 +248,7 @@ class YouTube:
 
         if not completion:
             self.log(error("The generated script is empty."))
-            return None
+            raise Exception("Failed to generate a script. Please try again.")
 
         if len(completion) > 5000:
             self.log(warning("Generated Script is too long. Retrying..."))
@@ -431,8 +258,11 @@ class YouTube:
 
         self.log(success(f"Generated script ({len(completion)} chars)"))
         return completion
 
-    def generate_metadata(self):
+    def generate_metadata(self) -> dict:
+        """Generate video metadata (title, description)."""
+        self.progress(0.15, desc="Creating title and description")
         self.log("Generating metadata (title and description)")
+
         title = self.generate_response(
             f"Please generate a YouTube Video Title for the following subject, including hashtags: "
             f"{self.subject}. Only return the title, nothing else. Limit the title under 100 characters."
@@ -453,11 +283,14 @@ class YouTube:
         }
 
         self.log(success(f"Generated title: {title}"))
-        self.log(success(f"Generated description: {description}"))
+        self.log(success(f"Generated description: {description[:50]}..."))
 
         return self.metadata
 
-    def generate_prompts(self, count=5):
+    def generate_prompts(self, count=5) -> list:
+        """Generate AI Image Prompts based on the provided Video Script."""
+        self.progress(0.2, desc="Creating image prompts")
         self.log(f"Generating {count} image prompts")
+
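+        # The model is asked (in the prompt below) to return JSON like
+        # {"image_prompts": ["..."]}; the parsing further down tries that
+        # keyed object first, then a bare array, then a regex fallback.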
         prompt = f"""
         Generate {count} Image Prompts for AI Image Generation, depending on the subject of a video.
@@ -491,231 +324,422 @@ class YouTube:
         image_prompts = []
 
         if "image_prompts" in completion:
-            image_prompts = json.loads(completion)["image_prompts"]
-        else:
+            try:
+                image_prompts = json.loads(completion)["image_prompts"]
+            except:
+                self.log(warning("Failed to parse 'image_prompts' from JSON response."))
+
+        if not image_prompts:
             try:
                 image_prompts = json.loads(completion)
-                self.log(f"Generated Image Prompts: {image_prompts}")
+                self.log(f"Parsed image prompts from JSON response.")
             except Exception:
-                self.log(warning("GPT returned an unformatted response. Attempting to clean..."))
+                self.log(warning("JSON parsing failed. Attempting to extract array using regex..."))
                 # Get everything between [ and ], and turn it into a list
                 r = re.compile(r"\[.*\]", re.DOTALL)
                 matches = r.findall(completion)
 
                 if len(matches) == 0:
-                    self.log(warning("Failed to generate Image Prompts. Retrying..."))
-                    return self.generate_prompts(count)
-
-                try:
-                    image_prompts = json.loads(matches[0])
-                except:
-                    self.log(error("Failed to parse image prompts JSON"))
-                    # Try a fallback approach - create some generic prompts
+                    self.log(warning("Failed to extract array. Creating generic image prompts."))
+                    # Create generic prompts based on the subject
                     image_prompts = [
-                        f"A beautiful image showing {self.subject}",
-                        f"A detailed visualization of {self.subject}",
-                        f"An artistic representation of {self.subject}",
-                        f"A photorealistic image about {self.subject}",
-                        f"A dramatic scene related to {self.subject}"
+                        f"A beautiful image showing {self.subject}, photorealistic",
+                        f"A detailed visualization of {self.subject}, high quality",
+                        f"An artistic representation of {self.subject}, vibrant colors",
+                        f"A photorealistic image about {self.subject}, high resolution",
+                        f"A dramatic scene related to {self.subject}, cinema quality"
                     ]
+                else:
+                    try:
+                        image_prompts = json.loads(matches[0])
+                    except:
+                        self.log(error("Failed to parse array from regex match."))
+                        # Use regex to extract individual strings
+                        string_pattern = r'"([^"]*)"'
+                        strings = re.findall(string_pattern, matches[0])
+                        if strings:
+                            image_prompts = strings
+                        else:
+                            # Last resort - split by commas and clean up
+                            image_prompts = [
+                                s.strip().strip('"').strip("'")
+                                for s in matches[0].strip('[]').split(',')
+                            ]
 
-        self.image_prompts = image_prompts[:count]  # Limit to requested count
+        # Ensure we have the requested number of prompts
+        while len(image_prompts) < count:
+            image_prompts.append(f"A high-quality image about {self.subject}")
+
+        # Limit to the requested count
+        image_prompts = image_prompts[:count]
+
+        self.image_prompts = image_prompts
         self.log(success(f"Generated {len(self.image_prompts)} Image Prompts"))
         for i, prompt in enumerate(self.image_prompts):
             self.log(f"Image Prompt {i+1}: {prompt}")
-        return self.image_prompts
+
+        return image_prompts
 
-    def generate_image(self, prompt):
+    def generate_image(self, prompt) -> str:
+        """Generate an image using the selected image generation model."""
         self.log(f"Generating image for prompt: {prompt[:50]}...")
-        if self.image_gen == "prodia":
-            self.log("Using Prodia provider for image generation")
-            s = requests.Session()
-            headers = {
-                "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
-            }
-
-            # Generate job
-            self.log("Sending generation request to Prodia API")
-            resp = s.get(
-                "https://api.prodia.com/generate",
-                params={
-                    "new": "true",
-                    "prompt": prompt,
-                    "model": "sdxl",  # Default model
-                    "negative_prompt": "verybadimagenegative_v1.3",
-                    "steps": "20",
-                    "cfg": "7",
-                    "seed": random.randint(1, 10000),
-                    "sample": "DPM++ 2M Karras",
-                    "aspect_ratio": "square"
-                },
-                headers=headers
-            )
+        try:
+            image_path = os.path.join(CACHE_DIR, f"img_{len(self.images)}_{int(time.time())}.png")
 
-            job_id = resp.json()['job']
-            self.log(f"Job created with ID: {job_id}")
+            if self.image_gen == "prodia":
+                self.log("Using Prodia provider for image generation")
+                s = requests.Session()
+                headers = {
+                    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
+                }
+
+                # Generate job
+                self.log("Sending generation request to Prodia API")
+                resp = s.get(
+                    "https://api.prodia.com/generate",
+                    params={
+                        "new": "true",
+                        "prompt": prompt,
+                        "model": self.image_model,
+                        "negative_prompt": "verybadimagenegative_v1.3",
+                        "steps": "20",
+                        "cfg": "7",
+                        "seed": random.randint(1, 10000),
+                        "sample": "DPM++ 2M Karras",
+                        "aspect_ratio": "square"
+                    },
+                    headers=headers
+                )
+
+                if resp.status_code != 200:
+                    raise Exception(f"Prodia API error: {resp.text}")
+
+                job_id = resp.json()['job']
+                self.log(f"Job created with ID: {job_id}")
+
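+                # Polling contract (assumed from the responses handled
+                # below): GET /job/{job_id} returns {"status": ...} that
+                # eventually reads "succeeded" or "failed"; with 2 s sleeps
+                # and 30 attempts we give up after roughly a minute.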
+                # Wait for generation to complete
+                max_attempts = 30
+                attempts = 0
+                while attempts < max_attempts:
+                    attempts += 1
+                    time.sleep(2)
+                    status = s.get(f"https://api.prodia.com/job/{job_id}", headers=headers).json()
+
+                    if status["status"] == "succeeded":
+                        self.log("Image generation successful, downloading result")
+                        img_data = s.get(f"https://images.prodia.xyz/{job_id}.png?download=1", headers=headers).content
+                        with open(image_path, "wb") as f:
+                            f.write(img_data)
+                        self.images.append(image_path)
+                        self.log(success(f"Image saved to: {image_path}"))
+                        return image_path
+
+                    elif status["status"] == "failed":
+                        raise Exception(f"Prodia job failed: {status.get('error', 'Unknown error')}")
+
+                    # Still processing
+                    self.log(f"Still processing, attempt {attempts}/{max_attempts}...")
+
+                raise Exception("Prodia job timed out")
 
-            # For demo purposes, simulate waiting
-            self.log("Waiting for image generation to complete...")
-            time.sleep(3)  # Simulate API call
+            elif self.image_gen == "hercai":
+                self.log("Using Hercai provider for image generation")
+                url = f"https://hercai.onrender.com/{self.image_model}/text2image?prompt={prompt}"
+                r = requests.get(url)
+
+                if r.status_code != 200:
+                    raise Exception(f"Hercai API error: {r.text}")
+
+                parsed = r.json()
+                if "url" in parsed and parsed["url"]:
+                    self.log("Image URL received from Hercai")
+                    image_url = parsed["url"]
+                    img_data = requests.get(image_url).content
+                    with open(image_path, "wb") as f:
+                        f.write(img_data)
+                    self.images.append(image_path)
+                    self.log(success(f"Image saved to: {image_path}"))
+                    return image_path
+                else:
+                    raise Exception("No image URL in Hercai response")
 
-            # In a real implementation we would poll until completion
-            # For demo, we'll just create a placeholder image
-            image_path = os.path.join(CACHE_DIR, f"image_{len(self.images)}.png")
+            elif self.image_gen == "g4f":
+                self.log("Using G4F provider for image generation")
+                try:
+                    from g4f.client import Client
+                    client = Client()
+                    response = client.images.generate(
+                        model=self.image_model,
+                        prompt=prompt,
+                        response_format="url"
+                    )
+
+                    if response and response.data and len(response.data) > 0:
+                        image_url = response.data[0].url
+                        image_response = requests.get(image_url)
+
+                        if image_response.status_code == 200:
+                            with open(image_path, "wb") as f:
+                                f.write(image_response.content)
+                            self.images.append(image_path)
+                            self.log(success(f"Image saved to: {image_path}"))
+                            return image_path
+                        else:
+                            raise Exception(f"Failed to download image from {image_url}")
+                    else:
+                        raise Exception("No image URL received from G4F")
+                except Exception as e:
+                    raise Exception(f"G4F image generation failed: {str(e)}")
 
-            # Since we can't actually generate a real image, for demonstration we'll
-            # return a simple example URL that would be the result in a real implementation
-            image_url = "https://images.unsplash.com/photo-1579546929518-9e396f3cc809"
-            self.log(success(f"Image generated and saved (placeholder for demo)"))
-            self.images.append(image_url)
-            return image_url
-
-        elif self.image_gen == "hercai":
-            self.log("Using Hercai provider for image generation")
-            # For demo purposes, simulate API call
-            time.sleep(2)
-            image_url = "https://images.unsplash.com/photo-1513151233558-d860c5398176"
-            self.log(success(f"Image generated and saved (placeholder for demo)"))
-            self.images.append(image_url)
-            return image_url
+            elif self.image_gen == "segmind":
+                self.log("Using Segmind provider for image generation")
+                api_key = os.environ.get("SEGMIND_API_KEY", "")
+                if not api_key:
+                    raise ValueError("Segmind API key is not set. Please provide a valid API key.")
+
+                headers = {
+                    "x-api-key": api_key,
+                    "Content-Type": "application/json"
+                }
+
+                response = requests.post(
+                    "https://api.segmind.com/v1/sdxl-turbo",
+                    json={
+                        "prompt": prompt,
+                        "negative_prompt": "blurry, low quality, distorted face, text, watermark",
+                        "samples": 1,
+                        "size": "1024x1024",
+                        "guidance_scale": 1.0
+                    },
+                    headers=headers
+                )
+
+                if response.status_code == 200:
+                    with open(image_path, "wb") as f:
+                        f.write(response.content)
+                    self.images.append(image_path)
+                    self.log(success(f"Image saved to: {image_path}"))
+                    return image_path
+                else:
+                    raise Exception(f"Segmind request failed: {response.status_code} {response.text}")
 
-        elif self.image_gen == "segmind":
-            self.log("Using Segmind provider for image generation")
-            # For demo purposes, simulate API call
-            time.sleep(2)
-            image_url = "https://images.unsplash.com/photo-1618005182384-a83a8bd57fbe"
-            self.log(success(f"Image generated and saved (placeholder for demo)"))
-            self.images.append(image_url)
-            return image_url
+            elif self.image_gen == "pollinations":
+                self.log("Using Pollinations provider for image generation")
+                # URL-encode the prompt; the random suffix keeps Pollinations
+                # from serving a cached image for a repeated prompt
+                response = requests.get(
+                    "https://image.pollinations.ai/prompt/"
+                    + requests.utils.quote(f"{prompt} {random.randint(1, 10000)}")
+                )
+
+                if response.status_code == 200:
+                    self.log("Image received from Pollinations")
+                    with open(image_path, "wb") as f:
+                        f.write(response.content)
+                    self.images.append(image_path)
+                    self.log(success(f"Image saved to: {image_path}"))
+                    return image_path
+                else:
+                    raise Exception(f"Pollinations request failed with status code: {response.status_code}")
 
-        elif self.image_gen == "pollinations":
-            self.log("Using Pollinations provider for image generation")
-            # For demo purposes, simulate API call
-            time.sleep(2)
-            image_url = "https://images.unsplash.com/photo-1550859492-d5da9d8e45f3"
-            self.log(success(f"Image generated and saved (placeholder for demo)"))
-            self.images.append(image_url)
-            return image_url
+            else:
+                # Default to generating a colored placeholder image
+                self.log(f"Unknown provider '{self.image_gen}'. Generating placeholder image.")
+
+                # Create a placeholder colored image with the prompt text
+                img = Image.new('RGB', (800, 800), color=(random.randint(0, 255),
+                                                          random.randint(0, 255),
+                                                          random.randint(0, 255)))
+                img.save(image_path)
+                self.images.append(image_path)
+                self.log(warning(f"Created placeholder image at: {image_path}"))
+                return image_path
 
-        else:  # Default or g4f
-            self.log("Using default provider for image generation")
-            # For demo purposes, simulate API call
-            time.sleep(2)
-            image_url = "https://images.unsplash.com/photo-1541701494587-cb58502866ab"
-            self.log(success(f"Image generated and saved (placeholder for demo)"))
-            self.images.append(image_url)
-            return image_url
+        except Exception as e:
+            error_msg = f"Image generation failed: {str(e)}"
+            self.log(error(error_msg))
 
+            # Create a fallback image
+            try:
+                img = Image.new('RGB', (800, 800), color=(200, 200, 200))
+                image_path = os.path.join(CACHE_DIR, f"error_img_{len(self.images)}_{int(time.time())}.png")
+                img.save(image_path)
+                self.images.append(image_path)
+                self.log(warning(f"Created error placeholder image at: {image_path}"))
+                return image_path
+            except:
+                # If all else fails, return None and handle it gracefully
+                return None
 
-    def generate_speech(self, text, output_format='mp3'):
+    def generate_speech(self, text, output_format='mp3') -> str:
+        """Generate speech from text using the selected TTS engine."""
+        self.progress(0.6, desc="Creating voiceover")
         self.log("Generating speech from text")
 
         # Clean text
-        text = re.sub(r'[^\w\s.?!]', '', text)
+        text = re.sub(r'[^\w\s.?!,;:\'"-]', '', text)
 
         self.log(f"Using TTS Engine: {self.tts_engine}, Voice: {self.tts_voice}")
-        audio_path = os.path.join(CACHE_DIR, f"speech.{output_format}")
+        audio_path = os.path.join(CACHE_DIR, f"speech_{int(time.time())}.{output_format}")
 
-        if self.tts_engine == "elevenlabs":
-            self.log("Using ElevenLabs provider for speech generation")
-            # For demo purposes, we'll just simulate the API call
-            self.log("Simulating ElevenLabs API call (would use real API in production)")
-            time.sleep(3)  # Simulate API call
-            self.tts_path = audio_path
-            return audio_path
+        try:
+            if self.tts_engine == "elevenlabs":
+                self.log("Using ElevenLabs provider for speech generation")
+                elevenlabs_api_key = os.environ.get("ELEVENLABS_API_KEY", "")
+                if not elevenlabs_api_key:
+                    raise ValueError("ElevenLabs API key is not set. Please provide a valid API key.")
 
-        elif self.tts_engine == 'bark':
-            self.log("Using Bark provider for speech generation")
-            # For demo purposes, simulate API call
-            time.sleep(3)
-            self.tts_path = audio_path
-            return audio_path
-
-        elif self.tts_engine == "gtts":
-            self.log("Using Google TTS provider for speech generation")
-            # For demo purposes, simulate API call
-            time.sleep(2)
-            self.tts_path = audio_path
-            return audio_path
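+                # Direct REST call: POST /v1/text-to-speech/{voice_id} with
+                # the key in an "xi-api-key" header; the handling below
+                # assumes raw MP3 bytes come back on HTTP 200.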
+                headers = {
+                    "Accept": "audio/mpeg",
+                    "Content-Type": "application/json",
+                    "xi-api-key": elevenlabs_api_key
+                }
+
+                payload = {
+                    "text": text,
+                    "model_id": "eleven_monolingual_v1",
+                    "voice_settings": {
+                        "stability": 0.5,
+                        "similarity_boost": 0.5,
+                        "style": 0.0,
+                        "use_speaker_boost": True
+                    }
+                }
+
+                voice_id = self.tts_voice if self.tts_voice not in ["Sarah", "default"] else "21m00Tcm4TlvDq8ikWAM"
+
+                response = requests.post(
+                    url=f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}",
+                    json=payload,
+                    headers=headers
+                )
+
+                if response.status_code == 200:
+                    with open(audio_path, 'wb') as f:
+                        f.write(response.content)
+                    self.log(success(f"Speech generated successfully using ElevenLabs at {audio_path}"))
+                else:
+                    raise Exception(f"ElevenLabs API error: {response.text}")
+
+            elif self.tts_engine == "gtts":
+                self.log("Using Google TTS provider for speech generation")
+                from gtts import gTTS
+                # Look up the ISO code; slicing the display name produced
+                # invalid codes for most languages
+                tts = gTTS(text=text, lang=LANGUAGE_CODES.get(self.language.lower(), "en"), slow=False)
+                tts.save(audio_path)
+
+            elif self.tts_engine == "openai":
+                self.log("Using OpenAI provider for speech generation")
+                openai_api_key = os.environ.get("OPENAI_API_KEY", "")
+                if not openai_api_key:
+                    raise ValueError("OpenAI API key is not set. Please provide a valid API key.")
+
+                from openai import OpenAI
+                client = OpenAI(api_key=openai_api_key)
+
+                voice = self.tts_voice if self.tts_voice else "alloy"
+                response = client.audio.speech.create(
+                    model="tts-1",
+                    voice=voice,
+                    input=text
+                )
+                response.stream_to_file(audio_path)
+
+            elif self.tts_engine == "edge":
+                self.log("Using Edge TTS provider for speech generation")
+                import edge_tts
+                import asyncio
+
+                voice = self.tts_voice if self.tts_voice else "en-US-AriaNeural"
+
+                async def generate():
+                    communicate = edge_tts.Communicate(text, voice)
+                    await communicate.save(audio_path)
+
+                asyncio.run(generate())
 
-        elif self.tts_engine == "openai":
-            self.log("Using OpenAI provider for speech generation")
-            # For demo purposes, simulate API call
-            time.sleep(3)
-            self.tts_path = audio_path
-            return audio_path
+            else:
+                # Fallback to gtts
+                self.log(f"Unknown TTS engine '{self.tts_engine}'. Falling back to gTTS.")
+                from gtts import gTTS
+                tts = gTTS(text=text, lang=LANGUAGE_CODES.get(self.language.lower(), "en"), slow=False)
+                tts.save(audio_path)
 
-        elif self.tts_engine == "edge":
-            self.log("Using Edge TTS provider for speech generation")
-            # For demo purposes, simulate API call
-            time.sleep(2)
+            self.log(success(f"Speech generated and saved to: {audio_path}"))
             self.tts_path = audio_path
             return audio_path
 
-        else:
-            self.log(f"Using default TTS engine (would use {self.tts_engine} in production)")
-            # For demo purposes, simulate API call
-            time.sleep(2)
-            self.tts_path = audio_path
-            return audio_path
+        except Exception as e:
+            error_msg = f"Speech generation failed: {str(e)}"
+            self.log(error(error_msg))
 
-        self.log(success(f"Speech generated and saved to: {audio_path}"))
-        self.tts_path = audio_path
-        return audio_path
+            # Create a silent audio file as fallback
+            try:
+                from pydub import AudioSegment
+
+                # Generate 30 seconds of silence
+                silence = AudioSegment.silent(duration=30000)
+                silence.export(audio_path, format=output_format)
+
+                self.log(warning(f"Created silent audio fallback at: {audio_path}"))
+                self.tts_path = audio_path
+                return audio_path
+            except:
+                self.log(error("Failed to create silent audio fallback"))
+                return None
 
     def generate_subtitles(self, audio_path):
-        self.log("Generating word-level subtitles for video")
-
-        # Define constants
-        FONT = self.subtitle_font
-        FONTSIZE = self.font_size
-        COLOR = self.text_color
-        BG_COLOR = self.highlight_color
-        FRAME_SIZE = (1080, 1920)
-        MAX_CHARS = 30
-        MAX_DURATION = 3.0
-        MAX_GAP = 2.5
+        """Generate word-level subtitles for the video."""
+        self.progress(0.65, desc="Creating subtitles")
+        self.log("Starting subtitle generation process")
 
         try:
-            # In a real implementation, we would use AssemblyAI to transcribe
-            self.log("In a production environment, this would use AssemblyAI for transcription")
+            assemblyai_api_key = os.environ.get("ASSEMBLYAI_API_KEY", "")
 
-            # For demo purposes, we'll simulate the word-level data
-            self.log("Simulating transcription with word-level timing")
-            words = self.script.split()
-            total_duration = 60  # Assume 60 seconds for demo
-            avg_word_duration = total_duration / len(words)
+            if not assemblyai_api_key:
+                self.log(warning("AssemblyAI API key not set. Generating simulated subtitles."))
+                return self._generate_simulated_subtitles()
 
-            wordlevel_info = []
-            current_time = 0
+            import assemblyai as aai
+            aai.settings.api_key = assemblyai_api_key
+
+            config = aai.TranscriptionConfig(speaker_labels=False, word_boost=[], format_text=True)
+            transcriber = aai.Transcriber(config=config)
+
+            self.log("Submitting audio for transcription")
+            transcript = transcriber.transcribe(audio_path)
 
-            for word in words:
-                # Calculate a slightly randomized duration based on word length
-                word_duration = avg_word_duration * (0.5 + (len(word) / 10))
+            if not transcript or not transcript.words:
+                self.log(warning("Transcription returned no words. Using simulated subtitles."))
+                return self._generate_simulated_subtitles()
 
+            # Process word-level information
+            wordlevel_info = []
+            for word in transcript.words:
                 word_data = {
-                    "word": word.strip(),
-                    "start": current_time,
-                    "end": current_time + word_duration
+                    "word": word.text.strip(),
+                    "start": word.start / 1000.0,
+                    "end": word.end / 1000.0
                 }
                 wordlevel_info.append(word_data)
-                current_time += word_duration
 
-            self.log(success(f"Generated word-level timing for {len(wordlevel_info)} words"))
+            self.log(success(f"Transcription successful. Got {len(wordlevel_info)} words."))
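+
+            # Captions are assembled greedily below: words accumulate on a
+            # line until it exceeds MAX_CHARS characters or MAX_DURATION
+            # seconds, or a silence longer than MAX_GAP splits it.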
+            # Define constants for subtitle generation
+            FONT = self.subtitle_font
+            FONTSIZE = self.font_size
+            COLOR = self.text_color
+            BG_COLOR = self.highlight_color
+            FRAME_SIZE = (1080, 1920)
+            MAX_CHARS = 30
+            MAX_DURATION = 3.0
+            MAX_GAP = 2.5
 
-            # Process into line-level data (simplified for demo)
+            # Split text into lines based on character count, duration, and gap
             subtitles = []
             line = []
             line_duration = 0
-
-            for idx, word_data in enumerate(wordlevel_info):
-                word = word_data["word"]
-                start = word_data["start"]
-                end = word_data["end"]
 
+            for idx, word_data in enumerate(wordlevel_info):
                 line.append(word_data)
-                line_duration += end - start
+                line_duration += word_data["end"] - word_data["start"]
 
                 temp = " ".join(item["word"] for item in line)
                 new_line_chars = len(temp)
                 duration_exceeded = line_duration > MAX_DURATION
@@ -751,365 +775,683 @@ class YouTube:
                 subtitles.append(subtitle_line)
 
             self.log(success(f"Generated {len(subtitles)} subtitle lines"))
-
-            # In a real implementation, we would create TextClips for MoviePy
-            # For the demo, we'll just return the subtitle data
 
             return {
                 "wordlevel": wordlevel_info,
                 "linelevel": subtitles
             }
-
+
         except Exception as e:
-            self.log(error(f"Subtitle generation failed: {str(e)}"))
-            return None
-
-    def combine(self):
-        self.log("Combining images and audio into final video")
-
-        # For demonstration purposes, we're simulating the video creation process
-        combined_video_path = os.path.join(CACHE_DIR, "output.mp4")
-
-        # In a real implementation, this would:
-        # 1. Create ImageClips from each image
-        # 2. Create an audio clip from the speech
-        # 3. Add background music
-        # 4. Add word-level subtitles
-        # 5. Combine everything into a final video
-
-        self.log("This would create a vertical (9:16) video with:")
-        self.log(f"- {len(self.images)} images as a slideshow")
-        self.log("- TTS audio as the main audio track")
-        self.log("- Background music at low volume")
-        self.log("- Word-level subtitles that highlight as words are spoken")
-
-        # For demo purposes, simulate video processing
-        self.log("Processing video (simulated for demo)...")
-        time.sleep(3)
-
-        success_msg = f"Video successfully created at: {combined_video_path}"
-        self.log(success(success_msg))
-        self.video_path = combined_video_path
-
-        # For the demo, we'll return a mock result
-        return {
-            'video_path': combined_video_path,
-            'images': self.images,
-            'audio_path': self.tts_path,
-            'metadata': self.metadata
-        }
-
-    def generate_video(self):
-        """Generate complete video with all components"""
-        self.log("Starting video generation process")
-
-        # Step 1: Generate topic
-        self.log("Generating topic")
-        self.generate_topic()
+            error_msg = f"Subtitle generation failed: {str(e)}"
+            self.log(error(error_msg))
+            return self._generate_simulated_subtitles()
+
+    def _generate_simulated_subtitles(self):
+        """Generate simulated subtitles when AssemblyAI is not available."""
+        self.log("Generating simulated subtitles")
 
-        # Step 2: Generate script
-        self.log("Generating script")
-        self.generate_script()
+        # Split script into words
+        words = self.script.split()
 
-        # Step 3: Generate metadata
-        self.log("Generating metadata")
-        self.generate_metadata()
+        # Estimate audio duration based on word count (average speaking rate)
+        estimated_duration = len(words) * 0.3  # 0.3 seconds per word on average
 
-        # Step 4: Generate image prompts
-        self.log("Generating image prompts")
-        self.generate_prompts()
+        # Generate word-level timings
+        wordlevel_info = []
+        current_time = 0
 
-        # Step 5: Generate images
-        self.log("Generating images")
-        for i, prompt in enumerate(self.image_prompts, 1):
-            self.log(f"Generating image {i}/{len(self.image_prompts)}")
-            self.generate_image(prompt)
+        for word in words:
+            # Adjust duration based on word length
+            word_duration = 0.2 + min(0.05 * len(word), 0.3)  # Between 0.2 and 0.5 seconds
+
+            word_data = {
+                "word": word,
+                "start": current_time,
+                "end": current_time + word_duration
+            }
+            wordlevel_info.append(word_data)
+
+            # Add a small gap between words
+            current_time += word_duration + 0.05
 
-        # Step 6: Generate speech
-        self.log("Generating speech")
-        self.generate_speech(self.script)
+        # Generate line-level subtitles
+        subtitles = []
+        line = []
+        line_start = 0
+        line_text = ""
 
-        # Step 7: Generate subtitles
-        self.log("Generating subtitles")
-        self.generate_subtitles(self.tts_path)
+        for word_data in wordlevel_info:
+            # Check if adding this word would exceed character limit
+            if len(line_text + " " + word_data["word"]) > 30 and line:
+                # Finalize current line
+                subtitle_line = {
+                    "text": line_text,
+                    "start": line_start,
+                    "end": line[-1]["end"],
+                    "words": line.copy()
+                }
+                subtitles.append(subtitle_line)
+
+                # Start new line
+                line = [word_data]
+                line_start = word_data["start"]
+                line_text = word_data["word"]
+            else:
+                # Add word to current line
+                line.append(word_data)
+                line_text = (line_text + " " + word_data["word"]).strip()
+                if len(line) == 1:
+                    line_start = word_data["start"]
 
-        # Step 8: Combine all elements into final video
-        self.log("Combining all elements into final video")
-        result = self.combine()
+        # Add final line if not empty
+        if line:
+            subtitle_line = {
+                "text": line_text,
+                "start": line_start,
+                "end": line[-1]["end"],
+                "words": line
+            }
+            subtitles.append(subtitle_line)
 
-        self.log(f"Video generation complete.")
+        self.log(success(f"Generated {len(wordlevel_info)} simulated word timings and {len(subtitles)} subtitle lines"))
 
         return {
-            'video_path': result['video_path'],
-            'images': result['images'],
-            'audio_path': self.tts_path,
-            'title': self.metadata['title'],
-            'description': self.metadata['description'],
-            'subject': self.subject,
-            'script': self.script,
-            'logs': self.logs
+            "wordlevel": wordlevel_info,
+            "linelevel": subtitles
         }
 
-# Gradio interface
-def create_youtube_short(niche, language, gemini_api_key="", assemblyai_api_key="",
-                         elevenlabs_api_key="", segmind_api_key="", text_gen="gemini",
-                         image_gen="prodia", tts_engine="elevenlabs", tts_voice="Sarah",
-                         subtitle_font="Helvetica-Bold", font_size=80, text_color="white",
-                         highlight_color="blue"):
-
-    # Create API keys dictionary
-    api_keys = {
-        'gemini': gemini_api_key,
-        'assemblyai': assemblyai_api_key,
-        'elevenlabs': elevenlabs_api_key,
-        'segmind': segmind_api_key
-    }
-
-    # Initialize YouTube class
-    yt = YouTube(
-        niche=niche,
-        language=language,
-        text_gen=text_gen,
-        image_gen=image_gen,
-        tts_engine=tts_engine,
-        tts_voice=tts_voice,
-        subtitle_font=subtitle_font,
-        font_size=font_size,
-        text_color=text_color,
-        highlight_color=highlight_color,
-        api_keys=api_keys
-    )
-
-    # Generate video
-    result = yt.generate_video()
-
-    # In a real implementation we would return the actual video file
-    # For demo, we'll just simulate it with a placeholder
-    demo_video = "https://sample-videos.com/video123/mp4/720/big_buck_bunny_720p_1mb.mp4"
-
-    # Return all the relevant information for the UI
-    return {
-        "video": demo_video,
-        "title": result['title'],
-        "description": result['description'],
-        "script": result['script'],
-        "logs": "\n".join(result['logs'])
-    }
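+    # Assembly pipeline used below (MoviePy 1.x): ImageClip slideshow ->
+    # center-crop each frame to 9:16 -> resize to 1080x1920 -> concatenate
+    # -> TTS audio plus background music ducked to 10% volume -> per-word
+    # TextClip subtitles -> libx264/aac render.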
"\n".join(result['logs']) - } + def combine(self) -> str: + """Combine images, audio, and subtitles into a final video.""" + self.progress(0.8, desc="Creating final video") + self.log("Combining images and audio into final video") + + try: + output_path = os.path.join(CACHE_DIR, f"output_{int(time.time())}.mp4") + + # Check for required files + if not self.images: + raise ValueError("No images available for video creation") + + if not hasattr(self, 'tts_path') or not self.tts_path or not os.path.exists(self.tts_path): + raise ValueError("No TTS audio file available") + + # Load audio + tts_clip = AudioFileClip(self.tts_path) + max_duration = tts_clip.duration + + # Calculate duration for each image + num_images = len(self.images) + req_dur = max_duration / num_images + + # Create video clips from images + clips = [] + tot_dur = 0 + + # Loop through images, repeating if necessary to fill audio duration + while tot_dur < max_duration: + for image_path in self.images: + # Check if image exists and is valid + if not os.path.exists(image_path): + self.log(warning(f"Image not found: {image_path}, skipping")) + continue + + try: + clip = ImageClip(image_path) + clip = clip.set_duration(req_dur) + clip = clip.set_fps(30) + + # Handle aspect ratio (vertical video for shorts) + aspect_ratio = 9/16 # Standard vertical video ratio + if clip.w / clip.h < aspect_ratio: + # Image is too tall, crop height + clip = crop( + clip, + width=clip.w, + height=round(clip.w / aspect_ratio), + x_center=clip.w / 2, + y_center=clip.h / 2 + ) + else: + # Image is too wide, crop width + clip = crop( + clip, + width=round(aspect_ratio * clip.h), + height=clip.h, + x_center=clip.w / 2, + y_center=clip.h / 2 + ) + + # Resize to standard size for shorts + clip = clip.resize((1080, 1920)) + clips.append(clip) + tot_dur += clip.duration + + # If we've exceeded the duration, break + if tot_dur >= max_duration: + break + except Exception as e: + self.log(warning(f"Error processing image {image_path}: {str(e)}")) + + # Create video from clips + self.log(f"Creating video from {len(clips)} clips") + final_clip = concatenate_videoclips(clips) + final_clip = final_clip.set_fps(30) + + # Add background music if available + background_music = choose_random_music() + if background_music and os.path.exists(background_music): + self.log(f"Adding background music: {background_music}") + try: + music_clip = AudioFileClip(background_music) + # Loop music if it's shorter than the video + if music_clip.duration < max_duration: + repeats = int(max_duration / music_clip.duration) + 1 + music_clip = concatenate_audioclips([music_clip] * repeats) + # Trim if it's longer + music_clip = music_clip.subclip(0, max_duration) + # Reduce volume + music_clip = music_clip.fx(volumex, 0.1) + + # Combine audio tracks + comp_audio = CompositeAudioClip([tts_clip, music_clip]) + final_clip = final_clip.set_audio(comp_audio) + except Exception as e: + self.log(warning(f"Error adding background music: {str(e)}")) + final_clip = final_clip.set_audio(tts_clip) + else: + self.log("No background music found, using TTS audio only") + final_clip = final_clip.set_audio(tts_clip) + + # Set final duration + final_clip = final_clip.set_duration(tts_clip.duration) + + # Generate subtitles if available + subtitle_clips = [] + subtitles = self.generate_subtitles(self.tts_path) + + if subtitles and 'wordlevel' in subtitles: + self.log("Adding word-level subtitles") + + from moviepy.video.tools.subtitles import TextClip + + # Define subtitle styles + font = self.subtitle_font 
if os.path.exists(os.path.join(FONTS_DIR, f"{self.subtitle_font}.ttf")) else None + fontsize = self.font_size + color = self.text_color + bg_color = self.highlight_color + + # Add subtitles as highlighted words + for subtitle in subtitles['linelevel']: + full_duration = subtitle['end'] - subtitle['start'] + + # Calculate position on screen (bottom centered) + frame_width, frame_height = 1080, 1920 + x_pos = 0 + y_pos = frame_height * 0.85 # Position at 85% of frame height + x_buffer = frame_width * 1 / 10 + + # Add each word with proper timing and highlighting + for word_data in subtitle['words']: + word = word_data['word'] + start = word_data['start'] + end = word_data['end'] + + # Create text clip for word + try: + word_clip = TextClip( + txt=word, + font=font, + fontsize=fontsize, + color=color, + bg_color=bg_color, + stroke_color='black', + stroke_width=1 + ).set_position((x_pos + x_buffer, y_pos)).set_start(start).set_duration(end - start) + + subtitle_clips.append(word_clip) + x_pos += word_clip.w + 10 # Add spacing between words + + # Wrap to next line if needed + if x_pos + word_clip.w > frame_width - 2 * x_buffer: + x_pos = 0 + y_pos += word_clip.h + 10 + except Exception as e: + self.log(warning(f"Error creating subtitle for word '{word}': {str(e)}")) + + # Add subtitles to video if any were created + if subtitle_clips: + self.log(f"Adding {len(subtitle_clips)} subtitle clips to video") + final_clip = CompositeVideoClip([final_clip] + subtitle_clips) + + # Write final video + self.log("Writing final video file") + final_clip.write_videofile(output_path, threads=4, codec='libx264', audio_codec='aac') + + success_msg = f"Video successfully created at: {output_path}" + self.log(success(success_msg)) + self.video_path = output_path + + return output_path + + except Exception as e: + error_msg = f"Error combining video: {str(e)}" + self.log(error(error_msg)) + + # Create a minimal fallback video if possible + try: + # Try to create a simple video with just the first image and audio + fallback_path = os.path.join(CACHE_DIR, f"fallback_{int(time.time())}.mp4") + + if self.images and os.path.exists(self.images[0]) and hasattr(self, 'tts_path') and os.path.exists(self.tts_path): + img_clip = ImageClip(self.images[0]).set_duration(10) + img_clip = img_clip.resize((1080, 1920)) + audio_clip = AudioFileClip(self.tts_path).subclip(0, min(10, AudioFileClip(self.tts_path).duration)) + video_clip = img_clip.set_audio(audio_clip) + video_clip.write_videofile(fallback_path, threads=2, codec='libx264', audio_codec='aac') + + self.log(warning(f"Created fallback video at: {fallback_path}")) + self.video_path = fallback_path + return fallback_path + else: + raise Exception("Cannot create fallback video: missing images or audio") + except Exception as fallback_error: + self.log(error(f"Failed to create fallback video: {str(fallback_error)}")) + return None -# Create Gradio app -with gr.Blocks() as demo: - gr.Markdown("# YouTube Shorts Generator") - gr.Markdown("Generate short videos based on a niche and language") - - with gr.Row(): - with gr.Column(scale=1): - with gr.Group(): - gr.Markdown("### Required Inputs") - niche = gr.Textbox(label="Niche/Topic", placeholder="E.g., Fitness tips, Technology facts") - language = gr.Dropdown( - choices=["English", "Spanish", "French", "German", "Italian", "Portuguese", - "Russian", "Japanese", "Chinese", "Hindi"], - label="Language", - value="English" - ) + def generate_video(self) -> dict: + """Generate complete video with all components.""" + try: + 
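+    # Orchestration: each step below advances the shared gr.Progress
+    # (0.05 topic, 0.1 script, 0.2 metadata, 0.3 prompts, 0.4-0.6 images,
+    # 0.6 speech, 0.8 video, 0.95 finalize) so the UI can track the run.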
self.log("Starting video generation process") - with gr.Accordion("API Keys", open=False): - gemini_api_key = gr.Textbox(label="Gemini API Key", type="password") - assemblyai_api_key = gr.Textbox(label="AssemblyAI API Key", type="password") - elevenlabs_api_key = gr.Textbox(label="ElevenLabs API Key", type="password") - segmind_api_key = gr.Textbox(label="Segmind API Key", type="password") + # Step 1: Generate topic + self.log("Generating topic") + self.generate_topic() - with gr.Accordion("Model Selection", open=False): - text_gen = gr.Dropdown( - choices=["gemini", "g4f"], - label="Text Generator", - value="gemini" - ) - image_gen = gr.Dropdown( - choices=["prodia", "hercai", "g4f", "segmind", "pollinations"], - label="Image Generator", - value="prodia" - ) - tts_engine = gr.Dropdown( - choices=["elevenlabs", "bark", "gtts", "openai", "edge", "local_tts", "xtts", "rvc"], - label="Text-to-Speech Engine", - value="elevenlabs" - ) - tts_voice = gr.Textbox( - label="TTS Voice", - placeholder="E.g., Sarah, Brian, Lily, Monika Sogam", - value="Sarah" - ) + # Step 2: Generate script + self.progress(0.1, desc="Creating script") + self.log("Generating script") + self.generate_script() - with gr.Accordion("Subtitle Options", open=False): - subtitle_font = gr.Dropdown( - choices=["Helvetica-Bold", "Arial-Bold", "Impact", "Comic-Sans-MS"], - label="Font", - value="Helvetica-Bold" - ) - font_size = gr.Slider( - minimum=40, - maximum=120, - value=80, - step=5, - label="Font Size" - ) - with gr.Row(): - text_color = gr.ColorPicker(label="Text Color", value="#FFFFFF") - highlight_color = gr.ColorPicker(label="Highlight Color", value="#0000FF") + # Step 3: Generate metadata + self.progress(0.2, desc="Creating metadata") + self.log("Generating metadata") + self.generate_metadata() - generate_btn = gr.Button("Generate Video", variant="primary") - - with gr.Column(scale=1): - video_output = gr.Video(label="Generated Video") - title_output = gr.Textbox(label="Title") - description_output = gr.Textbox(label="Description", lines=3) - script_output = gr.Textbox(label="Script", lines=5) - log_output = gr.Textbox(label="Process Log", lines=10) - - # Set up the function to call when the generate button is clicked - generate_btn.click( - fn=create_youtube_short, - inputs=[ - niche, language, gemini_api_key, assemblyai_api_key, elevenlabs_api_key, - segmind_api_key, text_gen, image_gen, tts_engine, tts_voice, - subtitle_font, font_size, text_color, highlight_color + # Step 4: Generate image prompts + self.progress(0.3, desc="Creating image prompts") + self.log("Generating image prompts") + self.generate_prompts() + + # Step 5: Generate images + self.progress(0.4, desc="Generating images") + self.log("Generating images") + for i, prompt in enumerate(self.image_prompts, 1): + self.progress(0.4 + 0.2 * (i / len(self.image_prompts)), + desc=f"Generating image {i}/{len(self.image_prompts)}") + self.log(f"Generating image {i}/{len(self.image_prompts)}") + self.generate_image(prompt) + + # Step 6: Generate speech + self.progress(0.6, desc="Creating speech") + self.log("Generating speech") + self.generate_speech(self.script) + + # Step 7: Combine all elements into final video + self.progress(0.8, desc="Creating final video") + self.log("Combining all elements into final video") + path = self.combine() + + self.progress(0.95, desc="Finalizing") + self.log(f"Video generation complete. 
File saved at: {path}") + + # Return the result + return { + 'video_path': path, + 'title': self.metadata['title'], + 'description': self.metadata['description'], + 'subject': self.subject, + 'script': self.script, + 'logs': self.logs + } + + except Exception as e: + error_msg = f"Error during video generation: {str(e)}" + self.log(error(error_msg)) + raise Exception(error_msg) + +# Data for dynamic dropdowns +def get_text_generator_models(generator): + """Get available models for the selected text generator.""" + models = { + "gemini": [ + "gemini-2.0-flash", + "gemini-2.0-flash-lite", + "gemini-1.5-flash", + "gemini-1.5-flash-8b", + "gemini-1.5-pro" ], - outputs={ - "video": video_output, - "title": title_output, - "description": description_output, - "script": script_output, - "logs": log_output - } - ) + "g4f": [ + "gpt-3.5-turbo", + "gpt-4", + "gpt-4o", + "llama-3-70b-chat", + "claude-3-opus-20240229", + "claude-3-sonnet-20240229", + "claude-3-haiku-20240307" + ], + "openai": [ + "gpt-3.5-turbo", + "gpt-4-turbo", + "gpt-4o" + ] + } + return models.get(generator, ["default"]) -# Launch the app -if __name__ == "__main__": - demo.launch() --
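+
+# Example (sketch of the lookup contract above):
+#   >>> get_text_generator_models("gemini")[0]
+#   'gemini-2.0-flash'
+#   >>> get_text_generator_models("unknown")
+#   ['default']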