mirror of
https://github.com/certbot/certbot.git
synced 2026-06-05 14:54:24 -04:00
Move ocsp.py to public api (#7744)
We should move ocsp.py to public API, as an upcoming OCSP prefetching functionality in Apache plugin relies on it, and as the plugins are note released in lockstep with the Certbot core, we need to be careful when changing those APIs. * Move ocsp.py to public api * Fix type annotations, move to pointing to an interface and fix linting * Add certbot.ocsp to documentation table of contents * Modify tests to reflect the changes in ocsp.py * Add changelog entry * Fix notAfter mock for tests
This commit is contained in:
parent
5607025e9b
commit
e6f050dbe9
6 changed files with 46 additions and 32 deletions
|
|
@ -6,6 +6,8 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
|
|||
|
||||
### Added
|
||||
|
||||
* Added certbot.ocsp Certbot's API. The certbot.ocsp module can be used to
|
||||
determine the OCSP status of certificates.
|
||||
* Don't verify the existing certificate in HTTP01Response.simple_verify, for
|
||||
compatibility with the real-world ACME challenge checks.
|
||||
|
||||
|
|
|
|||
|
|
@ -11,8 +11,8 @@ from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-
|
|||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import ocsp
|
||||
from certbot import util
|
||||
from certbot._internal import ocsp
|
||||
from certbot._internal import storage
|
||||
from certbot.compat import os
|
||||
from certbot.display import util as display_util
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ from acme.magic_typing import Tuple # pylint: disable=unused-import, no-name-in
|
|||
from certbot import crypto_util
|
||||
from certbot import errors
|
||||
from certbot import util
|
||||
from certbot._internal.storage import RenewableCert # pylint: disable=unused-import
|
||||
from certbot.interfaces import RenewableCert # pylint: disable=unused-import
|
||||
|
||||
try:
|
||||
# Only cryptography>=2.5 has ocsp module
|
||||
|
|
@ -32,7 +32,6 @@ except (ImportError, AttributeError): # pragma: no cover
|
|||
ocsp = None # type: ignore
|
||||
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
|
@ -64,12 +63,12 @@ class RevocationChecker(object):
|
|||
|
||||
.. todo:: Make this a non-blocking call
|
||||
|
||||
:param `.storage.RenewableCert` cert: Certificate object
|
||||
:param `.interfaces.RenewableCert` cert: Certificate object
|
||||
:returns: True if revoked; False if valid or the check failed or cert is expired.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
cert_path, chain_path = cert.cert, cert.chain
|
||||
cert_path, chain_path = cert.cert_path, cert.chain_path
|
||||
|
||||
if self.broken:
|
||||
return False
|
||||
|
|
@ -78,7 +77,7 @@ class RevocationChecker(object):
|
|||
# so don't check OCSP if the cert is expired.
|
||||
# https://github.com/certbot/certbot/issues/7152
|
||||
now = pytz.UTC.fromutc(datetime.utcnow())
|
||||
if cert.target_expiry <= now:
|
||||
if crypto_util.notAfter(cert_path) <= now:
|
||||
return False
|
||||
|
||||
url, host = _determine_ocsp_server(cert_path)
|
||||
|
|
@ -296,5 +295,5 @@ def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
|
|||
return True
|
||||
else:
|
||||
logger.warning("Unable to properly parse OCSP output: %s\nstderr:%s",
|
||||
ocsp_output, ocsp_errors)
|
||||
ocsp_output, ocsp_errors)
|
||||
return False
|
||||
7
certbot/docs/api/certbot.ocsp.rst
Normal file
7
certbot/docs/api/certbot.ocsp.rst
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
certbot.ocsp package
|
||||
======================
|
||||
|
||||
.. automodule:: certbot.ocsp
|
||||
:members:
|
||||
:undoc-members:
|
||||
:show-inheritance:
|
||||
|
|
@ -26,6 +26,7 @@ Submodules
|
|||
certbot.errors
|
||||
certbot.interfaces
|
||||
certbot.main
|
||||
certbot.ocsp
|
||||
certbot.reverter
|
||||
certbot.util
|
||||
|
||||
|
|
|
|||
|
|
@ -32,12 +32,12 @@ ocsp: Use -help for summary.
|
|||
|
||||
class OCSPTestOpenSSL(unittest.TestCase):
|
||||
"""
|
||||
OCSP revokation tests using OpenSSL binary.
|
||||
OCSP revocation tests using OpenSSL binary.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
from certbot._internal import ocsp
|
||||
with mock.patch('certbot._internal.ocsp.Popen') as mock_popen:
|
||||
from certbot import ocsp
|
||||
with mock.patch('certbot.ocsp.Popen') as mock_popen:
|
||||
with mock.patch('certbot.util.exe_exists') as mock_exists:
|
||||
mock_communicate = mock.MagicMock()
|
||||
mock_communicate.communicate.return_value = (None, out)
|
||||
|
|
@ -48,8 +48,8 @@ class OCSPTestOpenSSL(unittest.TestCase):
|
|||
def tearDown(self):
|
||||
pass
|
||||
|
||||
@mock.patch('certbot._internal.ocsp.logger.info')
|
||||
@mock.patch('certbot._internal.ocsp.Popen')
|
||||
@mock.patch('certbot.ocsp.logger.info')
|
||||
@mock.patch('certbot.ocsp.Popen')
|
||||
@mock.patch('certbot.util.exe_exists')
|
||||
def test_init(self, mock_exists, mock_popen, mock_log):
|
||||
mock_communicate = mock.MagicMock()
|
||||
|
|
@ -57,7 +57,7 @@ class OCSPTestOpenSSL(unittest.TestCase):
|
|||
mock_popen.return_value = mock_communicate
|
||||
mock_exists.return_value = True
|
||||
|
||||
from certbot._internal import ocsp
|
||||
from certbot import ocsp
|
||||
checker = ocsp.RevocationChecker(enforce_openssl_binary_usage=True)
|
||||
self.assertEqual(mock_popen.call_count, 1)
|
||||
self.assertEqual(checker.host_args("x"), ["Host=x"])
|
||||
|
|
@ -74,14 +74,15 @@ class OCSPTestOpenSSL(unittest.TestCase):
|
|||
self.assertEqual(mock_log.call_count, 1)
|
||||
self.assertEqual(checker.broken, True)
|
||||
|
||||
@mock.patch('certbot._internal.ocsp._determine_ocsp_server')
|
||||
@mock.patch('certbot.ocsp._determine_ocsp_server')
|
||||
@mock.patch('certbot.ocsp.crypto_util.notAfter')
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_ocsp_revoked(self, mock_run, mock_determine):
|
||||
def test_ocsp_revoked(self, mock_run, mock_na, mock_determine):
|
||||
now = pytz.UTC.fromutc(datetime.utcnow())
|
||||
cert_obj = mock.MagicMock()
|
||||
cert_obj.cert = "x"
|
||||
cert_obj.chain = "y"
|
||||
cert_obj.target_expiry = now + timedelta(hours=2)
|
||||
cert_obj.cert_path = "x"
|
||||
cert_obj.chain_path = "y"
|
||||
mock_na.return_value = now + timedelta(hours=2)
|
||||
|
||||
self.checker.broken = True
|
||||
mock_determine.return_value = ("", "")
|
||||
|
|
@ -99,7 +100,7 @@ class OCSPTestOpenSSL(unittest.TestCase):
|
|||
self.assertEqual(mock_run.call_count, 2)
|
||||
|
||||
# cert expired
|
||||
cert_obj.target_expiry = now
|
||||
mock_na.return_value = now
|
||||
mock_determine.return_value = ("", "")
|
||||
count_before = mock_determine.call_count
|
||||
self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
|
||||
|
|
@ -108,16 +109,16 @@ class OCSPTestOpenSSL(unittest.TestCase):
|
|||
def test_determine_ocsp_server(self):
|
||||
cert_path = test_util.vector_path('ocsp_certificate.pem')
|
||||
|
||||
from certbot._internal import ocsp
|
||||
from certbot import ocsp
|
||||
result = ocsp._determine_ocsp_server(cert_path)
|
||||
self.assertEqual(('http://ocsp.test4.buypass.com', 'ocsp.test4.buypass.com'), result)
|
||||
|
||||
@mock.patch('certbot._internal.ocsp.logger')
|
||||
@mock.patch('certbot.ocsp.logger')
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_translate_ocsp(self, mock_run, mock_log):
|
||||
# pylint: disable=protected-access
|
||||
mock_run.return_value = openssl_confused
|
||||
from certbot._internal import ocsp
|
||||
from certbot import ocsp
|
||||
self.assertEqual(ocsp._translate_ocsp_query(*openssl_happy), False)
|
||||
self.assertEqual(ocsp._translate_ocsp_query(*openssl_confused), False)
|
||||
self.assertEqual(mock_log.debug.call_count, 1)
|
||||
|
|
@ -145,18 +146,22 @@ class OSCPTestCryptography(unittest.TestCase):
|
|||
"""
|
||||
|
||||
def setUp(self):
|
||||
from certbot._internal import ocsp
|
||||
from certbot import ocsp
|
||||
self.checker = ocsp.RevocationChecker()
|
||||
self.cert_path = test_util.vector_path('ocsp_certificate.pem')
|
||||
self.chain_path = test_util.vector_path('ocsp_issuer_certificate.pem')
|
||||
self.cert_obj = mock.MagicMock()
|
||||
self.cert_obj.cert = self.cert_path
|
||||
self.cert_obj.chain = self.chain_path
|
||||
self.cert_obj.cert_path = self.cert_path
|
||||
self.cert_obj.chain_path = self.chain_path
|
||||
now = pytz.UTC.fromutc(datetime.utcnow())
|
||||
self.cert_obj.target_expiry = now + timedelta(hours=2)
|
||||
self.mock_notAfter = mock.patch('certbot.ocsp.crypto_util.notAfter',
|
||||
return_value=now + timedelta(hours=2))
|
||||
self.mock_notAfter.start()
|
||||
# Ensure the mock.patch is stopped even if test raises an exception
|
||||
self.addCleanup(self.mock_notAfter.stop)
|
||||
|
||||
@mock.patch('certbot._internal.ocsp._determine_ocsp_server')
|
||||
@mock.patch('certbot._internal.ocsp._check_ocsp_cryptography')
|
||||
@mock.patch('certbot.ocsp._determine_ocsp_server')
|
||||
@mock.patch('certbot.ocsp._check_ocsp_cryptography')
|
||||
def test_ensure_cryptography_toggled(self, mock_revoke, mock_determine):
|
||||
mock_determine.return_value = ('http://example.com', 'example.com')
|
||||
self.checker.ocsp_revoked(self.cert_obj)
|
||||
|
|
@ -263,7 +268,7 @@ class OSCPTestCryptography(unittest.TestCase):
|
|||
with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL):
|
||||
# This mock is necessary to avoid the first call contained in _determine_ocsp_server
|
||||
# of the method cryptography.x509.Extensions.get_extension_for_class.
|
||||
with mock.patch('certbot._internal.ocsp._determine_ocsp_server') as mock_server:
|
||||
with mock.patch('certbot.ocsp._determine_ocsp_server') as mock_server:
|
||||
mock_server.return_value = ('https://example.com', 'example.com')
|
||||
with mock.patch('cryptography.x509.Extensions.get_extension_for_class',
|
||||
side_effect=x509.ExtensionNotFound(
|
||||
|
|
@ -275,12 +280,12 @@ class OSCPTestCryptography(unittest.TestCase):
|
|||
@contextlib.contextmanager
|
||||
def _ocsp_mock(certificate_status, response_status,
|
||||
http_status_code=200, check_signature_side_effect=None):
|
||||
with mock.patch('certbot._internal.ocsp.ocsp.load_der_ocsp_response') as mock_response:
|
||||
with mock.patch('certbot.ocsp.ocsp.load_der_ocsp_response') as mock_response:
|
||||
mock_response.return_value = _construct_mock_ocsp_response(
|
||||
certificate_status, response_status)
|
||||
with mock.patch('certbot._internal.ocsp.requests.post') as mock_post:
|
||||
with mock.patch('certbot.ocsp.requests.post') as mock_post:
|
||||
mock_post.return_value = mock.Mock(status_code=http_status_code)
|
||||
with mock.patch('certbot._internal.ocsp.crypto_util.verify_signed_payload') \
|
||||
with mock.patch('certbot.ocsp.crypto_util.verify_signed_payload') \
|
||||
as mock_check:
|
||||
if check_signature_side_effect:
|
||||
mock_check.side_effect = check_signature_side_effect
|
||||
|
|
|
|||
Loading…
Reference in a new issue