krishna-k's picture
Upload folder using huggingface_hub
06555b5 verified
raw
history blame contribute delete
3.19 kB
import fractions
from typing import List, Tuple
from av import AudioFrame, AudioResampler
from av.frame import Frame
from av.packet import Packet
from ..jitterbuffer import JitterFrame
from ..mediastreams import convert_timebase
from ._opus import ffi, lib
from .base import Decoder, Encoder
# Fixed audio parameters shared by the Opus encoder and decoder in this module.
CHANNELS = 2  # stereo
SAMPLE_RATE = 48000  # samples per second, per channel
SAMPLE_WIDTH = 2  # bytes per sample, matching the "s16" sample format
SAMPLES_PER_FRAME = 960  # per channel; 960 / 48000 Hz = 20 ms of audio
# Duration of a single sample in seconds; used as the frame time base.
TIME_BASE = fractions.Fraction(numerator=1, denominator=SAMPLE_RATE)
class OpusDecoder(Decoder):
    """Decode Opus payloads into stereo ``s16`` audio frames using libopus.

    Every decoded frame holds ``SAMPLES_PER_FRAME`` samples per channel at
    ``SAMPLE_RATE`` Hz.
    """

    def __init__(self) -> None:
        """Allocate the libopus decoder state.

        Raises:
            AssertionError: if libopus does not report ``OPUS_OK``.
        """
        error = ffi.new("int *")
        decoder = lib.opus_decoder_create(SAMPLE_RATE, CHANNELS, error)
        # Raised explicitly (not via ``assert``) so the check is not stripped
        # when Python runs with -O; the exception type stays the same.
        if error[0] != lib.OPUS_OK:
            raise AssertionError(
                "opus_decoder_create failed with error code {}".format(error[0])
            )
        # Publish the handle only once it is known to be valid, so __del__
        # never has to deal with a failed allocation.
        self.decoder = decoder

    def __del__(self) -> None:
        """Release the libopus decoder state, if one was created."""
        # __del__ also runs when __init__ raised before setting the attribute.
        decoder = getattr(self, "decoder", None)
        if decoder is not None:
            lib.opus_decoder_destroy(decoder)

    def decode(self, encoded_frame: JitterFrame) -> List[Frame]:
        """Decode one Opus payload into a single audio frame.

        Args:
            encoded_frame: Source of the payload bytes (``data``) and of the
                presentation timestamp (``timestamp``).

        Returns:
            A one-element list holding the decoded ``AudioFrame``.

        Raises:
            AssertionError: if libopus returns anything other than
                ``SAMPLES_PER_FRAME`` (negative values are libopus error
                codes; other values mean a different frame duration).
        """
        frame = AudioFrame(format="s16", layout="stereo", samples=SAMPLES_PER_FRAME)
        frame.pts = encoded_frame.timestamp
        frame.sample_rate = SAMPLE_RATE
        frame.time_base = TIME_BASE

        # libopus writes the decoded PCM straight into the frame's first plane.
        length = lib.opus_decode(
            self.decoder,
            encoded_frame.data,
            len(encoded_frame.data),
            ffi.cast("int16_t *", frame.planes[0].buffer_ptr),
            SAMPLES_PER_FRAME,
            0,  # decode_fec flag disabled
        )
        # Explicit check so a failed or short decode is never returned as a
        # valid frame, even when assertions are disabled with -O.
        if length != SAMPLES_PER_FRAME:
            raise AssertionError(
                "opus_decode returned {}, expected {} samples".format(
                    length, SAMPLES_PER_FRAME
                )
            )
        return [frame]
class OpusEncoder(Encoder):
    """Encode audio frames into Opus payloads using libopus.

    Input frames are passed through a resampler that emits stereo ``s16``
    frames of ``SAMPLES_PER_FRAME`` samples at ``SAMPLE_RATE`` Hz, and each
    resampled frame is encoded into one payload.
    """

    def __init__(self) -> None:
        """Allocate the libopus encoder, its output buffer and the resampler.

        Raises:
            AssertionError: if libopus does not report ``OPUS_OK``.
        """
        error = ffi.new("int *")
        encoder = lib.opus_encoder_create(
            SAMPLE_RATE, CHANNELS, lib.OPUS_APPLICATION_VOIP, error
        )
        # Raised explicitly (not via ``assert``) so the check is not stripped
        # when Python runs with -O; the exception type stays the same.
        if error[0] != lib.OPUS_OK:
            raise AssertionError(
                "opus_encoder_create failed with error code {}".format(error[0])
            )
        # Publish the handle only once it is known to be valid, so __del__
        # never has to deal with a failed allocation.
        self.encoder = encoder

        # Output buffer reused for every opus_encode call; ``self.buffer`` is
        # a Python-side view onto the same memory.
        self.cdata = ffi.new(
            "unsigned char []", SAMPLES_PER_FRAME * CHANNELS * SAMPLE_WIDTH
        )
        self.buffer = ffi.buffer(self.cdata)

        # Create our own resampler to control the frame size.
        self.resampler = AudioResampler(
            format="s16",
            layout="stereo",
            rate=SAMPLE_RATE,
            frame_size=SAMPLES_PER_FRAME,
        )

    def __del__(self) -> None:
        """Release the libopus encoder state, if one was created."""
        # __del__ also runs when __init__ raised before setting the attribute.
        encoder = getattr(self, "encoder", None)
        if encoder is not None:
            lib.opus_encoder_destroy(encoder)

    def encode(
        self, frame: Frame, force_keyframe: bool = False
    ) -> Tuple[List[bytes], int]:
        """Resample ``frame`` and encode every resulting chunk with Opus.

        Args:
            frame: An ``s16`` ``AudioFrame`` with a mono or stereo layout.
            force_keyframe: Part of the encoder interface; not used here.

        Returns:
            ``(payloads, timestamp)`` where ``payloads`` has one entry per
            frame produced by the resampler and ``timestamp`` is the ``pts``
            of the first such frame. When the resampler produces no frame,
            ``payloads`` is empty and ``timestamp`` is ``None``.

        Raises:
            AssertionError: if the input frame violates the preconditions
                above, or if libopus reports an encoding failure.
        """
        # Caller-contract checks, deliberately left as plain assertions.
        assert isinstance(frame, AudioFrame)
        assert frame.format.name == "s16"
        assert frame.layout.name in ["mono", "stereo"]

        # Send frame through resampler and encoder.
        payloads = []
        timestamp = None
        for chunk in self.resampler.resample(frame):
            pcm = bytes(chunk.planes[0])
            length = lib.opus_encode(
                self.encoder,
                ffi.cast("int16_t*", ffi.from_buffer(pcm)),
                SAMPLES_PER_FRAME,
                self.cdata,
                len(self.cdata),
            )
            # Explicit check: with assertions disabled, a negative length
            # would otherwise slice stale bytes out of the shared buffer.
            if length <= 0:
                raise AssertionError(
                    "opus_encode failed with return value {}".format(length)
                )
            payloads.append(self.buffer[0:length])
            if timestamp is None:
                timestamp = chunk.pts
        return payloads, timestamp

    def pack(self, packet: Packet) -> Tuple[List[bytes], int]:
        """Wrap an already-encoded packet as a payload list plus timestamp.

        The packet's ``pts`` is converted from the packet's own time base to
        ``TIME_BASE``; the packet bytes are passed through as one payload.
        """
        timestamp = convert_timebase(packet.pts, packet.time_base, TIME_BASE)
        return [bytes(packet)], timestamp