import asyncio import logging import re from dataclasses import dataclass from typing import Any, Dict, List, Optional from aioice import Candidate, Connection, ConnectionClosed from pyee.asyncio import AsyncIOEventEmitter from .exceptions import InvalidStateError from .rtcconfiguration import RTCIceServer STUN_REGEX = re.compile( r"(?Pstun|stuns)\:(?P[^?:]+)(\:(?P[0-9]+?))?" # RFC 7064 does not define a "transport" option but some providers # include it, so just ignore it r"(\?transport=.*)?" ) TURN_REGEX = re.compile( r"(?Pturn|turns)\:(?P[^?:]+)(\:(?P[0-9]+?))?" r"(\?transport=(?P.*))?" ) logger = logging.getLogger(__name__) @dataclass class RTCIceCandidate: """ The :class:`RTCIceCandidate` interface represents a candidate Interactive Connectivity Establishment (ICE) configuration which may be used to establish an RTCPeerConnection. """ component: int foundation: str ip: str port: int priority: int protocol: str type: str relatedAddress: Optional[str] = None relatedPort: Optional[int] = None sdpMid: Optional[str] = None sdpMLineIndex: Optional[int] = None tcpType: Optional[str] = None @dataclass class RTCIceParameters: """ The :class:`RTCIceParameters` dictionary includes the ICE username fragment and password and other ICE-related parameters. """ usernameFragment: Optional[str] = None "ICE username fragment." password: Optional[str] = None "ICE password." iceLite: bool = False def candidate_from_aioice(x: Candidate) -> RTCIceCandidate: return RTCIceCandidate( component=x.component, foundation=x.foundation, ip=x.host, port=x.port, priority=x.priority, protocol=x.transport, relatedAddress=x.related_address, relatedPort=x.related_port, tcpType=x.tcptype, type=x.type, ) def candidate_to_aioice(x: RTCIceCandidate) -> Candidate: return Candidate( component=x.component, foundation=x.foundation, host=x.ip, port=x.port, priority=x.priority, related_address=x.relatedAddress, related_port=x.relatedPort, transport=x.protocol, tcptype=x.tcpType, type=x.type, ) def connection_kwargs(servers: List[RTCIceServer]) -> Dict[str, Any]: kwargs: Dict[str, Any] = {} for server in servers: if isinstance(server.urls, list): uris = server.urls else: uris = [server.urls] for uri in uris: parsed = parse_stun_turn_uri(uri) if parsed["scheme"] == "stun": # only a single STUN server is supported if "stun_server" in kwargs: continue kwargs["stun_server"] = (parsed["host"], parsed["port"]) elif parsed["scheme"] in ["turn", "turns"]: # only a single TURN server is supported if "turn_server" in kwargs: continue # only 'udp' and 'tcp' transports are supported if parsed["scheme"] == "turn" and parsed["transport"] not in [ "udp", "tcp", ]: continue elif parsed["scheme"] == "turns" and parsed["transport"] != "tcp": continue # only 'password' credentialType is supported if server.credentialType != "password": continue kwargs["turn_server"] = (parsed["host"], parsed["port"]) kwargs["turn_ssl"] = parsed["scheme"] == "turns" kwargs["turn_transport"] = parsed["transport"] kwargs["turn_username"] = server.username kwargs["turn_password"] = server.credential return kwargs def parse_stun_turn_uri(uri: str) -> Dict[str, Any]: if uri.startswith("stun"): match = STUN_REGEX.fullmatch(uri) elif uri.startswith("turn"): match = TURN_REGEX.fullmatch(uri) else: raise ValueError("malformed uri: invalid scheme") if not match: raise ValueError("malformed uri") # set port parsed: Dict[str, Any] = match.groupdict() if parsed["port"]: parsed["port"] = int(parsed["port"]) elif parsed["scheme"] in ["stuns", "turns"]: parsed["port"] = 5349 else: parsed["port"] = 3478 # set transport if parsed["scheme"] == "turn" and not parsed["transport"]: parsed["transport"] = "udp" elif parsed["scheme"] == "turns" and not parsed["transport"]: parsed["transport"] = "tcp" return parsed class RTCIceGatherer(AsyncIOEventEmitter): """ The :class:`RTCIceGatherer` interface gathers local host, server reflexive and relay candidates, as well as enabling the retrieval of local Interactive Connectivity Establishment (ICE) parameters which can be exchanged in signaling. """ def __init__(self, iceServers: Optional[List[RTCIceServer]] = None) -> None: super().__init__() if iceServers is None: iceServers = self.getDefaultIceServers() ice_kwargs = connection_kwargs(iceServers) self._connection = Connection(ice_controlling=False, **ice_kwargs) self._remote_candidates_end = False self.__state = "new" @property def state(self) -> str: """ The current state of the ICE gatherer. """ return self.__state async def gather(self) -> None: """ Gather ICE candidates. """ if self.__state == "new": self.__setState("gathering") await self._connection.gather_candidates() self.__setState("completed") @classmethod def getDefaultIceServers(cls) -> List[RTCIceServer]: """ Return the list of default :class:`RTCIceServer`. """ return [RTCIceServer("stun:stun.l.google.com:19302")] def getLocalCandidates(self) -> List[RTCIceCandidate]: """ Retrieve the list of valid local candidates associated with the ICE gatherer. """ return [candidate_from_aioice(x) for x in self._connection.local_candidates] def getLocalParameters(self) -> RTCIceParameters: """ Retrieve the ICE parameters of the ICE gatherer. :rtype: RTCIceParameters """ return RTCIceParameters( usernameFragment=self._connection.local_username, password=self._connection.local_password, ) def __setState(self, state: str) -> None: self.__state = state self.emit("statechange") class RTCIceTransport(AsyncIOEventEmitter): """ The :class:`RTCIceTransport` interface allows an application access to information about the Interactive Connectivity Establishment (ICE) transport over which packets are sent and received. :param gatherer: An :class:`RTCIceGatherer`. """ def __init__(self, gatherer: RTCIceGatherer) -> None: super().__init__() self.__iceGatherer = gatherer self.__monitor_task: Optional[asyncio.Future[None]] = None self.__start: Optional[asyncio.Event] = None self.__state = "new" self._connection = gatherer._connection self._role_set = False # expose recv / send methods self._recv = self._connection.recv self._send = self._connection.send @property def iceGatherer(self) -> RTCIceGatherer: """ The ICE gatherer passed in the constructor. """ return self.__iceGatherer @property def role(self) -> str: """ The current role of the ICE transport. Either `'controlling'` or `'controlled'`. """ if self._connection.ice_controlling: return "controlling" else: return "controlled" @property def state(self) -> str: """ The current state of the ICE transport. """ return self.__state async def addRemoteCandidate(self, candidate: Optional[RTCIceCandidate]) -> None: """ Add a remote candidate. :param candidate: The new candidate or `None` to signal end of candidates. """ if not self.__iceGatherer._remote_candidates_end: if candidate is None: self.__iceGatherer._remote_candidates_end = True await self._connection.add_remote_candidate(None) else: await self._connection.add_remote_candidate( candidate_to_aioice(candidate) ) def getRemoteCandidates(self) -> List[RTCIceCandidate]: """ Retrieve the list of candidates associated with the remote :class:`RTCIceTransport`. """ return [candidate_from_aioice(x) for x in self._connection.remote_candidates] async def start(self, remoteParameters: RTCIceParameters) -> None: """ Initiate connectivity checks. :param remoteParameters: The :class:`RTCIceParameters` associated with the remote :class:`RTCIceTransport`. """ if self.state == "closed": raise InvalidStateError("RTCIceTransport is closed") # handle the case where start is already in progress if self.__start is not None: await self.__start.wait() return self.__start = asyncio.Event() self.__monitor_task = asyncio.ensure_future(self._monitor()) self.__setState("checking") self._connection.remote_is_lite = remoteParameters.iceLite self._connection.remote_username = remoteParameters.usernameFragment self._connection.remote_password = remoteParameters.password try: await self._connection.connect() except ConnectionError: self.__setState("failed") else: self.__setState("completed") self.__start.set() async def stop(self) -> None: """ Irreversibly stop the :class:`RTCIceTransport`. """ if self.state != "closed": self.__setState("closed") await self._connection.close() if self.__monitor_task is not None: await self.__monitor_task self.__monitor_task = None async def _monitor(self) -> None: while True: event = await self._connection.get_event() if isinstance(event, ConnectionClosed): if self.state == "completed": self.__setState("failed") return def __log_debug(self, msg: str, *args) -> None: logger.debug(f"RTCIceTransport(%s) {msg}", self.role, *args) def __setState(self, state: str) -> None: if state != self.__state: self.__log_debug("- %s -> %s", self.__state, state) self.__state = state self.emit("statechange") # no more events will be emitted, so remove all event listeners # to facilitate garbage collection. if state == "closed": self.iceGatherer.remove_all_listeners() self.remove_all_listeners()