# Standard library imports
import os
import logging
import subprocess
from typing import Annotated

# Related third party imports
from pyannote.audio import Pipeline

logging.basicConfig(level=logging.INFO)
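
# Note: this module shells out to the ffmpeg and ffprobe binaries, so both
# must be available on PATH, and pyannote.audio must be installed
# (e.g. `pip install pyannote.audio`).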


class DialogueDetecting:
    """
    Class for detecting dialogue in audio files using speaker diarization.

    This class processes audio files by dividing them into chunks, applying a
    pre-trained speaker diarization model, and detecting whether there are
    multiple speakers in the audio.

    Parameters
    ----------
    pipeline_model : str, optional
        Name of the pre-trained diarization model. Defaults to "pyannote/speaker-diarization".
    chunk_duration : int, optional
        Duration of each chunk in seconds. Defaults to 5.
    sample_rate : int, optional
        Sampling rate for the processed audio chunks. Defaults to 16000.
    channels : int, optional
        Number of audio channels. Defaults to 1.
    delete_original : bool, optional
        If True, deletes the original audio file when no dialogue is detected. Defaults to False.
    skip_if_no_dialogue : bool, optional
        If True, skips further processing if no dialogue is detected. Defaults to False.
    temp_dir : str, optional
        Directory for temporary chunk files. Defaults to ".temp".

    Attributes
    ----------
    pipeline : Pipeline
        Instance of the PyAnnote pipeline for speaker diarization.
    """

    def __init__(self,
                 pipeline_model: str = "pyannote/speaker-diarization",
                 chunk_duration: int = 5,
                 sample_rate: int = 16000,
                 channels: int = 1,
                 delete_original: bool = False,
                 skip_if_no_dialogue: bool = False,
                 temp_dir: str = ".temp"):
        self.pipeline_model = pipeline_model
        self.chunk_duration = chunk_duration
        self.sample_rate = sample_rate
        self.channels = channels
        self.delete_original = delete_original
        self.skip_if_no_dialogue = skip_if_no_dialogue
        self.temp_dir = temp_dir
        self.pipeline = Pipeline.from_pretrained(pipeline_model)
        if not os.path.exists(self.temp_dir):
            os.makedirs(self.temp_dir)
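
    # Note (assumption): recent pyannote.audio releases gate this model behind
    # a Hugging Face access token, so Pipeline.from_pretrained above may fail
    # without one. A minimal sketch, assuming the token is exposed through an
    # HF_TOKEN environment variable:
    #
    #     self.pipeline = Pipeline.from_pretrained(
    #         pipeline_model, use_auth_token=os.environ.get("HF_TOKEN")
    #     )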

    @staticmethod
    def get_audio_duration(audio_file: Annotated[str, "Path to the audio file"]) -> Annotated[
            float, "Duration of the audio in seconds"]:
        """
        Get the duration of an audio file in seconds.

        Parameters
        ----------
        audio_file : str
            Path to the audio file.

        Returns
        -------
        float
            Duration of the audio file in seconds.

        Examples
        --------
        >>> DialogueDetecting.get_audio_duration("example.wav")
        120.5
        """
        # ffprobe prints only the container duration; a failure (missing file,
        # unreadable format) raises subprocess.CalledProcessError via check=True.
        result = subprocess.run(
            ["ffprobe", "-v", "error", "-show_entries", "format=duration",
             "-of", "default=noprint_wrappers=1:nokey=1", audio_file],
            capture_output=True, text=True, check=True
        )
        return float(result.stdout.strip())

    def create_chunk(self, audio_file: str, chunk_file: str, start_time: float, end_time: float):
        """
        Create a chunk of the audio file.

        Parameters
        ----------
        audio_file : str
            Path to the original audio file.
        chunk_file : str
            Path to save the generated chunk file.
        start_time : float
            Start time of the chunk in seconds.
        end_time : float
            End time of the chunk in seconds.
        """
        duration = end_time - start_time
        subprocess.run([
            "ffmpeg", "-y",
            "-ss", str(start_time),
            "-t", str(duration),
            "-i", audio_file,
            "-ar", str(self.sample_rate),
            "-ac", str(self.channels),
            "-f", "wav",
            chunk_file
        ], check=True)
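
    # For reference, each call above is equivalent to an ffmpeg invocation like
    # the following (the concrete timestamps and filenames are illustrative):
    #
    #     ffmpeg -y -ss 5.0 -t 5.0 -i input.wav -ar 16000 -ac 1 -f wav chunk_1.wav
    #
    # Placing -ss before -i makes ffmpeg seek on the input rather than decode
    # from the start, which keeps chunk extraction fast on long recordings.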

    def process_chunk(self, chunk_file: Annotated[str, "Path to the chunk file"]) -> Annotated[
            set, "Set of detected speaker labels"]:
        """
        Process a single chunk of audio to detect speakers.

        Parameters
        ----------
        chunk_file : str
            Path to the chunk file.

        Returns
        -------
        set
            Set of detected speaker labels in the chunk.
        """
        diarization = self.pipeline(chunk_file)
        speakers_in_chunk = set()
        # itertracks(yield_label=True) yields (segment, track, label) triples;
        # only the speaker label is needed here.
        for _segment, _track, label in diarization.itertracks(yield_label=True):
            speakers_in_chunk.add(label)
        return speakers_in_chunk
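
    # For orientation: pyannote assigns generic labels per call, so a chunk
    # containing two speakers typically yields {"SPEAKER_00", "SPEAKER_01"}.
    # Labels are not consistent across separate pipeline calls, so unioning
    # them across chunks (as `process` does below) is a heuristic: two labels
    # within any single chunk are what actually establishes a dialogue.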

    def process(self, audio_file: Annotated[str, "Path to the input audio file"]) -> Annotated[
            bool, "True if dialogue detected, False otherwise"]:
        """
        Process the audio file to detect dialogue.

        Parameters
        ----------
        audio_file : str
            Path to the audio file.

        Returns
        -------
        bool
            True if at least two speakers are detected, False otherwise.

        Examples
        --------
        >>> dialogue_detector = DialogueDetecting()
        >>> dialogue_detector.process("example.wav")
        True
        """
        total_duration = self.get_audio_duration(audio_file)
        num_chunks = int(total_duration // self.chunk_duration) + 1
        speakers_detected = set()
        chunk_files = []
        try:
            for i in range(num_chunks):
                start_time = i * self.chunk_duration
                end_time = min(float((i + 1) * self.chunk_duration), total_duration)
                if end_time - start_time < 1.0:
                    logging.info("Last chunk is too short to process.")
                    break
                chunk_file = os.path.join(self.temp_dir, f"chunk_{i}.wav")
                chunk_files.append(chunk_file)
                logging.info(f"Creating chunk: {chunk_file}")
                self.create_chunk(audio_file, chunk_file, start_time, end_time)
                logging.info(f"Processing chunk: {chunk_file}")
                chunk_speakers = self.process_chunk(chunk_file)
                speakers_detected.update(chunk_speakers)
                # Stop early: a second speaker is enough to confirm dialogue.
                if len(speakers_detected) >= 2:
                    logging.info("At least two speakers detected, stopping.")
                    return True
            if len(speakers_detected) < 2:
                logging.info("No dialogue detected or only one speaker found.")
                if self.delete_original:
                    logging.info(f"No dialogue found. Deleting original file: {audio_file}")
                    os.remove(audio_file)
                if self.skip_if_no_dialogue:
                    logging.info("Skipping further processing due to lack of dialogue.")
                    return False
        finally:
            logging.info("Cleaning up temporary chunk files.")
            for chunk_file in chunk_files:
                if os.path.exists(chunk_file):
                    os.remove(chunk_file)
            if os.path.exists(self.temp_dir) and not os.listdir(self.temp_dir):
                os.rmdir(self.temp_dir)
        return len(speakers_detected) >= 2


if __name__ == "__main__":
    processor = DialogueDetecting(delete_original=True)
    audio_path = ".data/example/kafkasya.mp3"
    process_result = processor.process(audio_path)
    print("Dialogue detected:", process_result)