From 723fe64d4dbc778f1e8fd0b93320c1a30dc6fa95 Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Tue, 13 May 2025 10:34:19 -0700 Subject: [PATCH] Add ARI support to acme module and to Certbot (#10272) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to #10241. The acme module code is mostly the same, except the switch to return a tuple containing Retry-After. This includes the CLI-side work to call out to the new `renewal_time` method when checking for renewal. I moved `should_autorenew` from `storage.py` into `renewal.py`, where it fits better (and also this solves an import cycle problem). To make the edits more visible I split this into one commit for the move and [one commit for the subsequent edits](https://github.com/certbot/certbot/pull/10272/commits/4e137d9b00fb0a299723978d7a289b881716d36b#diff-fad906e31304c767d620bfd243f4c7adf1e63a3420fd634ee57a0f6651c182cf). This does not yet attempt to store the Retry-After info, or failure retries, in renewal configs. I figured since that's a pretty big chunk of work and design on its own, I wanted to get interim feedback as is. I think this PR would be okay to land with the current default crons / systemd timers that run twice a day. I think we should implement storage of retry information before increasing the frequency of runs. And if the team would like to hold off on landing any ARI until that storage is done, I'm good with that too. 👍🏻 --- acme/src/acme/_internal/tests/client_test.py | 166 ++++++++++++++++++ acme/src/acme/client.py | 73 ++++++++ acme/src/acme/messages.py | 16 ++ .../utils/certbot_call.py | 2 +- certbot/src/certbot/_internal/main.py | 8 +- certbot/src/certbot/_internal/renewal.py | 74 +++++++- certbot/src/certbot/_internal/storage.py | 52 ------ .../src/certbot/_internal/tests/main_test.py | 33 ++-- .../certbot/_internal/tests/renewal_test.py | 134 ++++++++++++++ .../certbot/_internal/tests/storage_test.py | 122 ------------- 10 files changed, 485 insertions(+), 195 deletions(-) diff --git a/acme/src/acme/_internal/tests/client_test.py b/acme/src/acme/_internal/tests/client_test.py index fb900e38c..11d56cdd3 100644 --- a/acme/src/acme/_internal/tests/client_test.py +++ b/acme/src/acme/_internal/tests/client_test.py @@ -452,6 +452,172 @@ class ClientV2Test(unittest.TestCase): assert DIRECTORY_V2.to_partial_json() == \ ClientV2.get_directory('https://example.com/dir', self.net).to_partial_json() + def test_renewal_time_no_renewal_info(self): + # A directory with no 'renewalInfo' should result in default renewal periods. + self.client.directory = messages.Directory({}) + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 20, 00, 00, 00), + ) + t, _ = self.client.renewal_time(cert_pem) + assert t == datetime.datetime(2025, 3, 16, 00, 00, 00, tzinfo=datetime.timezone.utc) + + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 30, 00, 00, 00), + ) + t, _ = self.client.renewal_time(cert_pem) + assert t == datetime.datetime(2025, 3, 24, 00, 00, 00, tzinfo=datetime.timezone.utc) + + def test_renewal_time_with_renewal_info(self): + from cryptography import x509 + from acme.client import _renewal_info_path_component + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 20, 00, 00, 00), + ) + + self.client.directory = messages.Directory({ + 'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info', + }) + + self.response.json.return_value = { + "suggestedWindow": { + "start": "2025-03-14T01:01:01Z", + "end": "2025-03-14T01:01:01Z", + }, + "message": "Keep those certs fresh" + } + t, _ = self.client.renewal_time(cert_pem) + cert_parsed = x509.load_pem_x509_certificate(cert_pem) + ari_path_component = _renewal_info_path_component(cert_parsed) + self.net.get.assert_called_once_with("https://www.letsencrypt-demo.org/acme/renewal-info/" + + ari_path_component, + content_type='application/json') + assert t == datetime.datetime(2025, 3, 14, 1, 1, 1, tzinfo=datetime.timezone.utc) + + self.net.reset_mock() + + self.response.json.return_value = { + "suggestedWindow": { + "start": "2025-03-16T01:01:01Z", + "end": "2025-03-17T01:01:01Z", + }, + "message": "Keep those certs fresh" + } + t, _ = self.client.renewal_time(cert_pem) + self.net.get.assert_called_once_with("https://www.letsencrypt-demo.org/acme/renewal-info/" + + ari_path_component, + content_type='application/json') + assert t >= datetime.datetime(2025, 3, 16, 1, 1, 1, tzinfo=datetime.timezone.utc) + assert t <= datetime.datetime(2025, 3, 17, 1, 1, 1, tzinfo=datetime.timezone.utc) + + def test_renewal_time_renewal_info_errors(self): + self.client.directory = messages.Directory({ + 'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info', + }) + # Failure to fetch the 'renewalInfo' URL should return default timings + self.net.get.side_effect = requests.exceptions.RequestException + + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 20, 00, 00, 00), + ) + t, _ = self.client.renewal_time(cert_pem) + assert t == datetime.datetime(2025, 3, 16, 00, 00, 00, tzinfo=datetime.timezone.utc) + + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 30, 00, 00, 00), + ) + t, _ = self.client.renewal_time(cert_pem) + assert t == datetime.datetime(2025, 3, 24, 00, 00, 00, tzinfo=datetime.timezone.utc) + + @mock.patch('acme.client.datetime') + def test_renewal_time_returns_retry_after(self, dt_mock): + dt_mock.datetime.now.return_value = datetime.datetime(2025, 5, 12, 0, 0, 0) + dt_mock.timedelta = datetime.timedelta + + self.client.directory = messages.Directory({ + 'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info', + }) + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 20, 00, 00, 00), + ) + self.response.json.return_value = { + "suggestedWindow": { + "start": "2025-03-14T01:01:01Z", + "end": "2025-03-14T01:01:01Z", + }, + "message": "Keep those certs fresh" + } + + # With no explicit Retry-After in header, default to six hours + _, retry_after = self.client.renewal_time(cert_pem) + assert retry_after == datetime.datetime(2025, 5, 12, 6, 0, 0) + + # With an explicit Retry-After in header, use that + self.response.headers['Retry-After'] = '100' + _, retry_after = self.client.renewal_time(cert_pem) + assert retry_after == datetime.datetime(2025, 5, 12, 00, 1, 40) + +def test_renewal_info_path_component(): + from cryptography import x509 + from acme.client import _renewal_info_path_component + + cert = x509.load_pem_x509_certificate(test_util.load_vector('rsa2048_cert.pem')) + + assert _renewal_info_path_component(cert) == "fL5sRirC8VS5AtOQh9DfoAzYNCI.ALVG_VbBb5U7" + + # From https://www.ietf.org/archive/id/draft-ietf-acme-ari-08.html appendix A. + ARI_TEST_CERT = b""" +-----BEGIN CERTIFICATE----- +MIIBQzCB66ADAgECAgUAh2VDITAKBggqhkjOPQQDAjAVMRMwEQYDVQQDEwpFeGFt +cGxlIENBMCIYDzAwMDEwMTAxMDAwMDAwWhgPMDAwMTAxMDEwMDAwMDBaMBYxFDAS +BgNVBAMTC2V4YW1wbGUuY29tMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEeBZu +7cbpAYNXZLbbh8rNIzuOoqOOtmxA1v7cRm//AwyMwWxyHz4zfwmBhcSrf47NUAFf +qzLQ2PPQxdTXREYEnKMjMCEwHwYDVR0jBBgwFoAUaYhba4dGQEHhs3uEe6CuLN4B +yNQwCgYIKoZIzj0EAwIDRwAwRAIge09+S5TZAlw5tgtiVvuERV6cT4mfutXIlwTb ++FYN/8oCIClDsqBklhB9KAelFiYt9+6FDj3z4KGVelYM5MdsO3pK +-----END CERTIFICATE----- +""" + + cert = x509.load_pem_x509_certificate(ARI_TEST_CERT) + assert _renewal_info_path_component(cert) == "aYhba4dGQEHhs3uEe6CuLN4ByNQ.AIdlQyE" + +if __name__ == '__main__': + sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover + +def make_cert_for_renewal(not_before, not_after) -> bytes: + """ + Return a PEM-encoded, self-signed certificate with the given dates. + """ + from cryptography import x509 + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives import serialization, hashes + # AKID and serial are the inputs to constructing the renewalInfo URL + akid = x509.AuthorityKeyIdentifier(b"1234", None, None) + serial = 56789 + key = ec.generate_private_key(ec.SECP256R1()) + cert = x509.CertificateBuilder( + issuer_name=x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "Some Issuer")]), + subject_name=x509.Name([]), + public_key=key.public_key(), + serial_number=serial, + not_valid_before=not_before, + not_valid_after=not_after, + ).add_extension( + x509.SubjectAlternativeName([x509.DNSName('example.com')]), + critical=False, + ).add_extension( + akid, + critical=False, + ).sign( + private_key=key, + algorithm=hashes.SHA256(), + ) + return cert.public_bytes(serialization.Encoding.PEM) class MockJSONDeSerializable(jose.JSONDeSerializable): # pylint: disable=missing-docstring diff --git a/acme/src/acme/client.py b/acme/src/acme/client.py index 02ed8ad0b..763554058 100644 --- a/acme/src/acme/client.py +++ b/acme/src/acme/client.py @@ -4,6 +4,8 @@ import datetime from email.utils import parsedate_tz import http.client as http_client import logging +import math +import random import re import time from typing import Any @@ -302,6 +304,58 @@ class ClientV2: raise e return self.poll_finalization(orderr, deadline, fetch_alternative_chains) + def renewal_time(self, cert_pem: bytes) -> Tuple[datetime.datetime, datetime.datetime]: + """Return an appropriate time to attempt renewal of the certificate, + and the next time to ask the ACME server for renewal info. + + If the ACME directory has a "renewalInfo" field, the response will be + based on a fetch of the renewal info resource for the certificate + (https://www.ietf.org/archive/id/draft-ietf-acme-ari-08.html). + + If there is no "renewalInfo" field, this function will fall back to + reasonable defaults based on the certificate lifetime. + + This function may make other network calls in the future (e.g., OCSP + or CRL). + """ + now = datetime.datetime.now() + # https://www.ietf.org/archive/id/draft-ietf-acme-ari-08.html#section-4.3.3 + default_retry_after = datetime.timedelta(seconds=6 * 60 * 60) + + cert = x509.load_pem_x509_certificate(cert_pem) + + not_before = cert.not_valid_before_utc + lifetime = cert.not_valid_after_utc - not_before + if lifetime.total_seconds() < 10 * 86400: + default_renewal_time = not_before + lifetime / 2 + else: + default_renewal_time = not_before + lifetime * 2 / 3 + + try: + renewal_info_base_url = self.directory['renewalInfo'] + except KeyError: + return default_renewal_time, now + default_retry_after + + ari_url = renewal_info_base_url + '/' + _renewal_info_path_component(cert) + try: + resp = self.net.get(ari_url, content_type='application/json') + except (requests.exceptions.RequestException, messages.Error) as error: + logger.warning("failed to fetch renewal_info URL (%s): %s", ari_url, error) + return default_renewal_time, now + default_retry_after + + renewal_info: messages.RenewalInfo = messages.RenewalInfo.from_json(resp.json()) + + start = renewal_info.suggested_window.start # pylint: disable=no-member + end = renewal_info.suggested_window.end # pylint: disable=no-member + + delta_seconds = (end - start).total_seconds() + random_seconds = random.uniform(0, delta_seconds) + random_time = start + datetime.timedelta(seconds=random_seconds) + + retry_after = self.retry_after(resp, default_retry_after.seconds) + return random_time, retry_after + + def revoke(self, cert: x509.Certificate, rsn: int) -> None: """Revoke certificate. @@ -781,3 +835,22 @@ class ClientNetwork: response = self._check_response(response, content_type=content_type) self._add_nonce(response) return response + +def _renewal_info_path_component(cert: x509.Certificate) -> str: + akid_ext = cert.extensions.get_extension_for_oid(x509.ExtensionOID.AUTHORITY_KEY_IDENTIFIER) + key_identifier = akid_ext.value.key_identifier # type: ignore[attr-defined] + + akid_encoded = base64.urlsafe_b64encode(key_identifier).decode('ascii').replace("=", "") + + # We add one to the reported bit_length so there is room for the sign bit. + # https://docs.python.org/3/library/stdtypes.html#int.bit_length + # "Return the number of bits necessary to represent an integer in binary, excluding + # the sign and leading zeros" + serial = cert.serial_number + encoded_serial_len = math.ceil((serial.bit_length()+1)/8) + # Serials are encoded as ASN.1 INTEGERS, which means big endian and signed (two's complement). + # https://letsencrypt.org/docs/a-warm-welcome-to-asn1-and-der/#integer-encoding + serial_bytes = serial.to_bytes(encoded_serial_len, byteorder='big', signed=True) + serial_encoded = base64.urlsafe_b64encode(serial_bytes).decode('ascii').replace("=", "") + + return f"{akid_encoded}.{serial_encoded}" diff --git a/acme/src/acme/messages.py b/acme/src/acme/messages.py index b26082181..0ed1f5c78 100644 --- a/acme/src/acme/messages.py +++ b/acme/src/acme/messages.py @@ -684,3 +684,19 @@ class OrderResource(ResourceWithURI): class NewOrder(Order): """New order.""" + + +class RenewalInfo(ResourceBody): + """Renewal Info Resource Body. + :ivar acme.messages.SuggestedWindow window: The suggested renewal window. + """ + class SuggestedWindow(jose.JSONObjectWithFields): + """Suggested Renewal Window, sub-resource of Renewal Info Resource. + :ivar datetime.datetime start: Beginning of suggested renewal window + :ivar datetime.datetime end: End of suggested renewal window (inclusive) + """ + start: datetime.datetime = fields.rfc3339('start', omitempty=True) + end: datetime.datetime = fields.rfc3339('end', omitempty=True) + + suggested_window: SuggestedWindow = jose.field('suggestedWindow', + decoder=SuggestedWindow.from_json) diff --git a/certbot-ci/src/certbot_integration_tests/utils/certbot_call.py b/certbot-ci/src/certbot_integration_tests/utils/certbot_call.py index 757b5be2f..89210be9a 100755 --- a/certbot-ci/src/certbot_integration_tests/utils/certbot_call.py +++ b/certbot-ci/src/certbot_integration_tests/utils/certbot_call.py @@ -135,7 +135,7 @@ def main() -> None: # Invoke certbot in test mode, without capturing output so users see directly the outcome. command, env = _prepare_args_env(args, directory_url, http_01_port, tls_alpn_01_port, - config_dir, workspace, True) + config_dir, workspace, False) subprocess.check_call(command, universal_newlines=True, cwd=workspace, env=env) diff --git a/certbot/src/certbot/_internal/main.py b/certbot/src/certbot/_internal/main.py index aee68ac67..3130904bf 100644 --- a/certbot/src/certbot/_internal/main.py +++ b/certbot/src/certbot/_internal/main.py @@ -265,8 +265,14 @@ def _handle_identical_cert_request(config: configuration.NamespaceConfig, if not lineage.ensure_deployed(): return "reinstall", lineage - if is_key_type_changing or renewal.should_renew(config, lineage): + if is_key_type_changing: return "renew", lineage + + acme = client.acme_from_config_key(config) + + if renewal.should_renew(config, lineage, acme): + return "renew", lineage + if config.reinstall: # Set with --reinstall, force an identical certificate to be # reinstalled without further prompting. diff --git a/certbot/src/certbot/_internal/renewal.py b/certbot/src/certbot/_internal/renewal.py index 5d74c76b0..744bb4332 100644 --- a/certbot/src/certbot/_internal/renewal.py +++ b/certbot/src/certbot/_internal/renewal.py @@ -1,6 +1,7 @@ """Functionality for autorenewal and associated juggling of configurations""" import copy +import datetime import itertools import logging import random @@ -21,6 +22,8 @@ from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.serialization import load_pem_private_key +from acme import client as acme_client + from certbot import configuration from certbot import crypto_util from certbot import errors @@ -309,12 +312,14 @@ def _restore_str(name: str, value: str) -> Optional[str]: return None if value == "None" else value -def should_renew(config: configuration.NamespaceConfig, lineage: storage.RenewableCert) -> bool: +def should_renew(config: configuration.NamespaceConfig, + lineage: storage.RenewableCert, + acme: acme_client.ClientV2) -> bool: """Return true if any of the circumstances for automatic renewal apply.""" if config.renew_by_default: logger.debug("Auto-renewal forced with --force-renewal...") return True - if lineage.should_autorenew(): + if should_autorenew(lineage, acme): logger.info("Certificate is due for renewal, auto-renewing...") return True if config.dry_run: @@ -323,6 +328,58 @@ def should_renew(config: configuration.NamespaceConfig, lineage: storage.Renewab display_util.notify("Certificate not yet due for renewal") return False +def should_autorenew(lineage: storage.RenewableCert, acme: acme_client.ClientV2) -> bool: + """Should we now try to autorenew the most recent cert version? + + If ACME Renewal Info (ARI) is available in the directory, check that first, + and renew if ARI indicates it is time, or if we are within the default + renweal window. + + If the certificate has an OCSP URL, renew if it is revoked. + + If neither of the above is true, but the "renew_before_expiry" config + indicates it is time, renew. Otherwise, don't. + + Note that this examines the numerically most recent cert version, + not the currently deployed version. + + :returns: whether an attempt should now be made to autorenew the + most current cert version in this lineage + :rtype: bool + + """ + if lineage.autorenewal_is_enabled(): + cert = lineage.version("cert", lineage.latest_common_version()) + + # Consider whether to attempt to autorenew this cert now + renewal_time = None + with open(cert, 'rb') as f: + cert_pem = f.read() + renewal_time, _ = acme.renewal_time(cert_pem) + + now = datetime.datetime.now(datetime.timezone.utc) + + if renewal_time and now > renewal_time: + return True + + # Renewals on the basis of revocation + if lineage.ocsp_revoked(lineage.latest_common_version()): + logger.debug("Should renew, certificate is revoked.") + return True + + # The "renew_before_expiry" config field can make us renew earlier + # than the default. + config_interval = lineage.configuration.get("renew_before_expiry") + notAfter = crypto_util.notAfter(cert) + if (config_interval is not None and + notAfter < storage.add_time_interval(now, config_interval)): + logger.debug("Should renew, less than %s before certificate " + "expiry %s.", config_interval, + notAfter.strftime("%Y-%m-%d %H:%M:%S %Z")) + return True + + return False + def _avoid_invalidating_lineage(config: configuration.NamespaceConfig, lineage: storage.RenewableCert, original_server: str) -> None: @@ -499,6 +556,11 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> Tuple[list, # shutting down a web service) aren't prolonged unnecessarily. apply_random_sleep = not sys.stdin.isatty() and config.random_sleep_on_renew + # We initialize acme clients on a per-server basis, but most + # lineages use the same server. Memoize clients here so we can + # share the connection pool and reuse a single fetched directory. + acme_clients = {} + for renewal_file in conf_files: display_util.notification("Processing " + renewal_file, pause=False) lineage_config = copy.deepcopy(config) @@ -520,10 +582,16 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> Tuple[list, if not renewal_candidate: parse_failures.append(renewal_file) else: + server = lineage_config.server + if not server: + raise errors.Error(f"Renewal config for {lineage_config.names} has no server.") + if server not in acme_clients: + acme_clients[server] = client.acme_from_config_key(config) + renewal_candidate.ensure_deployed() from certbot._internal import main plugins = plugins_disco.PluginsRegistry.find_all() - if should_renew(lineage_config, renewal_candidate): + if should_renew(lineage_config, renewal_candidate, acme_clients[server]): # Apply random sleep upon first renewal if needed if apply_random_sleep: sleep_time = random.uniform(1, 60 * 8) diff --git a/certbot/src/certbot/_internal/storage.py b/certbot/src/certbot/_internal/storage.py index a03848ae6..02e84ff5b 100644 --- a/certbot/src/certbot/_internal/storage.py +++ b/certbot/src/certbot/_internal/storage.py @@ -977,58 +977,6 @@ class RenewableCert(interfaces.RenewableCert): return ("autorenew" not in self.configuration["renewalparams"] or self.configuration["renewalparams"].as_bool("autorenew")) - def should_autorenew(self) -> bool: - """Should we now try to autorenew the most recent cert version? - - This is a policy question and does not only depend on whether - the cert is expired. (This considers whether autorenewal is - enabled, whether the cert is revoked, and whether the time - interval for autorenewal has been reached.) - - Note that this examines the numerically most recent cert version, - not the currently deployed version. - - :returns: whether an attempt should now be made to autorenew the - most current cert version in this lineage - :rtype: bool - - """ - if self.autorenewal_is_enabled(): - # Consider whether to attempt to autorenew this cert now - - # Renewals on the basis of revocation - if self.ocsp_revoked(self.latest_common_version()): - logger.debug("Should renew, certificate is revoked.") - return True - - cert = self.version("cert", self.latest_common_version()) - notBefore = crypto_util.notBefore(cert) - notAfter = crypto_util.notAfter(cert) - lifetime = notAfter - notBefore - - config_interval = self.configuration.get("renew_before_expiry") - now = datetime.datetime.now(pytz.UTC) - if config_interval is not None and notAfter < add_time_interval(now, config_interval): - logger.debug("Should renew, less than %s before certificate " - "expiry %s.", config_interval, - notAfter.strftime("%Y-%m-%d %H:%M:%S %Z")) - return True - - # No config for "renew_before_expiry", provide default behavior. - # For most certs, renew with 1/3 of certificate lifetime remaining. - # For short lived certificates, renew at 1/2 of certificate lifetime. - default_interval = lifetime / 3 - if lifetime.total_seconds() < 10 * 86400: - default_interval = lifetime / 2 - remaining_time = notAfter - now - if remaining_time < default_interval: - logger.debug("Should renew, less than %ss before certificate " - "expiry %s.", default_interval, - notAfter.strftime("%Y-%m-%d %H:%M:%S %Z")) - return True - - return False - @classmethod def new_lineage(cls, lineagename: str, cert: bytes, privkey: bytes, chain: bytes, cli_config: configuration.NamespaceConfig) -> "RenewableCert": diff --git a/certbot/src/certbot/_internal/tests/main_test.py b/certbot/src/certbot/_internal/tests/main_test.py index 6693a0500..2176c9da8 100644 --- a/certbot/src/certbot/_internal/tests/main_test.py +++ b/certbot/src/certbot/_internal/tests/main_test.py @@ -1419,7 +1419,6 @@ class MainTest(test_util.ConfigTestCase): 'live/foo.bar/fullchain.pem')) mock_lineage = mock.MagicMock(cert=cert_path, fullchain=chain_path, cert_path=cert_path, fullchain_path=chain_path) - mock_lineage.should_autorenew.return_value = due_for_renewal mock_lineage.has_pending_deployment.return_value = False mock_lineage.names.return_value = ['isnot.org'] mock_lineage.private_key_type = 'ecdsa' @@ -1448,21 +1447,23 @@ class MainTest(test_util.ConfigTestCase): as mock_crypto_util: mock_crypto_util.notAfter.return_value = expiry_date with mock.patch('certbot._internal.eff.handle_subscription'): - if not args: - args = ['-d', 'isnot.org', '-a', 'standalone', 'certonly'] - if extra_args: - args += extra_args - try: - ret, stdout, _, _ = self._call(args, stdout) - if ret: - print("Returned", ret) - raise AssertionError(ret) - assert not error_expected, "renewal should have errored" - except: # pylint: disable=bare-except - if not error_expected: - raise AssertionError( - "Unexpected renewal error:\n" + - traceback.format_exc()) + with mock.patch('certbot._internal.renewal.should_autorenew') as should_autorenew: + should_autorenew.return_value = due_for_renewal + if not args: + args = ['-d', 'isnot.org', '-a', 'standalone', 'certonly'] + if extra_args: + args += extra_args + try: + ret, stdout, _, _ = self._call(args, stdout) + if ret: + print("Returned", ret) + raise AssertionError(ret) + assert not error_expected, "renewal should have errored" + except: # pylint: disable=bare-except + if not error_expected: + raise AssertionError( + "Unexpected renewal error:\n" + + traceback.format_exc()) if should_renew: if reuse_key and not new_key: diff --git a/certbot/src/certbot/_internal/tests/renewal_test.py b/certbot/src/certbot/_internal/tests/renewal_test.py index 239744692..5ca6038fa 100644 --- a/certbot/src/certbot/_internal/tests/renewal_test.py +++ b/certbot/src/certbot/_internal/tests/renewal_test.py @@ -1,10 +1,13 @@ """Tests for certbot._internal.renewal""" import copy +import datetime import sys +import tempfile import unittest from unittest import mock import pytest +import pytz from acme import challenges from certbot import configuration @@ -12,6 +15,30 @@ from certbot import errors from certbot._internal import storage import certbot.tests.util as test_util +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives import serialization, hashes +from cryptography import x509 +from cryptography.x509 import Certificate + +def make_cert_with_lifetime(not_before: datetime.datetime, lifetime_days: int) -> bytes: + """Return PEM of a self-signed certificate with the given notBefore and lifetime.""" + key = ec.generate_private_key(ec.SECP256R1()) + not_after=not_before + datetime.timedelta(days=lifetime_days) + cert = x509.CertificateBuilder( + issuer_name=x509.Name([]), + subject_name=x509.Name([]), + public_key=key.public_key(), + serial_number=x509.random_serial_number(), + not_valid_before=not_before, + not_valid_after=not_after, + ).add_extension( + x509.SubjectAlternativeName([x509.DNSName("example.com")]), + critical=False, + ).sign( + private_key=key, + algorithm=hashes.SHA256(), + ) + return cert.public_bytes(serialization.Encoding.PEM) class RenewalTest(test_util.ConfigTestCase): @mock.patch.object(configuration.NamespaceConfig, 'set_by_user') @@ -188,6 +215,113 @@ class RenewalTest(test_util.ConfigTestCase): renewal.reconstitute(lineage_config, rc_path) assert lineage_config.key_type == 'rsa' + @test_util.patch_display_util() + @mock.patch('certbot._internal.client.acme_from_config_key') + @mock.patch('certbot._internal.main.renew_cert') + @mock.patch("certbot._internal.renewal.datetime") + def test_renewal_via_ari(self, mock_datetime, mock_renew_cert, mock_acme_from_config, unused_mock_display): + from certbot._internal import renewal + acme_client = mock.MagicMock() + mock_acme_from_config.return_value = acme_client + past = datetime.datetime(2025, 3, 19, 0, 0, 0, tzinfo=pytz.UTC) + now = datetime.datetime(2025, 4, 19, 0, 0, 0, tzinfo=pytz.UTC) + future = datetime.datetime(2025, 4, 19, 12, 0, 0, tzinfo=pytz.UTC) + mock_datetime.datetime.now.return_value = now + acme_client.renewal_time.return_value = past, future + + test_util.make_lineage(self.config.config_dir, 'sample-renewal.conf', ec=False) + lineage_config = copy.deepcopy(self.config) + + with mock.patch('time.sleep') as sleep: + renewal.handle_renewal_request(lineage_config) + + mock_renew_cert.assert_called_once() + + @mock.patch.object(configuration.NamespaceConfig, 'set_by_user') + @mock.patch("certbot._internal.renewal.datetime") + def test_renew_before_expiry(self, mock_datetime, mock_set_by_user): + """When neither OCSP nor the ACME client indicate it's time to renew, + obey the renew_before_expiry config. + """ + from certbot._internal import renewal + + # This certificate has a lifetime of 7 days, and the tests below + # that use a "None" interval (i.e. choose a default) rely on that fact. + # + # Not Before: Dec 11 22:34:45 2014 GMT + # Not After : Dec 18 22:34:45 2014 GMT + not_before = datetime.datetime(2014, 12, 11, 22, 34, 45) + short_cert = make_cert_with_lifetime(not_before, 7) + + mock_acme = mock.MagicMock() + future = datetime.datetime.now(pytz.UTC) + datetime.timedelta(days=100000) + mock_acme.renewal_time.return_value = (future, future) + + mock_renewable_cert = mock.MagicMock() + mock_renewable_cert.autorenewal_is_enabled.return_value = True + mock_renewable_cert.version.return_value = "/tmp/abc" + mock_renewable_cert.ocsp_revoked.return_value = False + + mock_datetime.timedelta = datetime.timedelta + mock_set_by_user.return_value = False + + with tempfile.NamedTemporaryFile() as tmp_cert: + tmp_cert.close() # close now because of compatibility issues on Windows + with open(tmp_cert.name, 'wb') as c: + c.write(short_cert) + + mock_renewable_cert.version.return_value = tmp_cert.name + + for (current_time, interval, result) in [ + # 2014-12-13 12:00 (about 5 days prior to expiry) + # Times that should result in autorenewal/autodeployment + (1418472000, "2 months", True), (1418472000, "1 week", True), + # With the "default" logic, this 7-day certificate should autorenew + # at 3.5 days prior to expiry. We haven't reached that yet, + # so don't renew. + (1418472000, None, False), + # Times that should not renew + (1418472000, "4 days", False), (1418472000, "2 days", False), + # 2009-05-01 12:00:00+00:00 (about 5 years prior to expiry) + # Times that should result in autorenewal/autodeployment + (1241179200, "7 years", True), + (1241179200, "11 years 2 months", True), + # Times that should not renew + (1241179200, "8 hours", False), (1241179200, "2 days", False), + (1241179200, "40 days", False), (1241179200, "9 months", False), + # 2015-01-01 (after expiry has already happened, so all + # intervals should cause autorenewal/autodeployment) + (1420070400, "0 seconds", True), + (1420070400, "10 seconds", True), + (1420070400, "10 minutes", True), + (1420070400, "10 weeks", True), (1420070400, "10 months", True), + (1420070400, "10 years", True), (1420070400, "99 months", True), + ]: + sometime = datetime.datetime.fromtimestamp(current_time, pytz.UTC) + mock_datetime.datetime.now.return_value = sometime + mock_renewable_cert.configuration = {"renew_before_expiry": interval} + assert renewal.should_autorenew(mock_renewable_cert, mock_acme) == result, f"at {current_time}, with config '{interval}', expected {result}" + + @mock.patch.object(configuration.NamespaceConfig, 'set_by_user') + @mock.patch("certbot._internal.storage.RenewableCert.ocsp_revoked") + def test_should_autorenew(self, mock_ocsp, mock_set_by_user): + from certbot._internal import renewal + + mock_set_by_user.return_value = False + mock_acme = mock.MagicMock() + future = datetime.datetime.now(pytz.UTC) + datetime.timedelta(seconds=1000) + mock_acme.renewal_time.return_value = (future, future) + + # Autorenewal turned off + mock_rc = mock.MagicMock() + mock_rc.autorenewal_is_enabled.return_value = False + assert not renewal.should_autorenew(mock_rc, mock_acme) + + mock_rc.autorenewal_is_enabled.return_value = True + # Mandatory renewal on the basis of OCSP revocation + mock_ocsp.return_value = True + assert renewal.should_autorenew(mock_rc, mock_acme) + mock_ocsp.return_value = False class RestoreRequiredConfigElementsTest(test_util.ConfigTestCase): """Tests for certbot._internal.renewal.restore_required_config_elements.""" diff --git a/certbot/src/certbot/_internal/tests/storage_test.py b/certbot/src/certbot/_internal/tests/storage_test.py index 61edd4b6c..e9359cb8e 100644 --- a/certbot/src/certbot/_internal/tests/storage_test.py +++ b/certbot/src/certbot/_internal/tests/storage_test.py @@ -19,34 +19,9 @@ from certbot.compat import filesystem from certbot.compat import os import certbot.tests.util as test_util -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives import serialization, hashes -from cryptography import x509 -from cryptography.x509 import Certificate - import datetime from typing import Optional, Any -def make_cert_with_lifetime(not_before: datetime.datetime, lifetime_days: int) -> bytes: - """Return PEM of a self-signed certificate with the given notBefore and lifetime.""" - key = ec.generate_private_key(ec.SECP256R1()) - not_after=not_before + datetime.timedelta(days=lifetime_days) - cert = x509.CertificateBuilder( - issuer_name=x509.Name([]), - subject_name=x509.Name([]), - public_key=key.public_key(), - serial_number=x509.random_serial_number(), - not_valid_before=not_before, - not_valid_after=not_after, - ).add_extension( - x509.SubjectAlternativeName([x509.DNSName("example.com")]), - critical=False, - ).sign( - private_key=key, - algorithm=hashes.SHA256(), - ) - return cert.public_bytes(serialization.Encoding.PEM) - def unlink_all(rc_object): """Unlink all four items associated with this RenewableCert.""" for kind in ALL_FOUR: @@ -472,86 +447,6 @@ class RenewableCertTests(BaseRenewableCertTest): with pytest.raises(errors.CertStorageError): self.test_rc.names() - @mock.patch.object(configuration.NamespaceConfig, 'set_by_user') - @mock.patch("certbot._internal.storage.datetime") - def test_time_interval_judgments(self, mock_datetime, mock_set_by_user): - """Test should_autorenew() on the basis of expiry time windows.""" - # Note: this certificate happens to have a lifetime of 7 days, - # and the tests below that use a "None" interval (i.e. choose a - # default) rely on that fact. - # - # Not Before: Dec 11 22:34:45 2014 GMT - # Not After : Dec 18 22:34:45 2014 GMT - not_before = datetime.datetime(2014, 12, 11, 22, 34, 45) - short_cert = make_cert_with_lifetime(not_before, 7) - - self._write_out_ex_kinds() - - self.test_rc.update_all_links_to(12) - with open(self.test_rc.cert, "wb") as f: - f.write(short_cert) - self.test_rc.update_all_links_to(11) - with open(self.test_rc.cert, "wb") as f: - f.write(short_cert) - - mock_datetime.timedelta = datetime.timedelta - mock_set_by_user.return_value = False - self.test_rc.configuration["renewalparams"] = {} - - for (current_time, interval, result) in [ - # 2014-12-13 12:00 (about 5 days prior to expiry) - # Times that should result in autorenewal/autodeployment - (1418472000, "2 months", True), (1418472000, "1 week", True), - # With the "default" logic, this 7-day certificate should autorenew - # at 3.5 days prior to expiry. We haven't reached that yet, - # so don't renew. - (1418472000, None, False), - # 2014-12-16 03:20, a little less than 3.5 days to expiry. - (1418700000, None, True), - # Times that should not renew - (1418472000, "4 days", False), (1418472000, "2 days", False), - # 2009-05-01 12:00:00+00:00 (about 5 years prior to expiry) - # Times that should result in autorenewal/autodeployment - (1241179200, "7 years", True), - (1241179200, "11 years 2 months", True), - # Times that should not renew - (1241179200, "8 hours", False), (1241179200, "2 days", False), - (1241179200, "40 days", False), (1241179200, "9 months", False), - # 2015-01-01 (after expiry has already happened, so all - # intervals should cause autorenewal/autodeployment) - (1420070400, "0 seconds", True), - (1420070400, "10 seconds", True), - (1420070400, "10 minutes", True), - (1420070400, "10 weeks", True), (1420070400, "10 months", True), - (1420070400, "10 years", True), (1420070400, "99 months", True), - (1420070400, None, True) - ]: - sometime = datetime.datetime.fromtimestamp(current_time, pytz.UTC) - mock_datetime.datetime.now.return_value = sometime - self.test_rc.configuration["renew_before_expiry"] = interval - assert self.test_rc.should_autorenew() == result - - # Lifetime: 31 years - # Default renewal: about 10 years from expiry - # Not Before: May 29 07:42:01 2017 GMT - # Not After : Mar 30 07:42:01 2048 GMT - not_before=datetime.datetime(2017, 5, 29, 7, 42, 1) - long_cert = make_cert_with_lifetime(not_before, 31 * 365) - self.test_rc.update_all_links_to(12) - with open(self.test_rc.cert, "wb") as f: - f.write(long_cert) - self.test_rc.update_all_links_to(11) - with open(self.test_rc.cert, "wb") as f: - f.write(long_cert) - for (current_time, result) in [ - (2114380800, False), # 2037-01-01 - (2148000000, True), # 2038-01-25 - ]: - sometime = datetime.datetime.fromtimestamp(current_time, pytz.UTC) - mock_datetime.datetime.now.return_value = sometime - self.test_rc.configuration["renew_before_expiry"] = interval - assert self.test_rc.should_autorenew() == result - def test_autorenewal_is_enabled(self): self.test_rc.configuration["renewalparams"] = {} assert self.test_rc.autorenewal_is_enabled() @@ -561,23 +456,6 @@ class RenewableCertTests(BaseRenewableCertTest): self.test_rc.configuration["renewalparams"]["autorenew"] = "False" assert not self.test_rc.autorenewal_is_enabled() - @mock.patch.object(configuration.NamespaceConfig, 'set_by_user') - @mock.patch("certbot._internal.storage.RenewableCert.ocsp_revoked") - def test_should_autorenew(self, mock_ocsp, mock_set_by_user): - """Test should_autorenew on the basis of reasons other than - expiry time window.""" - mock_set_by_user.return_value = False - # Autorenewal turned off - self.test_rc.configuration["renewalparams"] = {"autorenew": "False"} - assert not self.test_rc.should_autorenew() - self.test_rc.configuration["renewalparams"]["autorenew"] = "True" - for kind in ALL_FOUR: - self._write_out_kind(kind, 12) - # Mandatory renewal on the basis of OCSP revocation - mock_ocsp.return_value = True - assert self.test_rc.should_autorenew() - mock_ocsp.return_value = False - @mock.patch("certbot._internal.storage.relevant_values") def test_save_successor(self, mock_rv): # Mock relevant_values() to claim that all values are relevant here