# Standard library imports
import os
import logging
import subprocess
from typing import Annotated

# Related third party imports
from pyannote.audio import Pipeline

logging.basicConfig(level=logging.INFO)


class DialogueDetecting:
    """
    Class for detecting dialogue in audio files using speaker diarization.

    This class processes audio files by dividing them into chunks, applying a
    pre-trained speaker diarization model, and detecting whether the audio
    contains multiple speakers.

    Parameters
    ----------
    pipeline_model : str, optional
        Name of the pre-trained diarization model.
        Defaults to "pyannote/speaker-diarization".
    chunk_duration : int, optional
        Duration of each chunk in seconds. Defaults to 5.
    sample_rate : int, optional
        Sampling rate for the processed audio chunks. Defaults to 16000.
    channels : int, optional
        Number of audio channels. Defaults to 1.
    delete_original : bool, optional
        If True, deletes the original audio file when no dialogue is detected.
        Defaults to False.
    skip_if_no_dialogue : bool, optional
        If True, skips further processing if no dialogue is detected.
        Defaults to False.
    temp_dir : str, optional
        Directory for temporary chunk files. Defaults to ".temp".

    Attributes
    ----------
    pipeline : Pipeline
        Instance of the PyAnnote pipeline for speaker diarization.
    """

    def __init__(
            self,
            pipeline_model: str = "pyannote/speaker-diarization",
            chunk_duration: int = 5,
            sample_rate: int = 16000,
            channels: int = 1,
            delete_original: bool = False,
            skip_if_no_dialogue: bool = False,
            temp_dir: str = ".temp",
    ):
        self.pipeline_model = pipeline_model
        self.chunk_duration = chunk_duration
        self.sample_rate = sample_rate
        self.channels = channels
        self.delete_original = delete_original
        self.skip_if_no_dialogue = skip_if_no_dialogue
        self.temp_dir = temp_dir
        # Load the pre-trained diarization pipeline (may download model
        # weights on first use).
        self.pipeline = Pipeline.from_pretrained(pipeline_model)
        os.makedirs(self.temp_dir, exist_ok=True)

    @staticmethod
    def get_audio_duration(
            audio_file: Annotated[str, "Path to the audio file"]
    ) -> Annotated[float, "Duration of the audio in seconds"]:
        """
        Get the duration of an audio file in seconds.

        Parameters
        ----------
        audio_file : str
            Path to the audio file.

        Returns
        -------
        float
            Duration of the audio file in seconds.

        Examples
        --------
        >>> DialogueDetecting.get_audio_duration("example.wav")
        120.5
        """
        # Query the container duration via ffprobe and parse it as a float.
        result = subprocess.run(
            [
                "ffprobe", "-v", "error",
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                audio_file,
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        return float(result.stdout.strip())

    def create_chunk(self, audio_file: str, chunk_file: str,
                     start_time: float, end_time: float):
        """
        Create a chunk of the audio file.

        Parameters
        ----------
        audio_file : str
            Path to the original audio file.
        chunk_file : str
            Path to save the generated chunk file.
        start_time : float
            Start time of the chunk in seconds.
        end_time : float
            End time of the chunk in seconds.
        """
        duration = end_time - start_time
        # Re-encode the slice to the configured sample rate and channel count
        # as WAV; "-y" overwrites any stale chunk left by a previous run.
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-ss", str(start_time),
                "-t", str(duration),
                "-i", audio_file,
                "-ar", str(self.sample_rate),
                "-ac", str(self.channels),
                "-f", "wav",
                chunk_file,
            ],
            check=True,
        )

    def process_chunk(
            self,
            chunk_file: Annotated[str, "Path to the chunk file"]
    ) -> Annotated[set, "Set of detected speaker labels"]:
        """
        Process a single chunk of audio to detect speakers.

        Parameters
        ----------
        chunk_file : str
            Path to the chunk file.

        Returns
        -------
        set
            Set of detected speaker labels in the chunk.
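
        Examples
        --------
        Illustrative only: the exact labels depend on the model and the
        audio (pyannote pipelines typically emit labels such as "SPEAKER_00").

        >>> detector = DialogueDetecting()
        >>> detector.process_chunk(".temp/chunk_0.wav")
        {'SPEAKER_00', 'SPEAKER_01'}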
""" diarization = self.pipeline(chunk_file) speakers_in_chunk = set() for segment, track, label in diarization.itertracks(yield_label=True): speakers_in_chunk.add(label) return speakers_in_chunk def process(self, audio_file: Annotated[str, "Path to the input audio file"]) -> Annotated[ bool, "True if dialogue detected, False otherwise"]: """ Process the audio file to detect dialogue. Parameters ---------- audio_file : str Path to the audio file. Returns ------- bool True if at least two speakers are detected, False otherwise. Examples -------- >>> dialogue_detector = DialogueDetecting() >>> dialogue_detector.process("example.wav") True """ total_duration = self.get_audio_duration(audio_file) num_chunks = int(total_duration // self.chunk_duration) + 1 speakers_detected = set() chunk_files = [] try: for i in range(num_chunks): start_time = i * self.chunk_duration end_time = min(float((i + 1) * self.chunk_duration), total_duration) if end_time - start_time < 1.0: logging.info("Last chunk is too short to process.") break chunk_file = os.path.join(self.temp_dir, f"chunk_{i}.wav") chunk_files.append(chunk_file) logging.info(f"Creating chunk: {chunk_file}") self.create_chunk(audio_file, chunk_file, start_time, end_time) logging.info(f"Processing chunk: {chunk_file}") chunk_speakers = self.process_chunk(chunk_file) speakers_detected.update(chunk_speakers) if len(speakers_detected) >= 2: logging.info("At least two speakers detected, stopping.") return True if len(speakers_detected) < 2: logging.info("No dialogue detected or only one speaker found.") if self.delete_original: logging.info(f"No dialogue found. Deleting original file: {audio_file}") os.remove(audio_file) if self.skip_if_no_dialogue: logging.info("Skipping further processing due to lack of dialogue.") return False finally: logging.info("Cleaning up temporary chunk files.") for chunk_file in chunk_files: if os.path.exists(chunk_file): os.remove(chunk_file) if os.path.exists(self.temp_dir) and not os.listdir(self.temp_dir): os.rmdir(self.temp_dir) return len(speakers_detected) >= 2 if __name__ == "__main__": processor = DialogueDetecting(delete_original=True) audio_path = ".data/example/kafkasya.mp3" process_result = processor.process(audio_path) print("Dialogue detected:", process_result)