diff --git a/acme/src/acme/_internal/tests/client_test.py b/acme/src/acme/_internal/tests/client_test.py index fb900e38c..11d56cdd3 100644 --- a/acme/src/acme/_internal/tests/client_test.py +++ b/acme/src/acme/_internal/tests/client_test.py @@ -452,6 +452,172 @@ class ClientV2Test(unittest.TestCase): assert DIRECTORY_V2.to_partial_json() == \ ClientV2.get_directory('https://example.com/dir', self.net).to_partial_json() + def test_renewal_time_no_renewal_info(self): + # A directory with no 'renewalInfo' should result in default renewal periods. + self.client.directory = messages.Directory({}) + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 20, 00, 00, 00), + ) + t, _ = self.client.renewal_time(cert_pem) + assert t == datetime.datetime(2025, 3, 16, 00, 00, 00, tzinfo=datetime.timezone.utc) + + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 30, 00, 00, 00), + ) + t, _ = self.client.renewal_time(cert_pem) + assert t == datetime.datetime(2025, 3, 24, 00, 00, 00, tzinfo=datetime.timezone.utc) + + def test_renewal_time_with_renewal_info(self): + from cryptography import x509 + from acme.client import _renewal_info_path_component + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 20, 00, 00, 00), + ) + + self.client.directory = messages.Directory({ + 'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info', + }) + + self.response.json.return_value = { + "suggestedWindow": { + "start": "2025-03-14T01:01:01Z", + "end": "2025-03-14T01:01:01Z", + }, + "message": "Keep those certs fresh" + } + t, _ = self.client.renewal_time(cert_pem) + cert_parsed = x509.load_pem_x509_certificate(cert_pem) + ari_path_component = _renewal_info_path_component(cert_parsed) + self.net.get.assert_called_once_with("https://www.letsencrypt-demo.org/acme/renewal-info/" + + ari_path_component, + content_type='application/json') + assert t == datetime.datetime(2025, 3, 14, 1, 1, 1, tzinfo=datetime.timezone.utc) + + self.net.reset_mock() + + self.response.json.return_value = { + "suggestedWindow": { + "start": "2025-03-16T01:01:01Z", + "end": "2025-03-17T01:01:01Z", + }, + "message": "Keep those certs fresh" + } + t, _ = self.client.renewal_time(cert_pem) + self.net.get.assert_called_once_with("https://www.letsencrypt-demo.org/acme/renewal-info/" + + ari_path_component, + content_type='application/json') + assert t >= datetime.datetime(2025, 3, 16, 1, 1, 1, tzinfo=datetime.timezone.utc) + assert t <= datetime.datetime(2025, 3, 17, 1, 1, 1, tzinfo=datetime.timezone.utc) + + def test_renewal_time_renewal_info_errors(self): + self.client.directory = messages.Directory({ + 'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info', + }) + # Failure to fetch the 'renewalInfo' URL should return default timings + self.net.get.side_effect = requests.exceptions.RequestException + + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 20, 00, 00, 00), + ) + t, _ = self.client.renewal_time(cert_pem) + assert t == datetime.datetime(2025, 3, 16, 00, 00, 00, tzinfo=datetime.timezone.utc) + + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 30, 00, 00, 00), + ) + t, _ = self.client.renewal_time(cert_pem) + assert t == datetime.datetime(2025, 3, 24, 00, 00, 00, tzinfo=datetime.timezone.utc) + + @mock.patch('acme.client.datetime') + def test_renewal_time_returns_retry_after(self, dt_mock): + dt_mock.datetime.now.return_value = datetime.datetime(2025, 5, 12, 0, 0, 0) + dt_mock.timedelta = datetime.timedelta + + self.client.directory = messages.Directory({ + 'renewalInfo': 'https://www.letsencrypt-demo.org/acme/renewal-info', + }) + cert_pem = make_cert_for_renewal( + not_before=datetime.datetime(2025, 3, 12, 00, 00, 00), + not_after=datetime.datetime(2025, 3, 20, 00, 00, 00), + ) + self.response.json.return_value = { + "suggestedWindow": { + "start": "2025-03-14T01:01:01Z", + "end": "2025-03-14T01:01:01Z", + }, + "message": "Keep those certs fresh" + } + + # With no explicit Retry-After in header, default to six hours + _, retry_after = self.client.renewal_time(cert_pem) + assert retry_after == datetime.datetime(2025, 5, 12, 6, 0, 0) + + # With an explicit Retry-After in header, use that + self.response.headers['Retry-After'] = '100' + _, retry_after = self.client.renewal_time(cert_pem) + assert retry_after == datetime.datetime(2025, 5, 12, 00, 1, 40) + +def test_renewal_info_path_component(): + from cryptography import x509 + from acme.client import _renewal_info_path_component + + cert = x509.load_pem_x509_certificate(test_util.load_vector('rsa2048_cert.pem')) + + assert _renewal_info_path_component(cert) == "fL5sRirC8VS5AtOQh9DfoAzYNCI.ALVG_VbBb5U7" + + # From https://www.ietf.org/archive/id/draft-ietf-acme-ari-08.html appendix A. + ARI_TEST_CERT = b""" +-----BEGIN CERTIFICATE----- +MIIBQzCB66ADAgECAgUAh2VDITAKBggqhkjOPQQDAjAVMRMwEQYDVQQDEwpFeGFt +cGxlIENBMCIYDzAwMDEwMTAxMDAwMDAwWhgPMDAwMTAxMDEwMDAwMDBaMBYxFDAS +BgNVBAMTC2V4YW1wbGUuY29tMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEeBZu +7cbpAYNXZLbbh8rNIzuOoqOOtmxA1v7cRm//AwyMwWxyHz4zfwmBhcSrf47NUAFf +qzLQ2PPQxdTXREYEnKMjMCEwHwYDVR0jBBgwFoAUaYhba4dGQEHhs3uEe6CuLN4B +yNQwCgYIKoZIzj0EAwIDRwAwRAIge09+S5TZAlw5tgtiVvuERV6cT4mfutXIlwTb ++FYN/8oCIClDsqBklhB9KAelFiYt9+6FDj3z4KGVelYM5MdsO3pK +-----END CERTIFICATE----- +""" + + cert = x509.load_pem_x509_certificate(ARI_TEST_CERT) + assert _renewal_info_path_component(cert) == "aYhba4dGQEHhs3uEe6CuLN4ByNQ.AIdlQyE" + +if __name__ == '__main__': + sys.exit(pytest.main(sys.argv[1:] + [__file__])) # pragma: no cover + +def make_cert_for_renewal(not_before, not_after) -> bytes: + """ + Return a PEM-encoded, self-signed certificate with the given dates. + """ + from cryptography import x509 + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives import serialization, hashes + # AKID and serial are the inputs to constructing the renewalInfo URL + akid = x509.AuthorityKeyIdentifier(b"1234", None, None) + serial = 56789 + key = ec.generate_private_key(ec.SECP256R1()) + cert = x509.CertificateBuilder( + issuer_name=x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, "Some Issuer")]), + subject_name=x509.Name([]), + public_key=key.public_key(), + serial_number=serial, + not_valid_before=not_before, + not_valid_after=not_after, + ).add_extension( + x509.SubjectAlternativeName([x509.DNSName('example.com')]), + critical=False, + ).add_extension( + akid, + critical=False, + ).sign( + private_key=key, + algorithm=hashes.SHA256(), + ) + return cert.public_bytes(serialization.Encoding.PEM) class MockJSONDeSerializable(jose.JSONDeSerializable): # pylint: disable=missing-docstring diff --git a/acme/src/acme/client.py b/acme/src/acme/client.py index 02ed8ad0b..763554058 100644 --- a/acme/src/acme/client.py +++ b/acme/src/acme/client.py @@ -4,6 +4,8 @@ import datetime from email.utils import parsedate_tz import http.client as http_client import logging +import math +import random import re import time from typing import Any @@ -302,6 +304,58 @@ class ClientV2: raise e return self.poll_finalization(orderr, deadline, fetch_alternative_chains) + def renewal_time(self, cert_pem: bytes) -> Tuple[datetime.datetime, datetime.datetime]: + """Return an appropriate time to attempt renewal of the certificate, + and the next time to ask the ACME server for renewal info. + + If the ACME directory has a "renewalInfo" field, the response will be + based on a fetch of the renewal info resource for the certificate + (https://www.ietf.org/archive/id/draft-ietf-acme-ari-08.html). + + If there is no "renewalInfo" field, this function will fall back to + reasonable defaults based on the certificate lifetime. + + This function may make other network calls in the future (e.g., OCSP + or CRL). + """ + now = datetime.datetime.now() + # https://www.ietf.org/archive/id/draft-ietf-acme-ari-08.html#section-4.3.3 + default_retry_after = datetime.timedelta(seconds=6 * 60 * 60) + + cert = x509.load_pem_x509_certificate(cert_pem) + + not_before = cert.not_valid_before_utc + lifetime = cert.not_valid_after_utc - not_before + if lifetime.total_seconds() < 10 * 86400: + default_renewal_time = not_before + lifetime / 2 + else: + default_renewal_time = not_before + lifetime * 2 / 3 + + try: + renewal_info_base_url = self.directory['renewalInfo'] + except KeyError: + return default_renewal_time, now + default_retry_after + + ari_url = renewal_info_base_url + '/' + _renewal_info_path_component(cert) + try: + resp = self.net.get(ari_url, content_type='application/json') + except (requests.exceptions.RequestException, messages.Error) as error: + logger.warning("failed to fetch renewal_info URL (%s): %s", ari_url, error) + return default_renewal_time, now + default_retry_after + + renewal_info: messages.RenewalInfo = messages.RenewalInfo.from_json(resp.json()) + + start = renewal_info.suggested_window.start # pylint: disable=no-member + end = renewal_info.suggested_window.end # pylint: disable=no-member + + delta_seconds = (end - start).total_seconds() + random_seconds = random.uniform(0, delta_seconds) + random_time = start + datetime.timedelta(seconds=random_seconds) + + retry_after = self.retry_after(resp, default_retry_after.seconds) + return random_time, retry_after + + def revoke(self, cert: x509.Certificate, rsn: int) -> None: """Revoke certificate. @@ -781,3 +835,22 @@ class ClientNetwork: response = self._check_response(response, content_type=content_type) self._add_nonce(response) return response + +def _renewal_info_path_component(cert: x509.Certificate) -> str: + akid_ext = cert.extensions.get_extension_for_oid(x509.ExtensionOID.AUTHORITY_KEY_IDENTIFIER) + key_identifier = akid_ext.value.key_identifier # type: ignore[attr-defined] + + akid_encoded = base64.urlsafe_b64encode(key_identifier).decode('ascii').replace("=", "") + + # We add one to the reported bit_length so there is room for the sign bit. + # https://docs.python.org/3/library/stdtypes.html#int.bit_length + # "Return the number of bits necessary to represent an integer in binary, excluding + # the sign and leading zeros" + serial = cert.serial_number + encoded_serial_len = math.ceil((serial.bit_length()+1)/8) + # Serials are encoded as ASN.1 INTEGERS, which means big endian and signed (two's complement). + # https://letsencrypt.org/docs/a-warm-welcome-to-asn1-and-der/#integer-encoding + serial_bytes = serial.to_bytes(encoded_serial_len, byteorder='big', signed=True) + serial_encoded = base64.urlsafe_b64encode(serial_bytes).decode('ascii').replace("=", "") + + return f"{akid_encoded}.{serial_encoded}" diff --git a/acme/src/acme/messages.py b/acme/src/acme/messages.py index b26082181..0ed1f5c78 100644 --- a/acme/src/acme/messages.py +++ b/acme/src/acme/messages.py @@ -684,3 +684,19 @@ class OrderResource(ResourceWithURI): class NewOrder(Order): """New order.""" + + +class RenewalInfo(ResourceBody): + """Renewal Info Resource Body. + :ivar acme.messages.SuggestedWindow window: The suggested renewal window. + """ + class SuggestedWindow(jose.JSONObjectWithFields): + """Suggested Renewal Window, sub-resource of Renewal Info Resource. + :ivar datetime.datetime start: Beginning of suggested renewal window + :ivar datetime.datetime end: End of suggested renewal window (inclusive) + """ + start: datetime.datetime = fields.rfc3339('start', omitempty=True) + end: datetime.datetime = fields.rfc3339('end', omitempty=True) + + suggested_window: SuggestedWindow = jose.field('suggestedWindow', + decoder=SuggestedWindow.from_json) diff --git a/certbot-ci/src/certbot_integration_tests/utils/certbot_call.py b/certbot-ci/src/certbot_integration_tests/utils/certbot_call.py index 757b5be2f..89210be9a 100755 --- a/certbot-ci/src/certbot_integration_tests/utils/certbot_call.py +++ b/certbot-ci/src/certbot_integration_tests/utils/certbot_call.py @@ -135,7 +135,7 @@ def main() -> None: # Invoke certbot in test mode, without capturing output so users see directly the outcome. command, env = _prepare_args_env(args, directory_url, http_01_port, tls_alpn_01_port, - config_dir, workspace, True) + config_dir, workspace, False) subprocess.check_call(command, universal_newlines=True, cwd=workspace, env=env) diff --git a/certbot/src/certbot/_internal/main.py b/certbot/src/certbot/_internal/main.py index aee68ac67..3130904bf 100644 --- a/certbot/src/certbot/_internal/main.py +++ b/certbot/src/certbot/_internal/main.py @@ -265,8 +265,14 @@ def _handle_identical_cert_request(config: configuration.NamespaceConfig, if not lineage.ensure_deployed(): return "reinstall", lineage - if is_key_type_changing or renewal.should_renew(config, lineage): + if is_key_type_changing: return "renew", lineage + + acme = client.acme_from_config_key(config) + + if renewal.should_renew(config, lineage, acme): + return "renew", lineage + if config.reinstall: # Set with --reinstall, force an identical certificate to be # reinstalled without further prompting. diff --git a/certbot/src/certbot/_internal/renewal.py b/certbot/src/certbot/_internal/renewal.py index 5d74c76b0..744bb4332 100644 --- a/certbot/src/certbot/_internal/renewal.py +++ b/certbot/src/certbot/_internal/renewal.py @@ -1,6 +1,7 @@ """Functionality for autorenewal and associated juggling of configurations""" import copy +import datetime import itertools import logging import random @@ -21,6 +22,8 @@ from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.serialization import load_pem_private_key +from acme import client as acme_client + from certbot import configuration from certbot import crypto_util from certbot import errors @@ -309,12 +312,14 @@ def _restore_str(name: str, value: str) -> Optional[str]: return None if value == "None" else value -def should_renew(config: configuration.NamespaceConfig, lineage: storage.RenewableCert) -> bool: +def should_renew(config: configuration.NamespaceConfig, + lineage: storage.RenewableCert, + acme: acme_client.ClientV2) -> bool: """Return true if any of the circumstances for automatic renewal apply.""" if config.renew_by_default: logger.debug("Auto-renewal forced with --force-renewal...") return True - if lineage.should_autorenew(): + if should_autorenew(lineage, acme): logger.info("Certificate is due for renewal, auto-renewing...") return True if config.dry_run: @@ -323,6 +328,58 @@ def should_renew(config: configuration.NamespaceConfig, lineage: storage.Renewab display_util.notify("Certificate not yet due for renewal") return False +def should_autorenew(lineage: storage.RenewableCert, acme: acme_client.ClientV2) -> bool: + """Should we now try to autorenew the most recent cert version? + + If ACME Renewal Info (ARI) is available in the directory, check that first, + and renew if ARI indicates it is time, or if we are within the default + renweal window. + + If the certificate has an OCSP URL, renew if it is revoked. + + If neither of the above is true, but the "renew_before_expiry" config + indicates it is time, renew. Otherwise, don't. + + Note that this examines the numerically most recent cert version, + not the currently deployed version. + + :returns: whether an attempt should now be made to autorenew the + most current cert version in this lineage + :rtype: bool + + """ + if lineage.autorenewal_is_enabled(): + cert = lineage.version("cert", lineage.latest_common_version()) + + # Consider whether to attempt to autorenew this cert now + renewal_time = None + with open(cert, 'rb') as f: + cert_pem = f.read() + renewal_time, _ = acme.renewal_time(cert_pem) + + now = datetime.datetime.now(datetime.timezone.utc) + + if renewal_time and now > renewal_time: + return True + + # Renewals on the basis of revocation + if lineage.ocsp_revoked(lineage.latest_common_version()): + logger.debug("Should renew, certificate is revoked.") + return True + + # The "renew_before_expiry" config field can make us renew earlier + # than the default. + config_interval = lineage.configuration.get("renew_before_expiry") + notAfter = crypto_util.notAfter(cert) + if (config_interval is not None and + notAfter < storage.add_time_interval(now, config_interval)): + logger.debug("Should renew, less than %s before certificate " + "expiry %s.", config_interval, + notAfter.strftime("%Y-%m-%d %H:%M:%S %Z")) + return True + + return False + def _avoid_invalidating_lineage(config: configuration.NamespaceConfig, lineage: storage.RenewableCert, original_server: str) -> None: @@ -499,6 +556,11 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> Tuple[list, # shutting down a web service) aren't prolonged unnecessarily. apply_random_sleep = not sys.stdin.isatty() and config.random_sleep_on_renew + # We initialize acme clients on a per-server basis, but most + # lineages use the same server. Memoize clients here so we can + # share the connection pool and reuse a single fetched directory. + acme_clients = {} + for renewal_file in conf_files: display_util.notification("Processing " + renewal_file, pause=False) lineage_config = copy.deepcopy(config) @@ -520,10 +582,16 @@ def handle_renewal_request(config: configuration.NamespaceConfig) -> Tuple[list, if not renewal_candidate: parse_failures.append(renewal_file) else: + server = lineage_config.server + if not server: + raise errors.Error(f"Renewal config for {lineage_config.names} has no server.") + if server not in acme_clients: + acme_clients[server] = client.acme_from_config_key(config) + renewal_candidate.ensure_deployed() from certbot._internal import main plugins = plugins_disco.PluginsRegistry.find_all() - if should_renew(lineage_config, renewal_candidate): + if should_renew(lineage_config, renewal_candidate, acme_clients[server]): # Apply random sleep upon first renewal if needed if apply_random_sleep: sleep_time = random.uniform(1, 60 * 8) diff --git a/certbot/src/certbot/_internal/storage.py b/certbot/src/certbot/_internal/storage.py index a03848ae6..02e84ff5b 100644 --- a/certbot/src/certbot/_internal/storage.py +++ b/certbot/src/certbot/_internal/storage.py @@ -977,58 +977,6 @@ class RenewableCert(interfaces.RenewableCert): return ("autorenew" not in self.configuration["renewalparams"] or self.configuration["renewalparams"].as_bool("autorenew")) - def should_autorenew(self) -> bool: - """Should we now try to autorenew the most recent cert version? - - This is a policy question and does not only depend on whether - the cert is expired. (This considers whether autorenewal is - enabled, whether the cert is revoked, and whether the time - interval for autorenewal has been reached.) - - Note that this examines the numerically most recent cert version, - not the currently deployed version. - - :returns: whether an attempt should now be made to autorenew the - most current cert version in this lineage - :rtype: bool - - """ - if self.autorenewal_is_enabled(): - # Consider whether to attempt to autorenew this cert now - - # Renewals on the basis of revocation - if self.ocsp_revoked(self.latest_common_version()): - logger.debug("Should renew, certificate is revoked.") - return True - - cert = self.version("cert", self.latest_common_version()) - notBefore = crypto_util.notBefore(cert) - notAfter = crypto_util.notAfter(cert) - lifetime = notAfter - notBefore - - config_interval = self.configuration.get("renew_before_expiry") - now = datetime.datetime.now(pytz.UTC) - if config_interval is not None and notAfter < add_time_interval(now, config_interval): - logger.debug("Should renew, less than %s before certificate " - "expiry %s.", config_interval, - notAfter.strftime("%Y-%m-%d %H:%M:%S %Z")) - return True - - # No config for "renew_before_expiry", provide default behavior. - # For most certs, renew with 1/3 of certificate lifetime remaining. - # For short lived certificates, renew at 1/2 of certificate lifetime. - default_interval = lifetime / 3 - if lifetime.total_seconds() < 10 * 86400: - default_interval = lifetime / 2 - remaining_time = notAfter - now - if remaining_time < default_interval: - logger.debug("Should renew, less than %ss before certificate " - "expiry %s.", default_interval, - notAfter.strftime("%Y-%m-%d %H:%M:%S %Z")) - return True - - return False - @classmethod def new_lineage(cls, lineagename: str, cert: bytes, privkey: bytes, chain: bytes, cli_config: configuration.NamespaceConfig) -> "RenewableCert": diff --git a/certbot/src/certbot/_internal/tests/main_test.py b/certbot/src/certbot/_internal/tests/main_test.py index 6693a0500..2176c9da8 100644 --- a/certbot/src/certbot/_internal/tests/main_test.py +++ b/certbot/src/certbot/_internal/tests/main_test.py @@ -1419,7 +1419,6 @@ class MainTest(test_util.ConfigTestCase): 'live/foo.bar/fullchain.pem')) mock_lineage = mock.MagicMock(cert=cert_path, fullchain=chain_path, cert_path=cert_path, fullchain_path=chain_path) - mock_lineage.should_autorenew.return_value = due_for_renewal mock_lineage.has_pending_deployment.return_value = False mock_lineage.names.return_value = ['isnot.org'] mock_lineage.private_key_type = 'ecdsa' @@ -1448,21 +1447,23 @@ class MainTest(test_util.ConfigTestCase): as mock_crypto_util: mock_crypto_util.notAfter.return_value = expiry_date with mock.patch('certbot._internal.eff.handle_subscription'): - if not args: - args = ['-d', 'isnot.org', '-a', 'standalone', 'certonly'] - if extra_args: - args += extra_args - try: - ret, stdout, _, _ = self._call(args, stdout) - if ret: - print("Returned", ret) - raise AssertionError(ret) - assert not error_expected, "renewal should have errored" - except: # pylint: disable=bare-except - if not error_expected: - raise AssertionError( - "Unexpected renewal error:\n" + - traceback.format_exc()) + with mock.patch('certbot._internal.renewal.should_autorenew') as should_autorenew: + should_autorenew.return_value = due_for_renewal + if not args: + args = ['-d', 'isnot.org', '-a', 'standalone', 'certonly'] + if extra_args: + args += extra_args + try: + ret, stdout, _, _ = self._call(args, stdout) + if ret: + print("Returned", ret) + raise AssertionError(ret) + assert not error_expected, "renewal should have errored" + except: # pylint: disable=bare-except + if not error_expected: + raise AssertionError( + "Unexpected renewal error:\n" + + traceback.format_exc()) if should_renew: if reuse_key and not new_key: diff --git a/certbot/src/certbot/_internal/tests/renewal_test.py b/certbot/src/certbot/_internal/tests/renewal_test.py index 239744692..5ca6038fa 100644 --- a/certbot/src/certbot/_internal/tests/renewal_test.py +++ b/certbot/src/certbot/_internal/tests/renewal_test.py @@ -1,10 +1,13 @@ """Tests for certbot._internal.renewal""" import copy +import datetime import sys +import tempfile import unittest from unittest import mock import pytest +import pytz from acme import challenges from certbot import configuration @@ -12,6 +15,30 @@ from certbot import errors from certbot._internal import storage import certbot.tests.util as test_util +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives import serialization, hashes +from cryptography import x509 +from cryptography.x509 import Certificate + +def make_cert_with_lifetime(not_before: datetime.datetime, lifetime_days: int) -> bytes: + """Return PEM of a self-signed certificate with the given notBefore and lifetime.""" + key = ec.generate_private_key(ec.SECP256R1()) + not_after=not_before + datetime.timedelta(days=lifetime_days) + cert = x509.CertificateBuilder( + issuer_name=x509.Name([]), + subject_name=x509.Name([]), + public_key=key.public_key(), + serial_number=x509.random_serial_number(), + not_valid_before=not_before, + not_valid_after=not_after, + ).add_extension( + x509.SubjectAlternativeName([x509.DNSName("example.com")]), + critical=False, + ).sign( + private_key=key, + algorithm=hashes.SHA256(), + ) + return cert.public_bytes(serialization.Encoding.PEM) class RenewalTest(test_util.ConfigTestCase): @mock.patch.object(configuration.NamespaceConfig, 'set_by_user') @@ -188,6 +215,113 @@ class RenewalTest(test_util.ConfigTestCase): renewal.reconstitute(lineage_config, rc_path) assert lineage_config.key_type == 'rsa' + @test_util.patch_display_util() + @mock.patch('certbot._internal.client.acme_from_config_key') + @mock.patch('certbot._internal.main.renew_cert') + @mock.patch("certbot._internal.renewal.datetime") + def test_renewal_via_ari(self, mock_datetime, mock_renew_cert, mock_acme_from_config, unused_mock_display): + from certbot._internal import renewal + acme_client = mock.MagicMock() + mock_acme_from_config.return_value = acme_client + past = datetime.datetime(2025, 3, 19, 0, 0, 0, tzinfo=pytz.UTC) + now = datetime.datetime(2025, 4, 19, 0, 0, 0, tzinfo=pytz.UTC) + future = datetime.datetime(2025, 4, 19, 12, 0, 0, tzinfo=pytz.UTC) + mock_datetime.datetime.now.return_value = now + acme_client.renewal_time.return_value = past, future + + test_util.make_lineage(self.config.config_dir, 'sample-renewal.conf', ec=False) + lineage_config = copy.deepcopy(self.config) + + with mock.patch('time.sleep') as sleep: + renewal.handle_renewal_request(lineage_config) + + mock_renew_cert.assert_called_once() + + @mock.patch.object(configuration.NamespaceConfig, 'set_by_user') + @mock.patch("certbot._internal.renewal.datetime") + def test_renew_before_expiry(self, mock_datetime, mock_set_by_user): + """When neither OCSP nor the ACME client indicate it's time to renew, + obey the renew_before_expiry config. + """ + from certbot._internal import renewal + + # This certificate has a lifetime of 7 days, and the tests below + # that use a "None" interval (i.e. choose a default) rely on that fact. + # + # Not Before: Dec 11 22:34:45 2014 GMT + # Not After : Dec 18 22:34:45 2014 GMT + not_before = datetime.datetime(2014, 12, 11, 22, 34, 45) + short_cert = make_cert_with_lifetime(not_before, 7) + + mock_acme = mock.MagicMock() + future = datetime.datetime.now(pytz.UTC) + datetime.timedelta(days=100000) + mock_acme.renewal_time.return_value = (future, future) + + mock_renewable_cert = mock.MagicMock() + mock_renewable_cert.autorenewal_is_enabled.return_value = True + mock_renewable_cert.version.return_value = "/tmp/abc" + mock_renewable_cert.ocsp_revoked.return_value = False + + mock_datetime.timedelta = datetime.timedelta + mock_set_by_user.return_value = False + + with tempfile.NamedTemporaryFile() as tmp_cert: + tmp_cert.close() # close now because of compatibility issues on Windows + with open(tmp_cert.name, 'wb') as c: + c.write(short_cert) + + mock_renewable_cert.version.return_value = tmp_cert.name + + for (current_time, interval, result) in [ + # 2014-12-13 12:00 (about 5 days prior to expiry) + # Times that should result in autorenewal/autodeployment + (1418472000, "2 months", True), (1418472000, "1 week", True), + # With the "default" logic, this 7-day certificate should autorenew + # at 3.5 days prior to expiry. We haven't reached that yet, + # so don't renew. + (1418472000, None, False), + # Times that should not renew + (1418472000, "4 days", False), (1418472000, "2 days", False), + # 2009-05-01 12:00:00+00:00 (about 5 years prior to expiry) + # Times that should result in autorenewal/autodeployment + (1241179200, "7 years", True), + (1241179200, "11 years 2 months", True), + # Times that should not renew + (1241179200, "8 hours", False), (1241179200, "2 days", False), + (1241179200, "40 days", False), (1241179200, "9 months", False), + # 2015-01-01 (after expiry has already happened, so all + # intervals should cause autorenewal/autodeployment) + (1420070400, "0 seconds", True), + (1420070400, "10 seconds", True), + (1420070400, "10 minutes", True), + (1420070400, "10 weeks", True), (1420070400, "10 months", True), + (1420070400, "10 years", True), (1420070400, "99 months", True), + ]: + sometime = datetime.datetime.fromtimestamp(current_time, pytz.UTC) + mock_datetime.datetime.now.return_value = sometime + mock_renewable_cert.configuration = {"renew_before_expiry": interval} + assert renewal.should_autorenew(mock_renewable_cert, mock_acme) == result, f"at {current_time}, with config '{interval}', expected {result}" + + @mock.patch.object(configuration.NamespaceConfig, 'set_by_user') + @mock.patch("certbot._internal.storage.RenewableCert.ocsp_revoked") + def test_should_autorenew(self, mock_ocsp, mock_set_by_user): + from certbot._internal import renewal + + mock_set_by_user.return_value = False + mock_acme = mock.MagicMock() + future = datetime.datetime.now(pytz.UTC) + datetime.timedelta(seconds=1000) + mock_acme.renewal_time.return_value = (future, future) + + # Autorenewal turned off + mock_rc = mock.MagicMock() + mock_rc.autorenewal_is_enabled.return_value = False + assert not renewal.should_autorenew(mock_rc, mock_acme) + + mock_rc.autorenewal_is_enabled.return_value = True + # Mandatory renewal on the basis of OCSP revocation + mock_ocsp.return_value = True + assert renewal.should_autorenew(mock_rc, mock_acme) + mock_ocsp.return_value = False class RestoreRequiredConfigElementsTest(test_util.ConfigTestCase): """Tests for certbot._internal.renewal.restore_required_config_elements.""" diff --git a/certbot/src/certbot/_internal/tests/storage_test.py b/certbot/src/certbot/_internal/tests/storage_test.py index 61edd4b6c..e9359cb8e 100644 --- a/certbot/src/certbot/_internal/tests/storage_test.py +++ b/certbot/src/certbot/_internal/tests/storage_test.py @@ -19,34 +19,9 @@ from certbot.compat import filesystem from certbot.compat import os import certbot.tests.util as test_util -from cryptography.hazmat.primitives.asymmetric import ec -from cryptography.hazmat.primitives import serialization, hashes -from cryptography import x509 -from cryptography.x509 import Certificate - import datetime from typing import Optional, Any -def make_cert_with_lifetime(not_before: datetime.datetime, lifetime_days: int) -> bytes: - """Return PEM of a self-signed certificate with the given notBefore and lifetime.""" - key = ec.generate_private_key(ec.SECP256R1()) - not_after=not_before + datetime.timedelta(days=lifetime_days) - cert = x509.CertificateBuilder( - issuer_name=x509.Name([]), - subject_name=x509.Name([]), - public_key=key.public_key(), - serial_number=x509.random_serial_number(), - not_valid_before=not_before, - not_valid_after=not_after, - ).add_extension( - x509.SubjectAlternativeName([x509.DNSName("example.com")]), - critical=False, - ).sign( - private_key=key, - algorithm=hashes.SHA256(), - ) - return cert.public_bytes(serialization.Encoding.PEM) - def unlink_all(rc_object): """Unlink all four items associated with this RenewableCert.""" for kind in ALL_FOUR: @@ -472,86 +447,6 @@ class RenewableCertTests(BaseRenewableCertTest): with pytest.raises(errors.CertStorageError): self.test_rc.names() - @mock.patch.object(configuration.NamespaceConfig, 'set_by_user') - @mock.patch("certbot._internal.storage.datetime") - def test_time_interval_judgments(self, mock_datetime, mock_set_by_user): - """Test should_autorenew() on the basis of expiry time windows.""" - # Note: this certificate happens to have a lifetime of 7 days, - # and the tests below that use a "None" interval (i.e. choose a - # default) rely on that fact. - # - # Not Before: Dec 11 22:34:45 2014 GMT - # Not After : Dec 18 22:34:45 2014 GMT - not_before = datetime.datetime(2014, 12, 11, 22, 34, 45) - short_cert = make_cert_with_lifetime(not_before, 7) - - self._write_out_ex_kinds() - - self.test_rc.update_all_links_to(12) - with open(self.test_rc.cert, "wb") as f: - f.write(short_cert) - self.test_rc.update_all_links_to(11) - with open(self.test_rc.cert, "wb") as f: - f.write(short_cert) - - mock_datetime.timedelta = datetime.timedelta - mock_set_by_user.return_value = False - self.test_rc.configuration["renewalparams"] = {} - - for (current_time, interval, result) in [ - # 2014-12-13 12:00 (about 5 days prior to expiry) - # Times that should result in autorenewal/autodeployment - (1418472000, "2 months", True), (1418472000, "1 week", True), - # With the "default" logic, this 7-day certificate should autorenew - # at 3.5 days prior to expiry. We haven't reached that yet, - # so don't renew. - (1418472000, None, False), - # 2014-12-16 03:20, a little less than 3.5 days to expiry. - (1418700000, None, True), - # Times that should not renew - (1418472000, "4 days", False), (1418472000, "2 days", False), - # 2009-05-01 12:00:00+00:00 (about 5 years prior to expiry) - # Times that should result in autorenewal/autodeployment - (1241179200, "7 years", True), - (1241179200, "11 years 2 months", True), - # Times that should not renew - (1241179200, "8 hours", False), (1241179200, "2 days", False), - (1241179200, "40 days", False), (1241179200, "9 months", False), - # 2015-01-01 (after expiry has already happened, so all - # intervals should cause autorenewal/autodeployment) - (1420070400, "0 seconds", True), - (1420070400, "10 seconds", True), - (1420070400, "10 minutes", True), - (1420070400, "10 weeks", True), (1420070400, "10 months", True), - (1420070400, "10 years", True), (1420070400, "99 months", True), - (1420070400, None, True) - ]: - sometime = datetime.datetime.fromtimestamp(current_time, pytz.UTC) - mock_datetime.datetime.now.return_value = sometime - self.test_rc.configuration["renew_before_expiry"] = interval - assert self.test_rc.should_autorenew() == result - - # Lifetime: 31 years - # Default renewal: about 10 years from expiry - # Not Before: May 29 07:42:01 2017 GMT - # Not After : Mar 30 07:42:01 2048 GMT - not_before=datetime.datetime(2017, 5, 29, 7, 42, 1) - long_cert = make_cert_with_lifetime(not_before, 31 * 365) - self.test_rc.update_all_links_to(12) - with open(self.test_rc.cert, "wb") as f: - f.write(long_cert) - self.test_rc.update_all_links_to(11) - with open(self.test_rc.cert, "wb") as f: - f.write(long_cert) - for (current_time, result) in [ - (2114380800, False), # 2037-01-01 - (2148000000, True), # 2038-01-25 - ]: - sometime = datetime.datetime.fromtimestamp(current_time, pytz.UTC) - mock_datetime.datetime.now.return_value = sometime - self.test_rc.configuration["renew_before_expiry"] = interval - assert self.test_rc.should_autorenew() == result - def test_autorenewal_is_enabled(self): self.test_rc.configuration["renewalparams"] = {} assert self.test_rc.autorenewal_is_enabled() @@ -561,23 +456,6 @@ class RenewableCertTests(BaseRenewableCertTest): self.test_rc.configuration["renewalparams"]["autorenew"] = "False" assert not self.test_rc.autorenewal_is_enabled() - @mock.patch.object(configuration.NamespaceConfig, 'set_by_user') - @mock.patch("certbot._internal.storage.RenewableCert.ocsp_revoked") - def test_should_autorenew(self, mock_ocsp, mock_set_by_user): - """Test should_autorenew on the basis of reasons other than - expiry time window.""" - mock_set_by_user.return_value = False - # Autorenewal turned off - self.test_rc.configuration["renewalparams"] = {"autorenew": "False"} - assert not self.test_rc.should_autorenew() - self.test_rc.configuration["renewalparams"]["autorenew"] = "True" - for kind in ALL_FOUR: - self._write_out_kind(kind, 12) - # Mandatory renewal on the basis of OCSP revocation - mock_ocsp.return_value = True - assert self.test_rc.should_autorenew() - mock_ocsp.return_value = False - @mock.patch("certbot._internal.storage.relevant_values") def test_save_successor(self, mock_rv): # Mock relevant_values() to claim that all values are relevant here