import gradio as gr import os import subprocess import torch from TTS.api import TTS from deep_translator import GoogleTranslator import pysrt import whisper import webvtt import shutil import time from tqdm import tqdm from typing import Dict, List, Optional import logging # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Configuration LANGUAGES = { "English": {"code": "en", "speakers": ["default"], "whisper": "en"}, "Spanish": {"code": "es", "speakers": ["default"], "whisper": "es"}, "French": {"code": "fr", "speakers": ["default"], "whisper": "fr"}, "German": {"code": "de", "speakers": ["thorsten", "eva_k"], "whisper": "de"}, "Japanese": {"code": "ja", "speakers": ["default"], "whisper": "ja"}, "Hindi": {"code": "hi", "speakers": ["default"], "whisper": "hi"} } SUBTITLE_STYLES = { "Default": "", "White Text": "color: white;", "Yellow Text": "color: yellow;", "Large Text": "font-size: 24px;", "Bold Text": "font-weight: bold;", "Black Background": "background-color: black; padding: 5px;" } # Create output directory (relative path for Spaces) OUTPUT_DIR = "outputs" os.makedirs(OUTPUT_DIR, exist_ok=True) # Initialize TTS with error handling device = "cuda" if torch.cuda.is_available() else "cpu" tts_models = {} def load_tts_model(model_name: str, lang_code: str): try: tts = TTS(model_name).to(device) # Try to use gruut phonemizer if espeak fails if hasattr(tts.synthesizer, 'tts_config'): tts.synthesizer.tts_config.phonemizer = "gruut" return tts except Exception as e: logger.error(f"Failed to load {model_name}: {str(e)}") return None # Initialize models only when needed def get_tts_model(lang_code: str): if lang_code not in tts_models: model_map = { "en": "tts_models/en/ljspeech/tacotron2-DDC", "es": "tts_models/es/css10/vits", "fr": "tts_models/fr/css10/vits", "de": "tts_models/de/thorsten/vits", # Using VITS instead of tacotron2 "ja": "tts_models/ja/kokoro/tacotron2-DDC", "hi": "tts_models/hi/kb/tacotron2-DDC" } tts_models[lang_code] = load_tts_model(model_map[lang_code], lang_code) return tts_models[lang_code] # Initialize Whisper (load when needed) whisper_model = None def get_whisper_model(): global whisper_model if whisper_model is None: whisper_model = whisper.load_model("small") return whisper_model def extract_audio(video_path: str) -> str: """Extract audio using ffmpeg""" audio_path = os.path.join(OUTPUT_DIR, "audio.wav") cmd = [ 'ffmpeg', '-i', video_path, '-vn', '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', '-y', audio_path ] subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return audio_path def transcribe_with_whisper(audio_path: str, language: str = None) -> str: """Transcribe audio using Whisper""" model = get_whisper_model() result = model.transcribe(audio_path, language=language, word_timestamps=True) return result def generate_srt_from_whisper(audio_path: str, language: str) -> str: """Generate SRT subtitles from Whisper output""" result = transcribe_with_whisper(audio_path, language) subs = pysrt.SubRipFile() for i, segment in enumerate(result["segments"]): subs.append(pysrt.SubRipItem( index=i+1, start=pysrt.SubRipTime(seconds=segment["start"]), end=pysrt.SubRipTime(seconds=segment["end"]), text=segment["text"] )) srt_path = os.path.join(OUTPUT_DIR, "subtitles.srt") subs.save(srt_path, encoding='utf-8') return srt_path def detect_language(audio_path: str) -> str: """Detect language using Whisper""" result = transcribe_with_whisper(audio_path) detected_code = result["language"] for name, data in LANGUAGES.items(): if data["whisper"] == detected_code: return name return "English" def translate_subtitles(srt_path: str, target_langs: List[str]) -> Dict[str, str]: """Translate subtitles to multiple languages""" subs = pysrt.open(srt_path) results = {} for lang_name in target_langs: lang_code = LANGUAGES[lang_name]["code"] translated_subs = subs[:] translator = GoogleTranslator(source='auto', target=lang_code) for sub in translated_subs: try: sub.text = translator.translate(sub.text) except Exception as e: logger.warning(f"Translation failed: {str(e)}") continue output_path = os.path.join(OUTPUT_DIR, f"subtitles_{lang_code}.srt") translated_subs.save(output_path, encoding='utf-8') results[lang_code] = output_path return results def generate_webvtt_subtitles(srt_path: str, style: str = "") -> str: """Convert SRT to WebVTT with optional styling""" subs = pysrt.open(srt_path) lang_code = os.path.basename(srt_path).split('_')[-1].replace('.srt', '') vtt_path = os.path.join(OUTPUT_DIR, f"subtitles_{lang_code}.vtt") with open(vtt_path, 'w', encoding='utf-8') as f: f.write("WEBVTT\n\n") if style: f.write(f"STYLE\n::cue {{\n{style}\n}}\n\n") for sub in subs: start = sub.start.to_time().strftime('%H:%M:%S.%f')[:-3] end = sub.end.to_time().strftime('%H:%M:%S.%f')[:-3] f.write(f"{start} --> {end}\n") f.write(f"{sub.text}\n\n") return vtt_path def generate_translated_audio( srt_path: str, target_lang: str, speaker: str = "default" ) -> str: """Generate translated audio using TTS""" subs = pysrt.open(srt_path) temp_dir = os.path.join(OUTPUT_DIR, f"temp_audio_{target_lang}") os.makedirs(temp_dir, exist_ok=True) audio_files = [] timings = [] tts = get_tts_model(target_lang) if tts is None: raise Exception(f"TTS model for {target_lang} not available") for i, sub in enumerate(tqdm(subs, desc=f"Generating {target_lang} audio")): text = sub.text.strip() if not text: continue start_time = sub.start.ordinal / 1000 audio_file = os.path.join(temp_dir, f"chunk_{i:04d}.wav") try: kwargs = {"speaker": speaker} if speaker != "default" and hasattr(tts, 'synthesizer') else {} tts.tts_to_file(text=text, file_path=audio_file, **kwargs) audio_files.append(audio_file) timings.append((start_time, audio_file)) except Exception as e: logger.warning(f"TTS failed: {str(e)}") if not audio_files: raise Exception("No audio generated") # Create silent audio video_duration = get_video_duration(os.path.join(OUTPUT_DIR, "base_video.mp4")) silence_file = os.path.join(temp_dir, "silence.wav") subprocess.run([ 'ffmpeg', '-f', 'lavfi', '-i', 'anullsrc=r=44100:cl=stereo', '-t', str(video_duration), '-y', silence_file ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Mix audio filter_complex = "[0:a]" + "".join( f"[{i+1}:a]adelay={int(start*1000)}|{int(start*1000)}[a{i}];" + f"[a{i-1 if i>0 else 'out'}]" + f"[a{i}]amix=inputs=2[aout]" for i, (start, _) in enumerate(timings) ) cmd = ['ffmpeg', '-y', '-i', silence_file] + \ [f'-i {f}' for f in audio_files] + [ '-filter_complex', filter_complex, '-map', '[aout]', os.path.join(OUTPUT_DIR, f"translated_audio_{target_lang}.wav")] subprocess.run(' '.join(cmd), shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) shutil.rmtree(temp_dir) return os.path.join(OUTPUT_DIR, f"translated_audio_{target_lang}.wav") def get_video_duration(video_path: str) -> float: """Get video duration in seconds""" result = subprocess.run([ 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', video_path ], capture_output=True, text=True) return float(result.stdout.strip() or 180) def create_html_player( video_path: str, subtitle_paths: Dict[str, str], style: str = "" ) -> str: """Create HTML player with video and subtitles""" html_path = os.path.join(OUTPUT_DIR, "player.html") video_name = os.path.basename(video_path) subtitle_tracks = "\n".join( f'' for lang, path in subtitle_paths.items() ) style_block = f"video::cue {{ {style} }}" if style else "" html_content = f""" Video Player

Video Player with Subtitles

Download Subtitles:

{"".join( f'' f'{lang.upper()} Subtitles (.vtt)
' for lang, path in subtitle_paths.items() )}
""" with open(html_path, 'w', encoding='utf-8') as f: f.write(html_content) return html_path def process_video( video_file: str, source_lang: str, target_langs: List[str], subtitle_style: str, speaker_settings: Dict[str, str], progress: gr.Progress = gr.Progress() ) -> List[str]: """Complete video processing pipeline""" try: progress(0.05, "Initializing...") # 1. Extract audio progress(0.1, "Extracting audio...") audio_path = extract_audio(video_file) # 2. Detect language if needed if source_lang == "Auto-detect": source_lang = detect_language(audio_path) progress(0.15, f"Detected language: {source_lang}") # 3. Generate subtitles progress(0.2, "Generating subtitles...") srt_path = generate_srt_from_whisper( audio_path, LANGUAGES[source_lang]["whisper"] ) # 4. Translate subtitles progress(0.3, "Translating subtitles...") translated_subs = translate_subtitles(srt_path, target_langs) # 5. Save original video base_video = os.path.join(OUTPUT_DIR, "base_video.mp4") shutil.copy(video_file, base_video) # 6. Process each target language translated_vtts = {} for i, lang_name in enumerate(target_langs, 1): lang_code = LANGUAGES[lang_name]["code"] progress(0.4 + (i * 0.5 / len(target_langs)), f"Processing {lang_name}...") # Generate audio translated_audio = generate_translated_audio( translated_subs[lang_code], lang_code, speaker_settings.get(lang_code, "default") ) # Generate subtitles vtt_path = generate_webvtt_subtitles( translated_subs[lang_code], SUBTITLE_STYLES.get(subtitle_style, "") ) translated_vtts[lang_code] = vtt_path # Create translated video version output_video = os.path.join(OUTPUT_DIR, f"output_{lang_code}.mp4") subprocess.run([ 'ffmpeg', '-i', base_video, '-i', translated_audio, '-map', '0:v', '-map', '1:a', '-c:v', 'copy', '-c:a', 'aac', '-y', output_video ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # 7. Create HTML player progress(0.9, "Creating HTML player...") html_path = create_html_player( base_video, translated_vtts, SUBTITLE_STYLES.get(subtitle_style, "") ) # Prepare all output files output_files = [html_path, base_video] + \ list(translated_vtts.values()) + \ [os.path.join(OUTPUT_DIR, f"output_{LANGUAGES[lang]['code']}.mp4") for lang in target_langs] progress(1.0, "Done!") return output_files, "Processing completed successfully!" except Exception as e: logger.error(f"Processing failed: {str(e)}", exc_info=True) return None, f"Error: {str(e)}" def get_speaker_settings(*args) -> Dict[str, str]: """Create speaker settings dictionary from inputs""" settings = {} for i, lang in enumerate(LANGUAGES.keys()): if i < len(args) and args[i]: settings[LANGUAGES[lang]["code"]] = args[i] return settings def create_interface(): """Create Gradio interface""" with gr.Blocks(title="Video Translator") as demo: gr.Markdown("# Free Video Translation System") gr.Markdown("Translate videos with subtitles and audio dubbing using free/open-source tools") with gr.Row(): with gr.Column(scale=1): video_input = gr.Video(label="Upload Video") with gr.Accordion("Source Settings", open=True): source_lang = gr.Dropdown( label="Source Language", choices=["Auto-detect"] + list(LANGUAGES.keys()), value="Auto-detect" ) with gr.Accordion("Target Languages", open=True): target_langs = gr.CheckboxGroup( label="Select target languages", choices=list(LANGUAGES.keys()), value=["English", "Spanish"] ) with gr.Accordion("Subtitle Styling", open=False): subtitle_style = gr.Dropdown( label="Subtitle Appearance", choices=list(SUBTITLE_STYLES.keys()), value="Default" ) with gr.Accordion("Voice Settings", open=False): speaker_inputs = [] for lang_name in LANGUAGES.keys(): speakers = LANGUAGES[lang_name]["speakers"] if len(speakers) > 1: speaker_inputs.append( gr.Dropdown( label=f"{lang_name} Speaker", choices=speakers, value=speakers[0], visible=False ) ) else: speaker_inputs.append(gr.Textbox(visible=False)) submit_btn = gr.Button("Translate Video", variant="primary") with gr.Column(scale=2): output_files = gr.Files(label="Download Files") status = gr.Textbox(label="Status") gr.Markdown(""" **Instructions:** 1. Upload a video file 2. Select source and target languages 3. Customize subtitles and voices 4. Click Translate 5. Download the HTML player and open in browser """) def update_speaker_ui(selected_langs): updates = [] for i, lang_name in enumerate(LANGUAGES.keys()): visible = lang_name in selected_langs and len(LANGUAGES[lang_name]["speakers"]) > 1 updates.append(gr.Dropdown.update(visible=visible)) return updates target_langs.change( update_speaker_ui, inputs=target_langs, outputs=speaker_inputs ) submit_btn.click( process_video, inputs=[ video_input, source_lang, target_langs, subtitle_style, gr.State(lambda: get_speaker_settings(*speaker_inputs)) ], outputs=[output_files, status] ) return demo if __name__ == "__main__": # Clear output directory on startup if os.path.exists(OUTPUT_DIR): shutil.rmtree(OUTPUT_DIR) os.makedirs(OUTPUT_DIR, exist_ok=True) demo = create_interface() demo.launch(share=True) # Required for Hugging Face Spaces