import gradio as gr
import os
import subprocess
import torch
from TTS.api import TTS
from deep_translator import GoogleTranslator
import pysrt
import whisper
import shutil
import copy  # used to deep-copy subtitle files during translation
from tqdm import tqdm
from typing import Dict, List
import logging
# Set up logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Configuration
LANGUAGES = {
"English": {"code": "en", "speakers": ["default"], "whisper": "en"},
"Spanish": {"code": "es", "speakers": ["default"], "whisper": "es"},
"French": {"code": "fr", "speakers": ["default"], "whisper": "fr"},
"German": {"code": "de", "speakers": ["thorsten", "eva_k"], "whisper": "de"},
"Japanese": {"code": "ja", "speakers": ["default"], "whisper": "ja"},
"Hindi": {"code": "hi", "speakers": ["default"], "whisper": "hi"}
}
SUBTITLE_STYLES = {
"Default": "",
"White Text": "color: white;",
"Yellow Text": "color: yellow;",
"Large Text": "font-size: 24px;",
"Bold Text": "font-weight: bold;",
"Black Background": "background-color: black; padding: 5px;"
}
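# Example: choosing "Yellow Text" is written into the .vtt file as
#   STYLE
#   ::cue {
#   color: yellow;
#   }
# which compliant players apply to every cue (see generate_webvtt_subtitles)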
# Create output directory (relative path for Spaces)
OUTPUT_DIR = "outputs"
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Initialize TTS with error handling
device = "cuda" if torch.cuda.is_available() else "cpu"
tts_models = {}
def load_tts_model(model_name: str, lang_code: str):
    try:
        tts = TTS(model_name).to(device)
        # Best effort: point the config at the pure-Python gruut phonemizer in
        # case espeak is unavailable (a post-init config change may not take
        # effect for every model)
        if hasattr(tts.synthesizer, 'tts_config'):
            tts.synthesizer.tts_config.phonemizer = "gruut"
        return tts
    except Exception as e:
        logger.error(f"Failed to load {model_name} ({lang_code}): {str(e)}")
        return None
# Initialize models only when needed
def get_tts_model(lang_code: str):
if lang_code not in tts_models:
model_map = {
"en": "tts_models/en/ljspeech/tacotron2-DDC",
"es": "tts_models/es/css10/vits",
"fr": "tts_models/fr/css10/vits",
"de": "tts_models/de/thorsten/vits", # Using VITS instead of tacotron2
"ja": "tts_models/ja/kokoro/tacotron2-DDC",
"hi": "tts_models/hi/kb/tacotron2-DDC"
}
tts_models[lang_code] = load_tts_model(model_map[lang_code], lang_code)
return tts_models[lang_code]
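# Coqui TTS downloads and caches model weights on first use, so the first
# request for each language is noticeably slower than subsequent ones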
# Initialize Whisper (load when needed)
whisper_model = None
def get_whisper_model():
global whisper_model
if whisper_model is None:
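        # "small" balances accuracy against memory; "base" (lighter) and
        # "medium" (more accurate) are drop-in alternatives for whisper.load_model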
whisper_model = whisper.load_model("small")
return whisper_model
def extract_audio(video_path: str) -> str:
"""Extract audio using ffmpeg"""
audio_path = os.path.join(OUTPUT_DIR, "audio.wav")
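    # -vn drops the video stream; 16 kHz mono 16-bit PCM is the input format
    # Whisper expects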
cmd = [
'ffmpeg', '-i', video_path, '-vn',
'-acodec', 'pcm_s16le', '-ar', '16000',
'-ac', '1', '-y', audio_path
]
subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return audio_path
def transcribe_with_whisper(audio_path: str, language: str = None) -> dict:
    """Transcribe audio with Whisper; returns the full result dict
    (text, segments, detected language)"""
    model = get_whisper_model()
    result = model.transcribe(audio_path, language=language, word_timestamps=True)
    return result
def generate_srt_from_whisper(audio_path: str, language: str) -> str:
"""Generate SRT subtitles from Whisper output"""
result = transcribe_with_whisper(audio_path, language)
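    # Each result["segments"] entry carries "start"/"end" in float seconds
    # plus the recognized "text"; segment-level timing is enough for SRT cues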
subs = pysrt.SubRipFile()
    for i, segment in enumerate(result["segments"]):
        subs.append(pysrt.SubRipItem(
            index=i + 1,
            # SubRipTime fields are integers; convert float seconds to ms
            start=pysrt.SubRipTime(milliseconds=int(segment["start"] * 1000)),
            end=pysrt.SubRipTime(milliseconds=int(segment["end"] * 1000)),
            text=segment["text"].strip()
        ))
srt_path = os.path.join(OUTPUT_DIR, "subtitles.srt")
subs.save(srt_path, encoding='utf-8')
return srt_path
def detect_language(audio_path: str) -> str:
"""Detect language using Whisper"""
result = transcribe_with_whisper(audio_path)
detected_code = result["language"]
for name, data in LANGUAGES.items():
if data["whisper"] == detected_code:
return name
return "English"
def translate_subtitles(srt_path: str, target_langs: List[str]) -> Dict[str, str]:
"""Translate subtitles to multiple languages"""
subs = pysrt.open(srt_path)
results = {}
for lang_name in target_langs:
lang_code = LANGUAGES[lang_name]["code"]
        # Deep-copy so each language starts from the original text; a shallow
        # slice shares SubRipItem objects and would re-translate translations
        translated_subs = copy.deepcopy(subs)
translator = GoogleTranslator(source='auto', target=lang_code)
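        # deep_translator's GoogleTranslator talks to the free web endpoint;
        # long subtitle files may hit throttling, so the per-cue try/except
        # below lets a single failed cue pass through untranslated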
for sub in translated_subs:
try:
sub.text = translator.translate(sub.text)
except Exception as e:
logger.warning(f"Translation failed: {str(e)}")
continue
output_path = os.path.join(OUTPUT_DIR, f"subtitles_{lang_code}.srt")
translated_subs.save(output_path, encoding='utf-8')
results[lang_code] = output_path
return results
def generate_webvtt_subtitles(srt_path: str, style: str = "") -> str:
"""Convert SRT to WebVTT with optional styling"""
subs = pysrt.open(srt_path)
lang_code = os.path.basename(srt_path).split('_')[-1].replace('.srt', '')
vtt_path = os.path.join(OUTPUT_DIR, f"subtitles_{lang_code}.vtt")
with open(vtt_path, 'w', encoding='utf-8') as f:
f.write("WEBVTT\n\n")
if style:
f.write(f"STYLE\n::cue {{\n{style}\n}}\n\n")
for sub in subs:
start = sub.start.to_time().strftime('%H:%M:%S.%f')[:-3]
end = sub.end.to_time().strftime('%H:%M:%S.%f')[:-3]
f.write(f"{start} --> {end}\n")
f.write(f"{sub.text}\n\n")
return vtt_path
def generate_translated_audio(
srt_path: str,
target_lang: str,
speaker: str = "default"
) -> str:
"""Generate translated audio using TTS"""
subs = pysrt.open(srt_path)
temp_dir = os.path.join(OUTPUT_DIR, f"temp_audio_{target_lang}")
os.makedirs(temp_dir, exist_ok=True)
audio_files = []
timings = []
tts = get_tts_model(target_lang)
if tts is None:
raise Exception(f"TTS model for {target_lang} not available")
for i, sub in enumerate(tqdm(subs, desc=f"Generating {target_lang} audio")):
text = sub.text.strip()
if not text:
continue
start_time = sub.start.ordinal / 1000
audio_file = os.path.join(temp_dir, f"chunk_{i:04d}.wav")
try:
kwargs = {"speaker": speaker} if speaker != "default" and hasattr(tts, 'synthesizer') else {}
tts.tts_to_file(text=text, file_path=audio_file, **kwargs)
audio_files.append(audio_file)
timings.append((start_time, audio_file))
except Exception as e:
logger.warning(f"TTS failed: {str(e)}")
if not audio_files:
raise Exception("No audio generated")
# Create silent audio
video_duration = get_video_duration(os.path.join(OUTPUT_DIR, "base_video.mp4"))
silence_file = os.path.join(temp_dir, "silence.wav")
subprocess.run([
'ffmpeg', '-f', 'lavfi', '-i', 'anullsrc=r=44100:cl=stereo',
'-t', str(video_duration), '-y', silence_file
], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
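    # Mix audio: delay each chunk to its subtitle start time, then mix all
    # chunks with the full-length silence bed in one amix pass. Example graph
    # for two chunks starting at 1.5 s and 4.2 s:
    #   [1:a]adelay=1500|1500[a0];[2:a]adelay=4200|4200[a1];
    #   [0:a][a0][a1]amix=inputs=3:duration=first[aout]
    # (amix averages its inputs, so dubbed speech may come out quieter; a
    # trailing volume filter can compensate if needed)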
    delays = "".join(
        f"[{i + 1}:a]adelay={int(start * 1000)}|{int(start * 1000)}[a{i}];"
        for i, (start, _) in enumerate(timings)
    )
    mix_inputs = "[0:a]" + "".join(f"[a{i}]" for i in range(len(timings)))
    filter_complex = (
        f"{delays}{mix_inputs}"
        f"amix=inputs={len(timings) + 1}:duration=first[aout]"
    )
    output_path = os.path.join(OUTPUT_DIR, f"translated_audio_{target_lang}.wav")
    cmd = ['ffmpeg', '-y', '-i', silence_file]
    for f in audio_files:
        cmd += ['-i', f]
    cmd += ['-filter_complex', filter_complex, '-map', '[aout]', output_path]
    # Pass the argument list directly: joining into a shell string breaks on
    # paths with spaces and needlessly invokes a shell
    subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
shutil.rmtree(temp_dir)
return os.path.join(OUTPUT_DIR, f"translated_audio_{target_lang}.wav")
def get_video_duration(video_path: str) -> float:
"""Get video duration in seconds"""
result = subprocess.run([
'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1', video_path
], capture_output=True, text=True)
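    # Fall back to 3 minutes if ffprobe produced no output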
return float(result.stdout.strip() or 180)
def create_html_player(
video_path: str,
subtitle_paths: Dict[str, str],
style: str = ""
) -> str:
"""Create HTML player with video and subtitles"""
html_path = os.path.join(OUTPUT_DIR, "player.html")
video_name = os.path.basename(video_path)
subtitle_tracks = "\n".join(
f'<track kind="subtitles" src="{os.path.basename(path)}" '
f'srclang="{lang}" label="{lang.capitalize()}" '
f'{"default" if lang == "en" else ""}>'
for lang, path in subtitle_paths.items()
)
    style_block = f"video::cue {{ {style} }}" if style else ""
    # Build the download links outside the template to avoid a hard-to-read
    # multi-line expression nested inside the f-string
    download_links = "".join(
        f'<a href="{os.path.basename(path)}" download>'
        f'{lang.upper()} Subtitles (.vtt)</a><br>'
        for lang, path in subtitle_paths.items()
    )
    html_content = f"""<!DOCTYPE html>
<html>
<head>
<title>Video Player</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.container {{ max-width: 800px; margin: 0 auto; }}
video {{ width: 100%; background: #000; }}
.downloads {{ margin-top: 20px; }}
{style_block}
</style>
</head>
<body>
<div class="container">
<h2>Video Player with Subtitles</h2>
<video controls>
<source src="{video_name}" type="video/mp4">
{subtitle_tracks}
</video>
<div class="downloads">
<h3>Download Subtitles:</h3>
{"".join(
f'<a href="{os.path.basename(path)}" download>'
f'{lang.upper()} Subtitles (.vtt)</a><br>'
for lang, path in subtitle_paths.items()
)}
</div>
</div>
</body>
</html>"""
with open(html_path, 'w', encoding='utf-8') as f:
f.write(html_content)
return html_path
def process_video(
    video_file: str,
    source_lang: str,
    target_langs: List[str],
    subtitle_style: str,
    speaker_settings: Dict[str, str],
    progress: gr.Progress = gr.Progress()
):
    """Complete video processing pipeline; returns (output files, status message)"""
try:
progress(0.05, "Initializing...")
# 1. Extract audio
progress(0.1, "Extracting audio...")
audio_path = extract_audio(video_file)
# 2. Detect language if needed
if source_lang == "Auto-detect":
source_lang = detect_language(audio_path)
progress(0.15, f"Detected language: {source_lang}")
# 3. Generate subtitles
progress(0.2, "Generating subtitles...")
srt_path = generate_srt_from_whisper(
audio_path,
LANGUAGES[source_lang]["whisper"]
)
# 4. Translate subtitles
progress(0.3, "Translating subtitles...")
translated_subs = translate_subtitles(srt_path, target_langs)
# 5. Save original video
base_video = os.path.join(OUTPUT_DIR, "base_video.mp4")
shutil.copy(video_file, base_video)
# 6. Process each target language
translated_vtts = {}
for i, lang_name in enumerate(target_langs, 1):
lang_code = LANGUAGES[lang_name]["code"]
progress(0.4 + (i * 0.5 / len(target_langs)), f"Processing {lang_name}...")
# Generate audio
translated_audio = generate_translated_audio(
translated_subs[lang_code],
lang_code,
speaker_settings.get(lang_code, "default")
)
# Generate subtitles
vtt_path = generate_webvtt_subtitles(
translated_subs[lang_code],
SUBTITLE_STYLES.get(subtitle_style, "")
)
translated_vtts[lang_code] = vtt_path
# Create translated video version
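            # -map 0:v stream-copies the original video; -map 1:a swaps in the
            # translated track, re-encoded to AAC for MP4 compatibility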
output_video = os.path.join(OUTPUT_DIR, f"output_{lang_code}.mp4")
subprocess.run([
'ffmpeg', '-i', base_video, '-i', translated_audio,
'-map', '0:v', '-map', '1:a', '-c:v', 'copy', '-c:a', 'aac',
'-y', output_video
], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 7. Create HTML player
progress(0.9, "Creating HTML player...")
html_path = create_html_player(
base_video,
translated_vtts,
SUBTITLE_STYLES.get(subtitle_style, "")
)
# Prepare all output files
output_files = [html_path, base_video] + \
list(translated_vtts.values()) + \
[os.path.join(OUTPUT_DIR, f"output_{LANGUAGES[lang]['code']}.mp4")
for lang in target_langs]
progress(1.0, "Done!")
return output_files, "Processing completed successfully!"
except Exception as e:
logger.error(f"Processing failed: {str(e)}", exc_info=True)
return None, f"Error: {str(e)}"
def get_speaker_settings(*args) -> Dict[str, str]:
"""Create speaker settings dictionary from inputs"""
settings = {}
for i, lang in enumerate(LANGUAGES.keys()):
if i < len(args) and args[i]:
settings[LANGUAGES[lang]["code"]] = args[i]
return settings
def create_interface():
"""Create Gradio interface"""
with gr.Blocks(title="Video Translator") as demo:
gr.Markdown("# Free Video Translation System")
gr.Markdown("Translate videos with subtitles and audio dubbing using free/open-source tools")
with gr.Row():
with gr.Column(scale=1):
video_input = gr.Video(label="Upload Video")
with gr.Accordion("Source Settings", open=True):
source_lang = gr.Dropdown(
label="Source Language",
choices=["Auto-detect"] + list(LANGUAGES.keys()),
value="Auto-detect"
)
with gr.Accordion("Target Languages", open=True):
target_langs = gr.CheckboxGroup(
label="Select target languages",
choices=list(LANGUAGES.keys()),
value=["English", "Spanish"]
)
with gr.Accordion("Subtitle Styling", open=False):
subtitle_style = gr.Dropdown(
label="Subtitle Appearance",
choices=list(SUBTITLE_STYLES.keys()),
value="Default"
)
with gr.Accordion("Voice Settings", open=False):
speaker_inputs = []
for lang_name in LANGUAGES.keys():
speakers = LANGUAGES[lang_name]["speakers"]
if len(speakers) > 1:
speaker_inputs.append(
gr.Dropdown(
label=f"{lang_name} Speaker",
choices=speakers,
value=speakers[0],
visible=False
)
)
else:
speaker_inputs.append(gr.Textbox(visible=False))
submit_btn = gr.Button("Translate Video", variant="primary")
with gr.Column(scale=2):
output_files = gr.Files(label="Download Files")
status = gr.Textbox(label="Status")
gr.Markdown("""
**Instructions:**
1. Upload a video file
2. Select source and target languages
3. Customize subtitles and voices
4. Click Translate
5. Download the HTML player and open in browser
""")
        def update_speaker_ui(selected_langs):
            updates = []
            for lang_name in LANGUAGES.keys():
                visible = (lang_name in selected_langs
                           and len(LANGUAGES[lang_name]["speakers"]) > 1)
                # gr.update works for both the Dropdown and Textbox placeholders
                updates.append(gr.update(visible=visible))
            return updates
target_langs.change(
update_speaker_ui,
inputs=target_langs,
outputs=speaker_inputs
)
        # Pass the speaker components as real event inputs; the original
        # gr.State(lambda: ...) wiring would hand process_video the lambda
        # object itself, never the selected values
        def run_pipeline(video_file, source_lang_val, target_lang_vals,
                         style_val, spk_en, spk_es, spk_fr, spk_de, spk_ja,
                         spk_hi, progress=gr.Progress()):
            # One positional speaker value per entry in LANGUAGES
            speaker_settings = get_speaker_settings(
                spk_en, spk_es, spk_fr, spk_de, spk_ja, spk_hi)
            return process_video(video_file, source_lang_val, target_lang_vals,
                                 style_val, speaker_settings, progress)
        submit_btn.click(
            run_pipeline,
            inputs=[video_input, source_lang, target_langs,
                    subtitle_style] + speaker_inputs,
            outputs=[output_files, status]
        )
return demo
if __name__ == "__main__":
# Clear output directory on startup
if os.path.exists(OUTPUT_DIR):
shutil.rmtree(OUTPUT_DIR)
os.makedirs(OUTPUT_DIR, exist_ok=True)
demo = create_interface()
    demo.launch(share=True)  # share=True gives a public link when run locally; Spaces serves the app directly and ignores it