# Hugging Face Spaces app — "Spaces: Running" page-header residue removed so the file parses.
import os | |
import re | |
import google.generativeai as genai | |
from moviepy.video.io.VideoFileClip import VideoFileClip | |
import tempfile | |
import logging | |
import gradio as gr | |
from datetime import timedelta | |
from pydub import AudioSegment | |
# Suppress moviepy logs
logging.getLogger("moviepy").setLevel(logging.ERROR)

# Configure Gemini API.
# Requires the GEMINI_API_KEY environment variable; raises KeyError at import
# time if it is unset.
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
# NOTE(review): experimental model id — confirm it is still served.
model = genai.GenerativeModel("gemini-2.0-pro-exp-02-05")

# Supported languages.
# The first entry ("Auto Detect") is the source-language default; the rest
# double as translation targets in the UI dropdown.
SUPPORTED_LANGUAGES = [
    "Auto Detect", "English", "Spanish", "French", "German", "Italian",
    "Portuguese", "Russian", "Japanese", "Korean", "Arabic", "Hindi",
    "Chinese", "Dutch", "Turkish", "Polish", "Vietnamese", "Thai"
]

# Magic Prompts.
# Transcription prompt: asks Gemini for "[HH:MM:SS.ms -> HH:MM:SS.ms]" lines
# followed by subtitle text; the output is parsed later by create_srt.
TRANSCRIPTION_PROMPT = """Generate precise subtitles with accurate timestamps:
1. Use [HH:MM:SS.ms -> HH:MM:SS.ms] format
2. Each subtitle 3-7 words
3. Include speaker changes
4. Preserve emotional tone
5. Example:
[00:00:05.250 -> 00:00:08.100]
Example subtitle text
Return ONLY subtitles with timestamps."""

# Translation prompt: {target_language} and {subtitles} are filled in via
# str.format before being sent to the model.
TRANSLATION_PROMPT = """Translate these subtitles to {target_language}:
1. Keep timestamps identical
2. Match text length to timing
3. Preserve technical terms
4. Use natural speech patterns
ORIGINAL:
{subtitles}
TRANSLATED:"""
def split_audio(audio_path, chunk_duration=60):
    """Split a WAV file into sequential chunks written to temp files.

    Args:
        audio_path: Path to a WAV file readable by pydub.
        chunk_duration: Chunk length in seconds (default 60); the final
            chunk may be shorter.

    Returns:
        List of temp-file paths in chronological order. Callers own the
        files and must delete them (process_audio_chunk does).
    """
    audio = AudioSegment.from_wav(audio_path)
    chunk_ms = chunk_duration * 1000
    chunk_paths = []
    for offset in range(0, len(audio), chunk_ms):
        # mkstemp replaces the original's fixed "chunk_N.wav" names in the
        # shared temp dir, which collided when two sessions ran concurrently.
        fd, chunk_path = tempfile.mkstemp(prefix="chunk_", suffix=".wav")
        os.close(fd)  # pydub reopens the path itself when exporting
        audio[offset:offset + chunk_ms].export(chunk_path, format="wav")
        chunk_paths.append(chunk_path)
    return chunk_paths
def process_audio_chunk(chunk_path, start_time):
    """Transcribe one audio chunk with Gemini and rebase its timestamps.

    Args:
        chunk_path: Path to a WAV chunk; the file is deleted when done.
        start_time: Offset in seconds of this chunk within the full audio;
            added to every timestamp the model returns.

    Returns:
        Transcription text whose "[start -> end]" lines are shifted onto
        the full-audio timeline.
    """
    uploaded_file = None
    try:
        # Upload via Gemini's File API so the model can consume the audio
        uploaded_file = genai.upload_file(path=chunk_path)
        response = model.generate_content([TRANSCRIPTION_PROMPT, uploaded_file])

        # Shift each timestamp line by start_time; pass other lines through
        adjusted_transcription = []
        for line in response.text.splitlines():
            if '->' in line:
                # partition() tolerates a stray extra "->" later in the line,
                # where tuple-unpacking split() would raise ValueError
                start, _, end = line.partition('->')
                adjusted_start = parse_timestamp(start.strip()) + start_time
                adjusted_end = parse_timestamp(end.strip()) + start_time
                adjusted_transcription.append(
                    f"[{format_timestamp(adjusted_start)} -> {format_timestamp(adjusted_end)}]"
                )
            else:
                adjusted_transcription.append(line)
        return "\n".join(adjusted_transcription)
    finally:
        # Clean up both the local chunk and the remote uploaded copy;
        # the original leaked every uploaded file on the Gemini side.
        os.remove(chunk_path)
        if uploaded_file is not None:
            try:
                genai.delete_file(uploaded_file.name)
            except Exception:
                pass  # best-effort remote cleanup; never mask the result
def parse_timestamp(timestamp_str):
    """Parse a flexible "[HH:MM:SS.ms]" / "MM:SS.ms" / "SS.ms" timestamp.

    Accepts optional surrounding brackets and spaces, and either '.' or ','
    as the decimal separator. Generalizes the original by also accepting a
    bare seconds value (no ':').

    Args:
        timestamp_str: Timestamp text, e.g. "[00:00:05.250" or "01:08,100".

    Returns:
        Total seconds as a float.

    Raises:
        ValueError: If there are more than three ':'-separated fields or a
            field is not numeric.
    """
    clean_ts = timestamp_str.strip("[] ").replace(',', '.')
    parts = clean_ts.split(':')
    if len(parts) > 3:
        raise ValueError(f"Invalid timestamp: {timestamp_str}")
    # Horner-style accumulation: each extra field shifts the total by 60
    seconds = 0.0
    for part in parts:
        seconds = seconds * 60 + float(part)
    return seconds
def format_timestamp(seconds):
    """Convert seconds to an SRT timestamp, "HH:MM:SS,mmm".

    The original str(timedelta) approach emitted "0:00:05" (no milliseconds)
    or "0:00:05,250000" (microseconds) — neither is valid SRT, and neither
    matched create_srt's three-digit-millisecond regex, so downstream parsing
    dropped every entry. Format the fields explicitly instead.

    Args:
        seconds: Non-negative offset in seconds (int or float).

    Returns:
        Zero-padded "HH:MM:SS,mmm" string.
    """
    total_ms = int(round(seconds * 1000))
    hours, rest = divmod(total_ms, 3_600_000)
    minutes, rest = divmod(rest, 60_000)
    secs, millis = divmod(rest, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def create_srt(subtitles_text):
    """Convert raw "[start -> end]" transcription text to SRT format.

    Entries are separated by blank lines; each needs a timestamp pair
    (either the prompt's "->" or standard SRT "-->") followed by text.

    Args:
        subtitles_text: Raw transcription, entries separated by blank lines.

    Returns:
        SRT-formatted string; unparseable entries are skipped.
    """
    entries = re.split(r'\n{2,}', subtitles_text.strip())
    srt_output = []
    # SRT indices must be consecutive, so count emitted entries rather than
    # scanned ones — the original enumerate() left gaps after skipped entries.
    index = 1
    for entry in entries:
        try:
            # "-+>" accepts both "->" (transcription) and "-->" (SRT)
            time_match = re.search(
                r'\[?\s*((?:\d+:)?\d+:\d+[.,]\d{3})\s*-+>\s*((?:\d+:)?\d+:\d+[.,]\d{3})\s*\]?',
                entry
            )
            if not time_match:
                continue
            start_time = parse_timestamp(time_match.group(1))
            end_time = parse_timestamp(time_match.group(2))
            # Subtitle text is everything after the timestamp pair
            text = entry[time_match.end():].strip()
            srt_output.append(
                f"{index}\n"
                f"{format_timestamp(start_time)} --> {format_timestamp(end_time)}\n"
                f"{text}\n"
            )
            index += 1
        except Exception as e:
            print(f"Skipping invalid entry: {e}")
            continue
    return "\n".join(srt_output)
def extract_audio(video_path):
    """Extract the audio track of a video to a temporary WAV file.

    Defined here because the pipeline called it without it being defined
    anywhere in the file (a guaranteed NameError at runtime); uses the
    already-imported VideoFileClip.

    Args:
        video_path: Path to the uploaded video file.

    Returns:
        Path to a temporary 16-bit PCM WAV file holding the audio track.
    """
    fd, audio_path = tempfile.mkstemp(prefix="audio_", suffix=".wav")
    os.close(fd)
    with VideoFileClip(video_path) as clip:
        # pcm_s16le => plain uncompressed WAV, which AudioSegment.from_wav expects
        clip.audio.write_audiofile(audio_path, codec="pcm_s16le", logger=None)
    return audio_path


def translate_subtitles(srt_text, target_language):
    """Translate SRT subtitles with Gemini, keeping timestamps intact.

    Also called by the original pipeline but never defined.

    Args:
        srt_text: SRT-formatted subtitles to translate.
        target_language: Human-readable language name (e.g. "Spanish").

    Returns:
        The model's translated subtitle text.
    """
    prompt = TRANSLATION_PROMPT.format(
        target_language=target_language, subtitles=srt_text
    )
    response = model.generate_content(prompt)
    return response.text


def process_video(video_path, source_lang, target_lang):
    """Complete processing pipeline: extract audio, transcribe, translate.

    Args:
        video_path: Path to the uploaded video.
        source_lang: Selected source language. Currently unused by the
            pipeline; kept for interface compatibility with the UI wiring.
        target_lang: Target language name, or "None" to skip translation.

    Returns:
        (original_srt_path, translated_srt_path_or_None), or (None, None)
        on any processing error.
    """
    audio_path = None  # so the finally block is safe if extraction raises
    try:
        audio_path = extract_audio(video_path)
        chunks = split_audio(audio_path)

        # Transcribe each 60 s chunk, rebasing timestamps onto the full timeline
        full_transcription = []
        for i, chunk_path in enumerate(chunks):
            full_transcription.append(process_audio_chunk(chunk_path, i * 60))

        srt_original = create_srt("\n\n".join(full_transcription))
        original_srt = os.path.join(tempfile.gettempdir(), "original.srt")
        with open(original_srt, "w", encoding="utf-8") as f:
            f.write(srt_original)

        translated_srt = None
        if target_lang != "None":
            # The translation input is already SRT and the prompt preserves
            # timestamps, so write the model output directly. The original
            # re-ran create_srt here, whose "->"-only regex rejected the
            # "-->" arrows and produced an empty translated file.
            translated_text = translate_subtitles(srt_original, target_lang)
            translated_srt = os.path.join(tempfile.gettempdir(), "translated.srt")
            with open(translated_srt, "w", encoding="utf-8") as f:
                f.write(translated_text)

        return original_srt, translated_srt
    except Exception as e:
        print(f"Processing error: {e}")
        return None, None
    finally:
        # Originally audio_path could be unbound here, turning any extraction
        # failure into a NameError inside finally.
        if audio_path and os.path.exists(audio_path):
            os.remove(audio_path)
# Gradio Interface
# Builds the web UI: video upload plus language dropdowns on top, a Generate
# button, and two downloadable subtitle files below.
with gr.Blocks(theme=gr.themes.Soft(), title="AI Subtitle Studio") as app:
    gr.Markdown("# 🎬 Professional Subtitle Generator")
    with gr.Row():
        video_input = gr.Video(label="Upload Video", sources=["upload"])
        with gr.Column():
            # NOTE(review): source_lang is passed to process_video but the
            # pipeline does not appear to use it — confirm intended.
            source_lang = gr.Dropdown(
                label="Source Language",
                choices=SUPPORTED_LANGUAGES,
                value="Auto Detect"
            )
            target_lang = gr.Dropdown(
                label="Translate To",
                # "None" disables translation; [1:] skips "Auto Detect",
                # which makes no sense as a translation target.
                choices=["None"] + SUPPORTED_LANGUAGES[1:],
                value="None"
            )
    process_btn = gr.Button("Generate", variant="primary")
    with gr.Row():
        original_sub = gr.File(label="Original Subtitles")
        translated_sub = gr.File(label="Translated Subtitles")
    # Wire the button to the pipeline; outputs are file paths (or None on error).
    process_btn.click(
        process_video,
        inputs=[video_input, source_lang, target_lang],
        outputs=[original_sub, translated_sub]
    )
if __name__ == "__main__":
    # share=True additionally exposes a public Gradio link beyond localhost:7860
    app.launch(server_port=7860, share=True)