import os

import gradio as gr
import numpy as np
import soundfile as sf
import spaces
import torch
import nemo.collections.asr as nemo_asr
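# `spaces` is the Hugging Face ZeroGPU helper: on a "Running on Zero" Space,
# GPU-bound functions are wrapped with @spaces.GPU so a GPU is attached only
# while they execute (a ZeroGPU convention, not part of NeMo itself).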
# Check if CUDA is available
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")

# Load the Parakeet TDT 0.6B v2 RNNT model from the NVIDIA NGC/Hugging Face hub
model = nemo_asr.models.EncDecRNNTBPEModel.from_pretrained("nvidia/parakeet-tdt-0.6b-v2")
print(f"Model loaded on device: {model.device}")
# Rolling buffer of raw audio chunks collected from the stream
audio_buffer = []
@spaces.GPU  # ZeroGPU: attach a GPU for the duration of this call (the Space runs on Zero)
def transcribe(audio, state=""):
    global model, audio_buffer
    if audio is None or isinstance(audio, int):
        print(f"Skipping invalid audio input: {type(audio)}")
        return state, state
    print(f"Received audio input of type: {type(audio)}")
    print(f"Audio shape: {audio.shape if isinstance(audio, np.ndarray) else 'N/A'}")

    # Gradio streams numpy audio as a (sample_rate, data) tuple; unpack both
    sample_rate = 16000
    if isinstance(audio, tuple):
        print(f"Tuple contents: {audio}")
        if len(audio) > 1:
            sample_rate, audio = audio[0], audio[1]
        else:
            audio = None
        if audio is None:  # `if not audio:` raises on multi-element numpy arrays
            print("Empty tuple, skipping")
            return state, state

    if isinstance(audio, np.ndarray):
        audio_buffer.append(audio)
        audio_data = np.concatenate(audio_buffer)
        # Process once the buffer holds enough data (e.g., 5 seconds of audio)
        if len(audio_data) >= 5 * sample_rate:
            # Downmix stereo to mono before writing a temporary WAV for NeMo
            audio_data = audio_data.mean(axis=1) if audio_data.ndim > 1 else audio_data
            temp_file = "temp_audio.wav"
            sf.write(temp_file, audio_data, samplerate=sample_rate)
            print("Transcribing audio...")
            # Transcribe on GPU when available, then return the model to CPU
            if torch.cuda.is_available():
                model = model.cuda()
            result = model.transcribe([temp_file])[0]
            # Recent NeMo releases return Hypothesis objects; older ones return strings
            transcription = result.text if hasattr(result, "text") else result
            print(f"Transcription: {transcription}")
            model = model.cpu()
            os.remove(temp_file)
            print("Temporary file removed.")
            # Clear the buffer and append the new text to the running transcript
            audio_buffer = []
            new_state = state + " " + transcription if state else transcription
            return new_state, new_state
    return state, state
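# A minimal offline sanity check for transcribe(), mirroring the (rate, data)
# tuples Gradio streams in. "sample.wav" is an assumed local test file, not
# part of the app; files shorter than the 5-second buffer threshold return an
# empty transcript:
#
#   data, sr = sf.read("sample.wav", dtype="float32")
#   text, _ = transcribe((sr, data))
#   print(text)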
# Define the Gradio interface
with gr.Blocks(title="Real-time Speech-to-Text with NeMo") as demo:
    gr.Markdown("# 🎙️ Real-time Speech-to-Text Transcription")
    gr.Markdown("Powered by NVIDIA NeMo and the parakeet-tdt-0.6b-v2 model")
    with gr.Row():
        with gr.Column(scale=2):
            audio_input = gr.Audio(
                sources=["microphone"],
                type="numpy",
                streaming=True,
                label="Speak into your microphone"
            )
            clear_btn = gr.Button("Clear Transcript")
        with gr.Column(scale=3):
            text_output = gr.Textbox(
                label="Transcription",
                placeholder="Your speech will appear here...",
                lines=10
            )
            streaming_text = gr.Textbox(
                label="Real-time Transcription",
                placeholder="Real-time results will appear here...",
                lines=2
            )
    # State to store the ongoing transcription
    state = gr.State("")

    # Handle the audio stream: each chunk goes through transcribe()
    audio_input.stream(
        fn=transcribe,
        inputs=[audio_input, state],
        outputs=[state, streaming_text],
    )

    # Clear the transcription
    def clear_transcription():
        return "", "", ""

    clear_btn.click(
        fn=clear_transcription,
        inputs=[],
        outputs=[text_output, streaming_text, state]
    )

    # Update the main text output when the state changes
    state.change(
        fn=lambda s: s,
        inputs=[state],
        outputs=[text_output]
    )
gr.Markdown("## π Instructions") | |
gr.Markdown(""" | |
1. Click the microphone button to start recording | |
2. Speak clearly into your microphone | |
3. The transcription will appear in real-time | |
4. Click 'Clear Transcript' to start a new transcription | |
""") | |
# Launch the app
if __name__ == "__main__":
    demo.launch()
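# Suggested Space dependencies (a sketch; versions unpinned and the exact
# pins are assumptions; the `spaces` package is preinstalled on ZeroGPU
# hardware):
#
#   # requirements.txt
#   nemo_toolkit[asr]
#   gradio
#   soundfile
#   torch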