When making an ARI request, use the server listed in the cert's renewal
conf rather than the one passed in certbot's config.

Fixes #10339

---------

Co-authored-by: Brad Warren <bmw@users.noreply.github.com>
This commit is contained in:
Will Greenberg 2025-06-27 10:51:55 -07:00 committed by GitHub
parent b70a9f0987
commit 372813175d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 102 additions and 68 deletions

View file

@ -14,6 +14,10 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
### Fixed
* Certbot now always uses the server value from the renewal configuration file
for ARI checks instead of the server value from the current invocation of
Certbot. This helps prevent ARI requests from going to the wrong server if
the user changes CAs.
* Previously, we claimed to set FAILED_DOMAINS and RENEWED_DOMAINS env variables for use by
post-hooks when certificate renewals fail, but we were not actually setting them. Now, we are.

View file

@ -48,9 +48,10 @@ from certbot.interfaces import AccountStorage
logger = logging.getLogger(__name__)
def acme_from_config_key(config: configuration.NamespaceConfig,
def create_acme_client(config: configuration.NamespaceConfig,
key: Optional[jose.JWK] = None,
regr: Optional[messages.RegistrationResource] = None,
server_override: Optional[str] = None,
) -> acme_client.ClientV2:
"""Wrangle ACME client construction"""
alg = RS256
@ -70,7 +71,10 @@ def acme_from_config_key(config: configuration.NamespaceConfig,
verify_ssl=(not config.no_verify_ssl),
user_agent=determine_user_agent(config))
directory = acme_client.ClientV2.get_directory(config.server, net)
server = config.server
if server_override:
server = server_override
directory = acme_client.ClientV2.get_directory(server, net)
return acme_client.ClientV2(directory, net)
@ -199,7 +203,7 @@ def register(config: configuration.NamespaceConfig, account_storage: AccountStor
key_size=config.rsa_key_size,
backend=default_backend())
key = jose.JWKRSA(key=jose.ComparableRSAKey(rsa_key))
acme = acme_from_config_key(config, key)
acme = create_acme_client(config, key)
# TODO: add phone?
regr = perform_registration(acme, config, tos_cb)
@ -291,7 +295,7 @@ class Client:
# Initialize ACME if account is provided
if acme is None and self.account is not None:
acme = acme_from_config_key(config, self.account.key, self.account.regr)
acme = create_acme_client(config, self.account.key, self.account.regr)
self.acme = acme
self.auth_handler: Optional[auth_handler.AuthHandler]

View file

@ -269,7 +269,7 @@ def _handle_identical_cert_request(config: configuration.NamespaceConfig,
return "renew", lineage
acme_clients = {}
acme_clients[config.server] = client.acme_from_config_key(config)
acme_clients[config.server] = client.create_acme_client(config)
if renewal.should_renew(config, lineage, acme_clients):
return "renew", lineage
@ -1365,11 +1365,11 @@ def revoke(config: configuration.NamespaceConfig,
crypto_util.verify_cert_matches_priv_key(config.cert_path, config.key_path)
with open(config.key_path, 'rb') as f:
key = jose.JWK.load(f.read())
acme = client.acme_from_config_key(config, key)
acme = client.create_acme_client(config, key)
else: # revocation by account key
logger.debug("Revoking %s using Account Key", config.cert_path)
acc, _ = _determine_account(config)
acme = client.acme_from_config_key(config, acc.key, acc.regr)
acme = client.create_acme_client(config, acc.key, acc.regr)
with open(config.cert_path, 'rb') as f:
cert = x509.load_pem_x509_certificate(f.read())

View file

@ -367,48 +367,61 @@ def should_autorenew(config: configuration.NamespaceConfig,
:rtype: bool
"""
if lineage.autorenewal_is_enabled():
# Don't initialize the acme client (making a network request) until
# we know we're actually going to have to check ARI
if config.server not in acme_clients:
acme_clients[config.server] = client.acme_from_config_key(config)
acme = acme_clients[config.server]
if not lineage.autorenewal_is_enabled():
return False
cert = lineage.version("cert", lineage.latest_common_version())
cert = lineage.version("cert", lineage.latest_common_version())
with open(cert, 'rb') as f:
cert_pem = f.read()
# Consider whether to attempt to autorenew this cert now
renewal_time = None
with open(cert, 'rb') as f:
cert_pem = f.read()
renewal_time = None
# For ARI requests, we want to use the ACME directory URL from which the
# cert was originally requested. Since `config.server` can be overridden on
# the command line, we're using the server stored in the cert's renewal
# conf, i.e. `lineage.server`
#
# Fixes https://github.com/certbot/certbot/issues/10339
if lineage.server:
# Creating a new ACME client makes a network request, so check if we have
# one cached for this cert's server already
if lineage.server not in acme_clients:
acme_clients[lineage.server] = \
client.create_acme_client(config, server_override=lineage.server)
acme = acme_clients[lineage.server]
# Attempt to get the ARI-defined renewal time
renewal_time, _ = acme.renewal_time(cert_pem)
else:
logger.info("Certificate has no 'server' field configured, unable to "
"perform ACME Renewal Information (ARI) request.")
now = datetime.datetime.now(datetime.timezone.utc)
now = datetime.datetime.now(datetime.timezone.utc)
if renewal_time and now > renewal_time:
if renewal_time and now > renewal_time:
return True
# Renewals on the basis of revocation
if lineage.ocsp_revoked(lineage.latest_common_version()):
logger.debug("Should renew, certificate is revoked.")
return True
# The "renew_before_expiry" config field can make us renew earlier than the
# default. If ARI response was None and no "renew_before_expiry" is set,
# check against the default.
config_interval = lineage.configuration.get("renew_before_expiry")
if config_interval is not None:
notAfter = crypto_util.notAfter(cert)
if notAfter < storage.add_time_interval(now, config_interval):
logger.debug("Should renew, less than %s before certificate "
"expiry %s.", config_interval,
notAfter.strftime("%Y-%m-%d %H:%M:%S %Z"))
return True
# Renewals on the basis of revocation
if lineage.ocsp_revoked(lineage.latest_common_version()):
logger.debug("Should renew, certificate is revoked.")
# Only use the default if we don't have an ARI response
elif renewal_time is None:
default_renewal_time = _default_renewal_time(cert_pem)
if now > default_renewal_time:
return True
# The "renew_before_expiry" config field can make us renew earlier
# than the default. If ARI response was None and no "renew_before_expiry"
# is set, check against the default.
config_interval = lineage.configuration.get("renew_before_expiry")
if config_interval is not None:
notAfter = crypto_util.notAfter(cert)
if notAfter < storage.add_time_interval(now, config_interval):
logger.debug("Should renew, less than %s before certificate "
"expiry %s.", config_interval,
notAfter.strftime("%Y-%m-%d %H:%M:%S %Z"))
return True
# Only use the default if we don't have an ARI response
elif renewal_time is None:
default_renewal_time = _default_renewal_time(cert_pem)
if now > default_renewal_time:
return True
return False

View file

@ -458,25 +458,25 @@ class RevokeTest(test_util.TempDirTestCase):
@mock.patch('certbot._internal.main._delete_if_appropriate')
@mock.patch('certbot._internal.storage.RenewableCert')
@mock.patch('certbot._internal.storage.renewal_file_for_certname')
@mock.patch('certbot._internal.client.acme_from_config_key')
def test_revoke_by_certname(self, mock_acme_from_config,
@mock.patch('certbot._internal.client.create_acme_client')
def test_revoke_by_certname(self, mock_create_acme,
unused_mock_renewal_file_for_certname, mock_cert,
mock_delete_if_appropriate):
mock_acme_from_config.return_value = self.mock_acme_client
mock_create_acme.return_value = self.mock_acme_client
mock_cert.return_value = mock.MagicMock(cert_path=self.tmp_cert_path,
server="https://acme.example")
args = 'revoke --cert-name=example.com'.split()
mock_delete_if_appropriate.return_value = False
self._call(args)
assert mock_acme_from_config.call_args_list[0][0][0].server == \
assert mock_create_acme.call_args_list[0][0][0].server == \
'https://acme.example'
self.mock_success_revoke.assert_called_once_with(self.tmp_cert_path)
@mock.patch('certbot._internal.main._delete_if_appropriate')
@mock.patch('certbot._internal.storage.RenewableCert')
@mock.patch('certbot._internal.storage.renewal_file_for_certname')
@mock.patch('certbot._internal.client.acme_from_config_key')
def test_revoke_by_certname_and_server(self, mock_acme_from_config,
@mock.patch('certbot._internal.client.create_acme_client')
def test_revoke_by_certname_and_server(self, mock_create_acme,
unused_mock_renewal_file_for_certname, mock_cert,
mock_delete_if_appropriate):
"""Revoking with --server should use the server from the CLI"""
@ -485,15 +485,15 @@ class RevokeTest(test_util.TempDirTestCase):
args = 'revoke --cert-name=example.com --server https://other.example'.split()
mock_delete_if_appropriate.return_value = False
self._call(args)
assert mock_acme_from_config.call_args_list[0][0][0].server == \
assert mock_create_acme.call_args_list[0][0][0].server == \
'https://other.example'
self.mock_success_revoke.assert_called_once_with(self.tmp_cert_path)
@mock.patch('certbot._internal.main._delete_if_appropriate')
@mock.patch('certbot._internal.storage.RenewableCert')
@mock.patch('certbot._internal.storage.renewal_file_for_certname')
@mock.patch('certbot._internal.client.acme_from_config_key')
def test_revoke_by_certname_empty_server(self, mock_acme_from_config,
@mock.patch('certbot._internal.client.create_acme_client')
def test_revoke_by_certname_empty_server(self, mock_create_acme,
unused_mock_renewal_file_for_certname,
mock_cert, mock_delete_if_appropriate):
"""Revoking with --cert-name where the lineage server is empty shouldn't crash """
@ -501,7 +501,7 @@ class RevokeTest(test_util.TempDirTestCase):
args = 'revoke --cert-name=example.com'.split()
mock_delete_if_appropriate.return_value = False
self._call(args)
assert mock_acme_from_config.call_args_list[0][0][0].server == \
assert mock_create_acme.call_args_list[0][0][0].server == \
constants.CLI_DEFAULTS['server']
self.mock_success_revoke.assert_called_once_with(self.tmp_cert_path)
@ -1823,7 +1823,7 @@ class MainTest(test_util.ConfigTestCase):
_, _, _, client = self._call(['--cert-path', CERT, 'revoke'])
with open(CERT, 'rb') as f:
cert = x509.load_pem_x509_certificate(f.read())
mock_revoke = client.acme_from_config_key().revoke
mock_revoke = client.create_acme_client().revoke
mock_revoke.assert_called_once_with(cert, mock.ANY)
@mock.patch('certbot._internal.log.post_arg_parse_setup')

View file

@ -217,7 +217,7 @@ class RenewalTest(test_util.ConfigTestCase):
@test_util.patch_display_util()
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
@mock.patch('certbot._internal.client.acme_from_config_key')
@mock.patch('certbot._internal.client.create_acme_client')
@mock.patch('certbot._internal.main.renew_cert')
@mock.patch("certbot._internal.renewal.datetime")
def test_renewal_via_ari(self, mock_datetime, mock_renew_cert, mock_acme_from_config, mock_set_by_user, unused_mock_display):
@ -259,7 +259,6 @@ class RenewalTest(test_util.ConfigTestCase):
assert mock_client_network_get.call_count == 0
@mock.patch('acme.client.ClientV2')
def test_dry_run_no_ari_call(self, mock_acme):
from certbot._internal import renewal
@ -295,13 +294,15 @@ class RenewalTest(test_util.ConfigTestCase):
not_before = datetime.datetime(2014, 12, 11, 22, 34, 45)
short_cert = make_cert_with_lifetime(not_before, 7)
ari_server = "http://ari"
mock_acme = mock.MagicMock()
future = datetime.datetime.now(pytz.UTC) + datetime.timedelta(days=100000)
mock_acme.renewal_time.return_value = (future, future)
acme_clients = {}
acme_clients[self.config.server] = mock_acme
acme_clients[ari_server] = mock_acme
mock_renewable_cert = mock.MagicMock()
mock_renewable_cert.server = ari_server
mock_renewable_cert.autorenewal_is_enabled.return_value = True
mock_renewable_cert.version.return_value = "/tmp/abc"
mock_renewable_cert.ocsp_revoked.return_value = False
@ -315,6 +316,7 @@ class RenewalTest(test_util.ConfigTestCase):
mock_renewable_cert.version.return_value = tmp_cert.name
# First, test cases where ARI returns a renewal_time far in the future
for (current_time, interval, result) in [
# 2014-12-13 12:00 (about 5 days prior to expiry)
# Times that should result in autorenewal/autodeployment
@ -349,6 +351,8 @@ class RenewalTest(test_util.ConfigTestCase):
mock_renewable_cert.configuration = {"renew_before_expiry": interval}
assert renewal.should_autorenew(self.config, mock_renewable_cert, acme_clients) == result, f"at {current_time}, with config '{interval}', ari response in future, expected {result}"
# Now, test cases where ARI either fails (returns `(None, _)`) or
# the cert has no `server` value and ARI is skipped
mock_acme.renewal_time.return_value = (None, future)
for (current_time, interval, result) in [
# 2014-12-13 12:00 (about 5 days prior to expiry)
@ -383,30 +387,39 @@ class RenewalTest(test_util.ConfigTestCase):
sometime = datetime.datetime.fromtimestamp(current_time, pytz.UTC)
mock_datetime.datetime.now.return_value = sometime
mock_renewable_cert.configuration = {"renew_before_expiry": interval}
mock_renewable_cert.server = ari_server
assert renewal.should_autorenew(self.config, mock_renewable_cert, acme_clients) == result, f"at {current_time}, with config '{interval}', no ari response, expected {result}"
mock_renewable_cert.server = None
assert renewal.should_autorenew(self.config, mock_renewable_cert, acme_clients) == result, f"at {current_time}, with config '{interval}', skipped ari, expected {result}"
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
@mock.patch("certbot._internal.storage.RenewableCert.ocsp_revoked")
def test_should_autorenew(self, mock_ocsp, mock_set_by_user):
def test_should_autorenew(self, mock_ocsp):
from certbot._internal import renewal
mock_set_by_user.return_value = False
mock_acme = mock.MagicMock()
ari_server = "http://ari"
future = datetime.datetime.now(pytz.UTC) + datetime.timedelta(seconds=1000)
mock_acme.renewal_time.return_value = (future, future)
acme_clients = {}
acme_clients[self.config.server] = mock_acme
# Autorenewal turned off
acme_clients[ari_server] = mock_acme
mock_rc = mock.MagicMock()
mock_rc.autorenewal_is_enabled.return_value = False
assert not renewal.should_autorenew(self.config, mock_rc, acme_clients)
mock_rc.autorenewal_is_enabled.return_value = True
# Mandatory renewal on the basis of OCSP revocation
mock_ocsp.return_value = True
assert renewal.should_autorenew(self.config, mock_rc, acme_clients)
mock_ocsp.return_value = False
with mock.patch('certbot._internal.renewal.open', mock.mock_open(read_data=b'')):
# Autorenewal turned off
mock_rc.autorenewal_is_enabled.return_value = False
mock_rc.server = ari_server
assert not renewal.should_autorenew(self.config, mock_rc, acme_clients)
mock_rc.server = None
assert not renewal.should_autorenew(self.config, mock_rc, acme_clients)
# Autorenewal turned on, mandatory renewal on the basis of OCSP
# revocation
mock_rc.autorenewal_is_enabled.return_value = True
mock_ocsp.return_value = True
assert renewal.should_autorenew(self.config, mock_rc, acme_clients)
mock_rc.server = None
assert renewal.should_autorenew(self.config, mock_rc, acme_clients)
class RestoreRequiredConfigElementsTest(test_util.ConfigTestCase):
"""Tests for certbot._internal.renewal.restore_required_config_elements."""