"""AI Subtitle Studio — generate and translate video subtitles with Gemini.

Pipeline: extract the audio track from a video, split it into fixed-length
chunks, transcribe each chunk with the Gemini API, shift chunk timestamps to
absolute positions, stitch the lines into an SRT file, and optionally
translate the subtitles to a target language.
"""

import os
import re
import logging
import tempfile
from datetime import timedelta  # kept: other parts of the project may import it from here

import google.generativeai as genai
import gradio as gr
from moviepy.video.io.VideoFileClip import VideoFileClip
from pydub import AudioSegment

# Suppress moviepy logs
logging.getLogger("moviepy").setLevel(logging.ERROR)

# Configure Gemini API
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
model = genai.GenerativeModel("gemini-2.0-pro-exp-02-05")

# Seconds of audio per transcription chunk.  Shared by split_audio() and
# process_video() so the per-chunk timestamp offset always matches the real
# chunk length (previously "60" was hard-coded in two separate places).
CHUNK_SECONDS = 60

# Matches "[HH:MM:SS.mmm -> HH:MM:SS.mmm]" and the SRT arrow form
# "HH:MM:SS,mmm --> HH:MM:SS,mmm".  The previous pattern accepted only "->",
# so re-parsing translated SRT text silently produced zero entries.
TIMESTAMP_RE = re.compile(
    r"\[?\s*((?:\d+:)?\d+:\d+[.,]\d{1,3})\s*-{1,2}>\s*((?:\d+:)?\d+:\d+[.,]\d{1,3})\s*\]?"
)

# Supported languages
SUPPORTED_LANGUAGES = [
    "Auto Detect", "English", "Spanish", "French", "German", "Italian",
    "Portuguese", "Russian", "Japanese", "Korean", "Arabic", "Hindi",
    "Chinese", "Dutch", "Turkish", "Polish", "Vietnamese", "Thai"
]

# Magic Prompts
TRANSCRIPTION_PROMPT = """Generate precise subtitles with accurate timestamps:
1. Use [HH:MM:SS.ms -> HH:MM:SS.ms] format
2. Each subtitle 3-7 words
3. Include speaker changes
4. Preserve emotional tone
5. Example:
[00:00:05.250 -> 00:00:08.100]
Example subtitle text

Return ONLY subtitles with timestamps."""

TRANSLATION_PROMPT = """Translate these subtitles to {target_language}:
1. Keep timestamps identical
2. Match text length to timing
3. Preserve technical terms
4. Use natural speech patterns

ORIGINAL:
{subtitles}

TRANSLATED:"""


def extract_audio(video_path):
    """Extract the audio track of *video_path* to a temporary WAV file.

    Returns the path of the written file.  (This helper was called by
    process_video() but was missing from the original file.)
    """
    audio_path = os.path.join(tempfile.gettempdir(), "extracted_audio.wav")
    with VideoFileClip(video_path) as clip:
        clip.audio.write_audiofile(audio_path, logger=None)
    return audio_path


def split_audio(audio_path, chunk_duration=CHUNK_SECONDS):
    """Split a WAV file into chunks of *chunk_duration* seconds.

    Returns the list of chunk file paths; callers own their cleanup
    (process_audio_chunk deletes each chunk after transcribing it).
    """
    audio = AudioSegment.from_wav(audio_path)
    step_ms = chunk_duration * 1000
    chunks = []
    for offset_ms in range(0, len(audio), step_ms):
        chunk = audio[offset_ms:offset_ms + step_ms]
        chunk_path = os.path.join(tempfile.gettempdir(), f"chunk_{offset_ms // 1000}.wav")
        chunk.export(chunk_path, format="wav")
        chunks.append(chunk_path)
    return chunks


def parse_timestamp(timestamp_str):
    """Parse "[HH:]MM:SS.mmm" (comma or dot decimal) into float seconds.

    Raises ValueError for anything that is not a 2- or 3-part timestamp.
    """
    clean_ts = timestamp_str.strip("[] ").replace(",", ".")
    parts = clean_ts.split(":")
    if len(parts) == 3:       # HH:MM:SS.ss
        hours, minutes, seconds_part = parts
    elif len(parts) == 2:     # MM:SS.ss
        hours = "0"
        minutes, seconds_part = parts
    else:
        raise ValueError(f"Invalid timestamp: {timestamp_str}")
    return float(hours) * 3600 + float(minutes) * 60 + float(seconds_part)


def format_timestamp(seconds):
    """Format float seconds as SRT "HH:MM:SS,mmm".

    The previous str(timedelta(...)) approach emitted a single-digit hour,
    six-digit microseconds, and no fraction at all for whole seconds — none
    of which is valid SRT timing.
    """
    total_ms = int(round(seconds * 1000))
    hours, rest = divmod(total_ms, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    secs, millis = divmod(rest, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"


def process_audio_chunk(chunk_path, start_time):
    """Transcribe one chunk and shift its timestamps by *start_time* seconds.

    The chunk file is always deleted, even on failure.  Subtitle text on
    timestamp lines is preserved (the original rebuilt those lines with the
    timestamps only, dropping the text, and fed ``"...] text"`` into the
    parser where float() raised).
    """
    try:
        # Upload file using Gemini's File API
        uploaded_file = genai.upload_file(path=chunk_path)
        response = model.generate_content([TRANSCRIPTION_PROMPT, uploaded_file])

        adjusted_transcription = []
        for line in response.text.splitlines():
            match = TIMESTAMP_RE.search(line)
            if not match:
                adjusted_transcription.append(line)
                continue
            try:
                adjusted_start = parse_timestamp(match.group(1)) + start_time
                adjusted_end = parse_timestamp(match.group(2)) + start_time
            except ValueError:
                adjusted_transcription.append(line)  # malformed timestamp: keep as-is
                continue
            # Keep any subtitle text that follows the timestamp pair.
            text = line[match.end():].strip()
            stamped = f"[{format_timestamp(adjusted_start)} -> {format_timestamp(adjusted_end)}]"
            adjusted_transcription.append(f"{stamped} {text}".rstrip())
        return "\n".join(adjusted_transcription)
    finally:
        os.remove(chunk_path)


def create_srt(subtitles_text):
    """Convert timestamped transcription (or SRT-like) text to SRT format.

    Each timestamp pair starts a new entry; its text runs until the next
    timestamp pair.  (The original split on blank lines only, which merged
    every newline-separated subtitle of a chunk into a single entry.)
    """
    matches = list(TIMESTAMP_RE.finditer(subtitles_text))
    srt_output = []
    for pos, match in enumerate(matches):
        try:
            start_time = parse_timestamp(match.group(1))
            end_time = parse_timestamp(match.group(2))
        except ValueError as e:
            print(f"Skipping invalid entry {pos + 1}: {str(e)}")
            continue
        tail = matches[pos + 1].start() if pos + 1 < len(matches) else len(subtitles_text)
        lines = [ln.strip() for ln in subtitles_text[match.end():tail].splitlines() if ln.strip()]
        # A trailing bare number is the NEXT block's SRT index, not text.
        if lines and lines[-1].isdigit():
            lines.pop()
        srt_output.append(
            f"{len(srt_output) + 1}\n"
            f"{format_timestamp(start_time)} --> {format_timestamp(end_time)}\n"
            f"{chr(10).join(lines)}\n"
        )
    return "\n".join(srt_output)


def translate_subtitles(subtitles, target_language):
    """Ask Gemini to translate SRT text, keeping timestamps unchanged.

    (Called by process_video() but missing from the original file.)
    """
    prompt = TRANSLATION_PROMPT.format(
        target_language=target_language, subtitles=subtitles
    )
    response = model.generate_content(prompt)
    return response.text


def process_video(video_path, source_lang, target_lang):
    """Complete pipeline: extract, chunk, transcribe, build SRT, translate.

    Returns ``(original_srt_path, translated_srt_path)``; the second item is
    None when no translation was requested, and both are None on failure.
    *source_lang* is currently unused (the prompt lets Gemini auto-detect)
    but is kept for the UI contract.
    """
    audio_path = None  # bound up-front so the finally block cannot NameError
    try:
        audio_path = extract_audio(video_path)
        chunks = split_audio(audio_path)

        full_transcription = []
        for i, chunk_path in enumerate(chunks):
            # Offset matches the chunk length used by split_audio().
            start_time = i * CHUNK_SECONDS
            full_transcription.append(process_audio_chunk(chunk_path, start_time))

        srt_original = create_srt("\n\n".join(full_transcription))

        original_srt = os.path.join(tempfile.gettempdir(), "original.srt")
        with open(original_srt, "w", encoding="utf-8") as f:
            f.write(srt_original)

        translated_srt = None
        if target_lang != "None":
            translated_text = translate_subtitles(srt_original, target_lang)
            translated_srt = os.path.join(tempfile.gettempdir(), "translated.srt")
            with open(translated_srt, "w", encoding="utf-8") as f:
                f.write(create_srt(translated_text))

        return original_srt, translated_srt
    except Exception as e:
        print(f"Processing error: {str(e)}")
        return None, None
    finally:
        if audio_path and os.path.exists(audio_path):
            os.remove(audio_path)


# Gradio Interface
with gr.Blocks(theme=gr.themes.Soft(), title="AI Subtitle Studio") as app:
    gr.Markdown("# 🎬 Professional Subtitle Generator")
    with gr.Row():
        video_input = gr.Video(label="Upload Video", sources=["upload"])
        with gr.Column():
            source_lang = gr.Dropdown(
                label="Source Language",
                choices=SUPPORTED_LANGUAGES,
                value="Auto Detect"
            )
            target_lang = gr.Dropdown(
                label="Translate To",
                choices=["None"] + SUPPORTED_LANGUAGES[1:],
                value="None"
            )
    process_btn = gr.Button("Generate", variant="primary")
    with gr.Row():
        original_sub = gr.File(label="Original Subtitles")
        translated_sub = gr.File(label="Translated Subtitles")
    process_btn.click(
        process_video,
        inputs=[video_input, source_lang, target_lang],
        outputs=[original_sub, translated_sub]
    )

if __name__ == "__main__":
    app.launch(server_port=7860, share=True)