File size: 6,727 Bytes
06555b5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
import logging
from dataclasses import dataclass
from typing import Optional, Union
from pyee.asyncio import AsyncIOEventEmitter
from .exceptions import InvalidStateError
# Module-level logger; RTCDataChannel writes its debug messages through it.
logger = logging.getLogger(__name__)
@dataclass
class RTCDataChannelParameters:
    """
    Configuration of an :class:`RTCDataChannel`.

    Every field has a default, so the dictionary can be built with only the
    options that differ from a default channel.
    """

    label: str = ""
    "A name describing the data channel."

    maxPacketLifeTime: Optional[int] = None
    "The maximum time in milliseconds during which transmissions are attempted."

    maxRetransmits: Optional[int] = None
    "The maximum number of retransmissions that are attempted."

    ordered: bool = True
    "Whether the data channel guarantees in-order delivery of messages."

    protocol: str = ""
    "The name of the subprotocol in use."

    negotiated: bool = False
    """
    Whether the data channel will be negotiated out-of-band, where both sides
    create the data channel with an agreed-upon ID.
    """

    id: Optional[int] = None
    """
    A numeric ID for the channel; permitted values are 0-65534.
    If you don't include this option, the user agent will select an ID for you.
    Must be set when negotiating out-of-band.
    """
class RTCDataChannel(AsyncIOEventEmitter):
    """
    The :class:`RTCDataChannel` interface represents a network channel which
    can be used for bidirectional peer-to-peer transfers of arbitrary data.

    :param transport: An :class:`RTCSctpTransport`.
    :param parameters: An :class:`RTCDataChannelParameters`.
    :param send_open: Whether the transport should be asked to open the
                      channel when it is not negotiated out-of-band.
    """

    def __init__(
        self, transport, parameters: RTCDataChannelParameters, send_open: bool = True
    ) -> None:
        super().__init__()
        self.__transport = transport
        self.__parameters = parameters
        self.__id = parameters.id
        self.__send_open = send_open
        self.__readyState = "connecting"
        self.__bufferedAmount = 0
        self.__bufferedAmountLowThreshold = 0

        if parameters.negotiated:
            # Out-of-band channels need a usable ID before they are registered.
            channel_id = self.__id
            if channel_id is None or channel_id < 0 or channel_id > 65534:
                raise ValueError(
                    "ID must be in range 0-65534 if data channel is negotiated out-of-band"
                )
            transport._data_channel_add_negotiated(self)
        elif send_open:
            # Clear the flag before handing the channel to the transport.
            self.__send_open = False
            transport._data_channel_open(self)
        # Otherwise (not negotiated, no open requested) nothing is registered here.

    @property
    def bufferedAmount(self) -> int:
        """
        The number of bytes of data currently queued to be sent over the
        data channel.
        """
        return self.__bufferedAmount

    @property
    def bufferedAmountLowThreshold(self) -> int:
        """
        The number of bytes of buffered outgoing data that is considered "low".

        Assigning a value outside the range 0 - 4294967295 raises
        :class:`ValueError`.
        """
        return self.__bufferedAmountLowThreshold

    @bufferedAmountLowThreshold.setter
    def bufferedAmountLowThreshold(self, value: int) -> None:
        out_of_range = value < 0 or value > 0xFFFFFFFF
        if out_of_range:
            raise ValueError(
                "bufferedAmountLowThreshold must be in range 0 - 4294967295"
            )
        self.__bufferedAmountLowThreshold = value

    @property
    def negotiated(self) -> bool:
        """
        Whether the data channel was negotiated out-of-band.
        """
        return self.__parameters.negotiated

    @property
    def id(self) -> Optional[int]:
        """
        An ID number which uniquely identifies the data channel.
        """
        return self.__id

    @property
    def label(self) -> str:
        """
        A name describing the data channel.

        These labels are not required to be unique.
        """
        return self.__parameters.label

    @property
    def ordered(self) -> bool:
        """
        Indicates whether or not the data channel guarantees in-order
        delivery of messages.
        """
        return self.__parameters.ordered

    @property
    def maxPacketLifeTime(self) -> Optional[int]:
        """
        The maximum time in milliseconds during which transmissions are
        attempted.
        """
        return self.__parameters.maxPacketLifeTime

    @property
    def maxRetransmits(self) -> Optional[int]:
        """
        The maximum number of retransmissions that are attempted.
        """
        return self.__parameters.maxRetransmits

    @property
    def protocol(self) -> str:
        """
        The name of the subprotocol in use.
        """
        return self.__parameters.protocol

    @property
    def readyState(self) -> str:
        """
        A string indicating the current state of the underlying data
        transport.
        """
        return self.__readyState

    @property
    def transport(self):
        """
        The :class:`RTCSctpTransport` over which data is transmitted.
        """
        return self.__transport

    def close(self) -> None:
        """
        Close the data channel.
        """
        sctp = self.transport
        sctp._data_channel_close(self)

    def send(self, data: Union[bytes, str]) -> None:
        """
        Send `data` across the data channel to the remote peer.

        Raises :class:`InvalidStateError` when the channel is not open and
        :class:`ValueError` when `data` is neither `str` nor `bytes`.
        """
        # The state check comes first so a closed channel always reports
        # InvalidStateError, whatever the payload type.
        if self.readyState != "open":
            raise InvalidStateError

        if not isinstance(data, (str, bytes)):
            raise ValueError(f"Cannot send unsupported data type: {type(data)}")

        sctp = self.transport
        sctp._data_channel_send(self, data)

    def _addBufferedAmount(self, amount: int) -> None:
        # Adjust the buffered byte count by `amount`, and emit
        # "bufferedamountlow" when the count moves from above the low
        # threshold to at or below it.
        threshold = self.bufferedAmountLowThreshold
        before = self.__bufferedAmount
        after = before + amount

        self.__bufferedAmount = after
        if before > threshold and after <= threshold:
            self.emit("bufferedamountlow")

    def _setId(self, id: int) -> None:
        # Record the ID assigned to this channel.
        self.__id = id

    def _setReadyState(self, state: str) -> None:
        # Nothing to log or emit when the state does not change.
        if state == self.__readyState:
            return

        self.__log_debug("- %s -> %s", self.__readyState, state)
        self.__readyState = state

        if state == "open":
            self.emit("open")
            return

        if state == "closed":
            self.emit("close")

            # no more events will be emitted, so remove all event listeners
            # to facilitate garbage collection.
            self.remove_all_listeners()

    def __log_debug(self, msg: str, *args) -> None:
        # Prefix the message with the channel ID; formatting of the %-style
        # placeholders is left to the logging module.
        logger.debug("RTCDataChannel(%s) " + msg, self.__id, *args)
|