File size: 12,346 Bytes
06555b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

from __future__ import annotations

import email.base64mime
import email.generator
import email.message
import email.policy
import io
import typing

from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.bindings._rust import pkcs7 as rust_pkcs7
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa
from cryptography.utils import _check_byteslike

load_pem_pkcs7_certificates = rust_pkcs7.load_pem_pkcs7_certificates

load_der_pkcs7_certificates = rust_pkcs7.load_der_pkcs7_certificates

serialize_certificates = rust_pkcs7.serialize_certificates

PKCS7HashTypes = typing.Union[
    hashes.SHA224,
    hashes.SHA256,
    hashes.SHA384,
    hashes.SHA512,
]

PKCS7PrivateKeyTypes = typing.Union[
    rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey
]


class PKCS7Options(utils.Enum):
    Text = "Add text/plain MIME type"
    Binary = "Don't translate input data into canonical MIME format"
    DetachedSignature = "Don't embed data in the PKCS7 structure"
    NoCapabilities = "Don't embed SMIME capabilities"
    NoAttributes = "Don't embed authenticatedAttributes"
    NoCerts = "Don't embed signer certificate"


class PKCS7SignatureBuilder:
    def __init__(
        self,
        data: bytes | None = None,
        signers: list[
            tuple[
                x509.Certificate,
                PKCS7PrivateKeyTypes,
                PKCS7HashTypes,
                padding.PSS | padding.PKCS1v15 | None,
            ]
        ] = [],
        additional_certs: list[x509.Certificate] = [],
    ):
        self._data = data
        self._signers = signers
        self._additional_certs = additional_certs

    def set_data(self, data: bytes) -> PKCS7SignatureBuilder:
        _check_byteslike("data", data)
        if self._data is not None:
            raise ValueError("data may only be set once")

        return PKCS7SignatureBuilder(data, self._signers)

    def add_signer(
        self,
        certificate: x509.Certificate,
        private_key: PKCS7PrivateKeyTypes,
        hash_algorithm: PKCS7HashTypes,
        *,
        rsa_padding: padding.PSS | padding.PKCS1v15 | None = None,
    ) -> PKCS7SignatureBuilder:
        if not isinstance(
            hash_algorithm,
            (
                hashes.SHA224,
                hashes.SHA256,
                hashes.SHA384,
                hashes.SHA512,
            ),
        ):
            raise TypeError(
                "hash_algorithm must be one of hashes.SHA224, "
                "SHA256, SHA384, or SHA512"
            )
        if not isinstance(certificate, x509.Certificate):
            raise TypeError("certificate must be a x509.Certificate")

        if not isinstance(
            private_key, (rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey)
        ):
            raise TypeError("Only RSA & EC keys are supported at this time.")

        if rsa_padding is not None:
            if not isinstance(rsa_padding, (padding.PSS, padding.PKCS1v15)):
                raise TypeError("Padding must be PSS or PKCS1v15")
            if not isinstance(private_key, rsa.RSAPrivateKey):
                raise TypeError("Padding is only supported for RSA keys")

        return PKCS7SignatureBuilder(
            self._data,
            [
                *self._signers,
                (certificate, private_key, hash_algorithm, rsa_padding),
            ],
        )

    def add_certificate(
        self, certificate: x509.Certificate
    ) -> PKCS7SignatureBuilder:
        if not isinstance(certificate, x509.Certificate):
            raise TypeError("certificate must be a x509.Certificate")

        return PKCS7SignatureBuilder(
            self._data, self._signers, [*self._additional_certs, certificate]
        )

    def sign(
        self,
        encoding: serialization.Encoding,
        options: typing.Iterable[PKCS7Options],
        backend: typing.Any = None,
    ) -> bytes:
        if len(self._signers) == 0:
            raise ValueError("Must have at least one signer")
        if self._data is None:
            raise ValueError("You must add data to sign")
        options = list(options)
        if not all(isinstance(x, PKCS7Options) for x in options):
            raise ValueError("options must be from the PKCS7Options enum")
        if encoding not in (
            serialization.Encoding.PEM,
            serialization.Encoding.DER,
            serialization.Encoding.SMIME,
        ):
            raise ValueError(
                "Must be PEM, DER, or SMIME from the Encoding enum"
            )

        # Text is a meaningless option unless it is accompanied by
        # DetachedSignature
        if (
            PKCS7Options.Text in options
            and PKCS7Options.DetachedSignature not in options
        ):
            raise ValueError(
                "When passing the Text option you must also pass "
                "DetachedSignature"
            )

        if PKCS7Options.Text in options and encoding in (
            serialization.Encoding.DER,
            serialization.Encoding.PEM,
        ):
            raise ValueError(
                "The Text option is only available for SMIME serialization"
            )

        # No attributes implies no capabilities so we'll error if you try to
        # pass both.
        if (
            PKCS7Options.NoAttributes in options
            and PKCS7Options.NoCapabilities in options
        ):
            raise ValueError(
                "NoAttributes is a superset of NoCapabilities. Do not pass "
                "both values."
            )

        return rust_pkcs7.sign_and_serialize(self, encoding, options)


class PKCS7EnvelopeBuilder:
    def __init__(
        self,
        *,
        _data: bytes | None = None,
        _recipients: list[x509.Certificate] | None = None,
    ):
        from cryptography.hazmat.backends.openssl.backend import (
            backend as ossl,
        )

        if not ossl.rsa_encryption_supported(padding=padding.PKCS1v15()):
            raise UnsupportedAlgorithm(
                "RSA with PKCS1 v1.5 padding is not supported by this version"
                " of OpenSSL.",
                _Reasons.UNSUPPORTED_PADDING,
            )
        self._data = _data
        self._recipients = _recipients if _recipients is not None else []

    def set_data(self, data: bytes) -> PKCS7EnvelopeBuilder:
        _check_byteslike("data", data)
        if self._data is not None:
            raise ValueError("data may only be set once")

        return PKCS7EnvelopeBuilder(_data=data, _recipients=self._recipients)

    def add_recipient(
        self,
        certificate: x509.Certificate,
    ) -> PKCS7EnvelopeBuilder:
        if not isinstance(certificate, x509.Certificate):
            raise TypeError("certificate must be a x509.Certificate")

        if not isinstance(certificate.public_key(), rsa.RSAPublicKey):
            raise TypeError("Only RSA keys are supported at this time.")

        return PKCS7EnvelopeBuilder(
            _data=self._data,
            _recipients=[
                *self._recipients,
                certificate,
            ],
        )

    def encrypt(
        self,
        encoding: serialization.Encoding,
        options: typing.Iterable[PKCS7Options],
    ) -> bytes:
        if len(self._recipients) == 0:
            raise ValueError("Must have at least one recipient")
        if self._data is None:
            raise ValueError("You must add data to encrypt")
        options = list(options)
        if not all(isinstance(x, PKCS7Options) for x in options):
            raise ValueError("options must be from the PKCS7Options enum")
        if encoding not in (
            serialization.Encoding.PEM,
            serialization.Encoding.DER,
            serialization.Encoding.SMIME,
        ):
            raise ValueError(
                "Must be PEM, DER, or SMIME from the Encoding enum"
            )

        # Only allow options that make sense for encryption
        if any(
            opt not in [PKCS7Options.Text, PKCS7Options.Binary]
            for opt in options
        ):
            raise ValueError(
                "Only the following options are supported for encryption: "
                "Text, Binary"
            )
        elif PKCS7Options.Text in options and PKCS7Options.Binary in options:
            # OpenSSL accepts both options at the same time, but ignores Text.
            # We fail defensively to avoid unexpected outputs.
            raise ValueError(
                "Cannot use Binary and Text options at the same time"
            )

        return rust_pkcs7.encrypt_and_serialize(self, encoding, options)


pkcs7_decrypt_der = rust_pkcs7.decrypt_der
pkcs7_decrypt_pem = rust_pkcs7.decrypt_pem
pkcs7_decrypt_smime = rust_pkcs7.decrypt_smime


def _smime_signed_encode(
    data: bytes, signature: bytes, micalg: str, text_mode: bool
) -> bytes:
    # This function works pretty hard to replicate what OpenSSL does
    # precisely. For good and for ill.

    m = email.message.Message()
    m.add_header("MIME-Version", "1.0")
    m.add_header(
        "Content-Type",
        "multipart/signed",
        protocol="application/x-pkcs7-signature",
        micalg=micalg,
    )

    m.preamble = "This is an S/MIME signed message\n"

    msg_part = OpenSSLMimePart()
    msg_part.set_payload(data)
    if text_mode:
        msg_part.add_header("Content-Type", "text/plain")
    m.attach(msg_part)

    sig_part = email.message.MIMEPart()
    sig_part.add_header(
        "Content-Type", "application/x-pkcs7-signature", name="smime.p7s"
    )
    sig_part.add_header("Content-Transfer-Encoding", "base64")
    sig_part.add_header(
        "Content-Disposition", "attachment", filename="smime.p7s"
    )
    sig_part.set_payload(
        email.base64mime.body_encode(signature, maxlinelen=65)
    )
    del sig_part["MIME-Version"]
    m.attach(sig_part)

    fp = io.BytesIO()
    g = email.generator.BytesGenerator(
        fp,
        maxheaderlen=0,
        mangle_from_=False,
        policy=m.policy.clone(linesep="\r\n"),
    )
    g.flatten(m)
    return fp.getvalue()


def _smime_enveloped_encode(data: bytes) -> bytes:
    m = email.message.Message()
    m.add_header("MIME-Version", "1.0")
    m.add_header("Content-Disposition", "attachment", filename="smime.p7m")
    m.add_header(
        "Content-Type",
        "application/pkcs7-mime",
        smime_type="enveloped-data",
        name="smime.p7m",
    )
    m.add_header("Content-Transfer-Encoding", "base64")

    m.set_payload(email.base64mime.body_encode(data, maxlinelen=65))

    return m.as_bytes(policy=m.policy.clone(linesep="\n", max_line_length=0))


def _smime_enveloped_decode(data: bytes) -> bytes:
    m = email.message_from_bytes(data)
    if m.get_content_type() not in {
        "application/x-pkcs7-mime",
        "application/pkcs7-mime",
    }:
        raise ValueError("Not an S/MIME enveloped message")
    return bytes(m.get_payload(decode=True))


def _smime_remove_text_headers(data: bytes) -> bytes:
    m = email.message_from_bytes(data)
    # Using get() instead of get_content_type() since it has None as default,
    # where the latter has "text/plain". Both methods are case-insensitive.
    content_type = m.get("content-type")
    if content_type is None:
        raise ValueError(
            "Decrypted MIME data has no 'Content-Type' header. "
            "Please remove the 'Text' option to parse it manually."
        )
    if "text/plain" not in content_type:
        raise ValueError(
            f"Decrypted MIME data content type is '{content_type}', not "
            "'text/plain'. Remove the 'Text' option to parse it manually."
        )
    return bytes(m.get_payload(decode=True))


class OpenSSLMimePart(email.message.MIMEPart):
    # A MIMEPart subclass that replicates OpenSSL's behavior of not including
    # a newline if there are no headers.
    def _write_headers(self, generator) -> None:
        if list(self.raw_items()):
            generator._write_headers(self)