diff --git a/certbot/certbot/ocsp.py b/certbot/certbot/ocsp.py index 00c1f2549..863c5f163 100644 --- a/certbot/certbot/ocsp.py +++ b/certbot/certbot/ocsp.py @@ -65,26 +65,26 @@ class RevocationChecker(object): .. todo:: Make this a non-blocking call :param `.interfaces.RenewableCert` cert: Certificate object - :returns: True if revoked; False if valid or the check failed or cert is expired. :rtype: bool """ - return self.ocsp_revoked_by_paths(cert.cert_path, cert.chain_path) - def ocsp_revoked_by_paths(self, cert_path, chain_path, timeout=10, response_file=None): - # type: (str, str, int, Optional[str]) -> bool + def ocsp_revoked_by_paths(self, cert_path, chain_path, timeout=10): + # type: (str, str, int) -> bool """Performs the OCSP revocation check - :param str cert_path: Certificate path + :param str cert_path: Certificate filepath :param str chain_path: Certificate chain :param int timeout: Timeout (in seconds) for the OCSP query - :param str response_file: File path where the raw OCSP response should be written :returns: True if revoked; False if valid or the check failed or cert is expired. :rtype: bool + """ + if self.broken: + return False # Let's Encrypt doesn't update OCSP for expired certificates, # so don't check OCSP if the cert is expired. @@ -93,37 +93,16 @@ class RevocationChecker(object): if crypto_util.notAfter(cert_path) <= now: return False - if self.broken: - return False - url, host = _determine_ocsp_server(cert_path) if not host or not url: return False - if self.use_openssl_binary: - return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url, - timeout, response_file) - return _check_ocsp_cryptography(cert_path, chain_path, url, timeout, response_file) - - def ocsp_times(self, response_file): - # type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]] - """ - Reads OCSP response file and returns producedAt, thisUpdate - and nextUpdate values in datetime format, or None if an error - occurs. - - :param str response_file: File path to OCSP response - - :returns: tuple of producedAt, thisUpdate and nextUpdate values - :rtype: tuple of datetime or None - """ if self.use_openssl_binary: - return _ocsp_times_openssl_bin(response_file) - return _ocsp_times_cryptography(response_file) + return self._check_ocsp_openssl_bin(cert_path, chain_path, host, url, timeout) + return _check_ocsp_cryptography(cert_path, chain_path, url, timeout) - def _check_ocsp_openssl_bin(self, cert_path, chain_path, host, url, timeout, response_file=None): # pylint: disable=line-too-long - - # type: (str, str, str, str, int, Optional[str]) -> bool + def _check_ocsp_openssl_bin(self, cert_path, chain_path, host, url, timeout): + # type: (str, str, str, str, int) -> bool # Minimal implementation of proxy selection logic as seen in, e.g., cURL # Some things that won't work, but may well be in use somewhere: # - username and password for proxy authentication @@ -145,16 +124,11 @@ class RevocationChecker(object): "-no_nonce", "-issuer", chain_path, "-cert", cert_path, - "-url", url, "-CAfile", chain_path, "-verify_other", chain_path, "-trust_other", "-timeout", str(timeout), "-header"] + self.host_args(host) + url_opts - - if response_file: # pragma: no cover - cmd += ["-respout", response_file] - logger.debug("Querying OCSP for %s", cert_path) logger.debug(" ".join(cmd)) try: @@ -196,58 +170,8 @@ def _determine_ocsp_server(cert_path): return None, None -def _ocsp_times_openssl_bin(response_file): - # type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]] - """ - Reads OCSP response file using OpenSSL binary and returns - producedAt, thisUpdate and nextUpdate values in datetime format. - - :param str response_file: File path to OCSP response - - :returns: tuple of producedAt, thisUpdate and nextUpdate values - :rtype: tuple of datetime or None - """ - cmd = ["openssl", "ocsp", "-resp_text", "-noverify", "-respin", response_file] - logger.debug("Reading OCSP response from file: %s", response_file) - logger.debug(" ".join(cmd)) - try: - output, _ = util.run_script(cmd, log=logger.debug) - except errors.SubprocessError: - logger.info("Reading OCSP response from file failed.") - return None, None, None - - prod_str, this_str, next_str = _translate_ocsp_response_times(output) - prod_dt = _parse_datetime(prod_str) - this_dt = _parse_datetime(this_str) - next_dt = _parse_datetime(next_str) - return prod_dt, this_dt, next_dt - - -def _ocsp_times_cryptography(response_file): - # type: (str) -> Tuple[Optional[datetime], Optional[datetime], Optional[datetime]] - """ - Reads OCSP response using cryptography and returns producedAt, - thisUpdate and nextUpdate values in datetime format, or None - if the file cannot be opened. - - :param str response_file: File path to OCSP response - - :returns: tuple of producedAt, thisUpdate and nextUpdate values - :rtype: tuple of datetime or None - """ - - try: - with open(response_file, 'rb') as fh: - raw_response = fh.read() - except OSError: - return None, None, None - - response = ocsp.load_der_ocsp_response(raw_response) - return response.produced_at, response.this_update, response.next_update - - -def _check_ocsp_cryptography(cert_path, chain_path, url, timeout, response_file=None): - # type: (str, str, str, int, Optional[str]) -> bool +def _check_ocsp_cryptography(cert_path, chain_path, url, timeout): + # type: (str, str, str, int) -> bool # Retrieve OCSP response with open(chain_path, 'rb') as file_handler: issuer = x509.load_pem_x509_certificate(file_handler.read(), default_backend()) @@ -268,10 +192,6 @@ def _check_ocsp_cryptography(cert_path, chain_path, url, timeout, response_file= logger.info("OCSP check failed for %s (HTTP status: %d)", cert_path, response.status_code) return False - if response_file: # pragma: no cover - with open(response_file, 'wb') as fh: - fh.write(response.content) - response_ocsp = ocsp.load_der_ocsp_response(response.content) # Check OCSP response validity @@ -407,54 +327,3 @@ def _translate_ocsp_query(cert_path, ocsp_output, ocsp_errors): logger.warning("Unable to properly parse OCSP output: %s\nstderr:%s", ocsp_output, ocsp_errors) return False - - -def _translate_ocsp_response_times(response): - # type: (str) -> Tuple[str, str, str] - """ - Parse openssl OCSP response output and return producedAt, - thisUpdate and nextUpdate values. - - :param str response: OpenSSL OCSP response output - - :returns: tuple of producedAt, thisUpdate and nextUpdate values - :rtype: tuple of str - """ - - prod_pattern = "Produced At: (.+)$" - this_pattern = "This Update: (.+)$" - next_pattern = "Next Update: (.+)$" - - prod_date = "" - this_date = "" - next_date = "" - - prod_match = re.search(prod_pattern, response, flags=re.MULTILINE) - if prod_match: - prod_date = prod_match.group(1) - - this_match = re.search(this_pattern, response, flags=re.MULTILINE) - if this_match: - this_date = this_match.group(1) - - next_match = re.search(next_pattern, response, flags=re.MULTILINE) - if next_match: - next_date = next_match.group(1) - - return prod_date, this_date, next_date - - -def _parse_datetime(dt_string): - """ - Parses a string to datetime, ignoring timezone. - - :param str dt_string: String representation of date and time - - :returns: datetime representation of time - :rtype: datetime.datetime or None - """ - try: - dateformat = "%b %d %H:%M:%S %Y %Z" - return datetime.strptime(dt_string, dateformat) - except ValueError: - return None diff --git a/certbot/tests/ocsp_test.py b/certbot/tests/ocsp_test.py index 8b2e8fdee..af54844cf 100644 --- a/certbot/tests/ocsp_test.py +++ b/certbot/tests/ocsp_test.py @@ -140,40 +140,6 @@ class OCSPTestOpenSSL(unittest.TestCase): self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True) self.assertEqual(mock_log.info.call_count, 1) - @mock.patch('certbot.util.run_script') - def test_ocsp_response_get_times(self, mock_run): - mock_run.return_value = ocsp_times_example - producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked") - self.assertEqual(producedAt, datetime(2020, 1, 24, 11, 10)) - self.assertEqual(thisUpdate, datetime(2020, 1, 24, 11, 0)) - self.assertEqual(nextUpdate, datetime(2020, 1, 31, 11, 0)) - - @mock.patch('certbot.util.run_script') - def test_ocsp_response_get_times_no_nextupdate(self, mock_run): - mock_run.return_value = ocsp_times_example_nonext - producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked") - self.assertEqual(producedAt, datetime(2020, 1, 24, 11, 10)) - self.assertEqual(thisUpdate, datetime(2020, 1, 24, 11, 0)) - self.assertEqual(nextUpdate, None) - - @mock.patch('certbot.util.run_script') - def test_ocsp_response_get_times_badoutput(self, mock_run): - mock_run.return_value = ("Something unparsable", "") - producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked") - self.assertEqual(producedAt, None) - self.assertEqual(thisUpdate, None) - self.assertEqual(nextUpdate, None) - - @mock.patch('certbot.ocsp.logger') - @mock.patch('certbot.util.run_script') - def test_ocsp_response_get_times_error(self, mock_run, mock_log): - mock_run.side_effect = errors.SubprocessError - producedAt, thisUpdate, nextUpdate = self.checker.ocsp_times("mocked") - self.assertEqual(producedAt, None) - self.assertEqual(thisUpdate, None) - self.assertEqual(nextUpdate, None) - self.assertEqual(mock_log.info.call_count, 1) - @unittest.skipIf(not ocsp_lib, reason='This class tests functionalities available only on cryptography>=2.5.0') @@ -199,12 +165,11 @@ class OSCPTestCryptography(unittest.TestCase): @mock.patch('certbot.ocsp._determine_ocsp_server') @mock.patch('certbot.ocsp._check_ocsp_cryptography') - def test_ensure_cryptography_toggled(self, mock_revoke, mock_determine): + def test_ensure_cryptography_toggled(self, mock_check, mock_determine): mock_determine.return_value = ('http://example.com', 'example.com') self.checker.ocsp_revoked(self.cert_obj) - mock_revoke.assert_called_once_with(self.cert_path, self.chain_path, - 'http://example.com', 10, None) + mock_check.assert_called_once_with(self.cert_path, self.chain_path, 'http://example.com', 10) def test_revoke(self): with _ocsp_mock(ocsp_lib.OCSPCertStatus.REVOKED, ocsp_lib.OCSPResponseStatus.SUCCESSFUL): @@ -314,40 +279,6 @@ class OSCPTestCryptography(unittest.TestCase): revoked = self.checker.ocsp_revoked(self.cert_obj) self.assertFalse(revoked) - def test_ocsp_times_cryptography(self): - with mock.patch('certbot.ocsp.open', mock.mock_open(read_data="")): - with mock.patch('cryptography.x509.ocsp.load_der_ocsp_response') as mock_load: - resp = mock.MagicMock() - resp.produced_at = datetime(2020, 1, 2, 9, 9) - resp.this_update = datetime(2020, 1, 2, 8, 8) - resp.next_update = datetime(2020, 1, 3, 4, 4) - mock_load.return_value = resp - - produced_at, this_update, next_update = self.checker.ocsp_times("mocked") - self.assertEqual(produced_at, datetime(2020, 1, 2, 9, 9)) - self.assertEqual(this_update, datetime(2020, 1, 2, 8, 8)) - self.assertEqual(next_update, datetime(2020, 1, 3, 4, 4)) - - def test_ocsp_times_cryptography_no_nextupdate(self): - with mock.patch('certbot.ocsp.open', mock.mock_open(read_data="")): - with mock.patch('cryptography.x509.ocsp.load_der_ocsp_response') as mock_load: - resp = mock.MagicMock() - resp.produced_at = datetime(2020, 1, 2, 9, 9) - resp.this_update = datetime(2020, 1, 2, 8, 8) - resp.next_update = None - mock_load.return_value = resp - - produced_at, this_update, next_update = self.checker.ocsp_times("mocked") - self.assertEqual(produced_at, datetime(2020, 1, 2, 9, 9)) - self.assertEqual(this_update, datetime(2020, 1, 2, 8, 8)) - self.assertEqual(next_update, None) - - def test_ocsp_times_cryptography_error(self): - with mock.patch('certbot.ocsp.open', mock.mock_open(read_data="")) as mock_open: - mock_open.side_effect = OSError - produced_at, this_update, next_update = self.checker.ocsp_times("mocked") - self.assertEqual([produced_at, this_update, next_update], [None, None, None]) - @contextlib.contextmanager def _ocsp_mock(certificate_status, response_status, @@ -447,17 +378,6 @@ revoked """, """Response verify OK""") -ocsp_times_example = (""" - Produced At: Jan 24 11:10:00 2020 GMT - This Update: Jan 24 11:00:00 2020 GMT - Next Update: Jan 31 11:00:00 2020 GMT -""", "") - -ocsp_times_example_nonext = (""" - Produced At: Jan 24 11:10:00 2020 GMT - This Update: Jan 24 11:00:00 2020 GMT -""", "") - if __name__ == '__main__': unittest.main() # pragma: no cover