|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging
|
|
import re
|
|
|
|
import librosa
|
|
import numpy as np
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def is_silent(data):
|
|
if np.abs(data).max() < 3e-3:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
def sentence_end(txt):
|
|
for c in [".", "。", "!", "?", "!", "?"]:
|
|
if c in txt:
|
|
if c == ".":
|
|
idx = txt.find(c)
|
|
if idx > 0:
|
|
if txt[idx - 1].isdigit():
|
|
continue
|
|
return c
|
|
return ""
|
|
|
|
|
|
class NumberToTextConverter:
|
|
r"""
|
|
A helper class to ensure text-to-speech (TTS) systems read numeric digits
|
|
in the desired language (Chinese or English) digit-by-digit. It forcibly
|
|
replaces all numeric substrings in text with their language-specific
|
|
textual representations, thereby reducing the likelihood of TTS mistakes
|
|
on numbers.
|
|
Note: MiniCPM-o 2.6 only use this in streaming mode.
|
|
|
|
Attributes:
|
|
num_to_chinese (dict):
|
|
Mapping from digit (str) to its Chinese textual form (str).
|
|
num_to_english (dict):
|
|
Mapping from digit (str) to its English textual form (str).
|
|
|
|
Example:
|
|
>>> converter = NumberToTextConverter()
|
|
>>> converter.replace_numbers_with_text("我有2个苹果", language="chinese")
|
|
'我有两个苹果'
|
|
>>> converter.replace_numbers_with_text("I have 23 books", language="english")
|
|
'I have two three books'
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.num_to_chinese = {
|
|
"0": "零",
|
|
"1": "一",
|
|
"2": "二",
|
|
"3": "三",
|
|
"4": "四",
|
|
"5": "五",
|
|
"6": "六",
|
|
"7": "七",
|
|
"8": "八",
|
|
"9": "九",
|
|
}
|
|
self.num_to_english = {
|
|
"0": "zero",
|
|
"1": "one",
|
|
"2": "two",
|
|
"3": "three",
|
|
"4": "four",
|
|
"5": "five",
|
|
"6": "six",
|
|
"7": "seven",
|
|
"8": "eight",
|
|
"9": "nine",
|
|
}
|
|
|
|
def number_to_chinese_digit_by_digit(self, num_str):
|
|
result = ""
|
|
for char in num_str:
|
|
if char in self.num_to_chinese:
|
|
result += self.num_to_chinese[char]
|
|
return result
|
|
|
|
def number_to_english_digit_by_digit(self, num_str):
|
|
result = []
|
|
for char in num_str:
|
|
if char in self.num_to_english:
|
|
result.append(self.num_to_english[char])
|
|
return " ".join(result)
|
|
|
|
def detect_language(self, text):
|
|
chinese_count = len(re.findall(r"[\u4e00-\u9fff]", text))
|
|
english_count = len(re.findall(r"[a-zA-Z]", text))
|
|
return "chinese" if chinese_count >= english_count else "english"
|
|
|
|
def replace_numbers_with_text(self, text, language=None):
|
|
if language is None:
|
|
language = self.detect_language(text)
|
|
numbers = re.findall(r"\d+", text)
|
|
|
|
for num in numbers:
|
|
if language == "chinese":
|
|
replacement = self.number_to_chinese_digit_by_digit(num)
|
|
else:
|
|
replacement = self.number_to_english_digit_by_digit(num)
|
|
text = text.replace(num, replacement, 1)
|
|
|
|
return text
|
|
|
|
|
|
class VoiceChecker:
|
|
r"""
|
|
A simple utility class to detect silence or low variation in consecutive audio chunks by comparing
|
|
the mel-spectrogram distances. It keeps track of consecutive zero-distance and low-distance chunks
|
|
to decide if the audio is considered "bad" (e.g., overly silent or not changing enough).
|
|
|
|
Attributes:
|
|
previous_mel (`np.ndarray` or `None`):
|
|
Holds the previously observed mel-spectrogram in decibel scale. Used to compute
|
|
the next distance; reset via :meth:`reset`.
|
|
consecutive_zeros (`int`):
|
|
The number of consecutive chunks that were detected as silent (distance = 0).
|
|
consecutive_low_distance (`int`):
|
|
The number of consecutive chunks whose distance was below the threshold.
|
|
|
|
Example:
|
|
>>> checker = VoiceChecker()
|
|
>>> # Suppose we have audio_wav (list or np.ndarray) and mel_spec (np.ndarray)
|
|
>>> # We split them into chunks and call checker.is_bad(...)
|
|
>>> is_audio_bad = checker.is_bad(audio_wav, mel_spec, chunk_size=2560, thresh=100.0)
|
|
>>> if is_audio_bad:
|
|
... print("Audio deemed bad!")
|
|
>>> # Reset states if needed
|
|
>>> checker.reset()
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.previous_mel = None
|
|
self.consecutive_zeros = 0
|
|
self.consecutive_low_distance = 0
|
|
|
|
def compute_distance(self, audio_chunk, mel_spec):
|
|
if is_silent(audio_chunk):
|
|
return 0.0
|
|
|
|
mel_db = librosa.power_to_db(mel_spec)
|
|
if self.previous_mel is None:
|
|
self.previous_mel = mel_db
|
|
return -1.0
|
|
|
|
distance = np.linalg.norm(np.mean(mel_db, axis=1) - np.mean(self.previous_mel, axis=1))
|
|
self.previous_mel = mel_db
|
|
return distance
|
|
|
|
def is_bad(self, audio_wav, mel_spec, chunk_size=2560, thresh=100.0):
|
|
num_chunks = len(audio_wav) // chunk_size
|
|
mel_chunk_size = mel_spec.shape[-1] // num_chunks
|
|
for i in range(num_chunks):
|
|
audio_chunk = audio_wav[i * chunk_size : (i + 1) * chunk_size]
|
|
mel_spec_chunk = mel_spec[:, i * mel_chunk_size : (i + 1) * mel_chunk_size]
|
|
|
|
distance = self.compute_distance(audio_chunk, mel_spec_chunk)
|
|
logger.warning(
|
|
f"mel dist: {distance:.1f}, zero: {self.consecutive_zeros}, low: {self.consecutive_low_distance}"
|
|
)
|
|
if distance == 0:
|
|
self.consecutive_low_distance = 0
|
|
self.consecutive_zeros += 1
|
|
if self.consecutive_zeros >= 12:
|
|
logger.warning("VoiceChecker detected 1.2 s silent. Marking as failed.")
|
|
return True
|
|
elif distance < thresh:
|
|
self.consecutive_zeros = 0
|
|
self.consecutive_low_distance += 1
|
|
if self.consecutive_low_distance >= 5:
|
|
logger.warning("VoiceChecker detected 5 consecutive low distance chunks. Marking as failed.")
|
|
return True
|
|
else:
|
|
self.consecutive_low_distance = 0
|
|
self.consecutive_zeros = 0
|
|
|
|
return False
|
|
|
|
def reset(self):
|
|
self.previous_mel = None
|
|
self.consecutive_zeros = 0
|
|
self.consecutive_low_distance = 0
|
|
|