diff --git a/acme/acme/_internal/tests/crypto_util_test.py b/acme/acme/_internal/tests/crypto_util_test.py index 84142a012..b71b986ad 100644 --- a/acme/acme/_internal/tests/crypto_util_test.py +++ b/acme/acme/_internal/tests/crypto_util_test.py @@ -8,9 +8,11 @@ import threading import time from typing import List import unittest +import warnings import josepy as jose import OpenSSL +from cryptography import x509 import pytest from acme import errors @@ -219,10 +221,59 @@ class PyOpenSSLCertOrReqSANIPTest(unittest.TestCase): ['0:0:0:0:0:0:0:1', 'A3BE:32F3:206E:C75D:956:CEE:9858:5EC5'] +class GenMakeSelfSignedCertTest(unittest.TestCase): + """Test for make_self_signed_cert.""" + + def setUp(self): + self.cert_count = 5 + self.serial_num: List[int] = [] + self.key = OpenSSL.crypto.PKey() + self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) + + def test_sn_collisions(self): + from acme.crypto_util import make_self_signed_cert + for _ in range(self.cert_count): + cert = make_self_signed_cert(self.key, ['dummy'], force_san=True, + ips=[ipaddress.ip_address("10.10.10.10")]) + self.serial_num.append(cert.get_serial_number()) + assert len(set(self.serial_num)) >= self.cert_count + + def test_no_name(self): + from acme.crypto_util import make_self_signed_cert + with pytest.raises(AssertionError): + make_self_signed_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")]) + make_self_signed_cert(self.key) + + def test_fail_with_public_key(self): + from acme.crypto_util import make_self_signed_cert + from acme.errors import Error + pubkey_bytes = OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_PEM, self.key) + pubkey = OpenSSL.crypto.load_publickey(OpenSSL.crypto.FILETYPE_PEM, pubkey_bytes) + with pytest.raises(Error): + make_self_signed_cert(pubkey, ips=[ipaddress.ip_address("1.1.1.1")]) + + def test_extensions(self): + from acme.crypto_util import make_self_signed_cert + extension_type = x509.TLSFeature([x509.TLSFeatureType.status_request]) + extension = x509.Extension( + x509.TLSFeature.oid, + False, + extension_type + ) + cert = make_self_signed_cert( + self.key, + ips=[ipaddress.ip_address("1.1.1.1")], + extensions=[extension] + ) + # Since extensions in pyOpenSSL are deprecated, convert back to a + # cryptography type to check our extensions + cryptography_cert = cert.to_cryptography() + self.assertIn(extension, cryptography_cert.extensions) + + class GenSsCertTest(unittest.TestCase): """Test for gen_ss_cert (generation of self-signed cert).""" - def setUp(self): self.cert_count = 5 self.serial_num: List[int] = [] @@ -231,18 +282,27 @@ class GenSsCertTest(unittest.TestCase): def test_sn_collisions(self): from acme.crypto_util import gen_ss_cert - for _ in range(self.cert_count): - cert = gen_ss_cert(self.key, ['dummy'], force_san=True, - ips=[ipaddress.ip_address("10.10.10.10")]) - self.serial_num.append(cert.get_serial_number()) - assert len(set(self.serial_num)) >= self.cert_count + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + for _ in range(self.cert_count): + cert = gen_ss_cert(self.key, ['dummy'], force_san=True, + ips=[ipaddress.ip_address("10.10.10.10")]) + self.serial_num.append(cert.get_serial_number()) + assert len(set(self.serial_num)) >= self.cert_count + def test_no_ips(self): + from acme.crypto_util import gen_ss_cert + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + gen_ss_cert(self.key, domains=['a.example']) def test_no_name(self): from acme.crypto_util import gen_ss_cert - with pytest.raises(AssertionError): - gen_ss_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")]) - gen_ss_cert(self.key) + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + with pytest.raises(AssertionError): + gen_ss_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")]) + gen_ss_cert(self.key) class MakeCSRTest(unittest.TestCase): @@ -260,69 +320,54 @@ class MakeCSRTest(unittest.TestCase): csr_pem = self._call_with_key(["a.example", "b.example"]) assert b'--BEGIN CERTIFICATE REQUEST--' in csr_pem assert b'--END CERTIFICATE REQUEST--' in csr_pem - csr = OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_PEM, csr_pem) - # In pyopenssl 0.13 (used with TOXENV=py27-oldest), csr objects don't - # have a get_extensions() method, so we skip this test if the method - # isn't available. - if hasattr(csr, 'get_extensions'): - assert len(csr.get_extensions()) == 1 - assert csr.get_extensions()[0].get_data() == \ - OpenSSL.crypto.X509Extension( - b'subjectAltName', - critical=False, - value=b'DNS:a.example, DNS:b.example', - ).get_data() + csr = x509.load_pem_x509_csr(csr_pem) + assert len(csr.extensions) == 1 + self.assertEqual( + csr.extensions[0].value, + x509.SubjectAlternativeName([ + x509.DNSName('a.example'), + x509.DNSName('b.example'), + ]) + ) def test_make_csr_ip(self): csr_pem = self._call_with_key(["a.example"], False, [ipaddress.ip_address('127.0.0.1'), ipaddress.ip_address('::1')]) assert b'--BEGIN CERTIFICATE REQUEST--' in csr_pem assert b'--END CERTIFICATE REQUEST--' in csr_pem - csr = OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_PEM, csr_pem) - # In pyopenssl 0.13 (used with TOXENV=py27-oldest), csr objects don't - # have a get_extensions() method, so we skip this test if the method - # isn't available. - if hasattr(csr, 'get_extensions'): - assert len(csr.get_extensions()) == 1 - assert csr.get_extensions()[0].get_data() == \ - OpenSSL.crypto.X509Extension( - b'subjectAltName', - critical=False, - value=b'DNS:a.example, IP:127.0.0.1, IP:::1', - ).get_data() - # for IP san it's actually need to be octet-string, - # but somewhere downstream thankfully handle it for us + csr = x509.load_pem_x509_csr(csr_pem) + assert len(csr.extensions) == 1 + self.assertEqual( + csr.extensions[0].value, + x509.SubjectAlternativeName([ + x509.DNSName('a.example'), + x509.IPAddress(ipaddress.ip_address('127.0.0.1')), + x509.IPAddress(ipaddress.ip_address('::1')), + ]) + ) def test_make_csr_must_staple(self): csr_pem = self._call_with_key(["a.example"], must_staple=True) - csr = OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_PEM, csr_pem) - - # In pyopenssl 0.13 (used with TOXENV=py27-oldest), csr objects don't - # have a get_extensions() method, so we skip this test if the method - # isn't available. - if hasattr(csr, 'get_extensions'): - assert len(csr.get_extensions()) == 2 - # NOTE: Ideally we would filter by the TLS Feature OID, but - # OpenSSL.crypto.X509Extension doesn't give us the extension's raw OID, - # and the shortname field is just "UNDEF" - must_staple_exts = [e for e in csr.get_extensions() - if e.get_data() == b"0\x03\x02\x01\x05"] - assert len(must_staple_exts) == 1, \ - "Expected exactly one Must Staple extension" + csr = x509.load_pem_x509_csr(csr_pem) + assert len(csr.extensions) == 2 + ext = csr.extensions.get_extension_for_class(x509.TLSFeature) + self.assertEqual(ext.value, x509.TLSFeature([x509.TLSFeatureType.status_request])) def test_make_csr_without_hostname(self): with pytest.raises(ValueError): self._call_with_key() - def test_make_csr_correct_version(self): - csr_pem = self._call_with_key(["a.example"]) - csr = OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_PEM, csr_pem) - - assert csr.get_version() == 0, \ - "Expected CSR version to be v1 (encoded as 0), per RFC 2986, section 4" + def test_make_csr_with_invalid_private_key(self): + from cryptography.hazmat.primitives.asymmetric import dh + from cryptography.hazmat.primitives import serialization + dh_params = dh.generate_parameters(2, 1024) + priv_key = dh_params.generate_private_key() + priv_key_pem = priv_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption() + ) + from acme.crypto_util import make_csr + return make_csr(priv_key_pem, ['a.example']) class DumpPyopensslChainTest(unittest.TestCase): diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py index 818df9032..30d8bc6e5 100644 --- a/acme/acme/challenges.py +++ b/acme/acme/challenges.py @@ -479,13 +479,15 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse): if len(names) != 1 or names[0].lower() != domain.lower(): return False - for i in range(cert.get_extension_count()): - ext = cert.get_extension(i) - # FIXME: assume this is the ACME extension. Currently there is no - # way to get full OID of an unknown extension from pyopenssl. - if ext.get_short_name() == b'UNDEF': - data = ext.get_data() - return data == self.h + crypto_cert = cert.to_cryptography() + oid = x509.ObjectIdentifier(self.ID_PE_ACME_IDENTIFIER_V1) + der_value = b"DER:" + codecs.encode(self.h, 'hex') + try: + ext = crypto_cert.extensions.get_extension_for_oid(oid) + if ext is not None: + return ext.value.value == der_value + except x509.ExtensionNotFound: + pass return False diff --git a/acme/acme/crypto_util.py b/acme/acme/crypto_util.py index f35b634b5..2e4bc026d 100644 --- a/acme/acme/crypto_util.py +++ b/acme/acme/crypto_util.py @@ -1,6 +1,7 @@ """Crypto utilities.""" import binascii import contextlib +from datetime import datetime import ipaddress import logging import os @@ -15,7 +16,14 @@ from typing import Sequence from typing import Set from typing import Tuple from typing import Union +import warnings +from cryptography import x509 +from cryptography.hazmat.primitives.hashes import SHA256 +from cryptography.hazmat.primitives.asymmetric.dsa import DSAPrivateKey +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey +from cryptography.hazmat.primitives.serialization import Encoding +from cryptography.hazmat.primitives.serialization import load_pem_private_key import josepy as jose from OpenSSL import crypto from OpenSSL import SSL @@ -237,10 +245,11 @@ def make_csr(private_key_pem: bytes, domains: Optional[Union[Set[str], List[str] params ordered this way for backward competablity when called by positional argument. :returns: buffer PEM-encoded Certificate Signing Request. """ - private_key = crypto.load_privatekey( - crypto.FILETYPE_PEM, private_key_pem) - csr = crypto.X509Req() - sanlist = [] + private_key = load_pem_private_key(private_key_pem, None) + builder = x509.CertificateSigningRequestBuilder() + # set an empty subject name + builder = builder.subject_name(x509.Name([])) + sanlist: List[x509.GeneralName] = [] # if domain or ip list not supplied make it empty list so it's easier to iterate if domains is None: domains = [] @@ -249,32 +258,18 @@ def make_csr(private_key_pem: bytes, domains: Optional[Union[Set[str], List[str] if len(domains)+len(ipaddrs) == 0: raise ValueError("At least one of domains or ipaddrs parameter need to be not empty") for address in domains: - sanlist.append('DNS:' + address) + sanlist.append(x509.DNSName(address)) for ips in ipaddrs: - sanlist.append('IP:' + ips.exploded) - # make sure its ascii encoded - san_string = ', '.join(sanlist).encode('ascii') - # for IP san it's actually need to be octet-string, - # but somewhere downsteam thankfully handle it for us - extensions = [ - crypto.X509Extension( - b'subjectAltName', - critical=False, - value=san_string - ), - ] + sanlist.append(x509.IPAddress(ips)) + builder = builder.add_extension(x509.SubjectAlternativeName(sanlist), critical=False) + if must_staple: - extensions.append(crypto.X509Extension( - b"1.3.6.1.5.5.7.1.24", - critical=False, - value=b"DER:30:03:02:01:05")) - csr.add_extensions(extensions) - csr.set_pubkey(private_key) - # RFC 2986 Section 4.1 only defines version 0 - csr.set_version(0) - csr.sign(private_key, 'sha256') - return crypto.dump_certificate_request( - crypto.FILETYPE_PEM, csr) + builder = builder.add_extension( + x509.TLSFeature([x509.TLSFeatureType.status_request]), + critical=False + ) + csr = builder.sign(private_key, SHA256()) + return csr.public_bytes(Encoding.PEM) def _pyopenssl_cert_or_req_all_names(loaded_cert_or_req: Union[crypto.X509, crypto.X509Req] @@ -366,6 +361,78 @@ def _pyopenssl_extract_san_list_raw(cert_or_req: Union[crypto.X509, crypto.X509R return sans_parts +def make_self_signed_cert(key: crypto.PKey, domains: Optional[List[str]] = None, + not_before: Optional[int] = None, + validity: int = (7 * 24 * 60 * 60), force_san: bool = True, + extensions: Optional[List[x509.Extension[x509.ExtensionType]]] = None, + ips: Optional[List[Union[ipaddress.IPv4Address, + ipaddress.IPv6Address]]] = None + ) -> crypto.X509: + """Generate new self-signed certificate. + + :type domains: `list` of `str` + :param OpenSSL.crypto.PKey key: + :param bool force_san: + :param extensions: List of additional extensions to include in the cert. + :type extensions: `list` of `x509.Extension[x509.ExtensionType]` + :type ips: `list` of (`ipaddress.IPv4Address` or `ipaddress.IPv6Address`) + + If more than one domain is provided, all of the domains are put into + ``subjectAltName`` X.509 extension and first domain is set as the + subject CN. If only one domain is provided no ``subjectAltName`` + extension is used, unless `force_san` is ``True``. + + """ + assert domains or ips, "Must provide one or more hostnames or IPs for the cert." + + builder = x509.CertificateBuilder() + builder = builder.serial_number(int(binascii.hexlify(os.urandom(16)), 16)) + + if extensions is not None: + for ext in extensions: + builder = builder.add_extension(ext.value, ext.critical) + if domains is None: + domains = [] + if ips is None: + ips = [] + builder = builder.add_extension(x509.BasicConstraints(ca=True, path_length=0), critical=True) + + name_attrs = [] + if len(domains) > 0: + name_attrs.append(x509.NameAttribute( + x509.OID_COMMON_NAME, + domains[0] + )) + + builder = builder.subject_name(x509.Name(name_attrs)) + builder = builder.issuer_name(x509.Name(name_attrs)) + + sanlist: List[x509.GeneralName] = [] + for address in domains: + sanlist.append(x509.DNSName(address)) + for ip in ips: + sanlist.append(x509.IPAddress(ip)) + if force_san or len(domains) > 1 or len(ips) > 0: + builder = builder.add_extension( + x509.SubjectAlternativeName(sanlist), + critical=False + ) + + builder = builder.not_valid_before(datetime.fromtimestamp( + 0 if not_before is None else not_before + )) + builder = builder.not_valid_after(datetime.fromtimestamp(validity)) + + cryptography_priv = key.to_cryptography_key() + if not isinstance(cryptography_priv, (DSAPrivateKey, RSAPrivateKey)): + raise errors.Error("key must be a private key") + cryptography_pub = cryptography_priv.public_key() + builder = builder.public_key(cryptography_pub) + cryptography_cert = builder.sign(cryptography_priv, SHA256()) + + return crypto.X509.from_cryptography(cryptography_cert) + + def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None, not_before: Optional[int] = None, validity: int = (7 * 24 * 60 * 60), force_san: bool = True, @@ -386,7 +453,14 @@ def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None, subject CN. If only one domain is provided no ``subjectAltName`` extension is used, unless `force_san` is ``True``. + .. deprecated: 2.10.0 """ + warnings.warn( + "acme.crypto_util.gen_ss_cert is deprecated and will be removed in the " + "next major release of Certbot. Please use " + "acme.crypto_util.make_self_signed_cert instead.", DeprecationWarning, + stacklevel=2 + ) assert domains or ips, "Must provide one or more hostnames or IPs for the cert." cert = crypto.X509() diff --git a/certbot/CHANGELOG.md b/certbot/CHANGELOG.md index af026b836..82af26d5c 100644 --- a/certbot/CHANGELOG.md +++ b/certbot/CHANGELOG.md @@ -6,11 +6,13 @@ Certbot adheres to [Semantic Versioning](https://semver.org/). ### Added -* +* Added `acme.crypto_util.make_self_signed_cert` to replace `acme.crypto_util.gen_ss_cert`, which + has the same behavior but accepts `cryptography`'s `x509.Extension` types. ### Changed -* +* Deprecated `acme.crypto_util.gen_ss_cert` due to its use of deprecated types from `pyOpenSSL` and + will be removed in the next major release. ### Fixed