ari: store Retry-After in lineage config (#10377)

This uses the new `storage.atomic_rewrite` function to write only the
ARI-related value to the config file, leaving other values the same.

Updates `storage.atomic_rewrite` to handle the case where the
`"renewalparams"` section might be empty (which occurs in the new
unittests), and adds some comments.

Note: `atomic_rewrite` is mocked out as a no-op in
`test_resilient_ari_directory_fetches` and `test_renew_before_expiry`
because those tests have a simplistic config object on their
mock_renewable_cert that doesn't include a filename. If we try to write
the config in those tests we'd get an error (trying to write to `None`).
Since those tests aren't intended to test the "store and obey
Retry-After" behavior, I figured it's reasonable to skip it in the name
of testing; but of course, open to idea about the best way to navigate
this.

Part of #10355
This commit is contained in:
Jacob Hoffman-Andrews 2025-08-18 11:05:29 -07:00 committed by GitHub
parent 56da12207b
commit a58d9e96d7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 126 additions and 19 deletions

View file

@ -1,5 +1,6 @@
"""Functionality for autorenewal and associated juggling of configurations"""
import configobj
import copy
import datetime
import itertools
@ -39,6 +40,8 @@ from certbot.display import util as display_util
logger = logging.getLogger(__name__)
ARI_RETRY_AFTER_CONFIG_ITEM = "ari_retry_after"
# These are the items which get pulled out of a renewal configuration
# file's renewalparams and actually used in the client configuration
# during the renewal process. We have to record their types here because
@ -370,12 +373,22 @@ def _ari_renewal_time(lineage: storage.RenewableCert,
logger.warning("Skipping ARI check because %s has no 'server' field. This issue will not "
"prevent certificate renewal", lineage.configfile.filename)
return None
ari_config_section = lineage.configfile.get("acme_renewal_info", {})
retry_after = ari_config_section.get(ARI_RETRY_AFTER_CONFIG_ITEM, None)
if retry_after:
retry_after_datetime = datetime.datetime.fromisoformat(retry_after)
now = datetime.datetime.now()
if now < retry_after_datetime:
logger.debug("Skipped ACME Renewal Info check because ari_retry_after %s is in "
"the future",
retry_after)
return None
renewal_time = None
try:
ari_client = ari_clients.get(lineage.server)
# Attempt to get the ARI-defined renewal time
if ari_client:
return ari_client.renewal_time(cert_pem)[0]
renewal_time, retry_after = ari_client.renewal_time(cert_pem)
except Exception: # pylint: disable=broad-except
# We want to stop errors around ARI preventing renewal so we catch all exceptions here
# with a warning asking users to tell us about any problems they are experiencing
@ -383,8 +396,17 @@ def _ari_renewal_time(lineage: storage.RenewableCert,
"problem persists and you think it's a bug in Certbot, please open an "
"issue at https://github.com/certbot/certbot/issues/new/choose.")
logger.debug("Error while requesting ARI was:", exc_info=True)
retry_after = datetime.datetime.now() + datetime.timedelta(seconds=60 * 60 * 6)
return None
config_update = configobj.ConfigObj()
config_update["acme_renewal_info"] = {
# Note: the ACME client returns naive (no timezone) datetimes for retry_after, and that
# is what we serialize here.
ARI_RETRY_AFTER_CONFIG_ITEM: retry_after.isoformat(timespec="seconds"),
}
storage.atomic_rewrite(lineage.configfile.filename, config_update)
return renewal_time
def _default_renewal_time(cert_pem: bytes) -> datetime.datetime:

View file

@ -140,11 +140,21 @@ def atomic_rewrite(config_filename: str, new_config: configobj.ConfigObj) -> Non
file_error=True)
merged_config.merge(new_config)
# We merge and then delete, rather than just replacing the 'renewalparams' section, so we can
# preserve comments from the on-disk config file.
for k in merged_config["renewalparams"]:
if k not in new_config["renewalparams"]:
del merged_config["renewalparams"][k]
# When updating the "renewalparams" section, only carry through fields that actually exist
# in the new config's "renewalparams" section. We could achieve this straightforwardly by
# assigning the new config's "renewalparams" section into the merged config. But then we would
# lose comments. Merging and then deleting allows us to preserve comments.
#
# As a concrete example, if a renewal params config has elliptic_curve=secp384r1, and the
# user executes a command to issue with `--key-type=rsa` instead, the old elliptic_curve value
# should disappear because it's not specified in the current command line.
#
# This only applies when "renewalparams" is actually being updated, because sometimes we
# update other sections independently (like "acme_renewal_info").
if "renewalparams" in new_config:
for k in merged_config["renewalparams"]:
if k not in new_config["renewalparams"]:
del merged_config["renewalparams"][k]
current_permissions = stat.S_IMODE(os.lstat(config_filename).st_mode)

View file

@ -6,10 +6,10 @@ import tempfile
import unittest
from unittest import mock
import configobj
import pytest
from acme import challenges
from acme import errors as acme_errors
from acme import challenges, errors as acme_errors
from certbot import configuration
from certbot import errors
from certbot._internal import storage
@ -279,8 +279,9 @@ class RenewalTest(test_util.ConfigTestCase):
t = renewal._default_renewal_time(cert_pem)
assert t == datetime.datetime(2025, 3, 24, 00, 00, 00, tzinfo=datetime.timezone.utc)
@mock.patch("certbot._internal.storage.atomic_rewrite")
@mock.patch("certbot._internal.renewal.datetime")
def test_renew_before_expiry(self, mock_datetime):
def test_renew_before_expiry(self, mock_datetime, unused_mock_atomic_rewrite):
"""When neither OCSP nor the ACME client indicate it's time to renew,
obey the renew_before_expiry config.
"""
@ -295,17 +296,15 @@ class RenewalTest(test_util.ConfigTestCase):
short_cert = make_cert_with_lifetime(not_before, 7)
ari_server = "http://ari"
mock_acme = mock.MagicMock()
future = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=100000)
mock_acme.renewal_time.return_value = (future, future)
ari_client_pool = mock.MagicMock()
ari_client_pool.get.return_value = mock_acme
ari_client_pool = MockAriClientPool(future, future)
mock_renewable_cert = mock.MagicMock()
mock_renewable_cert.server = ari_server
mock_renewable_cert.autorenewal_is_enabled.return_value = True
mock_renewable_cert.version.return_value = "/tmp/abc"
mock_renewable_cert.ocsp_revoked.return_value = False
mock_renewable_cert.configfile = configobj.ConfigObj()
mock_datetime.timedelta = datetime.timedelta
@ -353,7 +352,7 @@ class RenewalTest(test_util.ConfigTestCase):
# Now, test cases where ARI either fails (returns `(None, _)`) or
# the cert has no `server` value and ARI is skipped
mock_acme.renewal_time.return_value = (None, future)
ari_client_pool = MockAriClientPool(None, future)
for (current_time, interval, result) in [
# 2014-12-13 12:00 (about 5 days prior to expiry)
# Times that should result in autorenewal/autodeployment
@ -424,9 +423,11 @@ class RenewalTest(test_util.ConfigTestCase):
assert any(call.args[0].startswith('Skipping ARI check') for call in
mock_warning.call_args_list)
@mock.patch("certbot._internal.storage.atomic_rewrite")
@mock.patch('certbot._internal.storage.RenewableCert.ocsp_revoked')
@mock.patch('acme.client.ClientV2.renewal_time')
def test_resilient_ari_directory_fetches(self, mock_renewal_time, mock_ocsp):
def test_resilient_ari_directory_fetches(self, mock_renewal_time, mock_ocsp,
unused_mock_atomic_rewrite):
from certbot._internal import renewal
from acme import messages
@ -435,6 +436,7 @@ class RenewalTest(test_util.ConfigTestCase):
ari_client_pool.get.side_effect = messages.Error()
mock_rc = mock.MagicMock()
mock_rc.server = ari_server
mock_rc.configfile = configobj.ConfigObj()
mock_rc.autorenewal_is_enabled.return_value = True
mock_ocsp.return_value = True
@ -467,6 +469,79 @@ class RenewalTest(test_util.ConfigTestCase):
assert any('ARI' in call.args[0] for call in mock_logger.warning.call_args_list)
assert any(call.kwargs.get('exc_info') for call in mock_logger.debug.call_args_list)
def test_stores_ari_retry_after(self):
from certbot._internal import renewal
rc_path = test_util.make_lineage(self.config.config_dir, 'sample-renewal.conf')
renewable_cert = storage.RenewableCert(rc_path, self.config)
renewal_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
seconds=1000)
retry_after = datetime.datetime.now() + datetime.timedelta(seconds=1000)
mock_ari_client_pool = MockAriClientPool(renewal_time, retry_after)
# Check for renewal. As a side effect, this should cause the lineage config to be
# updated with 'ari_retry_after' in the renewalparams section.
renewal.should_autorenew(renewable_cert, mock_ari_client_pool)
with open(renewable_cert.configfile.filename, 'r') as c:
renewable_cert_config = configobj.ConfigObj(c)
assert renewable_cert_config['acme_renewal_info']['ari_retry_after'] == retry_after.isoformat(
timespec='seconds')
def test_skips_ari_when_retry_after_future(self):
from certbot._internal import renewal
rc_path = test_util.make_lineage(self.config.config_dir, 'sample-renewal.conf')
renewable_cert = storage.RenewableCert(rc_path, self.config)
future = datetime.datetime.now() + datetime.timedelta(seconds=1000)
storage.atomic_rewrite(rc_path,
{"acme_renewal_info": {"ari_retry_after": future.isoformat(timespec="seconds")}})
# ARI shouldn't be checked at all because retry after is in the future.
mock_ari_client_pool = MockAriClientPool(None, None)
mock_ari_client_pool.mock_acme.renewal_time.side_effect = errors.Error("Shouldn't be called")
# Check for renewal. All we care about here is that renewal_time is not called; if it were,
# an exception would be raised.
renewal.should_autorenew(renewable_cert, mock_ari_client_pool)
def test_checks_ari_when_retry_after_absent(self):
from certbot._internal import renewal
rc_path = test_util.make_lineage(self.config.config_dir, 'sample-renewal.conf')
renewable_cert = storage.RenewableCert(rc_path, self.config)
renewal_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=1000)
retry_after = datetime.datetime.now() + datetime.timedelta(seconds=1000)
mock_ari_client_pool = MockAriClientPool(renewal_time, retry_after)
# The 'ari_retry_after' field is absent, so renewal_time _should_ be called.
# We don't care about the return value of should_autorenew.
renewal.should_autorenew(renewable_cert, mock_ari_client_pool)
mock_ari_client_pool.mock_acme.renewal_time.assert_called_once()
def test_checks_ari_when_retry_after_in_past(self):
from certbot._internal import renewal
rc_path = test_util.make_lineage(self.config.config_dir, 'sample-renewal.conf')
renewable_cert = storage.RenewableCert(rc_path, self.config)
past = datetime.datetime.now() - datetime.timedelta(seconds=1000)
storage.atomic_rewrite(rc_path,
{"acme_renewal_info": {"ari_retry_after": past.isoformat(timespec="seconds")}})
renewal_time = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=1000)
retry_after = datetime.datetime.now() + datetime.timedelta(seconds=1000)
mock_ari_client_pool = MockAriClientPool(renewal_time, retry_after)
# The 'ari_retry_after' field is in the past, so renewal_time _should_ be called.
# We don't care about the return value of should_autorenew.
renewal.should_autorenew(renewable_cert, mock_ari_client_pool)
mock_ari_client_pool.mock_acme.renewal_time.assert_called_once()
class MockAriClientPool:
def __init__(self, renewal_time, retry_after):