Add ARI support to acme module and to Certbot (#10272)

Follow-up to #10241. The acme module code is mostly the same, except the
switch to return a tuple containing Retry-After.

This includes the CLI-side work to call out to the new `renewal_time`
method when checking for renewal.

I moved `should_autorenew` from `storage.py` into `renewal.py`, where it
fits better (and also this solves an import cycle problem). To make the
edits more visible I split this into one commit for the move and [one
commit for the subsequent
edits](4e137d9b00 (diff-fad906e31304c767d620bfd243f4c7adf1e63a3420fd634ee57a0f6651c182cf)).

This does not yet attempt to store the Retry-After info, or failure
retries, in renewal configs. I figured since that's a pretty big chunk
of work and design on its own, I wanted to get interim feedback as is. I
think this PR would be okay to land with the current default crons /
systemd timers that run twice a day. I think we should implement storage
of retry information before increasing the frequency of runs. And if the
team would like to hold off on landing any ARI until that storage is
done, I'm good with that too. 👍🏻
This commit is contained in:
Jacob Hoffman-Andrews 2025-05-13 10:34:19 -07:00 committed by GitHub
parent c5686e6653
commit 723fe64d4d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 485 additions and 195 deletions

View file

@ -452,6 +452,172 @@ class ClientV2Test(unittest.TestCase):
assert DIRECTORY_V2.to_partial_json() == \
ClientV2.get_directory('https://example.com/dir', self.net).to_partial_json()
def test_renewal_time_no_renewal_info(self):
# A directory with no 'renewalInfo' should result in default renewal periods.
self.client.directory = messages.Directory({})
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 20, 00, 00, 00),
)
t, _ = self.client.renewal_time(cert_pem)
assert t == datetime.datetime(2025, 3, 16, 00, 00, 00, tzinfo=datetime.timezone.utc)
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 30, 00, 00, 00),
)
t, _ = self.client.renewal_time(cert_pem)
assert t == datetime.datetime(2025, 3, 24, 00, 00, 00, tzinfo=datetime.timezone.utc)
def test_renewal_time_with_renewal_info(self):
from cryptography import x509
from acme.client import _renewal_info_path_component
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 20, 00, 00, 00),
)
self.client.directory = messages.Directory({
'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info',
})
self.response.json.return_value = {
"suggestedWindow": {
"start": "2025-03-14T01:01:01Z",
"end": "2025-03-14T01:01:01Z",
},
"message": "Keep those certs fresh"
}
t, _ = self.client.renewal_time(cert_pem)
cert_parsed = x509.load_pem_x509_certificate(cert_pem)
ari_path_component = _renewal_info_path_component(cert_parsed)
self.net.get.assert_called_once_with("https://www.letsencrypt-demo.org/acme/renewal-info/" +
ari_path_component,
content_type='application/json')
assert t == datetime.datetime(2025, 3, 14, 1, 1, 1, tzinfo=datetime.timezone.utc)
self.net.reset_mock()
self.response.json.return_value = {
"suggestedWindow": {
"start": "2025-03-16T01:01:01Z",
"end": "2025-03-17T01:01:01Z",
},
"message": "Keep those certs fresh"
}
t, _ = self.client.renewal_time(cert_pem)
self.net.get.assert_called_once_with("https://www.letsencrypt-demo.org/acme/renewal-info/" +
ari_path_component,
content_type='application/json')
assert t >= datetime.datetime(2025, 3, 16, 1, 1, 1, tzinfo=datetime.timezone.utc)
assert t <= datetime.datetime(2025, 3, 17, 1, 1, 1, tzinfo=datetime.timezone.utc)
def test_renewal_time_renewal_info_errors(self):
self.client.directory = messages.Directory({
'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info',
})
# Failure to fetch the 'renewalInfo' URL should return default timings
self.net.get.side_effect = requests.exceptions.RequestException
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 20, 00, 00, 00),
)
t, _ = self.client.renewal_time(cert_pem)
assert t == datetime.datetime(2025, 3, 16, 00, 00, 00, tzinfo=datetime.timezone.utc)
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 30, 00, 00, 00),
)
t, _ = self.client.renewal_time(cert_pem)
assert t == datetime.datetime(2025, 3, 24, 00, 00, 00, tzinfo=datetime.timezone.utc)
@mock.patch('acme.client.datetime')
def test_renewal_time_returns_retry_after(self, dt_mock):
dt_mock.datetime.now.return_value = datetime.datetime(2025, 5, 12, 0, 0, 0)
dt_mock.timedelta = datetime.timedelta
self.client.directory = messages.Directory({
'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info',
})
cert_pem = make_cert_for_renewal(
not_before=datetime.datetime(2025, 3, 12, 00, 00, 00),
not_after=datetime.datetime(2025, 3, 20, 00, 00, 00),
)
self.response.json.return_value = {
"suggestedWindow": {
"start": "2025-03-14T01:01:01Z",
"end": "2025-03-14T01:01:01Z",
},
"message": "Keep those certs fresh"
}
# With no explicit Retry-After in header, default to six hours
_, retry_after = self.client.renewal_time(cert_pem)
assert retry_after == datetime.datetime(2025, 5, 12, 6, 0, 0)
# With an explicit Retry-After in header, use that
self.response.headers['Retry-After'] = '100'
_, retry_after = self.client.renewal_time(cert_pem)
assert retry_after == datetime.datetime(2025, 5, 12, 00, 1, 40)
def test_renewal_info_path_component():
from cryptography import x509
from acme.client import _renewal_info_path_component
cert = x509.load_pem_x509_certificate(test_util.load_vector('rsa2048_cert.pem'))
assert _renewal_info_path_component(cert) == "fL5sRirC8VS5AtOQh9DfoAzYNCI.ALVG_VbBb5U7"
# From https://www.ietf.org/archive/id/draft-ietf-acme-ari-08.html appendix A.
ARI_TEST_CERT = b"""
-----BEGIN CERTIFICATE-----
MIIBQzCB66ADAgECAgUAh2VDITAKBggqhkjOPQQDAjAVMRMwEQYDVQQDEwpFeGFt
cGxlIENBMCIYDzAwMDEwMTAxMDAwMDAwWhgPMDAwMTAxMDEwMDAwMDBaMBYxFDAS
BgNVBAMTC2V4YW1wbGUuY29tMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEeBZu
7cbpAYNXZLbbh8rNIzuOoqOOtmxA1v7cRm//AwyMwWxyHz4zfwmBhcSrf47NUAFf
qzLQ2PPQxdTXREYEnKMjMCEwHwYDVR0jBBgwFoAUaYhba4dGQEHhs3uEe6CuLN4B
yNQwCgYIKoZIzj0EAwIDRwAwRAIge09+S5TZAlw5tgtiVvuERV6cT4mfutXIlwTb
+FYN/8oCIClDsqBklhB9KAelFiYt9+6FDj3z4KGVelYM5MdsO3pK
-----END CERTIFICATE-----
"""
cert = x509.load_pem_x509_certificate(ARI_TEST_CERT)
assert _renewal_info_path_component(cert) == "aYhba4dGQEHhs3uEe6CuLN4ByNQ.AIdlQyE"
if __name__ == '__main__':
sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover
def make_cert_for_renewal(not_before, not_after) -> bytes:
"""
Return a PEM-encoded, self-signed certificate with the given dates.
"""
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization, hashes
# AKID and serial are the inputs to constructing the renewalInfo URL
akid = x509.AuthorityKeyIdentifier(b"1234", None, None)
serial = 56789
key = ec.generate_private_key(ec.SECP256R1())
cert = x509.CertificateBuilder(
issuer_name=x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "Some Issuer")]),
subject_name=x509.Name([]),
public_key=key.public_key(),
serial_number=serial,
not_valid_before=not_before,
not_valid_after=not_after,
).add_extension(
x509.SubjectAlternativeName([x509.DNSName('example.com')]),
critical=False,
).add_extension(
akid,
critical=False,
).sign(
private_key=key,
algorithm=hashes.SHA256(),
)
return cert.public_bytes(serialization.Encoding.PEM)
class MockJSONDeSerializable(jose.JSONDeSerializable):
# pylint: disable=missing-docstring

View file

@ -4,6 +4,8 @@ import datetime
from email.utils import parsedate_tz
import http.client as http_client
import logging
import math
import random
import re
import time
from typing import Any
@ -302,6 +304,58 @@ class ClientV2:
raise e
return self.poll_finalization(orderr, deadline, fetch_alternative_chains)
def renewal_time(self, cert_pem: bytes) -> Tuple[datetime.datetime, datetime.datetime]:
"""Return an appropriate time to attempt renewal of the certificate,
and the next time to ask the ACME server for renewal info.
If the ACME directory has a "renewalInfo" field, the response will be
based on a fetch of the renewal info resource for the certificate
(https://www.ietf.org/archive/id/draft-ietf-acme-ari-08.html).
If there is no "renewalInfo" field, this function will fall back to
reasonable defaults based on the certificate lifetime.
This function may make other network calls in the future (e.g., OCSP
or CRL).
"""
now = datetime.datetime.now()
# https://www.ietf.org/archive/id/draft-ietf-acme-ari-08.html#section-4.3.3
default_retry_after = datetime.timedelta(seconds=6 * 60 * 60)
cert = x509.load_pem_x509_certificate(cert_pem)
not_before = cert.not_valid_before_utc
lifetime = cert.not_valid_after_utc - not_before
if lifetime.total_seconds() < 10 * 86400:
default_renewal_time = not_before + lifetime / 2
else:
default_renewal_time = not_before + lifetime * 2 / 3
try:
renewal_info_base_url = self.directory['renewalInfo']
except KeyError:
return default_renewal_time, now + default_retry_after
ari_url = renewal_info_base_url + '/' + _renewal_info_path_component(cert)
try:
resp = self.net.get(ari_url, content_type='application/json')
except (requests.exceptions.RequestException, messages.Error) as error:
logger.warning("failed to fetch renewal_info URL (%s): %s", ari_url, error)
return default_renewal_time, now + default_retry_after
renewal_info: messages.RenewalInfo = messages.RenewalInfo.from_json(resp.json())
start = renewal_info.suggested_window.start # pylint: disable=no-member
end = renewal_info.suggested_window.end # pylint: disable=no-member
delta_seconds = (end - start).total_seconds()
random_seconds = random.uniform(0, delta_seconds)
random_time = start + datetime.timedelta(seconds=random_seconds)
retry_after = self.retry_after(resp, default_retry_after.seconds)
return random_time, retry_after
def revoke(self, cert: x509.Certificate, rsn: int) -> None:
"""Revoke certificate.
@ -781,3 +835,22 @@ class ClientNetwork:
response = self._check_response(response, content_type=content_type)
self._add_nonce(response)
return response
def _renewal_info_path_component(cert: x509.Certificate) -> str:
akid_ext = cert.extensions.get_extension_for_oid(x509.ExtensionOID.AUTHORITY_KEY_IDENTIFIER)
key_identifier = akid_ext.value.key_identifier # type: ignore[attr-defined]
akid_encoded = base64.urlsafe_b64encode(key_identifier).decode('ascii').replace("=", "")
# We add one to the reported bit_length so there is room for the sign bit.
# https://docs.python.org/3/library/stdtypes.html#int.bit_length
# "Return the number of bits necessary to represent an integer in binary, excluding
# the sign and leading zeros"
serial = cert.serial_number
encoded_serial_len = math.ceil((serial.bit_length()+1)/8)
# Serials are encoded as ASN.1 INTEGERS, which means big endian and signed (two's complement).
# https://letsencrypt.org/docs/a-warm-welcome-to-asn1-and-der/#integer-encoding
serial_bytes = serial.to_bytes(encoded_serial_len, byteorder='big', signed=True)
serial_encoded = base64.urlsafe_b64encode(serial_bytes).decode('ascii').replace("=", "")
return f"{akid_encoded}.{serial_encoded}"

View file

@ -684,3 +684,19 @@ class OrderResource(ResourceWithURI):
class NewOrder(Order):
"""New order."""
class RenewalInfo(ResourceBody):
"""Renewal Info Resource Body.
:ivar acme.messages.SuggestedWindow window: The suggested renewal window.
"""
class SuggestedWindow(jose.JSONObjectWithFields):
"""Suggested Renewal Window, sub-resource of Renewal Info Resource.
:ivar datetime.datetime start: Beginning of suggested renewal window
:ivar datetime.datetime end: End of suggested renewal window (inclusive)
"""
start: datetime.datetime = fields.rfc3339('start', omitempty=True)
end: datetime.datetime = fields.rfc3339('end', omitempty=True)
suggested_window: SuggestedWindow = jose.field('suggestedWindow',
decoder=SuggestedWindow.from_json)

View file

@ -135,7 +135,7 @@ def main() -> None:
# Invoke certbot in test mode, without capturing output so users see directly the outcome.
command, env = _prepare_args_env(args, directory_url, http_01_port, tls_alpn_01_port,
config_dir, workspace, True)
config_dir, workspace, False)
subprocess.check_call(command, universal_newlines=True, cwd=workspace, env=env)

View file

@ -265,8 +265,14 @@ def _handle_identical_cert_request(config: configuration.NamespaceConfig,
if not lineage.ensure_deployed():
return "reinstall", lineage
if is_key_type_changing or renewal.should_renew(config, lineage):
if is_key_type_changing:
return "renew", lineage
acme = client.acme_from_config_key(config)
if renewal.should_renew(config, lineage, acme):
return "renew", lineage
if config.reinstall:
# Set with --reinstall, force an identical certificate to be
# reinstalled without further prompting.

View file

@ -1,6 +1,7 @@
"""Functionality for autorenewal and associated juggling of configurations"""
import copy
import datetime
import itertools
import logging
import random
@ -21,6 +22,8 @@ from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from acme import client as acme_client
from certbot import configuration
from certbot import crypto_util
from certbot import errors
@ -309,12 +312,14 @@ def _restore_str(name: str, value: str) -> Optional[str]:
return None if value == "None" else value
def should_renew(config: configuration.NamespaceConfig, lineage: storage.RenewableCert) -> bool:
def should_renew(config: configuration.NamespaceConfig,
lineage: storage.RenewableCert,
acme: acme_client.ClientV2) -> bool:
"""Return true if any of the circumstances for automatic renewal apply."""
if config.renew_by_default:
logger.debug("Auto-renewal forced with --force-renewal...")
return True
if lineage.should_autorenew():
if should_autorenew(lineage, acme):
logger.info("Certificate is due for renewal, auto-renewing...")
return True
if config.dry_run:
@ -323,6 +328,58 @@ def should_renew(config: configuration.NamespaceConfig, lineage: storage.Renewab
display_util.notify("Certificate not yet due for renewal")
return False
def should_autorenew(lineage: storage.RenewableCert, acme: acme_client.ClientV2) -> bool:
"""Should we now try to autorenew the most recent cert version?
If ACME Renewal Info (ARI) is available in the directory, check that first,
and renew if ARI indicates it is time, or if we are within the default
renweal window.
If the certificate has an OCSP URL, renew if it is revoked.
If neither of the above is true, but the "renew_before_expiry" config
indicates it is time, renew. Otherwise, don't.
Note that this examines the numerically most recent cert version,
not the currently deployed version.
:returns: whether an attempt should now be made to autorenew the
most current cert version in this lineage
:rtype: bool
"""
if lineage.autorenewal_is_enabled():
cert = lineage.version("cert", lineage.latest_common_version())
# Consider whether to attempt to autorenew this cert now
renewal_time = None
with open(cert, 'rb') as f:
cert_pem = f.read()
renewal_time, _ = acme.renewal_time(cert_pem)
now = datetime.datetime.now(datetime.timezone.utc)
if renewal_time and now > renewal_time:
return True
# Renewals on the basis of revocation
if lineage.ocsp_revoked(lineage.latest_common_version()):
logger.debug("Should renew, certificate is revoked.")
return True
# The "renew_before_expiry" config field can make us renew earlier
# than the default.
config_interval = lineage.configuration.get("renew_before_expiry")
notAfter = crypto_util.notAfter(cert)
if (config_interval is not None and
notAfter < storage.add_time_interval(now, config_interval)):
logger.debug("Should renew, less than %s before certificate "
"expiry %s.", config_interval,
notAfter.strftime("%Y-%m-%d %H:%M:%S %Z"))
return True
return False
def _avoid_invalidating_lineage(config: configuration.NamespaceConfig,
lineage: storage.RenewableCert, original_server: str) -> None:
@ -499,6 +556,11 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> Tuple[list,
# shutting down a web service) aren't prolonged unnecessarily.
apply_random_sleep = not sys.stdin.isatty() and config.random_sleep_on_renew
# We initialize acme clients on a per-server basis, but most
# lineages use the same server. Memoize clients here so we can
# share the connection pool and reuse a single fetched directory.
acme_clients = {}
for renewal_file in conf_files:
display_util.notification("Processing " + renewal_file, pause=False)
lineage_config = copy.deepcopy(config)
@ -520,10 +582,16 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> Tuple[list,
if not renewal_candidate:
parse_failures.append(renewal_file)
else:
server = lineage_config.server
if not server:
raise errors.Error(f"Renewal config for {lineage_config.names} has no server.")
if server not in acme_clients:
acme_clients[server] = client.acme_from_config_key(config)
renewal_candidate.ensure_deployed()
from certbot._internal import main
plugins = plugins_disco.PluginsRegistry.find_all()
if should_renew(lineage_config, renewal_candidate):
if should_renew(lineage_config, renewal_candidate, acme_clients[server]):
# Apply random sleep upon first renewal if needed
if apply_random_sleep:
sleep_time = random.uniform(1, 60 * 8)

View file

@ -977,58 +977,6 @@ class RenewableCert(interfaces.RenewableCert):
return ("autorenew" not in self.configuration["renewalparams"] or
self.configuration["renewalparams"].as_bool("autorenew"))
def should_autorenew(self) -> bool:
"""Should we now try to autorenew the most recent cert version?
This is a policy question and does not only depend on whether
the cert is expired. (This considers whether autorenewal is
enabled, whether the cert is revoked, and whether the time
interval for autorenewal has been reached.)
Note that this examines the numerically most recent cert version,
not the currently deployed version.
:returns: whether an attempt should now be made to autorenew the
most current cert version in this lineage
:rtype: bool
"""
if self.autorenewal_is_enabled():
# Consider whether to attempt to autorenew this cert now
# Renewals on the basis of revocation
if self.ocsp_revoked(self.latest_common_version()):
logger.debug("Should renew, certificate is revoked.")
return True
cert = self.version("cert", self.latest_common_version())
notBefore = crypto_util.notBefore(cert)
notAfter = crypto_util.notAfter(cert)
lifetime = notAfter - notBefore
config_interval = self.configuration.get("renew_before_expiry")
now = datetime.datetime.now(pytz.UTC)
if config_interval is not None and notAfter < add_time_interval(now, config_interval):
logger.debug("Should renew, less than %s before certificate "
"expiry %s.", config_interval,
notAfter.strftime("%Y-%m-%d %H:%M:%S %Z"))
return True
# No config for "renew_before_expiry", provide default behavior.
# For most certs, renew with 1/3 of certificate lifetime remaining.
# For short lived certificates, renew at 1/2 of certificate lifetime.
default_interval = lifetime / 3
if lifetime.total_seconds() < 10 * 86400:
default_interval = lifetime / 2
remaining_time = notAfter - now
if remaining_time < default_interval:
logger.debug("Should renew, less than %ss before certificate "
"expiry %s.", default_interval,
notAfter.strftime("%Y-%m-%d %H:%M:%S %Z"))
return True
return False
@classmethod
def new_lineage(cls, lineagename: str, cert: bytes, privkey: bytes, chain: bytes,
cli_config: configuration.NamespaceConfig) -> "RenewableCert":

View file

@ -1419,7 +1419,6 @@ class MainTest(test_util.ConfigTestCase):
'live/foo.bar/fullchain.pem'))
mock_lineage = mock.MagicMock(cert=cert_path, fullchain=chain_path,
cert_path=cert_path, fullchain_path=chain_path)
mock_lineage.should_autorenew.return_value = due_for_renewal
mock_lineage.has_pending_deployment.return_value = False
mock_lineage.names.return_value = ['isnot.org']
mock_lineage.private_key_type = 'ecdsa'
@ -1448,21 +1447,23 @@ class MainTest(test_util.ConfigTestCase):
as mock_crypto_util:
mock_crypto_util.notAfter.return_value = expiry_date
with mock.patch('certbot._internal.eff.handle_subscription'):
if not args:
args = ['-d', 'isnot.org', '-a', 'standalone', 'certonly']
if extra_args:
args += extra_args
try:
ret, stdout, _, _ = self._call(args, stdout)
if ret:
print("Returned", ret)
raise AssertionError(ret)
assert not error_expected, "renewal should have errored"
except: # pylint: disable=bare-except
if not error_expected:
raise AssertionError(
"Unexpected renewal error:\n" +
traceback.format_exc())
with mock.patch('certbot._internal.renewal.should_autorenew') as should_autorenew:
should_autorenew.return_value = due_for_renewal
if not args:
args = ['-d', 'isnot.org', '-a', 'standalone', 'certonly']
if extra_args:
args += extra_args
try:
ret, stdout, _, _ = self._call(args, stdout)
if ret:
print("Returned", ret)
raise AssertionError(ret)
assert not error_expected, "renewal should have errored"
except: # pylint: disable=bare-except
if not error_expected:
raise AssertionError(
"Unexpected renewal error:\n" +
traceback.format_exc())
if should_renew:
if reuse_key and not new_key:

View file

@ -1,10 +1,13 @@
"""Tests for certbot._internal.renewal"""
import copy
import datetime
import sys
import tempfile
import unittest
from unittest import mock
import pytest
import pytz
from acme import challenges
from certbot import configuration
@ -12,6 +15,30 @@ from certbot import errors
from certbot._internal import storage
import certbot.tests.util as test_util
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization, hashes
from cryptography import x509
from cryptography.x509 import Certificate
def make_cert_with_lifetime(not_before: datetime.datetime, lifetime_days: int) -> bytes:
"""Return PEM of a self-signed certificate with the given notBefore and lifetime."""
key = ec.generate_private_key(ec.SECP256R1())
not_after=not_before + datetime.timedelta(days=lifetime_days)
cert = x509.CertificateBuilder(
issuer_name=x509.Name([]),
subject_name=x509.Name([]),
public_key=key.public_key(),
serial_number=x509.random_serial_number(),
not_valid_before=not_before,
not_valid_after=not_after,
).add_extension(
x509.SubjectAlternativeName([x509.DNSName("example.com")]),
critical=False,
).sign(
private_key=key,
algorithm=hashes.SHA256(),
)
return cert.public_bytes(serialization.Encoding.PEM)
class RenewalTest(test_util.ConfigTestCase):
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
@ -188,6 +215,113 @@ class RenewalTest(test_util.ConfigTestCase):
renewal.reconstitute(lineage_config, rc_path)
assert lineage_config.key_type == 'rsa'
@test_util.patch_display_util()
@mock.patch('certbot._internal.client.acme_from_config_key')
@mock.patch('certbot._internal.main.renew_cert')
@mock.patch("certbot._internal.renewal.datetime")
def test_renewal_via_ari(self, mock_datetime, mock_renew_cert, mock_acme_from_config, unused_mock_display):
from certbot._internal import renewal
acme_client = mock.MagicMock()
mock_acme_from_config.return_value = acme_client
past = datetime.datetime(2025, 3, 19, 0, 0, 0, tzinfo=pytz.UTC)
now = datetime.datetime(2025, 4, 19, 0, 0, 0, tzinfo=pytz.UTC)
future = datetime.datetime(2025, 4, 19, 12, 0, 0, tzinfo=pytz.UTC)
mock_datetime.datetime.now.return_value = now
acme_client.renewal_time.return_value = past, future
test_util.make_lineage(self.config.config_dir, 'sample-renewal.conf', ec=False)
lineage_config = copy.deepcopy(self.config)
with mock.patch('time.sleep') as sleep:
renewal.handle_renewal_request(lineage_config)
mock_renew_cert.assert_called_once()
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
@mock.patch("certbot._internal.renewal.datetime")
def test_renew_before_expiry(self, mock_datetime, mock_set_by_user):
"""When neither OCSP nor the ACME client indicate it's time to renew,
obey the renew_before_expiry config.
"""
from certbot._internal import renewal
# This certificate has a lifetime of 7 days, and the tests below
# that use a "None" interval (i.e. choose a default) rely on that fact.
#
# Not Before: Dec 11 22:34:45 2014 GMT
# Not After : Dec 18 22:34:45 2014 GMT
not_before = datetime.datetime(2014, 12, 11, 22, 34, 45)
short_cert = make_cert_with_lifetime(not_before, 7)
mock_acme = mock.MagicMock()
future = datetime.datetime.now(pytz.UTC) + datetime.timedelta(days=100000)
mock_acme.renewal_time.return_value = (future, future)
mock_renewable_cert = mock.MagicMock()
mock_renewable_cert.autorenewal_is_enabled.return_value = True
mock_renewable_cert.version.return_value = "/tmp/abc"
mock_renewable_cert.ocsp_revoked.return_value = False
mock_datetime.timedelta = datetime.timedelta
mock_set_by_user.return_value = False
with tempfile.NamedTemporaryFile() as tmp_cert:
tmp_cert.close() # close now because of compatibility issues on Windows
with open(tmp_cert.name, 'wb') as c:
c.write(short_cert)
mock_renewable_cert.version.return_value = tmp_cert.name
for (current_time, interval, result) in [
# 2014-12-13 12:00 (about 5 days prior to expiry)
# Times that should result in autorenewal/autodeployment
(1418472000, "2 months", True), (1418472000, "1 week", True),
# With the "default" logic, this 7-day certificate should autorenew
# at 3.5 days prior to expiry. We haven't reached that yet,
# so don't renew.
(1418472000, None, False),
# Times that should not renew
(1418472000, "4 days", False), (1418472000, "2 days", False),
# 2009-05-01 12:00:00+00:00 (about 5 years prior to expiry)
# Times that should result in autorenewal/autodeployment
(1241179200, "7 years", True),
(1241179200, "11 years 2 months", True),
# Times that should not renew
(1241179200, "8 hours", False), (1241179200, "2 days", False),
(1241179200, "40 days", False), (1241179200, "9 months", False),
# 2015-01-01 (after expiry has already happened, so all
# intervals should cause autorenewal/autodeployment)
(1420070400, "0 seconds", True),
(1420070400, "10 seconds", True),
(1420070400, "10 minutes", True),
(1420070400, "10 weeks", True), (1420070400, "10 months", True),
(1420070400, "10 years", True), (1420070400, "99 months", True),
]:
sometime = datetime.datetime.fromtimestamp(current_time, pytz.UTC)
mock_datetime.datetime.now.return_value = sometime
mock_renewable_cert.configuration = {"renew_before_expiry": interval}
assert renewal.should_autorenew(mock_renewable_cert, mock_acme) == result, f"at {current_time}, with config '{interval}', expected {result}"
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
@mock.patch("certbot._internal.storage.RenewableCert.ocsp_revoked")
def test_should_autorenew(self, mock_ocsp, mock_set_by_user):
from certbot._internal import renewal
mock_set_by_user.return_value = False
mock_acme = mock.MagicMock()
future = datetime.datetime.now(pytz.UTC) + datetime.timedelta(seconds=1000)
mock_acme.renewal_time.return_value = (future, future)
# Autorenewal turned off
mock_rc = mock.MagicMock()
mock_rc.autorenewal_is_enabled.return_value = False
assert not renewal.should_autorenew(mock_rc, mock_acme)
mock_rc.autorenewal_is_enabled.return_value = True
# Mandatory renewal on the basis of OCSP revocation
mock_ocsp.return_value = True
assert renewal.should_autorenew(mock_rc, mock_acme)
mock_ocsp.return_value = False
class RestoreRequiredConfigElementsTest(test_util.ConfigTestCase):
"""Tests for certbot._internal.renewal.restore_required_config_elements."""

View file

@ -19,34 +19,9 @@ from certbot.compat import filesystem
from certbot.compat import os
import certbot.tests.util as test_util
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization, hashes
from cryptography import x509
from cryptography.x509 import Certificate
import datetime
from typing import Optional, Any
def make_cert_with_lifetime(not_before: datetime.datetime, lifetime_days: int) -> bytes:
"""Return PEM of a self-signed certificate with the given notBefore and lifetime."""
key = ec.generate_private_key(ec.SECP256R1())
not_after=not_before + datetime.timedelta(days=lifetime_days)
cert = x509.CertificateBuilder(
issuer_name=x509.Name([]),
subject_name=x509.Name([]),
public_key=key.public_key(),
serial_number=x509.random_serial_number(),
not_valid_before=not_before,
not_valid_after=not_after,
).add_extension(
x509.SubjectAlternativeName([x509.DNSName("example.com")]),
critical=False,
).sign(
private_key=key,
algorithm=hashes.SHA256(),
)
return cert.public_bytes(serialization.Encoding.PEM)
def unlink_all(rc_object):
"""Unlink all four items associated with this RenewableCert."""
for kind in ALL_FOUR:
@ -472,86 +447,6 @@ class RenewableCertTests(BaseRenewableCertTest):
with pytest.raises(errors.CertStorageError):
self.test_rc.names()
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
@mock.patch("certbot._internal.storage.datetime")
def test_time_interval_judgments(self, mock_datetime, mock_set_by_user):
"""Test should_autorenew() on the basis of expiry time windows."""
# Note: this certificate happens to have a lifetime of 7 days,
# and the tests below that use a "None" interval (i.e. choose a
# default) rely on that fact.
#
# Not Before: Dec 11 22:34:45 2014 GMT
# Not After : Dec 18 22:34:45 2014 GMT
not_before = datetime.datetime(2014, 12, 11, 22, 34, 45)
short_cert = make_cert_with_lifetime(not_before, 7)
self._write_out_ex_kinds()
self.test_rc.update_all_links_to(12)
with open(self.test_rc.cert, "wb") as f:
f.write(short_cert)
self.test_rc.update_all_links_to(11)
with open(self.test_rc.cert, "wb") as f:
f.write(short_cert)
mock_datetime.timedelta = datetime.timedelta
mock_set_by_user.return_value = False
self.test_rc.configuration["renewalparams"] = {}
for (current_time, interval, result) in [
# 2014-12-13 12:00 (about 5 days prior to expiry)
# Times that should result in autorenewal/autodeployment
(1418472000, "2 months", True), (1418472000, "1 week", True),
# With the "default" logic, this 7-day certificate should autorenew
# at 3.5 days prior to expiry. We haven't reached that yet,
# so don't renew.
(1418472000, None, False),
# 2014-12-16 03:20, a little less than 3.5 days to expiry.
(1418700000, None, True),
# Times that should not renew
(1418472000, "4 days", False), (1418472000, "2 days", False),
# 2009-05-01 12:00:00+00:00 (about 5 years prior to expiry)
# Times that should result in autorenewal/autodeployment
(1241179200, "7 years", True),
(1241179200, "11 years 2 months", True),
# Times that should not renew
(1241179200, "8 hours", False), (1241179200, "2 days", False),
(1241179200, "40 days", False), (1241179200, "9 months", False),
# 2015-01-01 (after expiry has already happened, so all
# intervals should cause autorenewal/autodeployment)
(1420070400, "0 seconds", True),
(1420070400, "10 seconds", True),
(1420070400, "10 minutes", True),
(1420070400, "10 weeks", True), (1420070400, "10 months", True),
(1420070400, "10 years", True), (1420070400, "99 months", True),
(1420070400, None, True)
]:
sometime = datetime.datetime.fromtimestamp(current_time, pytz.UTC)
mock_datetime.datetime.now.return_value = sometime
self.test_rc.configuration["renew_before_expiry"] = interval
assert self.test_rc.should_autorenew() == result
# Lifetime: 31 years
# Default renewal: about 10 years from expiry
# Not Before: May 29 07:42:01 2017 GMT
# Not After : Mar 30 07:42:01 2048 GMT
not_before=datetime.datetime(2017, 5, 29, 7, 42, 1)
long_cert = make_cert_with_lifetime(not_before, 31 * 365)
self.test_rc.update_all_links_to(12)
with open(self.test_rc.cert, "wb") as f:
f.write(long_cert)
self.test_rc.update_all_links_to(11)
with open(self.test_rc.cert, "wb") as f:
f.write(long_cert)
for (current_time, result) in [
(2114380800, False), # 2037-01-01
(2148000000, True), # 2038-01-25
]:
sometime = datetime.datetime.fromtimestamp(current_time, pytz.UTC)
mock_datetime.datetime.now.return_value = sometime
self.test_rc.configuration["renew_before_expiry"] = interval
assert self.test_rc.should_autorenew() == result
def test_autorenewal_is_enabled(self):
self.test_rc.configuration["renewalparams"] = {}
assert self.test_rc.autorenewal_is_enabled()
@ -561,23 +456,6 @@ class RenewableCertTests(BaseRenewableCertTest):
self.test_rc.configuration["renewalparams"]["autorenew"] = "False"
assert not self.test_rc.autorenewal_is_enabled()
@mock.patch.object(configuration.NamespaceConfig, 'set_by_user')
@mock.patch("certbot._internal.storage.RenewableCert.ocsp_revoked")
def test_should_autorenew(self, mock_ocsp, mock_set_by_user):
"""Test should_autorenew on the basis of reasons other than
expiry time window."""
mock_set_by_user.return_value = False
# Autorenewal turned off
self.test_rc.configuration["renewalparams"] = {"autorenew": "False"}
assert not self.test_rc.should_autorenew()
self.test_rc.configuration["renewalparams"]["autorenew"] = "True"
for kind in ALL_FOUR:
self._write_out_kind(kind, 12)
# Mandatory renewal on the basis of OCSP revocation
mock_ocsp.return_value = True
assert self.test_rc.should_autorenew()
mock_ocsp.return_value = False
@mock.patch("certbot._internal.storage.relevant_values")
def test_save_successor(self, mock_rv):
# Mock relevant_values() to claim that all values are relevant here