File size: 3,192 Bytes
06555b5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
import fractions
from typing import List, Tuple
from av import AudioFrame, AudioResampler
from av.frame import Frame
from av.packet import Packet
from ..jitterbuffer import JitterFrame
from ..mediastreams import convert_timebase
from ._opus import ffi, lib
from .base import Decoder, Encoder
# Audio parameters shared by the Opus decoder and encoder in this module.
SAMPLE_RATE = 48000  # samples per second, per channel
CHANNELS = 2  # stereo
SAMPLE_WIDTH = 2  # bytes per sample (signed 16-bit, "s16")
SAMPLES_PER_FRAME = SAMPLE_RATE // 50  # 960 samples per channel, i.e. 20 ms
TIME_BASE = fractions.Fraction(1, SAMPLE_RATE)  # one timestamp tick per sample
class OpusDecoder(Decoder):
    """Decode Opus packets into s16 stereo audio frames using libopus."""

    def __init__(self) -> None:
        """Create the libopus decoder state.

        Raises:
            AssertionError: if libopus reports an error while creating the
                decoder.
        """
        error = ffi.new("int *")
        self.decoder = lib.opus_decoder_create(SAMPLE_RATE, CHANNELS, error)
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        if error[0] != lib.OPUS_OK:
            raise AssertionError(
                "opus_decoder_create failed with error code %d" % error[0]
            )

    def __del__(self) -> None:
        """Release the libopus decoder state, if it was ever created."""
        # __del__ also runs when __init__ raised before the attribute was
        # assigned, so do not assume it exists.
        decoder = getattr(self, "decoder", None)
        if decoder is not None:
            lib.opus_decoder_destroy(decoder)

    def decode(self, encoded_frame: JitterFrame) -> List[Frame]:
        """Decode one Opus packet into a single audio frame.

        Args:
            encoded_frame: Frame whose ``data`` is handed to libopus as the
                packet and whose ``timestamp`` becomes the output ``pts``.

        Returns:
            A one-element list holding the decoded s16 stereo ``AudioFrame``
            of ``SAMPLES_PER_FRAME`` samples.

        Raises:
            AssertionError: if libopus does not return exactly
                ``SAMPLES_PER_FRAME`` samples (negative return values are
                libopus error codes).
        """
        frame = AudioFrame(format="s16", layout="stereo", samples=SAMPLES_PER_FRAME)
        frame.pts = encoded_frame.timestamp
        frame.sample_rate = SAMPLE_RATE
        frame.time_base = TIME_BASE

        # libopus writes the PCM samples directly into the frame's plane.
        length = lib.opus_decode(
            self.decoder,
            encoded_frame.data,
            len(encoded_frame.data),
            ffi.cast("int16_t *", frame.planes[0].buffer_ptr),
            SAMPLES_PER_FRAME,
            0,
        )
        # Explicit check (not ``assert``) so that a failed or short decode is
        # never returned as a valid frame under ``python -O``.
        if length != SAMPLES_PER_FRAME:
            raise AssertionError(
                "opus_decode returned %d, expected %d samples"
                % (length, SAMPLES_PER_FRAME)
            )
        return [frame]
class OpusEncoder(Encoder):
    """Encode audio frames into Opus payloads using libopus."""

    def __init__(self) -> None:
        """Create the libopus encoder state, output buffer and resampler.

        Raises:
            AssertionError: if libopus reports an error while creating the
                encoder.
        """
        error = ffi.new("int *")
        self.encoder = lib.opus_encoder_create(
            SAMPLE_RATE, CHANNELS, lib.OPUS_APPLICATION_VOIP, error
        )
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is unchanged.
        if error[0] != lib.OPUS_OK:
            raise AssertionError(
                "opus_encoder_create failed with error code %d" % error[0]
            )

        # Output buffer for encoded packets, sized as one frame of raw PCM.
        self.cdata = ffi.new(
            "unsigned char []", SAMPLES_PER_FRAME * CHANNELS * SAMPLE_WIDTH
        )
        self.buffer = ffi.buffer(self.cdata)

        # Create our own resampler to control the frame size.
        self.resampler = AudioResampler(
            format="s16",
            layout="stereo",
            rate=SAMPLE_RATE,
            frame_size=SAMPLES_PER_FRAME,
        )

    def __del__(self) -> None:
        """Release the libopus encoder state, if it was ever created."""
        # __del__ also runs when __init__ raised before the attribute was
        # assigned, so do not assume it exists.
        encoder = getattr(self, "encoder", None)
        if encoder is not None:
            lib.opus_encoder_destroy(encoder)

    def encode(
        self, frame: Frame, force_keyframe: bool = False
    ) -> Tuple[List[bytes], int]:
        """Resample an audio frame and encode it into Opus payloads.

        Args:
            frame: An s16 ``AudioFrame`` with a mono or stereo layout.
            force_keyframe: Not used by this encoder.

        Returns:
            A ``(payloads, timestamp)`` tuple: one payload per frame yielded
            by the resampler, and the ``pts`` of the first such frame.
            ``timestamp`` is ``None`` when the resampler yields no frame.

        Raises:
            AssertionError: if ``frame`` is not an s16 mono/stereo
                ``AudioFrame``, or if libopus fails to encode a frame.
        """
        assert isinstance(frame, AudioFrame)
        assert frame.format.name == "s16"
        assert frame.layout.name in ["mono", "stereo"]

        # Send frame through resampler and encoder.
        payloads = []
        timestamp = None
        for resampled in self.resampler.resample(frame):
            data = bytes(resampled.planes[0])
            length = lib.opus_encode(
                self.encoder,
                ffi.cast("int16_t*", ffi.from_buffer(data)),
                SAMPLES_PER_FRAME,
                self.cdata,
                len(self.cdata),
            )
            # Explicit check (not ``assert``): under ``python -O`` a negative
            # error code would otherwise be used as a slice bound below and
            # produce a bogus payload.
            if length <= 0:
                raise AssertionError("opus_encode failed with result %d" % length)
            # Slice out the encoded bytes; the buffer is reused for the next
            # frame.
            payloads.append(self.buffer[0:length])
            if timestamp is None:
                timestamp = resampled.pts

        return payloads, timestamp

    def pack(self, packet: Packet) -> Tuple[List[bytes], int]:
        """Wrap an existing packet as a payload without re-encoding it.

        Args:
            packet: Packet whose bytes are used as the single payload.

        Returns:
            A ``(payloads, timestamp)`` tuple where ``payloads`` holds the
            packet's bytes and ``timestamp`` is ``packet.pts`` passed through
            ``convert_timebase`` from ``packet.time_base`` to ``TIME_BASE``.
        """
        timestamp = convert_timebase(packet.pts, packet.time_base, TIME_BASE)
        return [bytes(packet)], timestamp
|