# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.

import math
import typing as tp

import torch
from torch import nn

from .core_vq import ResidualVectorQuantization


################################################################################
# Residual quantization module
################################################################################


class ResidualVectorQuantizer(nn.Module):
    """Residual Vector Quantizer.

    Thin wrapper around ``ResidualVectorQuantization`` that additionally maps
    a target bandwidth onto the number of quantizers to use when encoding.

    Args:
        dimension (int): Dimension of the codebooks.
        n_q (int): Number of residual vector quantizers used.
        bins (int): Codebook size.
        decay (float): Decay for exponential moving average over the codebooks.
        kmeans_init (bool): Whether to use kmeans to initialize the codebooks.
        kmeans_iters (int): Number of iterations used for kmeans initialization.
        threshold_ema_dead_code (int): Threshold for dead code expiration.
            Replace any codes that have an exponential moving average cluster
            size less than the specified threshold with randomly selected
            vector from the current batch.
    """

    def __init__(
        self,
        dimension: int = 256,
        n_q: int = 8,
        bins: int = 1024,
        decay: float = 0.99,
        kmeans_init: bool = True,
        kmeans_iters: int = 50,
        threshold_ema_dead_code: int = 2,
    ) -> None:
        super().__init__()
        # Keep the hyper-parameters on the instance so they can be inspected
        # later and reused by the bandwidth helpers below.
        self.n_q = n_q
        self.dimension = dimension
        self.bins = bins
        self.decay = decay
        self.kmeans_init = kmeans_init
        self.kmeans_iters = kmeans_iters
        self.threshold_ema_dead_code = threshold_ema_dead_code
        self.vq = ResidualVectorQuantization(
            dim=self.dimension,
            codebook_size=self.bins,
            num_quantizers=self.n_q,
            decay=self.decay,
            kmeans_init=self.kmeans_init,
            kmeans_iters=self.kmeans_iters,
            threshold_ema_dead_code=self.threshold_ema_dead_code,
        )

    def get_num_quantizers_for_bandwidth(
        self, frame_rate: int, bandwidth: tp.Optional[float] = None
    ) -> int:
        """Return the number of quantizers matching a target bandwidth.

        Args:
            frame_rate (int): Number of frames per second being quantized.
            bandwidth (float, optional): Target bandwidth in kbps
                (e.g. ``6.0`` means 6 kbps). When ``None``, zero or negative,
                the configured ``self.n_q`` is returned unchanged.

        Returns:
            int: ``floor(bandwidth * 1000 / bandwidth_per_quantizer)``, with a
            lower bound of 1, or ``self.n_q`` when no usable bandwidth is given.
        """
        if bandwidth is None or not bandwidth > 0.0:
            return self.n_q
        bw_per_q = self.get_bandwidth_per_quantizer(frame_rate)
        # `bandwidth` is expressed in kbps while `bw_per_q` is in bits per
        # second, hence the factor of 1000.
        # NOTE(review): the result is not capped at self.n_q, so a large
        # bandwidth can yield more quantizers than were configured - confirm
        # that `self.vq.encode` tolerates this.
        return int(max(1, math.floor(bandwidth * 1000 / bw_per_q)))

    def get_bandwidth_per_quantizer(self, frame_rate: int) -> float:
        """Return the bandwidth, in bits per second, of a single quantizer.

        Each quantizer encodes a frame with ``log2(bins)`` bits, so the
        bandwidth is that bit count multiplied by ``frame_rate``.
        """
        bits_per_frame = math.log2(self.bins)
        return bits_per_frame * frame_rate

    def encode(
        self, x: torch.Tensor, frame_rate: int, bandwidth: tp.Optional[float] = None
    ) -> tp.Tuple[torch.Tensor, tp.Any, tp.Any]:
        """Encode ``x`` at the given frame rate and target bandwidth.

        The number of quantizers is derived from ``frame_rate`` and
        ``bandwidth`` via ``get_num_quantizers_for_bandwidth`` and forwarded
        to the underlying ``self.vq.encode``.

        Returns:
            tuple: The three values produced by ``self.vq.encode``, unchanged
            and in the same order: ``(codes, z_O, z_o)``. ``codes`` holds the
            indices for each quantizer; the other two values are passed
            through as-is (their exact types are defined by
            ``ResidualVectorQuantization.encode``, not visible here).
        """
        n_q = self.get_num_quantizers_for_bandwidth(frame_rate, bandwidth)
        codes, z_O, z_o = self.vq.encode(x, n_q=n_q)
        return codes, z_O, z_o

    def decode(self, codes: torch.Tensor) -> torch.Tensor:
        """Decode the given codes to the quantized representation."""
        return self.vq.decode(codes)