File size: 3,192 Bytes
06555b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import fractions
from typing import List, Tuple

from av import AudioFrame, AudioResampler
from av.frame import Frame
from av.packet import Packet

from ..jitterbuffer import JitterFrame
from ..mediastreams import convert_timebase
from ._opus import ffi, lib
from .base import Decoder, Encoder

# Audio parameters shared by the Opus encoder and decoder in this module.
SAMPLE_RATE = 48000  # samples per second
CHANNELS = 2  # stereo
SAMPLE_WIDTH = 2  # bytes per sample (signed 16-bit, "s16")
SAMPLES_PER_FRAME = SAMPLE_RATE // 50  # 960 samples per channel, i.e. 20 ms
TIME_BASE = fractions.Fraction(1, SAMPLE_RATE)  # duration of one sample, in seconds


class OpusDecoder(Decoder):
    """Opus decoder backed by libopus through the cffi ``lib`` bindings.

    Every decoded packet yields one 48 kHz stereo ``s16`` audio frame holding
    ``SAMPLES_PER_FRAME`` samples per channel.
    """

    def __init__(self) -> None:
        """Allocate the native decoder state.

        Raises:
            AssertionError: if libopus does not report ``OPUS_OK``.
        """
        error = ffi.new("int *")
        self.decoder = lib.opus_decoder_create(SAMPLE_RATE, CHANNELS, error)
        # Checked explicitly (not with ``assert``) so the check still runs
        # when Python is started with -O; the exception type is unchanged.
        if error[0] != lib.OPUS_OK:
            raise AssertionError(
                f"opus_decoder_create failed with error code {error[0]}"
            )

    def __del__(self) -> None:
        """Release the native decoder state, if it was ever created."""
        # __init__ may have raised before ``self.decoder`` was bound.
        decoder = getattr(self, "decoder", None)
        if decoder is not None:
            lib.opus_decoder_destroy(decoder)

    def decode(self, encoded_frame: JitterFrame) -> List[Frame]:
        """Decode one Opus packet into a single audio frame.

        Args:
            encoded_frame: frame whose ``data`` holds the Opus packet and
                whose ``timestamp`` becomes the ``pts`` of the decoded frame.

        Returns:
            A one-element list containing the decoded ``AudioFrame``.

        Raises:
            AssertionError: if libopus does not decode exactly
                ``SAMPLES_PER_FRAME`` samples (this includes negative libopus
                error codes).
        """
        frame = AudioFrame(format="s16", layout="stereo", samples=SAMPLES_PER_FRAME)
        frame.pts = encoded_frame.timestamp
        frame.sample_rate = SAMPLE_RATE
        frame.time_base = TIME_BASE

        # libopus writes the PCM samples straight into the frame's buffer.
        length = lib.opus_decode(
            self.decoder,
            encoded_frame.data,
            len(encoded_frame.data),
            ffi.cast("int16_t *", frame.planes[0].buffer_ptr),
            SAMPLES_PER_FRAME,
            0,  # decode_fec: do not ask for in-band forward error correction
        )
        # Explicit check so that a failed or short decode never returns a
        # frame of undecoded memory, even when asserts are stripped by -O.
        if length != SAMPLES_PER_FRAME:
            raise AssertionError(
                f"opus_decode returned {length}, expected {SAMPLES_PER_FRAME}"
            )
        return [frame]


class OpusEncoder(Encoder):
    """Opus encoder backed by libopus through the cffi ``lib`` bindings.

    Incoming audio is resampled to 48 kHz stereo ``s16`` in chunks of
    ``SAMPLES_PER_FRAME`` samples, and each chunk is encoded as one payload.
    """

    def __init__(self) -> None:
        """Allocate the native encoder, its output buffer and the resampler.

        Raises:
            AssertionError: if libopus does not report ``OPUS_OK``.
        """
        error = ffi.new("int *")
        self.encoder = lib.opus_encoder_create(
            SAMPLE_RATE, CHANNELS, lib.OPUS_APPLICATION_VOIP, error
        )
        # Checked explicitly (not with ``assert``) so the check still runs
        # when Python is started with -O; the exception type is unchanged.
        if error[0] != lib.OPUS_OK:
            raise AssertionError(
                f"opus_encoder_create failed with error code {error[0]}"
            )

        # Reusable output buffer for encoded packets, sized like one frame of
        # raw PCM; ``self.buffer`` is a Python view over the same memory.
        self.cdata = ffi.new(
            "unsigned char []", SAMPLES_PER_FRAME * CHANNELS * SAMPLE_WIDTH
        )
        self.buffer = ffi.buffer(self.cdata)

        # Create our own resampler to control the frame size.
        self.resampler = AudioResampler(
            format="s16",
            layout="stereo",
            rate=SAMPLE_RATE,
            frame_size=SAMPLES_PER_FRAME,
        )

    def __del__(self) -> None:
        """Release the native encoder state, if it was ever created."""
        # __init__ may have raised before ``self.encoder`` was bound.
        encoder = getattr(self, "encoder", None)
        if encoder is not None:
            lib.opus_encoder_destroy(encoder)

    def encode(
        self, frame: Frame, force_keyframe: bool = False
    ) -> Tuple[List[bytes], int]:
        """Encode an audio frame into zero or more Opus payloads.

        Args:
            frame: an ``s16`` mono or stereo ``AudioFrame``.
            force_keyframe: accepted for interface compatibility; it is not
                used by this encoder.

        Returns:
            A tuple ``(payloads, timestamp)``. ``timestamp`` is taken from the
            ``pts`` of the resampled frames (the first one that is not
            ``None``) and stays ``None`` when the resampler produced no frame.

        Raises:
            AssertionError: if the input frame does not meet the preconditions
                above, or if libopus fails to encode a chunk.
        """
        assert isinstance(frame, AudioFrame)
        assert frame.format.name == "s16"
        assert frame.layout.name in ["mono", "stereo"]

        # Send frame through resampler and encoder.
        payloads = []
        timestamp = None
        for resampled in self.resampler.resample(frame):
            pcm = bytes(resampled.planes[0])
            length = lib.opus_encode(
                self.encoder,
                ffi.cast("int16_t*", ffi.from_buffer(pcm)),
                SAMPLES_PER_FRAME,
                self.cdata,
                len(self.cdata),
            )
            # Explicit check: with asserts stripped by -O, a negative error
            # code would otherwise be used as a slice bound below and emit
            # garbage as a payload.
            if length <= 0:
                raise AssertionError(f"opus_encode failed with result {length}")

            # Slicing the cffi buffer copies the bytes out, so the shared
            # output buffer can be reused for the next chunk.
            payloads.append(self.buffer[0:length])
            if timestamp is None:
                timestamp = resampled.pts

        return payloads, timestamp

    def pack(self, packet: Packet) -> Tuple[List[bytes], int]:
        """Wrap an already-encoded packet as a single payload.

        Args:
            packet: packet whose bytes are forwarded unchanged.

        Returns:
            A tuple ``(payloads, timestamp)`` where ``payloads`` holds the
            packet's bytes and ``timestamp`` is the packet's ``pts`` passed
            through ``convert_timebase`` from ``packet.time_base`` to
            ``TIME_BASE``.
        """
        timestamp = convert_timebase(packet.pts, packet.time_base, TIME_BASE)
        return [bytes(packet)], timestamp