File size: 4,060 Bytes
06555b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import hashlib
import ipaddress
from typing import Optional


def candidate_foundation(
    candidate_type: str, candidate_transport: str, base_address: str
) -> str:
    """
    See RFC 5245 - 4.1.1.3. Computing Foundations
    """
    key = "%s|%s|%s" % (candidate_type, candidate_transport, base_address)
    return hashlib.md5(key.encode("ascii")).hexdigest()


def candidate_priority(
    candidate_component: int, candidate_type: str, local_pref: int = 65535
) -> int:
    """
    See RFC 5245 - 4.1.2.1. Recommended Formula
    """
    if candidate_type == "host":
        type_pref = 126
    elif candidate_type == "prflx":
        type_pref = 110
    elif candidate_type == "srflx":
        type_pref = 100
    else:
        type_pref = 0

    return (1 << 24) * type_pref + (1 << 8) * local_pref + (256 - candidate_component)


class Candidate:
    """
    An ICE candidate.
    """

    def __init__(
        self,
        foundation: str,
        component: int,
        transport: str,
        priority: int,
        host: str,
        port: int,
        type: str,
        related_address: Optional[str] = None,
        related_port: Optional[int] = None,
        tcptype: Optional[str] = None,
        generation: Optional[int] = None,
    ) -> None:
        self.foundation = foundation
        self.component = component
        self.transport = transport
        self.priority = priority
        self.host = host
        self.port = port
        self.type = type
        self.related_address = related_address
        self.related_port = related_port
        self.tcptype = tcptype
        self.generation = generation

    @classmethod
    def from_sdp(cls, sdp):
        """
        Parse a :class:`Candidate` from SDP.

        .. code-block:: python

           Candidate.from_sdp(
            '6815297761 1 udp 659136 1.2.3.4 31102 typ host generation 0')
        """
        bits = sdp.split()
        if len(bits) < 8:
            raise ValueError("SDP does not have enough properties")

        kwargs = {
            "foundation": bits[0],
            "component": int(bits[1]),
            "transport": bits[2],
            "priority": int(bits[3]),
            "host": bits[4],
            "port": int(bits[5]),
            "type": bits[7],
        }

        for i in range(8, len(bits) - 1, 2):
            if bits[i] == "raddr":
                kwargs["related_address"] = bits[i + 1]
            elif bits[i] == "rport":
                kwargs["related_port"] = int(bits[i + 1])
            elif bits[i] == "tcptype":
                kwargs["tcptype"] = bits[i + 1]
            elif bits[i] == "generation":
                kwargs["generation"] = int(bits[i + 1])

        return Candidate(**kwargs)

    def to_sdp(self) -> str:
        """
        Return a string representation suitable for SDP.
        """
        sdp = "%s %d %s %d %s %d typ %s" % (
            self.foundation,
            self.component,
            self.transport,
            self.priority,
            self.host,
            self.port,
            self.type,
        )
        if self.related_address is not None:
            sdp += " raddr %s" % self.related_address
        if self.related_port is not None:
            sdp += " rport %s" % self.related_port
        if self.tcptype is not None:
            sdp += " tcptype %s" % self.tcptype
        if self.generation is not None:
            sdp += " generation %d" % self.generation
        return sdp

    def can_pair_with(self, other) -> bool:
        """
        A local candidate is paired with a remote candidate if and only if
        the two candidates have the same component ID and have the same IP
        address version.
        """
        a = ipaddress.ip_address(self.host)
        b = ipaddress.ip_address(other.host)
        return (
            self.component == other.component
            and self.transport.lower() == other.transport.lower()
            and a.version == b.version
        )

    def __repr__(self) -> str:
        return "Candidate(%s)" % self.to_sdp()