|
|
|
|
|
|
|
|
|
from __future__ import annotations |
|
|
|
import datetime |
|
import typing |
|
|
|
from cryptography import utils, x509 |
|
from cryptography.hazmat.bindings._rust import ocsp |
|
from cryptography.hazmat.primitives import hashes |
|
from cryptography.hazmat.primitives.asymmetric.types import ( |
|
CertificateIssuerPrivateKeyTypes, |
|
) |
|
from cryptography.x509.base import ( |
|
_EARLIEST_UTC_TIME, |
|
_convert_to_naive_utc_time, |
|
_reject_duplicate_extension, |
|
) |
|
|
|
|
|
class OCSPResponderEncoding(utils.Enum): |
|
HASH = "By Hash" |
|
NAME = "By Name" |
|
|
|
|
|
class OCSPResponseStatus(utils.Enum): |
|
SUCCESSFUL = 0 |
|
MALFORMED_REQUEST = 1 |
|
INTERNAL_ERROR = 2 |
|
TRY_LATER = 3 |
|
SIG_REQUIRED = 5 |
|
UNAUTHORIZED = 6 |
|
|
|
|
|
_ALLOWED_HASHES = ( |
|
hashes.SHA1, |
|
hashes.SHA224, |
|
hashes.SHA256, |
|
hashes.SHA384, |
|
hashes.SHA512, |
|
) |
|
|
|
|
|
def _verify_algorithm(algorithm: hashes.HashAlgorithm) -> None: |
|
if not isinstance(algorithm, _ALLOWED_HASHES): |
|
raise ValueError( |
|
"Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512" |
|
) |
|
|
|
|
|
class OCSPCertStatus(utils.Enum): |
|
GOOD = 0 |
|
REVOKED = 1 |
|
UNKNOWN = 2 |
|
|
|
|
|
class _SingleResponse: |
|
def __init__( |
|
self, |
|
cert: x509.Certificate, |
|
issuer: x509.Certificate, |
|
algorithm: hashes.HashAlgorithm, |
|
cert_status: OCSPCertStatus, |
|
this_update: datetime.datetime, |
|
next_update: datetime.datetime | None, |
|
revocation_time: datetime.datetime | None, |
|
revocation_reason: x509.ReasonFlags | None, |
|
): |
|
if not isinstance(cert, x509.Certificate) or not isinstance( |
|
issuer, x509.Certificate |
|
): |
|
raise TypeError("cert and issuer must be a Certificate") |
|
|
|
_verify_algorithm(algorithm) |
|
if not isinstance(this_update, datetime.datetime): |
|
raise TypeError("this_update must be a datetime object") |
|
if next_update is not None and not isinstance( |
|
next_update, datetime.datetime |
|
): |
|
raise TypeError("next_update must be a datetime object or None") |
|
|
|
self._cert = cert |
|
self._issuer = issuer |
|
self._algorithm = algorithm |
|
self._this_update = this_update |
|
self._next_update = next_update |
|
|
|
if not isinstance(cert_status, OCSPCertStatus): |
|
raise TypeError( |
|
"cert_status must be an item from the OCSPCertStatus enum" |
|
) |
|
if cert_status is not OCSPCertStatus.REVOKED: |
|
if revocation_time is not None: |
|
raise ValueError( |
|
"revocation_time can only be provided if the certificate " |
|
"is revoked" |
|
) |
|
if revocation_reason is not None: |
|
raise ValueError( |
|
"revocation_reason can only be provided if the certificate" |
|
" is revoked" |
|
) |
|
else: |
|
if not isinstance(revocation_time, datetime.datetime): |
|
raise TypeError("revocation_time must be a datetime object") |
|
|
|
revocation_time = _convert_to_naive_utc_time(revocation_time) |
|
if revocation_time < _EARLIEST_UTC_TIME: |
|
raise ValueError( |
|
"The revocation_time must be on or after" |
|
" 1950 January 1." |
|
) |
|
|
|
if revocation_reason is not None and not isinstance( |
|
revocation_reason, x509.ReasonFlags |
|
): |
|
raise TypeError( |
|
"revocation_reason must be an item from the ReasonFlags " |
|
"enum or None" |
|
) |
|
|
|
self._cert_status = cert_status |
|
self._revocation_time = revocation_time |
|
self._revocation_reason = revocation_reason |
|
|
|
|
|
OCSPRequest = ocsp.OCSPRequest |
|
OCSPResponse = ocsp.OCSPResponse |
|
OCSPSingleResponse = ocsp.OCSPSingleResponse |
|
|
|
|
|
class OCSPRequestBuilder: |
|
def __init__( |
|
self, |
|
request: tuple[ |
|
x509.Certificate, x509.Certificate, hashes.HashAlgorithm |
|
] |
|
| None = None, |
|
request_hash: tuple[bytes, bytes, int, hashes.HashAlgorithm] |
|
| None = None, |
|
extensions: list[x509.Extension[x509.ExtensionType]] = [], |
|
) -> None: |
|
self._request = request |
|
self._request_hash = request_hash |
|
self._extensions = extensions |
|
|
|
def add_certificate( |
|
self, |
|
cert: x509.Certificate, |
|
issuer: x509.Certificate, |
|
algorithm: hashes.HashAlgorithm, |
|
) -> OCSPRequestBuilder: |
|
if self._request is not None or self._request_hash is not None: |
|
raise ValueError("Only one certificate can be added to a request") |
|
|
|
_verify_algorithm(algorithm) |
|
if not isinstance(cert, x509.Certificate) or not isinstance( |
|
issuer, x509.Certificate |
|
): |
|
raise TypeError("cert and issuer must be a Certificate") |
|
|
|
return OCSPRequestBuilder( |
|
(cert, issuer, algorithm), self._request_hash, self._extensions |
|
) |
|
|
|
def add_certificate_by_hash( |
|
self, |
|
issuer_name_hash: bytes, |
|
issuer_key_hash: bytes, |
|
serial_number: int, |
|
algorithm: hashes.HashAlgorithm, |
|
) -> OCSPRequestBuilder: |
|
if self._request is not None or self._request_hash is not None: |
|
raise ValueError("Only one certificate can be added to a request") |
|
|
|
if not isinstance(serial_number, int): |
|
raise TypeError("serial_number must be an integer") |
|
|
|
_verify_algorithm(algorithm) |
|
utils._check_bytes("issuer_name_hash", issuer_name_hash) |
|
utils._check_bytes("issuer_key_hash", issuer_key_hash) |
|
if algorithm.digest_size != len( |
|
issuer_name_hash |
|
) or algorithm.digest_size != len(issuer_key_hash): |
|
raise ValueError( |
|
"issuer_name_hash and issuer_key_hash must be the same length " |
|
"as the digest size of the algorithm" |
|
) |
|
|
|
return OCSPRequestBuilder( |
|
self._request, |
|
(issuer_name_hash, issuer_key_hash, serial_number, algorithm), |
|
self._extensions, |
|
) |
|
|
|
def add_extension( |
|
self, extval: x509.ExtensionType, critical: bool |
|
) -> OCSPRequestBuilder: |
|
if not isinstance(extval, x509.ExtensionType): |
|
raise TypeError("extension must be an ExtensionType") |
|
|
|
extension = x509.Extension(extval.oid, critical, extval) |
|
_reject_duplicate_extension(extension, self._extensions) |
|
|
|
return OCSPRequestBuilder( |
|
self._request, self._request_hash, [*self._extensions, extension] |
|
) |
|
|
|
def build(self) -> OCSPRequest: |
|
if self._request is None and self._request_hash is None: |
|
raise ValueError("You must add a certificate before building") |
|
|
|
return ocsp.create_ocsp_request(self) |
|
|
|
|
|
class OCSPResponseBuilder: |
|
def __init__( |
|
self, |
|
response: _SingleResponse | None = None, |
|
responder_id: tuple[x509.Certificate, OCSPResponderEncoding] |
|
| None = None, |
|
certs: list[x509.Certificate] | None = None, |
|
extensions: list[x509.Extension[x509.ExtensionType]] = [], |
|
): |
|
self._response = response |
|
self._responder_id = responder_id |
|
self._certs = certs |
|
self._extensions = extensions |
|
|
|
def add_response( |
|
self, |
|
cert: x509.Certificate, |
|
issuer: x509.Certificate, |
|
algorithm: hashes.HashAlgorithm, |
|
cert_status: OCSPCertStatus, |
|
this_update: datetime.datetime, |
|
next_update: datetime.datetime | None, |
|
revocation_time: datetime.datetime | None, |
|
revocation_reason: x509.ReasonFlags | None, |
|
) -> OCSPResponseBuilder: |
|
if self._response is not None: |
|
raise ValueError("Only one response per OCSPResponse.") |
|
|
|
singleresp = _SingleResponse( |
|
cert, |
|
issuer, |
|
algorithm, |
|
cert_status, |
|
this_update, |
|
next_update, |
|
revocation_time, |
|
revocation_reason, |
|
) |
|
return OCSPResponseBuilder( |
|
singleresp, |
|
self._responder_id, |
|
self._certs, |
|
self._extensions, |
|
) |
|
|
|
def responder_id( |
|
self, encoding: OCSPResponderEncoding, responder_cert: x509.Certificate |
|
) -> OCSPResponseBuilder: |
|
if self._responder_id is not None: |
|
raise ValueError("responder_id can only be set once") |
|
if not isinstance(responder_cert, x509.Certificate): |
|
raise TypeError("responder_cert must be a Certificate") |
|
if not isinstance(encoding, OCSPResponderEncoding): |
|
raise TypeError( |
|
"encoding must be an element from OCSPResponderEncoding" |
|
) |
|
|
|
return OCSPResponseBuilder( |
|
self._response, |
|
(responder_cert, encoding), |
|
self._certs, |
|
self._extensions, |
|
) |
|
|
|
def certificates( |
|
self, certs: typing.Iterable[x509.Certificate] |
|
) -> OCSPResponseBuilder: |
|
if self._certs is not None: |
|
raise ValueError("certificates may only be set once") |
|
certs = list(certs) |
|
if len(certs) == 0: |
|
raise ValueError("certs must not be an empty list") |
|
if not all(isinstance(x, x509.Certificate) for x in certs): |
|
raise TypeError("certs must be a list of Certificates") |
|
return OCSPResponseBuilder( |
|
self._response, |
|
self._responder_id, |
|
certs, |
|
self._extensions, |
|
) |
|
|
|
def add_extension( |
|
self, extval: x509.ExtensionType, critical: bool |
|
) -> OCSPResponseBuilder: |
|
if not isinstance(extval, x509.ExtensionType): |
|
raise TypeError("extension must be an ExtensionType") |
|
|
|
extension = x509.Extension(extval.oid, critical, extval) |
|
_reject_duplicate_extension(extension, self._extensions) |
|
|
|
return OCSPResponseBuilder( |
|
self._response, |
|
self._responder_id, |
|
self._certs, |
|
[*self._extensions, extension], |
|
) |
|
|
|
def sign( |
|
self, |
|
private_key: CertificateIssuerPrivateKeyTypes, |
|
algorithm: hashes.HashAlgorithm | None, |
|
) -> OCSPResponse: |
|
if self._response is None: |
|
raise ValueError("You must add a response before signing") |
|
if self._responder_id is None: |
|
raise ValueError("You must add a responder_id before signing") |
|
|
|
return ocsp.create_ocsp_response( |
|
OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm |
|
) |
|
|
|
@classmethod |
|
def build_unsuccessful( |
|
cls, response_status: OCSPResponseStatus |
|
) -> OCSPResponse: |
|
if not isinstance(response_status, OCSPResponseStatus): |
|
raise TypeError( |
|
"response_status must be an item from OCSPResponseStatus" |
|
) |
|
if response_status is OCSPResponseStatus.SUCCESSFUL: |
|
raise ValueError("response_status cannot be SUCCESSFUL") |
|
|
|
return ocsp.create_ocsp_response(response_status, None, None, None) |
|
|
|
|
|
load_der_ocsp_request = ocsp.load_der_ocsp_request |
|
load_der_ocsp_response = ocsp.load_der_ocsp_response |
|
|