From 4422fea91ace7703edb4b18be5f3c620976b4230 Mon Sep 17 00:00:00 2001 From: Brad Warren Date: Thu, 4 Jun 2020 16:19:00 -0700 Subject: [PATCH] Implement ocsp_response_by_paths. --- certbot/certbot/ocsp.py | 59 ++++++++++++++++++++++++++------------ certbot/tests/ocsp_test.py | 6 ++-- 2 files changed, 43 insertions(+), 22 deletions(-) diff --git a/certbot/certbot/ocsp.py b/certbot/certbot/ocsp.py index dca0b8323..af6692120 100644 --- a/certbot/certbot/ocsp.py +++ b/certbot/certbot/ocsp.py @@ -76,10 +76,10 @@ class RevocationChecker(object): knows it is properly timestamped, signed, etc. .. note:: This function currently only works when cryptography - is used for OCSP. Whether a new enough version of crypography - with OCSP support is available can be checked through - CRYPTOGRAPHY_OCSP_AVAILABLE. If it is not available, None is - always returned by this function for now. + is used for OCSP. Whether a new enough version of + cryptography with OCSP support is available can be checked + through CRYPTOGRAPHY_OCSP_AVAILABLE. If it is not available, + None is always returned by this function for now. :param str cert_path: Certificate filepath :param str chain_path: Certificate chain @@ -90,7 +90,14 @@ class RevocationChecker(object): :rtype: interfaces.OCSPResponse or None """ - return None + if self.use_openssl_binary: + return None + + url = self._query_prep(cert_path) + if not url: + return None + + return _get_cryptography_ocsp_response(cert_path, chain_path, url, timeout) def ocsp_revoked(self, cert): # type: (RenewableCert) -> bool @@ -117,14 +124,30 @@ class RevocationChecker(object): :rtype: bool """ + if self.use_openssl_binary: + return self._ocsp_revoked_by_paths_openssl(cert_path, chain_path, timeout) + else: + return self._ocsp_revoked_by_paths_cryptography(cert_path, chain_path, timeout) + + def _ocsp_revoked_by_paths_openssl(self, cert_path, chain_path, timeout): + # type: (str, str, int) -> bool + """ocsp_revoked_by_paths implementation shelling out to openssl.""" url = self._query_prep(cert_path) if not url: return False + host = _host_from_url(url) + return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url, timeout) - if self.use_openssl_binary: - host = _host_from_url(url) - return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url, timeout) - return _check_ocsp_cryptography(cert_path, chain_path, url, timeout) + def _ocsp_revoked_by_paths_cryptography(self, cert_path, chain_path, timeout): + # type: (str, str, int) -> bool + """ocsp_revoked_by_paths implementation using cryptography.""" + resp = self.ocsp_response_by_paths(cert_path, chain_path, timeout) + if resp is None: + return False + # Check OCSP certificate status + logger.debug("OCSP certificate status for %s is: %s", + cert_path, resp.certificate_status) + return resp.certificate_status == interfaces.OCSPCertStatus.REVOKED def _query_prep(self, cert_path): # type: (str) -> Tuple[Optional[str], Optional[str]] @@ -271,8 +294,8 @@ def _host_from_url(url): return url.partition("://")[2].rstrip("/") -def _check_ocsp_cryptography(cert_path, chain_path, url, timeout): - # type: (str, str, str, int) -> bool +def _get_cryptography_ocsp_response(cert_path, chain_path, url, timeout): + # type: (str, str, str, int) -> Optional[_CryptographyOCSPResponse] # Retrieve OCSP response with open(chain_path, 'rb') as file_handler: issuer = x509.load_pem_x509_certificate(file_handler.read(), default_backend()) @@ -288,10 +311,10 @@ def _check_ocsp_cryptography(cert_path, chain_path, url, timeout): timeout=timeout) except requests.exceptions.RequestException: logger.info("OCSP check failed for %s (are we offline?)", cert_path, exc_info=True) - return False + return None if response.status_code != 200: logger.info("OCSP check failed for %s (HTTP status: %d)", cert_path, response.status_code) - return False + return None response_ocsp = ocsp.load_der_ocsp_response(response.content) @@ -299,7 +322,7 @@ def _check_ocsp_cryptography(cert_path, chain_path, url, timeout): if response_ocsp.response_status != ocsp.OCSPResponseStatus.SUCCESSFUL: logger.error("Invalid OCSP response status for %s: %s", cert_path, response_ocsp.response_status) - return False + return None # Check OCSP signature try: @@ -313,12 +336,10 @@ def _check_ocsp_cryptography(cert_path, chain_path, url, timeout): except AssertionError as error: logger.error('Invalid OCSP response for %s: %s.', cert_path, str(error)) else: - # Check OCSP certificate status - logger.debug("OCSP certificate status for %s is: %s", - cert_path, response_ocsp.certificate_status) - return response_ocsp.certificate_status == ocsp.OCSPCertStatus.REVOKED + wrapped_response = _CryptographyOCSPResponse(response_ocsp) + return wrapped_response - return False + return None def _check_ocsp_response(response_ocsp, request_ocsp, issuer_cert, cert_path): diff --git a/certbot/tests/ocsp_test.py b/certbot/tests/ocsp_test.py index 94281e53f..38a2f19cd 100644 --- a/certbot/tests/ocsp_test.py +++ b/certbot/tests/ocsp_test.py @@ -165,12 +165,12 @@ class OSCPTestCryptography(unittest.TestCase): self.addCleanup(self.mock_notAfter.stop) @mock.patch('certbot.ocsp._determine_ocsp_server') - @mock.patch('certbot.ocsp._check_ocsp_cryptography') - def test_ensure_cryptography_toggled(self, mock_check, mock_determine): + @mock.patch('certbot.ocsp._get_cryptography_ocsp_response') + def test_ensure_cryptography_toggled(self, mock_get, mock_determine): mock_determine.return_value = 'http://example.com' self.checker.ocsp_revoked(self.cert_obj) - mock_check.assert_called_once_with(self.cert_path, self.chain_path, 'http://example.com', 10) + mock_get.assert_called_once_with(self.cert_path, self.chain_path, 'http://example.com', 10) def test_revoke(self): with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL):