mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
* Start of combined manual/script plugin * Return str from hooks.execute, not bytes * finish manual/script rewrite * delete old manual and script plugins * manually specify we want chall.token * use consistent quotes * specify chall for uri * s/script/hook * fix spacing on instructions * remove unneeded response argument * make achall more helpful * simplify perform * remove old test files * add start of manual_tests * fix ParseTest.test_help * stop using manual_test_mode in cli tests * Revert "make achall more helpful" This reverts commit54b01cea30. * use bad response/validation methods on achalls * simplify perform and cleanup environment * finish manual tests * Add HTTP manual hook integration test * add manual http scripts * Add manual DNS script integration test * remove references to the script plugin * they're hooks, not scripts * add --manual-public-ip-logging-ok to integration tests * use --pref-chall for dns integration * does dns work? * validate hooks * test hook validation * Revert "does dns work?" This reverts commit1224cc2961. * busy wait in manual-http-auth * remove DNS script test for now * Fix challenge prefix and add trailing . * Add comment about universal_newlines * Fix typo from0464ba2c4* fix nits and typos * Generalize HookCOmmandNotFound error * Add verify_exe_exists * Don't duplicate code in hooks.py * Revert changes to hooks.py * Use consistent hook error messages
119 lines
3.7 KiB
Python
119 lines
3.7 KiB
Python
"""Facilities for implementing hooks that call shell commands."""
|
|
from __future__ import print_function
|
|
|
|
import logging
|
|
import os
|
|
|
|
from subprocess import Popen, PIPE
|
|
|
|
from certbot import errors
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def validate_hooks(config):
|
|
"""Check hook commands are executable."""
|
|
validate_hook(config.pre_hook, "pre")
|
|
validate_hook(config.post_hook, "post")
|
|
validate_hook(config.renew_hook, "renew")
|
|
|
|
def _prog(shell_cmd):
|
|
"""Extract the program run by a shell command"""
|
|
cmd = _which(shell_cmd)
|
|
return os.path.basename(cmd) if cmd else None
|
|
|
|
def validate_hook(shell_cmd, hook_name):
|
|
"""Check that a command provided as a hook is plausibly executable.
|
|
|
|
:raises .errors.HookCommandNotFound: if the command is not found
|
|
"""
|
|
if shell_cmd:
|
|
cmd = shell_cmd.split(None, 1)[0]
|
|
if not _prog(cmd):
|
|
path = os.environ["PATH"]
|
|
msg = "Unable to find {2}-hook command {0} in the PATH.\n(PATH is {1})".format(
|
|
cmd, path, hook_name)
|
|
raise errors.HookCommandNotFound(msg)
|
|
|
|
def pre_hook(config):
|
|
"Run pre-hook if it's defined and hasn't been run."
|
|
if config.pre_hook and not pre_hook.already:
|
|
logger.info("Running pre-hook command: %s", config.pre_hook)
|
|
_run_hook(config.pre_hook)
|
|
pre_hook.already = True
|
|
|
|
pre_hook.already = False
|
|
|
|
def post_hook(config, final=False):
|
|
"""Run post hook if defined.
|
|
|
|
If the verb is renew, we might have more certs to renew, so we wait until
|
|
we're called with final=True before actually doing anything.
|
|
"""
|
|
if config.post_hook:
|
|
if not pre_hook.already:
|
|
logger.info("No renewals attempted, so not running post-hook")
|
|
if config.verb != "renew":
|
|
logger.warning("Sanity failure in renewal hooks")
|
|
return
|
|
if final or config.verb != "renew":
|
|
logger.info("Running post-hook command: %s", config.post_hook)
|
|
_run_hook(config.post_hook)
|
|
|
|
def renew_hook(config, domains, lineage_path):
|
|
"Run post-renewal hook if defined."
|
|
if config.renew_hook:
|
|
if not config.dry_run:
|
|
os.environ["RENEWED_DOMAINS"] = " ".join(domains)
|
|
os.environ["RENEWED_LINEAGE"] = lineage_path
|
|
_run_hook(config.renew_hook)
|
|
else:
|
|
logger.warning("Dry run: skipping renewal hook command: %s", config.renew_hook)
|
|
|
|
|
|
def _run_hook(shell_cmd):
|
|
"""Run a hook command.
|
|
|
|
:returns: stderr if there was any"""
|
|
|
|
err, _ = execute(shell_cmd)
|
|
return err
|
|
|
|
|
|
def execute(shell_cmd):
|
|
"""Run a command.
|
|
|
|
:returns: `tuple` (`str` stderr, `str` stdout)"""
|
|
|
|
# universal_newlines causes Popen.communicate()
|
|
# to return str objects instead of bytes in Python 3
|
|
cmd = Popen(shell_cmd, shell=True, stdout=PIPE,
|
|
stderr=PIPE, universal_newlines=True)
|
|
out, err = cmd.communicate()
|
|
if cmd.returncode != 0:
|
|
logger.error('Hook command "%s" returned error code %d',
|
|
shell_cmd, cmd.returncode)
|
|
if err:
|
|
logger.error('Error output from %s:\n%s', _prog(shell_cmd), err)
|
|
return (err, out)
|
|
|
|
|
|
def _is_exe(fpath):
|
|
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
|
|
|
|
def _which(program):
|
|
"""Test if program is in the path."""
|
|
# Borrowed from:
|
|
# https://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
|
|
# XXX May need more porting to handle .exe extensions on Windows
|
|
|
|
fpath, _fname = os.path.split(program)
|
|
if fpath:
|
|
if _is_exe(program):
|
|
return program
|
|
else:
|
|
for path in os.environ["PATH"].split(os.pathsep):
|
|
exe_file = os.path.join(path, program)
|
|
if _is_exe(exe_file):
|
|
return exe_file
|
|
|
|
return None
|