Implement ocsp_response_by_paths.

This commit is contained in:
Brad Warren 2020-06-04 16:19:00 -07:00
parent 3d385debe2
commit 4422fea91a
2 changed files with 43 additions and 22 deletions

View file

@ -76,10 +76,10 @@ class RevocationChecker(object):
knows it is properly timestamped, signed, etc.
.. note:: This function currently only works when cryptography
is used for OCSP. Whether a new enough version of crypography
with OCSP support is available can be checked through
CRYPTOGRAPHY_OCSP_AVAILABLE. If it is not available, None is
always returned by this function for now.
is used for OCSP. Whether a new enough version of
cryptography with OCSP support is available can be checked
through CRYPTOGRAPHY_OCSP_AVAILABLE. If it is not available,
None is always returned by this function for now.
:param str cert_path: Certificate filepath
:param str chain_path: Certificate chain
@ -90,7 +90,14 @@ class RevocationChecker(object):
:rtype: interfaces.OCSPResponse or None
"""
return None
if self.use_openssl_binary:
return None
url = self._query_prep(cert_path)
if not url:
return None
return _get_cryptography_ocsp_response(cert_path, chain_path, url, timeout)
def ocsp_revoked(self, cert):
# type: (RenewableCert) -> bool
@ -117,14 +124,30 @@ class RevocationChecker(object):
:rtype: bool
"""
if self.use_openssl_binary:
return self._ocsp_revoked_by_paths_openssl(cert_path, chain_path, timeout)
else:
return self._ocsp_revoked_by_paths_cryptography(cert_path, chain_path, timeout)
def _ocsp_revoked_by_paths_openssl(self, cert_path, chain_path, timeout):
# type: (str, str, int) -> bool
"""ocsp_revoked_by_paths implementation shelling out to openssl."""
url = self._query_prep(cert_path)
if not url:
return False
host = _host_from_url(url)
return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url, timeout)
if self.use_openssl_binary:
host = _host_from_url(url)
return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url, timeout)
return _check_ocsp_cryptography(cert_path, chain_path, url, timeout)
def _ocsp_revoked_by_paths_cryptography(self, cert_path, chain_path, timeout):
# type: (str, str, int) -> bool
"""ocsp_revoked_by_paths implementation using cryptography."""
resp = self.ocsp_response_by_paths(cert_path, chain_path, timeout)
if resp is None:
return False
# Check OCSP certificate status
logger.debug("OCSP certificate status for %s is: %s",
cert_path, resp.certificate_status)
return resp.certificate_status == interfaces.OCSPCertStatus.REVOKED
def _query_prep(self, cert_path):
# type: (str) -> Tuple[Optional[str], Optional[str]]
@ -271,8 +294,8 @@ def _host_from_url(url):
return url.partition("://")[2].rstrip("/")
def _check_ocsp_cryptography(cert_path, chain_path, url, timeout):
# type: (str, str, str, int) -> bool
def _get_cryptography_ocsp_response(cert_path, chain_path, url, timeout):
# type: (str, str, str, int) -> Optional[_CryptographyOCSPResponse]
# Retrieve OCSP response
with open(chain_path, 'rb') as file_handler:
issuer = x509.load_pem_x509_certificate(file_handler.read(), default_backend())
@ -288,10 +311,10 @@ def _check_ocsp_cryptography(cert_path, chain_path, url, timeout):
timeout=timeout)
except requests.exceptions.RequestException:
logger.info("OCSP check failed for %s (are we offline?)", cert_path, exc_info=True)
return False
return None
if response.status_code != 200:
logger.info("OCSP check failed for %s (HTTP status: %d)", cert_path, response.status_code)
return False
return None
response_ocsp = ocsp.load_der_ocsp_response(response.content)
@ -299,7 +322,7 @@ def _check_ocsp_cryptography(cert_path, chain_path, url, timeout):
if response_ocsp.response_status != ocsp.OCSPResponseStatus.SUCCESSFUL:
logger.error("Invalid OCSP response status for %s: %s",
cert_path, response_ocsp.response_status)
return False
return None
# Check OCSP signature
try:
@ -313,12 +336,10 @@ def _check_ocsp_cryptography(cert_path, chain_path, url, timeout):
except AssertionError as error:
logger.error('Invalid OCSP response for %s: %s.', cert_path, str(error))
else:
# Check OCSP certificate status
logger.debug("OCSP certificate status for %s is: %s",
cert_path, response_ocsp.certificate_status)
return response_ocsp.certificate_status == ocsp.OCSPCertStatus.REVOKED
wrapped_response = _CryptographyOCSPResponse(response_ocsp)
return wrapped_response
return False
return None
def _check_ocsp_response(response_ocsp, request_ocsp, issuer_cert, cert_path):

View file

@ -165,12 +165,12 @@ class OSCPTestCryptography(unittest.TestCase):
self.addCleanup(self.mock_notAfter.stop)
@mock.patch('certbot.ocsp._determine_ocsp_server')
@mock.patch('certbot.ocsp._check_ocsp_cryptography')
def test_ensure_cryptography_toggled(self, mock_check, mock_determine):
@mock.patch('certbot.ocsp._get_cryptography_ocsp_response')
def test_ensure_cryptography_toggled(self, mock_get, mock_determine):
mock_determine.return_value = 'http://example.com'
self.checker.ocsp_revoked(self.cert_obj)
mock_check.assert_called_once_with(self.cert_path, self.chain_path, 'http://example.com', 10)
mock_get.assert_called_once_with(self.cert_path, self.chain_path, 'http://example.com', 10)
def test_revoke(self):
with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL):