mirror of
https://github.com/certbot/certbot.git
synced 2026-06-07 07:42:08 -04:00
Move the OCSP prefetch functionality to a mixin class
This commit is contained in:
parent
11fce9a870
commit
fa8a68d45f
4 changed files with 295 additions and 244 deletions
|
|
@ -5,7 +5,6 @@ import copy
|
|||
import fnmatch
|
||||
import logging
|
||||
import re
|
||||
import shutil
|
||||
import socket
|
||||
import time
|
||||
|
||||
|
|
@ -23,7 +22,6 @@ from acme.magic_typing import Union # pylint: disable=unused-import, no-name-in
|
|||
from certbot import errors
|
||||
from certbot import interfaces
|
||||
from certbot import util
|
||||
from certbot._internal import ocsp
|
||||
|
||||
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge # pylint: disable=unused-import
|
||||
from certbot.compat import filesystem
|
||||
|
|
@ -197,8 +195,6 @@ class ApacheConfigurator(common.Installer):
|
|||
self._enhanced_vhosts = defaultdict(set) # type: DefaultDict[str, Set[obj.VirtualHost]]
|
||||
# Temporary state for AutoHSTS enhancement
|
||||
self._autohsts = {} # type: Dict[str, Dict[str, Union[int, float]]]
|
||||
self._ocsp_prefetch = {} # type: Dict[str, str]
|
||||
self._ocsp_dbm_bsddb = False
|
||||
# Reverter save notes
|
||||
self.save_notes = ""
|
||||
|
||||
|
|
@ -2205,20 +2201,8 @@ class ApacheConfigurator(common.Installer):
|
|||
|
||||
"""
|
||||
self.config_test()
|
||||
|
||||
if not self._ocsp_prefetch:
|
||||
# Try to populate OCSP prefetch structure from pluginstorage
|
||||
self._ocsp_prefetch_fetch_state()
|
||||
if self._ocsp_prefetch:
|
||||
# OCSP prefetching is enabled, so back up the db
|
||||
self._ocsp_prefetch_backup_db()
|
||||
|
||||
self._reload()
|
||||
|
||||
if self._ocsp_prefetch:
|
||||
# Restore the backed up dbm database
|
||||
self._ocsp_prefetch_restore_db()
|
||||
|
||||
def _reload(self):
|
||||
"""Reloads the Apache server.
|
||||
|
||||
|
|
@ -2403,226 +2387,6 @@ class ApacheConfigurator(common.Installer):
|
|||
# Save the current state to pluginstorage
|
||||
self._autohsts_save_state()
|
||||
|
||||
def _ensure_ocsp_dirs(self):
|
||||
"""Makes sure that the OCSP directory paths exist."""
|
||||
ocsp_work = os.path.join(self.config.work_dir, "ocsp")
|
||||
ocsp_save = os.path.join(self.config.config_dir, "ocsp")
|
||||
for path in [ocsp_work, ocsp_save]:
|
||||
if not os.path.isdir(path):
|
||||
filesystem.makedirs(path)
|
||||
filesystem.chmod(path, 0o755)
|
||||
|
||||
def _ensure_ocsp_prefetch_compatibility(self):
|
||||
"""Make sure that the operating system supports the required libraries
|
||||
to manage Apache DBM files.
|
||||
|
||||
:raises: errors.NotSupportedError
|
||||
"""
|
||||
try:
|
||||
import bsddb # pylint: disable=unused-variable
|
||||
except ImportError:
|
||||
import dbm
|
||||
if not hasattr(dbm, 'ndbm') or dbm.ndbm.library != 'Berkeley DB': # pylint: disable=no-member
|
||||
msg = ("Unfortunately your operating system does not have a "
|
||||
"compatible database module available for managing "
|
||||
"Apache OCSP stapling cache database.")
|
||||
raise errors.NotSupportedError(msg)
|
||||
|
||||
def _ocsp_dbm_open(self, filepath):
|
||||
"""Helper method to open an DBM file in a way that depends on the platform
|
||||
that Certbot is run on. Returns an open database structure."""
|
||||
|
||||
if not os.path.isfile(filepath+".db"):
|
||||
raise errors.PluginError(
|
||||
"The OCSP stapling cache DBM file wasn't created by Apache.")
|
||||
try:
|
||||
import bsddb
|
||||
self._ocsp_dbm_bsddb = True
|
||||
cache_path = filepath + ".db"
|
||||
try:
|
||||
database = bsddb.hashopen(cache_path, 'w')
|
||||
except Exception:
|
||||
raise errors.PluginError("Unable to open dbm database file.")
|
||||
except ImportError:
|
||||
# Python3 doesn't have bsddb module, so we use dbm.ndbm instead
|
||||
import dbm
|
||||
try:
|
||||
database = dbm.ndbm.open(filepath, 'w') # pylint: disable=no-member
|
||||
except Exception:
|
||||
# This is raised if a file cannot be found
|
||||
raise errors.PluginError("Unable to open dbm database file.")
|
||||
return database
|
||||
|
||||
def _ocsp_dbm_close(self, database):
|
||||
"""Helper method to sync and close a DBM file, in a way required by the
|
||||
used dbm implementation."""
|
||||
if self._ocsp_dbm_bsddb:
|
||||
database.sync()
|
||||
database.close()
|
||||
else:
|
||||
database.close()
|
||||
|
||||
def _ocsp_refresh_if_needed(self, pf_obj):
|
||||
"""Refreshes OCSP response for a certiifcate if it's due
|
||||
|
||||
:param dict pf_obj: OCSP prefetch object from pluginstorage
|
||||
|
||||
:returns: If OCSP response was updated
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
ttl = pf_obj["lastupdate"] + constants.OCSP_INTERNAL_TTL
|
||||
if ttl < time.time():
|
||||
self._ocsp_refresh(pf_obj["cert_path"], pf_obj["chain_path"])
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _ocsp_refresh(self, cert_path, chain_path):
|
||||
"""Refresh the OCSP response for a certificate
|
||||
|
||||
:param str cert_path: Filesystem path to certificate file
|
||||
:param str chain_path: Filesystem path to certificate chain file
|
||||
|
||||
"""
|
||||
|
||||
self._ensure_ocsp_dirs()
|
||||
ocsp_workfile = os.path.join(
|
||||
self.config.work_dir, "ocsp",
|
||||
apache_util.certid_sha1_hex(cert_path))
|
||||
handler = ocsp.RevocationChecker()
|
||||
if not handler.revoked(cert_path, chain_path, ocsp_workfile):
|
||||
# Guaranteed good response
|
||||
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache")
|
||||
# dbm.open automatically adds the file extension, it will be
|
||||
db = self._ocsp_dbm_open(cache_path)
|
||||
cert_sha = apache_util.certid_sha1(cert_path)
|
||||
db[cert_sha] = self._ocsp_response_dbm(ocsp_workfile)
|
||||
self._ocsp_dbm_close(db)
|
||||
else:
|
||||
logger.warning("Encountered an issue while trying to prefetch OCSP "
|
||||
"response for certificate: %s", cert_path)
|
||||
|
||||
def _ocsp_response_dbm(self, workfile):
|
||||
"""Creates a dbm entry for OCSP response data
|
||||
|
||||
:param str workfile: File path for raw OCSP response
|
||||
|
||||
:returns: OCSP response cache data that Apache can use
|
||||
:rtype: string
|
||||
|
||||
"""
|
||||
|
||||
with open(workfile, 'rb') as fh:
|
||||
response = fh.read()
|
||||
ttl = constants.OCSP_APACHE_TTL
|
||||
return apache_util.get_apache_ocsp_struct(ttl, response)
|
||||
|
||||
def _ocsp_prefetch_save(self, cert_path, chain_path):
|
||||
"""Saves status of current OCSP prefetch, including the last update
|
||||
time to determine if an update is needed on later run.
|
||||
|
||||
:param str cert_path: Filesystem path to certificate
|
||||
:param str chain_path: Filesystem path to certificate chain file
|
||||
|
||||
"""
|
||||
status = {
|
||||
"lastupdate": time.time(),
|
||||
"cert_path": cert_path,
|
||||
"chain_path": chain_path
|
||||
}
|
||||
cert_id = apache_util.certid_sha1_hex(cert_path)
|
||||
self._ocsp_prefetch[cert_id] = status
|
||||
self.storage.put("ocsp_prefetch", self._ocsp_prefetch)
|
||||
self.storage.save()
|
||||
|
||||
def _ocsp_prefetch_fetch_state(self):
|
||||
"""
|
||||
Populates the OCSP prefetch state from the pluginstorage.
|
||||
"""
|
||||
try:
|
||||
self._ocsp_prefetch = self.storage.fetch("ocsp_prefetch")
|
||||
except KeyError:
|
||||
self._ocsp_prefetch = dict()
|
||||
|
||||
def _ocsp_prefetch_backup_db(self):
|
||||
"""
|
||||
Copies the active dbm file to work directory.
|
||||
"""
|
||||
self._ensure_ocsp_dirs()
|
||||
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
|
||||
try:
|
||||
shutil.copy2(cache_path, os.path.join(self.config.work_dir, "ocsp"))
|
||||
except IOError:
|
||||
logger.debug("Encountered an issue while trying to backup OCSP dbm file")
|
||||
|
||||
def _ocsp_prefetch_restore_db(self):
|
||||
"""
|
||||
Restores the active dbm file from work directory.
|
||||
"""
|
||||
self._ensure_ocsp_dirs()
|
||||
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
|
||||
work_file_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db")
|
||||
try:
|
||||
shutil.copy2(work_file_path, cache_path)
|
||||
except IOError:
|
||||
logger.debug("Encountered an issue when trying to restore OCSP dbm file")
|
||||
|
||||
def enable_ocsp_prefetch(self, lineage, domains):
|
||||
"""Enable OCSP Stapling and prefetching of the responses.
|
||||
|
||||
In OCSP, each client (e.g. browser) would have to query the
|
||||
OCSP Responder to validate that the site certificate was not revoked.
|
||||
|
||||
Enabling OCSP Stapling, would allow the web-server to query the OCSP
|
||||
Responder, and staple its response to the offered certificate during
|
||||
TLS. i.e. clients would not have to query the OCSP responder.
|
||||
|
||||
"""
|
||||
|
||||
# Fail early if we are not able to support this
|
||||
self._ensure_ocsp_prefetch_compatibility()
|
||||
prefetch_vhosts = set()
|
||||
for domain in domains:
|
||||
matched_vhosts = self.choose_vhosts(domain, create_if_no_ssl=False)
|
||||
# We should be handling only SSL vhosts
|
||||
for vh in matched_vhosts:
|
||||
if vh.ssl:
|
||||
prefetch_vhosts.add(vh)
|
||||
|
||||
if prefetch_vhosts:
|
||||
for vh in prefetch_vhosts:
|
||||
self._enable_ocsp_stapling(vh, None, prefetch=True)
|
||||
self.restart()
|
||||
# Ensure Apache has enough time to properly restart and create the file
|
||||
time.sleep(2)
|
||||
try:
|
||||
self._ocsp_refresh(lineage.cert_path, lineage.chain_path)
|
||||
self._ocsp_prefetch_save(lineage.cert_path, lineage.chain_path)
|
||||
self.save("Enabled OCSP prefetching")
|
||||
except errors.PluginError as err:
|
||||
# Revert the OCSP prefetch configuration
|
||||
self.recovery_routine()
|
||||
self.restart()
|
||||
msg = ("Encountered an error while trying to enable OCSP prefetch "
|
||||
"enhancement: %s.\nOCSP prefetch was not enabled.")
|
||||
raise errors.PluginError(msg % str(err))
|
||||
|
||||
def update_ocsp_prefetch(self, _unused_lineage):
|
||||
"""Checks all certificates that are managed by OCSP prefetch, and
|
||||
refreshes OCSP responses for them if required."""
|
||||
|
||||
self._ocsp_prefetch_fetch_state()
|
||||
if not self._ocsp_prefetch:
|
||||
# No OCSP prefetching enabled for any certificate
|
||||
return
|
||||
|
||||
for _, pf in self._ocsp_prefetch.items():
|
||||
if self._ocsp_refresh_if_needed(pf):
|
||||
# Save the status to pluginstorage
|
||||
self._ocsp_prefetch_save(pf["cert_path"], pf["chain_path"])
|
||||
|
||||
|
||||
def _enable_autohsts_domain(self, ssl_vhost):
|
||||
"""Do the initial AutoHSTS deployment to a vhost
|
||||
|
||||
|
|
|
|||
|
|
@ -13,12 +13,13 @@ from certbot.plugins.enhancements import OCSPPrefetchEnhancement
|
|||
|
||||
from certbot_apache._internal import apache_util
|
||||
from certbot_apache._internal import configurator
|
||||
from certbot_apache._internal import prefetch_ocsp
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@zope.interface.provider(interfaces.IPluginFactory)
|
||||
class DebianConfigurator(configurator.ApacheConfigurator):
|
||||
class DebianConfigurator(prefetch_ocsp.OCSPPrefetchMixin, configurator.ApacheConfigurator):
|
||||
"""Debian specific ApacheConfigurator override class"""
|
||||
|
||||
OS_DEFAULTS = dict(
|
||||
|
|
@ -145,5 +146,27 @@ class DebianConfigurator(configurator.ApacheConfigurator):
|
|||
temp, [self.option("dismod"), "-f", mod_name])
|
||||
util.run_script([self.option("enmod"), mod_name])
|
||||
|
||||
def restart(self):
|
||||
"""Runs a config test and reloads the Apache server.
|
||||
|
||||
:raises .errors.MisconfigurationError: If either the config test
|
||||
or reload fails.
|
||||
|
||||
"""
|
||||
self.config_test()
|
||||
|
||||
if not self._ocsp_prefetch:
|
||||
# Try to populate OCSP prefetch structure from pluginstorage
|
||||
self._ocsp_prefetch_fetch_state()
|
||||
if self._ocsp_prefetch:
|
||||
# OCSP prefetching is enabled, so back up the db
|
||||
self._ocsp_prefetch_backup_db()
|
||||
|
||||
self._reload()
|
||||
|
||||
if self._ocsp_prefetch:
|
||||
# Restore the backed up dbm database
|
||||
self._ocsp_prefetch_restore_db()
|
||||
|
||||
|
||||
OCSPPrefetchEnhancement.register(DebianConfigurator) # pylint: disable=no-member
|
||||
|
|
|
|||
246
certbot-apache/certbot_apache/_internal/prefetch_ocsp.py
Normal file
246
certbot-apache/certbot_apache/_internal/prefetch_ocsp.py
Normal file
|
|
@ -0,0 +1,246 @@
|
|||
"""A mixin class for OCSP response prefetching for Apache plugin"""
|
||||
import logging
|
||||
import shutil
|
||||
import time
|
||||
|
||||
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
from certbot import errors
|
||||
from certbot._internal import ocsp
|
||||
|
||||
from certbot.compat import filesystem
|
||||
from certbot.compat import os
|
||||
from certbot_apache._internal import apache_util
|
||||
from certbot_apache._internal import constants
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class OCSPPrefetchMixin(object):
|
||||
"""OCSPPrefetchMixin implements OCSP response prefetching"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._ocsp_prefetch = {} # type: Dict[str, str]
|
||||
self._ocsp_dbm_bsddb = False
|
||||
# This is required because of python super() call chain
|
||||
# mypy isn't able to figure the chain out and needs to be
|
||||
# disabled for this line.
|
||||
super(OCSPPrefetchMixin, self).__init__(*args, **kwargs) # type: ignore
|
||||
|
||||
def _ensure_ocsp_dirs(self):
|
||||
"""Makes sure that the OCSP directory paths exist."""
|
||||
ocsp_work = os.path.join(self.config.work_dir, "ocsp")
|
||||
ocsp_save = os.path.join(self.config.config_dir, "ocsp")
|
||||
for path in [ocsp_work, ocsp_save]:
|
||||
if not os.path.isdir(path):
|
||||
filesystem.makedirs(path)
|
||||
filesystem.chmod(path, 0o755)
|
||||
|
||||
def _ensure_ocsp_prefetch_compatibility(self):
|
||||
"""Make sure that the operating system supports the required libraries
|
||||
to manage Apache DBM files.
|
||||
|
||||
:raises: errors.NotSupportedError
|
||||
"""
|
||||
try:
|
||||
import bsddb # pylint: disable=unused-variable
|
||||
except ImportError:
|
||||
import dbm
|
||||
if not hasattr(dbm, 'ndbm') or dbm.ndbm.library != 'Berkeley DB': # pylint: disable=no-member
|
||||
msg = ("Unfortunately your operating system does not have a "
|
||||
"compatible database module available for managing "
|
||||
"Apache OCSP stapling cache database.")
|
||||
raise errors.NotSupportedError(msg)
|
||||
|
||||
def _ocsp_dbm_open(self, filepath):
|
||||
"""Helper method to open an DBM file in a way that depends on the platform
|
||||
that Certbot is run on. Returns an open database structure."""
|
||||
|
||||
if not os.path.isfile(filepath+".db"):
|
||||
raise errors.PluginError(
|
||||
"The OCSP stapling cache DBM file wasn't created by Apache.")
|
||||
try:
|
||||
import bsddb
|
||||
self._ocsp_dbm_bsddb = True
|
||||
cache_path = filepath + ".db"
|
||||
try:
|
||||
database = bsddb.hashopen(cache_path, 'w')
|
||||
except Exception:
|
||||
raise errors.PluginError("Unable to open dbm database file.")
|
||||
except ImportError:
|
||||
# Python3 doesn't have bsddb module, so we use dbm.ndbm instead
|
||||
import dbm
|
||||
try:
|
||||
database = dbm.ndbm.open(filepath, 'w') # pylint: disable=no-member
|
||||
except Exception:
|
||||
# This is raised if a file cannot be found
|
||||
raise errors.PluginError("Unable to open dbm database file.")
|
||||
return database
|
||||
|
||||
def _ocsp_dbm_close(self, database):
|
||||
"""Helper method to sync and close a DBM file, in a way required by the
|
||||
used dbm implementation."""
|
||||
if self._ocsp_dbm_bsddb:
|
||||
database.sync()
|
||||
database.close()
|
||||
else:
|
||||
database.close()
|
||||
|
||||
def _ocsp_refresh_if_needed(self, pf_obj):
|
||||
"""Refreshes OCSP response for a certiifcate if it's due
|
||||
|
||||
:param dict pf_obj: OCSP prefetch object from pluginstorage
|
||||
|
||||
:returns: If OCSP response was updated
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
ttl = pf_obj["lastupdate"] + constants.OCSP_INTERNAL_TTL
|
||||
if ttl < time.time():
|
||||
self._ocsp_refresh(pf_obj["cert_path"], pf_obj["chain_path"])
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _ocsp_refresh(self, cert_path, chain_path):
|
||||
"""Refresh the OCSP response for a certificate
|
||||
|
||||
:param str cert_path: Filesystem path to certificate file
|
||||
:param str chain_path: Filesystem path to certificate chain file
|
||||
|
||||
"""
|
||||
|
||||
self._ensure_ocsp_dirs()
|
||||
ocsp_workfile = os.path.join(
|
||||
self.config.work_dir, "ocsp",
|
||||
apache_util.certid_sha1_hex(cert_path))
|
||||
handler = ocsp.RevocationChecker()
|
||||
if not handler.revoked(cert_path, chain_path, ocsp_workfile):
|
||||
# Guaranteed good response
|
||||
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache")
|
||||
# dbm.open automatically adds the file extension, it will be
|
||||
db = self._ocsp_dbm_open(cache_path)
|
||||
cert_sha = apache_util.certid_sha1(cert_path)
|
||||
db[cert_sha] = self._ocsp_response_dbm(ocsp_workfile)
|
||||
self._ocsp_dbm_close(db)
|
||||
else:
|
||||
logger.warning("Encountered an issue while trying to prefetch OCSP "
|
||||
"response for certificate: %s", cert_path)
|
||||
|
||||
def _ocsp_response_dbm(self, workfile):
|
||||
"""Creates a dbm entry for OCSP response data
|
||||
|
||||
:param str workfile: File path for raw OCSP response
|
||||
|
||||
:returns: OCSP response cache data that Apache can use
|
||||
:rtype: string
|
||||
|
||||
"""
|
||||
|
||||
with open(workfile, 'rb') as fh:
|
||||
response = fh.read()
|
||||
ttl = constants.OCSP_APACHE_TTL
|
||||
return apache_util.get_apache_ocsp_struct(ttl, response)
|
||||
|
||||
def _ocsp_prefetch_save(self, cert_path, chain_path):
|
||||
"""Saves status of current OCSP prefetch, including the last update
|
||||
time to determine if an update is needed on later run.
|
||||
|
||||
:param str cert_path: Filesystem path to certificate
|
||||
:param str chain_path: Filesystem path to certificate chain file
|
||||
|
||||
"""
|
||||
status = {
|
||||
"lastupdate": time.time(),
|
||||
"cert_path": cert_path,
|
||||
"chain_path": chain_path
|
||||
}
|
||||
cert_id = apache_util.certid_sha1_hex(cert_path)
|
||||
self._ocsp_prefetch[cert_id] = status
|
||||
self.storage.put("ocsp_prefetch", self._ocsp_prefetch)
|
||||
self.storage.save()
|
||||
|
||||
def _ocsp_prefetch_fetch_state(self):
|
||||
"""
|
||||
Populates the OCSP prefetch state from the pluginstorage.
|
||||
"""
|
||||
try:
|
||||
self._ocsp_prefetch = self.storage.fetch("ocsp_prefetch")
|
||||
except KeyError:
|
||||
self._ocsp_prefetch = dict()
|
||||
|
||||
def _ocsp_prefetch_backup_db(self):
|
||||
"""
|
||||
Copies the active dbm file to work directory.
|
||||
"""
|
||||
self._ensure_ocsp_dirs()
|
||||
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
|
||||
try:
|
||||
shutil.copy2(cache_path, os.path.join(self.config.work_dir, "ocsp"))
|
||||
except IOError:
|
||||
logger.debug("Encountered an issue while trying to backup OCSP dbm file")
|
||||
|
||||
def _ocsp_prefetch_restore_db(self):
|
||||
"""
|
||||
Restores the active dbm file from work directory.
|
||||
"""
|
||||
self._ensure_ocsp_dirs()
|
||||
cache_path = os.path.join(self.config.config_dir, "ocsp", "ocsp_cache.db")
|
||||
work_file_path = os.path.join(self.config.work_dir, "ocsp", "ocsp_cache.db")
|
||||
try:
|
||||
shutil.copy2(work_file_path, cache_path)
|
||||
except IOError:
|
||||
logger.debug("Encountered an issue when trying to restore OCSP dbm file")
|
||||
|
||||
def enable_ocsp_prefetch(self, lineage, domains):
|
||||
"""Enable OCSP Stapling and prefetching of the responses.
|
||||
|
||||
In OCSP, each client (e.g. browser) would have to query the
|
||||
OCSP Responder to validate that the site certificate was not revoked.
|
||||
|
||||
Enabling OCSP Stapling, would allow the web-server to query the OCSP
|
||||
Responder, and staple its response to the offered certificate during
|
||||
TLS. i.e. clients would not have to query the OCSP responder.
|
||||
|
||||
"""
|
||||
|
||||
# Fail early if we are not able to support this
|
||||
self._ensure_ocsp_prefetch_compatibility()
|
||||
prefetch_vhosts = set()
|
||||
for domain in domains:
|
||||
matched_vhosts = self.choose_vhosts(domain, create_if_no_ssl=False)
|
||||
# We should be handling only SSL vhosts
|
||||
for vh in matched_vhosts:
|
||||
if vh.ssl:
|
||||
prefetch_vhosts.add(vh)
|
||||
|
||||
if prefetch_vhosts:
|
||||
for vh in prefetch_vhosts:
|
||||
self._enable_ocsp_stapling(vh, None, prefetch=True)
|
||||
self.restart()
|
||||
# Ensure Apache has enough time to properly restart and create the file
|
||||
time.sleep(2)
|
||||
try:
|
||||
self._ocsp_refresh(lineage.cert_path, lineage.chain_path)
|
||||
self._ocsp_prefetch_save(lineage.cert_path, lineage.chain_path)
|
||||
self.save("Enabled OCSP prefetching")
|
||||
except errors.PluginError as err:
|
||||
# Revert the OCSP prefetch configuration
|
||||
self.recovery_routine()
|
||||
self.restart()
|
||||
msg = ("Encountered an error while trying to enable OCSP prefetch "
|
||||
"enhancement: %s.\nOCSP prefetch was not enabled.")
|
||||
raise errors.PluginError(msg % str(err))
|
||||
|
||||
def update_ocsp_prefetch(self, _unused_lineage):
|
||||
"""Checks all certificates that are managed by OCSP prefetch, and
|
||||
refreshes OCSP responses for them if required."""
|
||||
|
||||
self._ocsp_prefetch_fetch_state()
|
||||
if not self._ocsp_prefetch:
|
||||
# No OCSP prefetching enabled for any certificate
|
||||
return
|
||||
|
||||
for _, pf in self._ocsp_prefetch.items():
|
||||
if self._ocsp_refresh_if_needed(pf):
|
||||
# Save the status to pluginstorage
|
||||
self._ocsp_prefetch_save(pf["cert_path"], pf["chain_path"])
|
||||
|
|
@ -111,7 +111,7 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
open(db_fullpath, 'a').close()
|
||||
|
||||
ver_path = "certbot_apache._internal.configurator.ApacheConfigurator.get_version"
|
||||
res_path = "certbot_apache._internal.configurator.ApacheConfigurator.restart"
|
||||
res_path = "certbot_apache._internal.override_debian.DebianConfigurator.restart"
|
||||
cry_path = "certbot.crypto_util.cert_sha1_fingerprint"
|
||||
|
||||
with mock.patch(ver_path) as mock_ver:
|
||||
|
|
@ -151,7 +151,7 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
self.config.parser.modules.discard("headers_module")
|
||||
self.config.parser.modules.discard("mod_header.c")
|
||||
|
||||
ref_path = "certbot_apache._internal.configurator.ApacheConfigurator._ocsp_refresh"
|
||||
ref_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh"
|
||||
with mock.patch(ref_path):
|
||||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage, ["ocspvhost.com"])
|
||||
|
|
@ -160,7 +160,7 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
|
||||
@mock.patch("certbot_apache._internal.override_debian.DebianConfigurator.enable_mod")
|
||||
def test_ocsp_prefetch_enable_error(self, _mock_enable):
|
||||
ref_path = "certbot_apache._internal.configurator.ApacheConfigurator._ocsp_refresh"
|
||||
ref_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh"
|
||||
self.config.recovery_routine = mock.MagicMock()
|
||||
with mock.patch(ref_path, side_effect=errors.PluginError("failed")):
|
||||
self.assertRaises(errors.PluginError,
|
||||
|
|
@ -204,7 +204,7 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
self.call_mocked_py2(self.config.enable_ocsp_prefetch,
|
||||
self.lineage, ["ocspvhost.com"])
|
||||
self.assertEqual(len(self.config._ocsp_prefetch), 1)
|
||||
refresh_path = "certbot_apache._internal.configurator.ApacheConfigurator._ocsp_refresh"
|
||||
refresh_path = "certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh"
|
||||
with mock.patch(refresh_path) as mock_refresh:
|
||||
self.call_mocked_py2(self.config.update_ocsp_prefetch, None)
|
||||
self.assertFalse(mock_refresh.called)
|
||||
|
|
@ -236,7 +236,7 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload")
|
||||
def test_ocsp_prefetch_backup_db_error(self, _mock_reload, _mock_test):
|
||||
log_path = "certbot_apache._internal.configurator.logger.debug"
|
||||
log_path = "certbot_apache._internal.prefetch_ocsp.logger.debug"
|
||||
log_string = "Encountered an issue while trying to backup OCSP dbm file"
|
||||
log_string2 = "Encountered an issue when trying to restore OCSP dbm file"
|
||||
self.config._ocsp_prefetch = {"mock": "value"}
|
||||
|
|
@ -251,7 +251,7 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.restart")
|
||||
def test_ocsp_prefetch_refresh_fail(self, _mock_restart):
|
||||
ocsp_path = "certbot._internal.ocsp.RevocationChecker.revoked"
|
||||
log_path = "certbot_apache._internal.configurator.logger.warning"
|
||||
log_path = "certbot_apache._internal.prefetch_ocsp.logger.warning"
|
||||
with mock.patch(ocsp_path) as mock_ocsp:
|
||||
mock_ocsp.return_value = True
|
||||
with mock.patch(log_path) as mock_log:
|
||||
|
|
@ -261,7 +261,7 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
self.assertTrue(
|
||||
"trying to prefetch OCSP" in mock_log.call_args[0][0])
|
||||
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._ocsp_refresh_if_needed")
|
||||
@mock.patch("certbot_apache._internal.override_debian.DebianConfigurator._ocsp_refresh_if_needed")
|
||||
def test_ocsp_prefetch_update_noop(self, mock_refresh):
|
||||
self.config.update_ocsp_prefetch(None)
|
||||
self.assertFalse(mock_refresh.called)
|
||||
|
|
@ -328,6 +328,24 @@ class OCSPPrefetchTest(util.ApacheTest):
|
|||
db2 = self.call_mocked_py3(self.config._ocsp_dbm_open, self.db_path)
|
||||
self.assertEqual(db2[b'key'], expected_val)
|
||||
|
||||
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_fetch_state")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload")
|
||||
def test_restart_load_state_call(self, _rl, _ct, mock_fch):
|
||||
self.assertFalse(self.config._ocsp_prefetch)
|
||||
self.config.restart()
|
||||
self.assertTrue(mock_fch.called)
|
||||
|
||||
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_backup_db")
|
||||
@mock.patch("certbot_apache._internal.prefetch_ocsp.OCSPPrefetchMixin._ocsp_prefetch_restore_db")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator.config_test")
|
||||
@mock.patch("certbot_apache._internal.configurator.ApacheConfigurator._reload")
|
||||
def test_restart_backupdb_call(self, _rl, _ctest, mock_rest, mock_bck):
|
||||
self.config._ocsp_prefetch = True
|
||||
self.config.restart()
|
||||
self.assertTrue(mock_rest.called)
|
||||
self.assertTrue(mock_bck.called)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
|
|||
Loading…
Reference in a new issue