from dataclasses import dataclass import torch from tqdm.auto import trange import typing as tp from einops import rearrange from torch import nn from .autoencoders import AudioAutoencoder from .conditioners import MultiConditioner, create_multi_conditioner_from_conditioning_config from .factory import create_pretransform_from_config from .lm_backbone import AudioLMBackbone, XTransformersAudioLMBackbone, ContinuousTransformerAudioLMBackbone from .pretransforms import Pretransform, AutoencoderPretransform, PretrainedDACPretransform, AudiocraftCompressionPretransform from .utils import multinomial, sample_top_k, sample_top_p from audiocraft.modules.codebooks_patterns import ( CodebooksPatternProvider, DelayedPatternProvider, MusicLMPattern, ParallelPatternProvider, UnrolledPatternProvider, VALLEPattern, ) # Copied and modified from https://github.com/facebookresearch/audiocraft/blob/main/audiocraft/models/lm.py under MIT license # License can be found in LICENSES/LICENSE_META.txt @dataclass class LMOutput: # The logits are already re-aligned with the input codes # hence no extra shift is required, e.g. when computing CE logits: torch.Tensor # [B, K, T, card] mask: torch.Tensor # [B, K, T] # Wrapper for a multi-codebook language model # Handles patterns and quantizer heads class AudioLanguageModel(nn.Module): def __init__( self, pattern_provider: CodebooksPatternProvider, backbone: AudioLMBackbone, num_quantizers: int, codebook_size: int ): super().__init__() self.pattern_provider = pattern_provider self.backbone = backbone self.num_quantizers = num_quantizers self.codebook_size = codebook_size self.masked_token_id = codebook_size # Per-quantizer embedders # Add one for the mask embed self.embeds = nn.ModuleList([nn.Embedding(codebook_size + 1, backbone.embed_dim) for _ in range(num_quantizers)]) # Per-quantizer output heads self.quantizer_heads = nn.ModuleList([ nn.Linear(backbone.embed_dim, codebook_size) for _ in range(num_quantizers) ]) def forward(self, sequence: torch.Tensor, #[batch, seq_len, prepend_cond=None, #[batch, seq, channels] prepend_cond_mask=None, cross_attn_cond=None, #[batch, seq, channels], **kwargs ): batch, num_quantizers, seq_len = sequence.shape assert num_quantizers == self.num_quantizers, "Number of quantizers in sequence must match number of quantizers in model" backbone_input = sum([self.embeds[i](sequence[:, i]) for i in range(num_quantizers)]) # [batch, seq_len, embed_dim] output = self.backbone( backbone_input, cross_attn_cond=cross_attn_cond, prepend_cond=prepend_cond, prepend_cond_mask=prepend_cond_mask, **kwargs ) # [batch, seq_len, embed_dim] # Run output through quantizer heads logits = torch.stack([self.quantizer_heads[i](output) for i in range(num_quantizers)], dim=1) # [batch, num_quantizers, seq_len, codebook_size] return logits def compute_logits( self, codes, #[batch, num_quantizers, seq_len] **kwargs): """ Compute logits for a batch of codes, optionally conditioning on cross-attention and prepend conditioning Handles translation between input sequence and pattern-shifted sequence Only used during training """ batch, _, seq_len = codes.shape pattern = self.pattern_provider.get_pattern(seq_len) # Apply the token pattern to the codes, shifting the codes as needed and masking out invalid steps shifted_codes, _, _ = pattern.build_pattern_sequence( codes, self.masked_token_id, keep_only_valid_steps=True ) # Run the model to get logits for each quantizer [batch, num_quantizers, seq_len, codebook_size] logits = self(shifted_codes, **kwargs) # Rearrange logits to prepare to revert pattern logits = rearrange(logits, "b n s c -> b c n s") # Revert sequence logits back to original sequence length, removing masked steps logits, _, logits_mask = pattern.revert_pattern_logits( logits, float('nan'), keep_only_valid_steps=True ) logits = rearrange(logits, "b c n t -> b n t c") logits_mask = logits_mask[None, :, :].expand(batch, -1, -1) # [batch, num_quantizers, seq_len] return LMOutput(logits=logits, mask=logits_mask) # Conditioning and generation wrapper for a multi-codebook language model # Handles conditioning, CFG, generation, and encoding/decoding class AudioLanguageModelWrapper(nn.Module): def __init__( self, pretransform: Pretransform, lm: AudioLanguageModel, sample_rate: int, min_input_length: int, conditioner: MultiConditioner = None, cross_attn_cond_ids: tp.List[str] = [], prepend_cond_ids: tp.List[str] = [], global_cond_ids: tp.List[str] = [] ): super().__init__() assert pretransform.is_discrete, "Pretransform must be discrete" self.pretransform = pretransform self.pretransform.requires_grad_(False) self.pretransform.eval() if isinstance(self.pretransform, AutoencoderPretransform): self.num_quantizers = self.pretransform.model.bottleneck.num_quantizers self.codebook_size = self.pretransform.model.bottleneck.codebook_size elif isinstance(self.pretransform, PretrainedDACPretransform): self.num_quantizers = self.pretransform.model.num_quantizers self.codebook_size = self.pretransform.model.codebook_size elif isinstance(self.pretransform, AudiocraftCompressionPretransform): self.num_quantizers = self.pretransform.num_quantizers self.codebook_size = self.pretransform.codebook_size else: raise NotImplementedError(f"Unrecognized pretransform type {type(self.pretransform)}") self.conditioner = conditioner self.lm = lm self.sample_rate = sample_rate self.min_input_length = min_input_length self.cross_attn_cond_ids = cross_attn_cond_ids self.prepend_cond_ids = prepend_cond_ids self.global_cond_ids = global_cond_ids def get_conditioning_inputs(self, cond: tp.Dict[str, tp.Any], negative=False): cross_attention_input = None prepend_cond = None prepend_cond_mask = None global_cond = None if len(self.cross_attn_cond_ids) > 0: # Concatenate all cross-attention inputs over the sequence dimension # Assumes that the cross-attention inputs are of shape (batch, seq, channels) cross_attention_input = torch.cat([cond[key][0] for key in self.cross_attn_cond_ids], dim=1) if len(self.prepend_cond_ids) > 0: # Concatenate all prepend conditioning inputs over the sequence dimension # Assumes that the prepend conditioning inputs are of shape (batch, seq, channels) prepend_cond = torch.cat([cond[key][0] for key in self.prepend_cond_ids], dim=1) prepend_cond_mask = torch.cat([cond[key][1] for key in self.prepend_cond_ids], dim=1) if len(self.global_cond_ids) > 0: # Concatenate all global conditioning inputs over the channel dimension # Assumes that the global conditioning inputs are of shape (batch, channels) global_cond = torch.cat([cond[key][0] for key in self.global_cond_ids], dim=-1) if len(global_cond.shape) == 3: global_cond = global_cond.squeeze(1) if negative: return { "negative_cross_attn_cond": cross_attention_input, "negative_prepend_cond": prepend_cond, "negative_prepend_cond_mask": prepend_cond_mask, "negative_global_cond": global_cond } else: return { "cross_attn_cond": cross_attention_input, "prepend_cond": prepend_cond, "prepend_cond_mask": prepend_cond_mask, "global_cond": global_cond } def compute_logits( self, codes, condition_tensors=None, cfg_dropout_prob=0.0, **kwargs ): """ Compute logits for a batch of codes, and translates from conditioning inputs to model inputs Handles CFG dropout """ if condition_tensors is None: condition_tensors = {} conditioning_inputs = self.get_conditioning_inputs(condition_tensors) cross_attn_cond = conditioning_inputs["cross_attn_cond"] prepend_cond = conditioning_inputs["prepend_cond"] prepend_cond_mask = conditioning_inputs["prepend_cond_mask"] global_cond = conditioning_inputs["global_cond"] if cfg_dropout_prob > 0.0: if cross_attn_cond is not None: null_embed = torch.zeros_like(cross_attn_cond, device=cross_attn_cond.device) dropout_mask = torch.bernoulli(torch.full((cross_attn_cond.shape[0], 1, 1), cfg_dropout_prob, device=cross_attn_cond.device)).to(torch.bool) cross_attn_cond = torch.where(dropout_mask, null_embed, cross_attn_cond) if prepend_cond is not None: null_embed = torch.zeros_like(prepend_cond, device=prepend_cond.device) dropout_mask = torch.bernoulli(torch.full((prepend_cond.shape[0], 1, 1), cfg_dropout_prob, device=prepend_cond.device)).to(torch.bool) prepend_cond = torch.where(dropout_mask, null_embed, prepend_cond) if global_cond is not None: null_embed = torch.zeros_like(global_cond, device=global_cond.device) dropout_mask = torch.bernoulli(torch.full((global_cond.shape[0], 1), cfg_dropout_prob, device=global_cond.device)).to(torch.bool) global_cond = torch.where(dropout_mask, null_embed, global_cond) return self.lm.compute_logits(codes, cross_attn_cond=cross_attn_cond, prepend_cond=prepend_cond, prepend_cond_mask=prepend_cond_mask, global_cond=global_cond, **kwargs) def _sample_next_token( self, sequence, #[batch, num_quantizers, seq_len] conditioning_tensors=None, cross_attn_use_cfg=True, prepend_use_cfg=True, global_use_cfg=True, cfg_scale=1.0, top_k=250, top_p=0.0, temp=1.0, **kwargs ): """ Sample the next token for a batch of codes, and translates from conditioning inputs to model inputs Handles CFG inference """ if conditioning_tensors is None: conditioning_tensors = {} conditioning_inputs = self.get_conditioning_inputs(conditioning_tensors) cross_attn_cond = conditioning_inputs["cross_attn_cond"] prepend_cond = conditioning_inputs["prepend_cond"] prepend_cond_mask = conditioning_inputs["prepend_cond_mask"] global_cond = conditioning_inputs["global_cond"] if cfg_scale != 1.0: # Batch size is doubled to account for negative samples sequence = torch.cat([sequence, sequence], dim=0) if cross_attn_cond is not None and cross_attn_use_cfg: null_embed = torch.zeros_like(cross_attn_cond, device=cross_attn_cond.device) cross_attn_cond = torch.cat([cross_attn_cond, null_embed], dim=0) if prepend_cond is not None and prepend_use_cfg: null_embed = torch.zeros_like(prepend_cond, device=prepend_cond.device) prepend_cond = torch.cat([prepend_cond, null_embed], dim=0) if prepend_cond_mask is not None: prepend_cond_mask = torch.cat([prepend_cond_mask, prepend_cond_mask], dim=0) if global_cond is not None and global_use_cfg: null_embed = torch.zeros_like(global_cond, device=global_cond.device) global_cond = torch.cat([global_cond, null_embed], dim=0) logits = self.lm(sequence, cross_attn_cond=cross_attn_cond, prepend_cond=prepend_cond, prepend_cond_mask=prepend_cond_mask, global_cond=global_cond, **kwargs) if cfg_scale != 1.0: cond_logits, uncond_logits = logits.chunk(2, dim=0) logits = uncond_logits + (cond_logits - uncond_logits) * cfg_scale logits = rearrange(logits, "b n s c -> b n c s") # [batch, num_quantizers, codebook_size, seq_len] # Grab the logits for the last step logits = logits[:, :, :, -1] # [batch, num_quantizers, codebook_size] # Apply top-k or top-p sampling if temp > 0: probs = torch.softmax(logits / temp, dim=-1) if top_p > 0.0: next_token = sample_top_p(probs, p=top_p) elif top_k > 0: next_token = sample_top_k(probs, k=top_k) else: next_token = multinomial(probs, num_samples=1) else: next_token = torch.argmax(logits, dim=-1, keepdim=True) # [batch, num_quantizers, 1] return next_token @torch.no_grad() def generate( self, max_gen_len: int = 256, batch_size: tp.Optional[int] = None, init_data: tp.Optional[torch.Tensor] = None, conditioning: tp.Optional[tp.Dict[str, tp.Any]] = None, conditioning_tensors: tp.Optional[tp.Dict[str, tp.Any]] = None, callback: tp.Optional[tp.Callable[[int, int], None]] = None, use_cache: bool = True, cfg_scale: float = 1.0, **kwargs ): device = next(self.parameters()).device if conditioning_tensors is None and conditioning is not None: # Convert conditioning inputs to conditioning tensors conditioning_tensors = self.conditioner(conditioning, device) # Check that batch size is consistent across inputs possible_batch_sizes = [] if batch_size is not None: possible_batch_sizes.append(batch_size) elif init_data is not None: possible_batch_sizes.append(init_data.shape[0]) elif conditioning_tensors is not None: # Assume that the first conditioning tensor has the batch dimension possible_batch_sizes.append(conditioning_tensors[list(conditioning_tensors.keys())[0]][0].shape[0]) else: possible_batch_sizes.append(1) assert [x == possible_batch_sizes[0] for x in possible_batch_sizes], "Batch size must be consistent across inputs" batch_size = possible_batch_sizes[0] if init_data is None: # Initialize with zeros assert batch_size > 0 init_data = torch.zeros((batch_size, self.num_quantizers, 0), device=device, dtype=torch.long) batch_size, num_quantizers, seq_len = init_data.shape start_offset = seq_len assert start_offset < max_gen_len, "init data longer than max gen length" pattern = self.lm.pattern_provider.get_pattern(max_gen_len) unknown_token = -1 # Initialize the generated codes with the init data, padded with unknown tokens gen_codes = torch.full((batch_size, num_quantizers, max_gen_len), unknown_token, device=device, dtype=torch.long) gen_codes[:, :, :start_offset] = init_data # [batch, num_quantizers, max_gen_len] gen_sequence, _, mask = pattern.build_pattern_sequence(gen_codes, self.lm.masked_token_id) # [batch, num_quantizers, gen_sequence_len] start_offset_sequence = pattern.get_first_step_with_timesteps(start_offset) assert start_offset_sequence is not None # Generation prev_offset = 0 gen_sequence_len = gen_sequence.shape[-1] # Reset generation cache if use_cache and self.lm.backbone.use_generation_cache: self.lm.backbone.reset_generation_cache(max_gen_len, batch_size if cfg_scale == 1.0 else batch_size * 2) for offset in trange(start_offset_sequence, gen_sequence_len): # Get the full sequence up to the current offset curr_sequence = gen_sequence[..., prev_offset:offset] next_token = self._sample_next_token( curr_sequence, conditioning_tensors=conditioning_tensors, use_cache=use_cache, cfg_scale=cfg_scale, **kwargs ) valid_mask = mask[..., offset:offset+1].expand(batch_size, -1, -1) next_token[~valid_mask] = self.lm.masked_token_id # Update the generated sequence with the next token gen_sequence[..., offset:offset+1] = torch.where( gen_sequence[..., offset:offset+1] == unknown_token, next_token, gen_sequence[..., offset:offset+1] ) if use_cache and self.lm.backbone.use_generation_cache: # Only update the offset if caching is being used prev_offset = offset self.lm.backbone.update_generation_cache(offset) if callback is not None: # Callback to report progress # Pass in the offset relative to the start of the sequence, and the length of the current sequence callback(1 + offset - start_offset_sequence, gen_sequence_len - start_offset_sequence) assert not (gen_sequence == unknown_token).any(), "Unknown tokens in generated sequence" out_codes, _, out_mask = pattern.revert_pattern_sequence(gen_sequence, special_token=unknown_token) # sanity checks over the returned codes and corresponding masks assert (out_codes[..., :max_gen_len] != unknown_token).all() assert (out_mask[..., :max_gen_len] == 1).all() #out_codes = out_codes[..., 0:max_gen_len] return out_codes def generate_audio( self, **kwargs ): """ Generate audio from a batch of codes """ codes = self.generate(**kwargs) audio = self.pretransform.decode_tokens(codes) return audio def create_audio_lm_from_config(config): model_config = config.get('model', None) assert model_config is not None, 'model config must be specified in config' sample_rate = config.get('sample_rate', None) assert sample_rate is not None, "Must specify sample_rate in config" lm_config = model_config.get('lm', None) assert lm_config is not None, 'lm config must be specified in model config' codebook_pattern = lm_config.get("codebook_pattern", "delay") pattern_providers = { 'parallel': ParallelPatternProvider, 'delay': DelayedPatternProvider, 'unroll': UnrolledPatternProvider, 'valle': VALLEPattern, 'musiclm': MusicLMPattern, } pretransform_config = model_config.get("pretransform", None) pretransform = create_pretransform_from_config(pretransform_config, sample_rate) assert pretransform.is_discrete, "Pretransform must be discrete" min_input_length = pretransform.downsampling_ratio pattern_provider = pattern_providers[codebook_pattern](n_q=pretransform.num_quantizers) conditioning_config = model_config.get('conditioning', None) conditioner = None if conditioning_config is not None: conditioner = create_multi_conditioner_from_conditioning_config(conditioning_config) cross_attn_cond_ids = lm_config.get('cross_attention_cond_ids', []) prepend_cond_ids = lm_config.get('prepend_cond_ids', []) global_cond_ids = lm_config.get('global_cond_ids', []) lm_type = lm_config.get("type", None) lm_model_config = lm_config.get("config", None) assert lm_type is not None, "Must specify lm type in lm config" assert lm_model_config is not None, "Must specify lm model config in lm config" if lm_type == "x-transformers": backbone = XTransformersAudioLMBackbone(**lm_model_config) elif lm_type == "continuous_transformer": backbone = ContinuousTransformerAudioLMBackbone(**lm_model_config) else: raise NotImplementedError(f"Unrecognized lm type {lm_type}") lm = AudioLanguageModel( pattern_provider=pattern_provider, backbone=backbone, num_quantizers=pretransform.num_quantizers, codebook_size=pretransform.codebook_size ) model = AudioLanguageModelWrapper( pretransform=pretransform, lm=lm, conditioner=conditioner, sample_rate=sample_rate, min_input_length=min_input_length, cross_attn_cond_ids=cross_attn_cond_ids, prepend_cond_ids=prepend_cond_ids, global_cond_ids=global_cond_ids ) return model