File size: 4,968 Bytes
06555b5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
import logging
from typing import List, Optional
from .codecs import get_capabilities
from .rtcdtlstransport import RTCDtlsTransport
from .rtcrtpparameters import (
RTCRtpCodecCapability,
RTCRtpCodecParameters,
RTCRtpHeaderExtensionParameters,
)
from .rtcrtpreceiver import RTCRtpReceiver
from .rtcrtpsender import RTCRtpSender
from .sdp import DIRECTIONS
logger = logging.getLogger(__name__)
class RTCRtpTransceiver:
    """
    The RTCRtpTransceiver interface describes a permanent pairing of an
    :class:`RTCRtpSender` and an :class:`RTCRtpReceiver`, along with some
    shared state.
    """

    def __init__(
        self,
        kind: str,
        receiver: RTCRtpReceiver,
        sender: RTCRtpSender,
        direction: str = "sendrecv",
    ):
        """
        Create a transceiver pairing `sender` and `receiver`.

        :param kind: The media kind, exposed read-only through :attr:`kind`.
        :param receiver: The receiving half of the pair.
        :param sender: The sending half of the pair.
        :param direction: The initial preferred direction.
        """
        self.__currentDirection: Optional[str] = None
        self.__direction = direction
        self.__kind = kind
        self.__mid: Optional[str] = None
        self.__mline_index: Optional[int] = None
        self.__receiver = receiver
        self.__sender = sender
        self.__stopped = False

        self._offerDirection: Optional[str] = None
        self._preferred_codecs: List[RTCRtpCodecCapability] = []
        # No transport is attached at construction time, hence Optional.
        self._transport: Optional[RTCDtlsTransport] = None

        # FIXME: this is only used by RTCPeerConnection
        self._bundled = False
        self._codecs: List[RTCRtpCodecParameters] = []
        self._headerExtensions: List[RTCRtpHeaderExtensionParameters] = []

    @property
    def currentDirection(self) -> Optional[str]:
        """
        The currently negotiated direction of the transceiver.

        One of `'sendrecv'`, `'sendonly'`, `'recvonly'`, `'inactive'` or `None`.
        """
        return self.__currentDirection

    @property
    def direction(self) -> str:
        """
        The preferred direction of the transceiver, which will be used in
        :meth:`RTCPeerConnection.createOffer` and
        :meth:`RTCPeerConnection.createAnswer`.

        One of `'sendrecv'`, `'sendonly'`, `'recvonly'` or `'inactive'`.
        """
        return self.__direction

    @direction.setter
    def direction(self, direction: str) -> None:
        # NOTE: kept as an assertion so callers still see AssertionError;
        # be aware that assertions are skipped when Python runs with -O.
        assert direction in DIRECTIONS, f"Invalid direction: {direction!r}"
        self.__direction = direction

    @property
    def kind(self) -> str:
        """
        The media kind this transceiver was created with.
        """
        return self.__kind

    @property
    def mid(self) -> Optional[str]:
        """
        The media identifier assigned to this transceiver, or `None` if
        none has been set yet.
        """
        return self.__mid

    @property
    def receiver(self) -> RTCRtpReceiver:
        """
        The :class:`RTCRtpReceiver` that handles receiving and decoding
        incoming media.
        """
        return self.__receiver

    @property
    def sender(self) -> RTCRtpSender:
        """
        The :class:`RTCRtpSender` responsible for encoding and sending
        data to the remote peer.
        """
        return self.__sender

    @property
    def stopped(self) -> bool:
        """
        Whether :meth:`stop` has completed on this transceiver.
        """
        return self.__stopped

    def setCodecPreferences(self, codecs: List[RTCRtpCodecCapability]) -> None:
        """
        Override the default codec preferences.

        See :meth:`RTCRtpSender.getCapabilities` and
        :meth:`RTCRtpReceiver.getCapabilities` for the supported codecs.

        Duplicate entries are removed; when a codec is listed more than
        once, the position of its last occurrence is the one retained.

        :param codecs: A list of :class:`RTCRtpCodecCapability`, in decreasing order
                       of preference. If empty, restores the default preferences.
        :raises ValueError: If a codec is not among the capabilities for
                            this transceiver's kind.
        """
        if not codecs:
            # Restore the defaults and stop here: there is nothing to
            # validate, and falling through would fail on a `None` argument.
            self._preferred_codecs = []
            return

        capabilities = get_capabilities(self.kind).codecs
        unique: List[RTCRtpCodecCapability] = []
        # Walk backwards and prepend so that the relative order is preserved
        # while only the last occurrence of a duplicated codec is kept.
        for codec in reversed(codecs):
            if codec not in capabilities:
                raise ValueError("Codec is not in capabilities")
            if codec not in unique:
                unique.insert(0, codec)
        self._preferred_codecs = unique

    async def stop(self) -> None:
        """
        Permanently stops the :class:`RTCRtpTransceiver`.
        """
        await self.__receiver.stop()
        await self.__sender.stop()
        self.__stopped = True

    def _setCurrentDirection(self, direction: str) -> None:
        """
        Record the negotiated direction and enable or disable the sender
        and receiver to match it.

        Values other than the four known directions are stored but leave
        the sender and receiver flags untouched.
        """
        self.__currentDirection = direction
        if direction in ("sendrecv", "sendonly", "recvonly", "inactive"):
            self.__sender._enabled = direction in ("sendrecv", "sendonly")
            self.__receiver._enabled = direction in ("sendrecv", "recvonly")

    def _set_mid(self, mid: str) -> None:
        """
        Set the media identifier returned by :attr:`mid`.
        """
        self.__mid = mid

    def _get_mline_index(self) -> Optional[int]:
        """
        Return the stored media line index, or `None` if it was never set.
        """
        return self.__mline_index

    def _set_mline_index(self, idx: int) -> None:
        """
        Store the media line index for this transceiver.
        """
        self.__mline_index = idx
|