diff --git a/acme/acme/_internal/tests/crypto_util_test.py b/acme/acme/_internal/tests/crypto_util_test.py index 84142a012..bd3151820 100644 --- a/acme/acme/_internal/tests/crypto_util_test.py +++ b/acme/acme/_internal/tests/crypto_util_test.py @@ -12,6 +12,9 @@ import unittest import josepy as jose import OpenSSL import pytest +from cryptography import x509 +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa, x25519 from acme import errors from acme._internal.tests import test_util @@ -250,79 +253,74 @@ class MakeCSRTest(unittest.TestCase): @classmethod def _call_with_key(cls, *args, **kwargs): - privkey = OpenSSL.crypto.PKey() - privkey.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) - privkey_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey) + privkey = rsa.generate_private_key(public_exponent=65537, key_size=2048) + privkey_pem = privkey.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) from acme.crypto_util import make_csr + return make_csr(privkey_pem, *args, **kwargs) def test_make_csr(self): csr_pem = self._call_with_key(["a.example", "b.example"]) - assert b'--BEGIN CERTIFICATE REQUEST--' in csr_pem - assert b'--END CERTIFICATE REQUEST--' in csr_pem - csr = OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_PEM, csr_pem) - # In pyopenssl 0.13 (used with TOXENV=py27-oldest), csr objects don't - # have a get_extensions() method, so we skip this test if the method - # isn't available. - if hasattr(csr, 'get_extensions'): - assert len(csr.get_extensions()) == 1 - assert csr.get_extensions()[0].get_data() == \ - OpenSSL.crypto.X509Extension( - b'subjectAltName', - critical=False, - value=b'DNS:a.example, DNS:b.example', - ).get_data() + assert b"--BEGIN CERTIFICATE REQUEST--" in csr_pem + assert b"--END CERTIFICATE REQUEST--" in csr_pem + csr = x509.load_pem_x509_csr(csr_pem) + + assert len(csr.extensions) == 1 + assert list( + csr.extensions.get_extension_for_class(x509.SubjectAlternativeName).value + ) == [ + x509.DNSName("a.example"), + x509.DNSName("b.example"), + ] def test_make_csr_ip(self): - csr_pem = self._call_with_key(["a.example"], False, [ipaddress.ip_address('127.0.0.1'), ipaddress.ip_address('::1')]) - assert b'--BEGIN CERTIFICATE REQUEST--' in csr_pem - assert b'--END CERTIFICATE REQUEST--' in csr_pem - csr = OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_PEM, csr_pem) - # In pyopenssl 0.13 (used with TOXENV=py27-oldest), csr objects don't - # have a get_extensions() method, so we skip this test if the method - # isn't available. - if hasattr(csr, 'get_extensions'): - assert len(csr.get_extensions()) == 1 - assert csr.get_extensions()[0].get_data() == \ - OpenSSL.crypto.X509Extension( - b'subjectAltName', - critical=False, - value=b'DNS:a.example, IP:127.0.0.1, IP:::1', - ).get_data() - # for IP san it's actually need to be octet-string, - # but somewhere downstream thankfully handle it for us + csr_pem = self._call_with_key( + ["a.example"], + False, + [ipaddress.ip_address("127.0.0.1"), ipaddress.ip_address("::1")], + ) + assert b"--BEGIN CERTIFICATE REQUEST--" in csr_pem + assert b"--END CERTIFICATE REQUEST--" in csr_pem + + csr = x509.load_pem_x509_csr(csr_pem) + + assert len(csr.extensions) == 1 + assert list( + csr.extensions.get_extension_for_class(x509.SubjectAlternativeName).value + ) == [ + x509.DNSName("a.example"), + x509.IPAddress(ipaddress.ip_address("127.0.0.1")), + x509.IPAddress(ipaddress.ip_address("::1")), + ] def test_make_csr_must_staple(self): csr_pem = self._call_with_key(["a.example"], must_staple=True) - csr = OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_PEM, csr_pem) + csr = x509.load_pem_x509_csr(csr_pem) - # In pyopenssl 0.13 (used with TOXENV=py27-oldest), csr objects don't - # have a get_extensions() method, so we skip this test if the method - # isn't available. - if hasattr(csr, 'get_extensions'): - assert len(csr.get_extensions()) == 2 - # NOTE: Ideally we would filter by the TLS Feature OID, but - # OpenSSL.crypto.X509Extension doesn't give us the extension's raw OID, - # and the shortname field is just "UNDEF" - must_staple_exts = [e for e in csr.get_extensions() - if e.get_data() == b"0\x03\x02\x01\x05"] - assert len(must_staple_exts) == 1, \ - "Expected exactly one Must Staple extension" + assert len(csr.extensions) == 2 + assert list(csr.extensions.get_extension_for_class(x509.TLSFeature).value) == [ + x509.TLSFeatureType.status_request + ] def test_make_csr_without_hostname(self): with pytest.raises(ValueError): self._call_with_key() - def test_make_csr_correct_version(self): - csr_pem = self._call_with_key(["a.example"]) - csr = OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_PEM, csr_pem) + def test_make_csr_invalid_key_type(self): + privkey = x25519.X25519PrivateKey.generate() + privkey_pem = privkey.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + from acme.crypto_util import make_csr - assert csr.get_version() == 0, \ - "Expected CSR version to be v1 (encoded as 0), per RFC 2986, section 4" + with pytest.raises(ValueError): + make_csr(privkey_pem, ["a.example"]) class DumpPyopensslChainTest(unittest.TestCase): diff --git a/acme/acme/crypto_util.py b/acme/acme/crypto_util.py index f35b634b5..1267a6129 100644 --- a/acme/acme/crypto_util.py +++ b/acme/acme/crypto_util.py @@ -16,6 +16,9 @@ from typing import Set from typing import Tuple from typing import Union +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import dsa, rsa, ec, ed25519, ed448 import josepy as jose from OpenSSL import crypto from OpenSSL import SSL @@ -222,10 +225,12 @@ def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, # return cert -def make_csr(private_key_pem: bytes, domains: Optional[Union[Set[str], List[str]]] = None, - must_staple: bool = False, - ipaddrs: Optional[List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]] = None - ) -> bytes: +def make_csr( + private_key_pem: bytes, + domains: Optional[Union[Set[str], List[str]]] = None, + must_staple: bool = False, + ipaddrs: Optional[List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]] = None, +) -> bytes: """Generate a CSR containing domains or IPs as subjectAltNames. :param buffer private_key_pem: Private key, in PEM PKCS#8 format. @@ -237,44 +242,50 @@ def make_csr(private_key_pem: bytes, domains: Optional[Union[Set[str], List[str] params ordered this way for backward competablity when called by positional argument. :returns: buffer PEM-encoded Certificate Signing Request. """ - private_key = crypto.load_privatekey( - crypto.FILETYPE_PEM, private_key_pem) - csr = crypto.X509Req() - sanlist = [] - # if domain or ip list not supplied make it empty list so it's easier to iterate + private_key = serialization.load_pem_private_key(private_key_pem, password=None) + # There are a few things that aren't valid for x509 signing. mypy + # complains if we don't check. + if not isinstance( + private_key, + ( + dsa.DSAPrivateKey, + rsa.RSAPrivateKey, + ec.EllipticCurvePrivateKey, + ed25519.Ed25519PrivateKey, + ed448.Ed448PrivateKey, + ), + ): + raise ValueError(f"Invalid private key type: {type(private_key)}") if domains is None: domains = [] if ipaddrs is None: ipaddrs = [] - if len(domains)+len(ipaddrs) == 0: - raise ValueError("At least one of domains or ipaddrs parameter need to be not empty") - for address in domains: - sanlist.append('DNS:' + address) - for ips in ipaddrs: - sanlist.append('IP:' + ips.exploded) - # make sure its ascii encoded - san_string = ', '.join(sanlist).encode('ascii') - # for IP san it's actually need to be octet-string, - # but somewhere downsteam thankfully handle it for us - extensions = [ - crypto.X509Extension( - b'subjectAltName', + if len(domains) + len(ipaddrs) == 0: + raise ValueError( + "At least one of domains or ipaddrs parameter need to be not empty" + ) + + builder = ( + x509.CertificateSigningRequestBuilder() + .subject_name(x509.Name([])) + .add_extension( + x509.SubjectAlternativeName( + [x509.DNSName(d) for d in domains] + + [x509.IPAddress(i) for i in ipaddrs] + ), critical=False, - value=san_string - ), - ] + ) + ) if must_staple: - extensions.append(crypto.X509Extension( - b"1.3.6.1.5.5.7.1.24", + builder = builder.add_extension( + # "status_request" is the feature commonly known as OCSP + # Must-Staple + x509.TLSFeature([x509.TLSFeatureType.status_request]), critical=False, - value=b"DER:30:03:02:01:05")) - csr.add_extensions(extensions) - csr.set_pubkey(private_key) - # RFC 2986 Section 4.1 only defines version 0 - csr.set_version(0) - csr.sign(private_key, 'sha256') - return crypto.dump_certificate_request( - crypto.FILETYPE_PEM, csr) + ) + + csr = builder.sign(private_key, hashes.SHA256()) + return csr.public_bytes(serialization.Encoding.PEM) def _pyopenssl_cert_or_req_all_names(loaded_cert_or_req: Union[crypto.X509, crypto.X509Req]