File size: 6,727 Bytes
06555b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import logging
from dataclasses import dataclass
from typing import Optional, Union

from pyee.asyncio import AsyncIOEventEmitter

from .exceptions import InvalidStateError

# Module-level logger used by RTCDataChannel for its debug output.
logger = logging.getLogger(__name__)


@dataclass
class RTCDataChannelParameters:
    """
    Configuration of an :class:`RTCDataChannel`, expressed as the
    :class:`RTCDataChannelParameters` dictionary.

    Every field has a default value, so an instance may be created
    without arguments.
    """

    label: str = ""
    "A name describing the data channel."

    maxPacketLifeTime: Optional[int] = None
    "The maximum time, in milliseconds, during which transmissions are attempted."

    maxRetransmits: Optional[int] = None
    "The maximum number of retransmissions that are attempted."

    ordered: bool = True
    "Whether the data channel guarantees in-order delivery of messages."

    protocol: str = ""
    "The name of the subprotocol in use."

    negotiated: bool = False
    """
    Whether the data channel is negotiated out-of-band, where both sides
    create the data channel with an agreed-upon ID.
    """

    id: Optional[int] = None
    """
    A numeric ID for the channel; permitted values are 0-65534.
    If you don't include this option, the user agent will select an ID for you.
    Must be set when negotiating out-of-band.
    """


class RTCDataChannel(AsyncIOEventEmitter):
    """
    A network channel for bidirectional peer-to-peer transfers of
    arbitrary data, as described by the :class:`RTCDataChannel` interface.

    The channel emits ``"open"``, ``"close"`` and ``"bufferedamountlow"``
    events.

    :param transport: An :class:`RTCSctpTransport`.
    :param parameters: An :class:`RTCDataChannelParameters`.
    :param send_open: For a channel that is not negotiated out-of-band,
        whether the constructor asks the transport to open the channel.
    """

    def __init__(
        self, transport, parameters: RTCDataChannelParameters, send_open: bool = True
    ) -> None:
        super().__init__()
        self.__transport = transport
        self.__parameters = parameters
        self.__id = parameters.id
        self.__send_open = send_open
        self.__readyState = "connecting"
        self.__bufferedAmount = 0
        self.__bufferedAmountLowThreshold = 0

        negotiated = parameters.negotiated
        channel_id = parameters.id

        # A channel negotiated out-of-band must carry an explicit, valid ID.
        if negotiated and (channel_id is None or channel_id < 0 or channel_id > 65534):
            raise ValueError(
                "ID must be in range 0-65534 if data channel is negotiated out-of-band"
            )

        if negotiated:
            transport._data_channel_add_negotiated(self)
        elif send_open:
            # Clear the flag before handing the channel to the transport.
            self.__send_open = False
            transport._data_channel_open(self)

    @property
    def bufferedAmount(self) -> int:
        """
        How many bytes of data are currently queued for sending over the
        data channel.
        """
        return self.__bufferedAmount

    @property
    def bufferedAmountLowThreshold(self) -> int:
        """
        The amount of buffered outgoing data, in bytes, that is considered
        "low".
        """
        return self.__bufferedAmountLowThreshold

    @bufferedAmountLowThreshold.setter
    def bufferedAmountLowThreshold(self, value: int) -> None:
        # Reject anything outside the range 0 - 4294967295.
        if value > 4294967295 or value < 0:
            raise ValueError(
                "bufferedAmountLowThreshold must be in range 0 - 4294967295"
            )
        self.__bufferedAmountLowThreshold = value

    @property
    def negotiated(self) -> bool:
        """
        Whether the data channel was negotiated out-of-band.
        """
        return self.__parameters.negotiated

    @property
    def id(self) -> Optional[int]:
        """
        An ID number which uniquely identifies the data channel.
        """
        return self.__id

    @property
    def label(self) -> str:
        """
        A name describing the data channel.

        These labels are not required to be unique.
        """
        return self.__parameters.label

    @property
    def ordered(self) -> bool:
        """
        Whether the data channel guarantees in-order delivery of messages.
        """
        return self.__parameters.ordered

    @property
    def maxPacketLifeTime(self) -> Optional[int]:
        """
        The maximum time, in milliseconds, during which transmissions are
        attempted.
        """
        return self.__parameters.maxPacketLifeTime

    @property
    def maxRetransmits(self) -> Optional[int]:
        """
        The maximum number of retransmissions that are attempted.
        """
        return self.__parameters.maxRetransmits

    @property
    def protocol(self) -> str:
        """
        The name of the subprotocol in use.
        """
        return self.__parameters.protocol

    @property
    def readyState(self) -> str:
        """
        A string indicating the current state of the underlying data
        transport.
        """
        return self.__readyState

    @property
    def transport(self):
        """
        The :class:`RTCSctpTransport` over which data is transmitted.
        """
        return self.__transport

    def close(self) -> None:
        """
        Close the data channel.
        """
        self.transport._data_channel_close(self)

    def send(self, data: Union[bytes, str]) -> None:
        """
        Send `data` across the data channel to the remote peer.

        :raises InvalidStateError: if the channel is not in the "open" state.
        :raises ValueError: if `data` is neither :class:`str` nor
            :class:`bytes`.
        """
        # The state check comes first, so a channel that is not open always
        # reports InvalidStateError regardless of the payload type.
        if self.readyState != "open":
            raise InvalidStateError()

        if not isinstance(data, (bytes, str)):
            raise ValueError(f"Cannot send unsupported data type: {type(data)}")

        self.transport._data_channel_send(self, data)

    def _addBufferedAmount(self, amount: int) -> None:
        # Adjust the buffered byte count by `amount` and emit
        # "bufferedamountlow" when the count moves from above the threshold
        # to at or below it.
        threshold = self.bufferedAmountLowThreshold
        before = self.__bufferedAmount
        after = before + amount

        self.__bufferedAmount = after
        if before > threshold >= after:
            self.emit("bufferedamountlow")

    def _setId(self, id: int) -> None:
        # Record the ID assigned to this channel.
        self.__id = id

    def _setReadyState(self, state: str) -> None:
        # Nothing to do (and nothing to emit) when the state is unchanged.
        if state == self.__readyState:
            return

        self.__log_debug("- %s -> %s", self.__readyState, state)
        self.__readyState = state

        if state == "open":
            self.emit("open")
        elif state == "closed":
            self.emit("close")

            # no more events will be emitted, so remove all event listeners
            # to facilitate garbage collection.
            self.remove_all_listeners()

    def __log_debug(self, msg: str, *args) -> None:
        # Prefix every message with the channel ID; formatting of the
        # %-style arguments is left to the logging module.
        logger.debug("RTCDataChannel(%s) " + msg, self.__id, *args)