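"""Gradio app for transcribing interview audio with OpenAI Whisper.

Supports optional silence removal (pydub) and ffmpeg-based chunking, and
bundles the processed audio plus transcripts into a downloadable ZIP.
"""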
import gradio as gr
import warnings
import torch
import os
import whisper
import ssl
import zipfile
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
import subprocess
import tempfile
import time
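# Disable SSL certificate verification globally; this is typically needed so the
# Whisper model download succeeds behind proxies with self-signed certificates.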
ssl._create_default_https_context = ssl._create_unverified_context
def process_audio(
audio_paths,
remove_silence=False,
min_silence_len=500,
silence_thresh=-50,
enable_chunking=False,
chunk_duration=600,
ffmpeg_path="ffmpeg",
model_size="large-v3-turbo",
language="de"
):
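    """Preprocess and transcribe one or more audio files.

    Optionally splits each file into fixed-length chunks via ffmpeg and/or
    strips silent passages via pydub, transcribes the results with Whisper,
    and packages the processed audio and transcripts into a ZIP archive.

    Returns a tuple of (combined full text, combined segment text, ZIP path),
    or an error message with empty outputs on failure.
    """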
try:
if not audio_paths:
return "No files selected.", "", None
# Clean up any existing temp directory at the start
temp_dir = "temp_processing"
if os.path.exists(temp_dir):
for file in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, file)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except Exception as e:
print(f"Error cleaning up {file_path}: {e}")
try:
os.rmdir(temp_dir)
except Exception as e:
print(f"Error removing temp directory: {e}")
# Create fresh temp directory with unique timestamp
temp_dir = f"temp_processing_{int(time.time())}"
os.makedirs(temp_dir, exist_ok=True)
processed_files = []
all_results = []
all_segments = []
all_txt_paths = []
try:
# Step 1: Process each audio file
for audio_path in audio_paths:
if not audio_path:
continue
current_file = audio_path
temp_files = []
# Step 1a: Split audio if chunking is enabled
if enable_chunking:
base_name = os.path.splitext(os.path.basename(current_file))[0]
                    # Zero-pad part numbers so lexicographic sorting matches chunk order.
                    output_pattern = os.path.join(temp_dir, f"{base_name}_part_%03d.mp3")
cmd = [
ffmpeg_path, "-i", current_file,
"-f", "segment",
"-segment_time", str(chunk_duration),
"-c:a", "copy",
"-segment_start_number", "1",
output_pattern
]
                    subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                    chunk_files = sorted(os.path.join(temp_dir, f) for f in os.listdir(temp_dir)
                                         if f.startswith(f"{base_name}_part_"))
temp_files.extend(chunk_files)
else:
temp_files.append(current_file)
# Step 1b: Remove silence if requested
if remove_silence:
silence_removed_files = []
for file in temp_files:
audio = AudioSegment.from_file(file)
nonsilent = detect_nonsilent(
audio,
min_silence_len=min_silence_len,
silence_thresh=silence_thresh
)
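                        # Concatenate only the detected non-silent ranges, dropping the silence.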
output = AudioSegment.empty()
for start, end in nonsilent:
output += audio[start:end]
# Save the silence-removed file
silence_removed_path = os.path.join(temp_dir, f"silence_removed_{os.path.basename(file)}")
output.export(silence_removed_path, format="mp3")
silence_removed_files.append(silence_removed_path)
processed_files.extend(silence_removed_files)
else:
processed_files.extend(temp_files)
# Step 2: Transcribe all processed files
print(f"Loading Whisper model '{model_size}'...")
model = whisper.load_model(model_size, device="cpu")
for file in processed_files:
print(f"Transcribing: {file}")
warnings.filterwarnings("ignore", message="FP16 is not supported on CPU; using FP32 instead")
result = model.transcribe(file, fp16=False, language=language, temperature=0.0)
full_text = result["text"]
segments = ""
for segment in result["segments"]:
segments += f"[{segment['start']:.2f} - {segment['end']:.2f}]: {segment['text']}\n"
# Store transcript files in temp directory
txt_path = os.path.join(temp_dir, f"transcript_{os.path.splitext(os.path.basename(file))[0]}.txt")
with open(txt_path, "w", encoding="utf-8") as f:
f.write("=== Full Transcription ===\n\n")
f.write(full_text)
f.write("\n\n=== Segment-wise Transcription ===\n")
f.write(segments)
all_results.append(full_text)
all_segments.append(segments)
all_txt_paths.append(txt_path)
# Create combined transcript file in temp directory
combined_txt_path = os.path.join(temp_dir, "combined_transcripts.txt")
with open(combined_txt_path, "w", encoding="utf-8") as f:
f.write("=== Combined Transcriptions ===\n\n")
                for i, (result, segment) in enumerate(zip(all_results, all_segments)):
filename = os.path.basename(processed_files[i])
f.write(f"File: {filename}\n")
f.write("=== Full Transcription ===\n")
f.write(result)
f.write("\n\n=== Segment-wise Transcription ===\n")
f.write(segment)
f.write("\n" + "-"*50 + "\n\n")
# Format display output
combined_results = "=== File Transcriptions ===\n\n"
combined_segments = "=== File Segments ===\n\n"
for i, (result, segment) in enumerate(zip(all_results, all_segments)):
filename = os.path.basename(processed_files[i])
combined_results += f"File: {filename}\n{result}\n\n"
combined_segments += f"File: {filename}\n{segment}\n\n"
# Create ZIP with all processed files and transcripts
zip_path = f"processed_files_and_transcripts_{int(time.time())}.zip"
            # Only queue intermediates created in temp_dir for deletion; the
            # user's original uploads must not be removed.
            cleanup_files = [f for f in processed_files if f.startswith(temp_dir)]
with zipfile.ZipFile(zip_path, 'w') as zipf:
for file in processed_files:
if os.path.exists(file):
zipf.write(file, os.path.basename(file))
                for txt_file in all_txt_paths:
                    if os.path.exists(txt_file):
                        # Archive transcripts under their basename so the ZIP stays flat.
                        zipf.write(txt_file, os.path.basename(txt_file))
                if os.path.exists(combined_txt_path):
                    zipf.write(combined_txt_path, os.path.basename(combined_txt_path))
# Cleanup files after ZIP creation
for file in cleanup_files:
if os.path.exists(file):
os.remove(file)
for txt_file in all_txt_paths:
if os.path.exists(txt_file):
os.remove(txt_file)
if os.path.exists(combined_txt_path):
os.remove(combined_txt_path)
# Clean up temp directory
if os.path.exists(temp_dir):
for file in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, file)
if os.path.isfile(file_path):
os.remove(file_path)
os.rmdir(temp_dir)
return combined_results, combined_segments, zip_path
        except Exception as inner_e:
            print(f"Error during processing: {inner_e}")
            raise  # re-raise so the outer handler reports the error and cleans up
except Exception as e:
print(f"Error in process_audio: {e}")
if 'temp_dir' in locals() and os.path.exists(temp_dir):
try:
for file in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, file)
if os.path.isfile(file_path):
os.remove(file_path)
os.rmdir(temp_dir)
        except Exception:
            # Best-effort cleanup; ignore secondary errors.
            pass
return f"Error: {str(e)}", "", None
def create_interface():
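    """Build the Gradio Blocks UI and wire up the processing and cleanup handlers."""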
with gr.Blocks(title="Interview Audio Processing App") as app:
gr.Markdown("""
# Audio Processing App
Upload audio files (MP3 or M4A) for processing and transcription.\\
Intended use case: transcription of interviews.
""")
with gr.Row():
with gr.Column():
audio_input = gr.File(
label="Upload Audio Files",
file_count="multiple",
type="filepath"
)
with gr.Group():
gr.Markdown("### Silence Removal Settings")
gr.Markdown(" Default settings are working very well. Silence removal helps to reduce hallucination.")
remove_silence = gr.Checkbox(
label="Remove Silence",
value=False
)
min_silence_len = gr.Slider(
minimum=100,
maximum=2000,
value=500,
step=100,
label="Minimum Silence Length (ms)",
visible=False
)
silence_thresh = gr.Slider(
minimum=-70,
maximum=-30,
value=-50,
step=5,
label="Silence Threshold (dB)",
visible=False
)
with gr.Group():
gr.Markdown("### Chunking Settings")
gr.Markdown(" Chunking reduces the load on the model. 10min chunks work really good.")
enable_chunking = gr.Checkbox(
label="Enable Chunking",
value=False
)
chunk_duration = gr.Slider(
minimum=60,
maximum=3600,
value=600,
step=60,
label="Chunk Duration (seconds)",
visible=False
)
ffmpeg_path = gr.Textbox(
label="FFmpeg Path",
value="ffmpeg",
placeholder="Path to ffmpeg executable",
visible=False
)
with gr.Group():
gr.Markdown("### Transcription Settings")
gr.Markdown(" tiny is the fastest, but the worst quality. Large-v3-turbo is the best, but slower.")
model_size = gr.Dropdown(
choices=["tiny", "base", "small", "medium", "large", "large-v2", "large-v3", "turbo", "large-v3-turbo"],
value="large-v3-turbo",
label="Whisper Model Size"
)
language = gr.Dropdown(
choices=["de", "en", "fr", "es", "it"],
value="de",
label="Language"
)
process_btn = gr.Button("Process", variant="primary")
delete_btn = gr.Button("Delete Everything", variant="stop")
with gr.Column():
full_transcription = gr.Textbox(label="Full Transcription", lines=15)
segmented_transcription = gr.Textbox(label="Segmented Transcription", lines=15)
download_output = gr.File(label="Download Processed Files and Transcripts (ZIP)")
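        # Toggling a checkbox shows/hides its advanced controls and clears any
        # stale outputs from a previous run.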
def update_silence_controls(remove_silence):
return {
min_silence_len: gr.update(visible=remove_silence),
silence_thresh: gr.update(visible=remove_silence),
full_transcription: gr.update(value=""),
segmented_transcription: gr.update(value=""),
download_output: gr.update(value=None)
}
def update_chunking_controls(enable_chunking):
return {
chunk_duration: gr.update(visible=enable_chunking),
ffmpeg_path: gr.update(visible=enable_chunking),
full_transcription: gr.update(value=""),
segmented_transcription: gr.update(value=""),
download_output: gr.update(value=None)
}
remove_silence.change(
fn=update_silence_controls,
inputs=[remove_silence],
outputs=[
min_silence_len,
silence_thresh,
full_transcription,
segmented_transcription,
download_output
]
)
enable_chunking.change(
fn=update_chunking_controls,
inputs=[enable_chunking],
outputs=[
chunk_duration,
ffmpeg_path,
full_transcription,
segmented_transcription,
download_output
]
)
process_btn.click(
fn=process_audio,
inputs=[
audio_input,
remove_silence,
min_silence_len,
silence_thresh,
enable_chunking,
chunk_duration,
ffmpeg_path,
model_size,
language,
],
outputs=[
full_transcription,
segmented_transcription,
download_output,
]
)
# Add cleanup function
def cleanup_files():
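            """Delete leftover temp directories, ZIPs, and transcripts from the working directory."""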
try:
# Clean up temp directories
                temp_dirs = [d for d in os.listdir('.') if d.startswith('temp_processing') and os.path.isdir(d)]
for temp_dir in temp_dirs:
if os.path.exists(temp_dir):
for file in os.listdir(temp_dir):
file_path = os.path.join(temp_dir, file)
if os.path.isfile(file_path):
os.remove(file_path)
os.rmdir(temp_dir)
# Clean up ZIP files
zip_files = [f for f in os.listdir('.') if f.startswith('processed_files_and_transcripts_')]
for zip_file in zip_files:
if os.path.exists(zip_file):
os.remove(zip_file)
# Clean up transcript files
transcript_files = [f for f in os.listdir('.') if f.startswith('transcript_')]
for transcript_file in transcript_files:
if os.path.exists(transcript_file):
os.remove(transcript_file)
# Return updates for all output fields
return {
full_transcription: gr.update(value="All temporary files have been deleted."),
segmented_transcription: gr.update(value=""),
download_output: gr.update(value=None)
}
except Exception as e:
return {
full_transcription: gr.update(value=f"Error during cleanup: {str(e)}"),
segmented_transcription: gr.update(value=""),
download_output: gr.update(value=None)
}
# Update the delete button click handler
delete_btn.click(
fn=cleanup_files,
inputs=[],
outputs=[
full_transcription,
segmented_transcription,
download_output
]
)
return app
if __name__ == "__main__":
app = create_interface()
app.launch(share=False)