|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
Processor class for Phi4Multimodal |
|
""" |
|
|
|
from typing import Optional, Union, List, Tuple |
|
|
|
import numpy as np |
|
|
|
from transformers.feature_extraction_sequence_utils import SequenceFeatureExtractor |
|
from transformers.image_processing_utils import BatchFeature |
|
from transformers.utils import TensorType, is_torch_available, logging |
|
|
|
|
|
if is_torch_available(): |
|
import torch |
|
|
|
|
|
logger = logging.get_logger(__name__) |
|
|
|
AudioInput = Union[ |
|
np.ndarray, "torch.Tensor", List[np.ndarray], Tuple[np.ndarray], List["torch.Tensor"], Tuple["torch.Tensor"] |
|
] |
|
|
|
|
|
|
|
def speechlib_mel(sample_rate, n_fft, n_mels, fmin=None, fmax=None): |
|
"""Create a Mel filter-bank the same as SpeechLib FbankFC. |
|
|
|
Args: |
|
sample_rate (int): Sample rate in Hz. number > 0 [scalar] |
|
n_fft (int): FFT size. int > 0 [scalar] |
|
n_mel (int): Mel filter size. int > 0 [scalar] |
|
fmin (float): lowest frequency (in Hz). If None use 0.0. |
|
float >= 0 [scalar] |
|
fmax: highest frequency (in Hz). If None use sample_rate / 2. |
|
float >= 0 [scalar] |
|
|
|
Returns |
|
out (numpy.ndarray): Mel transform matrix |
|
[shape=(n_mels, 1 + n_fft/2)] |
|
""" |
|
|
|
bank_width = int(n_fft // 2 + 1) |
|
if fmax is None: |
|
fmax = sample_rate / 2 |
|
if fmin is None: |
|
fmin = 0 |
|
assert fmin >= 0, "fmin cannot be negtive" |
|
assert fmin < fmax <= sample_rate / 2, "fmax must be between (fmin, samplerate / 2]" |
|
|
|
def mel(f): |
|
return 1127.0 * np.log(1.0 + f / 700.0) |
|
|
|
def bin2mel(fft_bin): |
|
return 1127.0 * np.log(1.0 + fft_bin * sample_rate / (n_fft * 700.0)) |
|
|
|
def f2bin(f): |
|
return int((f * n_fft / sample_rate) + 0.5) |
|
|
|
|
|
klo = f2bin(fmin) + 1 |
|
khi = f2bin(fmax) |
|
|
|
khi = max(khi, klo) |
|
|
|
|
|
mlo = mel(fmin) |
|
mhi = mel(fmax) |
|
m_centers = np.linspace(mlo, mhi, n_mels + 2) |
|
ms = (mhi - mlo) / (n_mels + 1) |
|
|
|
matrix = np.zeros((n_mels, bank_width), dtype=np.float32) |
|
for m in range(0, n_mels): |
|
left = m_centers[m] |
|
center = m_centers[m + 1] |
|
right = m_centers[m + 2] |
|
for fft_bin in range(klo, khi): |
|
mbin = bin2mel(fft_bin) |
|
if left < mbin < right: |
|
matrix[m, fft_bin] = 1.0 - abs(center - mbin) / ms |
|
|
|
return matrix |
|
|
|
|
|
class Phi4MultimodalFeatureExtractor(SequenceFeatureExtractor): |
|
model_input_names = ["audio_input_features", "audio_embed_sizes", "audio_attention_mask"] |
|
|
|
def __init__( |
|
self, |
|
feature_size: int = 80, |
|
sampling_rate: int = 16000, |
|
hop_length: int = 160, |
|
n_fft: int = 512, |
|
win_length: int = 400, |
|
preemphasis: float = 0.97, |
|
padding_value: float = 0.0, |
|
audio_compression_rate: int = 8, |
|
audio_downsample_rate: int = 1, |
|
audio_feat_stride: int = 1, |
|
mel_min_frequency: float = 0, |
|
mel_max_frequency: float = 7690, |
|
**kwargs, |
|
): |
|
super().__init__(feature_size=feature_size, sampling_rate=sampling_rate, padding_value=padding_value, **kwargs) |
|
|
|
self.hop_length = hop_length |
|
self.n_fft = n_fft |
|
self.win_length = win_length |
|
self.preemphasis = preemphasis |
|
self.padding_value = padding_value |
|
self.audio_compression_rate = audio_compression_rate |
|
self.audio_downsample_rate = audio_downsample_rate |
|
self.audio_feat_stride = audio_feat_stride |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.mel_filters = speechlib_mel( |
|
self.sampling_rate, self.n_fft, self.feature_size, mel_min_frequency, mel_max_frequency |
|
).T |
|
|
|
def __call__( |
|
self, |
|
raw_speech: AudioInput, |
|
sampling_rate: Optional[int] = None, |
|
pad_to_multiple_of: Optional[int] = None, |
|
padding: Optional[str] = "longest", |
|
max_length: Optional[int] = None, |
|
truncation: bool = False, |
|
return_tensors: Optional[Union[str, TensorType]] = None, |
|
return_attention_mask: Optional[bool] = True, |
|
device: Optional[str] = "cpu", |
|
**kwargs, |
|
) -> BatchFeature: |
|
""" |
|
Main method to featurize and prepare for the model one or several audio sequence(s). Implementation uses PyTorch for |
|
the STFT computation if available, otherwise a slower NumPy based one. |
|
|
|
Args: |
|
raw_speech (`np.ndarray`, `torch.Tensor`, `List[np.ndarray]`, `List[torch.Tensor]`): |
|
The sequence or batch of sequences to be processed. Each sequence can be a numpy array or PyTorch tensor. |
|
For batched inputs, sequences can be a list of numpy arrays or PyTorch tensors, or a single numpy array or |
|
PyTorch tensor with first dimension being the batch size. |
|
sampling_rate (`int`, *optional*): |
|
The sampling rate at which the `raw_speech` input was sampled. It is strongly recommended to pass |
|
`sampling_rate` at the forward call to prevent silent errors. |
|
pad_to_multiple_of (`int`, *optional*, defaults to None): |
|
If set will pad the sequence to a multiple of the provided value. |
|
padding (`str`, *optional*, defaults to "longest"): |
|
Padding strategy. Can be "longest" to pad to the longest sequence in the batch, or a specific length. |
|
max_length (`int`, *optional*): |
|
Maximum length of the returned list and optionally padding length. |
|
truncation (`bool`, *optional*, defaults to False): |
|
Activates truncation to cut input sequences longer than *max_length* to *max_length*. |
|
return_tensors (`str` or [`~utils.TensorType`], *optional*): |
|
If set, will return tensors instead of numpy arrays. Acceptable values are: |
|
- `'pt'`: Return PyTorch `torch.Tensor` objects. |
|
- `'np'`: Return Numpy `np.ndarray` objects. |
|
- `'tf'`: Return TensorFlow `tf.constant` objects. |
|
return_attention_mask (`bool`, *optional*, defaults to `True`): |
|
Whether to return the extracted audio input features' attention mask. |
|
device (`str`, *optional*, defaults to "cpu"): |
|
Specifies the device for computation of the audio features. (e.g., "cpu", "cuda") |
|
|
|
Returns: |
|
[`BatchFeature`]: A [`BatchFeature`] with the following fields: |
|
- **audio_input_features** -- Audio features extracted from the raw audio input, shape (batch_size, max_feature_length, feature_size). |
|
- **audio_lengths** -- Length of each audio sample in the batch, shape (batch_size,). |
|
- **audio_attention_mask** -- Attention mask for the audio input, shape (batch_size, max_feature_length). |
|
If `return_tensors` is not specified, the fields will be PyTorch tensors if PyTorch is available, otherwise NumPy arrays. |
|
""" |
|
if sampling_rate is not None: |
|
if sampling_rate != self.sampling_rate: |
|
raise ValueError( |
|
f"The model corresponding to this feature extractor: {self.__class__.__name__} was trained using a" |
|
f" sampling rate of {self.sampling_rate}. Please make sure that the provided `raw_speech` input" |
|
f" was sampled with {self.sampling_rate} and not {sampling_rate}." |
|
) |
|
else: |
|
logger.warning( |
|
f"It is strongly recommended to pass the `sampling_rate` argument to `{self.__class__.__name__}()`. " |
|
"Failing to do so can result in silent errors that might be hard to debug." |
|
) |
|
|
|
|
|
if isinstance(raw_speech, np.ndarray): |
|
raw_speech = torch.tensor(raw_speech) |
|
elif isinstance(raw_speech, (list, tuple)) and isinstance(raw_speech[0], np.ndarray): |
|
raw_speech = [torch.tensor(speech) for speech in raw_speech] |
|
|
|
is_batched_torch = isinstance(raw_speech, torch.Tensor) and len(raw_speech.shape) > 1 |
|
if is_batched_torch and len(raw_speech.shape) > 2: |
|
logger.warning( |
|
f"Only mono-channel audio is supported for input to {self.__class__.__name__}. " |
|
"We will take the mean of the channels to convert to mono." |
|
) |
|
raw_speech = raw_speech.mean(-1) |
|
|
|
is_batched_sequence = isinstance(raw_speech, (list, tuple)) |
|
if is_batched_sequence: |
|
for speech in raw_speech: |
|
if len(speech.shape) > 1: |
|
logger.warning( |
|
f"Only mono-channel audio is supported for input to {self.__class__.__name__}. " |
|
"We will take the mean of the channels to convert to mono." |
|
) |
|
speech = speech.mean(-1) |
|
|
|
if is_batched_torch or is_batched_sequence: |
|
raw_speech = [speech[:, None].to(torch.float32) for speech in raw_speech] |
|
else: |
|
raw_speech = [raw_speech[:, None].to(torch.float32)] |
|
|
|
audio_lengths = [len(speech) for speech in raw_speech] |
|
|
|
|
|
batched_speech = BatchFeature(data={"audio_input_features": raw_speech, "audio_lengths": audio_lengths}) |
|
padded_inputs = self.pad( |
|
batched_speech, |
|
padding=padding, |
|
max_length=max_length, |
|
truncation=truncation, |
|
pad_to_multiple_of=pad_to_multiple_of, |
|
return_tensors="pt", |
|
) |
|
input_features = padded_inputs.audio_input_features.squeeze(-1) |
|
audio_lengths = padded_inputs.audio_lengths |
|
|
|
input_features = self._torch_extract_fbank_features(input_features, audio_lengths, device) |
|
|
|
feature_lengths = (audio_lengths - self.win_length) // self.hop_length + 1 |
|
feature_lengths = feature_lengths * self.audio_feat_stride |
|
audio_embed_sizes = self._compute_audio_embed_size(feature_lengths) |
|
|
|
feature_attention_mask = ( |
|
torch.arange(0, feature_lengths.max()) if is_torch_available() else np.arange(0, feature_lengths.max()) |
|
) |
|
feature_attention_mask = ( |
|
feature_attention_mask[None, :] < feature_lengths[:, None] if len(feature_lengths) > 1 else None |
|
) |
|
|
|
data = { |
|
"audio_input_features": input_features, |
|
"audio_embed_sizes": audio_embed_sizes, |
|
} |
|
if feature_attention_mask is not None and return_attention_mask: |
|
data["audio_attention_mask"] = feature_attention_mask |
|
|
|
return BatchFeature(data=data, tensor_type=return_tensors) |
|
|
|
|
|
def _torch_extract_fbank_features( |
|
self, waveform: "torch.FloatTensor", audio_lengths: "torch.Tensor", device: str = "cpu" |
|
) -> "torch.FloatTensor": |
|
""" |
|
Compute the log mel-scaled spectrogram of batched waveforms using PyTorch's FFT implementation. |
|
|
|
Args: |
|
waveform (torch.FloatTensor` of shape `(batch_size, max_audio_length)`): |
|
The batched waveforms. |
|
audio_lengths (`torch.Tensor` of shape `(batch_size,)`): |
|
The lengths of the waveforms along the max_audio_length dimension. |
|
device (`str`, *optional*, defaults to "cpu"): |
|
The device to run the computation on. (e.g., "cpu", "cuda") |
|
|
|
Returns: |
|
`torch.FloatTensor` of shape `(batch_size, max_feature_length, feature_size)`: |
|
The log mel-scaled spectrogram of the batched waveforms. |
|
""" |
|
fft_window = torch.hamming_window(self.win_length, periodic=False, device=device, dtype=torch.float64) |
|
|
|
|
|
batch_size = waveform.shape[0] |
|
frames = waveform.unfold(-1, self.win_length, self.hop_length) |
|
|
|
|
|
|
|
|
|
if batch_size > 1: |
|
frames = frames.clone() |
|
|
|
to_mask_batch_idxs = torch.arange(batch_size)[audio_lengths != audio_lengths.max()] |
|
if to_mask_batch_idxs.numel() > 0: |
|
batch_idxs_down = (audio_lengths[to_mask_batch_idxs] - self.win_length) // self.hop_length + 1 |
|
batch_idxs_up = audio_lengths[to_mask_batch_idxs] // self.hop_length + 1 |
|
offset_idx = batch_idxs_down.min() |
|
max_idx = batch_idxs_up.max() |
|
|
|
mask = torch.arange(max_idx - offset_idx, device=device).expand(to_mask_batch_idxs.shape[0], -1) |
|
mask = ((batch_idxs_down - offset_idx).unsqueeze(1) <= mask) & ( |
|
mask < (batch_idxs_up - offset_idx).unsqueeze(1) |
|
) |
|
mask = mask.unsqueeze(-1).expand(-1, -1, self.win_length) |
|
masked_frames = frames[to_mask_batch_idxs, offset_idx:max_idx].masked_fill_(mask, 0) |
|
frames[to_mask_batch_idxs, offset_idx:max_idx] = masked_frames |
|
|
|
|
|
|
|
frames_prev = torch.roll(frames, 1, dims=-1) |
|
frames_prev[:, :, 0] = frames_prev[:, :, 1] |
|
frames = (frames - self.preemphasis * frames_prev) * 32768 |
|
|
|
|
|
S = torch.fft.rfft(fft_window * frames.view(-1, self.win_length), n=self.n_fft, dim=1) |
|
S = S.view(frames.shape[0], -1, S.shape[-1]) |
|
S = S.to(torch.complex64) |
|
|
|
spec = torch.abs(S) |
|
spec_power = spec**2 |
|
|
|
|
|
mel_filters = torch.from_numpy(self.mel_filters).to(device, torch.float32) |
|
log_spec = torch.clamp(spec_power @ mel_filters, min=1.0) |
|
log_spec = torch.log(log_spec) |
|
|
|
return log_spec |
|
|
|
def _compute_audio_embed_size(self, audio_frames): |
|
integer = audio_frames // self.audio_compression_rate |
|
remainder = audio_frames % self.audio_compression_rate |
|
result = integer + (remainder > 0).to(integer.dtype) |
|
|
|
integer = result // self.audio_downsample_rate |
|
remainder = result % self.audio_downsample_rate |
|
result = integer + (remainder > 0).to(integer.dtype) |
|
|
|
return result |
|
|
|
|
|
__all__ = ["Phi4MultimodalFeatureExtractor"] |
|
|
|
Phi4MultimodalFeatureExtractor.register_for_auto_class() |