From 7e87acee3c3336f62104823976d49838ff4eab95 Mon Sep 17 00:00:00 2001 From: Will Greenberg Date: Wed, 15 Jan 2025 18:38:10 -0800 Subject: [PATCH] acme: deprecate gen_ss_cert in favor of make_self_signed_cert (#10097) gen_ss_cert()'s signature contains deprecated pyOpenSSL API, so here we deprecate it in favor of a new function that does the same thing, except with only cryptography types: make_self_signed_cert --- acme/acme/_internal/tests/crypto_util_test.py | 115 ++++++++++++++-- acme/acme/challenges.py | 21 ++- acme/acme/crypto_util.py | 130 ++++++++++++++++-- .../certbot_compatibility_test/test_driver.py | 2 +- .../certbot_compatibility_test/util.py | 4 +- .../certbot_compatibility_test/validator.py | 4 +- .../certbot_nginx/_internal/configurator.py | 17 ++- certbot/certbot/tests/acme_util.py | 2 +- certbot/certbot/tests/util.py | 20 ++- 9 files changed, 264 insertions(+), 51 deletions(-) diff --git a/acme/acme/_internal/tests/crypto_util_test.py b/acme/acme/_internal/tests/crypto_util_test.py index 02aaf92bb..b272712cc 100644 --- a/acme/acme/_internal/tests/crypto_util_test.py +++ b/acme/acme/_internal/tests/crypto_util_test.py @@ -8,6 +8,8 @@ import threading import time from typing import List import unittest +from unittest import mock +import warnings import josepy as jose import OpenSSL @@ -187,6 +189,92 @@ class PyOpenSSLCertOrReqSANTest(unittest.TestCase): ['chicago-cubs.venafi.example', 'cubs.venafi.example'] +class GenMakeSelfSignedCertTest(unittest.TestCase): + """Test for make_self_signed_cert.""" + + def setUp(self): + self.cert_count = 5 + self.serial_num: List[int] = [] + self.privkey = rsa.generate_private_key(public_exponent=65537, key_size=2048) + + def test_sn_collisions(self): + from acme.crypto_util import make_self_signed_cert + for _ in range(self.cert_count): + cert = make_self_signed_cert(self.privkey, ['dummy'], force_san=True, + ips=[ipaddress.ip_address("10.10.10.10")]) + self.serial_num.append(cert.serial_number) + assert len(set(self.serial_num)) >= self.cert_count + + def test_no_ips(self): + from acme.crypto_util import make_self_signed_cert + cert = make_self_signed_cert(self.privkey, ['dummy']) + + @mock.patch("acme.crypto_util._now") + def test_expiry_times(self, mock_now): + from acme.crypto_util import make_self_signed_cert + from datetime import datetime, timedelta, timezone + not_before = 1736200830 + validity = 100 + + not_before_dt = datetime.fromtimestamp(not_before) + validity_td = timedelta(validity) + not_after_dt = not_before_dt + validity_td + cert = make_self_signed_cert( + self.privkey, + ['dummy'], + not_before=not_before_dt, + validity=validity_td, + ) + # TODO: This should be `not_valid_before_utc` once we raise the minimum + # cryptography version. + # https://github.com/certbot/certbot/issues/10105 + with warnings.catch_warnings(): + warnings.filterwarnings( + 'ignore', + message='Properties that return.*datetime object' + ) + self.assertEqual(cert.not_valid_before, not_before_dt) + self.assertEqual(cert.not_valid_after, not_after_dt) + + now = not_before + 1 + now_dt = datetime.fromtimestamp(now) + mock_now.return_value = now_dt.replace(tzinfo=timezone.utc) + valid_after_now_dt = now_dt + validity_td + cert = make_self_signed_cert( + self.privkey, + ['dummy'], + validity=validity_td, + ) + with warnings.catch_warnings(): + warnings.filterwarnings( + 'ignore', + message='Properties that return.*datetime object' + ) + self.assertEqual(cert.not_valid_before, now_dt) + self.assertEqual(cert.not_valid_after, valid_after_now_dt) + + def test_no_name(self): + from acme.crypto_util import make_self_signed_cert + with pytest.raises(AssertionError): + make_self_signed_cert(self.privkey, ips=[ipaddress.ip_address("1.1.1.1")]) + make_self_signed_cert(self.privkey) + + def test_extensions(self): + from acme.crypto_util import make_self_signed_cert + extension_type = x509.TLSFeature([x509.TLSFeatureType.status_request]) + extension = x509.Extension( + x509.TLSFeature.oid, + False, + extension_type + ) + cert = make_self_signed_cert( + self.privkey, + ips=[ipaddress.ip_address("1.1.1.1")], + extensions=[extension] + ) + self.assertIn(extension, cert.extensions) + + class GenSsCertTest(unittest.TestCase): """Test for gen_ss_cert (generation of self-signed cert).""" @@ -199,18 +287,27 @@ class GenSsCertTest(unittest.TestCase): def test_sn_collisions(self): from acme.crypto_util import gen_ss_cert - for _ in range(self.cert_count): - cert = gen_ss_cert(self.key, ['dummy'], force_san=True, - ips=[ipaddress.ip_address("10.10.10.10")]) - self.serial_num.append(cert.get_serial_number()) - assert len(set(self.serial_num)) >= self.cert_count - + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + for _ in range(self.cert_count): + cert = gen_ss_cert(self.key, ['dummy'], force_san=True, + ips=[ipaddress.ip_address("10.10.10.10")]) + self.serial_num.append(cert.get_serial_number()) + assert len(set(self.serial_num)) >= self.cert_count def test_no_name(self): from acme.crypto_util import gen_ss_cert - with pytest.raises(AssertionError): - gen_ss_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")]) - gen_ss_cert(self.key) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + with pytest.raises(AssertionError): + gen_ss_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")]) + gen_ss_cert(self.key) + + def test_no_ips(self): + from acme.crypto_util import gen_ss_cert + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + gen_ss_cert(self.key, ['dummy']) class MakeCSRTest(unittest.TestCase): diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py index 6a4d6d6dc..161637c5f 100644 --- a/acme/acme/challenges.py +++ b/acme/acme/challenges.py @@ -1,6 +1,5 @@ """ACME Identifier Validation Challenges.""" import abc -import codecs import functools import hashlib import logging @@ -436,12 +435,22 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse): key = crypto.PKey() key.generate_key(crypto.TYPE_RSA, bits) - der_value = b"DER:" + codecs.encode(self.h, 'hex') - acme_extension = crypto.X509Extension(self.ID_PE_ACME_IDENTIFIER_V1, - critical=True, value=der_value) + oid = x509.ObjectIdentifier(self.ID_PE_ACME_IDENTIFIER_V1.decode()) + acme_extension = x509.Extension( + oid, + critical=True, + value=x509.UnrecognizedExtension(oid, self.h) + ) - return crypto_util.gen_ss_cert(key, [domain], force_san=True, - extensions=[acme_extension]), key + cryptography_key = key.to_cryptography_key() + assert isinstance(cryptography_key, crypto_util.CertificateIssuerPrivateKeyTypesTpl) + cert = crypto_util.make_self_signed_cert( + cryptography_key, + [domain], + force_san=True, + extensions=[acme_extension] + ) + return crypto.X509.from_cryptography(cert), key def probe_cert(self, domain: str, host: Optional[str] = None, port: Optional[int] = None) -> crypto.X509: diff --git a/acme/acme/crypto_util.py b/acme/acme/crypto_util.py index 4fd24b33d..c410fe4ce 100644 --- a/acme/acme/crypto_util.py +++ b/acme/acme/crypto_util.py @@ -2,6 +2,7 @@ import binascii import contextlib import enum +from datetime import datetime, timedelta, timezone import ipaddress import logging import os @@ -25,6 +26,7 @@ from OpenSSL import crypto from OpenSSL import SSL from acme import errors +import warnings logger = logging.getLogger(__name__) @@ -240,6 +242,35 @@ def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, # return cert +# Annoyingly, we can't directly use cryptography's equivalent Union[] type for +# our type signatures since they're only public API in 40.0.x+, which is too new +# for some Certbot # distribution channels. Once we bump our oldest cryptography +# version past 40.0.x, usage of this type can be replaced with: +# cryptography.hazmat.primitives.asymmetric.types.CertificateIssuerPrivateKeyTypes +CertificateIssuerPrivateKeyTypes = Union[ + dsa.DSAPrivateKey, + rsa.RSAPrivateKey, + ec.EllipticCurvePrivateKey, + ed25519.Ed25519PrivateKey, + ed448.Ed448PrivateKey, +] +# Even *more* annoyingly, due to a mypy bug, we can't use Union[] types in +# isinstance expressions without causing false mypy errors. So we have to +# recreate the type collection as a tuple here. And no, typing.get_args doesn't +# work due to another mypy bug. +# +# mypy issues: +# * https://github.com/python/mypy/issues/17680 +# * https://github.com/python/mypy/issues/15106 +CertificateIssuerPrivateKeyTypesTpl = ( + dsa.DSAPrivateKey, + rsa.RSAPrivateKey, + ec.EllipticCurvePrivateKey, + ed25519.Ed25519PrivateKey, + ed448.Ed448PrivateKey, +) + + def make_csr( private_key_pem: bytes, domains: Optional[Union[Set[str], List[str]]] = None, @@ -262,18 +293,7 @@ def make_csr( """ private_key = serialization.load_pem_private_key(private_key_pem, password=None) - # There are a few things that aren't valid for x509 signing. mypy - # complains if we don't check. - if not isinstance( - private_key, - ( - dsa.DSAPrivateKey, - rsa.RSAPrivateKey, - ec.EllipticCurvePrivateKey, - ed25519.Ed25519PrivateKey, - ed448.Ed448PrivateKey, - ), - ): + if not isinstance(private_key, CertificateIssuerPrivateKeyTypesTpl): raise ValueError(f"Invalid private key type: {type(private_key)}") if domains is None: domains = [] @@ -371,6 +391,85 @@ def _pyopenssl_cert_or_req_san(cert_or_req: Union[crypto.X509, crypto.X509Req]) return san_ext.value.get_values_for_type(x509.DNSName) +# Helper function that can be mocked in unit tests +def _now() -> datetime: + return datetime.now(tz=timezone.utc) + + +def make_self_signed_cert(private_key: CertificateIssuerPrivateKeyTypes, + domains: Optional[List[str]] = None, + not_before: Optional[datetime] = None, + validity: Optional[timedelta] = None, force_san: bool = True, + extensions: Optional[List[x509.Extension]] = None, + ips: Optional[List[Union[ipaddress.IPv4Address, + ipaddress.IPv6Address]]] = None + ) -> x509.Certificate: + """Generate new self-signed certificate. + :param buffer private_key_pem: Private key, in PEM PKCS#8 format. + :type domains: `list` of `str` + :param int not_before: A datetime after which the cert is valid. If no + timezone is specified, UTC is assumed + :type not_before: `datetime.datetime` + :param validity: Duration for which the cert will be valid. Defaults to 1 + week + :type validity: `datetime.timedelta` + :param buffer private_key_pem: One of `CertificateIssuerPrivateKeyTypes` + :param bool force_san: + :param extensions: List of additional extensions to include in the cert. + :type extensions: `list` of `x509.Extension[x509.ExtensionType]` + :type ips: `list` of (`ipaddress.IPv4Address` or `ipaddress.IPv6Address`) + If more than one domain is provided, all of the domains are put into + ``subjectAltName`` X.509 extension and first domain is set as the + subject CN. If only one domain is provided no ``subjectAltName`` + extension is used, unless `force_san` is ``True``. + """ + assert domains or ips, "Must provide one or more hostnames or IPs for the cert." + + builder = x509.CertificateBuilder() + builder = builder.serial_number(x509.random_serial_number()) + + if extensions is not None: + for ext in extensions: + builder = builder.add_extension(ext.value, ext.critical) + if domains is None: + domains = [] + if ips is None: + ips = [] + builder = builder.add_extension(x509.BasicConstraints(ca=True, path_length=0), critical=True) + + name_attrs = [] + if len(domains) > 0: + name_attrs.append(x509.NameAttribute( + x509.OID_COMMON_NAME, + domains[0] + )) + + builder = builder.subject_name(x509.Name(name_attrs)) + builder = builder.issuer_name(x509.Name(name_attrs)) + + sanlist: List[x509.GeneralName] = [] + for address in domains: + sanlist.append(x509.DNSName(address)) + for ip in ips: + sanlist.append(x509.IPAddress(ip)) + if force_san or len(domains) > 1 or len(ips) > 0: + builder = builder.add_extension( + x509.SubjectAlternativeName(sanlist), + critical=False + ) + + if not_before is None: + not_before = _now() + if validity is None: + validity = timedelta(seconds=7 * 24 * 60 * 60) + builder = builder.not_valid_before(not_before) + builder = builder.not_valid_after(not_before + validity) + + public_key = private_key.public_key() + builder = builder.public_key(public_key) + return builder.sign(private_key, hashes.SHA256()) + + def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None, not_before: Optional[int] = None, validity: int = (7 * 24 * 60 * 60), force_san: bool = True, @@ -391,7 +490,14 @@ def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None, subject CN. If only one domain is provided no ``subjectAltName`` extension is used, unless `force_san` is ``True``. + .. deprecated: 2.10.0 """ + warnings.warn( + "acme.crypto_util.gen_ss_cert is deprecated and will be removed in the " + "next major release of Certbot. Please use " + "acme.crypto_util.make_self_signed_cert instead.", DeprecationWarning, + stacklevel=2 + ) assert domains or ips, "Must provide one or more hostnames or IPs for the cert." cert = crypto.X509() diff --git a/certbot-compatibility-test/certbot_compatibility_test/test_driver.py b/certbot-compatibility-test/certbot_compatibility_test/test_driver.py index f98191aa0..791b6f1f6 100644 --- a/certbot-compatibility-test/certbot_compatibility_test/test_driver.py +++ b/certbot-compatibility-test/certbot_compatibility_test/test_driver.py @@ -147,7 +147,7 @@ def test_installer(args: argparse.Namespace, plugin: common.Proxy, config: str, def test_deploy_cert(plugin: common.Proxy, temp_dir: str, domains: List[str]) -> bool: """Tests deploy_cert returning True if the tests are successful""" - cert = crypto_util.gen_ss_cert(util.KEY, domains).to_cryptography() + cert = crypto_util.make_self_signed_cert(util.KEY, domains) cert_path = os.path.join(temp_dir, "cert.pem") with open(cert_path, "wb") as f: f.write(cert.public_bytes(serialization.Encoding.PEM)) diff --git a/certbot-compatibility-test/certbot_compatibility_test/util.py b/certbot-compatibility-test/certbot_compatibility_test/util.py index 4fe2e417d..9aba0e0e3 100644 --- a/certbot-compatibility-test/certbot_compatibility_test/util.py +++ b/certbot-compatibility-test/certbot_compatibility_test/util.py @@ -14,8 +14,8 @@ from certbot_compatibility_test import errors _KEY_BASE = "rsa2048_key.pem" KEY_PATH = test_util.vector_path(_KEY_BASE) -KEY = test_util.load_pyopenssl_private_key(_KEY_BASE) -JWK = jose.JWKRSA(key=test_util.load_rsa_private_key(_KEY_BASE)) +KEY = test_util.load_rsa_private_key_pem(_KEY_BASE) +JWK = jose.JWKRSA(key=test_util.load_jose_rsa_private_key_pem(_KEY_BASE)) IP_REGEX = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$") diff --git a/certbot-compatibility-test/certbot_compatibility_test/validator.py b/certbot-compatibility-test/certbot_compatibility_test/validator.py index d8083db48..5fc226e4c 100644 --- a/certbot-compatibility-test/certbot_compatibility_test/validator.py +++ b/certbot-compatibility-test/certbot_compatibility_test/validator.py @@ -34,12 +34,12 @@ class Validator: name = name if isinstance(name, bytes) else name.encode() try: - presented_cert = crypto_util.probe_sni(name, host, port) + presented_cert = crypto_util.probe_sni(name, host, port).to_cryptography() except acme_errors.Error as error: logger.exception(str(error)) return False - return presented_cert.to_cryptography() == cert + return presented_cert == cert def redirect(self, name: str, port: int = 80, headers: Optional[Mapping[str, str]] = None) -> bool: diff --git a/certbot-nginx/certbot_nginx/_internal/configurator.py b/certbot-nginx/certbot_nginx/_internal/configurator.py index 26afc65ac..2a7f9ce52 100644 --- a/certbot-nginx/certbot_nginx/_internal/configurator.py +++ b/certbot-nginx/certbot_nginx/_internal/configurator.py @@ -22,7 +22,8 @@ from typing import Tuple from typing import Type from typing import Union -import OpenSSL +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa from acme import challenges from acme import crypto_util as acme_crypto_util @@ -696,14 +697,16 @@ class NginxConfigurator(common.Configurator): # TODO: generate only once tmp_dir = os.path.join(self.config.work_dir, "snakeoil") le_key = crypto_util.generate_key( - key_size=2048, key_dir=tmp_dir, keyname="key.pem", + key_type='rsa', key_size=2048, key_dir=tmp_dir, keyname="key.pem", strict_permissions=self.config.strict_permissions) assert le_key.file is not None - key = OpenSSL.crypto.load_privatekey( - OpenSSL.crypto.FILETYPE_PEM, le_key.pem) - cert = acme_crypto_util.gen_ss_cert(key, domains=[socket.gethostname()]) - cert_pem = OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_PEM, cert) + cryptography_key = serialization.load_pem_private_key(le_key.pem, password=None) + assert isinstance(cryptography_key, rsa.RSAPrivateKey) + cert = acme_crypto_util.make_self_signed_cert( + cryptography_key, + domains=[socket.gethostname()] + ) + cert_pem = cert.public_bytes(serialization.Encoding.PEM) cert_file, cert_path = util.unique_file( os.path.join(tmp_dir, "cert.pem"), mode="wb") with cert_file: diff --git a/certbot/certbot/tests/acme_util.py b/certbot/certbot/tests/acme_util.py index 6412f5dc7..80a20b014 100644 --- a/certbot/certbot/tests/acme_util.py +++ b/certbot/certbot/tests/acme_util.py @@ -12,7 +12,7 @@ from certbot._internal import auth_handler from certbot.tests import util JWK = jose.JWK.load(util.load_vector('rsa512_key.pem')) -KEY = util.load_rsa_private_key('rsa512_key.pem') +KEY = util.load_jose_rsa_private_key_pem('rsa512_key.pem') # Challenges HTTP01 = challenges.HTTP01( diff --git a/certbot/certbot/tests/util.py b/certbot/certbot/tests/util.py index bd21b1ac0..021581d7c 100644 --- a/certbot/certbot/tests/util.py +++ b/certbot/certbot/tests/util.py @@ -126,7 +126,12 @@ def load_comparable_csr(*names: str) -> jose.ComparableX509: return jose.ComparableX509(load_csr(*names)) -def load_rsa_private_key(*names: str) -> jose.ComparableRSAKey: +def load_jose_rsa_private_key_pem(*names: str) -> jose.ComparableRSAKey: + """Load RSA private key wrapped in jose.ComparableRSAKey""" + return jose.ComparableRSAKey(load_rsa_private_key_pem(*names)) + + +def load_rsa_private_key_pem(*names: str) -> RSAPrivateKey: """Load RSA private key.""" loader = _guess_loader(names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1) loader_fn: Callable[..., Any] @@ -134,16 +139,9 @@ def load_rsa_private_key(*names: str) -> jose.ComparableRSAKey: loader_fn = serialization.load_pem_private_key else: loader_fn = serialization.load_der_private_key - return jose.ComparableRSAKey( - cast(RSAPrivateKey, - loader_fn(load_vector(*names), password=None, backend=default_backend()))) - - -def load_pyopenssl_private_key(*names: str) -> crypto.PKey: - """Load pyOpenSSL private key.""" - loader = _guess_loader( - names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1) - return crypto.load_privatekey(loader, load_vector(*names)) + key = loader_fn(load_vector(*names), password=None, backend=default_backend()) + assert isinstance(key, RSAPrivateKey) + return key def make_lineage(config_dir: str, testfile: str, ec: bool = True) -> str: