|
|
|
|
|
|
|
|
|
from __future__ import annotations |
|
|
|
import abc |
|
import typing |
|
|
|
from cryptography.hazmat.bindings._rust import openssl as rust_openssl |
|
from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm |
|
from cryptography.hazmat.primitives.ciphers import modes |
|
|
|
|
|
class CipherContext(metaclass=abc.ABCMeta):
    """
    Abstract interface of a cipher context.

    Concrete contexts must provide ``update``, ``update_into``, ``finalize``
    and ``reset_nonce``; the class cannot be instantiated until all four are
    implemented.
    """

    @abc.abstractmethod
    def update(self, data: bytes) -> bytes:
        """
        Run ``data`` through the cipher.

        Returns the output produced for the supplied bytes, as bytes.
        """

    @abc.abstractmethod
    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Run ``data`` through the cipher, placing the output in ``buf``
        instead of returning it.

        Returns the count of bytes that were written into ``buf``.

        NOTE(review): ``buf`` is annotated as ``bytes`` although it is
        written to; presumably a writable buffer is expected -- confirm
        against the implementations.
        """

    @abc.abstractmethod
    def finalize(self) -> bytes:
        """
        Process the final block and return the resulting bytes.
        """

    @abc.abstractmethod
    def reset_nonce(self, nonce: bytes) -> None:
        """
        Replace the nonce of this cipher context with ``nonce``.

        An exception is raised when the context does not support being
        reset, or when ``nonce`` does not have a valid length.
        """
|
|
|
|
|
class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
    """
    Abstract cipher context that extends ``CipherContext`` with one extra
    operation, ``authenticate_additional_data``.
    """

    @abc.abstractmethod
    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Authenticate the bytes given in ``data``.
        """
|
|
|
|
|
class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    Abstract ``AEADCipherContext`` for decryption; adds
    ``finalize_with_tag``.
    """

    @abc.abstractmethod
    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Process the final block and return the resulting bytes, taking the
        authentication tag at this point so that it can be supplied late.
        """
|
|
|
|
|
class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    Abstract ``AEADCipherContext`` for encryption; adds the read-only
    ``tag`` property.
    """

    @property
    @abc.abstractmethod
    def tag(self) -> bytes:
        """
        The tag, as bytes. It can only be read once encryption has been
        finalized.
        """
|
|
|
|
|
# Type variable for the mode type that ``Cipher`` is generic over. Its bound
# is ``Optional[modes.Mode]``, so ``None`` is an admissible mode, and it is
# declared covariant.
Mode = typing.TypeVar(
    "Mode", bound=typing.Optional[modes.Mode], covariant=True
)
|
|
|
|
|
class Cipher(typing.Generic[Mode]):
    """
    Pairs a cipher algorithm with an (optional) mode and creates encryption
    and decryption contexts for that pair.

    The class is generic over the mode type so that the ``encryptor`` and
    ``decryptor`` overloads can advertise an AEAD context when the mode is
    a ``modes.ModeWithAuthenticationTag`` and a plain ``CipherContext``
    otherwise.
    """

    def __init__(
        self,
        algorithm: CipherAlgorithm,
        mode: Mode,
        backend: typing.Any = None,
    ) -> None:
        """
        Validate and store the algorithm/mode pair.

        :param algorithm: A ``CipherAlgorithm`` instance.
        :param mode: A ``modes.Mode`` instance, or ``None``.
        :param backend: Accepted but not used by this class.
        :raises TypeError: If ``algorithm`` is not a ``CipherAlgorithm`` or
            if ``mode`` is neither ``None`` nor a ``modes.Mode``.

        When a mode is given, ``mode.validate_for_algorithm(algorithm)`` is
        called and any exception it raises propagates to the caller.
        """
        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        if mode is not None:
            # An explicit check instead of ``assert``: assertions are removed
            # under ``python -O``, which would let an invalid mode through to
            # fail later with an unrelated AttributeError. The isinstance
            # test also narrows the generic ``Mode`` type for type checkers.
            if not isinstance(mode, modes.Mode):
                raise TypeError("Expected interface of Mode.")
            mode.validate_for_algorithm(algorithm)

        self.algorithm = algorithm
        self.mode = mode

    # ``_CIPHER_TYPE`` is defined further down in the module; it may be named
    # in the annotations below because ``from __future__ import annotations``
    # keeps annotations from being evaluated when the class body runs.

    @typing.overload
    def encryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADEncryptionContext: ...

    @typing.overload
    def encryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext: ...

    def encryptor(self):
        """
        Create an encryption context for this algorithm and mode.

        :raises ValueError: If the mode is a
            ``modes.ModeWithAuthenticationTag`` whose ``tag`` is not
            ``None``.
        :return: The object produced by
            ``rust_openssl.ciphers.create_encryption_ctx``.
        """
        if isinstance(self.mode, modes.ModeWithAuthenticationTag):
            if self.mode.tag is not None:
                raise ValueError(
                    "Authentication tag must be None when encrypting."
                )

        return rust_openssl.ciphers.create_encryption_ctx(
            self.algorithm, self.mode
        )

    @typing.overload
    def decryptor(
        self: Cipher[modes.ModeWithAuthenticationTag],
    ) -> AEADDecryptionContext: ...

    @typing.overload
    def decryptor(
        self: _CIPHER_TYPE,
    ) -> CipherContext: ...

    def decryptor(self):
        """
        Create a decryption context for this algorithm and mode.

        Unlike ``encryptor``, no check on the mode's tag is made here.

        :return: The object produced by
            ``rust_openssl.ciphers.create_decryption_ctx``.
        """
        return rust_openssl.ciphers.create_decryption_ctx(
            self.algorithm, self.mode
        )
|
|
|
|
|
# Alias for ``Cipher`` parameterized by any of the mode types listed below,
# or by ``None``. It is the ``self`` type of the second ``encryptor`` and
# ``decryptor`` overloads on ``Cipher``, i.e. the ones that return a plain
# ``CipherContext``. It has to be created after the class because it
# subscripts ``Cipher`` itself.
_CIPHER_TYPE = Cipher[
    typing.Union[
        modes.ModeWithNonce,
        modes.ModeWithTweak,
        None,
        modes.ECB,
        modes.ModeWithInitializationVector,
    ]
]
|
|
|
# Register the context classes exposed by ``rust_openssl.ciphers`` as virtual
# subclasses of the abstract classes defined in this module (via
# ``ABCMeta.register``), so that ``isinstance``/``issubclass`` checks against
# the abstract classes accept them.
CipherContext.register(rust_openssl.ciphers.CipherContext)
AEADEncryptionContext.register(rust_openssl.ciphers.AEADEncryptionContext)
AEADDecryptionContext.register(rust_openssl.ciphers.AEADDecryptionContext)
|
|