post renewal hook: Add RENEWED_DOMAINS and FAILED_DOMAINS as environment variables (#9724)

* renewal hook: Add RENEWED_DOMAINS and FAILED_DOMAINS as environment variables

* renewal hook: Updated documentation

* renewal hook: Updated CHANGELOG

* renew post hook: Add limit on variable sizes
This commit is contained in:
Michael Cassaniti 2023-07-06 23:56:31 +10:00 committed by GitHub
parent d0e11c81b1
commit 436b7fbe28
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 111 additions and 17 deletions

View file

@ -12,6 +12,7 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
* `NamespaceConfig` now tracks how its arguments were set via a dictionary, allowing us to remove a bunch
of global state previously needed to inspect whether a user set an argument or not.
* Added `RENEWED_DOMAINS` and `FAILED_DOMAINS` environment variables for consumption by post renewal hooks.
### Fixed

View file

@ -1,6 +1,7 @@
"""Facilities for implementing hooks that call shell commands."""
import logging
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
@ -102,7 +103,11 @@ def _run_pre_hook_if_necessary(command: str) -> None:
executed_pre_hooks.add(command)
def post_hook(config: configuration.NamespaceConfig) -> None:
def post_hook(
config: configuration.NamespaceConfig,
renewed_domains: List[str]
) -> None:
"""Run post-hooks if defined.
This function also registers any executables found in
@ -130,7 +135,22 @@ def post_hook(config: configuration.NamespaceConfig) -> None:
_run_eventually(cmd)
# certonly / run
elif cmd:
_run_hook("post-hook", cmd)
renewed_domains_str = ' '.join(renewed_domains)
# 32k is reasonable on Windows and likely quite conservative on other platforms
if len(renewed_domains_str) > 32_000:
logger.warning("Limiting RENEWED_DOMAINS environment variable to 32k characters")
renewed_domains_str = renewed_domains_str[:32_000]
_run_hook(
"post-hook",
cmd,
{
'RENEWED_DOMAINS': renewed_domains_str,
# Since other commands stop certbot execution on failure,
# it doesn't make sense to have a FAILED_DOMAINS variable
'FAILED_DOMAINS': ""
}
)
post_hooks: List[str] = []
@ -149,10 +169,30 @@ def _run_eventually(command: str) -> None:
post_hooks.append(command)
def run_saved_post_hooks() -> None:
def run_saved_post_hooks(renewed_domains: List[str], failed_domains: List[str]) -> None:
"""Run any post hooks that were saved up in the course of the 'renew' verb"""
renewed_domains_str = ' '.join(renewed_domains)
failed_domains_str = ' '.join(failed_domains)
# 32k combined is reasonable on Windows and likely quite conservative on other platforms
if len(renewed_domains_str) > 16_000:
logger.warning("Limiting RENEWED_DOMAINS environment variable to 16k characters")
renewed_domains_str = renewed_domains_str[:16_000]
if len(failed_domains_str) > 16_000:
logger.warning("Limiting FAILED_DOMAINS environment variable to 16k characters")
renewed_domains_str = failed_domains_str[:16_000]
for cmd in post_hooks:
_run_hook("post-hook", cmd)
_run_hook(
"post-hook",
cmd,
{
'RENEWED_DOMAINS': renewed_domains_str,
'FAILED_DOMAINS': failed_domains_str
}
)
def deploy_hook(config: configuration.NamespaceConfig, domains: List[str],
@ -229,16 +269,20 @@ def _run_deploy_hook(command: str, domains: List[str], lineage_path: str, dry_ru
_run_hook("deploy-hook", command)
def _run_hook(cmd_name: str, shell_cmd: str) -> str:
def _run_hook(cmd_name: str, shell_cmd: str, extra_env: Optional[Dict[str, str]] = None) -> str:
"""Run a hook command.
:param str cmd_name: the user facing name of the hook being run
:param shell_cmd: shell command to execute
:type shell_cmd: `list` of `str` or `str`
:param dict extra_env: extra environment variables to set
:type extra_env: `dict` of `str` to `str`
:returns: stderr if there was any"""
env = util.env_no_snap_for_external_calls()
env.update(extra_env or {})
returncode, err, out = misc.execute_command_status(
cmd_name, shell_cmd, env=util.env_no_snap_for_external_calls())
cmd_name, shell_cmd, env=env)
display_ops.report_executed_command(f"Hook '{cmd_name}'", returncode, out, err)
return err

View file

@ -115,6 +115,8 @@ def _get_and_save_cert(le_client: client.Client, config: configuration.Namespace
"""
hooks.pre_hook(config)
renewed_domains: List[str] = []
try:
if lineage is not None:
# Renewal, where we already know the specific lineage we're
@ -143,8 +145,9 @@ def _get_and_save_cert(le_client: client.Client, config: configuration.Namespace
raise errors.Error("Certificate could not be obtained")
if lineage is not None:
hooks.deploy_hook(config, lineage.names(), lineage.live_dir)
renewed_domains.extend(domains)
finally:
hooks.post_hook(config)
hooks.post_hook(config, renewed_domains)
return lineage
@ -1632,10 +1635,13 @@ def renew(config: configuration.NamespaceConfig,
:rtype: None
"""
renewed_domains: List[str] = []
failed_domains: List[str] = []
try:
renewal.handle_renewal_request(config)
renewed_domains, failed_domains = renewal.handle_renewal_request(config)
finally:
hooks.run_saved_post_hooks()
hooks.run_saved_post_hooks(renewed_domains, failed_domains)
def make_or_verify_needed_dirs(config: configuration.NamespaceConfig) -> None:

View file

@ -13,6 +13,7 @@ from typing import Iterable
from typing import List
from typing import Mapping
from typing import Optional
from typing import Tuple
from typing import Union
from cryptography.hazmat.backends import default_backend
@ -459,7 +460,7 @@ def _renew_describe_results(config: configuration.NamespaceConfig, renew_success
notify(display_obj.SIDE_FRAME)
def handle_renewal_request(config: configuration.NamespaceConfig) -> None:
def handle_renewal_request(config: configuration.NamespaceConfig) -> Tuple[list, list]:
"""Examine each lineage; renew if due and report results"""
# This is trivially False if config.domains is empty
@ -484,6 +485,9 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> None:
renew_skipped = []
parse_failures = []
renewed_domains = []
failed_domains = []
# Noninteractive renewals include a random delay in order to spread
# out the load on the certificate authority servers, even if many
# users all pick the same time for renewals. This delay precedes
@ -532,6 +536,7 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> None:
# and we have a lineage in renewal_candidate
main.renew_cert(lineage_config, plugins, renewal_candidate)
renew_successes.append(renewal_candidate.fullchain)
renewed_domains.extend(renewal_candidate.names())
else:
expiry = crypto_util.notAfter(renewal_candidate.version(
"cert", renewal_candidate.latest_common_version()))
@ -550,6 +555,7 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> None:
logger.debug("Traceback was:\n%s", traceback.format_exc())
if renewal_candidate:
renew_failures.append(renewal_candidate.fullchain)
failed_domains.extend(renewal_candidate.names())
# Describe all the results
_renew_describe_results(config, renew_successes, renew_failures,
@ -563,6 +569,8 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> None:
# If the text below changes, these tests will need to be updated accordingly.
logger.debug("no renewal failures")
return (renewed_domains, failed_domains)
def _update_renewal_params_from_key(key_path: str, config: configuration.NamespaceConfig) -> None:
with open(key_path, 'rb') as file_h:

View file

@ -1,6 +1,7 @@
"""Tests for certbot._internal.hooks."""
import sys
import unittest
from platform import python_version_tuple
from unittest import mock
import pytest
@ -10,6 +11,18 @@ from certbot import util
from certbot.compat import filesystem
from certbot.compat import os
from certbot.tests import util as test_util
from typing import List
def pyver_lt(major: int, minor: int):
pymajor = int(python_version_tuple()[0])
pyminor = int(python_version_tuple()[1])
if pymajor < major:
return True
elif pymajor > major:
return False
else:
return pyminor < minor
class ValidateHooksTest(unittest.TestCase):
@ -194,7 +207,7 @@ class PostHookTest(HookTest):
def test_certonly_and_run_with_hook(self):
for verb in ("certonly", "run",):
self.config.verb = verb
mock_execute = self._call_with_mock_execute(self.config)
mock_execute = self._call_with_mock_execute(self.config, [])
mock_execute.assert_called_once_with("post-hook", self.config.post_hook, env=mock.ANY)
assert not self._get_eventually()
@ -202,9 +215,15 @@ class PostHookTest(HookTest):
self.config.post_hook = None
for verb in ("certonly", "run",):
self.config.verb = verb
assert not self._call_with_mock_execute(self.config).called
assert not self._call_with_mock_execute(self.config, []).called
assert not self._get_eventually()
@unittest.skipIf(pyver_lt(3, 8), "Python 3.8+ required for this test.")
def test_renew_env(self):
self.config.verb = "certonly"
args = self._call_with_mock_execute(self.config, ["success.org"]).call_args
assert args.kwargs['env']["RENEWED_DOMAINS"] == "success.org"
def test_renew_disabled_dir_hooks(self):
self.config.directory_hooks = False
self._test_renew_common([self.config.post_hook])
@ -238,7 +257,7 @@ class PostHookTest(HookTest):
self.config.verb = "renew"
for _ in range(2):
self._call(self.config)
self._call(self.config, [])
assert self._get_eventually() == expected
def _get_eventually(self):
@ -252,7 +271,10 @@ class RunSavedPostHooksTest(HookTest):
@classmethod
def _call(cls, *args, **kwargs):
from certbot._internal.hooks import run_saved_post_hooks
return run_saved_post_hooks()
renewed_domains = kwargs["renewed_domains"] if "renewed_domains" in kwargs else args[0]
failed_domains = kwargs["failed_domains"] if "failed_domains" in kwargs else args[1]
return run_saved_post_hooks(renewed_domains, failed_domains)
def _call_with_mock_execute_and_eventually(self, *args, **kwargs):
"""Call run_saved_post_hooks but mock out execute and eventually
@ -271,11 +293,11 @@ class RunSavedPostHooksTest(HookTest):
self.eventually: List[str] = []
def test_empty(self):
assert not self._call_with_mock_execute_and_eventually().called
assert not self._call_with_mock_execute_and_eventually([], []).called
def test_multiple(self):
self.eventually = ["foo", "bar", "baz", "qux"]
mock_execute = self._call_with_mock_execute_and_eventually()
mock_execute = self._call_with_mock_execute_and_eventually([], [])
calls = mock_execute.call_args_list
for actual_call, expected_arg in zip(calls, self.eventually):
@ -283,9 +305,16 @@ class RunSavedPostHooksTest(HookTest):
def test_single(self):
self.eventually = ["foo"]
mock_execute = self._call_with_mock_execute_and_eventually()
mock_execute = self._call_with_mock_execute_and_eventually([], [])
mock_execute.assert_called_once_with("post-hook", self.eventually[0], env=mock.ANY)
@unittest.skipIf(pyver_lt(3, 8), "Python 3.8+ required for this test.")
def test_env(self):
self.eventually = ["foo"]
mock_execute = self._call_with_mock_execute_and_eventually(["success.org"], ["failed.org"])
assert mock_execute.call_args.kwargs['env']["RENEWED_DOMAINS"] == "success.org"
assert mock_execute.call_args.kwargs['env']["FAILED_DOMAINS"] == "failed.org"
class RenewalHookTest(HookTest):
"""Common base class for testing deploy/renew hooks."""

View file

@ -707,6 +707,12 @@ form is not appropriate to run daily because each certificate will be
renewed every day, which will quickly run into the certificate authority
rate limit.)
Starting with Certbot 2.7.0, certbot provides the environment variables
`RENEWED_DOMAINS` and `FAILED_DOMAINS` to all post renewal hooks. These
variables contain a space separated list of domains. These variables can be used
to determine if a renewal has succeeded or failed as part of your post renewal
hook.
Note that options provided to ``certbot renew`` will apply to
*every* certificate for which renewal is attempted; for example,
``certbot renew --rsa-key-size 4096`` would try to replace every