Spaces:
Running
Running
File size: 16,267 Bytes
6c226f9 daf7f7b c5741b3 2f8a4bc a6eeb9b 020bd94 a1d6c0c 6c226f9 020bd94 2770d50 020bd94 a6eeb9b 6c226f9 c5741b3 a1d6c0c c5741b3 5476911 c5741b3 5476911 78fe717 5476911 c5741b3 2f8a4bc 5476911 2f8a4bc daf7f7b 7c39690 daf7f7b 4a6ebc4 020bd94 2e5323f 020bd94 f275e60 b61ca6d 2e5323f b61ca6d 020bd94 2e5323f 5476911 2e5323f b61ca6d 2e5323f 5476911 2e5323f b61ca6d 2e5323f b61ca6d 2e5323f b61ca6d 2e5323f f0fe708 2e5323f ac79ee4 020bd94 2e5323f 020bd94 2e5323f b61ca6d 2e5323f f0fe708 2e5323f b61ca6d 2e5323f 020bd94 9f81d50 1be1f18 2e5323f 020bd94 1be1f18 27431cc 2e5323f 822fd58 b61ca6d 4a6ebc4 2e5323f b61ca6d 2e5323f f0fe708 020bd94 2e5323f 020bd94 2e5323f b61ca6d 2e5323f 020bd94 b61ca6d 4a6ebc4 020bd94 5476911 0e36c34 5476911 0e36c34 020bd94 78fe717 5476911 1e5e969 5476911 2ba9b46 4a6ebc4 2ba9b46 a6eeb9b 4a6ebc4 7e06853 4a6ebc4 90b9a04 5476911 78fe717 5476911 06fa661 5476911 06fa661 5476911 06fa661 5476911 2e5323f 5476911 06fa661 5476911 06fa661 5476911 0cc77b1 2770d50 5476911 06fa661 5476911 67b74cb 06fa661 67b74cb 5476911 06fa661 5476911 06fa661 5476911 06fa661 a231b66 5476911 a6eeb9b 1e5e969 7c39690 1e5e969 6c226f9 2a49988 47407ef 6c226f9 22095b0 020bd94 8922f6e 020bd94 a231b66 020bd94 a231b66 020bd94 2e5323f 020bd94 6c226f9 3ce82e9 eb5510b c5741b3 8922f6e 3c0cd8e 478eee2 a231b66 c5741b3 a231b66 478eee2 0844bdd 3c0cd8e 0844bdd dbe4a4a 3c0cd8e 3ce82e9 eb5510b c5741b3 8922f6e 6c226f9 478eee2 a231b66 c5741b3 a231b66 478eee2 0844bdd 6c226f9 0844bdd dbe4a4a 6c226f9 020bd94 2e5323f 020bd94 6c226f9 7c39690 47407ef 7097513 78d9435 |
|
import gradio as gr
import subprocess
import datetime
import tempfile
import requests
import os
import time
from loguru import logger
# Load service configuration from environment variables (read once at import).
# API_URL is the base URL of the transcription API, without a trailing slash.
# When the variable is unset this is an empty string rather than an
# AttributeError on None during import; requests built from the empty base URL
# then fail at call time, the same lazy way a missing SIEVE_API_KEY is reported.
API_URL = (os.getenv("API_URL") or "").rstrip("/")
# API key for the Sieve service; may be None, checked where it is used.
SIEVE_API_KEY = os.getenv("SIEVE_API_KEY")
SIEVE_API_URL = "https://mango.sievedata.com/v2"
# Default headers for FLAC uploads (not referenced by the code visible here).
headers = {
    "Accept": "application/json",
    "Content-Type": "audio/flac",
}
def format_time(seconds):
    """Convert seconds to SRT time format (HH:MM:SS,mmm).

    Args:
        seconds (float): Time in seconds to convert. Anything accepted by
            ``float()`` works (e.g. a numeric string). Negative values are
            clamped to zero because SRT cannot express negative timestamps.

    Returns:
        str: Time formatted as HH:MM:SS,mmm where:
            - HH: Hours, zero-padded to at least two digits. Hours are not
              wrapped at 24, so 25 hours renders as ``25``.
            - MM: Minutes (00-59)
            - SS: Seconds (00-59)
            - mmm: Milliseconds (000-999), truncated rather than rounded

    Example:
        >>> format_time(3661.5)
        '01:01:01,500'
        >>> format_time(90000)
        '25:00:00,000'
    """
    td = datetime.timedelta(seconds=max(0.0, float(seconds)))
    # timedelta.seconds only holds the sub-day remainder; fold the days back in
    # so that media longer than 24 hours does not wrap around to 00.
    whole_seconds = td.days * 86400 + td.seconds
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    milliseconds = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{milliseconds:03d}"
def generate_srt(segments):
    """Render transcription segments as the text of an SRT subtitle file.

    Args:
        segments: Iterable of mappings. Each must provide ``"start_time"`` and
            ``"end_time"`` (passed to ``format_time``) and may provide
            ``"text"``, which defaults to an empty caption.

    Returns:
        str: One cue per segment, numbered from 1, each terminated by a blank
        line. An empty iterable yields an empty string.
    """
    def render_cue(index, segment):
        # Timestamps are resolved before the caption, so a segment missing a
        # time key fails before its text is touched.
        begins = format_time(segment["start_time"])
        finishes = format_time(segment["end_time"])
        caption = segment.get("text", "").strip()
        return f"{index}\n{begins} --> {finishes}\n{caption}\n\n"

    return "".join(
        render_cue(index, segment)
        for index, segment in enumerate(segments, start=1)
    )
def save_srt_to_file(srt_content):
    """Save SRT content to a temporary file.

    Args:
        srt_content (str): Subtitle text to persist.

    Returns:
        str | None: Path of the newly created ``.srt`` file, or ``None`` when
        ``srt_content`` is empty or ``None`` (no file is created then).

    Note:
        The file is created with ``delete=False``, so it outlives this call
        and is not removed by this function.
    """
    if not srt_content:
        return None
    # The context manager closes the handle even if encoding or writing raises.
    with tempfile.NamedTemporaryFile(suffix='.srt', delete=False) as temp_file:
        temp_file.write(srt_content.encode('utf-8'))
    return temp_file.name
# Check if ffmpeg is installed
def check_ffmpeg():
    """Verify that the ``ffmpeg`` executable can be run.

    Runs ``ffmpeg -version`` with its output captured and logs the outcome.

    Raises:
        gr.Error: If the executable is not found or exits with a non-zero
            status.
    """
    probe_command = ['ffmpeg', '-version']
    try:
        subprocess.run(probe_command, capture_output=True, check=True)
    except (subprocess.CalledProcessError, FileNotFoundError) as exc:
        logger.error(f"ffmpeg check failed: {exc}")
        raise gr.Error("ffmpeg is not installed. Please install ffmpeg to use this application.")
    else:
        logger.info("ffmpeg check passed successfully")


# Initialize ffmpeg check
# Runs at import time, so a missing ffmpeg aborts startup.
check_ffmpeg()
def _extract_sieve_audio_url(job_data):
    """Return the audio URL from a finished Sieve job payload, or raise gr.Error."""
    outputs = job_data.get("outputs", [])
    if not outputs:
        raise gr.Error("No output data in job response")
    first_output = outputs[0]
    if not isinstance(first_output, dict):
        raise gr.Error("Unexpected output format from job response")
    data = first_output.get("data", {})
    if not isinstance(data, dict):
        raise gr.Error("Unexpected data format from job response")
    audio_url = data.get("url")
    if not audio_url:
        raise gr.Error("No audio URL in output data")
    if not isinstance(audio_url, str) or not audio_url.startswith(('http://', 'https://')):
        raise gr.Error(f"Invalid audio URL scheme: {audio_url}")
    return audio_url


def get_youtube_audio_url(url):
    """Get audio URL from YouTube using Sieve API.

    Submits a ``sieve/youtube-downloader`` job (audio only, mp3, full length)
    to ``{SIEVE_API_URL}/push`` and then polls ``{SIEVE_API_URL}/jobs/<id>``
    until the job reaches a terminal state.

    Args:
        url: Video URL forwarded unchanged to the downloader job.

    Returns:
        str: The http(s) URL found under ``outputs[0]["data"]["url"]`` of the
        finished job.

    Raises:
        gr.Error: If ``SIEVE_API_KEY`` is unset, the job cannot be submitted
            after 3 attempts, the job reports a failure, the job response has
            an unexpected shape, or the job is still unfinished after 1800
            polls spaced 2 seconds apart.
    """
    if not SIEVE_API_KEY:
        raise gr.Error("SIEVE_API_KEY environment variable is not set")

    request_timeout = 1800  # seconds allowed for each individual HTTP call
    max_retries = 3
    retry_delay = 5  # seconds between submission attempts
    max_polls = 1800
    poll_delay = 2  # seconds between status polls
    success_statuses = ("completed", "finished")
    # "failed" is the value the original code handled. "error" and "cancelled"
    # are also treated as terminal so such a job is reported immediately
    # instead of being polled until the limit.
    # NOTE(review): confirm the exact status vocabulary against the Sieve jobs API.
    failure_statuses = ("failed", "error", "cancelled")

    try:
        payload = {
            "function": "sieve/youtube-downloader",
            "inputs": {
                "url": url,
                "download_type": "audio",
                "audio_format": "mp3",
                "start_time": 0,
                "end_time": -1
            }
        }

        # Send request to Sieve API with retries
        job_id = None
        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                logger.info(f"Sending request to Sieve API (attempt {attempt + 1}/{max_retries})")
                response = requests.post(
                    f"{SIEVE_API_URL}/push",
                    headers={"X-API-Key": SIEVE_API_KEY, "Content-Type": "application/json"},
                    json=payload,
                    timeout=request_timeout
                )
                response.raise_for_status()
                response_data = response.json()
                logger.info(f"Sieve API response: {response_data}")
                job_id = response_data.get("id")
            except requests.exceptions.RequestException as e:
                if is_last_attempt:
                    raise
                logger.warning(f"Request failed: {str(e)}, retrying in {retry_delay} seconds")
                time.sleep(retry_delay)
                continue

            if job_id:
                break
            if is_last_attempt:
                raise gr.Error("Failed to get job ID from Sieve API")
            logger.warning(f"No job ID received, retrying in {retry_delay} seconds")
            time.sleep(retry_delay)

        # Poll for job completion
        poll_count = 0
        while True:
            poll_count += 1
            try:
                logger.info(f"Polling job status (attempt {poll_count}/{max_polls})")
                job_response = requests.get(
                    f"{SIEVE_API_URL}/jobs/{job_id}",
                    headers={"X-API-Key": SIEVE_API_KEY},
                    timeout=request_timeout,
                )
                job_response.raise_for_status()
                job_data = job_response.json()
            except requests.exceptions.RequestException as e:
                # Transient network/HTTP problems are retried until the poll
                # budget is exhausted.
                if poll_count >= max_polls:
                    raise gr.Error("Failed to check job status. Please try again.")
                logger.warning(f"Request failed: {str(e)}, retrying in {poll_delay} seconds")
                time.sleep(poll_delay)
                continue

            logger.info(f"Job response: {job_data}")
            status = job_data.get("status")

            if status in success_statuses:
                audio_url = _extract_sieve_audio_url(job_data)
                logger.info(f"Successfully got audio URL: {audio_url}")
                return audio_url

            if status in failure_statuses:
                # Fall back to a generic message when the field is missing or empty.
                error_msg = job_data.get("error") or "Unknown error"
                logger.error(f"Job failed: {error_msg}")
                raise gr.Error(f"Job failed: {error_msg}")

            if poll_count >= max_polls:
                raise gr.Error("Download took too long. Please try again or check if the video is accessible.")
            time.sleep(poll_delay)
    except gr.Error as e:
        # Already a user-facing error: log it and re-raise unchanged instead
        # of wrapping it in a second gr.Error with a duplicated prefix.
        logger.error(f"Error during YouTube URL fetch: {str(e)}")
        raise
    except Exception as e:
        logger.exception(f"Error during YouTube URL fetch: {str(e)}")
        raise gr.Error(f"Failed to get YouTube audio URL: {str(e)}") from e
def check_api_health():
    """Check if the API is healthy before making requests.

    Calls ``GET {API_URL}/health`` up to 5 times, waiting 10 seconds between
    attempts, and expects a JSON object whose ``"status"`` is ``"healthy"``.
    CPU or memory usage above 90% is only logged as a warning.

    Returns:
        bool: ``True`` once a health check succeeds.

    Raises:
        gr.Error: If ``API_URL`` is empty, the service reports a status other
            than ``"healthy"`` (raised immediately, without retrying), or
            every attempt fails with a request error.
    """
    if not API_URL:
        # Without a base URL every attempt would fail the same way; say so
        # directly instead of spending all retries first.
        raise gr.Error("API_URL environment variable is not set")

    max_retries = 5
    retry_delay = 10  # seconds
    request_timeout = 30  # seconds; without it a stalled connection blocks forever
    last_error = None
    for attempt in range(max_retries):
        try:
            logger.info(f"Performing API health check (attempt {attempt + 1}/{max_retries})...")
            response = requests.get(f"{API_URL}/health", timeout=request_timeout)
            response.raise_for_status()
            health_data = response.json()
            # Check if service is healthy
            if health_data.get("status") != "healthy":
                raise gr.Error("API service is not healthy. Please try again later.")
            # Check resource usage; a null metric counts as 0 so that the
            # comparison below cannot raise TypeError.
            cpu_percent = health_data.get("cpu_percent") or 0
            memory_percent = health_data.get("memory_percent") or 0
            if cpu_percent > 90 or memory_percent > 90:
                logger.warning(f"High resource usage detected - CPU: {cpu_percent}%, Memory: {memory_percent}%")
            logger.info("API health check passed successfully")
            return True
        except requests.exceptions.RequestException as e:
            last_error = str(e)
            logger.warning(f"Health check attempt {attempt + 1} failed: {last_error}")
            if attempt < max_retries - 1:
                logger.info(f"Waiting {retry_delay} seconds before next attempt...")
                time.sleep(retry_delay)
                continue
    logger.error(f"All health check attempts failed. Last error: {last_error}")
    raise gr.Error(f"Failed to connect to the API service after {max_retries} attempts. Please try again later.")
def transcribe_youtube(url, return_timestamps, generate_subs, chunk_length_s=15, batch_size=8):
    """Transcribe audio from YouTube video using URL endpoint.

    Resolves the video to a downloadable audio URL via
    ``get_youtube_audio_url`` and posts it to ``{API_URL}/transcribe/url``
    with ``source_language`` set to ``"tg"``.

    Args:
        url: Video URL; must start with ``http://`` or ``https://``.
            Surrounding whitespace is ignored.
        return_timestamps: When truthy, request sentence-level timestamps.
        generate_subs: When truthy (and timestamps were requested), also write
            an SRT file from the returned segments.
        chunk_length_s: Chunk length forwarded to the API.
        batch_size: Batch size forwarded to the API.

    Returns:
        tuple: ``(result, srt_file, "")`` where ``result`` is the decoded JSON
        response, ``srt_file`` is the path of the generated subtitle file or
        ``None``, and the last item is an empty string for the error output.

    Raises:
        gr.Error: If the URL is invalid or any step of the process fails.
    """
    # Validate the URL before anything else so that an obviously bad input is
    # rejected immediately rather than after the health-check retries.
    url = (url or "").strip()
    if not url.startswith(('http://', 'https://')):
        raise gr.Error("URL must start with http:// or https://")

    try:
        # Check API health first
        check_api_health()
        # Get audio URL from Sieve
        audio_url = get_youtube_audio_url(url)
        # Send request to API
        response = requests.post(
            f"{API_URL}/transcribe/url",
            json={
                "url": audio_url,
                "timestamp_level": "sentence" if return_timestamps else None,
                "task": "transcribe",
                "chunk_length_s": chunk_length_s,
                "batch_size": batch_size,
                "source_language": "tg"  # Add source language parameter
            },
            timeout=1800
        )
        response.raise_for_status()
        result = response.json()

        # Generate subtitles if requested. The response shape is checked
        # defensively so a reply without segments yields no file, not an error.
        srt_file = None
        if generate_subs and return_timestamps:
            transcription = result.get("transcription") if isinstance(result, dict) else None
            segments = transcription.get("segments") if isinstance(transcription, dict) else None
            if segments:
                srt_content = generate_srt(segments)
                srt_file = save_srt_to_file(srt_content)
        return result, srt_file, ""
    except gr.Error:
        # Helpers already raise user-facing errors; do not wrap them again.
        raise
    except Exception as e:
        logger.exception(f"Error during YouTube transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe YouTube video: {str(e)}") from e
def transcribe(inputs, return_timestamps, generate_subs, chunk_length_s=15, batch_size=8):
    """Transcribe audio input using Whisper API.

    Uploads the audio file as multipart form data to ``{API_URL}/transcribe``.

    Args:
        inputs: Filesystem path of the audio file to upload, or ``None`` when
            nothing was submitted.
        return_timestamps: When truthy, request sentence-level timestamps.
        generate_subs: When truthy (and timestamps were requested), also write
            an SRT file from the returned segments.
        chunk_length_s: Chunk length forwarded to the API.
        batch_size: Batch size forwarded to the API.

    Returns:
        tuple: ``(result, srt_file, "")`` where ``result`` is the decoded JSON
        response, ``srt_file`` is the path of the generated subtitle file or
        ``None``, and the last item is an empty string for the error output.

    Raises:
        gr.Error: If no file was submitted, the API is unavailable, or the
            request fails.
    """
    logger.info(f"Starting transcription process for file: {inputs}")
    logger.info(f"Parameters - return_timestamps: {return_timestamps}, generate_subs: {generate_subs}, chunk_length_s: {chunk_length_s}, batch_size: {batch_size}")
    if inputs is None:
        logger.error("No audio file submitted")
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")
    try:
        # Check API health first
        logger.info("Performing API health check...")
        check_api_health()
        # Read the audio file
        logger.info(f"Reading audio file: {inputs}")
        with open(inputs, "rb") as f:
            files = {"file": f}
            # Send request to API
            logger.info("Sending transcription request to API...")
            response = requests.post(
                f"{API_URL}/transcribe",
                files=files,
                data={
                    "timestamp_level": "sentence" if return_timestamps else None,
                    "task": "transcribe",
                    "chunk_length_s": chunk_length_s,
                    "batch_size": batch_size
                },
                timeout=1800
            )
        response.raise_for_status()
        result = response.json()
        logger.info("Successfully received response from API")

        # Log metadata. This is informational only, so a missing or malformed
        # timing entry must not turn a successful transcription into an error.
        metadata = result.get("metadata", {}) if isinstance(result, dict) else {}
        logger.info(f"Transcription metadata: {metadata}")
        timing = metadata.get("timing") if isinstance(metadata, dict) else None
        total_time = timing.get("total_time") if isinstance(timing, dict) else None
        if not isinstance(total_time, (int, float)):
            total_time = 0
        logger.info(f"Transcription completed in {total_time:.2f} seconds")

        # Generate subtitles if requested. The response shape is checked
        # defensively so a reply without segments yields no file, not an error.
        srt_file = None
        if generate_subs and return_timestamps:
            transcription = result.get("transcription") if isinstance(result, dict) else None
            segments = transcription.get("segments") if isinstance(transcription, dict) else None
            if segments:
                logger.info("Generating SRT subtitles...")
                srt_content = generate_srt(segments)
                srt_file = save_srt_to_file(srt_content)
                logger.info(f"Generated SRT file: {srt_file}")
        logger.info("Transcription process completed successfully")
        return result, srt_file, ""
    except gr.Error:
        # Helpers already raise user-facing errors; do not wrap them again.
        raise
    except requests.exceptions.RequestException as e:
        logger.exception(f"API request failed: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: API request failed - {str(e)}") from e
    except Exception as e:
        logger.exception(f"Error during transcription: {str(e)}")
        raise gr.Error(f"Failed to transcribe audio: {str(e)}") from e
# Top-level Blocks container (Ocean theme); the tabbed UI is attached to it
# in the `with demo:` section further down and it is the object that is launched.
demo = gr.Blocks(theme=gr.themes.Ocean())
# Define interfaces first
# YouTube tab: a URL box plus the shared timestamp/subtitle/chunk/batch
# controls, wired to transcribe_youtube. The hidden "Error" textbox receives
# the empty string that the handler returns as its third value.
_youtube_inputs = [
    gr.Textbox(label="YouTube URL", placeholder="https://www.youtube.com/watch?v=..."),
    gr.Checkbox(label="Include timestamps", value=True),
    gr.Checkbox(label="Generate subtitles", value=True),
    gr.Slider(minimum=5, maximum=30, value=5, step=5, label="Chunk Length (seconds)"),
    gr.Slider(minimum=8, maximum=128, value=64, step=8, label="Batch Size"),
]
_youtube_outputs = [
    gr.JSON(label="API Response", open=True),
    gr.File(label="Subtitles (SRT)", visible=True),
    gr.Textbox(label="Error", visible=False),
]
_youtube_description = (
    "Transcribe Tajik language audio from YouTube videos. "
    "Paste a YouTube URL and get accurate transcription with optional timestamps "
    "and subtitles.\n\n"
    "⚠️ Note: YouTube downloads may occasionally fail due to YouTube's restrictions "
    "or temporary service issues. If this happens, please try again in a few minutes "
    "or use the audio file upload option instead."
)
youtube_transcribe = gr.Interface(
    fn=transcribe_youtube,
    inputs=_youtube_inputs,
    outputs=_youtube_outputs,
    title="Tajik Speech Transcription",
    description=_youtube_description,
)
# Microphone tab: records audio to a file path and passes it to transcribe.
# The audio component only offers the microphone source, so the description
# points to the "Audio file" tab for uploads instead of advertising them here.
mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(sources="microphone", type="filepath"),
        gr.Checkbox(label="Include timestamps", value=True),
        gr.Checkbox(label="Generate subtitles", value=True),
        gr.Slider(minimum=5, maximum=30, value=5, step=5, label="Chunk Length (seconds)"),
        gr.Slider(minimum=8, maximum=128, value=64, step=8, label="Batch Size")
    ],
    outputs=[
        gr.JSON(label="API Response", open=True),
        gr.File(label="Subtitles (SRT)", visible=True),
        gr.Textbox(label="Error", visible=False)
    ],
    title="Tajik Speech Transcription",
    description=(
        "Transcribe Tajik language audio recorded with your microphone. "
        "Perfect for transcribing Tajik interviews and conversations. "
        "To transcribe an existing recording, use the \"Audio file\" tab instead."
    )
)
# Audio-file tab: an upload-only audio input (delivered as a file path) plus
# the shared timestamp/subtitle/chunk/batch controls, wired to transcribe.
_file_inputs = [
    gr.Audio(sources="upload", type="filepath", label="Audio file"),
    gr.Checkbox(label="Include timestamps", value=True),
    gr.Checkbox(label="Generate subtitles", value=True),
    gr.Slider(minimum=5, maximum=30, value=5, step=5, label="Chunk Length (seconds)"),
    gr.Slider(minimum=8, maximum=128, value=64, step=8, label="Batch Size"),
]
_file_outputs = [
    gr.JSON(label="API Response", open=True),
    gr.File(label="Subtitles (SRT)", visible=True),
    gr.Textbox(label="Error", visible=False),
]
_file_description = (
    "Transcribe Tajik language audio files. "
    "Upload your audio file and get accurate transcription with optional timestamps "
    "and subtitles. Supports various audio formats."
)
file_transcribe = gr.Interface(
    fn=transcribe,
    inputs=_file_inputs,
    outputs=_file_outputs,
    title="Tajik Speech Transcription",
    description=_file_description,
)
# Assemble the three interfaces into tabs inside the top-level container.
# Each title is paired with its interface by position.
_tab_interfaces = [file_transcribe, mf_transcribe, youtube_transcribe]
_tab_titles = ["Audio file", "Microphone", "YouTube"]
with demo:
    gr.TabbedInterface(_tab_interfaces, _tab_titles)

# Enable request queuing, then start the server with SSR mode turned off.
logger.info("Starting Gradio interface")
demo.queue().launch(ssr_mode=False)
|