import gradio as gr
import os
import copy
import subprocess
import torch
from TTS.api import TTS
from deep_translator import GoogleTranslator
import pysrt
import whisper
import shutil
from tqdm import tqdm
from typing import Dict, List
import logging
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Configuration
LANGUAGES = {
    "English": {"code": "en", "speakers": ["default"], "whisper": "en"},
    "Spanish": {"code": "es", "speakers": ["default"], "whisper": "es"},
    "French": {"code": "fr", "speakers": ["default"], "whisper": "fr"},
    "German": {"code": "de", "speakers": ["thorsten", "eva_k"], "whisper": "de"},
    "Japanese": {"code": "ja", "speakers": ["default"], "whisper": "ja"},
    "Hindi": {"code": "hi", "speakers": ["default"], "whisper": "hi"}
}

SUBTITLE_STYLES = {
    "Default": "",
    "White Text": "color: white;",
    "Yellow Text": "color: yellow;",
    "Large Text": "font-size: 24px;",
    "Bold Text": "font-weight: bold;",
    "Black Background": "background-color: black; padding: 5px;"
}
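
# Note: these CSS snippets are applied in two places below: injected into a
# WebVTT "STYLE" block as a ::cue rule by generate_webvtt_subtitles(), and
# into a "video::cue" rule in the HTML player by create_html_player().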
# Create output directory (relative path for Spaces)
OUTPUT_DIR = "outputs"
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Initialize TTS with error handling
device = "cuda" if torch.cuda.is_available() else "cpu"
tts_models = {}

def load_tts_model(model_name: str, lang_code: str):
    try:
        tts = TTS(model_name).to(device)
        # Prefer the pure-Python gruut phonemizer so models do not
        # depend on an espeak binary being installed
        if hasattr(tts.synthesizer, 'tts_config'):
            tts.synthesizer.tts_config.phonemizer = "gruut"
        return tts
    except Exception as e:
        logger.error(f"Failed to load {model_name}: {str(e)}")
        return None
# Initialize models only when needed
def get_tts_model(lang_code: str):
    if lang_code not in tts_models:
        model_map = {
            "en": "tts_models/en/ljspeech/tacotron2-DDC",
            "es": "tts_models/es/css10/vits",
            "fr": "tts_models/fr/css10/vits",
            "de": "tts_models/de/thorsten/vits",  # Using VITS instead of tacotron2
            "ja": "tts_models/ja/kokoro/tacotron2-DDC",
            "hi": "tts_models/hi/kb/tacotron2-DDC"
        }
        tts_models[lang_code] = load_tts_model(model_map[lang_code], lang_code)
    return tts_models[lang_code]
# Initialize Whisper (load when needed)
whisper_model = None

def get_whisper_model():
    global whisper_model
    if whisper_model is None:
        whisper_model = whisper.load_model("small")
    return whisper_model
def extract_audio(video_path: str) -> str:
    """Extract audio using ffmpeg"""
    audio_path = os.path.join(OUTPUT_DIR, "audio.wav")
    cmd = [
        'ffmpeg', '-i', video_path, '-vn',
        '-acodec', 'pcm_s16le', '-ar', '16000',
        '-ac', '1', '-y', audio_path
    ]
    subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return audio_path
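
# 16 kHz mono PCM matches Whisper's expected sample rate (Whisper resamples
# all inputs to 16 kHz internally), so extracting at this rate avoids any
# extra conversion work during transcription.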
def transcribe_with_whisper(audio_path: str, language: str = None) -> dict:
    """Transcribe audio using Whisper and return the full result dict"""
    model = get_whisper_model()
    result = model.transcribe(audio_path, language=language, word_timestamps=True)
    return result
def generate_srt_from_whisper(audio_path: str, language: str) -> str:
    """Generate SRT subtitles from Whisper output"""
    result = transcribe_with_whisper(audio_path, language)
    subs = pysrt.SubRipFile()
    for i, segment in enumerate(result["segments"]):
        subs.append(pysrt.SubRipItem(
            index=i + 1,
            # Pass milliseconds so fractional-second timestamps survive intact
            start=pysrt.SubRipTime(milliseconds=int(segment["start"] * 1000)),
            end=pysrt.SubRipTime(milliseconds=int(segment["end"] * 1000)),
            text=segment["text"].strip()
        ))
    srt_path = os.path.join(OUTPUT_DIR, "subtitles.srt")
    subs.save(srt_path, encoding='utf-8')
    return srt_path
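
# For reference, each entry in result["segments"] is a dict shaped roughly
# like this (timestamps in seconds; the values below are illustrative only):
#   {"id": 0, "start": 0.0, "end": 2.48, "text": " Hello and welcome.", ...}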
def detect_language(audio_path: str) -> str:
    """Detect language using Whisper"""
    result = transcribe_with_whisper(audio_path)
    detected_code = result["language"]
    for name, data in LANGUAGES.items():
        if data["whisper"] == detected_code:
            return name
    return "English"
def translate_subtitles(srt_path: str, target_langs: List[str]) -> Dict[str, str]:
    """Translate subtitles to multiple languages"""
    subs = pysrt.open(srt_path)
    results = {}
    for lang_name in target_langs:
        lang_code = LANGUAGES[lang_name]["code"]
        # Deep-copy so each language translates the original text rather
        # than the output of a previous pass (slicing shares item objects)
        translated_subs = copy.deepcopy(subs)
        translator = GoogleTranslator(source='auto', target=lang_code)
        for sub in translated_subs:
            try:
                sub.text = translator.translate(sub.text)
            except Exception as e:
                logger.warning(f"Translation failed: {str(e)}")
                continue
        output_path = os.path.join(OUTPUT_DIR, f"subtitles_{lang_code}.srt")
        translated_subs.save(output_path, encoding='utf-8')
        results[lang_code] = output_path
    return results
def generate_webvtt_subtitles(srt_path: str, style: str = "") -> str:
    """Convert SRT to WebVTT with optional styling"""
    subs = pysrt.open(srt_path)
    lang_code = os.path.basename(srt_path).split('_')[-1].replace('.srt', '')
    vtt_path = os.path.join(OUTPUT_DIR, f"subtitles_{lang_code}.vtt")
    with open(vtt_path, 'w', encoding='utf-8') as f:
        f.write("WEBVTT\n\n")
        if style:
            f.write(f"STYLE\n::cue {{\n{style}\n}}\n\n")
        for sub in subs:
            start = sub.start.to_time().strftime('%H:%M:%S.%f')[:-3]
            end = sub.end.to_time().strftime('%H:%M:%S.%f')[:-3]
            f.write(f"{start} --> {end}\n")
            f.write(f"{sub.text}\n\n")
    return vtt_path
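
# Illustrative output for the "Yellow Text" style (sample cue text invented):
#
#   WEBVTT
#
#   STYLE
#   ::cue {
#   color: yellow;
#   }
#
#   00:00:01.000 --> 00:00:03.500
#   Hola y bienvenidos.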
def generate_translated_audio(
    srt_path: str,
    target_lang: str,
    speaker: str = "default"
) -> str:
    """Generate translated audio with TTS, aligned to subtitle timings"""
    subs = pysrt.open(srt_path)
    temp_dir = os.path.join(OUTPUT_DIR, f"temp_audio_{target_lang}")
    os.makedirs(temp_dir, exist_ok=True)
    audio_files = []
    timings = []
    tts = get_tts_model(target_lang)
    if tts is None:
        raise RuntimeError(f"TTS model for {target_lang} not available")
    for i, sub in enumerate(tqdm(subs, desc=f"Generating {target_lang} audio")):
        text = sub.text.strip()
        if not text:
            continue
        start_time = sub.start.ordinal / 1000  # subtitle start in seconds
        audio_file = os.path.join(temp_dir, f"chunk_{i:04d}.wav")
        try:
            kwargs = {"speaker": speaker} if speaker != "default" else {}
            tts.tts_to_file(text=text, file_path=audio_file, **kwargs)
            audio_files.append(audio_file)
            timings.append((start_time, audio_file))
        except Exception as e:
            logger.warning(f"TTS failed: {str(e)}")
    if not audio_files:
        raise RuntimeError("No audio generated")
    # Create a silent mono base track matching the video duration
    video_duration = get_video_duration(os.path.join(OUTPUT_DIR, "base_video.mp4"))
    silence_file = os.path.join(temp_dir, "silence.wav")
    subprocess.run([
        'ffmpeg', '-f', 'lavfi', '-i', 'anullsrc=r=44100:cl=mono',
        '-t', str(video_duration), '-y', silence_file
    ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Delay each chunk to its subtitle start time, normalize rate and
    # layout, then mix everything onto the silent base in one amix pass
    delay_filters = []
    mix_inputs = "[0:a]"
    for i, (start, _) in enumerate(timings):
        ms = int(start * 1000)
        delay_filters.append(
            f"[{i + 1}:a]aresample=44100,"
            f"aformat=channel_layouts=mono,adelay={ms}[d{i}]"
        )
        mix_inputs += f"[d{i}]"
    filter_complex = (
        ";".join(delay_filters) +
        f";{mix_inputs}amix=inputs={len(timings) + 1}:normalize=0[aout]"
    )
    output_path = os.path.join(OUTPUT_DIR, f"translated_audio_{target_lang}.wav")
    cmd = ['ffmpeg', '-y', '-i', silence_file]
    for f in audio_files:
        cmd += ['-i', f]
    cmd += ['-filter_complex', filter_complex, '-map', '[aout]', output_path]
    subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    shutil.rmtree(temp_dir)
    return output_path
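
# For two chunks starting at 1.0 s and 4.2 s, the filter graph above expands
# to (illustrative):
#   [1:a]aresample=44100,aformat=channel_layouts=mono,adelay=1000[d0];
#   [2:a]aresample=44100,aformat=channel_layouts=mono,adelay=4200[d1];
#   [0:a][d0][d1]amix=inputs=3:normalize=0[aout]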
def get_video_duration(video_path: str) -> float:
    """Get video duration in seconds"""
    result = subprocess.run([
        'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1', video_path
    ], capture_output=True, text=True)
    return float(result.stdout.strip() or 180)
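
# ffprobe prints the duration as a bare number (e.g. "183.42\n"); the
# "or 180" fallback guards against an empty result from unreadable files.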
def create_html_player(
    video_path: str,
    subtitle_paths: Dict[str, str],
    style: str = ""
) -> str:
    """Create an HTML player with the video and all subtitle tracks"""
    html_path = os.path.join(OUTPUT_DIR, "player.html")
    video_name = os.path.basename(video_path)
    subtitle_tracks = "\n".join(
        f'<track kind="subtitles" src="{os.path.basename(path)}" '
        f'srclang="{lang}" label="{lang.capitalize()}" '
        f'{"default" if lang == "en" else ""}>'
        for lang, path in subtitle_paths.items()
    )
    download_links = "".join(
        f'<a href="{os.path.basename(path)}" download>'
        f'{lang.upper()} Subtitles (.vtt)</a><br>'
        for lang, path in subtitle_paths.items()
    )
    style_block = f"video::cue {{ {style} }}" if style else ""
    html_content = f"""<!DOCTYPE html>
<html>
<head>
    <title>Video Player</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        .container {{ max-width: 800px; margin: 0 auto; }}
        video {{ width: 100%; background: #000; }}
        .downloads {{ margin-top: 20px; }}
        {style_block}
    </style>
</head>
<body>
    <div class="container">
        <h2>Video Player with Subtitles</h2>
        <video controls>
            <source src="{video_name}" type="video/mp4">
            {subtitle_tracks}
        </video>
        <div class="downloads">
            <h3>Download Subtitles:</h3>
            {download_links}
        </div>
    </div>
</body>
</html>"""
    with open(html_path, 'w', encoding='utf-8') as f:
        f.write(html_content)
    return html_path
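
# The player references the video and .vtt files by bare filename, so it
# must be opened from the same directory as the other downloaded files.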
def process_video(
    video_file: str,
    source_lang: str,
    target_langs: List[str],
    subtitle_style: str,
    speaker_settings: Dict[str, str],
    progress: gr.Progress = gr.Progress()
) -> tuple:
    """Complete video processing pipeline"""
    try:
        progress(0.05, "Initializing...")
        # 1. Extract audio
        progress(0.1, "Extracting audio...")
        audio_path = extract_audio(video_file)
        # 2. Detect language if needed
        if source_lang == "Auto-detect":
            source_lang = detect_language(audio_path)
            progress(0.15, f"Detected language: {source_lang}")
        # 3. Generate subtitles
        progress(0.2, "Generating subtitles...")
        srt_path = generate_srt_from_whisper(
            audio_path,
            LANGUAGES[source_lang]["whisper"]
        )
        # 4. Translate subtitles
        progress(0.3, "Translating subtitles...")
        translated_subs = translate_subtitles(srt_path, target_langs)
        # 5. Save original video (generate_translated_audio reads its duration)
        base_video = os.path.join(OUTPUT_DIR, "base_video.mp4")
        shutil.copy(video_file, base_video)
        # 6. Process each target language
        translated_vtts = {}
        for i, lang_name in enumerate(target_langs, 1):
            lang_code = LANGUAGES[lang_name]["code"]
            progress(0.4 + (i * 0.5 / len(target_langs)), f"Processing {lang_name}...")
            # Generate dubbed audio
            translated_audio = generate_translated_audio(
                translated_subs[lang_code],
                lang_code,
                speaker_settings.get(lang_code, "default")
            )
            # Generate styled WebVTT subtitles
            vtt_path = generate_webvtt_subtitles(
                translated_subs[lang_code],
                SUBTITLE_STYLES.get(subtitle_style, "")
            )
            translated_vtts[lang_code] = vtt_path
            # Mux the dubbed audio onto the original video
            output_video = os.path.join(OUTPUT_DIR, f"output_{lang_code}.mp4")
            subprocess.run([
                'ffmpeg', '-i', base_video, '-i', translated_audio,
                '-map', '0:v', '-map', '1:a', '-c:v', 'copy', '-c:a', 'aac',
                '-y', output_video
            ], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # 7. Create HTML player
        progress(0.9, "Creating HTML player...")
        html_path = create_html_player(
            base_video,
            translated_vtts,
            SUBTITLE_STYLES.get(subtitle_style, "")
        )
        # Collect all output files
        output_files = [html_path, base_video] + \
            list(translated_vtts.values()) + \
            [os.path.join(OUTPUT_DIR, f"output_{LANGUAGES[lang]['code']}.mp4")
             for lang in target_langs]
        progress(1.0, "Done!")
        return output_files, "Processing completed successfully!"
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", exc_info=True)
        return None, f"Error: {str(e)}"
def get_speaker_settings(*args) -> Dict[str, str]:
    """Create speaker settings dictionary from inputs"""
    settings = {}
    for i, lang in enumerate(LANGUAGES.keys()):
        if i < len(args) and args[i]:
            settings[LANGUAGES[lang]["code"]] = args[i]
    return settings
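
# Example (values follow the LANGUAGES key order; falsy values are skipped):
#   get_speaker_settings("", "", "", "thorsten", "", "")  ->  {"de": "thorsten"}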
def create_interface():
    """Create Gradio interface"""
    with gr.Blocks(title="Video Translator") as demo:
        gr.Markdown("# Free Video Translation System")
        gr.Markdown("Translate videos with subtitles and audio dubbing using free/open-source tools")
        with gr.Row():
            with gr.Column(scale=1):
                video_input = gr.Video(label="Upload Video")
                with gr.Accordion("Source Settings", open=True):
                    source_lang = gr.Dropdown(
                        label="Source Language",
                        choices=["Auto-detect"] + list(LANGUAGES.keys()),
                        value="Auto-detect"
                    )
                with gr.Accordion("Target Languages", open=True):
                    target_langs = gr.CheckboxGroup(
                        label="Select target languages",
                        choices=list(LANGUAGES.keys()),
                        value=["English", "Spanish"]
                    )
                with gr.Accordion("Subtitle Styling", open=False):
                    subtitle_style = gr.Dropdown(
                        label="Subtitle Appearance",
                        choices=list(SUBTITLE_STYLES.keys()),
                        value="Default"
                    )
                with gr.Accordion("Voice Settings", open=False):
                    speaker_inputs = []
                    for lang_name in LANGUAGES.keys():
                        speakers = LANGUAGES[lang_name]["speakers"]
                        if len(speakers) > 1:
                            speaker_inputs.append(
                                gr.Dropdown(
                                    label=f"{lang_name} Speaker",
                                    choices=speakers,
                                    value=speakers[0],
                                    visible=False
                                )
                            )
                        else:
                            speaker_inputs.append(gr.Textbox(visible=False))
                submit_btn = gr.Button("Translate Video", variant="primary")
            with gr.Column(scale=2):
                output_files = gr.Files(label="Download Files")
                status = gr.Textbox(label="Status")
                gr.Markdown("""
**Instructions:**
1. Upload a video file
2. Select source and target languages
3. Customize subtitles and voices
4. Click Translate
5. Download the HTML player and open it in your browser
""")

        def update_speaker_ui(selected_langs):
            updates = []
            for lang_name in LANGUAGES.keys():
                visible = lang_name in selected_langs and len(LANGUAGES[lang_name]["speakers"]) > 1
                updates.append(gr.update(visible=visible))
            return updates

        target_langs.change(
            update_speaker_ui,
            inputs=target_langs,
            outputs=speaker_inputs
        )

        def run_pipeline(video_file, src_lang, tgt_langs, style, *speaker_values,
                         progress=gr.Progress()):
            # Component objects cannot be read directly at click time, so the
            # speaker widgets are passed as inputs and assembled into a dict here
            speaker_settings = get_speaker_settings(*speaker_values)
            return process_video(video_file, src_lang, tgt_langs, style,
                                 speaker_settings, progress)

        submit_btn.click(
            run_pipeline,
            inputs=[video_input, source_lang, target_langs, subtitle_style] + speaker_inputs,
            outputs=[output_files, status]
        )
    return demo
if __name__ == "__main__":
    # Clear output directory on startup
    if os.path.exists(OUTPUT_DIR):
        shutil.rmtree(OUTPUT_DIR)
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    demo = create_interface()
    # Hugging Face Spaces serves the app directly, so no share link is needed
    demo.launch()