Spaces:
Running
Running
import gradio as gr | |
import subprocess | |
import datetime | |
import tempfile | |
import requests | |
import os | |
import time | |
from loguru import logger | |
# Load API keys from environment variables | |
API_URL = os.getenv("API_URL").rstrip('/') | |
SIEVE_API_KEY = os.getenv("SIEVE_API_KEY") | |
SIEVE_API_URL = "https://mango.sievedata.com/v2" | |
headers = { | |
"Accept": "application/json", | |
"Content-Type": "audio/flac" | |
} | |
def format_time(seconds): | |
"""Convert seconds to SRT time format (HH:MM:SS,mmm). | |
Args: | |
seconds (float): Time in seconds to convert. | |
Returns: | |
str: Time formatted as HH:MM:SS,mmm where: | |
- HH: Hours (00-99) | |
- MM: Minutes (00-59) | |
- SS: Seconds (00-59) | |
- mmm: Milliseconds (000-999) | |
Example: | |
>>> format_time(3661.5) | |
'01:01:01,500' | |
""" | |
td = datetime.timedelta(seconds=float(seconds)) | |
hours = td.seconds // 3600 | |
minutes = (td.seconds % 3600) // 60 | |
seconds = td.seconds % 60 | |
milliseconds = td.microseconds // 1000 | |
return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}" | |
def generate_srt(segments): | |
"""Generate SRT format subtitles from transcription segments.""" | |
srt_content = [] | |
for i, segment in enumerate(segments, 1): | |
start_time = format_time(segment["start_time"]) | |
end_time = format_time(segment["end_time"]) | |
text = segment.get("text", "").strip() | |
srt_content.append(f"{i}\n{start_time} --> {end_time}\n{text}\n\n") | |
return "".join(srt_content) | |
def save_srt_to_file(srt_content): | |
"""Save SRT content to a temporary file.""" | |
if not srt_content: | |
return None | |
temp_file = tempfile.NamedTemporaryFile(suffix='.srt', delete=False) | |
temp_file.write(srt_content.encode('utf-8')) | |
temp_file.close() | |
return temp_file.name | |
# Check if ffmpeg is installed | |
def check_ffmpeg(): | |
try: | |
subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True) | |
logger.info("ffmpeg check passed successfully") | |
except (subprocess.CalledProcessError, FileNotFoundError) as e: | |
logger.error(f"ffmpeg check failed: {str(e)}") | |
raise gr.Error("ffmpeg is not installed. Please install ffmpeg to use this application.") | |
# Initialize ffmpeg check | |
check_ffmpeg() | |
def get_youtube_audio_url(url): | |
"""Get audio URL from YouTube using Sieve API.""" | |
if not SIEVE_API_KEY: | |
raise gr.Error("SIEVE_API_KEY environment variable is not set") | |
try: | |
payload = { | |
"function": "sieve/youtube-downloader", | |
"inputs": { | |
"url": url, | |
"download_type": "audio", | |
"audio_format": "mp3", | |
"start_time": 0, | |
"end_time": -1 | |
} | |
} | |
# Send request to Sieve API with retries | |
max_retries = 3 | |
retry_delay = 5 | |
for attempt in range(max_retries): | |
try: | |
logger.info(f"Sending request to Sieve API (attempt {attempt + 1}/{max_retries})") | |
response = requests.post( | |
f"{SIEVE_API_URL}/push", | |
headers={"X-API-Key": SIEVE_API_KEY, "Content-Type": "application/json"}, | |
json=payload, | |
timeout=1800 | |
) | |
response.raise_for_status() | |
response_data = response.json() | |
logger.info(f"Sieve API response: {response_data}") | |
job_id = response_data.get("id") | |
if not job_id: | |
if attempt < max_retries - 1: | |
logger.warning(f"No job ID received, retrying in {retry_delay} seconds") | |
time.sleep(retry_delay) | |
continue | |
raise gr.Error("Failed to get job ID from Sieve API") | |
break | |
except requests.exceptions.RequestException as e: | |
if attempt < max_retries - 1: | |
logger.warning(f"Request failed: {str(e)}, retrying in {retry_delay} seconds") | |
time.sleep(retry_delay) | |
continue | |
raise | |
# Poll for job completion | |
poll_count = 0 | |
max_polls = 1800 | |
while True: | |
poll_count += 1 | |
try: | |
logger.info(f"Polling job status (attempt {poll_count}/{max_polls})") | |
job_response = requests.get( | |
f"{SIEVE_API_URL}/jobs/{job_id}", | |
headers={"X-API-Key": SIEVE_API_KEY}, | |
timeout=1800, | |
) | |
job_response.raise_for_status() | |
job_data = job_response.json() | |
logger.info(f"Job response: {job_data}") | |
status = job_data.get("status") | |
if status == "completed" or status == "finished": | |
output_data = job_data.get("outputs", []) | |
if not output_data: | |
raise gr.Error("No output data in job response") | |
first_output = output_data[0] | |
if not isinstance(first_output, dict): | |
raise gr.Error("Unexpected output format from job response") | |
output_data = first_output.get("data", {}) | |
if not isinstance(output_data, dict): | |
raise gr.Error("Unexpected data format from job response") | |
audio_url = output_data.get("url") | |
if not audio_url: | |
raise gr.Error("No audio URL in output data") | |
if not audio_url.startswith(('http://', 'https://')): | |
raise gr.Error(f"Invalid audio URL scheme: {audio_url}") | |
logger.info(f"Successfully got audio URL: {audio_url}") | |
return audio_url | |
elif status == "failed": | |
error_msg = job_data.get("error", "Unknown error") | |
logger.error(f"Job failed: {error_msg}") | |
raise gr.Error(f"Job failed: {error_msg}") | |
if poll_count >= max_polls: | |
raise gr.Error("Download took too long. Please try again or check if the video is accessible.") | |
time.sleep(2) | |
except requests.exceptions.RequestException as e: | |
if poll_count >= max_polls: | |
raise gr.Error("Failed to check job status. Please try again.") | |
logger.warning(f"Request failed: {str(e)}, retrying in 2 seconds") | |
time.sleep(2) | |
except Exception as e: | |
logger.exception(f"Error during YouTube URL fetch: {str(e)}") | |
raise gr.Error(f"Failed to get YouTube audio URL: {str(e)}") | |
def check_api_health(): | |
"""Check if the API is healthy before making requests.""" | |
max_retries = 5 | |
retry_delay = 10 # seconds | |
last_error = None | |
for attempt in range(max_retries): | |
try: | |
logger.info(f"Performing API health check (attempt {attempt + 1}/{max_retries})...") | |
response = requests.get(f"{API_URL}/health") | |
response.raise_for_status() | |
health_data = response.json() | |
# Check if service is healthy | |
if health_data.get("status") != "healthy": | |
raise gr.Error("API service is not healthy. Please try again later.") | |
# Check resource usage | |
cpu_percent = health_data.get("cpu_percent", 0) | |
memory_percent = health_data.get("memory_percent", 0) | |
if cpu_percent > 90 or memory_percent > 90: | |
logger.warning(f"High resource usage detected - CPU: {cpu_percent}%, Memory: {memory_percent}%") | |
logger.info("API health check passed successfully") | |
return True | |
except requests.exceptions.RequestException as e: | |
last_error = str(e) | |
logger.warning(f"Health check attempt {attempt + 1} failed: {last_error}") | |
if attempt < max_retries - 1: | |
logger.info(f"Waiting {retry_delay} seconds before next attempt...") | |
time.sleep(retry_delay) | |
continue | |
logger.error(f"All health check attempts failed. Last error: {last_error}") | |
raise gr.Error(f"Failed to connect to the API service after {max_retries} attempts. Please try again later.") | |
def transcribe_youtube(url, return_timestamps, generate_subs, chunk_length_s=15, batch_size=8): | |
"""Transcribe audio from YouTube video using URL endpoint.""" | |
try: | |
# Check API health first | |
check_api_health() | |
# Validate URL scheme | |
if not url.startswith(('http://', 'https://')): | |
raise gr.Error("URL must start with http:// or https://") | |
# Get audio URL from Sieve | |
audio_url = get_youtube_audio_url(url) | |
# Send request to API | |
response = requests.post( | |
f"{API_URL}/transcribe/url", | |
json={ | |
"url": audio_url, | |
"timestamp_level": "sentence" if return_timestamps else None, | |
"task": "transcribe", | |
"chunk_length_s": chunk_length_s, | |
"batch_size": batch_size, | |
"source_language": "tg" # Add source language parameter | |
}, | |
timeout=1800 | |
) | |
response.raise_for_status() | |
result = response.json() | |
# Generate subtitles if requested | |
srt_file = None | |
if generate_subs and return_timestamps and "segments" in result["transcription"]: | |
srt_content = generate_srt(result["transcription"]["segments"]) | |
srt_file = save_srt_to_file(srt_content) | |
return result, srt_file, "" | |
except Exception as e: | |
raise gr.Error(f"Failed to transcribe YouTube video: {str(e)}") | |
def transcribe(inputs, return_timestamps, generate_subs, chunk_length_s=15, batch_size=8): | |
"""Transcribe audio input using Whisper API.""" | |
logger.info(f"Starting transcription process for file: {inputs}") | |
logger.info(f"Parameters - return_timestamps: {return_timestamps}, generate_subs: {generate_subs}, chunk_length_s: {chunk_length_s}, batch_size: {batch_size}") | |
if inputs is None: | |
logger.error("No audio file submitted") | |
raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.") | |
try: | |
# Check API health first | |
logger.info("Performing API health check...") | |
check_api_health() | |
# Read the audio file | |
logger.info(f"Reading audio file: {inputs}") | |
with open(inputs, "rb") as f: | |
files = {"file": f} | |
# Send request to API | |
logger.info("Sending transcription request to API...") | |
response = requests.post( | |
f"{API_URL}/transcribe", | |
files=files, | |
data={ | |
"timestamp_level": "sentence" if return_timestamps else None, | |
"task": "transcribe", | |
"chunk_length_s": chunk_length_s, # Send as integer | |
"batch_size": batch_size # Send as integer | |
}, | |
timeout=1800 | |
) | |
response.raise_for_status() | |
result = response.json() | |
logger.info("Successfully received response from API") | |
# Log metadata | |
metadata = result.get("metadata", {}) | |
logger.info(f"Transcription metadata: {metadata}") | |
logger.info(f"Transcription completed in {metadata.get('timing', {}).get('total_time', 0):.2f} seconds") | |
# Generate subtitles if requested | |
srt_file = None | |
if generate_subs and return_timestamps and "segments" in result["transcription"]: | |
logger.info("Generating SRT subtitles...") | |
srt_content = generate_srt(result["transcription"]["segments"]) | |
srt_file = save_srt_to_file(srt_content) | |
logger.info(f"Generated SRT file: {srt_file}") | |
logger.info("Transcription process completed successfully") | |
return result, srt_file, "" | |
except requests.exceptions.RequestException as e: | |
logger.exception(f"API request failed: {str(e)}") | |
raise gr.Error(f"Failed to transcribe audio: API request failed - {str(e)}") | |
except Exception as e: | |
logger.exception(f"Error during transcription: {str(e)}") | |
raise gr.Error(f"Failed to transcribe audio: {str(e)}") | |
demo = gr.Blocks(theme=gr.themes.Ocean()) | |
# Define interfaces first | |
youtube_transcribe = gr.Interface( | |
fn=transcribe_youtube, | |
inputs=[ | |
gr.Textbox(label="YouTube URL", placeholder="https://www.youtube.com/watch?v=..."), | |
gr.Checkbox(label="Include timestamps", value=True), | |
gr.Checkbox(label="Generate subtitles", value=True), | |
gr.Slider(minimum=5, maximum=30, value=5, step=5, label="Chunk Length (seconds)"), | |
gr.Slider(minimum=8, maximum=128, value=64, step=8, label="Batch Size") | |
], | |
outputs=[ | |
gr.JSON(label="API Response", open=True), | |
gr.File(label="Subtitles (SRT)", visible=True), | |
gr.Textbox(label="Error", visible=False) | |
], | |
title="Tajik Speech Transcription", | |
description=( | |
"Transcribe Tajik language audio from YouTube videos. " | |
"Paste a YouTube URL and get accurate transcription with optional timestamps " | |
"and subtitles.\n\n" | |
"⚠️ Note: YouTube downloads may occasionally fail due to YouTube's restrictions " | |
"or temporary service issues. If this happens, please try again in a few minutes " | |
"or use the audio file upload option instead." | |
) | |
) | |
mf_transcribe = gr.Interface( | |
fn=transcribe, | |
inputs=[ | |
gr.Audio(sources="microphone", type="filepath"), | |
gr.Checkbox(label="Include timestamps", value=True), | |
gr.Checkbox(label="Generate subtitles", value=True), | |
gr.Slider(minimum=5, maximum=30, value=5, step=5, label="Chunk Length (seconds)"), | |
gr.Slider(minimum=8, maximum=128, value=64, step=8, label="Batch Size") | |
], | |
outputs=[ | |
gr.JSON(label="API Response", open=True), | |
gr.File(label="Subtitles (SRT)", visible=True), | |
gr.Textbox(label="Error", visible=False) | |
], | |
title="Tajik Speech Transcription", | |
description=( | |
"Transcribe Tajik language audio from microphone or file upload. " | |
"Perfect for transcribing Tajik podcasts, interviews, and conversations. " | |
"Supports both microphone recording and file uploads." | |
) | |
) | |
file_transcribe = gr.Interface( | |
fn=transcribe, | |
inputs=[ | |
gr.Audio(sources="upload", type="filepath", label="Audio file"), | |
gr.Checkbox(label="Include timestamps", value=True), | |
gr.Checkbox(label="Generate subtitles", value=True), | |
gr.Slider(minimum=5, maximum=30, value=5, step=5, label="Chunk Length (seconds)"), | |
gr.Slider(minimum=8, maximum=128, value=64, step=8, label="Batch Size") | |
], | |
outputs=[ | |
gr.JSON(label="API Response", open=True), | |
gr.File(label="Subtitles (SRT)", visible=True), | |
gr.Textbox(label="Error", visible=False) | |
], | |
title="Tajik Speech Transcription", | |
description=( | |
"Transcribe Tajik language audio files. " | |
"Upload your audio file and get accurate transcription with optional timestamps " | |
"and subtitles. Supports various audio formats." | |
) | |
) | |
with demo: | |
gr.TabbedInterface( | |
[file_transcribe, mf_transcribe, youtube_transcribe], | |
["Audio file", "Microphone", "YouTube"] | |
) | |
logger.info("Starting Gradio interface") | |
demo.queue().launch(ssr_mode=False) | |