Revert certbot.ocsp changes.

This commit is contained in:
Brad Warren 2020-06-17 11:53:32 -07:00
parent 0976176a56
commit 7bf1e9a061
2 changed files with 14 additions and 225 deletions

View file

@ -65,26 +65,26 @@ class RevocationChecker(object):
.. todo:: Make this a non-blocking call
:param `.interfaces.RenewableCert` cert: Certificate object
:returns: True if revoked; False if valid or the check failed or cert is expired.
:rtype: bool
"""
return self.ocsp_revoked_by_paths(cert.cert_path, cert.chain_path)
def ocsp_revoked_by_paths(self, cert_path, chain_path, timeout=10, response_file=None):
# type: (str, str, int, Optional[str]) -> bool
def ocsp_revoked_by_paths(self, cert_path, chain_path, timeout=10):
# type: (str, str, int) -> bool
"""Performs the OCSP revocation check
:param str cert_path: Certificate path
:param str cert_path: Certificate filepath
:param str chain_path: Certificate chain
:param int timeout: Timeout (in seconds) for the OCSP query
:param str response_file: File path where the raw OCSP response should be written
:returns: True if revoked; False if valid or the check failed or cert is expired.
:rtype: bool
"""
if self.broken:
return False
# Let's Encrypt doesn't update OCSP for expired certificates,
# so don't check OCSP if the cert is expired.
@ -93,37 +93,16 @@ class RevocationChecker(object):
if crypto_util.notAfter(cert_path) <= now:
return False
if self.broken:
return False
url, host = _determine_ocsp_server(cert_path)
if not host or not url:
return False
if self.use_openssl_binary:
return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url,
timeout, response_file)
return _check_ocsp_cryptography(cert_path, chain_path, url, timeout, response_file)
def ocsp_times(self, response_file):
# type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]]
"""
Reads OCSP response file and returns producedAt, thisUpdate
and nextUpdate values in datetime format, or None if an error
occurs.
:param str response_file: File path to OCSP response
:returns: tuple of producedAt, thisUpdate and nextUpdate values
:rtype: tuple of datetime or None
"""
if self.use_openssl_binary:
return _ocsp_times_openssl_bin(response_file)
return _ocsp_times_cryptography(response_file)
return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url, timeout)
return _check_ocsp_cryptography(cert_path, chain_path, url, timeout)
def _check_ocsp_openssl_bin(self, cert_path, chain_path, host, url, timeout, response_file=None): # pylint: disable=line-too-long
# type: (str, str, str, str, int, Optional[str]) -> bool
def _check_ocsp_openssl_bin(self, cert_path, chain_path, host, url, timeout):
# type: (str, str, str, str, int) -> bool
# Minimal implementation of proxy selection logic as seen in, e.g., cURL
# Some things that won't work, but may well be in use somewhere:
# - username and password for proxy authentication
@ -145,16 +124,11 @@ class RevocationChecker(object):
"-no_nonce",
"-issuer", chain_path,
"-cert", cert_path,
"-url", url,
"-CAfile", chain_path,
"-verify_other", chain_path,
"-trust_other",
"-timeout", str(timeout),
"-header"] + self.host_args(host) + url_opts
if response_file: # pragma: no cover
cmd += ["-respout", response_file]
logger.debug("Querying OCSP for %s", cert_path)
logger.debug(" ".join(cmd))
try:
@ -196,58 +170,8 @@ def _determine_ocsp_server(cert_path):
return None, None
def _ocsp_times_openssl_bin(response_file):
# type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]]
"""
Reads OCSP response file using OpenSSL binary and returns
producedAt, thisUpdate and nextUpdate values in datetime format.
:param str response_file: File path to OCSP response
:returns: tuple of producedAt, thisUpdate and nextUpdate values
:rtype: tuple of datetime or None
"""
cmd = ["openssl", "ocsp", "-resp_text", "-noverify", "-respin", response_file]
logger.debug("Reading OCSP response from file: %s", response_file)
logger.debug(" ".join(cmd))
try:
output, _ = util.run_script(cmd, log=logger.debug)
except errors.SubprocessError:
logger.info("Reading OCSP response from file failed.")
return None, None, None
prod_str, this_str, next_str = _translate_ocsp_response_times(output)
prod_dt = _parse_datetime(prod_str)
this_dt = _parse_datetime(this_str)
next_dt = _parse_datetime(next_str)
return prod_dt, this_dt, next_dt
def _ocsp_times_cryptography(response_file):
# type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]]
"""
Reads OCSP response using cryptography and returns producedAt,
thisUpdate and nextUpdate values in datetime format, or None
if the file cannot be opened.
:param str response_file: File path to OCSP response
:returns: tuple of producedAt, thisUpdate and nextUpdate values
:rtype: tuple of datetime or None
"""
try:
with open(response_file, 'rb') as fh:
raw_response = fh.read()
except OSError:
return None, None, None
response = ocsp.load_der_ocsp_response(raw_response)
return response.produced_at, response.this_update, response.next_update
def _check_ocsp_cryptography(cert_path, chain_path, url, timeout, response_file=None):
# type: (str, str, str, int, Optional[str]) -> bool
def _check_ocsp_cryptography(cert_path, chain_path, url, timeout):
# type: (str, str, str, int) -> bool
# Retrieve OCSP response
with open(chain_path, 'rb') as file_handler:
issuer = x509.load_pem_x509_certificate(file_handler.read(), default_backend())
@ -268,10 +192,6 @@ def _check_ocsp_cryptography(cert_path, chain_path, url, timeout, response_file=
logger.info("OCSP check failed for %s (HTTP status: %d)", cert_path, response.status_code)
return False
if response_file: # pragma: no cover
with open(response_file, 'wb') as fh:
fh.write(response.content)
response_ocsp = ocsp.load_der_ocsp_response(response.content)
# Check OCSP response validity
@ -407,54 +327,3 @@ def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors):
logger.warning("Unable to properly parse OCSP output: %s\nstderr:%s",
ocsp_output, ocsp_errors)
return False
def _translate_ocsp_response_times(response):
# type: (str) -> Tuple[str, str, str]
"""
Parse openssl OCSP response output and return producedAt,
thisUpdate and nextUpdate values.
:param str response: OpenSSL OCSP response output
:returns: tuple of producedAt, thisUpdate and nextUpdate values
:rtype: tuple of str
"""
prod_pattern = "Produced At: (.+)$"
this_pattern = "This Update: (.+)$"
next_pattern = "Next Update: (.+)$"
prod_date = ""
this_date = ""
next_date = ""
prod_match = re.search(prod_pattern, response, flags=re.MULTILINE)
if prod_match:
prod_date = prod_match.group(1)
this_match = re.search(this_pattern, response, flags=re.MULTILINE)
if this_match:
this_date = this_match.group(1)
next_match = re.search(next_pattern, response, flags=re.MULTILINE)
if next_match:
next_date = next_match.group(1)
return prod_date, this_date, next_date
def _parse_datetime(dt_string):
"""
Parses a string to datetime, ignoring timezone.
:param str dt_string: String representation of date and time
:returns: datetime representation of time
:rtype: datetime.datetime or None
"""
try:
dateformat = "%b %d %H:%M:%S %Y %Z"
return datetime.strptime(dt_string, dateformat)
except ValueError:
return None

View file

@ -140,40 +140,6 @@ class OCSPTestOpenSSL(unittest.TestCase):
self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True)
self.assertEqual(mock_log.info.call_count, 1)
@mock.patch('certbot.util.run_script')
def test_ocsp_response_get_times(self, mock_run):
mock_run.return_value = ocsp_times_example
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
self.assertEqual(producedAt, datetime(2020, 1, 24, 11, 10))
self.assertEqual(thisUpdate, datetime(2020, 1, 24, 11, 0))
self.assertEqual(nextUpdate, datetime(2020, 1, 31, 11, 0))
@mock.patch('certbot.util.run_script')
def test_ocsp_response_get_times_no_nextupdate(self, mock_run):
mock_run.return_value = ocsp_times_example_nonext
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
self.assertEqual(producedAt, datetime(2020, 1, 24, 11, 10))
self.assertEqual(thisUpdate, datetime(2020, 1, 24, 11, 0))
self.assertEqual(nextUpdate, None)
@mock.patch('certbot.util.run_script')
def test_ocsp_response_get_times_badoutput(self, mock_run):
mock_run.return_value = ("Something unparsable", "")
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
self.assertEqual(producedAt, None)
self.assertEqual(thisUpdate, None)
self.assertEqual(nextUpdate, None)
@mock.patch('certbot.ocsp.logger')
@mock.patch('certbot.util.run_script')
def test_ocsp_response_get_times_error(self, mock_run, mock_log):
mock_run.side_effect = errors.SubprocessError
producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked")
self.assertEqual(producedAt, None)
self.assertEqual(thisUpdate, None)
self.assertEqual(nextUpdate, None)
self.assertEqual(mock_log.info.call_count, 1)
@unittest.skipIf(not ocsp_lib,
reason='This class tests functionalities available only on cryptography>=2.5.0')
@ -199,12 +165,11 @@ class OSCPTestCryptography(unittest.TestCase):
@mock.patch('certbot.ocsp._determine_ocsp_server')
@mock.patch('certbot.ocsp._check_ocsp_cryptography')
def test_ensure_cryptography_toggled(self, mock_revoke, mock_determine):
def test_ensure_cryptography_toggled(self, mock_check, mock_determine):
mock_determine.return_value = ('http://example.com', 'example.com')
self.checker.ocsp_revoked(self.cert_obj)
mock_revoke.assert_called_once_with(self.cert_path, self.chain_path,
'http://example.com', 10, None)
mock_check.assert_called_once_with(self.cert_path, self.chain_path, 'http://example.com', 10)
def test_revoke(self):
with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL):
@ -314,40 +279,6 @@ class OSCPTestCryptography(unittest.TestCase):
revoked = self.checker.ocsp_revoked(self.cert_obj)
self.assertFalse(revoked)
def test_ocsp_times_cryptography(self):
with mock.patch('certbot.ocsp.open', mock.mock_open(read_data="")):
with mock.patch('cryptography.x509.ocsp.load_der_ocsp_response') as mock_load:
resp = mock.MagicMock()
resp.produced_at = datetime(2020, 1, 2, 9, 9)
resp.this_update = datetime(2020, 1, 2, 8, 8)
resp.next_update = datetime(2020, 1, 3, 4, 4)
mock_load.return_value = resp
produced_at, this_update, next_update = self.checker.ocsp_times("mocked")
self.assertEqual(produced_at, datetime(2020, 1, 2, 9, 9))
self.assertEqual(this_update, datetime(2020, 1, 2, 8, 8))
self.assertEqual(next_update, datetime(2020, 1, 3, 4, 4))
def test_ocsp_times_cryptography_no_nextupdate(self):
with mock.patch('certbot.ocsp.open', mock.mock_open(read_data="")):
with mock.patch('cryptography.x509.ocsp.load_der_ocsp_response') as mock_load:
resp = mock.MagicMock()
resp.produced_at = datetime(2020, 1, 2, 9, 9)
resp.this_update = datetime(2020, 1, 2, 8, 8)
resp.next_update = None
mock_load.return_value = resp
produced_at, this_update, next_update = self.checker.ocsp_times("mocked")
self.assertEqual(produced_at, datetime(2020, 1, 2, 9, 9))
self.assertEqual(this_update, datetime(2020, 1, 2, 8, 8))
self.assertEqual(next_update, None)
def test_ocsp_times_cryptography_error(self):
with mock.patch('certbot.ocsp.open', mock.mock_open(read_data="")) as mock_open:
mock_open.side_effect = OSError
produced_at, this_update, next_update = self.checker.ocsp_times("mocked")
self.assertEqual([produced_at, this_update, next_update], [None, None, None])
@contextlib.contextmanager
def _ocsp_mock(certificate_status, response_status,
@ -447,17 +378,6 @@ revoked
""",
"""Response verify OK""")
ocsp_times_example = ("""
Produced At: Jan 24 11:10:00 2020 GMT
This Update: Jan 24 11:00:00 2020 GMT
Next Update: Jan 31 11:00:00 2020 GMT
""", "")
ocsp_times_example_nonext = ("""
Produced At: Jan 24 11:10:00 2020 GMT
This Update: Jan 24 11:00:00 2020 GMT
""", "")
if __name__ == '__main__':
unittest.main() # pragma: no cover