import os
import re
import google.generativeai as genai
from moviepy.video.io.VideoFileClip import VideoFileClip
import tempfile
import logging
import gradio as gr
from pydub import AudioSegment
# Suppress moviepy logs
logging.getLogger("moviepy").setLevel(logging.ERROR)
# Configure Gemini API
genai.configure(api_key=os.environ["GEMINI_API_KEY"])
model = genai.GenerativeModel("gemini-2.0-pro-exp-02-05")
# Supported languages
SUPPORTED_LANGUAGES = [
    "Auto Detect", "English", "Spanish", "French", "German", "Italian",
    "Portuguese", "Russian", "Japanese", "Korean", "Arabic", "Hindi",
    "Chinese", "Dutch", "Turkish", "Polish", "Vietnamese", "Thai"
]
# Magic Prompts
TRANSCRIPTION_PROMPT = """Generate precise subtitles with accurate timestamps:
1. Use [HH:MM:SS.ms -> HH:MM:SS.ms] format
2. Each subtitle 3-7 words
3. Include speaker changes
4. Preserve emotional tone
5. Example:
[00:00:05.250 -> 00:00:08.100]
Example subtitle text
Return ONLY subtitles with timestamps."""
TRANSLATION_PROMPT = """Translate these subtitles to {target_language}:
1. Keep timestamps identical
2. Match text length to timing
3. Preserve technical terms
4. Use natural speech patterns
ORIGINAL:
{subtitles}
TRANSLATED:"""
def split_audio(audio_path, chunk_duration=60):
    """Split audio into smaller chunks (default: 60 seconds)."""
    audio = AudioSegment.from_wav(audio_path)
    chunks = []
    # pydub indexes audio in milliseconds
    for i in range(0, len(audio), chunk_duration * 1000):
        chunk = audio[i:i + chunk_duration * 1000]
        chunk_path = os.path.join(tempfile.gettempdir(), f"chunk_{i//1000}.wav")
        chunk.export(chunk_path, format="wav")
        chunks.append(chunk_path)
    return chunks
def process_audio_chunk(chunk_path, start_time):
    """Transcribe a single audio chunk and shift its timestamps."""
    try:
        # Upload the chunk via Gemini's File API
        uploaded_file = genai.upload_file(path=chunk_path)
        # Get transcription
        response = model.generate_content(
            [TRANSCRIPTION_PROMPT, uploaded_file]
        )
        # Shift timestamps from chunk-relative to absolute video time
        adjusted_transcription = []
        for line in response.text.splitlines():
            if '->' in line:
                start, end = line.split('->', 1)
                adjusted_start = parse_timestamp(start.strip()) + start_time
                adjusted_end = parse_timestamp(end.strip()) + start_time
                adjusted_line = (
                    f"[{format_timestamp(adjusted_start)} -> "
                    f"{format_timestamp(adjusted_end)}]"
                )
                adjusted_transcription.append(adjusted_line)
            else:
                adjusted_transcription.append(line)
        return "\n".join(adjusted_transcription)
    finally:
        os.remove(chunk_path)
def parse_timestamp(timestamp_str):
    """Parse '[HH:]MM:SS.ms' timestamps (comma or dot decimals) into seconds."""
    clean_ts = timestamp_str.strip("[] ").replace(',', '.')
    parts = clean_ts.split(':')
    if len(parts) == 3:    # HH:MM:SS.ss
        hours, minutes, seconds_part = parts
    elif len(parts) == 2:  # MM:SS.ss
        hours = 0
        minutes, seconds_part = parts
    else:
        raise ValueError(f"Invalid timestamp: {timestamp_str}")
    return float(hours) * 3600 + float(minutes) * 60 + float(seconds_part)
def format_timestamp(seconds):
    """Convert seconds to an SRT timestamp (HH:MM:SS,mmm)."""
    millis = int(round(seconds * 1000))
    hours, millis = divmod(millis, 3_600_000)
    minutes, millis = divmod(millis, 60_000)
    secs, millis = divmod(millis, 1_000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def create_srt(subtitles_text):
    """Convert raw timestamped transcription to SRT format."""
    entries = re.split(r'\n{2,}', subtitles_text.strip())
    srt_output = []
    for idx, entry in enumerate(entries, 1):
        try:
            time_match = re.search(
                r'\[?\s*((?:\d+:)?\d+:\d+[.,]\d{3})\s*->\s*((?:\d+:)?\d+:\d+[.,]\d{3})\s*\]?',
                entry
            )
            if not time_match:
                continue
            start_time = parse_timestamp(time_match.group(1))
            end_time = parse_timestamp(time_match.group(2))
            # Everything after the matched timestamp expression is the text
            text = entry[time_match.end():].strip()
            srt_entry = (
                f"{idx}\n"
                f"{format_timestamp(start_time)} --> {format_timestamp(end_time)}\n"
                f"{text}\n"
            )
            srt_output.append(srt_entry)
        except Exception as e:
            print(f"Skipping invalid entry {idx}: {e}")
            continue
    return "\n".join(srt_output)
def process_video(video_path, source_lang, target_lang):
    """Complete processing pipeline."""
    # NOTE: source_lang is accepted from the UI, but transcription currently
    # relies on the model's own language detection.
    audio_path = None
    try:
        # Extract audio
        audio_path = extract_audio(video_path)
        # Split into chunks
        chunks = split_audio(audio_path)
        full_transcription = []
        # Process each chunk, offsetting timestamps by its start time
        for i, chunk_path in enumerate(chunks):
            start_time = i * 60  # must match chunk_duration in split_audio()
            chunk_transcription = process_audio_chunk(chunk_path, start_time)
            full_transcription.append(chunk_transcription)
        # Combine results
        srt_original = create_srt("\n\n".join(full_transcription))
        # Save original subtitles
        original_srt = os.path.join(tempfile.gettempdir(), "original.srt")
        with open(original_srt, "w", encoding="utf-8") as f:
            f.write(srt_original)
        # Translate if needed
        translated_srt = None
        if target_lang != "None":
            # The translation prompt keeps timestamps identical, so the model's
            # output is already SRT-formatted and is written out directly.
            translated_text = translate_subtitles(srt_original, target_lang)
            translated_srt = os.path.join(tempfile.gettempdir(), "translated.srt")
            with open(translated_srt, "w", encoding="utf-8") as f:
                f.write(translated_text)
        return original_srt, translated_srt
    except Exception as e:
        print(f"Processing error: {e}")
        return None, None
    finally:
        if audio_path and os.path.exists(audio_path):
            os.remove(audio_path)
# Gradio Interface
with gr.Blocks(theme=gr.themes.Soft(), title="AI Subtitle Studio") as app:
    gr.Markdown("# 🎬 Professional Subtitle Generator")
    with gr.Row():
        video_input = gr.Video(label="Upload Video", sources=["upload"])
        with gr.Column():
            source_lang = gr.Dropdown(
                label="Source Language",
                choices=SUPPORTED_LANGUAGES,
                value="Auto Detect"
            )
            target_lang = gr.Dropdown(
                label="Translate To",
                choices=["None"] + SUPPORTED_LANGUAGES[1:],
                value="None"
            )
    process_btn = gr.Button("Generate", variant="primary")
    with gr.Row():
        original_sub = gr.File(label="Original Subtitles")
        translated_sub = gr.File(label="Translated Subtitles")
    process_btn.click(
        process_video,
        inputs=[video_input, source_lang, target_lang],
        outputs=[original_sub, translated_sub]
    )
if __name__ == "__main__":
    app.launch(server_port=7860, share=True)