diff --git a/certbot-apache/certbot_apache/configurator.py b/certbot-apache/certbot_apache/configurator.py index c1a3e8ec1..665254429 100644 --- a/certbot-apache/certbot_apache/configurator.py +++ b/certbot-apache/certbot_apache/configurator.py @@ -2562,7 +2562,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator): db.close() else: logger.warning("Encountered an issue while trying to prefetch OCSP " - "response for certificate: ", cert_path) + "response for certificate: %s", cert_path) def _ocsp_response_dbm(self, workfile): """Creates a dbm entry for OCSP response data diff --git a/certbot-apache/certbot_apache/tests/ocsp_test.py b/certbot-apache/certbot_apache/tests/ocsp_prefetch_test.py similarity index 60% rename from certbot-apache/certbot_apache/tests/ocsp_test.py rename to certbot-apache/certbot_apache/tests/ocsp_prefetch_test.py index d8554951c..d84ffe33f 100644 --- a/certbot-apache/certbot_apache/tests/ocsp_test.py +++ b/certbot-apache/certbot_apache/tests/ocsp_prefetch_test.py @@ -1,14 +1,11 @@ """Test for certbot_apache.configurator OCSP Prefetching functionality""" import dbm import os -import re import unittest import mock # six is used in mock.patch() import six # pylint: disable=unused-import -from certbot import errors, storage -from certbot_apache import constants from certbot_apache.tests import util @@ -40,7 +37,7 @@ class OCSPPrefetchTest(util.ApacheTest): cry_path = "certbot.crypto_util.cert_sha1_fingerprint" with mock.patch(ver_path) as mock_ver: - mock_ver.return_value = (2,4,10) + mock_ver.return_value = (2, 4, 10) with mock.patch(cry_path) as mock_cry: mock_cry.return_value = 'j\x056\x1f\xfa\x08B\xe8D\xa1Bn\xeb*A\xebWx\xdd\xfe' return func(*args, **kwargs) @@ -54,14 +51,13 @@ class OCSPPrefetchTest(util.ApacheTest): self.config.parser.modules.discard("mod_header.c") ref_path = "certbot_apache.configurator.ApacheConfigurator._ocsp_refresh" - - with mock.patch(ref_path): self.call_mocked(self.config.enable_ocsp_prefetch, self.lineage, ["ocspvhost.com"]) self.assertTrue(mock_enable.called) self.assertEquals(len(self.config._ocsp_prefetch), 1) + @mock.patch("certbot_apache.constants.OCSP_INTERNAL_TTL", 0) @mock.patch("certbot_apache.configurator.ApacheConfigurator.restart") def test_ocsp_prefetch_refresh(self, _mock_restart): @@ -86,6 +82,25 @@ class OCSPPrefetchTest(util.ApacheTest): self.call_mocked(self.config.update_ocsp_prefetch, None) self.assertTrue(mock_ocsp.called) + @mock.patch("certbot_apache.configurator.ApacheConfigurator.restart") + def test_ocsp_prefetch_refresh_noop(self, _mock_restart): + def ocsp_req_mock(workfile): + """Method to mock the OCSP request and write response to file""" + with open(workfile, 'w') as fh: + fh.write("MOCKRESPONSE") + return True + + ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file" + with mock.patch(ocsp_path, side_effect=ocsp_req_mock): + self.call_mocked(self.config.enable_ocsp_prefetch, + self.lineage, + ["ocspvhost.com"]) + self.assertEquals(len(self.config._ocsp_prefetch), 1) + refresh_path = "certbot_apache.configurator.ApacheConfigurator._ocsp_refresh" + with mock.patch(refresh_path) as mock_refresh: + self.call_mocked(self.config.update_ocsp_prefetch, None) + self.assertFalse(mock_refresh.called) + @mock.patch("certbot_apache.configurator.ApacheConfigurator.config_test") @mock.patch("certbot_apache.configurator.ApacheConfigurator._reload") def test_ocsp_prefetch_backup_db(self, mock_reload, _mock_test): @@ -97,20 +112,54 @@ class OCSPPrefetchTest(util.ApacheTest): self.assertFalse(os.path.isfile(db_path)) mock_reload.side_effect = ocsp_del_db - db_path = os.path.join(self.config_dir, "ocsp", "ocsp_cache.db") self.config._ensure_ocsp_dirs() odbm = dbm.open(db_path[:-3], 'c') odbm["mock_key"] = "mock_value" odbm.close() # Mock OCSP prefetch dict to signify that there should be a db - self.config._ocsp_prefetch = {"mock":"value"} + self.config._ocsp_prefetch = {"mock": "value"} self.config.restart() odbm = dbm.open(db_path[:-3], 'c') self.assertEquals(odbm["mock_key"], "mock_value") odbm.close() + @mock.patch("certbot_apache.configurator.ApacheConfigurator.config_test") + @mock.patch("certbot_apache.configurator.ApacheConfigurator._reload") + def test_ocsp_prefetch_backup_db_error(self, _mock_reload, _mock_test): + self.config._ensure_ocsp_dirs() + log_path = "certbot_apache.configurator.logger.debug" + log_string = "Encountered an issue while trying to backup OCSP dbm file" + log_string2 = "Encountered an issue when trying to restore OCSP dbm file" + self.config._ocsp_prefetch = {"mock": "value"} + with mock.patch("shutil.copy2", side_effect=IOError): + with mock.patch(log_path) as mock_log: + self.config.restart() + self.assertTrue(mock_log.called) + self.assertEquals(mock_log.call_count, 2) + self.assertTrue(log_string in mock_log.call_args_list[0][0][0]) + self.assertTrue(log_string2 in mock_log.call_args_list[1][0][0]) + + @mock.patch("certbot_apache.configurator.ApacheConfigurator.restart") + def test_ocsp_prefetch_refresh_fail(self, _mock_restart): + ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file" + log_path = "certbot_apache.configurator.logger.warning" + with mock.patch(ocsp_path) as mock_ocsp: + mock_ocsp.return_value = False + with mock.patch(log_path) as mock_log: + self.call_mocked(self.config.enable_ocsp_prefetch, + self.lineage, + ["ocspvhost.com"]) + self.assertTrue(mock_log.called) + self.assertTrue( + "trying to prefetch OCSP" in mock_log.call_args[0][0]) + + @mock.patch("certbot_apache.configurator.ApacheConfigurator._ocsp_refresh_if_needed") + def test_ocsp_prefetch_update_noop(self, mock_refresh): + self.config.update_ocsp_prefetch(None) + self.assertFalse(mock_refresh.called) + if __name__ == "__main__": unittest.main() # pragma: no cover diff --git a/certbot/crypto_util.py b/certbot/crypto_util.py index 2163c6c15..0c65c0b11 100644 --- a/certbot/crypto_util.py +++ b/certbot/crypto_util.py @@ -14,7 +14,7 @@ import six import zope.component from cryptography.exceptions import InvalidSignature from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import hashes # type: ignore from cryptography.hazmat.primitives.asymmetric.ec import ECDSA from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 diff --git a/certbot/ocsp.py b/certbot/ocsp.py index 9f5cf6d6b..89ca2a60f 100644 --- a/certbot/ocsp.py +++ b/certbot/ocsp.py @@ -24,15 +24,9 @@ class OCSPBase(object): :returns: (OCSP server URL or None, OCSP server host or None) """ - url = None - cert = crypto_util.load_cert(cert_path) - ocsp_authinfo = cert.extensions.get_extension_for_oid( - ExtensionOID.AUTHORITY_INFORMATION_ACCESS) - for obj in ocsp_authinfo.value: - if obj.access_method == AuthorityInformationAccessOID.OCSP: - url = obj.access_location.value - - url = url.strip() + url = self._ocsp_host_from_cert(cert_path) + if url: + url = url.strip() host = url.partition("://")[2].rstrip("/") if host: return url, host @@ -40,6 +34,17 @@ class OCSPBase(object): logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path) return None, None + def _ocsp_host_from_cert(self, cert_path): + """Helper method for determine_ocsp_server to read the actual OCSP + server information from a certificate file""" + cert = crypto_util.load_cert(cert_path) + ocsp_authinfo = cert.extensions.get_extension_for_oid( + ExtensionOID.AUTHORITY_INFORMATION_ACCESS) + for obj in ocsp_authinfo.value: + if obj.access_method == AuthorityInformationAccessOID.OCSP: + return obj.access_location.value + return None + def _request_status(self, response): """Checks that the OCSP response was successful from the text output""" pattern = re.compile(r"OCSP Response Status: (.*)\n") diff --git a/certbot/tests/ocsp_test.py b/certbot/tests/ocsp_test.py index 2d54274f0..237aa5b49 100644 --- a/certbot/tests/ocsp_test.py +++ b/certbot/tests/ocsp_test.py @@ -73,20 +73,16 @@ class OCSPTest(unittest.TestCase): @mock.patch('certbot.ocsp.logger.info') - @mock.patch('certbot.util.run_script') - def test_determine_ocsp_server(self, mock_run, mock_info): + @mock.patch('certbot.ocsp.RevocationChecker._ocsp_host_from_cert') + def test_determine_ocsp_server(self, mock_host, mock_info): uri = "http://ocsp.stg-int-x1.letsencrypt.org/" host = "ocsp.stg-int-x1.letsencrypt.org" - mock_run.return_value = uri, "" + mock_host.return_value = uri self.assertEqual(self.checker.determine_ocsp_server("beep"), (uri, host)) - mock_run.return_value = "ftp:/" + host + "/", "" + mock_host.return_value = "ftp:/" + host + "/" self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None)) self.assertEqual(mock_info.call_count, 1) - c = "confusion" - mock_run.side_effect = errors.SubprocessError(c) - self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None)) - @mock.patch('certbot.ocsp.logger') @mock.patch('certbot.util.run_script') def test_translate_ocsp(self, mock_run, mock_log): @@ -111,7 +107,6 @@ class OCSPTest(unittest.TestCase): self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True) self.assertEqual(mock_log.info.call_count, 1) - # pylint: disable=line-too-long openssl_confused = ("", """ /etc/letsencrypt/live/example.org/cert.pem: good