# NOTE: removed non-code extraction residue (hosting-site UI text, file size,
# commit hash, and a flattened line-number gutter) that was not valid Python.
# Standard library imports
import os
import logging
import subprocess
from typing import Annotated
# Related third party imports
from pyannote.audio import Pipeline
logging.basicConfig(level=logging.INFO)
class DialogueDetecting:
    """
    Class for detecting dialogue in audio files using speaker diarization.
    This class processes audio files by dividing them into chunks, applying a
    pre-trained speaker diarization model, and detecting if there are multiple
    speakers in the audio.
    Parameters
    ----------
    pipeline_model : str, optional
        Name of the pre-trained diarization model. Defaults to "pyannote/speaker-diarization".
    chunk_duration : int, optional
        Duration of each chunk in seconds. Defaults to 5.
    sample_rate : int, optional
        Sampling rate for the processed audio chunks. Defaults to 16000.
    channels : int, optional
        Number of audio channels. Defaults to 1.
    delete_original : bool, optional
        If True, deletes the original audio file when no dialogue is detected. Defaults to False.
    skip_if_no_dialogue : bool, optional
        If True, skips further processing if no dialogue is detected. Defaults to False.
    temp_dir : str, optional
        Directory for temporary chunk files. Defaults to ".temp".
    Attributes
    ----------
    pipeline : Pipeline
        Instance of the PyAnnote pipeline for speaker diarization.
    """
    def __init__(self,
                 pipeline_model: str = "pyannote/speaker-diarization",
                 chunk_duration: int = 5,
                 sample_rate: int = 16000,
                 channels: int = 1,
                 delete_original: bool = False,
                 skip_if_no_dialogue: bool = False,
                 temp_dir: str = ".temp"):
        self.pipeline_model = pipeline_model
        self.chunk_duration = chunk_duration
        self.sample_rate = sample_rate
        self.channels = channels
        self.delete_original = delete_original
        self.skip_if_no_dialogue = skip_if_no_dialogue
        self.temp_dir = temp_dir
        # NOTE: model download/load happens eagerly here; may hit the network.
        self.pipeline = Pipeline.from_pretrained(pipeline_model)
        # exist_ok avoids the check-then-create race of the previous
        # `if not os.path.exists(...)` pattern.
        os.makedirs(self.temp_dir, exist_ok=True)
    @staticmethod
    def get_audio_duration(audio_file: Annotated[str, "Path to the audio file"]) -> Annotated[
        float, "Duration of the audio in seconds"]:
        """
        Get the duration of an audio file in seconds.
        Parameters
        ----------
        audio_file : str
            Path to the audio file.
        Returns
        -------
        float
            Duration of the audio file in seconds.
        Raises
        ------
        subprocess.CalledProcessError
            If ffprobe exits with a non-zero status (check=True).
        Examples
        --------
        >>> DialogueDetecting.get_audio_duration("example.wav")
        120.5
        """
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", audio_file],
            capture_output=True, text=True, check=True
        )
        return float(result.stdout.strip())
    def create_chunk(self, audio_file: str, chunk_file: str, start_time: float, end_time: float):
        """
        Create a chunk of the audio file.
        Parameters
        ----------
        audio_file : str
            Path to the original audio file.
        chunk_file : str
            Path to save the generated chunk file.
        start_time : float
            Start time of the chunk in seconds.
        end_time : float
            End time of the chunk in seconds.
        Raises
        ------
        subprocess.CalledProcessError
            If ffmpeg exits with a non-zero status (check=True).
        """
        duration = end_time - start_time
        # -ss before -i performs fast input seeking; -y overwrites stale chunks.
        subprocess.run([
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-t", str(duration),
            "-i", audio_file,
            "-ar", str(self.sample_rate),
            "-ac", str(self.channels),
            "-f", "wav",
            chunk_file
        ], check=True)
    def process_chunk(self, chunk_file: Annotated[str, "Path to the chunk file"]) -> Annotated[
        set, "Set of detected speaker labels"]:
        """
        Process a single chunk of audio to detect speakers.
        Parameters
        ----------
        chunk_file : str
            Path to the chunk file.
        Returns
        -------
        set
            Set of detected speaker labels in the chunk.
        """
        diarization = self.pipeline(chunk_file)
        # Only the speaker label matters; segment and track are ignored.
        return {label for _, _, label in diarization.itertracks(yield_label=True)}
    def process(self, audio_file: Annotated[str, "Path to the input audio file"]) -> Annotated[
        bool, "True if dialogue detected, False otherwise"]:
        """
        Process the audio file to detect dialogue.
        Parameters
        ----------
        audio_file : str
            Path to the audio file.
        Returns
        -------
        bool
            True if at least two speakers are detected, False otherwise.
        Examples
        --------
        >>> dialogue_detector = DialogueDetecting()
        >>> dialogue_detector.process("example.wav")
        True
        """
        total_duration = self.get_audio_duration(audio_file)
        num_chunks = int(total_duration // self.chunk_duration) + 1
        speakers_detected = set()
        chunk_files = []
        try:
            for i in range(num_chunks):
                start_time = i * self.chunk_duration
                end_time = min(float((i + 1) * self.chunk_duration), total_duration)
                # Sub-second tail chunks are too short to diarize reliably.
                if end_time - start_time < 1.0:
                    logging.info("Last chunk is too short to process.")
                    break
                chunk_file = os.path.join(self.temp_dir, f"chunk_{i}.wav")
                chunk_files.append(chunk_file)
                # Lazy %-style args avoid formatting when INFO is disabled.
                logging.info("Creating chunk: %s", chunk_file)
                self.create_chunk(audio_file, chunk_file, start_time, end_time)
                logging.info("Processing chunk: %s", chunk_file)
                speakers_detected.update(self.process_chunk(chunk_file))
                # Early exit: two distinct speakers already prove dialogue.
                if len(speakers_detected) >= 2:
                    logging.info("At least two speakers detected, stopping.")
                    return True
            # Reaching here means the loop never early-returned, so fewer
            # than two speakers were found (the old `if` guard was redundant).
            logging.info("No dialogue detected or only one speaker found.")
            if self.delete_original:
                logging.info("No dialogue found. Deleting original file: %s", audio_file)
                os.remove(audio_file)
            if self.skip_if_no_dialogue:
                logging.info("Skipping further processing due to lack of dialogue.")
                return False
        finally:
            # Always remove chunk files, even on early return or exception.
            logging.info("Cleaning up temporary chunk files.")
            for chunk_file in chunk_files:
                if os.path.exists(chunk_file):
                    os.remove(chunk_file)
            # Remove the temp dir only when it is empty (other runs may share it).
            if os.path.exists(self.temp_dir) and not os.listdir(self.temp_dir):
                os.rmdir(self.temp_dir)
        return len(speakers_detected) >= 2
if __name__ == "__main__":
    # Demo entry point: delete the source file when no dialogue is found.
    detector = DialogueDetecting(delete_original=True)
    example_audio = ".data/example/kafkasya.mp3"
    has_dialogue = detector.process(example_audio)
    print("Dialogue detected:", has_dialogue)