diff --git a/certbot-apache/certbot_apache/_internal/configurator.py b/certbot-apache/certbot_apache/_internal/configurator.py index 2d97f2135..bafecf533 100644 --- a/certbot-apache/certbot_apache/_internal/configurator.py +++ b/certbot-apache/certbot_apache/_internal/configurator.py @@ -5,7 +5,6 @@ import copy import fnmatch import logging import re -import shutil import socket import time @@ -23,7 +22,6 @@ from acme.magic_typing import Union # pylint: disable=unused-import, no-name-in from certbot import errors from certbot import interfaces from certbot import util -from certbot._internal import ocsp from certbot.achallenges import KeyAuthorizationAnnotatedChallenge # pylint: disable=unused-import from certbot.compat import filesystem @@ -197,8 +195,6 @@ class ApacheConfigurator(common.Installer): self._enhanced_vhosts = defaultdict(set) # type: DefaultDict[str, Set[obj.VirtualHost]] # Temporary state for AutoHSTS enhancement self._autohsts = {} # type: Dict[str, Dict[str, Union[int, float]]] - self._ocsp_prefetch = {} # type: Dict[str, str] - self._ocsp_dbm_bsddb = False # Reverter save notes self.save_notes = "" @@ -2205,20 +2201,8 @@ class ApacheConfigurator(common.Installer): """ self.config_test() - - if not self._ocsp_prefetch: - # Try to populate OCSP prefetch structure from pluginstorage - self._ocsp_prefetch_fetch_state() - if self._ocsp_prefetch: - # OCSP prefetching is enabled, so back up the db - self._ocsp_prefetch_backup_db() - self._reload() - if self._ocsp_prefetch: - # Restore the backed up dbm database - self._ocsp_prefetch_restore_db() - def _reload(self): """Reloads the Apache server. @@ -2403,226 +2387,6 @@ class ApacheConfigurator(common.Installer): # Save the current state to pluginstorage self._autohsts_save_state() - def _ensure_ocsp_dirs(self): - """Makes sure that the OCSP directory paths exist.""" - ocsp_work = os.path.join(self.config.work_dir, "ocsp") - ocsp_save = os.path.join(self.config.config_dir, "ocsp") - for path in [ocsp_work, ocsp_save]: - if not os.path.isdir(path): - filesystem.makedirs(path) - filesystem.chmod(path, 0o755) - - def _ensure_ocsp_prefetch_compatibility(self): - """Make sure that the operating system supports the required libraries - to manage Apache DBM files. - - :raises: errors.NotSupportedError - """ - try: - import bsddb # pylint: disable=unused-variable - except ImportError: - import dbm - if not hasattr(dbm, 'ndbm') or dbm.ndbm.library != 'Berkeley DB': # pylint: disable=no-member - msg = ("Unfortunately your operating system does not have a " - "compatible database module available for managing " - "Apache OCSP stapling cache database.") - raise errors.NotSupportedError(msg) - - def _ocsp_dbm_open(self, filepath): - """Helper method to open an DBM file in a way that depends on the platform - that Certbot is run on. Returns an open database structure.""" - - if not os.path.isfile(filepath+".db"): - raise errors.PluginError( - "The OCSP stapling cache DBM file wasn't created by Apache.") - try: - import bsddb - self._ocsp_dbm_bsddb = True - cache_path = filepath + ".db" - try: - database = bsddb.hashopen(cache_path, 'w') - except Exception: - raise errors.PluginError("Unable to open dbm database file.") - except ImportError: - # Python3 doesn't have bsddb module, so we use dbm.ndbm instead - import dbm - try: - database = dbm.ndbm.open(filepath, 'w') # pylint: disable=no-member - except Exception: - # This is raised if a file cannot be found - raise errors.PluginError("Unable to open dbm database file.") - return database - - def _ocsp_dbm_close(self, database): - """Helper method to sync and close a DBM file, in a way required by the - used dbm implementation.""" - if self._ocsp_dbm_bsddb: - database.sync() - database.close() - else: - database.close() - - def _ocsp_refresh_if_needed(self, pf_obj): - """Refreshes OCSP response for a certiifcate if it's due - - :param dict pf_obj: OCSP prefetch object from pluginstorage - - :returns: If OCSP response was updated - :rtype: bool - - """ - ttl = pf_obj["lastupdate"] + constants.OCSP_INTERNAL_TTL - if ttl < time.time(): - self._ocsp_refresh(pf_obj["cert_path"], pf_obj["chain_path"]) - return True - return False - - - def _ocsp_refresh(self, cert_path, chain_path): - """Refresh the OCSP response for a certificate - - :param str cert_path: Filesystem path to certificate file - :param str chain_path: Filesystem path to certificate chain file - - """ - - self._ensure_ocsp_dirs() - ocsp_workfile = os.path.join( - self.config.work_dir, "ocsp", - apache_util.certid_sha1_hex(cert_path)) - handler = ocsp.RevocationChecker() - if not handler.revoked(cert_path, chain_path, ocsp_workfile): - # Guaranteed good response - cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache") - # dbm.open automatically adds the file extension, it will be - db = self._ocsp_dbm_open(cache_path) - cert_sha = apache_util.certid_sha1(cert_path) - db[cert_sha] = self._ocsp_response_dbm(ocsp_workfile) - self._ocsp_dbm_close(db) - else: - logger.warning("Encountered an issue while trying to prefetch OCSP " - "response for certificate: %s", cert_path) - - def _ocsp_response_dbm(self, workfile): - """Creates a dbm entry for OCSP response data - - :param str workfile: File path for raw OCSP response - - :returns: OCSP response cache data that Apache can use - :rtype: string - - """ - - with open(workfile, 'rb') as fh: - response = fh.read() - ttl = constants.OCSP_APACHE_TTL - return apache_util.get_apache_ocsp_struct(ttl, response) - - def _ocsp_prefetch_save(self, cert_path, chain_path): - """Saves status of current OCSP prefetch, including the last update - time to determine if an update is needed on later run. - - :param str cert_path: Filesystem path to certificate - :param str chain_path: Filesystem path to certificate chain file - - """ - status = { - "lastupdate": time.time(), - "cert_path": cert_path, - "chain_path": chain_path - } - cert_id = apache_util.certid_sha1_hex(cert_path) - self._ocsp_prefetch[cert_id] = status - self.storage.put("ocsp_prefetch", self._ocsp_prefetch) - self.storage.save() - - def _ocsp_prefetch_fetch_state(self): - """ - Populates the OCSP prefetch state from the pluginstorage. - """ - try: - self._ocsp_prefetch = self.storage.fetch("ocsp_prefetch") - except KeyError: - self._ocsp_prefetch = dict() - - def _ocsp_prefetch_backup_db(self): - """ - Copies the active dbm file to work directory. - """ - self._ensure_ocsp_dirs() - cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db") - try: - shutil.copy2(cache_path, os.path.join(self.config.work_dir, "ocsp")) - except IOError: - logger.debug("Encountered an issue while trying to backup OCSP dbm file") - - def _ocsp_prefetch_restore_db(self): - """ - Restores the active dbm file from work directory. - """ - self._ensure_ocsp_dirs() - cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db") - work_file_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db") - try: - shutil.copy2(work_file_path, cache_path) - except IOError: - logger.debug("Encountered an issue when trying to restore OCSP dbm file") - - def enable_ocsp_prefetch(self, lineage, domains): - """Enable OCSP Stapling and prefetching of the responses. - - In OCSP, each client (e.g. browser) would have to query the - OCSP Responder to validate that the site certificate was not revoked. - - Enabling OCSP Stapling, would allow the web-server to query the OCSP - Responder, and staple its response to the offered certificate during - TLS. i.e. clients would not have to query the OCSP responder. - - """ - - # Fail early if we are not able to support this - self._ensure_ocsp_prefetch_compatibility() - prefetch_vhosts = set() - for domain in domains: - matched_vhosts = self.choose_vhosts(domain, create_if_no_ssl=False) - # We should be handling only SSL vhosts - for vh in matched_vhosts: - if vh.ssl: - prefetch_vhosts.add(vh) - - if prefetch_vhosts: - for vh in prefetch_vhosts: - self._enable_ocsp_stapling(vh, None, prefetch=True) - self.restart() - # Ensure Apache has enough time to properly restart and create the file - time.sleep(2) - try: - self._ocsp_refresh(lineage.cert_path, lineage.chain_path) - self._ocsp_prefetch_save(lineage.cert_path, lineage.chain_path) - self.save("Enabled OCSP prefetching") - except errors.PluginError as err: - # Revert the OCSP prefetch configuration - self.recovery_routine() - self.restart() - msg = ("Encountered an error while trying to enable OCSP prefetch " - "enhancement: %s.\nOCSP prefetch was not enabled.") - raise errors.PluginError(msg % str(err)) - - def update_ocsp_prefetch(self, _unused_lineage): - """Checks all certificates that are managed by OCSP prefetch, and - refreshes OCSP responses for them if required.""" - - self._ocsp_prefetch_fetch_state() - if not self._ocsp_prefetch: - # No OCSP prefetching enabled for any certificate - return - - for _, pf in self._ocsp_prefetch.items(): - if self._ocsp_refresh_if_needed(pf): - # Save the status to pluginstorage - self._ocsp_prefetch_save(pf["cert_path"], pf["chain_path"]) - - def _enable_autohsts_domain(self, ssl_vhost): """Do the initial AutoHSTS deployment to a vhost diff --git a/certbot-apache/certbot_apache/_internal/override_debian.py b/certbot-apache/certbot_apache/_internal/override_debian.py index 0527a66ff..fcc61ff2b 100644 --- a/certbot-apache/certbot_apache/_internal/override_debian.py +++ b/certbot-apache/certbot_apache/_internal/override_debian.py @@ -13,12 +13,13 @@ from certbot.plugins.enhancements import OCSPPrefetchEnhancement from certbot_apache._internal import apache_util from certbot_apache._internal import configurator +from certbot_apache._internal import prefetch_ocsp logger = logging.getLogger(__name__) @zope.interface.provider(interfaces.IPluginFactory) -class DebianConfigurator(configurator.ApacheConfigurator): +class DebianConfigurator(prefetch_ocsp.OCSPPrefetchMixin, configurator.ApacheConfigurator): """Debian specific ApacheConfigurator override class""" OS_DEFAULTS = dict( @@ -145,5 +146,27 @@ class DebianConfigurator(configurator.ApacheConfigurator): temp, [self.option("dismod"), "-f", mod_name]) util.run_script([self.option("enmod"), mod_name]) + def restart(self): + """Runs a config test and reloads the Apache server. + + :raises .errors.MisconfigurationError: If either the config test + or reload fails. + + """ + self.config_test() + + if not self._ocsp_prefetch: + # Try to populate OCSP prefetch structure from pluginstorage + self._ocsp_prefetch_fetch_state() + if self._ocsp_prefetch: + # OCSP prefetching is enabled, so back up the db + self._ocsp_prefetch_backup_db() + + self._reload() + + if self._ocsp_prefetch: + # Restore the backed up dbm database + self._ocsp_prefetch_restore_db() + OCSPPrefetchEnhancement.register(DebianConfigurator) # pylint: disable=no-member diff --git a/certbot-apache/certbot_apache/_internal/prefetch_ocsp.py b/certbot-apache/certbot_apache/_internal/prefetch_ocsp.py new file mode 100644 index 000000000..a770062f1 --- /dev/null +++ b/certbot-apache/certbot_apache/_internal/prefetch_ocsp.py @@ -0,0 +1,246 @@ +"""A mixin class for OCSP response prefetching for Apache plugin""" +import logging +import shutil +import time + +from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module + +from certbot import errors +from certbot._internal import ocsp + +from certbot.compat import filesystem +from certbot.compat import os +from certbot_apache._internal import apache_util +from certbot_apache._internal import constants + +logger = logging.getLogger(__name__) + +class OCSPPrefetchMixin(object): + """OCSPPrefetchMixin implements OCSP response prefetching""" + + def __init__(self, *args, **kwargs): + self._ocsp_prefetch = {} # type: Dict[str, str] + self._ocsp_dbm_bsddb = False + # This is required because of python super() call chain + # mypy isn't able to figure the chain out and needs to be + # disabled for this line. + super(OCSPPrefetchMixin, self).__init__(*args, **kwargs) # type: ignore + + def _ensure_ocsp_dirs(self): + """Makes sure that the OCSP directory paths exist.""" + ocsp_work = os.path.join(self.config.work_dir, "ocsp") + ocsp_save = os.path.join(self.config.config_dir, "ocsp") + for path in [ocsp_work, ocsp_save]: + if not os.path.isdir(path): + filesystem.makedirs(path) + filesystem.chmod(path, 0o755) + + def _ensure_ocsp_prefetch_compatibility(self): + """Make sure that the operating system supports the required libraries + to manage Apache DBM files. + + :raises: errors.NotSupportedError + """ + try: + import bsddb # pylint: disable=unused-variable + except ImportError: + import dbm + if not hasattr(dbm, 'ndbm') or dbm.ndbm.library != 'Berkeley DB': # pylint: disable=no-member + msg = ("Unfortunately your operating system does not have a " + "compatible database module available for managing " + "Apache OCSP stapling cache database.") + raise errors.NotSupportedError(msg) + + def _ocsp_dbm_open(self, filepath): + """Helper method to open an DBM file in a way that depends on the platform + that Certbot is run on. Returns an open database structure.""" + + if not os.path.isfile(filepath+".db"): + raise errors.PluginError( + "The OCSP stapling cache DBM file wasn't created by Apache.") + try: + import bsddb + self._ocsp_dbm_bsddb = True + cache_path = filepath + ".db" + try: + database = bsddb.hashopen(cache_path, 'w') + except Exception: + raise errors.PluginError("Unable to open dbm database file.") + except ImportError: + # Python3 doesn't have bsddb module, so we use dbm.ndbm instead + import dbm + try: + database = dbm.ndbm.open(filepath, 'w') # pylint: disable=no-member + except Exception: + # This is raised if a file cannot be found + raise errors.PluginError("Unable to open dbm database file.") + return database + + def _ocsp_dbm_close(self, database): + """Helper method to sync and close a DBM file, in a way required by the + used dbm implementation.""" + if self._ocsp_dbm_bsddb: + database.sync() + database.close() + else: + database.close() + + def _ocsp_refresh_if_needed(self, pf_obj): + """Refreshes OCSP response for a certiifcate if it's due + + :param dict pf_obj: OCSP prefetch object from pluginstorage + + :returns: If OCSP response was updated + :rtype: bool + + """ + ttl = pf_obj["lastupdate"] + constants.OCSP_INTERNAL_TTL + if ttl < time.time(): + self._ocsp_refresh(pf_obj["cert_path"], pf_obj["chain_path"]) + return True + return False + + + def _ocsp_refresh(self, cert_path, chain_path): + """Refresh the OCSP response for a certificate + + :param str cert_path: Filesystem path to certificate file + :param str chain_path: Filesystem path to certificate chain file + + """ + + self._ensure_ocsp_dirs() + ocsp_workfile = os.path.join( + self.config.work_dir, "ocsp", + apache_util.certid_sha1_hex(cert_path)) + handler = ocsp.RevocationChecker() + if not handler.revoked(cert_path, chain_path, ocsp_workfile): + # Guaranteed good response + cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache") + # dbm.open automatically adds the file extension, it will be + db = self._ocsp_dbm_open(cache_path) + cert_sha = apache_util.certid_sha1(cert_path) + db[cert_sha] = self._ocsp_response_dbm(ocsp_workfile) + self._ocsp_dbm_close(db) + else: + logger.warning("Encountered an issue while trying to prefetch OCSP " + "response for certificate: %s", cert_path) + + def _ocsp_response_dbm(self, workfile): + """Creates a dbm entry for OCSP response data + + :param str workfile: File path for raw OCSP response + + :returns: OCSP response cache data that Apache can use + :rtype: string + + """ + + with open(workfile, 'rb') as fh: + response = fh.read() + ttl = constants.OCSP_APACHE_TTL + return apache_util.get_apache_ocsp_struct(ttl, response) + + def _ocsp_prefetch_save(self, cert_path, chain_path): + """Saves status of current OCSP prefetch, including the last update + time to determine if an update is needed on later run. + + :param str cert_path: Filesystem path to certificate + :param str chain_path: Filesystem path to certificate chain file + + """ + status = { + "lastupdate": time.time(), + "cert_path": cert_path, + "chain_path": chain_path + } + cert_id = apache_util.certid_sha1_hex(cert_path) + self._ocsp_prefetch[cert_id] = status + self.storage.put("ocsp_prefetch", self._ocsp_prefetch) + self.storage.save() + + def _ocsp_prefetch_fetch_state(self): + """ + Populates the OCSP prefetch state from the pluginstorage. + """ + try: + self._ocsp_prefetch = self.storage.fetch("ocsp_prefetch") + except KeyError: + self._ocsp_prefetch = dict() + + def _ocsp_prefetch_backup_db(self): + """ + Copies the active dbm file to work directory. + """ + self._ensure_ocsp_dirs() + cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db") + try: + shutil.copy2(cache_path, os.path.join(self.config.work_dir, "ocsp")) + except IOError: + logger.debug("Encountered an issue while trying to backup OCSP dbm file") + + def _ocsp_prefetch_restore_db(self): + """ + Restores the active dbm file from work directory. + """ + self._ensure_ocsp_dirs() + cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db") + work_file_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db") + try: + shutil.copy2(work_file_path, cache_path) + except IOError: + logger.debug("Encountered an issue when trying to restore OCSP dbm file") + + def enable_ocsp_prefetch(self, lineage, domains): + """Enable OCSP Stapling and prefetching of the responses. + + In OCSP, each client (e.g. browser) would have to query the + OCSP Responder to validate that the site certificate was not revoked. + + Enabling OCSP Stapling, would allow the web-server to query the OCSP + Responder, and staple its response to the offered certificate during + TLS. i.e. clients would not have to query the OCSP responder. + + """ + + # Fail early if we are not able to support this + self._ensure_ocsp_prefetch_compatibility() + prefetch_vhosts = set() + for domain in domains: + matched_vhosts = self.choose_vhosts(domain, create_if_no_ssl=False) + # We should be handling only SSL vhosts + for vh in matched_vhosts: + if vh.ssl: + prefetch_vhosts.add(vh) + + if prefetch_vhosts: + for vh in prefetch_vhosts: + self._enable_ocsp_stapling(vh, None, prefetch=True) + self.restart() + # Ensure Apache has enough time to properly restart and create the file + time.sleep(2) + try: + self._ocsp_refresh(lineage.cert_path, lineage.chain_path) + self._ocsp_prefetch_save(lineage.cert_path, lineage.chain_path) + self.save("Enabled OCSP prefetching") + except errors.PluginError as err: + # Revert the OCSP prefetch configuration + self.recovery_routine() + self.restart() + msg = ("Encountered an error while trying to enable OCSP prefetch " + "enhancement: %s.\nOCSP prefetch was not enabled.") + raise errors.PluginError(msg % str(err)) + + def update_ocsp_prefetch(self, _unused_lineage): + """Checks all certificates that are managed by OCSP prefetch, and + refreshes OCSP responses for them if required.""" + + self._ocsp_prefetch_fetch_state() + if not self._ocsp_prefetch: + # No OCSP prefetching enabled for any certificate + return + + for _, pf in self._ocsp_prefetch.items(): + if self._ocsp_refresh_if_needed(pf): + # Save the status to pluginstorage + self._ocsp_prefetch_save(pf["cert_path"], pf["chain_path"]) diff --git a/certbot-apache/tests/ocsp_prefetch_test.py b/certbot-apache/tests/ocsp_prefetch_test.py index fd17d32ef..c55f867fa 100644 --- a/certbot-apache/tests/ocsp_prefetch_test.py +++ b/certbot-apache/tests/ocsp_prefetch_test.py @@ -111,7 +111,7 @@ class OCSPPrefetchTest(util.ApacheTest): open(db_fullpath, 'a').close() ver_path = "certbot_apache._internal.configurator.ApacheConfigurator.get_version" - res_path = "certbot_apache._internal.configurator.ApacheConfigurator.restart" + res_path = "certbot_apache._internal.override_debian.DebianConfigurator.restart" cry_path = "certbot.crypto_util.cert_sha1_fingerprint" with mock.patch(ver_path) as mock_ver: @@ -151,7 +151,7 @@ class OCSPPrefetchTest(util.ApacheTest): self.config.parser.modules.discard("headers_module") self.config.parser.modules.discard("mod_header.c") - ref_path = "certbot_apache._internal.configurator.ApacheConfigurator._ocsp_refresh" + ref_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh" with mock.patch(ref_path): self.call_mocked_py2(self.config.enable_ocsp_prefetch, self.lineage, ["ocspvhost.com"]) @@ -160,7 +160,7 @@ class OCSPPrefetchTest(util.ApacheTest): @mock.patch("certbot_apache._internal.override_debian.DebianConfigurator.enable_mod") def test_ocsp_prefetch_enable_error(self, _mock_enable): - ref_path = "certbot_apache._internal.configurator.ApacheConfigurator._ocsp_refresh" + ref_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh" self.config.recovery_routine = mock.MagicMock() with mock.patch(ref_path, side_effect=errors.PluginError("failed")): self.assertRaises(errors.PluginError, @@ -204,7 +204,7 @@ class OCSPPrefetchTest(util.ApacheTest): self.call_mocked_py2(self.config.enable_ocsp_prefetch, self.lineage, ["ocspvhost.com"]) self.assertEqual(len(self.config._ocsp_prefetch), 1) - refresh_path = "certbot_apache._internal.configurator.ApacheConfigurator._ocsp_refresh" + refresh_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh" with mock.patch(refresh_path) as mock_refresh: self.call_mocked_py2(self.config.update_ocsp_prefetch, None) self.assertFalse(mock_refresh.called) @@ -236,7 +236,7 @@ class OCSPPrefetchTest(util.ApacheTest): @mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test") @mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload") def test_ocsp_prefetch_backup_db_error(self, _mock_reload, _mock_test): - log_path = "certbot_apache._internal.configurator.logger.debug" + log_path = "certbot_apache._internal.prefetch_ocsp.logger.debug" log_string = "Encountered an issue while trying to backup OCSP dbm file" log_string2 = "Encountered an issue when trying to restore OCSP dbm file" self.config._ocsp_prefetch = {"mock": "value"} @@ -251,7 +251,7 @@ class OCSPPrefetchTest(util.ApacheTest): @mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.restart") def test_ocsp_prefetch_refresh_fail(self, _mock_restart): ocsp_path = "certbot._internal.ocsp.RevocationChecker.revoked" - log_path = "certbot_apache._internal.configurator.logger.warning" + log_path = "certbot_apache._internal.prefetch_ocsp.logger.warning" with mock.patch(ocsp_path) as mock_ocsp: mock_ocsp.return_value = True with mock.patch(log_path) as mock_log: @@ -261,7 +261,7 @@ class OCSPPrefetchTest(util.ApacheTest): self.assertTrue( "trying to prefetch OCSP" in mock_log.call_args[0][0]) - @mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._ocsp_refresh_if_needed") + @mock.patch("certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh_if_needed") def test_ocsp_prefetch_update_noop(self, mock_refresh): self.config.update_ocsp_prefetch(None) self.assertFalse(mock_refresh.called) @@ -328,6 +328,24 @@ class OCSPPrefetchTest(util.ApacheTest): db2 = self.call_mocked_py3(self.config._ocsp_dbm_open, self.db_path) self.assertEqual(db2[b'key'], expected_val) + @mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_fetch_state") + @mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test") + @mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload") + def test_restart_load_state_call(self, _rl, _ct, mock_fch): + self.assertFalse(self.config._ocsp_prefetch) + self.config.restart() + self.assertTrue(mock_fch.called) + + @mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_backup_db") + @mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_restore_db") + @mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test") + @mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload") + def test_restart_backupdb_call(self, _rl, _ctest, mock_rest, mock_bck): + self.config._ocsp_prefetch = True + self.config.restart() + self.assertTrue(mock_rest.called) + self.assertTrue(mock_bck.called) + if __name__ == "__main__": unittest.main() # pragma: no cover