|
import os
|
|
import warnings
|
|
|
|
import numpy as np
|
|
import torch
|
|
from torch import nn
|
|
|
|
from ..masknn import activations
|
|
from ..utils.torch_utils import pad_x_to_y
|
|
|
|
|
|
def _unsqueeze_to_3d(x):
|
|
if x.ndim == 1:
|
|
return x.reshape(1, 1, -1)
|
|
elif x.ndim == 2:
|
|
return x.unsqueeze(1)
|
|
else:
|
|
return x
|
|
|
|
|
|
class BaseModel(nn.Module):
|
|
def __init__(self):
|
|
print("initialize BaseModel")
|
|
super().__init__()
|
|
|
|
def forward(self, *args, **kwargs):
|
|
raise NotImplementedError
|
|
|
|
@torch.no_grad()
|
|
def separate(self, wav, output_dir=None, force_overwrite=False, **kwargs):
|
|
"""Infer separated sources from input waveforms.
|
|
Also supports filenames.
|
|
|
|
Args:
|
|
wav (Union[torch.Tensor, numpy.ndarray, str]): waveform array/tensor.
|
|
Shape: 1D, 2D or 3D tensor, time last.
|
|
output_dir (str): path to save all the wav files. If None,
|
|
estimated sources will be saved next to the original ones.
|
|
force_overwrite (bool): whether to overwrite existing files.
|
|
**kwargs: keyword arguments to be passed to `_separate`.
|
|
|
|
Returns:
|
|
Union[torch.Tensor, numpy.ndarray, None], the estimated sources.
|
|
(batch, n_src, time) or (n_src, time) w/o batch dim.
|
|
|
|
.. note::
|
|
By default, `separate` calls `_separate` which calls `forward`.
|
|
For models whose `forward` doesn't return waveform tensors,
|
|
overwrite `_separate` to return waveform tensors.
|
|
"""
|
|
if isinstance(wav, str):
|
|
self.file_separate(
|
|
wav, output_dir=output_dir, force_overwrite=force_overwrite, **kwargs
|
|
)
|
|
elif isinstance(wav, np.ndarray):
|
|
print("is ndarray")
|
|
|
|
return self.numpy_separate(wav, **kwargs)
|
|
elif isinstance(wav, torch.Tensor):
|
|
print("is torch.Tensor")
|
|
return self.torch_separate(wav, **kwargs)
|
|
else:
|
|
raise ValueError(
|
|
f"Only support filenames, numpy arrays and torch tensors, received {type(wav)}"
|
|
)
|
|
|
|
def torch_separate(self, wav: torch.Tensor, **kwargs) -> torch.Tensor:
|
|
""" Core logic of `separate`."""
|
|
|
|
input_device = wav.device
|
|
model_device = next(self.parameters()).device
|
|
wav = wav.to(model_device)
|
|
|
|
out_wavs = self._separate(wav, **kwargs)
|
|
|
|
|
|
out_wavs *= wav.abs().sum() / (out_wavs.abs().sum())
|
|
|
|
|
|
out_wavs = out_wavs.to(input_device)
|
|
return out_wavs
|
|
|
|
def numpy_separate(self, wav: np.ndarray, **kwargs) -> np.ndarray:
|
|
""" Numpy interface to `separate`."""
|
|
wav = torch.from_numpy(wav)
|
|
out_wav = self.torch_separate(wav, **kwargs)
|
|
out_wav = out_wav.data.numpy()
|
|
return out_wav
|
|
|
|
def file_separate(
|
|
self, filename: str, output_dir=None, force_overwrite=False, **kwargs
|
|
) -> None:
|
|
""" Filename interface to `separate`."""
|
|
import soundfile as sf
|
|
|
|
wav, fs = sf.read(filename, dtype="float32", always_2d=True)
|
|
|
|
to_save = self.numpy_separate(wav[:, 0], **kwargs)
|
|
|
|
|
|
for src_idx, est_src in enumerate(to_save):
|
|
base = ".".join(filename.split(".")[:-1])
|
|
save_name = base + "_est{}.".format(src_idx + 1) + filename.split(".")[-1]
|
|
if os.path.isfile(save_name) and not force_overwrite:
|
|
warnings.warn(
|
|
f"File {save_name} already exists, pass `force_overwrite=True` to overwrite it",
|
|
UserWarning,
|
|
)
|
|
return
|
|
if output_dir is not None:
|
|
save_name = os.path.join(output_dir, save_name.split("/")[-1])
|
|
sf.write(save_name, est_src, fs)
|
|
|
|
def _separate(self, wav, *args, **kwargs):
|
|
"""Hidden separation method
|
|
|
|
Args:
|
|
wav (Union[torch.Tensor, numpy.ndarray, str]): waveform array/tensor.
|
|
Shape: 1D, 2D or 3D tensor, time last.
|
|
|
|
Returns:
|
|
The output of self(wav, *args, **kwargs).
|
|
"""
|
|
return self(wav, *args, **kwargs)
|
|
|
|
@classmethod
|
|
def from_pretrained(cls, pretrained_model_conf_or_path, *args, **kwargs):
|
|
"""Instantiate separation model from a model config (file or dict).
|
|
|
|
Args:
|
|
pretrained_model_conf_or_path (Union[dict, str]): model conf as
|
|
returned by `serialize`, or path to it. Need to contain
|
|
`model_args` and `state_dict` keys.
|
|
*args: Positional arguments to be passed to the model.
|
|
**kwargs: Keyword arguments to be passed to the model.
|
|
They overwrite the ones in the model package.
|
|
|
|
Returns:
|
|
nn.Module corresponding to the pretrained model conf/URL.
|
|
|
|
Raises:
|
|
ValueError if the input config file doesn't contain the keys
|
|
`model_name`, `model_args` or `state_dict`.
|
|
"""
|
|
from . import get
|
|
|
|
if isinstance(pretrained_model_conf_or_path, str):
|
|
|
|
if os.path.isfile(pretrained_model_conf_or_path):
|
|
cached_model = pretrained_model_conf_or_path
|
|
else:
|
|
raise ValueError(
|
|
"Model {} is not a file or doesn't exist.".format(pretrained_model_conf_or_path)
|
|
)
|
|
|
|
conf = torch.load(cached_model, map_location="cpu")
|
|
else:
|
|
conf = pretrained_model_conf_or_path
|
|
|
|
if "model_name" not in conf.keys():
|
|
raise ValueError(
|
|
"Expected config dictionary to have field "
|
|
"model_name`. Found only: {}".format(conf.keys())
|
|
)
|
|
if "state_dict" not in conf.keys():
|
|
raise ValueError(
|
|
"Expected config dictionary to have field "
|
|
"state_dict`. Found only: {}".format(conf.keys())
|
|
)
|
|
if "model_args" not in conf.keys():
|
|
raise ValueError(
|
|
"Expected config dictionary to have field "
|
|
"model_args`. Found only: {}".format(conf.keys())
|
|
)
|
|
conf["model_args"].update(kwargs)
|
|
|
|
try:
|
|
model_class = get(conf["model_name"])
|
|
except ValueError:
|
|
model = cls(*args, **conf["model_args"])
|
|
else:
|
|
model = model_class(*args, **conf["model_args"])
|
|
model.load_state_dict(conf["state_dict"])
|
|
return model
|
|
|
|
def serialize(self):
|
|
"""Serialize model and output dictionary.
|
|
|
|
Returns:
|
|
dict, serialized model with keys `model_args` and `state_dict`.
|
|
"""
|
|
import pytorch_lightning as pl
|
|
|
|
from .. import __version__ as asteroid_version
|
|
|
|
model_conf = dict(
|
|
model_name=self.__class__.__name__,
|
|
state_dict=self.get_state_dict(),
|
|
model_args=self.get_model_args(),
|
|
)
|
|
|
|
infos = dict()
|
|
infos["software_versions"] = dict(
|
|
torch_version=torch.__version__,
|
|
pytorch_lightning_version=pl.__version__,
|
|
asteroid_version=asteroid_version,
|
|
)
|
|
model_conf["infos"] = infos
|
|
return model_conf
|
|
|
|
def get_state_dict(self):
|
|
""" In case the state dict needs to be modified before sharing the model."""
|
|
return self.state_dict()
|
|
|
|
def get_model_args(self):
|
|
raise NotImplementedError
|
|
|
|
def cached_download(self, filename_or_url):
|
|
if os.path.isfile(filename_or_url):
|
|
print("is file")
|
|
return filename_or_url
|
|
else:
|
|
print("Model {} is not a file or doesn't exist.".format(filename_or_url))
|
|
|
|
|
|
class BaseEncoderMaskerDecoder(BaseModel):
|
|
"""Base class for encoder-masker-decoder separation models.
|
|
|
|
Args:
|
|
encoder (Encoder): Encoder instance.
|
|
masker (nn.Module): masker network.
|
|
decoder (Decoder): Decoder instance.
|
|
encoder_activation (Optional[str], optional): Activation to apply after encoder.
|
|
See ``asteroid.masknn.activations`` for valid values.
|
|
"""
|
|
|
|
def __init__(self, encoder, masker, decoder, encoder_activation=None):
|
|
super().__init__()
|
|
self.encoder = encoder
|
|
self.masker = masker
|
|
self.decoder = decoder
|
|
|
|
self.encoder_activation = encoder_activation
|
|
self.enc_activation = activations.get(encoder_activation or "linear")()
|
|
|
|
def forward(self, wav):
|
|
"""Enc/Mask/Dec model forward
|
|
|
|
Args:
|
|
wav (torch.Tensor): waveform tensor. 1D, 2D or 3D tensor, time last.
|
|
|
|
Returns:
|
|
torch.Tensor, of shape (batch, n_src, time) or (n_src, time).
|
|
"""
|
|
|
|
was_one_d = wav.ndim == 1
|
|
|
|
wav = _unsqueeze_to_3d(wav)
|
|
|
|
|
|
tf_rep = self.encoder(wav)
|
|
tf_rep = self.postprocess_encoded(tf_rep)
|
|
tf_rep = self.enc_activation(tf_rep)
|
|
|
|
est_masks = self.masker(tf_rep)
|
|
est_masks = self.postprocess_masks(est_masks)
|
|
|
|
masked_tf_rep = est_masks * tf_rep.unsqueeze(1)
|
|
masked_tf_rep = self.postprocess_masked(masked_tf_rep)
|
|
|
|
decoded = self.decoder(masked_tf_rep)
|
|
decoded = self.postprocess_decoded(decoded)
|
|
|
|
reconstructed = pad_x_to_y(decoded, wav)
|
|
if was_one_d:
|
|
return reconstructed.squeeze(0)
|
|
else:
|
|
return reconstructed
|
|
|
|
def postprocess_encoded(self, tf_rep):
|
|
"""Hook to perform transformations on the encoded, time-frequency domain
|
|
representation (output of the encoder) before encoder activation is applied.
|
|
|
|
Args:
|
|
tf_rep (Tensor of shape (batch, freq, time)):
|
|
Output of the encoder, before encoder activation is applied.
|
|
|
|
Return:
|
|
Transformed `tf_rep`
|
|
"""
|
|
return tf_rep
|
|
|
|
def postprocess_masks(self, masks):
|
|
"""Hook to perform transformations on the masks (output of the masker) before
|
|
masks are applied.
|
|
|
|
Args:
|
|
masks (Tensor of shape (batch, n_src, freq, time)):
|
|
Output of the masker
|
|
|
|
Return:
|
|
Transformed `masks`
|
|
"""
|
|
return masks
|
|
|
|
def postprocess_masked(self, masked_tf_rep):
|
|
"""Hook to perform transformations on the masked time-frequency domain
|
|
representation (result of masking in the time-frequency domain) before decoding.
|
|
|
|
Args:
|
|
masked_tf_rep (Tensor of shape (batch, n_src, freq, time)):
|
|
Masked time-frequency representation, before decoding.
|
|
|
|
Return:
|
|
Transformed `masked_tf_rep`
|
|
"""
|
|
return masked_tf_rep
|
|
|
|
def postprocess_decoded(self, decoded):
|
|
"""Hook to perform transformations on the decoded, time domain representation
|
|
(output of the decoder) before original shape reconstruction.
|
|
|
|
Args:
|
|
decoded (Tensor of shape (batch, n_src, time)):
|
|
Output of the decoder, before original shape reconstruction.
|
|
|
|
Return:
|
|
Transformed `decoded`
|
|
"""
|
|
return decoded
|
|
|
|
def get_model_args(self):
|
|
""" Arguments needed to re-instantiate the model. """
|
|
fb_config = self.encoder.filterbank.get_config()
|
|
masknet_config = self.masker.get_config()
|
|
|
|
if not all(k not in fb_config for k in masknet_config):
|
|
raise AssertionError(
|
|
"Filterbank and Mask network config share" "common keys. Merging them is not safe."
|
|
)
|
|
|
|
model_args = {
|
|
**fb_config,
|
|
**masknet_config,
|
|
"encoder_activation": self.encoder_activation,
|
|
}
|
|
return model_args
|
|
|
|
|
|
|
|
BaseTasNet = BaseEncoderMaskerDecoder
|
|
|