import json
import logging
import os
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Union

import numpy as np
import soundfile as sf
import webrtcvad
from scipy import signal

logger = logging.getLogger("vad")
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)


@dataclass
class AudioSegment:
    start_time: float
    end_time: float
    audio_data: np.ndarray
    is_speech: bool


class AudioVad:
    def __init__(self,
                 sample_rate: int = 16000,
                 frame_duration_ms: int = 20,
                 vad_level: int = 0,
                 min_silence_duration: float = 0.3,
                 min_speech_duration: float = 0.3,
                 amplitude_threshold: float = 0.0015,
                 save_audio: bool = False,
                 save_json: bool = False,
                 output_dir: str = "dataset/audio/segments",
                 json_dir: str = "dataset/audio/metadata",
                 log_level: Union[int, str] = logging.INFO):
        """
        Initialize the audio VAD processor.

        Args:
            sample_rate: Sample rate in Hz.
            frame_duration_ms: VAD frame length in milliseconds.
            vad_level: VAD aggressiveness (0-3).
            min_silence_duration: Minimum silence duration in seconds that closes a segment.
            min_speech_duration: Minimum speech segment length in seconds.
            amplitude_threshold: Peak-amplitude floor below which a frame counts as silence.
            save_audio: Whether to save the segmented audio files.
            save_json: Whether to save JSON metadata.
            output_dir: Output directory for audio segments.
            json_dir: Output directory for JSON metadata.
            log_level: Logging level.
        """
        if isinstance(log_level, str):
            log_level = getattr(logging, log_level.upper())
        logger.setLevel(log_level)

        self.sample_rate = sample_rate
        self.frame_duration_ms = frame_duration_ms
        # Samples per VAD frame; webrtcvad supports 10, 20, or 30 ms frames
        # at 8000, 16000, 32000, or 48000 Hz.
        self.frame_size = int(sample_rate * frame_duration_ms / 1000)
        self.vad = webrtcvad.Vad(vad_level)
        # Convert the duration thresholds from seconds to frame counts.
        self.min_silence_frames = int(min_silence_duration * 1000 / frame_duration_ms)
        self.min_speech_frames = int(min_speech_duration * 1000 / frame_duration_ms)
        self.amplitude_threshold = amplitude_threshold

        self.save_audio = save_audio
        self.save_json = save_json
        self.output_dir = output_dir
        self.json_dir = json_dir

        if self.save_audio:
            os.makedirs(self.output_dir, exist_ok=True)
        if self.save_json:
            os.makedirs(self.json_dir, exist_ok=True)
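
    # Frame classification below combines a cheap amplitude gate with the
    # WebRTC VAD: frames whose peak amplitude falls under amplitude_threshold
    # are rejected outright, so low-level background noise never reaches the VAD.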
    def _is_speech_frame(self, frame: np.ndarray) -> bool:
        """
        Decide whether a single frame contains speech.
        """
        if len(frame) != self.frame_size:
            return False

        # Reject near-silent frames before invoking the VAD.
        frame_amplitude = np.max(np.abs(frame))
        if frame_amplitude < self.amplitude_threshold:
            return False

        # webrtcvad expects 16-bit PCM bytes, so scale the float frame to int16.
        frame_int16 = np.clip(frame * 32768, -32768, 32767).astype(np.int16)

        try:
            return self.vad.is_speech(frame_int16.tobytes(), self.sample_rate)
        except Exception as e:
            logger.error(f"VAD processing failed: {e}")

        # Fallback when the VAD call fails: require twice the amplitude floor.
        return frame_amplitude >= self.amplitude_threshold * 2
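
    # Segmentation is a small state machine over fixed-size frames: a segment
    # opens on the first speech frame, closes after min_silence_frames
    # consecutive non-speech frames, and is kept only if it lasts at least
    # min_speech_frames.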
    def process_audio_data(self, audio_data: np.ndarray, sample_rate: Optional[int] = None) -> List[AudioSegment]:
        """
        Process audio data and return the list of detected segments.

        Args:
            audio_data: Audio data as a numpy array.
            sample_rate: Sample rate of the input; if it differs from the
                configured rate, the audio is resampled.

        Returns:
            List of AudioSegment objects.
        """
        logger.debug(f"Processing audio data, shape: {audio_data.shape}")
        if sample_rate is not None and sample_rate != self.sample_rate:
            logger.debug(f"Resampling audio from {sample_rate}Hz to {self.sample_rate}Hz")
            num_samples = int(len(audio_data) * self.sample_rate / sample_rate)
            audio_data = signal.resample(audio_data, num_samples)
            logger.debug(f"Audio length after resampling: {len(audio_data)} samples")

        # Downmix multi-channel audio to mono by averaging the channels.
        if len(audio_data.shape) > 1:
            logger.debug("Multi-channel audio detected, converting to mono")
            audio_data = audio_data.mean(axis=1)

        segments: List[AudioSegment] = []
        logger.debug(f"Starting audio processing, total length: {len(audio_data)} samples ({len(audio_data)/self.sample_rate:.2f}s)")

        current_segment_start = 0
        silence_frame_count = 0
        is_in_speech = False

        total_frames = len(audio_data) // self.frame_size
        speech_frames = 0
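
        # Scan the audio in fixed-size frames and grow segments as we go.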
        for i in range(0, len(audio_data), self.frame_size):
            frame = audio_data[i:i + self.frame_size]
            if len(frame) < self.frame_size:
                # Zero-pad the trailing partial frame to a full frame.
                frame = np.pad(frame, (0, self.frame_size - len(frame)), 'constant')

            is_speech = self._is_speech_frame(frame)
            if is_speech:
                speech_frames += 1

            if is_speech and not is_in_speech:
                # Transition from silence to speech: open a new segment.
                current_segment_start = i
                is_in_speech = True
                silence_frame_count = 0
                logger.debug(f"Speech start detected at {i/self.sample_rate:.2f}s")
            elif not is_speech and is_in_speech:
                silence_frame_count += 1

                if silence_frame_count >= self.min_silence_frames:
                    # Close the segment at the first silent frame; i marks the start
                    # of the current frame, so step back silence_frame_count - 1 frames.
                    segment_end = i - (silence_frame_count - 1) * self.frame_size
                    duration_frames = (segment_end - current_segment_start) // self.frame_size

                    if duration_frames >= self.min_speech_frames:
                        start_time = current_segment_start / self.sample_rate
                        end_time = segment_end / self.sample_rate
                        logger.debug(f"Saving speech segment: {start_time:.2f}s -> {end_time:.2f}s (duration: {end_time-start_time:.2f}s)")
                        segments.append(AudioSegment(
                            start_time=start_time,
                            end_time=end_time,
                            audio_data=audio_data[current_segment_start:segment_end],
                            is_speech=True
                        ))
                    else:
                        logger.debug(f"Discarding speech segment that is too short: {duration_frames * self.frame_duration_ms / 1000:.2f}s")

                    is_in_speech = False
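
        # Flush a segment that is still open when the audio ends.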
        if is_in_speech:
            segment_end = len(audio_data)
            duration_frames = (segment_end - current_segment_start) // self.frame_size
            if duration_frames >= self.min_speech_frames:
                start_time = current_segment_start / self.sample_rate
                end_time = segment_end / self.sample_rate
                logger.debug(f"Saving final speech segment: {start_time:.2f}s -> {end_time:.2f}s (duration: {end_time-start_time:.2f}s)")
                segments.append(AudioSegment(
                    start_time=start_time,
                    end_time=end_time,
                    audio_data=audio_data[current_segment_start:segment_end],
                    is_speech=True
                ))
            else:
                logger.debug(f"Discarding final speech segment that is too short: {duration_frames * self.frame_duration_ms / 1000:.2f}s")

        logger.info(f"Audio processing complete: {total_frames} frames total, {speech_frames} speech frames, {len(segments)} speech segments detected")

        return segments

    def process_audio_file(self, audio_path: str) -> List[AudioSegment]:
        """
        Process an audio file and return the list of detected segments.

        Args:
            audio_path: Path to the audio file.

        Returns:
            List of AudioSegment objects.
        """
        logger.info(f"Reading audio file: {audio_path}")
        audio_data, sample_rate = sf.read(audio_path)
        logger.debug(f"Audio sample rate: {sample_rate}Hz, shape: {audio_data.shape}")

        segments = self.process_audio_data(audio_data, sample_rate)

        # Optionally write each segment to its own WAV file.
        if self.save_audio and segments:
            base_name = os.path.splitext(os.path.basename(audio_path))[0]
            for i, segment in enumerate(segments):
                output_path = os.path.join(self.output_dir, f"{base_name}_segment_{i+1}.wav")
                self.save_segment(segment, output_path)
                logger.debug(f"Saved audio segment to: {output_path}")

        if self.save_json and segments:
            self.save_segments_metadata(segments, audio_path)

        return segments

    def save_segment(self, segment: AudioSegment, output_path: str):
        """
        Save an audio segment to a file.

        Args:
            segment: The audio segment.
            output_path: Output file path.
        """
        sf.write(output_path, segment.audio_data, self.sample_rate)

    def save_segments_metadata(self, segments: List[AudioSegment], audio_path: str):
        """
        Save segment metadata to a JSON file.

        Args:
            segments: List of audio segments.
            audio_path: Path of the original audio file.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_name = os.path.splitext(os.path.basename(audio_path))[0]

        metadata = {
            "audio_file": audio_path,
            "timestamp": timestamp,
            "total_segments": len(segments),
            "segments": [
                {
                    "index": i,
                    "start_time": seg.start_time,
                    "end_time": seg.end_time,
                    "duration": seg.end_time - seg.start_time,
                    "is_speech": seg.is_speech
                }
                for i, seg in enumerate(segments)
            ]
        }
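
        # The resulting JSON looks like this (values illustrative):
        # {
        #   "audio_file": "dataset/audio/test1.wav",
        #   "timestamp": "20240101_120000",
        #   "total_segments": 1,
        #   "segments": [{"index": 0, "start_time": 0.52, "end_time": 2.18,
        #                 "duration": 1.66, "is_speech": true}]
        # }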

        json_path = os.path.join(self.json_dir, f"{base_name}_segments_{timestamp}.json")
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, ensure_ascii=False, indent=2)
        logger.info(f"Saved segment metadata to: {json_path}")


if __name__ == "__main__":
    logger.setLevel(logging.DEBUG)

    vad = AudioVad(
        save_audio=True,
        save_json=True,
        output_dir="dataset/audio/segments",
        json_dir="dataset/audio/metadata"
    )

    audio_path = "dataset/audio/test1.wav"
    try:
        segments = vad.process_audio_file(audio_path)
        logger.info(f"Detected {len(segments)} speech segments:")
        for i, segment in enumerate(segments):
            logger.info(f"Segment {i+1}: {segment.start_time:.2f}s -> {segment.end_time:.2f}s")
    except Exception as e:
        logger.error(f"Error while processing audio: {e}")