File size: 2,850 Bytes
06555b5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
import fractions
from typing import List, Tuple, cast
from av import AudioFrame, AudioResampler, CodecContext
from av.audio.codeccontext import AudioCodecContext
from av.frame import Frame
from av.packet import Packet
from ..jitterbuffer import JitterFrame
from ..mediastreams import convert_timebase
from .base import Decoder, Encoder
SAMPLE_RATE = 8000
SAMPLE_WIDTH = 2
SAMPLES_PER_FRAME = 160
TIME_BASE = fractions.Fraction(1, 8000)
class PcmDecoder(Decoder):
def __init__(self, codec_name: str) -> None:
self.codec = cast(AudioCodecContext, CodecContext.create(codec_name, "r"))
self.codec.format = "s16"
self.codec.layout = "mono"
self.codec.sample_rate = SAMPLE_RATE
def decode(self, encoded_frame: JitterFrame) -> List[Frame]:
packet = Packet(encoded_frame.data)
packet.pts = encoded_frame.timestamp
packet.time_base = TIME_BASE
return cast(List[Frame], self.codec.decode(packet))
class PcmEncoder(Encoder):
def __init__(self, codec_name: str) -> None:
self.codec = cast(AudioCodecContext, CodecContext.create(codec_name, "w"))
self.codec.format = "s16"
self.codec.layout = "mono"
self.codec.sample_rate = SAMPLE_RATE
self.codec.time_base = TIME_BASE
# Create our own resampler to control the frame size.
self.resampler = AudioResampler(
format="s16",
layout="mono",
rate=SAMPLE_RATE,
frame_size=SAMPLES_PER_FRAME,
)
def encode(
self, frame: Frame, force_keyframe: bool = False
) -> Tuple[List[bytes], int]:
assert isinstance(frame, AudioFrame)
assert frame.format.name == "s16"
assert frame.layout.name in ["mono", "stereo"]
# Send frame through resampler and encoder.
packets = []
for frame in self.resampler.resample(frame):
packets += self.codec.encode(frame)
if packets:
# Packets were returned.
return [bytes(p) for p in packets], packets[0].pts
else:
# No packets were returned due to buffering.
return [], None
def pack(self, packet: Packet) -> Tuple[List[bytes], int]:
timestamp = convert_timebase(packet.pts, packet.time_base, TIME_BASE)
return [bytes(packet)], timestamp
class PcmaDecoder(PcmDecoder):
def __init__(self) -> None:
super().__init__("pcm_alaw")
class PcmaEncoder(PcmEncoder):
def __init__(self) -> None:
super().__init__("pcm_alaw")
class PcmuDecoder(PcmDecoder):
def __init__(self) -> None:
super().__init__("pcm_mulaw")
class PcmuEncoder(PcmEncoder):
def __init__(self) -> None:
super().__init__("pcm_mulaw")
|