File size: 4,211 Bytes
06555b5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import annotations
import abc
import typing
from cryptography.hazmat.bindings._rust import openssl as rust_openssl
from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
from cryptography.hazmat.primitives.ciphers import modes
class CipherContext(metaclass=abc.ABCMeta):
    """
    Abstract interface for a cipher context: data is fed in through
    ``update`` / ``update_into`` and the operation is completed with
    ``finalize``.
    """

    @abc.abstractmethod
    def update(self, data: bytes) -> bytes:
        """
        Runs ``data`` through the cipher and hands back the output as bytes.
        """

    # NOTE(review): ``buf`` is annotated as ``bytes`` but is the destination
    # that output is written into, so a writable buffer is presumably
    # expected -- confirm against the concrete implementation.
    @abc.abstractmethod
    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Runs ``data`` through the cipher, storing the output in the
        caller-supplied ``buf``. The return value is the count of bytes
        written.
        """

    @abc.abstractmethod
    def finalize(self) -> bytes:
        """
        Processes the final block and returns its output as bytes.
        """

    @abc.abstractmethod
    def reset_nonce(self, nonce: bytes) -> None:
        """
        Replaces the nonce of this cipher context with ``nonce``.
        An exception is raised when the context does not support being
        reset, or when ``nonce`` is not of a valid length.
        """
class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
    """
    A ``CipherContext`` that can additionally authenticate data supplied
    through ``authenticate_additional_data``.
    """

    @abc.abstractmethod
    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Feeds ``data`` into the context so that it is authenticated.
        """
class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    The decryption flavour of ``AEADCipherContext``; it adds a way to hand
    over the authentication tag at finalization time.
    """

    @abc.abstractmethod
    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Processes the final block and returns its output as bytes, with the
        authentication tag supplied here rather than earlier (delayed
        passing of the tag).
        """
class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    The encryption flavour of ``AEADCipherContext``; it exposes the
    resulting tag through the ``tag`` property.
    """

    @property
    @abc.abstractmethod
    def tag(self) -> bytes:
        """
        The tag as bytes. It only becomes available once encryption has
        been finalized.
        """
# Type variable for the ``mode`` argument of ``Cipher``. It is bounded by
# ``Optional[modes.Mode]`` (so ``None`` is an accepted value) and declared
# covariant.
Mode = typing.TypeVar(
    "Mode", bound=typing.Optional[modes.Mode], covariant=True
)
class Cipher(typing.Generic[Mode]):
    """
    Pairs a cipher algorithm with an optional mode and creates encryption
    and decryption contexts for that pairing.
    """

    def __init__(
        self,
        algorithm: CipherAlgorithm,
        mode: Mode,
        backend: typing.Any = None,
    ) -> None:
        """
        Validates and stores ``algorithm`` and ``mode``.

        ``mode`` may be ``None``; otherwise it must be a ``modes.Mode`` and
        is checked against ``algorithm`` via
        ``mode.validate_for_algorithm``. ``backend`` is accepted but not
        used by this constructor (presumably kept so existing callers that
        still pass it keep working).

        Raises ``TypeError`` if ``algorithm`` is not a ``CipherAlgorithm``
        or if ``mode`` is neither ``None`` nor a ``modes.Mode``.
        """
        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        if mode is not None:
            # An explicit check instead of ``assert``: asserts are removed
            # when Python runs with -O, which would let an invalid mode
            # through to fail later with an unrelated AttributeError. The
            # isinstance test also narrows the generic type for mypy.
            if not isinstance(mode, modes.Mode):
                raise TypeError("Expected interface of Mode.")
            mode.validate_for_algorithm(algorithm)

        self.algorithm = algorithm
        self.mode = mode

    @typing.overload
    def encryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADEncryptionContext: ...

    @typing.overload
    def encryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext: ...

    def encryptor(self):
        """
        Creates an encryption context for the stored algorithm and mode.

        Per the overloads, the result is typed as an
        ``AEADEncryptionContext`` when the mode is a
        ``ModeWithAuthenticationTag`` and as a ``CipherContext`` otherwise.

        Raises ``ValueError`` if the mode is a
        ``ModeWithAuthenticationTag`` whose ``tag`` is already set.
        """
        if isinstance(self.mode, modes.ModeWithAuthenticationTag):
            if self.mode.tag is not None:
                raise ValueError(
                    "Authentication tag must be None when encrypting."
                )

        return rust_openssl.ciphers.create_encryption_ctx(
            self.algorithm, self.mode
        )

    @typing.overload
    def decryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADDecryptionContext: ...

    @typing.overload
    def decryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext: ...

    def decryptor(self):
        """
        Creates a decryption context for the stored algorithm and mode.

        Per the overloads, the result is typed as an
        ``AEADDecryptionContext`` when the mode is a
        ``ModeWithAuthenticationTag`` and as a ``CipherContext`` otherwise.
        No validation is done here; the arguments are passed straight to
        ``rust_openssl.ciphers.create_decryption_ctx``.
        """
        return rust_openssl.ciphers.create_decryption_ctx(
            self.algorithm, self.mode
        )
# ``Cipher`` parameterised over the mode types listed below (including
# ``None``). It is the ``self`` type of the second ``encryptor`` /
# ``decryptor`` overloads, i.e. the ones returning a plain ``CipherContext``.
# Those overloads can name it before this assignment because
# ``from __future__ import annotations`` keeps annotations unevaluated.
_CIPHER_TYPE = Cipher[
    typing.Union[
        modes.ModeWithNonce,
        modes.ModeWithTweak,
        None,
        modes.ECB,
        modes.ModeWithInitializationVector,
    ]
]
# Register the context classes exposed by ``rust_openssl.ciphers`` as virtual
# subclasses of the corresponding ABCs, so ``isinstance`` / ``issubclass``
# checks against the abstract interfaces accept them without inheritance.
CipherContext.register(rust_openssl.ciphers.CipherContext)
AEADEncryptionContext.register(rust_openssl.ciphers.AEADEncryptionContext)
AEADDecryptionContext.register(rust_openssl.ciphers.AEADDecryptionContext)
|