# Source: Hugging Face file page for "app.py" (commit 7ddad4d "Update app.py", 25.1 kB); page chrome removed.
import gradio as gr
import os
import tempfile
import subprocess
import assemblyai as aai
from deep_translator import GoogleTranslator
import pysrt
import logging
import sys
import shutil
from pathlib import Path
import time
from tqdm import tqdm
from gtts import gTTS
# Set up logging: INFO and above, written to stdout
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stdout,
)
logger = logging.getLogger(__name__)

# Configuration
# The AssemblyAI key comes from the environment. A missing key is not fatal at
# import time, so flag it here instead of only failing later during transcription.
aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")
if not os.getenv("ASSEMBLYAI_API_KEY"):
    logger.warning("ASSEMBLYAI_API_KEY is not set - transcription requests will fail")

# Display name (shown in the UI) -> language code used for translation and TTS
LANGUAGES = {
    "English": "en",
    "Spanish": "es",
    "French": "fr",
    "German": "de",
    "Japanese": "ja",
    "Hindi": "hi",
}

# TTS voice mapping for different languages (language code -> locale tag).
# NOTE: the gTTS call in this file is made with the bare language code, not
# with these locale tags.
TTS_VOICES = {
    "en": "en-US",
    "es": "es-ES",
    "fr": "fr-FR",
    "de": "de-DE",
    "ja": "ja-JP",
    "hi": "hi-IN",
}

# Create a permanent output directory next to this file
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)
def extract_audio(video_path):
    """Extract the audio track of a video into a WAV file using ffmpeg.

    Args:
        video_path: Path of the input video file.

    Returns:
        Path of the extracted audio (``audio.wav`` inside ``OUTPUT_DIR``).
        An existing file of that name is overwritten (``-y``).

    Raises:
        Exception: If no path is given, ffmpeg cannot be started, or ffmpeg
            exits with a non-zero status. The message is prefixed with
            "Audio extraction failed: " exactly once.
    """
    if not video_path:
        raise Exception("Audio extraction failed: no video file provided")

    logger.info(f"Extracting audio from video: {video_path}")
    audio_path = os.path.join(OUTPUT_DIR, "audio.wav")

    # Use ffmpeg to extract audio
    cmd = [
        'ffmpeg',
        '-i', str(video_path),
        '-vn',                   # No video
        '-acodec', 'pcm_s16le',  # PCM format
        '-ar', '44100',          # Sample rate
        '-ac', '2',              # Stereo
        '-y',                    # Overwrite output file
        audio_path
    ]
    logger.info(f"Running command: {' '.join(cmd)}")

    # Only the subprocess call is guarded: the non-zero-exit error below must
    # not be caught and wrapped a second time.
    try:
        process = subprocess.run(cmd, capture_output=True, text=True)
    except Exception as e:
        logger.error(f"Audio extraction failed: {str(e)}", exc_info=True)
        raise Exception(f"Audio extraction failed: {str(e)}") from e

    if process.returncode != 0:
        logger.error(f"Audio extraction failed: {process.stderr}")
        raise Exception(f"Audio extraction failed: {process.stderr}")
    return audio_path
def generate_subtitles(audio_path):
    """Transcribe an audio file with AssemblyAI and save the result as SRT.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        Path of the written subtitle file (``subtitles.srt`` inside
        ``OUTPUT_DIR``).

    Raises:
        Exception: If transcription, SRT export, or writing the file fails.
            The message is prefixed with "Subtitle generation failed: ".
    """
    srt_path = os.path.join(OUTPUT_DIR, "subtitles.srt")
    try:
        logger.info(f"Transcribing audio with AssemblyAI: {audio_path}")
        transcriber = aai.Transcriber()
        transcript = transcriber.transcribe(audio_path)

        # A failed transcription is reported through the transcript status,
        # so surface its error message instead of failing later on export.
        if transcript.status == aai.TranscriptStatus.error:
            raise RuntimeError(transcript.error)

        # Export before opening the file so a failed export does not leave an
        # empty subtitles.srt behind.
        srt_text = transcript.export_subtitles_srt()

        logger.info(f"Saving subtitles to: {srt_path}")
        with open(srt_path, "w", encoding="utf-8") as f:
            f.write(srt_text)
        return srt_path
    except Exception as e:
        logger.error(f"Subtitle generation failed: {str(e)}", exc_info=True)
        raise Exception(f"Subtitle generation failed: {str(e)}") from e
def translate_subtitles(srt_path, target_langs):
    """Translate an SRT file into each requested language.

    Every language is translated from the ORIGINAL subtitle text. Timing
    information is left untouched; only the text of each entry is replaced.

    Args:
        srt_path: Path of the source ``.srt`` file (read as UTF-8).
        target_langs: Iterable of target language codes (e.g. ``"es"``).

    Returns:
        Dict mapping each language code to the path of its translated file,
        ``subtitles_<code>.srt`` inside ``OUTPUT_DIR``.

    Raises:
        Exception: If the subtitles cannot be loaded or saved. A failure to
            translate a single entry is only logged; that entry keeps its
            original text.
    """
    try:
        logger.info(f"Loading subtitles from: {srt_path}")
        subs = pysrt.open(srt_path, encoding="utf-8")

        # Snapshot the source text once. Slicing the subtitle list would copy
        # only the list, not the items, so translating in place would feed the
        # previous language's output into the next translation.
        original_texts = [sub.text for sub in subs]
        total = len(original_texts)

        results = {}
        for lang_code in target_langs:
            logger.info(f"Translating to language code: {lang_code}")
            translator = GoogleTranslator(source="auto", target=lang_code)

            for i, (sub, original) in enumerate(zip(subs, original_texts)):
                # Reset first so a failed translation falls back to the source
                # text rather than to another language's translation.
                sub.text = original
                if not original.strip():
                    continue
                try:
                    translated = translator.translate(original)
                    if translated:
                        sub.text = translated
                    if i % 10 == 0:  # Log progress every 10 subtitles
                        logger.info(f"Translated {i+1}/{total} subtitles to {lang_code}")
                except Exception as e:
                    # Keep original text if translation fails
                    logger.warning(f"Failed to translate subtitle: {original}. Error: {str(e)}")

            output_path = os.path.join(OUTPUT_DIR, f"subtitles_{lang_code}.srt")
            logger.info(f"Saving translated subtitles to: {output_path}")
            subs.save(output_path, encoding='utf-8')
            results[lang_code] = output_path
        return results
    except Exception as e:
        logger.error(f"Translation failed: {str(e)}", exc_info=True)
        raise Exception(f"Translation failed: {str(e)}") from e
def _make_silent_audio(output_path, duration_seconds):
    """Write a silent 44.1 kHz stereo audio file of the given length with ffmpeg."""
    silent_cmd = [
        'ffmpeg',
        '-f', 'lavfi',
        '-i', 'anullsrc=r=44100:cl=stereo',
        '-t', str(duration_seconds),
        '-q:a', '0',
        '-y',
        output_path
    ]
    subprocess.run(silent_cmd, capture_output=True)
    return output_path


def _subrip_time_to_seconds(stamp):
    """Convert a subtitle timestamp (hours/minutes/seconds/milliseconds) to seconds."""
    return (stamp.hours * 3600 +
            stamp.minutes * 60 +
            stamp.seconds +
            stamp.milliseconds / 1000)


def _synthesize_chunk(text, target_lang, audio_file, max_retries=3):
    """Render one subtitle text to ``audio_file`` with gTTS, retrying on failure.

    Returns True when a non-empty file exists afterwards. An exception from the
    final shortened-text attempt is left to the caller.
    """
    # For Hindi, use slower speed which might improve reliability
    slow_option = target_lang == "hi"
    succeeded = False
    for attempt in range(1, max_retries + 1):
        try:
            gTTS(text=text, lang=target_lang, slow=slow_option).save(audio_file)
            succeeded = True
            break
        except Exception as e:
            logger.warning(f"TTS attempt {attempt} failed for {target_lang}: {str(e)}")
            if attempt < max_retries:
                time.sleep(1)  # Wait before retrying

    # If still failing after retries, try with shorter text
    if not succeeded and len(text) > 100:
        logger.warning(f"Trying with shortened text for {target_lang}")
        gTTS(text=text[:100] + "...", lang=target_lang, slow=True).save(audio_file)

    return os.path.exists(audio_file) and os.path.getsize(audio_file) > 0


def _build_mix_filter(start_times):
    """Build the ffmpeg ``filter_complex`` that overlays delayed chunks on input 0.

    Input 0 is the silent base track; inputs 1..N are the speech chunks in the
    same order as ``start_times`` (seconds). Each chunk is converted to
    44.1 kHz stereo, delayed to its start time and given a label; all labels
    are then fed to a single ``amix`` whose output is ``[aout]``.
    """
    chains = []
    mix_inputs = ["[0:a]"]
    for index, start_time in enumerate(start_times, start=1):
        delay_ms = int(start_time * 1000)
        label = f"[d{index}]"
        chains.append(
            f"[{index}:a]aformat=sample_rates=44100:channel_layouts=stereo,"
            f"adelay={delay_ms}|{delay_ms}{label}"
        )
        mix_inputs.append(label)
    chains.append(
        f"{''.join(mix_inputs)}amix=inputs={len(mix_inputs)}"
        f":dropout_transition=0:normalize=0[aout]"
    )
    return ";".join(chains)


def generate_translated_audio(srt_path, target_lang):
    """Generate a translated audio track from subtitles using text-to-speech.

    Each non-empty subtitle is synthesized with gTTS and placed at its start
    time on top of a silent track. The silent track is as long as
    ``base_video.mp4`` in ``OUTPUT_DIR``, or 180 seconds when that length
    cannot be determined.

    Args:
        srt_path: Path of the (translated) ``.srt`` file, read as UTF-8.
        target_lang: Language code passed to gTTS (e.g. ``"es"``).

    Returns:
        Path of ``translated_audio_<target_lang>.wav`` inside ``OUTPUT_DIR``.
        When no speech could be generated or mixing fails, a silent file is
        written to that path as a fallback.

    Raises:
        Exception: Only if processing failed AND the silent fallback could not
            be created either.
    """
    output_audio = os.path.join(OUTPUT_DIR, f"translated_audio_{target_lang}.wav")
    try:
        logger.info(f"Generating translated audio for {target_lang}")
        subs = pysrt.open(srt_path, encoding="utf-8")

        # Create temporary directory for audio chunks
        temp_dir = os.path.join(OUTPUT_DIR, f"temp_audio_{target_lang}")
        os.makedirs(temp_dir, exist_ok=True)
        try:
            # Generate TTS for each subtitle: list of (start_seconds, chunk_path)
            chunks = []
            for i, sub in enumerate(tqdm(subs, desc=f"Generating {target_lang} speech")):
                text = sub.text.strip()
                if not text:
                    continue
                start_time = _subrip_time_to_seconds(sub.start)
                audio_file = os.path.join(temp_dir, f"chunk_{i:04d}.mp3")
                try:
                    if _synthesize_chunk(text, target_lang, audio_file):
                        chunks.append((start_time, audio_file))
                    else:
                        logger.warning(f"Generated audio file is empty or does not exist: {audio_file}")
                except Exception as e:
                    logger.warning(f"Failed to generate TTS for: {text}. Error: {str(e)}")

            # Check if we actually generated any audio files
            if not chunks:
                logger.warning(f"No audio files were generated for {target_lang}")
                # Create a silent audio file as fallback (3 minutes default)
                return _make_silent_audio(output_audio, 180)

            # Determine the length of the original video for the base track
            try:
                video_duration_cmd = [
                    'ffprobe',
                    '-v', 'error',
                    '-show_entries', 'format=duration',
                    '-of', 'default=noprint_wrappers=1:nokey=1',
                    os.path.join(OUTPUT_DIR, "base_video.mp4")
                ]
                duration_result = subprocess.run(video_duration_cmd, capture_output=True, text=True)
                video_duration = float(duration_result.stdout.strip())
            except Exception as e:
                logger.warning(f"Could not determine video duration: {str(e)}. Using default of 180 seconds.")
                video_duration = 180.0

            # Create silent audio track (input 0 of the mix)
            silence_file = os.path.join(temp_dir, "silence.wav")
            _make_silent_audio(silence_file, video_duration)

            # Build the ffmpeg command: base track first, then every chunk in
            # the same order the filter graph numbers them.
            cmd = ['ffmpeg', '-y', '-i', silence_file]
            for _, audio_file in chunks:
                cmd.extend(['-i', audio_file])
            cmd.extend([
                '-filter_complex', _build_mix_filter([start for start, _ in chunks]),
                '-map', '[aout]',
                output_audio
            ])

            # Run the command
            logger.info(f"Combining audio segments: {' '.join(cmd)}")
            process = subprocess.run(cmd, capture_output=True, text=True)
            if process.returncode != 0:
                logger.error(f"Audio combination failed: {process.stderr}")
                # Create a fallback silent audio as last resort
                _make_silent_audio(output_audio, video_duration)

            # Verify the output file exists
            if not os.path.exists(output_audio):
                logger.error(f"Output audio file does not exist: {output_audio}")
                # Create emergency fallback
                _make_silent_audio(output_audio, 180)

            return output_audio
        finally:
            # Clean up temporary files on every exit path
            try:
                shutil.rmtree(temp_dir)
            except Exception as e:
                logger.warning(f"Failed to clean up temp directory: {str(e)}")
    except Exception as e:
        logger.error(f"Audio translation failed: {str(e)}", exc_info=True)
        # Create an emergency fallback silent audio
        try:
            return _make_silent_audio(output_audio, 180)
        except Exception:
            raise Exception(f"Audio translation failed: {str(e)}") from e
def _build_mux_command(video_path, audio_path, output_path, video_filter=None):
    """Build the ffmpeg command that pairs the video stream with the new audio."""
    cmd = [
        'ffmpeg',
        '-i', video_path,   # Input video
        '-i', audio_path,   # Input translated audio
        '-map', '0:v',      # Use video from first input
        '-map', '1:a',      # Use audio from second input
    ]
    if video_filter:
        cmd.extend(['-vf', video_filter])  # Burn subtitles
    cmd.extend([
        '-c:v', 'libx264',  # Video codec
        '-c:a', 'aac',      # Audio codec
        '-shortest',        # End when shortest input ends
        '-y',               # Overwrite output file
        output_path
    ])
    return cmd


def _run_ffmpeg_attempt(cmd, label):
    """Run one ffmpeg attempt; return ``(succeeded, error_text)`` without raising."""
    logger.info(f"{label}: {' '.join(cmd)}")
    try:
        process = subprocess.run(cmd, capture_output=True, text=True)
    except Exception as e:
        return False, str(e)
    return process.returncode == 0, process.stderr


def combine_video_audio_subtitles(video_path, audio_path, srt_path, output_path):
    """Combine a video with translated audio and burned-in subtitles.

    Three ffmpeg attempts are made in order, stopping at the first success:
      1. subtitles filter with explicit styling, on the escaped original path;
      2. plain subtitles filter on a copy of the SRT under ``OUTPUT_DIR/temp_srt``;
      3. no subtitles at all (video + translated audio only).

    Args:
        video_path: Path of the source video.
        audio_path: Path of the translated audio track.
        srt_path: Path of the subtitle file to burn in.
        output_path: Path of the video file to create (overwritten if present).

    Returns:
        ``output_path``.

    Raises:
        Exception: If an input file is missing, all three attempts fail, or
            the output is missing or smaller than 1000 bytes. The message is
            prefixed with "Combining failed: ".
    """
    try:
        logger.info("Combining video, audio, and subtitles")

        # Verify that all input files exist
        if not os.path.exists(video_path):
            raise Exception(f"Video file does not exist: {video_path}")
        if not os.path.exists(audio_path):
            raise Exception(f"Audio file does not exist: {audio_path}")
        if not os.path.exists(srt_path):
            raise Exception(f"Subtitle file does not exist: {srt_path}")
        logger.info(f"Input files verified: Video: {os.path.getsize(video_path)} bytes, Audio: {os.path.getsize(audio_path)} bytes, Subtitles: {os.path.getsize(srt_path)} bytes")

        # Attempt method 1: styled subtitles filter. Spaces and colons in the
        # path are backslash-escaped because the path is embedded in a filter
        # argument string.
        safe_srt_path = srt_path.replace(" ", "\\ ").replace(":", "\\:")
        styled_filter = f"subtitles={safe_srt_path}:force_style='FontSize=24,PrimaryColour=&H00FFFFFF,OutlineColour=&H00000000,BorderStyle=3'"
        succeeded, error_text = _run_ffmpeg_attempt(
            _build_mux_command(video_path, audio_path, output_path, styled_filter),
            "Running command",
        )

        if not succeeded:
            logger.warning(f"First method failed: {error_text}")

            # Attempt method 2: plain subtitles filter on a copy of the SRT
            # file. The temp directory is removed whether or not this works.
            temp_srt_dir = os.path.join(OUTPUT_DIR, "temp_srt")
            try:
                os.makedirs(temp_srt_dir, exist_ok=True)
                temp_srt_path = os.path.join(temp_srt_dir, "temp.srt")
                shutil.copy(srt_path, temp_srt_path)
                succeeded, error_text = _run_ffmpeg_attempt(
                    _build_mux_command(video_path, audio_path, output_path, f"subtitles={temp_srt_path}"),
                    "Running second method",
                )
            except OSError as e:
                succeeded, error_text = False, str(e)
            finally:
                shutil.rmtree(temp_srt_dir, ignore_errors=True)

            if not succeeded:
                logger.warning(f"Second method failed: {error_text}")

                # Attempt method 3: No subtitles as last resort
                succeeded, error_text = _run_ffmpeg_attempt(
                    _build_mux_command(video_path, audio_path, output_path),
                    "Running fallback method (no subtitles)",
                )
                if not succeeded:
                    logger.error(f"All methods failed: {error_text}")
                    raise Exception(f"Failed to combine video and audio: {error_text}")
                logger.warning("Created video without subtitles as fallback")

        # Verify the output file exists and has a reasonable size
        if not os.path.exists(output_path):
            raise Exception(f"Output file does not exist: {output_path}")
        if os.path.getsize(output_path) < 1000:
            raise Exception(f"Output file is too small: {os.path.getsize(output_path)} bytes")
        logger.info(f"Successfully created output file: {output_path} ({os.path.getsize(output_path)} bytes)")
        return output_path
    except Exception as e:
        logger.error(f"Combining failed: {str(e)}", exc_info=True)
        raise Exception(f"Combining failed: {str(e)}") from e
def process_video(video_file, source_lang, target_langs, progress=gr.Progress()):
    """Process a video, translating both its subtitles and its audio.

    Pipeline: extract audio -> transcribe to SRT -> translate the SRT into each
    target language -> synthesize translated speech -> combine video, audio
    and subtitles into ``output_<code>.mp4`` inside ``OUTPUT_DIR``.

    Args:
        video_file: Path of the uploaded video.
        source_lang: Display name of the source language. Not read by this
            function; translation is run with automatic source detection.
        target_langs: List of display names (keys of ``LANGUAGES``) to
            translate into.
        progress: Gradio progress callback.

    Returns:
        Tuple ``(video_path_or_None, status_message)``. On success the path is
        the FIRST successfully created translated video; when every language
        fails, the copied original video is returned with a failure message.
        Errors are reported through the message rather than raised.
    """
    try:
        # Reject incomplete input up front so the user gets a precise message
        # instead of a generic "all translations failed" result.
        if not video_file:
            return None, "Please upload a video file."
        if not target_langs:
            return None, "Please select at least one target language."

        progress(0.05, "Starting processing...")
        logger.info(f"Processing video: {video_file}")

        # Make sure we have ffmpeg installed
        try:
            subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
            logger.info("ffmpeg is installed and working")
        except (subprocess.SubprocessError, FileNotFoundError):
            error_msg = "ffmpeg is not installed or not in PATH. Please install ffmpeg."
            logger.error(error_msg)
            return None, error_msg

        # Extract audio
        progress(0.1, "Extracting audio...")
        audio_path = extract_audio(video_file)

        # Generate subtitles
        progress(0.25, "Generating subtitles...")
        srt_path = generate_subtitles(audio_path)

        # Translate subtitles
        progress(0.4, "Translating subtitles...")
        target_lang_codes = [LANGUAGES[lang] for lang in target_langs]
        translated_subs = translate_subtitles(srt_path, target_lang_codes)

        # Create a copy of the video file in our output directory
        base_video = os.path.join(OUTPUT_DIR, "base_video.mp4")
        shutil.copy(video_file, base_video)

        # Reverse lookup built once: language code -> display name
        code_to_name = {code: name for name, code in LANGUAGES.items()}

        # Process each target language; one failing language does not stop the rest
        output_videos = []
        for i, (lang_code, sub_path) in enumerate(translated_subs.items()):
            lang_name = code_to_name[lang_code]
            progress(0.5 + (i * 0.5 / len(translated_subs)), f"Processing {lang_name}...")
            try:
                # Generate translated audio
                logger.info(f"Generating translated audio for {lang_code}")
                translated_audio = generate_translated_audio(sub_path, lang_code)

                # Verify audio file exists
                if not os.path.exists(translated_audio):
                    logger.error(f"Translated audio file does not exist: {translated_audio}")
                    continue

                # Combine video, translated audio, and subtitles
                output_path = os.path.join(OUTPUT_DIR, f"output_{lang_code}.mp4")
                logger.info(f"Creating final video with {lang_code} audio and subtitles")
                output_video = combine_video_audio_subtitles(
                    base_video,
                    translated_audio,
                    sub_path,
                    output_path
                )

                # Verify the output file exists and has content
                if os.path.exists(output_video) and os.path.getsize(output_video) > 1000:
                    logger.info(f"Successfully created output file: {output_video}")
                    output_videos.append(output_video)
                else:
                    logger.warning(f"Output file is missing or too small: {output_video}")
            except Exception as e:
                logger.error(f"Failed to process {lang_code}: {str(e)}")

        # If all output videos failed, return the original
        if not output_videos:
            logger.warning("All translations failed, returning original video")
            return base_video, "Failed to translate video, returning original"

        progress(1.0, "Done!")
        message = f"Processing complete. Created {len(output_videos)} translated videos."
        logger.info(message)
        return output_videos[0], message
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}", exc_info=True)
        return None, f"Processing failed: {str(e)}"
# Gradio UI: inputs on the left, result on the right, one button that runs
# process_video.
with gr.Blocks() as demo:
    gr.Markdown("# Complete Video Translation System")
    gr.Markdown("Translates both subtitles and audio to target languages")

    with gr.Row():
        # Left column: upload, language selection and the trigger button
        with gr.Column(scale=1):
            video_input = gr.Video(label="Upload Video")
            source_lang = gr.Dropdown(
                label="Source Language",
                choices=list(LANGUAGES),
                value="English",
            )
            target_langs = gr.CheckboxGroup(
                label="Target Languages (Both Audio & Subtitles)",
                choices=list(LANGUAGES),
                value=["Spanish"],
            )
            submit_btn = gr.Button("Translate", variant="primary")

        # Right column: translated video and status message
        with gr.Column(scale=2):
            output_video = gr.Video(label="Translated Video")
            status_text = gr.Textbox(label="Status", interactive=False)
            output_info = gr.Markdown("Output videos will be saved in the 'outputs' directory")

    # process_video returns (video path, status message), matching the two outputs
    submit_btn.click(
        fn=process_video,
        inputs=[video_input, source_lang, target_langs],
        outputs=[output_video, status_text],
    )
if __name__ == "__main__":
    import importlib

    # Check dependencies at startup. Problems are only logged; the app is
    # launched regardless.
    missing_deps = []

    # Check ffmpeg. check=True makes a present-but-broken binary count as
    # missing, and only process-related errors are caught.
    try:
        version_info = subprocess.run(['ffmpeg', '-version'], capture_output=True, text=True, check=True)
        ffmpeg_version = version_info.stdout.split('\n')[0]
        logger.info(f"ffmpeg version: {ffmpeg_version}")
    except (OSError, subprocess.SubprocessError):
        logger.warning("ffmpeg not found - required for video processing")
        missing_deps.append("ffmpeg")

    # Check Python dependencies: (module name, display name, what it is needed for)
    python_packages = (
        ("assemblyai", "AssemblyAI", "transcription"),
        ("gtts", "gTTS", "text-to-speech"),
        ("deep_translator", "deep_translator", "translation"),
    )
    for module_name, display_name, purpose in python_packages:
        try:
            importlib.import_module(module_name)
            logger.info(f"{display_name} package found")
        except ImportError:
            logger.warning(f"{display_name} package not found - required for {purpose}")
            missing_deps.append(module_name)

    # Print installation instructions if dependencies are missing
    if missing_deps:
        logger.warning("Missing dependencies detected. Please install:")
        if "ffmpeg" in missing_deps:
            logger.warning("- ffmpeg: https://ffmpeg.org/download.html")
        python_deps = [dep for dep in missing_deps if dep != "ffmpeg"]
        if python_deps:
            deps_str = " ".join(python_deps)
            logger.warning(f"- Python packages: pip install {deps_str}")

    # Start the app
    demo.launch()