mirror of
https://github.com/certbot/certbot.git
synced 2026-06-17 12:39:39 -04:00
Finalize tests
This commit is contained in:
parent
118bf1d930
commit
c20c722acf
5 changed files with 77 additions and 28 deletions
|
|
@ -2562,7 +2562,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
|
|||
db.close()
|
||||
else:
|
||||
logger.warning("Encountered an issue while trying to prefetch OCSP "
|
||||
"response for certificate: ", cert_path)
|
||||
"response for certificate: %s", cert_path)
|
||||
|
||||
def _ocsp_response_dbm(self, workfile):
|
||||
"""Creates a dbm entry for OCSP response data
|
||||
|
|
|
|||
|
|
@ -1,14 +1,11 @@
|
|||
"""Test for certbot_apache.configurator OCSP Prefetching functionality"""
|
||||
import dbm
|
||||
import os
|
||||
import re
|
||||
import unittest
|
||||
import mock
|
||||
# six is used in mock.patch()
|
||||
import six # pylint: disable=unused-import
|
||||
|
||||
from certbot import errors, storage
|
||||
from certbot_apache import constants
|
||||
from certbot_apache.tests import util
|
||||
|
||||
|
||||
|
|
@ -40,7 +37,7 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
cry_path = "certbot.crypto_util.cert_sha1_fingerprint"
|
||||
|
||||
with mock.patch(ver_path) as mock_ver:
|
||||
mock_ver.return_value = (2,4,10)
|
||||
mock_ver.return_value = (2, 4, 10)
|
||||
with mock.patch(cry_path) as mock_cry:
|
||||
mock_cry.return_value = 'j\x056\x1f\xfa\x08B\xe8D\xa1Bn\xeb*A\xebWx\xdd\xfe'
|
||||
return func(*args, **kwargs)
|
||||
|
|
@ -54,14 +51,13 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
self.config.parser.modules.discard("mod_header.c")
|
||||
|
||||
ref_path = "certbot_apache.configurator.ApacheConfigurator._ocsp_refresh"
|
||||
|
||||
|
||||
with mock.patch(ref_path):
|
||||
self.call_mocked(self.config.enable_ocsp_prefetch,
|
||||
self.lineage,
|
||||
["ocspvhost.com"])
|
||||
self.assertTrue(mock_enable.called)
|
||||
self.assertEquals(len(self.config._ocsp_prefetch), 1)
|
||||
|
||||
@mock.patch("certbot_apache.constants.OCSP_INTERNAL_TTL", 0)
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
|
||||
def test_ocsp_prefetch_refresh(self, _mock_restart):
|
||||
|
|
@ -86,6 +82,25 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
self.call_mocked(self.config.update_ocsp_prefetch, None)
|
||||
self.assertTrue(mock_ocsp.called)
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
|
||||
def test_ocsp_prefetch_refresh_noop(self, _mock_restart):
|
||||
def ocsp_req_mock(workfile):
|
||||
"""Method to mock the OCSP request and write response to file"""
|
||||
with open(workfile, 'w') as fh:
|
||||
fh.write("MOCKRESPONSE")
|
||||
return True
|
||||
|
||||
ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file"
|
||||
with mock.patch(ocsp_path, side_effect=ocsp_req_mock):
|
||||
self.call_mocked(self.config.enable_ocsp_prefetch,
|
||||
self.lineage,
|
||||
["ocspvhost.com"])
|
||||
self.assertEquals(len(self.config._ocsp_prefetch), 1)
|
||||
refresh_path = "certbot_apache.configurator.ApacheConfigurator._ocsp_refresh"
|
||||
with mock.patch(refresh_path) as mock_refresh:
|
||||
self.call_mocked(self.config.update_ocsp_prefetch, None)
|
||||
self.assertFalse(mock_refresh.called)
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator.config_test")
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator._reload")
|
||||
def test_ocsp_prefetch_backup_db(self, mock_reload, _mock_test):
|
||||
|
|
@ -97,20 +112,54 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
self.assertFalse(os.path.isfile(db_path))
|
||||
mock_reload.side_effect = ocsp_del_db
|
||||
|
||||
db_path = os.path.join(self.config_dir, "ocsp", "ocsp_cache.db")
|
||||
self.config._ensure_ocsp_dirs()
|
||||
odbm = dbm.open(db_path[:-3], 'c')
|
||||
odbm["mock_key"] = "mock_value"
|
||||
odbm.close()
|
||||
|
||||
# Mock OCSP prefetch dict to signify that there should be a db
|
||||
self.config._ocsp_prefetch = {"mock":"value"}
|
||||
self.config._ocsp_prefetch = {"mock": "value"}
|
||||
self.config.restart()
|
||||
|
||||
odbm = dbm.open(db_path[:-3], 'c')
|
||||
self.assertEquals(odbm["mock_key"], "mock_value")
|
||||
odbm.close()
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator.config_test")
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator._reload")
|
||||
def test_ocsp_prefetch_backup_db_error(self, _mock_reload, _mock_test):
|
||||
self.config._ensure_ocsp_dirs()
|
||||
log_path = "certbot_apache.configurator.logger.debug"
|
||||
log_string = "Encountered an issue while trying to backup OCSP dbm file"
|
||||
log_string2 = "Encountered an issue when trying to restore OCSP dbm file"
|
||||
self.config._ocsp_prefetch = {"mock": "value"}
|
||||
with mock.patch("shutil.copy2", side_effect=IOError):
|
||||
with mock.patch(log_path) as mock_log:
|
||||
self.config.restart()
|
||||
self.assertTrue(mock_log.called)
|
||||
self.assertEquals(mock_log.call_count, 2)
|
||||
self.assertTrue(log_string in mock_log.call_args_list[0][0][0])
|
||||
self.assertTrue(log_string2 in mock_log.call_args_list[1][0][0])
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator.restart")
|
||||
def test_ocsp_prefetch_refresh_fail(self, _mock_restart):
|
||||
ocsp_path = "certbot.ocsp.OCSPResponseHandler.ocsp_request_to_file"
|
||||
log_path = "certbot_apache.configurator.logger.warning"
|
||||
with mock.patch(ocsp_path) as mock_ocsp:
|
||||
mock_ocsp.return_value = False
|
||||
with mock.patch(log_path) as mock_log:
|
||||
self.call_mocked(self.config.enable_ocsp_prefetch,
|
||||
self.lineage,
|
||||
["ocspvhost.com"])
|
||||
self.assertTrue(mock_log.called)
|
||||
self.assertTrue(
|
||||
"trying to prefetch OCSP" in mock_log.call_args[0][0])
|
||||
|
||||
@mock.patch("certbot_apache.configurator.ApacheConfigurator._ocsp_refresh_if_needed")
|
||||
def test_ocsp_prefetch_update_noop(self, mock_refresh):
|
||||
self.config.update_ocsp_prefetch(None)
|
||||
self.assertFalse(mock_refresh.called)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
@ -14,7 +14,7 @@ import six
|
|||
import zope.component
|
||||
from cryptography.exceptions import InvalidSignature
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import hashes # type: ignore
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
|
||||
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
|
||||
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
|
||||
|
|
|
|||
|
|
@ -24,15 +24,9 @@ class OCSPBase(object):
|
|||
:returns: (OCSP server URL or None, OCSP server host or None)
|
||||
|
||||
"""
|
||||
url = None
|
||||
cert = crypto_util.load_cert(cert_path)
|
||||
ocsp_authinfo = cert.extensions.get_extension_for_oid(
|
||||
ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
|
||||
for obj in ocsp_authinfo.value:
|
||||
if obj.access_method == AuthorityInformationAccessOID.OCSP:
|
||||
url = obj.access_location.value
|
||||
|
||||
url = url.strip()
|
||||
url = self._ocsp_host_from_cert(cert_path)
|
||||
if url:
|
||||
url = url.strip()
|
||||
host = url.partition("://")[2].rstrip("/")
|
||||
if host:
|
||||
return url, host
|
||||
|
|
@ -40,6 +34,17 @@ class OCSPBase(object):
|
|||
logger.info("Cannot process OCSP host from URL (%s) in cert at %s", url, cert_path)
|
||||
return None, None
|
||||
|
||||
def _ocsp_host_from_cert(self, cert_path):
|
||||
"""Helper method for determine_ocsp_server to read the actual OCSP
|
||||
server information from a certificate file"""
|
||||
cert = crypto_util.load_cert(cert_path)
|
||||
ocsp_authinfo = cert.extensions.get_extension_for_oid(
|
||||
ExtensionOID.AUTHORITY_INFORMATION_ACCESS)
|
||||
for obj in ocsp_authinfo.value:
|
||||
if obj.access_method == AuthorityInformationAccessOID.OCSP:
|
||||
return obj.access_location.value
|
||||
return None
|
||||
|
||||
def _request_status(self, response):
|
||||
"""Checks that the OCSP response was successful from the text output"""
|
||||
pattern = re.compile(r"OCSP Response Status: (.*)\n")
|
||||
|
|
|
|||
|
|
@ -73,20 +73,16 @@ class OCSPTest(unittest.TestCase):
|
|||
|
||||
|
||||
@mock.patch('certbot.ocsp.logger.info')
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_determine_ocsp_server(self, mock_run, mock_info):
|
||||
@mock.patch('certbot.ocsp.RevocationChecker._ocsp_host_from_cert')
|
||||
def test_determine_ocsp_server(self, mock_host, mock_info):
|
||||
uri = "http://ocsp.stg-int-x1.letsencrypt.org/"
|
||||
host = "ocsp.stg-int-x1.letsencrypt.org"
|
||||
mock_run.return_value = uri, ""
|
||||
mock_host.return_value = uri
|
||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (uri, host))
|
||||
mock_run.return_value = "ftp:/" + host + "/", ""
|
||||
mock_host.return_value = "ftp:/" + host + "/"
|
||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None))
|
||||
self.assertEqual(mock_info.call_count, 1)
|
||||
|
||||
c = "confusion"
|
||||
mock_run.side_effect = errors.SubprocessError(c)
|
||||
self.assertEqual(self.checker.determine_ocsp_server("beep"), (None, None))
|
||||
|
||||
@mock.patch('certbot.ocsp.logger')
|
||||
@mock.patch('certbot.util.run_script')
|
||||
def test_translate_ocsp(self, mock_run, mock_log):
|
||||
|
|
@ -111,7 +107,6 @@ class OCSPTest(unittest.TestCase):
|
|||
self.assertEqual(ocsp._translate_ocsp_query(*openssl_expired_ocsp_revoked), True)
|
||||
self.assertEqual(mock_log.info.call_count, 1)
|
||||
|
||||
|
||||
# pylint: disable=line-too-long
|
||||
openssl_confused = ("", """
|
||||
/etc/letsencrypt/live/example.org/cert.pem: good
|
||||
|
|
|
|||
Loading…
Reference in a new issue