mirror of
https://github.com/certbot/certbot.git
synced 2026-04-26 08:39:19 -04:00
Merge pull request #2729 from letsencrypt/renew-hooks
Implement renew hooks
This commit is contained in:
commit
3d260485cd
9 changed files with 348 additions and 77 deletions
|
|
@ -18,6 +18,7 @@ import letsencrypt
|
|||
from letsencrypt import constants
|
||||
from letsencrypt import crypto_util
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import hooks
|
||||
from letsencrypt import interfaces
|
||||
from letsencrypt import le_util
|
||||
|
||||
|
|
@ -306,6 +307,8 @@ class HelpfulArgumentParser(object):
|
|||
if self.detect_defaults: # plumbing
|
||||
parsed_args.store_false_vars = self.store_false_vars
|
||||
|
||||
hooks.validate_hooks(parsed_args)
|
||||
|
||||
return parsed_args
|
||||
|
||||
def handle_csr(self, parsed_args):
|
||||
|
|
@ -555,7 +558,14 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False):
|
|||
None, "--dry-run", action="store_true", dest="dry_run",
|
||||
help="Perform a test run of the client, obtaining test (invalid) certs"
|
||||
" but not saving them to disk. This can currently only be used"
|
||||
" with the 'certonly' subcommand.")
|
||||
" with the 'certonly' and 'renew' subcommands. \nNote: Although --dry-run"
|
||||
" tries to avoid making any persistent changes on a system, it "
|
||||
" is not completely side-effect free: if used with webserver authenticator plugins"
|
||||
" like apache and nginx, it makes and then reverts temporary config changes"
|
||||
" in order to obtain test certs, and reloads webservers to deploy and then"
|
||||
" roll back those changes. It also calls --pre-hook and --post-hook commands"
|
||||
" if they are defined because they may be necessary to accurately simulate"
|
||||
" renewal. --renew-hook commands are not called.")
|
||||
helpful.add(
|
||||
None, "--register-unsafely-without-email", action="store_true",
|
||||
help="Specifying this flag enables registering an account with no "
|
||||
|
|
@ -692,7 +702,26 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False):
|
|||
" used to create obtain or most recently successfully renew each"
|
||||
" certificate lineage. You can try it with `--dry-run` first. For"
|
||||
" more fine-grained control, you can renew individual lineages with"
|
||||
" the `certonly` subcommand.")
|
||||
" the `certonly` subcommand. Hooks are available to run commands "
|
||||
" before and after renewal; see XXX for more information on these.")
|
||||
|
||||
helpful.add(
|
||||
"renew", "--pre-hook",
|
||||
help="Command to be run in a shell before obtaining any certificates. Intended"
|
||||
" primarily for renewal, where it can be used to temporarily shut down a"
|
||||
" webserver that might conflict with the standalone plugin. This will "
|
||||
" only be called if a certificate is actually to be obtained/renewed. ")
|
||||
helpful.add(
|
||||
"renew", "--post-hook",
|
||||
help="Command to be run in a shell after attempting to obtain/renew "
|
||||
" certificates. Can be used to deploy renewed certificates, or to restart"
|
||||
" any servers that were stopped by --pre-hook.")
|
||||
helpful.add(
|
||||
"renew", "--renew-hook",
|
||||
help="Command to be run in a shell once for each successfully renewed certificate."
|
||||
"For this command, the shell variable $RENEWED_LINEAGE will point to the"
|
||||
"config live subdirectory containing the new certs and keys; the shell variable "
|
||||
"$RENEWED_DOMAINS will conatain a space-delimited list of renewed cert domains")
|
||||
|
||||
helpful.add_deprecated_argument("--agree-dev-preview", 0)
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,10 @@ class CertStorageError(Error):
|
|||
"""Generic `.CertStorage` error."""
|
||||
|
||||
|
||||
class HookCommandNotFound(Error):
|
||||
"""Failed to find a hook command in the PATH."""
|
||||
|
||||
|
||||
# Auth Handler Errors
|
||||
class AuthorizationError(Error):
|
||||
"""Authorization error."""
|
||||
|
|
|
|||
98
letsencrypt/hooks.py
Normal file
98
letsencrypt/hooks.py
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
"""Facilities for implementing hooks that call shell commands."""
|
||||
from __future__ import print_function
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
from subprocess import Popen, PIPE
|
||||
|
||||
from letsencrypt import errors
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def validate_hooks(config):
|
||||
"""Check hook commands are executable."""
|
||||
_validate_hook(config.pre_hook, "pre")
|
||||
_validate_hook(config.post_hook, "post")
|
||||
_validate_hook(config.renew_hook, "renew")
|
||||
|
||||
def _prog(shell_cmd):
|
||||
"""Extract the program run by a shell command"""
|
||||
cmd = _which(shell_cmd)
|
||||
return os.path.basename(cmd) if cmd else None
|
||||
|
||||
def _validate_hook(shell_cmd, hook_name):
|
||||
"""Check that a command provided as a hook is plausibly executable.
|
||||
|
||||
:raises .errors.HookCommandNotFound: if the command is not found
|
||||
"""
|
||||
if shell_cmd:
|
||||
cmd = shell_cmd.partition(" ")[0]
|
||||
if not _prog(cmd):
|
||||
path = os.environ["PATH"]
|
||||
msg = "Unable to find {2}-hook command {0} in the PATH.\n(PATH is {1})".format(
|
||||
cmd, path, hook_name)
|
||||
raise errors.HookCommandNotFound(msg)
|
||||
|
||||
def pre_hook(config):
|
||||
"Run pre-hook if it's defined and hasn't been run."
|
||||
if config.pre_hook and not pre_hook.already:
|
||||
logger.info("Running pre-hook command: %s", config.pre_hook)
|
||||
_run_hook(config.pre_hook)
|
||||
pre_hook.already = True
|
||||
|
||||
pre_hook.already = False
|
||||
|
||||
def post_hook(config, final=False):
|
||||
"""Run post hook if defined.
|
||||
|
||||
If the verb is renew, we might have more certs to renew, so we wait until
|
||||
we're called with final=True before actually doing anything.
|
||||
"""
|
||||
if config.post_hook:
|
||||
if final or config.verb != "renew":
|
||||
logger.info("Running post-hook command: %s", config.post_hook)
|
||||
_run_hook(config.post_hook)
|
||||
|
||||
def renew_hook(config, domains, lineage_path):
|
||||
"Run post-renewal hook if defined."
|
||||
if config.renew_hook:
|
||||
if not config.dry_run:
|
||||
os.environ["RENEWED_DOMAINS"] = " ".join(domains)
|
||||
os.environ["RENEWED_LINEAGE"] = lineage_path
|
||||
_run_hook(config.renew_hook)
|
||||
else:
|
||||
print("Dry run: skipping renewal hook command: {0}".format(config.renew_hook))
|
||||
|
||||
def _run_hook(shell_cmd):
|
||||
"""Run a hook command.
|
||||
|
||||
:returns: stderr if there was any"""
|
||||
|
||||
cmd = Popen(shell_cmd, shell=True, stdout=PIPE, stderr=PIPE, stdin=PIPE)
|
||||
_out, err = cmd.communicate()
|
||||
if cmd.returncode != 0:
|
||||
logger.error('Hook command "%s" returned error code %d', shell_cmd, cmd.returncode)
|
||||
if err:
|
||||
logger.error('Error output from %s:\n%s', _prog(shell_cmd), err)
|
||||
|
||||
def _is_exe(fpath):
|
||||
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
|
||||
|
||||
def _which(program):
|
||||
"""Test if program is in the path."""
|
||||
# Borrowed from:
|
||||
# https://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
|
||||
# XXX May need more porting to handle .exe extensions on Windows
|
||||
|
||||
fpath, _fname = os.path.split(program)
|
||||
if fpath:
|
||||
if _is_exe(program):
|
||||
return program
|
||||
else:
|
||||
for path in os.environ["PATH"].split(os.pathsep):
|
||||
exe_file = os.path.join(path, program)
|
||||
if _is_exe(exe_file):
|
||||
return exe_file
|
||||
|
||||
return None
|
||||
|
|
@ -8,7 +8,6 @@ import sys
|
|||
import time
|
||||
import traceback
|
||||
|
||||
import OpenSSL
|
||||
import zope.component
|
||||
|
||||
from acme import jose
|
||||
|
|
@ -23,6 +22,7 @@ from letsencrypt import colored_logging
|
|||
from letsencrypt import configuration
|
||||
from letsencrypt import constants
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import hooks
|
||||
from letsencrypt import interfaces
|
||||
from letsencrypt import le_util
|
||||
from letsencrypt import log
|
||||
|
|
@ -52,28 +52,6 @@ def _suggest_donation_if_appropriate(config, action):
|
|||
reporter_util.add_message(msg, reporter_util.LOW_PRIORITY)
|
||||
|
||||
|
||||
def _avoid_invalidating_lineage(config, lineage, original_server):
|
||||
"Do not renew a valid cert with one from a staging server!"
|
||||
def _is_staging(srv):
|
||||
return srv == constants.STAGING_URI or "staging" in srv
|
||||
|
||||
# Some lineages may have begun with --staging, but then had production certs
|
||||
# added to them
|
||||
latest_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
|
||||
open(lineage.cert).read())
|
||||
# all our test certs are from happy hacker fake CA, though maybe one day
|
||||
# we should test more methodically
|
||||
now_valid = "fake" not in repr(latest_cert.get_issuer()).lower()
|
||||
|
||||
if _is_staging(config.server):
|
||||
if not _is_staging(original_server) or now_valid:
|
||||
if not config.break_my_certs:
|
||||
names = ", ".join(lineage.names())
|
||||
raise errors.Error(
|
||||
"You've asked to renew/replace a seemingly valid certificate with "
|
||||
"a test certificate (domains: {0}). We will not do that "
|
||||
"unless you use the --break-my-certs flag!".format(names))
|
||||
|
||||
|
||||
def _report_successful_dry_run(config):
|
||||
reporter_util = zope.component.getUtility(interfaces.IReporter)
|
||||
|
|
@ -82,6 +60,7 @@ def _report_successful_dry_run(config):
|
|||
reporter_util.HIGH_PRIORITY, on_crash=False)
|
||||
|
||||
|
||||
|
||||
def _auth_from_domains(le_client, config, domains, lineage=None):
|
||||
"""Authenticate and enroll certificate."""
|
||||
# Note: This can raise errors... caught above us though. This is now
|
||||
|
|
@ -105,31 +84,18 @@ def _auth_from_domains(le_client, config, domains, lineage=None):
|
|||
# The lineage already exists; allow the caller to try installing
|
||||
# it without getting a new certificate at all.
|
||||
return lineage, "reinstall"
|
||||
elif action == "renew":
|
||||
original_server = lineage.configuration["renewalparams"]["server"]
|
||||
_avoid_invalidating_lineage(config, lineage, original_server)
|
||||
# TODO: schoen wishes to reuse key - discussion
|
||||
# https://github.com/letsencrypt/letsencrypt/pull/777/files#r40498574
|
||||
new_certr, new_chain, new_key, _ = le_client.obtain_certificate(domains)
|
||||
# TODO: Check whether it worked! <- or make sure errors are thrown (jdk)
|
||||
if config.dry_run:
|
||||
logger.info("Dry run: skipping updating lineage at %s",
|
||||
os.path.dirname(lineage.cert))
|
||||
else:
|
||||
lineage.save_successor(
|
||||
lineage.latest_common_version(), OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, new_certr.body.wrapped),
|
||||
new_key.pem, crypto_util.dump_pyopenssl_chain(new_chain),
|
||||
configuration.RenewerConfiguration(config.namespace))
|
||||
lineage.update_all_links_to(lineage.latest_common_version())
|
||||
# TODO: Check return value of save_successor
|
||||
# TODO: Also update lineage renewal config with any relevant
|
||||
# configuration values from this attempt? <- Absolutely (jdkasten)
|
||||
elif action == "newcert":
|
||||
# TREAT AS NEW REQUEST
|
||||
lineage = le_client.obtain_and_enroll_certificate(domains)
|
||||
if lineage is False:
|
||||
raise errors.Error("Certificate could not be obtained")
|
||||
|
||||
hooks.pre_hook(config)
|
||||
try:
|
||||
if action == "renew":
|
||||
renewal.renew_cert(config, domains, le_client, lineage)
|
||||
elif action == "newcert":
|
||||
# TREAT AS NEW REQUEST
|
||||
lineage = le_client.obtain_and_enroll_certificate(domains)
|
||||
if lineage is False:
|
||||
raise errors.Error("Certificate could not be obtained")
|
||||
finally:
|
||||
hooks.post_hook(config)
|
||||
|
||||
if not config.dry_run and not config.verb == "renew":
|
||||
_report_new_cert(lineage.cert, lineage.fullchain)
|
||||
|
|
@ -142,7 +108,8 @@ def _handle_subset_cert_request(config, domains, cert):
|
|||
|
||||
:param storage.RenewableCert cert:
|
||||
|
||||
:returns: Tuple of (string, cert_or_None) as per _treat_as_renewal
|
||||
:returns: Tuple of (str action, cert_or_None) as per _treat_as_renewal
|
||||
action can be: "newcert" | "renew" | "reinstall"
|
||||
:rtype: tuple
|
||||
|
||||
"""
|
||||
|
|
@ -183,7 +150,8 @@ def _handle_identical_cert_request(config, cert):
|
|||
|
||||
:param storage.RenewableCert cert:
|
||||
|
||||
:returns: Tuple of (string, cert_or_None) as per _treat_as_renewal
|
||||
:returns: Tuple of (str action, cert_or_None) as per _treat_as_renewal
|
||||
action can be: "newcert" | "renew" | "reinstall"
|
||||
:rtype: tuple
|
||||
|
||||
"""
|
||||
|
|
@ -507,41 +475,53 @@ def run(config, plugins): # pylint: disable=too-many-branches,too-many-locals
|
|||
_suggest_donation_if_appropriate(config, action)
|
||||
|
||||
|
||||
def _csr_obtain_cert(config, le_client):
|
||||
"""Obtain a cert using a user-supplied CSR
|
||||
|
||||
This works differently in the CSR case (for now) because we don't
|
||||
have the privkey, and therefore can't construct the files for a lineage.
|
||||
So we just save the cert & chain to disk :/
|
||||
"""
|
||||
csr, typ = config.actual_csr
|
||||
certr, chain = le_client.obtain_certificate_from_csr(config.domains, csr, typ)
|
||||
if config.dry_run:
|
||||
logger.info(
|
||||
"Dry run: skipping saving certificate to %s", config.cert_path)
|
||||
else:
|
||||
cert_path, _, cert_fullchain = le_client.save_certificate(
|
||||
certr, chain, config.cert_path, config.chain_path, config.fullchain_path)
|
||||
_report_new_cert(cert_path, cert_fullchain)
|
||||
|
||||
|
||||
def obtain_cert(config, plugins, lineage=None):
|
||||
"""Implements "certonly": authenticate & obtain cert, but do not install it."""
|
||||
# pylint: disable=too-many-locals
|
||||
"""Authenticate & obtain cert, but do not install it.
|
||||
|
||||
This implements the 'certonly' subcommand, and is also called from within the
|
||||
'renew' command."""
|
||||
|
||||
# SETUP: Select plugins and construct a client instance
|
||||
try:
|
||||
# installers are used in auth mode to determine domain names
|
||||
installer, authenticator = plug_sel.choose_configurator_plugins(config, plugins, "certonly")
|
||||
installer, auth = plug_sel.choose_configurator_plugins(config, plugins, "certonly")
|
||||
except errors.PluginSelectionError as e:
|
||||
logger.info("Could not choose appropriate plugin: %s", e)
|
||||
raise
|
||||
le_client = _init_le_client(config, auth, installer)
|
||||
|
||||
# TODO: Handle errors from _init_le_client?
|
||||
le_client = _init_le_client(config, authenticator, installer)
|
||||
|
||||
action = "newcert"
|
||||
# This is a special case; cert and chain are simply saved
|
||||
if config.csr is not None:
|
||||
assert lineage is None, "Did not expect a CSR with a RenewableCert"
|
||||
csr, typ = config.actual_csr
|
||||
certr, chain = le_client.obtain_certificate_from_csr(config.domains, csr, typ)
|
||||
if config.dry_run:
|
||||
logger.info(
|
||||
"Dry run: skipping saving certificate to %s", config.cert_path)
|
||||
else:
|
||||
cert_path, _, cert_fullchain = le_client.save_certificate(
|
||||
certr, chain, config.cert_path, config.chain_path, config.fullchain_path)
|
||||
_report_new_cert(cert_path, cert_fullchain)
|
||||
else:
|
||||
# SHOWTIME: Possibly obtain/renew a cert, and set action to renew | newcert | reinstall
|
||||
if config.csr is None: # the common case
|
||||
domains = _find_domains(config, installer)
|
||||
_, action = _auth_from_domains(le_client, config, domains, lineage)
|
||||
else:
|
||||
assert lineage is None, "Did not expect a CSR with a RenewableCert"
|
||||
_csr_obtain_cert(config, le_client)
|
||||
action = "newcert"
|
||||
|
||||
# POSTPRODUCTION: Cleanup, deployment & reporting
|
||||
if config.dry_run:
|
||||
_report_successful_dry_run(config)
|
||||
elif config.verb == "renew":
|
||||
if installer is None:
|
||||
# Tell the user that the server was not restarted.
|
||||
print("new certificate deployed without reload, fullchain is",
|
||||
lineage.fullchain)
|
||||
else:
|
||||
|
|
@ -553,10 +533,13 @@ def obtain_cert(config, plugins, lineage=None):
|
|||
config.installer, "server; fullchain is", lineage.fullchain)
|
||||
_suggest_donation_if_appropriate(config, action)
|
||||
|
||||
|
||||
def renew(config, unused_plugins):
|
||||
"""Renew previously-obtained certificates."""
|
||||
renewal.renew_all_lineages(config)
|
||||
|
||||
try:
|
||||
renewal.renew_all_lineages(config)
|
||||
finally:
|
||||
hooks.post_hook(config, final=True)
|
||||
|
||||
|
||||
def setup_log_file_handler(config, logfile, fmt):
|
||||
|
|
|
|||
|
|
@ -9,9 +9,15 @@ import traceback
|
|||
import six
|
||||
import zope.component
|
||||
|
||||
import OpenSSL
|
||||
|
||||
from letsencrypt import configuration
|
||||
from letsencrypt import cli
|
||||
from letsencrypt import constants
|
||||
|
||||
from letsencrypt import crypto_util
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import hooks
|
||||
from letsencrypt import storage
|
||||
from letsencrypt.plugins import disco as plugins_disco
|
||||
|
||||
|
|
@ -199,6 +205,50 @@ def should_renew(config, lineage):
|
|||
return False
|
||||
|
||||
|
||||
def _avoid_invalidating_lineage(config, lineage, original_server):
|
||||
"Do not renew a valid cert with one from a staging server!"
|
||||
def _is_staging(srv):
|
||||
return srv == constants.STAGING_URI or "staging" in srv
|
||||
|
||||
# Some lineages may have begun with --staging, but then had production certs
|
||||
# added to them
|
||||
latest_cert = OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, open(lineage.cert).read())
|
||||
# all our test certs are from happy hacker fake CA, though maybe one day
|
||||
# we should test more methodically
|
||||
now_valid = "fake" not in repr(latest_cert.get_issuer()).lower()
|
||||
|
||||
if _is_staging(config.server):
|
||||
if not _is_staging(original_server) or now_valid:
|
||||
if not config.break_my_certs:
|
||||
names = ", ".join(lineage.names())
|
||||
raise errors.Error(
|
||||
"You've asked to renew/replace a seemingly valid certificate with "
|
||||
"a test certificate (domains: {0}). We will not do that "
|
||||
"unless you use the --break-my-certs flag!".format(names))
|
||||
|
||||
|
||||
def renew_cert(config, domains, le_client, lineage):
|
||||
"Renew a certificate lineage."
|
||||
original_server = lineage.configuration["renewalparams"]["server"]
|
||||
_avoid_invalidating_lineage(config, lineage, original_server)
|
||||
new_certr, new_chain, new_key, _ = le_client.obtain_certificate(domains)
|
||||
if config.dry_run:
|
||||
logger.info("Dry run: skipping updating lineage at %s",
|
||||
os.path.dirname(lineage.cert))
|
||||
else:
|
||||
prior_version = lineage.latest_common_version()
|
||||
new_cert = OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, new_certr.body.wrapped)
|
||||
new_chain = crypto_util.dump_pyopenssl_chain(new_chain)
|
||||
renewal_conf = configuration.RenewerConfiguration(config.namespace)
|
||||
lineage.save_successor(prior_version, new_cert, new_key.pem, new_chain, renewal_conf)
|
||||
lineage.update_all_links_to(lineage.latest_common_version())
|
||||
|
||||
hooks.renew_hook(config, domains, lineage.live_dir)
|
||||
# TODO: Check return value of save_successor
|
||||
|
||||
|
||||
def _renew_describe_results(config, renew_successes, renew_failures,
|
||||
renew_skipped, parse_failures):
|
||||
def _status(msgs, category):
|
||||
|
|
|
|||
|
|
@ -252,6 +252,7 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
|
|||
self.privkey = self.configuration["privkey"]
|
||||
self.chain = self.configuration["chain"]
|
||||
self.fullchain = self.configuration["fullchain"]
|
||||
self.live_dir = os.path.dirname(self.cert)
|
||||
|
||||
self._fix_symlinks()
|
||||
self._check_symlinks()
|
||||
|
|
|
|||
|
|
@ -579,11 +579,11 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
|
|||
mock_init.return_value = mock_client
|
||||
get_utility_path = 'letsencrypt.main.zope.component.getUtility'
|
||||
with mock.patch(get_utility_path) as mock_get_utility:
|
||||
with mock.patch('letsencrypt.main.OpenSSL') as mock_ssl:
|
||||
with mock.patch('letsencrypt.main.renewal.OpenSSL') as mock_ssl:
|
||||
mock_latest = mock.MagicMock()
|
||||
mock_latest.get_issuer.return_value = "Fake fake"
|
||||
mock_ssl.crypto.load_certificate.return_value = mock_latest
|
||||
with mock.patch('letsencrypt.main.crypto_util'):
|
||||
with mock.patch('letsencrypt.main.renewal.crypto_util'):
|
||||
if not args:
|
||||
args = ['-d', 'isnot.org', '-a', 'standalone', 'certonly']
|
||||
if extra_args:
|
||||
|
|
|
|||
105
letsencrypt/tests/hook_test.py
Normal file
105
letsencrypt/tests/hook_test.py
Normal file
|
|
@ -0,0 +1,105 @@
|
|||
"""Tests for hooks.py"""
|
||||
# pylint: disable=protected-access
|
||||
|
||||
import os
|
||||
import unittest
|
||||
import sys
|
||||
|
||||
import mock
|
||||
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import hooks
|
||||
|
||||
class HookTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
pass
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
@mock.patch('letsencrypt.hooks._prog')
|
||||
def test_validate_hooks(self, mock_prog):
|
||||
config = mock.MagicMock(pre_hook="", post_hook="ls -lR", renew_hook="uptime")
|
||||
hooks.validate_hooks(config)
|
||||
self.assertEqual(mock_prog.call_count, 2)
|
||||
self.assertEqual(mock_prog.call_args_list[1][0][0], 'uptime')
|
||||
self.assertEqual(mock_prog.call_args_list[0][0][0], 'ls')
|
||||
mock_prog.return_value = None
|
||||
config = mock.MagicMock(pre_hook="explodinator", post_hook="", renew_hook="")
|
||||
self.assertRaises(errors.HookCommandNotFound, hooks.validate_hooks, config)
|
||||
|
||||
@mock.patch('letsencrypt.hooks._is_exe')
|
||||
def test_which(self, mock_is_exe):
|
||||
mock_is_exe.return_value = True
|
||||
self.assertEqual(hooks._which("/path/to/something"), "/path/to/something")
|
||||
|
||||
with mock.patch.dict('os.environ', {"PATH": "/floop:/fleep"}):
|
||||
mock_is_exe.return_value = True
|
||||
self.assertEqual(hooks._which("pingify"), "/floop/pingify")
|
||||
mock_is_exe.return_value = False
|
||||
self.assertEqual(hooks._which("pingify"), None)
|
||||
self.assertEqual(hooks._which("/path/to/something"), None)
|
||||
|
||||
@mock.patch('letsencrypt.hooks._which')
|
||||
def test_prog(self, mockwhich):
|
||||
mockwhich.return_value = "/very/very/funky"
|
||||
self.assertEqual(hooks._prog("funky"), "funky")
|
||||
mockwhich.return_value = None
|
||||
self.assertEqual(hooks._prog("funky"), None)
|
||||
|
||||
def _test_a_hook(self, config, hook_function, calls_expected):
|
||||
with mock.patch('letsencrypt.hooks.logger'):
|
||||
with mock.patch('letsencrypt.hooks._run_hook') as mock_run_hook:
|
||||
hook_function(config)
|
||||
hook_function(config)
|
||||
self.assertEqual(mock_run_hook.call_count, calls_expected)
|
||||
|
||||
def test_pre_hook(self):
|
||||
config = mock.MagicMock(pre_hook="true")
|
||||
self._test_a_hook(config, hooks.pre_hook, 1)
|
||||
config = mock.MagicMock(pre_hook="")
|
||||
self._test_a_hook(config, hooks.pre_hook, 0)
|
||||
|
||||
def test_post_hook(self):
|
||||
config = mock.MagicMock(post_hook="true", verb="splonk")
|
||||
self._test_a_hook(config, hooks.post_hook, 2)
|
||||
config = mock.MagicMock(post_hook="true", verb="renew")
|
||||
self._test_a_hook(config, hooks.post_hook, 0)
|
||||
|
||||
def test_renew_hook(self):
|
||||
with mock.patch.dict('os.environ', {}):
|
||||
domains = ["a", "b"]
|
||||
lineage = "thing"
|
||||
rhook = lambda x: hooks.renew_hook(x, domains, lineage)
|
||||
|
||||
config = mock.MagicMock(renew_hook="true", dry_run=False)
|
||||
self._test_a_hook(config, rhook, 2)
|
||||
self.assertEqual(os.environ["RENEWED_DOMAINS"], "a b")
|
||||
self.assertEqual(os.environ["RENEWED_LINEAGE"], "thing")
|
||||
|
||||
config = mock.MagicMock(renew_hook="true", dry_run=True)
|
||||
if sys.version_info < (2, 7):
|
||||
# the print() function is not mockable in py26
|
||||
self._test_a_hook(config, rhook, 0)
|
||||
else:
|
||||
with mock.patch("letsencrypt.hooks.print") as mock_print:
|
||||
self._test_a_hook(config, rhook, 0)
|
||||
self.assertEqual(mock_print.call_count, 2)
|
||||
|
||||
@mock.patch('letsencrypt.hooks.Popen')
|
||||
def test_run_hook(self, mock_popen):
|
||||
with mock.patch('letsencrypt.hooks.logger.error') as mock_error:
|
||||
mock_cmd = mock.MagicMock()
|
||||
mock_cmd.returncode = 1
|
||||
mock_cmd.communicate.return_value = ("", "")
|
||||
mock_popen.return_value = mock_cmd
|
||||
hooks._run_hook("ls")
|
||||
self.assertEqual(mock_error.call_count, 1)
|
||||
with mock.patch('letsencrypt.hooks.logger.error') as mock_error:
|
||||
mock_cmd.communicate.return_value = ("", "thing")
|
||||
hooks._run_hook("ls")
|
||||
self.assertEqual(mock_error.call_count, 2)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
@ -1,4 +1,5 @@
|
|||
"""Tests for letsencrypt.storage."""
|
||||
# pylint disable=protected-access
|
||||
import datetime
|
||||
import os
|
||||
import shutil
|
||||
|
|
|
|||
Loading…
Reference in a new issue