mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
deprecate certbot.ocsp (#10584)
this is part of https://github.com/certbot/certbot/issues/10517 to update this description in response to the discussion below, i'd recommend reviewing this PR by commit. the first commit just moves ocsp.py under _internal making no other changes while the second commit fixes everything else up. the diff really isn't as big here as it looks
This commit is contained in:
parent
b42b986fb7
commit
9ed92009db
9 changed files with 274 additions and 251 deletions
|
|
@ -1,7 +0,0 @@
|
|||
certbot.ocsp package
|
||||
======================
|
||||
|
||||
.. automodule:: certbot.ocsp
|
||||
:members:
|
||||
:undoc-members:
|
||||
:show-inheritance:
|
||||
|
|
@ -26,7 +26,6 @@ Submodules
|
|||
certbot.errors
|
||||
certbot.interfaces
|
||||
certbot.main
|
||||
certbot.ocsp
|
||||
certbot.reverter
|
||||
certbot.util
|
||||
|
||||
|
|
|
|||
|
|
@ -13,10 +13,10 @@ from typing import Union
|
|||
from certbot import configuration
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import ocsp
|
||||
from certbot import util
|
||||
from certbot._internal import storage
|
||||
from certbot._internal import ocsp
|
||||
from certbot._internal import san
|
||||
from certbot._internal import storage
|
||||
from certbot.compat import os
|
||||
from certbot.display import util as display_util
|
||||
|
||||
|
|
|
|||
233
certbot/src/certbot/_internal/ocsp.py
Normal file
233
certbot/src/certbot/_internal/ocsp.py
Normal file
|
|
@ -0,0 +1,233 @@
|
|||
"""Tools for checking certificate revocation."""
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from datetime import timezone
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.exceptions import UnsupportedAlgorithm
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.x509 import ocsp
|
||||
import requests
|
||||
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot.interfaces import RenewableCert
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RevocationChecker:
|
||||
"""This class figures out OCSP checking on this system, and performs it."""
|
||||
def ocsp_revoked(self, cert: RenewableCert) -> bool:
|
||||
"""Get revoked status for a particular cert version.
|
||||
|
||||
.. todo:: Make this a non-blocking call
|
||||
|
||||
:param `.interfaces.RenewableCert` cert: Certificate object
|
||||
:returns: True if revoked; False if valid or the check failed or cert is expired.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
return self.ocsp_revoked_by_paths(cert.cert_path, cert.chain_path)
|
||||
|
||||
def ocsp_revoked_by_paths(self, cert_path: str, chain_path: str, timeout: int = 10) -> bool:
|
||||
"""Performs the OCSP revocation check
|
||||
|
||||
:param str cert_path: Certificate filepath
|
||||
:param str chain_path: Certificate chain
|
||||
:param int timeout: Timeout (in seconds) for the OCSP query
|
||||
|
||||
:returns: True if revoked; False if valid or the check failed or cert is expired.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
# Let's Encrypt doesn't update OCSP for expired certificates,
|
||||
# so don't check OCSP if the cert is expired.
|
||||
# https://github.com/certbot/certbot/issues/7152
|
||||
now = datetime.now(timezone.utc)
|
||||
if crypto_util.notAfter(cert_path) <= now:
|
||||
return False
|
||||
|
||||
url, host = _determine_ocsp_server(cert_path)
|
||||
if not host or not url:
|
||||
return False
|
||||
|
||||
return _check_ocsp_cryptography(cert_path, chain_path, url, timeout)
|
||||
|
||||
|
||||
def _determine_ocsp_server(cert_path: str) -> tuple[Optional[str], Optional[str]]:
|
||||
"""Extract the OCSP server host from a certificate.
|
||||
|
||||
:param str cert_path: Path to the cert we're checking OCSP for
|
||||
:rtype tuple:
|
||||
:returns: (OCSP server URL or None, OCSP server host or None)
|
||||
|
||||
"""
|
||||
with open(cert_path, 'rb') as file_handler:
|
||||
cert = x509.load_pem_x509_certificate(file_handler.read(), default_backend())
|
||||
try:
|
||||
extension = cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess)
|
||||
ocsp_oid = x509.AuthorityInformationAccessOID.OCSP
|
||||
descriptions = [description for description in extension.value
|
||||
if description.access_method == ocsp_oid]
|
||||
|
||||
url = descriptions[0].access_location.value
|
||||
except (x509.ExtensionNotFound, IndexError):
|
||||
logger.info("Cannot extract OCSP URI from %s", cert_path)
|
||||
return None, None
|
||||
|
||||
url = url.rstrip()
|
||||
host = url.partition("://")[2].rstrip("/")
|
||||
|
||||
if host:
|
||||
return url, host
|
||||
logger.info("Cannot process OCSP host from URL (%s) in certificate at %s", url, cert_path)
|
||||
return None, None
|
||||
|
||||
|
||||
def _check_ocsp_cryptography(cert_path: str, chain_path: str, url: str, timeout: int) -> bool:
|
||||
# Retrieve OCSP response
|
||||
with open(chain_path, 'rb') as file_handler:
|
||||
issuer = x509.load_pem_x509_certificate(file_handler.read(), default_backend())
|
||||
with open(cert_path, 'rb') as file_handler:
|
||||
cert = x509.load_pem_x509_certificate(file_handler.read(), default_backend())
|
||||
builder = ocsp.OCSPRequestBuilder()
|
||||
builder = builder.add_certificate(cert, issuer, hashes.SHA1())
|
||||
request = builder.build()
|
||||
request_binary = request.public_bytes(serialization.Encoding.DER)
|
||||
try:
|
||||
response = requests.post(url, data=request_binary,
|
||||
headers={'Content-Type': 'application/ocsp-request'},
|
||||
timeout=timeout)
|
||||
except requests.exceptions.RequestException:
|
||||
logger.info("OCSP check failed for %s (are we offline?)", cert_path, exc_info=True)
|
||||
return False
|
||||
if response.status_code != 200:
|
||||
logger.info("OCSP check failed for %s (HTTP status: %d)", cert_path, response.status_code)
|
||||
return False
|
||||
|
||||
response_ocsp = ocsp.load_der_ocsp_response(response.content)
|
||||
|
||||
# Check OCSP response validity
|
||||
if response_ocsp.response_status != ocsp.OCSPResponseStatus.SUCCESSFUL:
|
||||
logger.warning("Invalid OCSP response status for %s: %s",
|
||||
cert_path, response_ocsp.response_status)
|
||||
return False
|
||||
|
||||
# Check OCSP signature
|
||||
try:
|
||||
_check_ocsp_response(response_ocsp, request, issuer, cert_path)
|
||||
except UnsupportedAlgorithm as e:
|
||||
logger.warning(str(e))
|
||||
except errors.Error as e:
|
||||
logger.warning(str(e))
|
||||
except InvalidSignature:
|
||||
logger.warning('Invalid signature on OCSP response for %s', cert_path)
|
||||
except AssertionError as error:
|
||||
logger.warning('Invalid OCSP response for %s: %s.', cert_path, str(error))
|
||||
else:
|
||||
# Check OCSP certificate status
|
||||
logger.debug("OCSP certificate status for %s is: %s",
|
||||
cert_path, response_ocsp.certificate_status)
|
||||
return response_ocsp.certificate_status == ocsp.OCSPCertStatus.REVOKED
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _check_ocsp_response(response_ocsp: 'ocsp.OCSPResponse', request_ocsp: 'ocsp.OCSPRequest',
|
||||
issuer_cert: x509.Certificate, cert_path: str) -> None:
|
||||
"""Verify that the OCSP is valid for several criteria"""
|
||||
# Assert OCSP response corresponds to the certificate we are talking about
|
||||
if response_ocsp.serial_number != request_ocsp.serial_number:
|
||||
raise AssertionError('the certificate in response does not correspond '
|
||||
'to the certificate in request')
|
||||
|
||||
# Assert signature is valid
|
||||
_check_ocsp_response_signature(response_ocsp, issuer_cert, cert_path)
|
||||
|
||||
# Assert issuer in response is the expected one
|
||||
if (not isinstance(response_ocsp.hash_algorithm, type(request_ocsp.hash_algorithm))
|
||||
or response_ocsp.issuer_key_hash != request_ocsp.issuer_key_hash
|
||||
or response_ocsp.issuer_name_hash != request_ocsp.issuer_name_hash):
|
||||
raise AssertionError('the issuer does not correspond to issuer of the certificate.')
|
||||
|
||||
# In following checks, two situations can occur:
|
||||
# * nextUpdate is set, and requirement is thisUpdate < now < nextUpdate
|
||||
# * nextUpdate is not set, and requirement is thisUpdate < now
|
||||
# NB1: We add a validity period tolerance to handle clock time inconsistencies,
|
||||
# value is 5 min like for OpenSSL.
|
||||
# NB2: Another check is to verify that thisUpdate is not too old, it is optional
|
||||
# for OpenSSL, so we do not do it here.
|
||||
# See OpenSSL implementation as a reference:
|
||||
# https://github.com/openssl/openssl/blob/ef45aa14c5af024fcb8bef1c9007f3d1c115bd85/crypto/ocsp/ocsp_cl.c#L338-L391
|
||||
# thisUpdate/nextUpdate are expressed in UTC/GMT time zone
|
||||
now = datetime.now(timezone.utc)
|
||||
if not response_ocsp.this_update_utc:
|
||||
raise AssertionError('param thisUpdate is not set.')
|
||||
if response_ocsp.this_update_utc > now + timedelta(minutes=5):
|
||||
raise AssertionError('param thisUpdate is in the future.')
|
||||
if response_ocsp.next_update_utc and response_ocsp.next_update_utc < now - timedelta(minutes=5):
|
||||
raise AssertionError('param nextUpdate is in the past.')
|
||||
|
||||
|
||||
def _check_ocsp_response_signature(response_ocsp: 'ocsp.OCSPResponse',
|
||||
issuer_cert: x509.Certificate, cert_path: str) -> None:
|
||||
"""Verify an OCSP response signature against certificate issuer or responder"""
|
||||
def _key_hash(cert: x509.Certificate) -> bytes:
|
||||
return x509.SubjectKeyIdentifier.from_public_key(cert.public_key()).digest
|
||||
|
||||
if (response_ocsp.responder_name == issuer_cert.subject
|
||||
or response_ocsp.responder_key_hash == _key_hash(issuer_cert)):
|
||||
# Case where the OCSP responder is also the certificate issuer
|
||||
logger.debug('OCSP response for certificate %s is signed by the certificate\'s issuer.',
|
||||
cert_path)
|
||||
responder_cert = issuer_cert
|
||||
else:
|
||||
# Case where the OCSP responder is not the certificate issuer
|
||||
logger.debug('OCSP response for certificate %s is delegated to an external responder.',
|
||||
cert_path)
|
||||
|
||||
responder_certs = [cert for cert in response_ocsp.certificates
|
||||
if response_ocsp.responder_name == cert.subject or \
|
||||
response_ocsp.responder_key_hash == _key_hash(cert)]
|
||||
if not responder_certs:
|
||||
raise AssertionError('no matching responder certificate could be found')
|
||||
|
||||
# We suppose here that the ACME server support only one certificate in the OCSP status
|
||||
# request. This is currently the case for LetsEncrypt servers.
|
||||
# See https://github.com/letsencrypt/boulder/issues/2331
|
||||
responder_cert = responder_certs[0]
|
||||
|
||||
if responder_cert.issuer != issuer_cert.subject:
|
||||
raise AssertionError('responder certificate is not signed '
|
||||
'by the certificate\'s issuer')
|
||||
|
||||
try:
|
||||
extension = responder_cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
|
||||
delegate_authorized = x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING in extension.value
|
||||
except (x509.ExtensionNotFound, IndexError):
|
||||
delegate_authorized = False
|
||||
if not delegate_authorized:
|
||||
raise AssertionError('responder is not authorized by issuer to sign OCSP responses')
|
||||
|
||||
# Following line may raise UnsupportedAlgorithm
|
||||
chosen_cert_hash = responder_cert.signature_hash_algorithm
|
||||
assert chosen_cert_hash # always present for RSA and ECDSA certificates.
|
||||
# For a delegate OCSP responder, we need first check that its certificate is effectively
|
||||
# signed by the certificate issuer.
|
||||
crypto_util.verify_signed_payload(issuer_cert.public_key(), responder_cert.signature,
|
||||
responder_cert.tbs_certificate_bytes, chosen_cert_hash)
|
||||
|
||||
# Following line may raise UnsupportedAlgorithm
|
||||
chosen_response_hash = response_ocsp.signature_hash_algorithm
|
||||
# We check that the OSCP response is effectively signed by the responder
|
||||
# (an authorized delegate one or the certificate issuer itself).
|
||||
if not chosen_response_hash:
|
||||
raise AssertionError("no signature hash algorithm defined")
|
||||
crypto_util.verify_signed_payload(responder_cert.public_key(), response_ocsp.signature,
|
||||
response_ocsp.tbs_response_bytes, chosen_response_hash)
|
||||
|
|
@ -26,9 +26,9 @@ from certbot import configuration
|
|||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import ocsp
|
||||
from certbot import util
|
||||
from certbot._internal import error_handler
|
||||
from certbot._internal import ocsp
|
||||
from certbot._internal import san
|
||||
from certbot._internal.plugins import disco as plugins_disco
|
||||
from certbot.compat import filesystem
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ from cryptography.hazmat.primitives import hashes
|
|||
from cryptography.x509 import ocsp as ocsp_lib
|
||||
import pytest
|
||||
|
||||
from certbot import crypto_util
|
||||
from certbot.tests import util as test_util
|
||||
|
||||
|
||||
|
|
@ -25,7 +26,7 @@ class OSCPTestCryptography(unittest.TestCase):
|
|||
"""
|
||||
|
||||
def setUp(self):
|
||||
from certbot import ocsp
|
||||
from certbot._internal import ocsp
|
||||
self.checker = ocsp.RevocationChecker()
|
||||
self.cert_path = test_util.vector_path('ocsp_certificate.pem')
|
||||
self.chain_path = test_util.vector_path('ocsp_issuer_certificate.pem')
|
||||
|
|
@ -33,14 +34,14 @@ class OSCPTestCryptography(unittest.TestCase):
|
|||
self.cert_obj.cert_path = self.cert_path
|
||||
self.cert_obj.chain_path = self.chain_path
|
||||
now = datetime.now(timezone.utc)
|
||||
self.mock_notAfter = mock.patch('certbot.ocsp.crypto_util.notAfter',
|
||||
self.mock_notAfter = mock.patch('certbot._internal.ocsp.crypto_util.notAfter',
|
||||
return_value=now + timedelta(hours=2))
|
||||
self.mock_notAfter.start()
|
||||
# Ensure the mock.patch is stopped even if test raises an exception
|
||||
self.addCleanup(self.mock_notAfter.stop)
|
||||
|
||||
@mock.patch('certbot.ocsp._determine_ocsp_server')
|
||||
@mock.patch('certbot.ocsp._check_ocsp_cryptography')
|
||||
@mock.patch('certbot._internal.ocsp._determine_ocsp_server')
|
||||
@mock.patch('certbot._internal.ocsp._check_ocsp_cryptography')
|
||||
def test_ensure_cryptography_toggled(self, mock_check, mock_determine):
|
||||
mock_determine.return_value = ('http://example.com', 'example.com')
|
||||
self.checker.ocsp_revoked(self.cert_obj)
|
||||
|
|
@ -170,7 +171,7 @@ class OSCPTestCryptography(unittest.TestCase):
|
|||
with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL):
|
||||
# This mock is necessary to avoid the first call contained in _determine_ocsp_server
|
||||
# of the method cryptography.x509.Extensions.get_extension_for_class.
|
||||
with mock.patch('certbot.ocsp._determine_ocsp_server') as mock_server:
|
||||
with mock.patch('certbot._internal.ocsp._determine_ocsp_server') as mock_server:
|
||||
mock_server.return_value = ('https://example.com', 'example.com')
|
||||
with mock.patch('cryptography.x509.Extensions.get_extension_for_class',
|
||||
side_effect=x509.ExtensionNotFound(
|
||||
|
|
@ -179,15 +180,35 @@ class OSCPTestCryptography(unittest.TestCase):
|
|||
assert revoked is False
|
||||
|
||||
|
||||
class TestDeprecation:
|
||||
"""Tests related to the deprecation of certbot.ocsp.
|
||||
|
||||
These tests can be deleted after this module is removed from Certbot.
|
||||
|
||||
"""
|
||||
def test_deprecation_warning(self):
|
||||
with pytest.warns(DeprecationWarning, match='certbot.ocsp is deprecated'):
|
||||
import certbot.ocsp # noqa: F401
|
||||
|
||||
def test_no_changes(self):
|
||||
from certbot._internal import ocsp
|
||||
expected_hash = '4f595b3c6e63749af1f71b5b4890b94e04734bb75f8bff95cf7d7a7e4752d5c1'
|
||||
failure_message = ('Despite being prefixed by _internal, certbot._internal.ocsp is still '
|
||||
'part of our public API while certbot.ocsp exists. You are free to make changes to '
|
||||
'this file and update the hash in this test however, please be sure your changes do '
|
||||
'not affect the API of the certbot.ocsp module.')
|
||||
assert crypto_util.sha256sum(ocsp.__file__) == expected_hash, failure_message
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _ocsp_mock(certificate_status, response_status,
|
||||
http_status_code=200, check_signature_side_effect=None):
|
||||
with mock.patch('certbot.ocsp.ocsp.load_der_ocsp_response') as mock_response:
|
||||
with mock.patch('certbot._internal.ocsp.ocsp.load_der_ocsp_response') as mock_response:
|
||||
mock_response.return_value = _construct_mock_ocsp_response(
|
||||
certificate_status, response_status)
|
||||
with mock.patch('certbot.ocsp.requests.post') as mock_post:
|
||||
with mock.patch('certbot._internal.ocsp.requests.post') as mock_post:
|
||||
mock_post.return_value = mock.Mock(status_code=http_status_code)
|
||||
with mock.patch('certbot.ocsp.crypto_util.verify_signed_payload') \
|
||||
with mock.patch('certbot._internal.ocsp.crypto_util.verify_signed_payload') \
|
||||
as mock_check:
|
||||
if check_signature_side_effect:
|
||||
mock_check.side_effect = check_signature_side_effect
|
||||
|
|
|
|||
|
|
@ -647,7 +647,7 @@ class RenewableCertTests(BaseRenewableCertTest):
|
|||
with pytest.raises(errors.CertStorageError):
|
||||
self.test_rc._update_link_to("elephant", 17)
|
||||
|
||||
@mock.patch("certbot.ocsp.RevocationChecker.ocsp_revoked_by_paths")
|
||||
@mock.patch("certbot._internal.ocsp.RevocationChecker.ocsp_revoked_by_paths")
|
||||
def test_ocsp_revoked(self, mock_checker):
|
||||
# Write out test files
|
||||
for kind in ALL_FOUR:
|
||||
|
|
|
|||
|
|
@ -1,233 +1,9 @@
|
|||
"""Tools for checking certificate revocation."""
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from datetime import timezone
|
||||
import logging
|
||||
from typing import Optional
|
||||
"""Deprecated tools for checking certificate revocation."""
|
||||
import warnings
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.exceptions import UnsupportedAlgorithm
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.x509 import ocsp
|
||||
import requests
|
||||
# ruff: disable[F403]
|
||||
from certbot._internal.ocsp import * # pylint: disable=wildcard-import,unused-wildcard-import
|
||||
# ruff: enable[F403]
|
||||
|
||||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot.interfaces import RenewableCert
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RevocationChecker:
|
||||
"""This class figures out OCSP checking on this system, and performs it."""
|
||||
def ocsp_revoked(self, cert: RenewableCert) -> bool:
|
||||
"""Get revoked status for a particular cert version.
|
||||
|
||||
.. todo:: Make this a non-blocking call
|
||||
|
||||
:param `.interfaces.RenewableCert` cert: Certificate object
|
||||
:returns: True if revoked; False if valid or the check failed or cert is expired.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
return self.ocsp_revoked_by_paths(cert.cert_path, cert.chain_path)
|
||||
|
||||
def ocsp_revoked_by_paths(self, cert_path: str, chain_path: str, timeout: int = 10) -> bool:
|
||||
"""Performs the OCSP revocation check
|
||||
|
||||
:param str cert_path: Certificate filepath
|
||||
:param str chain_path: Certificate chain
|
||||
:param int timeout: Timeout (in seconds) for the OCSP query
|
||||
|
||||
:returns: True if revoked; False if valid or the check failed or cert is expired.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
# Let's Encrypt doesn't update OCSP for expired certificates,
|
||||
# so don't check OCSP if the cert is expired.
|
||||
# https://github.com/certbot/certbot/issues/7152
|
||||
now = datetime.now(timezone.utc)
|
||||
if crypto_util.notAfter(cert_path) <= now:
|
||||
return False
|
||||
|
||||
url, host = _determine_ocsp_server(cert_path)
|
||||
if not host or not url:
|
||||
return False
|
||||
|
||||
return _check_ocsp_cryptography(cert_path, chain_path, url, timeout)
|
||||
|
||||
|
||||
def _determine_ocsp_server(cert_path: str) -> tuple[Optional[str], Optional[str]]:
|
||||
"""Extract the OCSP server host from a certificate.
|
||||
|
||||
:param str cert_path: Path to the cert we're checking OCSP for
|
||||
:rtype tuple:
|
||||
:returns: (OCSP server URL or None, OCSP server host or None)
|
||||
|
||||
"""
|
||||
with open(cert_path, 'rb') as file_handler:
|
||||
cert = x509.load_pem_x509_certificate(file_handler.read(), default_backend())
|
||||
try:
|
||||
extension = cert.extensions.get_extension_for_class(x509.AuthorityInformationAccess)
|
||||
ocsp_oid = x509.AuthorityInformationAccessOID.OCSP
|
||||
descriptions = [description for description in extension.value
|
||||
if description.access_method == ocsp_oid]
|
||||
|
||||
url = descriptions[0].access_location.value
|
||||
except (x509.ExtensionNotFound, IndexError):
|
||||
logger.info("Cannot extract OCSP URI from %s", cert_path)
|
||||
return None, None
|
||||
|
||||
url = url.rstrip()
|
||||
host = url.partition("://")[2].rstrip("/")
|
||||
|
||||
if host:
|
||||
return url, host
|
||||
logger.info("Cannot process OCSP host from URL (%s) in certificate at %s", url, cert_path)
|
||||
return None, None
|
||||
|
||||
|
||||
def _check_ocsp_cryptography(cert_path: str, chain_path: str, url: str, timeout: int) -> bool:
|
||||
# Retrieve OCSP response
|
||||
with open(chain_path, 'rb') as file_handler:
|
||||
issuer = x509.load_pem_x509_certificate(file_handler.read(), default_backend())
|
||||
with open(cert_path, 'rb') as file_handler:
|
||||
cert = x509.load_pem_x509_certificate(file_handler.read(), default_backend())
|
||||
builder = ocsp.OCSPRequestBuilder()
|
||||
builder = builder.add_certificate(cert, issuer, hashes.SHA1())
|
||||
request = builder.build()
|
||||
request_binary = request.public_bytes(serialization.Encoding.DER)
|
||||
try:
|
||||
response = requests.post(url, data=request_binary,
|
||||
headers={'Content-Type': 'application/ocsp-request'},
|
||||
timeout=timeout)
|
||||
except requests.exceptions.RequestException:
|
||||
logger.info("OCSP check failed for %s (are we offline?)", cert_path, exc_info=True)
|
||||
return False
|
||||
if response.status_code != 200:
|
||||
logger.info("OCSP check failed for %s (HTTP status: %d)", cert_path, response.status_code)
|
||||
return False
|
||||
|
||||
response_ocsp = ocsp.load_der_ocsp_response(response.content)
|
||||
|
||||
# Check OCSP response validity
|
||||
if response_ocsp.response_status != ocsp.OCSPResponseStatus.SUCCESSFUL:
|
||||
logger.warning("Invalid OCSP response status for %s: %s",
|
||||
cert_path, response_ocsp.response_status)
|
||||
return False
|
||||
|
||||
# Check OCSP signature
|
||||
try:
|
||||
_check_ocsp_response(response_ocsp, request, issuer, cert_path)
|
||||
except UnsupportedAlgorithm as e:
|
||||
logger.warning(str(e))
|
||||
except errors.Error as e:
|
||||
logger.warning(str(e))
|
||||
except InvalidSignature:
|
||||
logger.warning('Invalid signature on OCSP response for %s', cert_path)
|
||||
except AssertionError as error:
|
||||
logger.warning('Invalid OCSP response for %s: %s.', cert_path, str(error))
|
||||
else:
|
||||
# Check OCSP certificate status
|
||||
logger.debug("OCSP certificate status for %s is: %s",
|
||||
cert_path, response_ocsp.certificate_status)
|
||||
return response_ocsp.certificate_status == ocsp.OCSPCertStatus.REVOKED
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _check_ocsp_response(response_ocsp: 'ocsp.OCSPResponse', request_ocsp: 'ocsp.OCSPRequest',
|
||||
issuer_cert: x509.Certificate, cert_path: str) -> None:
|
||||
"""Verify that the OCSP is valid for several criteria"""
|
||||
# Assert OCSP response corresponds to the certificate we are talking about
|
||||
if response_ocsp.serial_number != request_ocsp.serial_number:
|
||||
raise AssertionError('the certificate in response does not correspond '
|
||||
'to the certificate in request')
|
||||
|
||||
# Assert signature is valid
|
||||
_check_ocsp_response_signature(response_ocsp, issuer_cert, cert_path)
|
||||
|
||||
# Assert issuer in response is the expected one
|
||||
if (not isinstance(response_ocsp.hash_algorithm, type(request_ocsp.hash_algorithm))
|
||||
or response_ocsp.issuer_key_hash != request_ocsp.issuer_key_hash
|
||||
or response_ocsp.issuer_name_hash != request_ocsp.issuer_name_hash):
|
||||
raise AssertionError('the issuer does not correspond to issuer of the certificate.')
|
||||
|
||||
# In following checks, two situations can occur:
|
||||
# * nextUpdate is set, and requirement is thisUpdate < now < nextUpdate
|
||||
# * nextUpdate is not set, and requirement is thisUpdate < now
|
||||
# NB1: We add a validity period tolerance to handle clock time inconsistencies,
|
||||
# value is 5 min like for OpenSSL.
|
||||
# NB2: Another check is to verify that thisUpdate is not too old, it is optional
|
||||
# for OpenSSL, so we do not do it here.
|
||||
# See OpenSSL implementation as a reference:
|
||||
# https://github.com/openssl/openssl/blob/ef45aa14c5af024fcb8bef1c9007f3d1c115bd85/crypto/ocsp/ocsp_cl.c#L338-L391
|
||||
# thisUpdate/nextUpdate are expressed in UTC/GMT time zone
|
||||
now = datetime.now(timezone.utc)
|
||||
if not response_ocsp.this_update_utc:
|
||||
raise AssertionError('param thisUpdate is not set.')
|
||||
if response_ocsp.this_update_utc > now + timedelta(minutes=5):
|
||||
raise AssertionError('param thisUpdate is in the future.')
|
||||
if response_ocsp.next_update_utc and response_ocsp.next_update_utc < now - timedelta(minutes=5):
|
||||
raise AssertionError('param nextUpdate is in the past.')
|
||||
|
||||
|
||||
def _check_ocsp_response_signature(response_ocsp: 'ocsp.OCSPResponse',
|
||||
issuer_cert: x509.Certificate, cert_path: str) -> None:
|
||||
"""Verify an OCSP response signature against certificate issuer or responder"""
|
||||
def _key_hash(cert: x509.Certificate) -> bytes:
|
||||
return x509.SubjectKeyIdentifier.from_public_key(cert.public_key()).digest
|
||||
|
||||
if (response_ocsp.responder_name == issuer_cert.subject
|
||||
or response_ocsp.responder_key_hash == _key_hash(issuer_cert)):
|
||||
# Case where the OCSP responder is also the certificate issuer
|
||||
logger.debug('OCSP response for certificate %s is signed by the certificate\'s issuer.',
|
||||
cert_path)
|
||||
responder_cert = issuer_cert
|
||||
else:
|
||||
# Case where the OCSP responder is not the certificate issuer
|
||||
logger.debug('OCSP response for certificate %s is delegated to an external responder.',
|
||||
cert_path)
|
||||
|
||||
responder_certs = [cert for cert in response_ocsp.certificates
|
||||
if response_ocsp.responder_name == cert.subject or \
|
||||
response_ocsp.responder_key_hash == _key_hash(cert)]
|
||||
if not responder_certs:
|
||||
raise AssertionError('no matching responder certificate could be found')
|
||||
|
||||
# We suppose here that the ACME server support only one certificate in the OCSP status
|
||||
# request. This is currently the case for LetsEncrypt servers.
|
||||
# See https://github.com/letsencrypt/boulder/issues/2331
|
||||
responder_cert = responder_certs[0]
|
||||
|
||||
if responder_cert.issuer != issuer_cert.subject:
|
||||
raise AssertionError('responder certificate is not signed '
|
||||
'by the certificate\'s issuer')
|
||||
|
||||
try:
|
||||
extension = responder_cert.extensions.get_extension_for_class(x509.ExtendedKeyUsage)
|
||||
delegate_authorized = x509.oid.ExtendedKeyUsageOID.OCSP_SIGNING in extension.value
|
||||
except (x509.ExtensionNotFound, IndexError):
|
||||
delegate_authorized = False
|
||||
if not delegate_authorized:
|
||||
raise AssertionError('responder is not authorized by issuer to sign OCSP responses')
|
||||
|
||||
# Following line may raise UnsupportedAlgorithm
|
||||
chosen_cert_hash = responder_cert.signature_hash_algorithm
|
||||
assert chosen_cert_hash # always present for RSA and ECDSA certificates.
|
||||
# For a delegate OCSP responder, we need first check that its certificate is effectively
|
||||
# signed by the certificate issuer.
|
||||
crypto_util.verify_signed_payload(issuer_cert.public_key(), responder_cert.signature,
|
||||
responder_cert.tbs_certificate_bytes, chosen_cert_hash)
|
||||
|
||||
# Following line may raise UnsupportedAlgorithm
|
||||
chosen_response_hash = response_ocsp.signature_hash_algorithm
|
||||
# We check that the OSCP response is effectively signed by the responder
|
||||
# (an authorized delegate one or the certificate issuer itself).
|
||||
if not chosen_response_hash:
|
||||
raise AssertionError("no signature hash algorithm defined")
|
||||
crypto_util.verify_signed_payload(responder_cert.public_key(), response_ocsp.signature,
|
||||
response_ocsp.tbs_response_bytes, chosen_response_hash)
|
||||
warnings.warn("certbot.ocsp is deprecated and will be removed in the next major"
|
||||
" release", DeprecationWarning, stacklevel=2)
|
||||
|
|
|
|||
1
newsfragments/10584.changed
Normal file
1
newsfragments/10584.changed
Normal file
|
|
@ -0,0 +1 @@
|
|||
The certbot.ocsp module has been deprecated and will be removed in the next major release. This is not a change to Certbot's OCSP functionality. The code is just being removed from Certbot's public API.
|
||||
Loading…
Reference in a new issue