certbot/certbot/hooks.py
Brad Warren 39f5551305 Merge the manual and script plugins (#3890)
* Start of combined manual/script plugin

* Return str from hooks.execute, not bytes

* finish manual/script rewrite

* delete old manual and script plugins

* manually specify we want chall.token

* use consistent quotes

* specify chall for uri

* s/script/hook

* fix spacing on instructions

* remove unneeded response argument

* make achall more helpful

* simplify perform

* remove old test files

* add start of manual_tests

* fix ParseTest.test_help

* stop using manual_test_mode in cli tests

* Revert "make achall more helpful"

This reverts commit 54b01cea30.

* use bad response/validation methods on achalls

* simplify perform and cleanup environment

* finish manual tests

* Add HTTP manual hook integration test

* add manual http scripts

* Add manual DNS script integration test

* remove references to the script plugin

* they're hooks, not scripts

* add --manual-public-ip-logging-ok to integration tests

* use --pref-chall for dns integration

* does dns work?

* validate hooks

* test hook validation

* Revert "does dns work?"

This reverts commit 1224cc2961.

* busy wait in manual-http-auth

* remove DNS script test for now

* Fix challenge prefix and add trailing .

* Add comment about universal_newlines

* Fix typo from 0464ba2c4

* fix nits and typos

* Generalize HookCOmmandNotFound error

* Add verify_exe_exists

* Don't duplicate code in hooks.py

* Revert changes to hooks.py

* Use consistent hook error messages
2016-12-22 08:24:08 -08:00

119 lines
3.7 KiB
Python

"""Facilities for implementing hooks that call shell commands."""
from __future__ import print_function
import logging
import os
from subprocess import Popen, PIPE
from certbot import errors
logger = logging.getLogger(__name__)
def validate_hooks(config):
"""Check hook commands are executable."""
validate_hook(config.pre_hook, "pre")
validate_hook(config.post_hook, "post")
validate_hook(config.renew_hook, "renew")
def _prog(shell_cmd):
"""Extract the program run by a shell command"""
cmd = _which(shell_cmd)
return os.path.basename(cmd) if cmd else None
def validate_hook(shell_cmd, hook_name):
"""Check that a command provided as a hook is plausibly executable.
:raises .errors.HookCommandNotFound: if the command is not found
"""
if shell_cmd:
cmd = shell_cmd.split(None, 1)[0]
if not _prog(cmd):
path = os.environ["PATH"]
msg = "Unable to find {2}-hook command {0} in the PATH.\n(PATH is {1})".format(
cmd, path, hook_name)
raise errors.HookCommandNotFound(msg)
def pre_hook(config):
"Run pre-hook if it's defined and hasn't been run."
if config.pre_hook and not pre_hook.already:
logger.info("Running pre-hook command: %s", config.pre_hook)
_run_hook(config.pre_hook)
pre_hook.already = True
pre_hook.already = False
def post_hook(config, final=False):
"""Run post hook if defined.
If the verb is renew, we might have more certs to renew, so we wait until
we're called with final=True before actually doing anything.
"""
if config.post_hook:
if not pre_hook.already:
logger.info("No renewals attempted, so not running post-hook")
if config.verb != "renew":
logger.warning("Sanity failure in renewal hooks")
return
if final or config.verb != "renew":
logger.info("Running post-hook command: %s", config.post_hook)
_run_hook(config.post_hook)
def renew_hook(config, domains, lineage_path):
"Run post-renewal hook if defined."
if config.renew_hook:
if not config.dry_run:
os.environ["RENEWED_DOMAINS"] = " ".join(domains)
os.environ["RENEWED_LINEAGE"] = lineage_path
_run_hook(config.renew_hook)
else:
logger.warning("Dry run: skipping renewal hook command: %s", config.renew_hook)
def _run_hook(shell_cmd):
"""Run a hook command.
:returns: stderr if there was any"""
err, _ = execute(shell_cmd)
return err
def execute(shell_cmd):
"""Run a command.
:returns: `tuple` (`str` stderr, `str` stdout)"""
# universal_newlines causes Popen.communicate()
# to return str objects instead of bytes in Python 3
cmd = Popen(shell_cmd, shell=True, stdout=PIPE,
stderr=PIPE, universal_newlines=True)
out, err = cmd.communicate()
if cmd.returncode != 0:
logger.error('Hook command "%s" returned error code %d',
shell_cmd, cmd.returncode)
if err:
logger.error('Error output from %s:\n%s', _prog(shell_cmd), err)
return (err, out)
def _is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
def _which(program):
"""Test if program is in the path."""
# Borrowed from:
# https://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
# XXX May need more porting to handle .exe extensions on Windows
fpath, _fname = os.path.split(program)
if fpath:
if _is_exe(program):
return program
else:
for path in os.environ["PATH"].split(os.pathsep):
exe_file = os.path.join(path, program)
if _is_exe(exe_file):
return exe_file
return None