File size: 2,850 Bytes
06555b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import fractions
from typing import List, Tuple, cast

from av import AudioFrame, AudioResampler, CodecContext
from av.audio.codeccontext import AudioCodecContext
from av.frame import Frame
from av.packet import Packet

from ..jitterbuffer import JitterFrame
from ..mediastreams import convert_timebase
from .base import Decoder, Encoder

SAMPLE_RATE = 8000
SAMPLE_WIDTH = 2
SAMPLES_PER_FRAME = 160
TIME_BASE = fractions.Fraction(1, 8000)


class PcmDecoder(Decoder):
    def __init__(self, codec_name: str) -> None:
        self.codec = cast(AudioCodecContext, CodecContext.create(codec_name, "r"))
        self.codec.format = "s16"
        self.codec.layout = "mono"
        self.codec.sample_rate = SAMPLE_RATE

    def decode(self, encoded_frame: JitterFrame) -> List[Frame]:
        packet = Packet(encoded_frame.data)
        packet.pts = encoded_frame.timestamp
        packet.time_base = TIME_BASE
        return cast(List[Frame], self.codec.decode(packet))


class PcmEncoder(Encoder):
    def __init__(self, codec_name: str) -> None:
        self.codec = cast(AudioCodecContext, CodecContext.create(codec_name, "w"))
        self.codec.format = "s16"
        self.codec.layout = "mono"
        self.codec.sample_rate = SAMPLE_RATE
        self.codec.time_base = TIME_BASE

        # Create our own resampler to control the frame size.
        self.resampler = AudioResampler(
            format="s16",
            layout="mono",
            rate=SAMPLE_RATE,
            frame_size=SAMPLES_PER_FRAME,
        )

    def encode(

        self, frame: Frame, force_keyframe: bool = False

    ) -> Tuple[List[bytes], int]:
        assert isinstance(frame, AudioFrame)
        assert frame.format.name == "s16"
        assert frame.layout.name in ["mono", "stereo"]

        # Send frame through resampler and encoder.
        packets = []
        for frame in self.resampler.resample(frame):
            packets += self.codec.encode(frame)

        if packets:
            # Packets were returned.
            return [bytes(p) for p in packets], packets[0].pts
        else:
            # No packets were returned due to buffering.
            return [], None

    def pack(self, packet: Packet) -> Tuple[List[bytes], int]:
        timestamp = convert_timebase(packet.pts, packet.time_base, TIME_BASE)
        return [bytes(packet)], timestamp


class PcmaDecoder(PcmDecoder):
    def __init__(self) -> None:
        super().__init__("pcm_alaw")


class PcmaEncoder(PcmEncoder):
    def __init__(self) -> None:
        super().__init__("pcm_alaw")


class PcmuDecoder(PcmDecoder):
    def __init__(self) -> None:
        super().__init__("pcm_mulaw")


class PcmuEncoder(PcmEncoder):
    def __init__(self) -> None:
        super().__init__("pcm_mulaw")