add _query_prep

This commit is contained in:
Brad Warren 2020-06-04 12:56:17 -07:00
parent 1a8b0df2f5
commit c8fa3fd9f9
2 changed files with 46 additions and 21 deletions

View file

@ -68,6 +68,7 @@ class RevocationChecker(object):
self.host_args = lambda host: ["Host", host]
def ocsp_response_by_paths(self, cert_path, chain_path, timeout=10):
# type: (str, str, int) -> Optional[interfaces.OCSPResponse]
"""Obtains a validated OCSP response.
The OCSP response could have any certificate status, however, if
@ -89,6 +90,7 @@ class RevocationChecker(object):
:rtype: interfaces.OCSPResponse or None
"""
return None
def ocsp_revoked(self, cert):
# type: (RenewableCert) -> bool
@ -115,23 +117,36 @@ class RevocationChecker(object):
:rtype: bool
"""
if self.broken:
url = self._query_prep(cert_path)
if not url:
return False
if self.use_openssl_binary:
host = _host_from_url(url)
return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url, timeout)
return _check_ocsp_cryptography(cert_path, chain_path, url, timeout)
def _query_prep(self, cert_path):
# type: (str) -> Tuple[Optional[str], Optional[str]]
"""Prepare to make an OCSP query for the given cert.
:param str cert_path: Certificate filepath
:rtype: str or None
:returns: OCSP server URL if an OCSP query can be performed,
otherwise, None
"""
if self.broken:
return None
# Let's Encrypt doesn't update OCSP for expired certificates,
# so don't check OCSP if the cert is expired.
# https://github.com/certbot/certbot/issues/7152
now = pytz.UTC.fromutc(datetime.utcnow())
if crypto_util.notAfter(cert_path) <= now:
return False
return None
url, host = _determine_ocsp_server(cert_path)
if not host or not url:
return False
if self.use_openssl_binary:
return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url, timeout)
return _check_ocsp_cryptography(cert_path, chain_path, url, timeout)
return _determine_ocsp_server(cert_path)
def _check_ocsp_openssl_bin(self, cert_path, chain_path, host, url, timeout):
# type: (str, str, str, str, int) -> bool
@ -180,8 +195,8 @@ def _determine_ocsp_server(cert_path):
"""Extract the OCSP server host from a certificate.
:param str cert_path: Path to the cert we're checking OCSP for
:rtype tuple:
:returns: (OCSP server URL or None, OCSP server host or None)
:rtype: str or None
:returns: OCSP server URL or None
"""
with open(cert_path, 'rb') as file_handler:
@ -195,15 +210,24 @@ def _determine_ocsp_server(cert_path):
url = descriptions[0].access_location.value
except (x509.ExtensionNotFound, IndexError):
logger.info("Cannot extract OCSP URI from %s", cert_path)
return None, None
return None
url = url.rstrip()
host = url.partition("://")[2].rstrip("/")
# Determining the host here may not be needed anymore since things have
# been refactored since the initial version of this function, but just in
# case, I kept it here as a sanity check of the URL value.
host = _host_from_url(url)
if host:
return url, host
return url
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
return None, None
return None
def _host_from_url(url):
# type: (str) -> str
"""Returns the hostname from a URL."""
return url.partition("://")[2].rstrip("/")
def _check_ocsp_cryptography(cert_path, chain_path, url, timeout):

View file

@ -88,7 +88,7 @@ class OCSPTestOpenSSL(unittest.TestCase):
mock_na.return_value = now + timedelta(hours=2)
self.checker.broken = True
mock_determine.return_value = ("", "")
mock_determine.return_value = ""
self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
self.checker.broken = False
@ -96,7 +96,7 @@ class OCSPTestOpenSSL(unittest.TestCase):
self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
self.assertEqual(mock_run.call_count, 0)
mock_determine.return_value = ("http://x.co", "x.co")
mock_determine.return_value = "http://x.co"
self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
mock_run.side_effect = errors.SubprocessError("Unable to load certificate launcher")
self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
@ -104,17 +104,18 @@ class OCSPTestOpenSSL(unittest.TestCase):
# cert expired
mock_na.return_value = now
mock_determine.return_value = ("", "")
mock_determine.return_value = ""
count_before = mock_determine.call_count
self.assertEqual(self.checker.ocsp_revoked(cert_obj), False)
self.assertEqual(mock_determine.call_count, count_before)
def test_determine_ocsp_server(self):
def test_determine_ocsp_server_and_host_from_url(self):
cert_path = test_util.vector_path('ocsp_certificate.pem')
from certbot import ocsp
result = ocsp._determine_ocsp_server(cert_path)
self.assertEqual(('http://ocsp.test4.buypass.com', 'ocsp.test4.buypass.com'), result)
self.assertEqual('http://ocsp.test4.buypass.com', result)
self.assertEqual(ocsp._host_from_url(result), 'ocsp.test4.buypass.com')
@mock.patch('certbot.ocsp.logger')
@mock.patch('certbot.util.run_script')
@ -166,7 +167,7 @@ class OSCPTestCryptography(unittest.TestCase):
@mock.patch('certbot.ocsp._determine_ocsp_server')
@mock.patch('certbot.ocsp._check_ocsp_cryptography')
def test_ensure_cryptography_toggled(self, mock_check, mock_determine):
mock_determine.return_value = ('http://example.com', 'example.com')
mock_determine.return_value = 'http://example.com'
self.checker.ocsp_revoked(self.cert_obj)
mock_check.assert_called_once_with(self.cert_path, self.chain_path, 'http://example.com', 10)