Add hook directories (#5151)

* Add hook dir constants

* Add hook dir properties to configuration

* test hook dir properties

* reuse certbot.util.is_exe

* Add certbot.hooks.list_hooks

* test list_hooks

* Run pre-hooks in directory

* Run deploy-hooks in directory

* Run post-hooks in directory

* Refactor and update certbot/tests/hook_test.py

* Add integration tests for hook directories

* Have Certbot create hook directories.

* document renewal hook directories

* Add --no-directory-hooks

* Make minor note about locale independent sorting
This commit is contained in:
Brad Warren 2017-10-03 13:52:02 -07:00 committed by GitHub
parent b9d129bd43
commit 356471cdf6
12 changed files with 824 additions and 148 deletions

View file

@ -1168,6 +1168,11 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False): # pylint: dis
" simplistic and fails if you use more advanced shell"
" constructs, so you can use this switch to disable it."
" (default: False)")
helpful.add(
"renew", "--no-directory-hooks", action="store_false",
default=flag_default("directory_hooks"), dest="directory_hooks",
help="Disable running executables found in Certbot's hook directories"
" during renewal. (default: False)")
helpful.add_deprecated_argument("--agree-dev-preview", 0)
helpful.add_deprecated_argument("--dialog", 0)

View file

@ -108,6 +108,30 @@ class NamespaceConfig(object):
return os.path.join(
self.namespace.config_dir, constants.RENEWAL_CONFIGS_DIR)
@property
def renewal_hooks_dir(self):
"""Path to directory with hooks to run with the renew subcommand."""
return os.path.join(self.namespace.config_dir,
constants.RENEWAL_HOOKS_DIR)
@property
def renewal_pre_hooks_dir(self):
"""Path to the pre-hook directory for the renew subcommand."""
return os.path.join(self.renewal_hooks_dir,
constants.RENEWAL_PRE_HOOKS_DIR)
@property
def renewal_deploy_hooks_dir(self):
"""Path to the deploy-hook directory for the renew subcommand."""
return os.path.join(self.renewal_hooks_dir,
constants.RENEWAL_DEPLOY_HOOKS_DIR)
@property
def renewal_post_hooks_dir(self):
"""Path to the post-hook directory for the renew subcommand."""
return os.path.join(self.renewal_hooks_dir,
constants.RENEWAL_POST_HOOKS_DIR)
def check_config_sanity(config):
"""Validate command line options and display error message if

View file

@ -63,6 +63,7 @@ CLI_DEFAULTS = dict(
strict_permissions=False,
pref_challs=[],
validate_hooks=True,
directory_hooks=True,
# Subparsers
num=None,
@ -176,6 +177,18 @@ TEMP_CHECKPOINT_DIR = "temp_checkpoint"
RENEWAL_CONFIGS_DIR = "renewal"
"""Renewal configs directory, relative to `IConfig.config_dir`."""
RENEWAL_HOOKS_DIR = "renewal-hooks"
"""Basename of directory containing hooks to run with the renew command."""
RENEWAL_PRE_HOOKS_DIR = "pre"
"""Basename of directory containing pre-hooks to run with the renew command."""
RENEWAL_DEPLOY_HOOKS_DIR = "deploy"
"""Basename of directory containing deploy-hooks to run with the renew command."""
RENEWAL_POST_HOOKS_DIR = "post"
"""Basename of directory containing post-hooks to run with the renew command."""
FORCE_INTERACTIVE_FLAG = "--force-interactive"
"""Flag to disable TTY checking in IDisplay."""

View file

@ -57,30 +57,71 @@ def validate_hook(shell_cmd, hook_name):
def pre_hook(config):
"Run pre-hook if it's defined and hasn't been run."
"""Run pre-hooks if they exist and haven't already been run.
When Certbot is running with the renew subcommand, this function
runs any hooks found in the config.renewal_pre_hooks_dir (if they
have not already been run) followed by any pre-hook in the config.
If hooks in config.renewal_pre_hooks_dir are run and the pre-hook in
the config is a path to one of these scripts, it is not run twice.
:param configuration.NamespaceConfig config: Certbot settings
"""
if config.verb == "renew" and config.directory_hooks:
for hook in list_hooks(config.renewal_pre_hooks_dir):
_run_pre_hook_if_necessary(hook)
cmd = config.pre_hook
if cmd and cmd not in pre_hook.already:
logger.info("Running pre-hook command: %s", cmd)
_run_hook(cmd)
pre_hook.already.add(cmd)
elif cmd:
logger.info("Pre-hook command already run, skipping: %s", cmd)
if cmd:
_run_pre_hook_if_necessary(cmd)
pre_hook.already = set() # type: ignore
def post_hook(config):
"""Run post hook if defined.
def _run_pre_hook_if_necessary(command):
"""Run the specified pre-hook if we haven't already.
If we've already run this exact command before, a message is logged
saying the pre-hook was skipped.
:param str command: pre-hook to be run
"""
if command in pre_hook.already:
logger.info("Pre-hook command already run, skipping: %s", command)
else:
logger.info("Running pre-hook command: %s", command)
_run_hook(command)
pre_hook.already.add(command)
def post_hook(config):
"""Run post-hooks if defined.
This function also registers any executables found in
config.renewal_post_hooks_dir to be run when Certbot is used with
the renew subcommand.
If the verb is renew, we delay executing any post-hooks until
:func:`run_saved_post_hooks` is called. In this case, this function
registers all hooks found in config.renewal_post_hooks_dir to be
called followed by any post-hook in the config. If the post-hook in
the config is a path to an executable in the post-hook directory, it
is not scheduled to be run twice.
:param configuration.NamespaceConfig config: Certbot settings
If the verb is renew, we might have more certs to renew, so we wait until
run_saved_post_hooks() is called.
"""
cmd = config.post_hook
# In the "renew" case, we save these up to run at the end
if config.verb == "renew":
if cmd and cmd not in post_hook.eventually:
post_hook.eventually.append(cmd)
if config.directory_hooks:
for hook in list_hooks(config.renewal_post_hooks_dir):
_run_eventually(hook)
if cmd:
_run_eventually(cmd)
# certonly / run
elif cmd:
logger.info("Running post-hook command: %s", cmd)
@ -89,6 +130,19 @@ def post_hook(config):
post_hook.eventually = [] # type: ignore
def _run_eventually(command):
"""Registers a post-hook to be run eventually.
All commands given to this function will be run exactly once in the
order they were given when :func:`run_saved_post_hooks` is called.
:param str command: post-hook to register to be run
"""
if command not in post_hook.eventually:
post_hook.eventually.append(command)
def run_saved_post_hooks():
"""Run any post hooks that were saved up in the course of the 'renew' verb"""
for cmd in post_hook.eventually:
@ -106,20 +160,65 @@ def deploy_hook(config, domains, lineage_path):
"""
if config.deploy_hook:
renew_hook(config, domains, lineage_path)
_run_deploy_hook(config.deploy_hook, domains,
lineage_path, config.dry_run)
def renew_hook(config, domains, lineage_path):
"""Run post-renewal hook if defined."""
"""Run post-renewal hooks.
This function runs any hooks found in
config.renewal_deploy_hooks_dir followed by any renew-hook in the
config. If the renew-hook in the config is a path to a script in
config.renewal_deploy_hooks_dir, it is not run twice.
If Certbot is doing a dry run, no hooks are run and messages are
logged saying that they were skipped.
:param configuration.NamespaceConfig config: Certbot settings
:param domains: domains in the obtained certificate
:type domains: `list` of `str`
:param str lineage_path: live directory path for the new cert
"""
executed_dir_hooks = set()
if config.directory_hooks:
for hook in list_hooks(config.renewal_deploy_hooks_dir):
_run_deploy_hook(hook, domains, lineage_path, config.dry_run)
executed_dir_hooks.add(hook)
if config.renew_hook:
if not config.dry_run:
os.environ["RENEWED_DOMAINS"] = " ".join(domains)
os.environ["RENEWED_LINEAGE"] = lineage_path
logger.info("Running deploy-hook command: %s", config.renew_hook)
_run_hook(config.renew_hook)
if config.renew_hook in executed_dir_hooks:
logger.info("Skipping deploy-hook '%s' as it was already run.",
config.renew_hook)
else:
logger.warning(
"Dry run: skipping deploy hook command: %s", config.renew_hook)
_run_deploy_hook(config.renew_hook, domains,
lineage_path, config.dry_run)
def _run_deploy_hook(command, domains, lineage_path, dry_run):
"""Run the specified deploy-hook (if not doing a dry run).
If dry_run is True, command is not run and a message is logged
saying that it was skipped. If dry_run is False, the hook is run
after setting the appropriate environment variables.
:param str command: command to run as a deploy-hook
:param domains: domains in the obtained certificate
:type domains: `list` of `str`
:param str lineage_path: live directory path for the new cert
:param bool dry_run: True iff Certbot is doing a dry run
"""
if dry_run:
logger.warning("Dry run: skipping deploy hook command: %s",
command)
return
os.environ["RENEWED_DOMAINS"] = " ".join(domains)
os.environ["RENEWED_LINEAGE"] = lineage_path
logger.info("Running deploy-hook command: %s", command)
_run_hook(command)
def _run_hook(shell_cmd):
@ -151,3 +250,15 @@ def execute(shell_cmd):
logger.error('Error output from %s:\n%s', base_cmd, err)
return (err, out)
def list_hooks(dir_path):
"""List paths to all hooks found in dir_path in sorted order.
:param str dir_path: directory to search
:returns: `list` of `str`
:rtype: sorted list of absolute paths to executables in dir_path
"""
paths = (os.path.join(dir_path, f) for f in os.listdir(dir_path))
return sorted(path for path in paths if util.is_exe(path))

View file

@ -711,12 +711,20 @@ def renew(config, unused_plugins):
def make_or_verify_needed_dirs(config):
"""Create or verify existence of config and work directories"""
"""Create or verify existence of config, work, and hook directories."""
util.set_up_core_dir(config.config_dir, constants.CONFIG_DIRS_MODE,
os.geteuid(), config.strict_permissions)
util.set_up_core_dir(config.work_dir, constants.CONFIG_DIRS_MODE,
os.geteuid(), config.strict_permissions)
hook_dirs = (config.renewal_pre_hooks_dir,
config.renewal_deploy_hooks_dir,
config.renewal_post_hooks_dir,)
for hook_dir in hook_dirs:
util.make_or_verify_dir(hook_dir,
uid=os.geteuid(),
strict=config.strict_permissions)
def set_displayer(config):
"""Set the displayer"""

View file

@ -408,6 +408,12 @@ class ParseTest(unittest.TestCase): # pylint: disable=too-many-public-methods
self.assertEqual(namespace.domains, [])
self.assertEqual(namespace.pref_challs, [])
def test_no_directory_hooks_set(self):
self.assertFalse(self.parse(["--no-directory-hooks"]).directory_hooks)
def test_no_directory_hooks_unset(self):
self.assertTrue(self.parse([]).directory_hooks)
class DefaultTest(unittest.TestCase):
"""Tests for certbot.cli._Default."""

View file

@ -4,6 +4,7 @@ import unittest
import mock
from certbot import constants
from certbot import errors
from certbot.tests import util as test_util
@ -37,14 +38,14 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
self.config.server_path.split(os.path.sep))
@mock.patch('certbot.configuration.constants')
def test_dynamic_dirs(self, constants):
constants.ACCOUNTS_DIR = 'acc'
constants.BACKUP_DIR = 'backups'
constants.CSR_DIR = 'csr'
def test_dynamic_dirs(self, mock_constants):
mock_constants.ACCOUNTS_DIR = 'acc'
mock_constants.BACKUP_DIR = 'backups'
mock_constants.CSR_DIR = 'csr'
constants.IN_PROGRESS_DIR = '../p'
constants.KEY_DIR = 'keys'
constants.TEMP_CHECKPOINT_DIR = 't'
mock_constants.IN_PROGRESS_DIR = '../p'
mock_constants.KEY_DIR = 'keys'
mock_constants.TEMP_CHECKPOINT_DIR = 't'
self.assertEqual(
self.config.accounts_dir, os.path.join(
@ -95,10 +96,10 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
self.assertTrue(os.path.isabs(config.temp_checkpoint_dir))
@mock.patch('certbot.configuration.constants')
def test_renewal_dynamic_dirs(self, constants):
constants.ARCHIVE_DIR = 'a'
constants.LIVE_DIR = 'l'
constants.RENEWAL_CONFIGS_DIR = 'renewal_configs'
def test_renewal_dynamic_dirs(self, mock_constants):
mock_constants.ARCHIVE_DIR = 'a'
mock_constants.LIVE_DIR = 'l'
mock_constants.RENEWAL_CONFIGS_DIR = 'renewal_configs'
self.assertEqual(
self.config.default_archive_dir, os.path.join(self.config.config_dir, 'a'))
@ -134,6 +135,20 @@ class NamespaceConfigTest(test_util.ConfigTestCase):
self.config.namespace.bar = 1337
self.assertEqual(self.config.bar, 1337)
def test_hook_directories(self):
self.assertEqual(self.config.renewal_hooks_dir,
os.path.join(self.config.config_dir,
constants.RENEWAL_HOOKS_DIR))
self.assertEqual(self.config.renewal_pre_hooks_dir,
os.path.join(self.config.renewal_hooks_dir,
constants.RENEWAL_PRE_HOOKS_DIR))
self.assertEqual(self.config.renewal_deploy_hooks_dir,
os.path.join(self.config.renewal_hooks_dir,
constants.RENEWAL_DEPLOY_HOOKS_DIR))
self.assertEqual(self.config.renewal_post_hooks_dir,
os.path.join(self.config.renewal_hooks_dir,
constants.RENEWAL_POST_HOOKS_DIR))
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -1,136 +1,488 @@
"""Tests for hooks.py"""
# pylint: disable=protected-access
"""Tests for certbot.hooks."""
import os
import stat
import unittest
import mock
from six.moves import reload_module # pylint: disable=import-error
from certbot import errors
from certbot import hooks
from certbot.tests import util
class HookTest(unittest.TestCase):
def setUp(self):
reload_module(hooks)
@mock.patch('certbot.hooks._prog')
def test_validate_hooks(self, mock_prog):
config = mock.MagicMock(deploy_hook=None, pre_hook="",
post_hook="ls -lR", renew_hook="uptime")
hooks.validate_hooks(config)
self.assertEqual(mock_prog.call_count, 2)
self.assertEqual(mock_prog.call_args_list[1][0][0], 'uptime')
self.assertEqual(mock_prog.call_args_list[0][0][0], 'ls')
mock_prog.return_value = None
config = mock.MagicMock(pre_hook="explodinator", post_hook="", renew_hook="")
self.assertRaises(errors.HookCommandNotFound, hooks.validate_hooks, config)
class ValidateHooksTest(unittest.TestCase):
"""Tests for certbot.hooks.validate_hooks."""
@mock.patch('certbot.hooks.validate_hook')
def test_validation_order(self, mock_validate_hook):
# This ensures error messages are about deploy hook when appropriate
config = mock.Mock(deploy_hook=None, pre_hook=None,
post_hook=None, renew_hook=None)
hooks.validate_hooks(config)
@classmethod
def _call(cls, *args, **kwargs):
from certbot.hooks import validate_hooks
return validate_hooks(*args, **kwargs)
order = [call[0][1] for call in mock_validate_hook.call_args_list]
self.assertTrue('pre' in order)
self.assertTrue('post' in order)
self.assertTrue('deploy' in order)
self.assertEqual(order[-1], 'renew')
@mock.patch("certbot.hooks.validate_hook")
def test_it(self, mock_validate_hook):
config = mock.MagicMock()
self._call(config)
@mock.patch('certbot.hooks.util.exe_exists')
@mock.patch('certbot.hooks.plug_util.path_surgery')
def test_prog(self, mock_ps, mock_exe_exists):
mock_exe_exists.return_value = True
self.assertEqual(hooks._prog("funky"), "funky")
self.assertEqual(mock_ps.call_count, 0)
types = [call[0][1] for call in mock_validate_hook.call_args_list]
self.assertEqual(set(("pre", "post", "deploy",)), set(types[:-1]))
# This ensures error messages are about deploy hooks when appropriate
self.assertEqual("renew", types[-1])
class ValidateHookTest(util.TempDirTestCase):
"""Tests for certbot.hooks.validate_hook."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.hooks import validate_hook
return validate_hook(*args, **kwargs)
def test_not_executable(self):
file_path = os.path.join(self.tempdir, "foo")
# create a non-executable file
os.close(os.open(file_path, os.O_CREAT | os.O_WRONLY, 0o666))
# prevent unnecessary modifications to PATH
with mock.patch("certbot.hooks.plug_util.path_surgery"):
self.assertRaises(errors.HookCommandNotFound,
self._call, file_path, "foo")
@mock.patch("certbot.hooks.util.exe_exists")
def test_not_found(self, mock_exe_exists):
mock_exe_exists.return_value = False
self.assertEqual(hooks._prog("funky"), None)
self.assertEqual(mock_ps.call_count, 1)
with mock.patch("certbot.hooks.plug_util.path_surgery") as mock_ps:
self.assertRaises(errors.HookCommandNotFound,
self._call, "foo", "bar")
self.assertTrue(mock_ps.called)
@mock.patch('certbot.hooks.renew_hook')
def test_deploy_hook(self, mock_renew_hook):
args = (mock.Mock(deploy_hook='foo'), ['example.org'], 'path',)
# pylint: disable=star-args
hooks.deploy_hook(*args)
mock_renew_hook.assert_called_once_with(*args)
@mock.patch("certbot.hooks._prog")
def test_unset(self, mock_prog):
self._call(None, "foo")
self.assertFalse(mock_prog.called)
@mock.patch('certbot.hooks.renew_hook')
def test_no_deploy_hook(self, mock_renew_hook):
args = (mock.Mock(deploy_hook=None), ['example.org'], 'path',)
hooks.deploy_hook(*args) # pylint: disable=star-args
mock_renew_hook.assert_not_called()
def _test_a_hook(self, config, hook_function, calls_expected, **kwargs):
with mock.patch('certbot.hooks.logger') as mock_logger:
mock_logger.warning = mock.MagicMock()
with mock.patch('certbot.hooks._run_hook') as mock_run_hook:
hook_function(config, **kwargs)
hook_function(config, **kwargs)
self.assertEqual(mock_run_hook.call_count, calls_expected)
return mock_logger.warning
class HookTest(util.ConfigTestCase):
"""Common base class for hook tests."""
def test_pre_hook(self):
config = mock.MagicMock(pre_hook="true")
self._test_a_hook(config, hooks.pre_hook, 1)
self._test_a_hook(config, hooks.pre_hook, 0)
config = mock.MagicMock(pre_hook="more_true")
self._test_a_hook(config, hooks.pre_hook, 1)
self._test_a_hook(config, hooks.pre_hook, 0)
config = mock.MagicMock(pre_hook="")
self._test_a_hook(config, hooks.pre_hook, 0)
@classmethod
def _call(cls, *args, **kwargs):
"""Calls the method being tested with the given arguments."""
raise NotImplementedError
def _test_renew_post_hooks(self, expected_count):
with mock.patch('certbot.hooks.logger.info') as mock_info:
with mock.patch('certbot.hooks._run_hook') as mock_run:
hooks.run_saved_post_hooks()
self.assertEqual(mock_run.call_count, expected_count)
self.assertEqual(mock_info.call_count, expected_count)
@classmethod
def _call_with_mock_execute(cls, *args, **kwargs):
"""Calls self._call after mocking out certbot.hooks.execute.
def test_post_hooks(self):
config = mock.MagicMock(post_hook="true", verb="splonk")
self._test_a_hook(config, hooks.post_hook, 2)
self._test_renew_post_hooks(0)
The mock execute object is returned rather than the return value
of self._call.
config = mock.MagicMock(post_hook="true", verb="renew")
self._test_a_hook(config, hooks.post_hook, 0)
self._test_renew_post_hooks(1)
self._test_a_hook(config, hooks.post_hook, 0)
self._test_renew_post_hooks(1)
"""
with mock.patch("certbot.hooks.execute") as mock_execute:
mock_execute.return_value = ("", "")
cls._call(*args, **kwargs)
return mock_execute
config = mock.MagicMock(post_hook="more_true", verb="renew")
self._test_a_hook(config, hooks.post_hook, 0)
self._test_renew_post_hooks(2)
def test_renew_hook(self):
with mock.patch.dict('os.environ', {}):
domains = ["a", "b"]
lineage = "thing"
rhook = lambda x: hooks.renew_hook(x, domains, lineage)
class PreHookTest(HookTest):
"""Tests for certbot.hooks.pre_hook."""
config = mock.MagicMock(renew_hook="true", dry_run=False)
self._test_a_hook(config, rhook, 2)
self.assertEqual(os.environ["RENEWED_DOMAINS"], "a b")
self.assertEqual(os.environ["RENEWED_LINEAGE"], "thing")
@classmethod
def _call(cls, *args, **kwargs):
from certbot.hooks import pre_hook
return pre_hook(*args, **kwargs)
config = mock.MagicMock(renew_hook="true", dry_run=True)
mock_warn = self._test_a_hook(config, rhook, 0)
self.assertEqual(mock_warn.call_count, 2)
def setUp(self):
super(PreHookTest, self).setUp()
self.config.pre_hook = "foo"
@mock.patch('certbot.hooks.Popen')
def test_run_hook(self, mock_popen):
with mock.patch('certbot.hooks.logger.error') as mock_error:
mock_cmd = mock.MagicMock()
mock_cmd.returncode = 1
mock_cmd.communicate.return_value = ("", "")
mock_popen.return_value = mock_cmd
hooks._run_hook("ls")
self.assertEqual(mock_error.call_count, 1)
with mock.patch('certbot.hooks.logger.error') as mock_error:
mock_cmd.communicate.return_value = ("", "thing")
hooks._run_hook("ls")
self.assertEqual(mock_error.call_count, 2)
os.makedirs(self.config.renewal_pre_hooks_dir)
self.dir_hook = os.path.join(self.config.renewal_pre_hooks_dir, "bar")
create_hook(self.dir_hook)
# Reset this value as it may have been modified by past tests
self._reset_pre_hook_already()
def tearDown(self):
# Reset this value so it's unmodified for future tests
self._reset_pre_hook_already()
super(PreHookTest, self).tearDown()
def _reset_pre_hook_already(self):
from certbot.hooks import pre_hook
pre_hook.already.clear()
def test_certonly(self):
self.config.verb = "certonly"
self._test_nonrenew_common()
def test_run(self):
self.config.verb = "run"
self._test_nonrenew_common()
def _test_nonrenew_common(self):
mock_execute = self._call_with_mock_execute(self.config)
mock_execute.assert_called_once_with(self.config.pre_hook)
self._test_no_executions_common()
def test_no_hooks(self):
self.config.pre_hook = None
self.config.verb = "renew"
os.remove(self.dir_hook)
with mock.patch("certbot.hooks.logger") as mock_logger:
mock_execute = self._call_with_mock_execute(self.config)
self.assertFalse(mock_execute.called)
self.assertFalse(mock_logger.info.called)
def test_renew_disabled_dir_hooks(self):
self.config.directory_hooks = False
mock_execute = self._call_with_mock_execute(self.config)
mock_execute.assert_called_once_with(self.config.pre_hook)
self._test_no_executions_common()
def test_renew_no_overlap(self):
self.config.verb = "renew"
mock_execute = self._call_with_mock_execute(self.config)
mock_execute.assert_any_call(self.dir_hook)
mock_execute.assert_called_with(self.config.pre_hook)
self._test_no_executions_common()
def test_renew_with_overlap(self):
self.config.pre_hook = self.dir_hook
self.config.verb = "renew"
mock_execute = self._call_with_mock_execute(self.config)
mock_execute.assert_called_once_with(self.dir_hook)
self._test_no_executions_common()
def _test_no_executions_common(self):
with mock.patch("certbot.hooks.logger") as mock_logger:
mock_execute = self._call_with_mock_execute(self.config)
self.assertFalse(mock_execute.called)
self.assertTrue(mock_logger.info.called)
class PostHookTest(HookTest):
"""Tests for certbot.hooks.post_hook."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.hooks import post_hook
return post_hook(*args, **kwargs)
def setUp(self):
super(PostHookTest, self).setUp()
self.config.post_hook = "bar"
os.makedirs(self.config.renewal_post_hooks_dir)
self.dir_hook = os.path.join(self.config.renewal_post_hooks_dir, "foo")
create_hook(self.dir_hook)
# Reset this value as it may have been modified by past tests
self._reset_post_hook_eventually()
def tearDown(self):
# Reset this value so it's unmodified for future tests
self._reset_post_hook_eventually()
super(PostHookTest, self).tearDown()
def _reset_post_hook_eventually(self):
from certbot.hooks import post_hook
post_hook.eventually = []
def test_certonly_and_run_with_hook(self):
for verb in ("certonly", "run",):
self.config.verb = verb
mock_execute = self._call_with_mock_execute(self.config)
mock_execute.assert_called_once_with(self.config.post_hook)
self.assertFalse(self._get_eventually())
def test_cert_only_and_run_without_hook(self):
self.config.post_hook = None
for verb in ("certonly", "run",):
self.config.verb = verb
self.assertFalse(self._call_with_mock_execute(self.config).called)
self.assertFalse(self._get_eventually())
def test_renew_disabled_dir_hooks(self):
self.config.directory_hooks = False
self._test_renew_common([self.config.post_hook])
def test_renew_no_config_hook(self):
self.config.post_hook = None
self._test_renew_common([self.dir_hook])
def test_renew_no_dir_hook(self):
os.remove(self.dir_hook)
self._test_renew_common([self.config.post_hook])
def test_renew_no_hooks(self):
self.config.post_hook = None
os.remove(self.dir_hook)
self._test_renew_common([])
def test_renew_no_overlap(self):
expected = [self.dir_hook, self.config.post_hook]
self._test_renew_common(expected)
self.config.post_hook = "baz"
expected.append(self.config.post_hook)
self._test_renew_common(expected)
def test_renew_with_overlap(self):
self.config.post_hook = self.dir_hook
self._test_renew_common([self.dir_hook])
def _test_renew_common(self, expected):
self.config.verb = "renew"
for _ in range(2):
self._call(self.config)
self.assertEqual(self._get_eventually(), expected)
def _get_eventually(self):
from certbot.hooks import post_hook
return post_hook.eventually
class RunSavedPostHooksTest(HookTest):
"""Tests for certbot.hooks.run_saved_post_hooks."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.hooks import run_saved_post_hooks
return run_saved_post_hooks(*args, **kwargs)
def _call_with_mock_execute_and_eventually(self, *args, **kwargs):
"""Call run_saved_post_hooks but mock out execute and eventually
certbot.hooks.post_hook.eventually is replaced with
self.eventually. The mock execute object is returned rather than
the return value of run_saved_post_hooks.
"""
eventually_path = "certbot.hooks.post_hook.eventually"
with mock.patch(eventually_path, new=self.eventually):
return self._call_with_mock_execute(*args, **kwargs)
def setUp(self):
super(RunSavedPostHooksTest, self).setUp()
self.eventually = []
def test_empty(self):
self.assertFalse(self._call_with_mock_execute_and_eventually().called)
def test_multiple(self):
self.eventually = ["foo", "bar", "baz", "qux"]
mock_execute = self._call_with_mock_execute_and_eventually()
calls = mock_execute.call_args_list
for actual_call, expected_arg in zip(calls, self.eventually):
self.assertEqual(actual_call[0][0], expected_arg)
def test_single(self):
self.eventually = ["foo"]
mock_execute = self._call_with_mock_execute_and_eventually()
mock_execute.assert_called_once_with(self.eventually[0])
class RenewalHookTest(HookTest):
"""Common base class for testing deploy/renew hooks."""
# Needed for https://github.com/PyCQA/pylint/issues/179
# pylint: disable=abstract-method
def _call_with_mock_execute(self, *args, **kwargs):
"""Calls self._call after mocking out certbot.hooks.execute.
The mock execute object is returned rather than the return value
of self._call. The mock execute object asserts that environment
variables were properly set.
"""
domains = kwargs["domains"] if "domains" in kwargs else args[1]
lineage = kwargs["lineage"] if "lineage" in kwargs else args[2]
def execute_side_effect(*unused_args, **unused_kwargs):
"""Assert environment variables are properly set.
:returns: two strings imitating no output from the hook
:rtype: `tuple` of `str`
"""
self.assertEqual(os.environ["RENEWED_DOMAINS"], " ".join(domains))
self.assertEqual(os.environ["RENEWED_LINEAGE"], lineage)
return ("", "")
with mock.patch("certbot.hooks.execute") as mock_execute:
mock_execute.side_effect = execute_side_effect
self._call(*args, **kwargs)
return mock_execute
def setUp(self):
super(RenewalHookTest, self).setUp()
self.vars_to_clear = set(
var for var in ("RENEWED_DOMAINS", "RENEWED_LINEAGE",)
if var not in os.environ)
def tearDown(self):
for var in self.vars_to_clear:
os.environ.pop(var, None)
super(RenewalHookTest, self).tearDown()
class DeployHookTest(RenewalHookTest):
"""Tests for certbot.hooks.deploy_hook."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.hooks import deploy_hook
return deploy_hook(*args, **kwargs)
@mock.patch("certbot.hooks.logger")
def test_dry_run(self, mock_logger):
self.config.deploy_hook = "foo"
self.config.dry_run = True
mock_execute = self._call_with_mock_execute(
self.config, ["example.org"], "/foo/bar")
self.assertFalse(mock_execute.called)
self.assertTrue(mock_logger.warning.called)
@mock.patch("certbot.hooks.logger")
def test_no_hook(self, mock_logger):
self.config.deploy_hook = None
mock_execute = self._call_with_mock_execute(
self.config, ["example.org"], "/foo/bar")
self.assertFalse(mock_execute.called)
self.assertFalse(mock_logger.info.called)
def test_success(self):
domains = ["example.org", "example.net"]
lineage = "/foo/bar"
self.config.deploy_hook = "foo"
mock_execute = self._call_with_mock_execute(
self.config, domains, lineage)
mock_execute.assert_called_once_with(self.config.deploy_hook)
class RenewHookTest(RenewalHookTest):
"""Tests for certbot.hooks.renew_hook"""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.hooks import renew_hook
return renew_hook(*args, **kwargs)
def setUp(self):
super(RenewHookTest, self).setUp()
self.config.renew_hook = "foo"
os.makedirs(self.config.renewal_deploy_hooks_dir)
self.dir_hook = os.path.join(self.config.renewal_deploy_hooks_dir,
"bar")
create_hook(self.dir_hook)
def test_disabled_dir_hooks(self):
self.config.directory_hooks = False
mock_execute = self._call_with_mock_execute(
self.config, ["example.org"], "/foo/bar")
mock_execute.assert_called_once_with(self.config.renew_hook)
@mock.patch("certbot.hooks.logger")
def test_dry_run(self, mock_logger):
self.config.dry_run = True
mock_execute = self._call_with_mock_execute(
self.config, ["example.org"], "/foo/bar")
self.assertFalse(mock_execute.called)
self.assertEqual(mock_logger.warning.call_count, 2)
def test_no_hooks(self):
self.config.renew_hook = None
os.remove(self.dir_hook)
with mock.patch("certbot.hooks.logger") as mock_logger:
mock_execute = self._call_with_mock_execute(
self.config, ["example.org"], "/foo/bar")
self.assertFalse(mock_execute.called)
self.assertFalse(mock_logger.info.called)
def test_overlap(self):
self.config.renew_hook = self.dir_hook
mock_execute = self._call_with_mock_execute(
self.config, ["example.net", "example.org"], "/foo/bar")
mock_execute.assert_called_once_with(self.dir_hook)
def test_no_overlap(self):
mock_execute = self._call_with_mock_execute(
self.config, ["example.org"], "/foo/bar")
mock_execute.assert_any_call(self.dir_hook)
mock_execute.assert_called_with(self.config.renew_hook)
class ExecuteTest(unittest.TestCase):
"""Tests for certbot.hooks.execute."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.hooks import execute
return execute(*args, **kwargs)
def test_it(self):
for returncode in range(0, 2):
for stdout in ("", "Hello World!",):
for stderr in ("", "Goodbye Cruel World!"):
self._test_common(returncode, stdout, stderr)
def _test_common(self, returncode, stdout, stderr):
given_command = "foo"
with mock.patch("certbot.hooks.Popen") as mock_popen:
mock_popen.return_value.communicate.return_value = (stdout, stderr)
mock_popen.return_value.returncode = returncode
with mock.patch("certbot.hooks.logger") as mock_logger:
self.assertEqual(self._call(given_command), (stderr, stdout))
executed_command = mock_popen.call_args[1].get(
"args", mock_popen.call_args[0][0])
self.assertEqual(executed_command, given_command)
if stdout:
self.assertTrue(mock_logger.info.called)
if stderr or returncode:
self.assertTrue(mock_logger.error.called)
class ListHooksTest(util.TempDirTestCase):
"""Tests for certbot.hooks.list_hooks."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.hooks import list_hooks
return list_hooks(*args, **kwargs)
def test_empty(self):
self.assertFalse(self._call(self.tempdir))
def test_multiple(self):
names = sorted(
os.path.join(self.tempdir, basename)
for basename in ("foo", "bar", "baz", "qux")
)
for name in names:
create_hook(name)
self.assertEqual(self._call(self.tempdir), names)
def test_single(self):
name = os.path.join(self.tempdir, "foo")
create_hook(name)
self.assertEqual(self._call(self.tempdir), [name])
def create_hook(file_path):
"""Creates an executable file at the specified path.
:param str file_path: path to create the file at
"""
open(file_path, "w").close()
os.chmod(file_path, os.stat(file_path).st_mode | stat.S_IXUSR)
if __name__ == '__main__':

View file

@ -1201,5 +1201,27 @@ class UnregisterTest(unittest.TestCase):
self.assertFalse(cb_client.acme.deactivate_registration.called)
class MakeOrVerifyNeededDirs(test_util.ConfigTestCase):
"""Tests for certbot.main.make_or_verify_needed_dirs."""
@mock.patch("certbot.main.util")
def test_it(self, mock_util):
main.make_or_verify_needed_dirs(self.config)
for core_dir in (self.config.config_dir, self.config.work_dir,):
mock_util.set_up_core_dir.assert_any_call(
core_dir, constants.CONFIG_DIRS_MODE,
os.geteuid(), self.config.strict_permissions
)
hook_dirs = (self.config.renewal_pre_hooks_dir,
self.config.renewal_deploy_hooks_dir,
self.config.renewal_post_hooks_dir,)
for hook_dir in hook_dirs:
# default mode of 755 is used
mock_util.make_or_verify_dir.assert_any_call(
hook_dir, uid=os.geteuid(),
strict=self.config.strict_permissions)
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -91,6 +91,18 @@ def run_script(params, log=logger.error):
return stdout, stderr
def is_exe(path):
"""Is path an executable file?
:param str path: path to test
:returns: True iff path is an executable file
:rtype: bool
"""
return os.path.isfile(path) and os.access(path, os.X_OK)
def exe_exists(exe):
"""Determine whether path/name refers to an executable.
@ -100,10 +112,6 @@ def exe_exists(exe):
:rtype: bool
"""
def is_exe(path):
"""Determine if path is an exe."""
return os.path.isfile(path) and os.access(path, os.X_OK)
path, _ = os.path.split(exe)
if path:
return is_exe(exe)

View file

@ -486,6 +486,25 @@ apply appropriate file permissions.
esac
done
You can also specify hooks by placing files in subdirectories of Certbot's
configuration directory. Assuming your configuration directory is
``/etc/letsencrypt``, any executable files found in
``/etc/letsencrypt/renewal-hooks/pre``,
``/etc/letsencrypt/renewal-hooks/deploy``, and
``/etc/letsencrypt/renewal-hooks/post`` will be run as pre, deploy, and post
hooks respectively when any certificate is renewed with the ``renew``
subcommand. These hooks are run in alphabetical order and are not run for other
subcommands. (The order the hooks are run is determined by the byte value of
the characters in their filenames and is not dependent on your locale.)
Hooks specified in the command line, :ref:`configuration file
<config-file>`, or :ref:`renewal configuration files <renewal-config-file>` are
run as usual after running all hooks in these directories. One minor exception
to this is if a hook specified elsewhere is simply the path to an executable
file in the hook directory of the same type (e.g. your pre-hook is the path to
an executable in ``/etc/letsencrypt/renewal-hooks/pre``), the file is not run a
second time.
More information about hooks can be found by running
``certbot --help renew``.
@ -542,6 +561,8 @@ commands into your individual environment.
you will need to use the ``--post-hook`` since the exit status will be 0 both on successful renewal
and when renewal is not necessary.
.. _renewal-config-file:
Modifying the Renewal Configuration File
----------------------------------------

View file

@ -28,11 +28,66 @@ cleanup_and_exit() {
: "------------------ ------------------ ------------------"
: "------------------ end boulder logs ------------------"
: "------------------ ------------------ ------------------"
if [ -f "$HOOK_DIRS_TEST" ]; then
rm -f "$HOOK_DIRS_TEST"
fi
exit $EXIT_STATUS
}
trap cleanup_and_exit EXIT
export HOOK_DIRS_TEST="$(mktemp)"
renewal_hooks_root="$config_dir/renewal-hooks"
renewal_hooks_dirs=$(echo "$renewal_hooks_root/"{pre,deploy,post})
renewal_dir_pre_hook="$(echo $renewal_hooks_dirs | cut -f 1 -d " ")/hook.sh"
renewal_dir_deploy_hook="$(echo $renewal_hooks_dirs | cut -f 2 -d " ")/hook.sh"
renewal_dir_post_hook="$(echo $renewal_hooks_dirs | cut -f 3 -d " ")/hook.sh"
# Creates hooks in Certbot's renewal hook directory that write to a file
CreateDirHooks() {
for hook_dir in $renewal_hooks_dirs; do
mkdir -p $hook_dir
hook_path="$hook_dir/hook.sh"
cat << EOF > "$hook_path"
#!/bin/bash -xe
if [ "\$0" = "$renewal_dir_deploy_hook" ]; then
if [ -z "\$RENEWED_DOMAINS" -o -z "\$RENEWED_LINEAGE" ]; then
echo "Environment variables not properly set!" >&2
exit 1
fi
fi
echo \$(basename \$(dirname "\$0")) >> "\$HOOK_DIRS_TEST"
EOF
chmod +x "$hook_path"
done
}
# Asserts that the hooks created by CreateDirHooks have been run once and
# resets the file.
#
# Arguments:
# The number of times the deploy hook should have been run. (It should run
# once for each certificate that was issued in that run of Certbot.)
CheckDirHooks() {
expected="pre\n"
for ((i=0; i<$1; i++)); do
expected=$expected"deploy\n"
done
expected=$expected"post"
if ! diff "$HOOK_DIRS_TEST" <(echo -e "$expected"); then
echo "Unexpected directory hook output!" >&2
echo "Expected:" >&2
echo -e "$expected" >&2
echo "Got:" >&2
cat "$HOOK_DIRS_TEST" >&2
exit 1
fi
rm -f "$HOOK_DIRS_TEST"
export HOOK_DIRS_TEST="$(mktemp)"
}
common_no_force_renew() {
certbot_test_no_force_renew \
--authenticator standalone \
@ -135,8 +190,15 @@ if [ $(get_num_tmp_files) -ne $num_tmp_files ]; then
echo "New files or directories created in /tmp!"
exit 1
fi
CreateDirHooks
common register
for dir in $renewal_hooks_dirs; do
if [ ! -d "$dir" ]; then
echo "Hook directory not created by Certbot!" >&2
exit 1
fi
done
common register --update-registration --email example@example.org
common plugins --init --prepare | grep webroot
@ -211,19 +273,36 @@ CheckCertCount "le.wtf" 1
# This won't renew (because it's not time yet)
common_no_force_renew renew
CheckCertCount "le.wtf" 1
if [ -s "$HOOK_DIRS_TEST" ]; then
echo "Directory hooks were executed for non-renewal!" >&2;
exit 1
fi
rm -rf "$renewal_hooks_root"
# renew using HTTP manual auth hooks
common renew --cert-name le.wtf --authenticator manual
CheckCertCount "le.wtf" 2
# test renewal with no executables in hook directories
for hook_dir in $renewal_hooks_dirs; do
touch "$hook_dir/file"
mkdir "$hook_dir/dir"
done
# renew using DNS manual auth hooks
common renew --cert-name dns.le.wtf --authenticator manual
CheckCertCount "dns.le.wtf" 2
# test with disabled directory hooks
rm -rf "$renewal_hooks_root"
CreateDirHooks
# This will renew because the expiry is less than 10 years from now
sed -i "4arenew_before_expiry = 4 years" "$root/conf/renewal/le.wtf.conf"
common_no_force_renew renew --rsa-key-size 2048
common_no_force_renew renew --rsa-key-size 2048 --no-directory-hooks
CheckCertCount "le.wtf" 3
if [ -s "$HOOK_DIRS_TEST" ]; then
echo "Directory hooks were executed with --no-directory-hooks!" >&2
exit 1
fi
# The 4096 bit setting should persist to the first renewal, but be overridden in the second
@ -242,6 +321,18 @@ fi
common renew
CheckCertCount "le.wtf" 4
CheckHooks
CheckDirHooks 5
# test with overlapping directory hooks on the command line
common renew --cert-name le2.wtf \
--pre-hook "$renewal_dir_pre_hook" \
--renew-hook "$renewal_dir_deploy_hook" \
--post-hook "$renewal_dir_post_hook"
CheckDirHooks 1
# test with overlapping directory hooks in the renewal conf files
common renew --cert-name le2.wtf
CheckDirHooks 1
# ECDSA
openssl ecparam -genkey -name secp384r1 -out "${root}/privkey-p384.pem"