diff --git a/certbot/certbot/_internal/ocsp.py b/certbot/certbot/_internal/ocsp.py index d16aea918..300639b19 100644 --- a/certbot/certbot/_internal/ocsp.py +++ b/certbot/certbot/_internal/ocsp.py @@ -21,7 +21,6 @@ from acme.magic_typing import Tuple # pylint: disable=unused-import, no-name-in from certbot import crypto_util from certbot import errors from certbot import util -from certbot._internal.storage import RenewableCert # pylint: disable=unused-import try: # Only cryptography>=2.5 has ocsp module @@ -32,7 +31,6 @@ except (ImportError, AttributeError): # pragma: no cover ocsp = None # type: ignore - logger = logging.getLogger(__name__) @@ -112,34 +110,9 @@ class RevocationChecker(object): """ if self.use_openssl_binary: - return self._ocsp_times_openssl_bin(response_file) + return _ocsp_times_openssl_bin(response_file) return _ocsp_times_cryptography(response_file) - def _ocsp_times_openssl_bin(self, response_file): - """ - Reads OCSP response file using OpenSSL binary and returns - producedAt, thisUpdate and nextUpdate values in datetime format. - - :param str response_file: File path to OCSP response - - :returns: tuple of producedAt, thisUpdate and nextUpdate values - :rtype: tuple of datetime - """ - cmd = ["openssl", "ocsp", "-resp_text", "-noverify", "-respin", response_file] - logger.debug("Reading OCSP response from temp file: %s", response_file) - logger.debug(" ".join(cmd)) - try: - output, err = util.run_script(cmd, log=logger.debug) - except errors.SubprocessError: - logger.info("Reading OCSP response from file failed.") - return None, None, None - - prod_str, this_str, next_str = _translate_ocsp_response_times(output) - prod_dt = util.parse_datetime(prod_str) - this_dt = util.parse_datetime(this_str) - next_dt = util.parse_datetime(next_str) - return prod_dt, this_dt, next_dt - def _check_ocsp_openssl_bin(self, cert_path, chain_path, host, url, response_file=None): # type: (str, str, str, str, Optional[str]) -> bool # jdkasten thanks "Bulletproof SSL and TLS - Ivan Ristic" for documenting this! @@ -195,10 +168,37 @@ def _determine_ocsp_server(cert_path): return None, None +def _ocsp_times_openssl_bin(response_file): + """ + Reads OCSP response file using OpenSSL binary and returns + producedAt, thisUpdate and nextUpdate values in datetime format. + + :param str response_file: File path to OCSP response + + :returns: tuple of producedAt, thisUpdate and nextUpdate values + :rtype: tuple of datetime + """ + cmd = ["openssl", "ocsp", "-resp_text", "-noverify", "-respin", response_file] + logger.debug("Reading OCSP response from temp file: %s", response_file) + logger.debug(" ".join(cmd)) + try: + output, err = util.run_script(cmd, log=logger.debug) + except errors.SubprocessError: + logger.info("Reading OCSP response from file failed.") + return None, None, None + + prod_str, this_str, next_str = _translate_ocsp_response_times(output) + prod_dt = util.parse_datetime(prod_str) + this_dt = util.parse_datetime(this_str) + next_dt = util.parse_datetime(next_str) + return prod_dt, this_dt, next_dt + + def _ocsp_times_cryptography(response_file): """ Reads OCSP response using cryptography and returns producedAt, - thisUpdate and nextUpdate values in datetime format. + thisUpdate and nextUpdate values in datetime format, or None + if the file cannot be opened. :param str response_file: File path to OCSP response @@ -206,8 +206,11 @@ def _ocsp_times_cryptography(response_file): :rtype: tuple of datetime """ - with open(response_file, 'rb') as fh: - raw_response = fh.read() + try: + with open(response_file, 'rb') as fh: + raw_response = fh.read() + except OSError: + return None, None, None response = ocsp.load_der_ocsp_response(raw_response) return response.produced_at, response.this_update, response.next_update @@ -371,7 +374,7 @@ def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors): return True else: logger.warning("Unable to properly parse OCSP output: %s\nstderr:%s", - ocsp_output, ocsp_errors) + ocsp_output, ocsp_errors) return False diff --git a/certbot/tests/ocsp_test.py b/certbot/tests/ocsp_test.py index b550546c0..9f298a365 100644 --- a/certbot/tests/ocsp_test.py +++ b/certbot/tests/ocsp_test.py @@ -137,6 +137,32 @@ class OCSPTestOpenSSL(unittest.TestCase): self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True) self.assertEqual(mock_log.info.call_count, 1) + @mock.patch('certbot.util.run_script') + def test_ocsp_response_get_times(self, mock_run): + mock_run.return_value = ocsp_times_example + producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked") + self.assertEqual(producedAt, datetime(2020, 1, 24, 11, 10)) + self.assertEqual(thisUpdate, datetime(2020, 1, 24, 11, 0)) + self.assertEqual(nextUpdate, datetime(2020, 1, 31, 11, 0)) + + @mock.patch('certbot.util.run_script') + def test_ocsp_response_get_times_badoutput(self, mock_run): + mock_run.return_value = ("Something unparsable", "") + producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked") + self.assertEqual(producedAt, None) + self.assertEqual(thisUpdate, None) + self.assertEqual(nextUpdate, None) + + @mock.patch('certbot._internal.ocsp.logger') + @mock.patch('certbot.util.run_script') + def test_ocsp_response_get_times_error(self, mock_run, mock_log): + mock_run.side_effect = errors.SubprocessError + producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked") + self.assertEqual(producedAt, None) + self.assertEqual(thisUpdate, None) + self.assertEqual(nextUpdate, None) + self.assertEqual(mock_log.info.call_count, 1) + @unittest.skipIf(not ocsp_lib, reason='This class tests functionalities available only on cryptography>=2.5.0') @@ -278,6 +304,26 @@ class OSCPTestCryptography(unittest.TestCase): revoked = self.checker.ocsp_revoked(self.cert_obj) self.assertFalse(revoked) + def test_ocsp_times_cryptography(self): + with mock.patch('certbot._internal.ocsp.open', mock.mock_open(read_data="")): + with mock.patch('cryptography.x509.ocsp.load_der_ocsp_response') as mock_load: + resp = mock.MagicMock() + resp.produced_at = datetime(2020, 1, 2, 9, 9) + resp.this_update = datetime(2020, 1, 2, 8, 8) + resp.next_update = datetime(2020, 1, 3, 4, 4) + mock_load.return_value = resp + + produced_at, this_update, next_update = self.checker.ocsp_times("mocked") + self.assertEqual(produced_at, datetime(2020, 1, 2, 9, 9)) + self.assertEqual(this_update, datetime(2020, 1, 2, 8, 8)) + self.assertEqual(next_update, datetime(2020, 1, 3, 4, 4)) + + def test_ocsp_times_cryptography_error(self): + with mock.patch('certbot._internal.ocsp.open', mock.mock_open(read_data="")) as mock_open: + mock_open.side_effect = OSError + produced_at, this_update, next_update = self.checker.ocsp_times("mocked") + self.assertEqual([produced_at, this_update, next_update], [None, None, None]) + @contextlib.contextmanager def _ocsp_mock(certificate_status, response_status, @@ -377,6 +423,12 @@ revoked """, """Response verify OK""") +ocsp_times_example = (""" + Produced At: Jan 24 11:10:00 2020 GMT + This Update: Jan 24 11:00:00 2020 GMT + Next Update: Jan 31 11:00:00 2020 GMT +""", "") + if __name__ == '__main__': unittest.main() # pragma: no cover