acme.crypto_util: deprecate gen_ss_cert, add make_self_signed_cert

gen_ss_cert includes deprecated pyOpenSSL API in its function signature,
so we can't update it without breaking backwards compatibility. A new
function, make_self_signed_cert, with nearly identical signature (except
for its use of cryptography's extension types) is added for users to
migrate to.
This commit is contained in:
Will Greenberg 2024-03-01 13:09:27 -08:00
parent 7abf143394
commit 5656a80f0e
4 changed files with 219 additions and 96 deletions

View file

@ -8,9 +8,11 @@ import threading
import time
from typing import List
import unittest
import warnings
import josepy as jose
import OpenSSL
from cryptography import x509
import pytest
from acme import errors
@ -219,10 +221,59 @@ class PyOpenSSLCertOrReqSANIPTest(unittest.TestCase):
['0:0:0:0:0:0:0:1', 'A3BE:32F3:206E:C75D:956:CEE:9858:5EC5']
class GenMakeSelfSignedCertTest(unittest.TestCase):
"""Test for make_self_signed_cert."""
def setUp(self):
self.cert_count = 5
self.serial_num: List[int] = []
self.key = OpenSSL.crypto.PKey()
self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
def test_sn_collisions(self):
from acme.crypto_util import make_self_signed_cert
for _ in range(self.cert_count):
cert = make_self_signed_cert(self.key, ['dummy'], force_san=True,
ips=[ipaddress.ip_address("10.10.10.10")])
self.serial_num.append(cert.get_serial_number())
assert len(set(self.serial_num)) >= self.cert_count
def test_no_name(self):
from acme.crypto_util import make_self_signed_cert
with pytest.raises(AssertionError):
make_self_signed_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")])
make_self_signed_cert(self.key)
def test_fail_with_public_key(self):
from acme.crypto_util import make_self_signed_cert
from acme.errors import Error
pubkey_bytes = OpenSSL.crypto.dump_publickey(OpenSSL.crypto.FILETYPE_PEM, self.key)
pubkey = OpenSSL.crypto.load_publickey(OpenSSL.crypto.FILETYPE_PEM, pubkey_bytes)
with pytest.raises(Error):
make_self_signed_cert(pubkey, ips=[ipaddress.ip_address("1.1.1.1")])
def test_extensions(self):
from acme.crypto_util import make_self_signed_cert
extension_type = x509.TLSFeature([x509.TLSFeatureType.status_request])
extension = x509.Extension(
x509.TLSFeature.oid,
False,
extension_type
)
cert = make_self_signed_cert(
self.key,
ips=[ipaddress.ip_address("1.1.1.1")],
extensions=[extension]
)
# Since extensions in pyOpenSSL are deprecated, convert back to a
# cryptography type to check our extensions
cryptography_cert = cert.to_cryptography()
self.assertIn(extension, cryptography_cert.extensions)
class GenSsCertTest(unittest.TestCase):
"""Test for gen_ss_cert (generation of self-signed cert)."""
def setUp(self):
self.cert_count = 5
self.serial_num: List[int] = []
@ -231,18 +282,27 @@ class GenSsCertTest(unittest.TestCase):
def test_sn_collisions(self):
from acme.crypto_util import gen_ss_cert
for _ in range(self.cert_count):
cert = gen_ss_cert(self.key, ['dummy'], force_san=True,
ips=[ipaddress.ip_address("10.10.10.10")])
self.serial_num.append(cert.get_serial_number())
assert len(set(self.serial_num)) >= self.cert_count
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
for _ in range(self.cert_count):
cert = gen_ss_cert(self.key, ['dummy'], force_san=True,
ips=[ipaddress.ip_address("10.10.10.10")])
self.serial_num.append(cert.get_serial_number())
assert len(set(self.serial_num)) >= self.cert_count
def test_no_ips(self):
from acme.crypto_util import gen_ss_cert
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
gen_ss_cert(self.key, domains=['a.example'])
def test_no_name(self):
from acme.crypto_util import gen_ss_cert
with pytest.raises(AssertionError):
gen_ss_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")])
gen_ss_cert(self.key)
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
with pytest.raises(AssertionError):
gen_ss_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")])
gen_ss_cert(self.key)
class MakeCSRTest(unittest.TestCase):
@ -260,69 +320,54 @@ class MakeCSRTest(unittest.TestCase):
csr_pem = self._call_with_key(["a.example", "b.example"])
assert b'--BEGIN CERTIFICATE REQUEST--' in csr_pem
assert b'--END CERTIFICATE REQUEST--' in csr_pem
csr = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr_pem)
# In pyopenssl 0.13 (used with TOXENV=py27-oldest), csr objects don't
# have a get_extensions() method, so we skip this test if the method
# isn't available.
if hasattr(csr, 'get_extensions'):
assert len(csr.get_extensions()) == 1
assert csr.get_extensions()[0].get_data() == \
OpenSSL.crypto.X509Extension(
b'subjectAltName',
critical=False,
value=b'DNS:a.example, DNS:b.example',
).get_data()
csr = x509.load_pem_x509_csr(csr_pem)
assert len(csr.extensions) == 1
self.assertEqual(
csr.extensions[0].value,
x509.SubjectAlternativeName([
x509.DNSName('a.example'),
x509.DNSName('b.example'),
])
)
def test_make_csr_ip(self):
csr_pem = self._call_with_key(["a.example"], False, [ipaddress.ip_address('127.0.0.1'), ipaddress.ip_address('::1')])
assert b'--BEGIN CERTIFICATE REQUEST--' in csr_pem
assert b'--END CERTIFICATE REQUEST--' in csr_pem
csr = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr_pem)
# In pyopenssl 0.13 (used with TOXENV=py27-oldest), csr objects don't
# have a get_extensions() method, so we skip this test if the method
# isn't available.
if hasattr(csr, 'get_extensions'):
assert len(csr.get_extensions()) == 1
assert csr.get_extensions()[0].get_data() == \
OpenSSL.crypto.X509Extension(
b'subjectAltName',
critical=False,
value=b'DNS:a.example, IP:127.0.0.1, IP:::1',
).get_data()
# for IP san it's actually need to be octet-string,
# but somewhere downstream thankfully handle it for us
csr = x509.load_pem_x509_csr(csr_pem)
assert len(csr.extensions) == 1
self.assertEqual(
csr.extensions[0].value,
x509.SubjectAlternativeName([
x509.DNSName('a.example'),
x509.IPAddress(ipaddress.ip_address('127.0.0.1')),
x509.IPAddress(ipaddress.ip_address('::1')),
])
)
def test_make_csr_must_staple(self):
csr_pem = self._call_with_key(["a.example"], must_staple=True)
csr = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr_pem)
# In pyopenssl 0.13 (used with TOXENV=py27-oldest), csr objects don't
# have a get_extensions() method, so we skip this test if the method
# isn't available.
if hasattr(csr, 'get_extensions'):
assert len(csr.get_extensions()) == 2
# NOTE: Ideally we would filter by the TLS Feature OID, but
# OpenSSL.crypto.X509Extension doesn't give us the extension's raw OID,
# and the shortname field is just "UNDEF"
must_staple_exts = [e for e in csr.get_extensions()
if e.get_data() == b"0\x03\x02\x01\x05"]
assert len(must_staple_exts) == 1, \
"Expected exactly one Must Staple extension"
csr = x509.load_pem_x509_csr(csr_pem)
assert len(csr.extensions) == 2
ext = csr.extensions.get_extension_for_class(x509.TLSFeature)
self.assertEqual(ext.value, x509.TLSFeature([x509.TLSFeatureType.status_request]))
def test_make_csr_without_hostname(self):
with pytest.raises(ValueError):
self._call_with_key()
def test_make_csr_correct_version(self):
csr_pem = self._call_with_key(["a.example"])
csr = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr_pem)
assert csr.get_version() == 0, \
"Expected CSR version to be v1 (encoded as 0), per RFC 2986, section 4"
def test_make_csr_with_invalid_private_key(self):
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.primitives import serialization
dh_params = dh.generate_parameters(2, 1024)
priv_key = dh_params.generate_private_key()
priv_key_pem = priv_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption()
)
from acme.crypto_util import make_csr
return make_csr(priv_key_pem, ['a.example'])
class DumpPyopensslChainTest(unittest.TestCase):

View file

@ -479,13 +479,15 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse):
if len(names) != 1 or names[0].lower() != domain.lower():
return False
for i in range(cert.get_extension_count()):
ext = cert.get_extension(i)
# FIXME: assume this is the ACME extension. Currently there is no
# way to get full OID of an unknown extension from pyopenssl.
if ext.get_short_name() == b'UNDEF':
data = ext.get_data()
return data == self.h
crypto_cert = cert.to_cryptography()
oid = x509.ObjectIdentifier(self.ID_PE_ACME_IDENTIFIER_V1)
der_value = b"DER:" + codecs.encode(self.h, 'hex')
try:
ext = crypto_cert.extensions.get_extension_for_oid(oid)
if ext is not None:
return ext.value.value == der_value
except x509.ExtensionNotFound:
pass
return False

View file

@ -1,6 +1,7 @@
"""Crypto utilities."""
import binascii
import contextlib
from datetime import datetime
import ipaddress
import logging
import os
@ -15,7 +16,14 @@ from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Union
import warnings
from cryptography import x509
from cryptography.hazmat.primitives.hashes import SHA256
from cryptography.hazmat.primitives.asymmetric.dsa import DSAPrivateKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.serialization import Encoding
from cryptography.hazmat.primitives.serialization import load_pem_private_key
import josepy as jose
from OpenSSL import crypto
from OpenSSL import SSL
@ -237,10 +245,11 @@ def make_csr(private_key_pem: bytes, domains: Optional[Union[Set[str], List[str]
params ordered this way for backward competablity when called by positional argument.
:returns: buffer PEM-encoded Certificate Signing Request.
"""
private_key = crypto.load_privatekey(
crypto.FILETYPE_PEM, private_key_pem)
csr = crypto.X509Req()
sanlist = []
private_key = load_pem_private_key(private_key_pem, None)
builder = x509.CertificateSigningRequestBuilder()
# set an empty subject name
builder = builder.subject_name(x509.Name([]))
sanlist: List[x509.GeneralName] = []
# if domain or ip list not supplied make it empty list so it's easier to iterate
if domains is None:
domains = []
@ -249,32 +258,18 @@ def make_csr(private_key_pem: bytes, domains: Optional[Union[Set[str], List[str]
if len(domains)+len(ipaddrs) == 0:
raise ValueError("At least one of domains or ipaddrs parameter need to be not empty")
for address in domains:
sanlist.append('DNS:' + address)
sanlist.append(x509.DNSName(address))
for ips in ipaddrs:
sanlist.append('IP:' + ips.exploded)
# make sure its ascii encoded
san_string = ', '.join(sanlist).encode('ascii')
# for IP san it's actually need to be octet-string,
# but somewhere downsteam thankfully handle it for us
extensions = [
crypto.X509Extension(
b'subjectAltName',
critical=False,
value=san_string
),
]
sanlist.append(x509.IPAddress(ips))
builder = builder.add_extension(x509.SubjectAlternativeName(sanlist), critical=False)
if must_staple:
extensions.append(crypto.X509Extension(
b"1.3.6.1.5.5.7.1.24",
critical=False,
value=b"DER:30:03:02:01:05"))
csr.add_extensions(extensions)
csr.set_pubkey(private_key)
# RFC 2986 Section 4.1 only defines version 0
csr.set_version(0)
csr.sign(private_key, 'sha256')
return crypto.dump_certificate_request(
crypto.FILETYPE_PEM, csr)
builder = builder.add_extension(
x509.TLSFeature([x509.TLSFeatureType.status_request]),
critical=False
)
csr = builder.sign(private_key, SHA256())
return csr.public_bytes(Encoding.PEM)
def _pyopenssl_cert_or_req_all_names(loaded_cert_or_req: Union[crypto.X509, crypto.X509Req]
@ -366,6 +361,78 @@ def _pyopenssl_extract_san_list_raw(cert_or_req: Union[crypto.X509, crypto.X509R
return sans_parts
def make_self_signed_cert(key: crypto.PKey, domains: Optional[List[str]] = None,
not_before: Optional[int] = None,
validity: int = (7 * 24 * 60 * 60), force_san: bool = True,
extensions: Optional[List[x509.Extension[x509.ExtensionType]]] = None,
ips: Optional[List[Union[ipaddress.IPv4Address,
ipaddress.IPv6Address]]] = None
) -> crypto.X509:
"""Generate new self-signed certificate.
:type domains: `list` of `str`
:param OpenSSL.crypto.PKey key:
:param bool force_san:
:param extensions: List of additional extensions to include in the cert.
:type extensions: `list` of `x509.Extension[x509.ExtensionType]`
:type ips: `list` of (`ipaddress.IPv4Address` or `ipaddress.IPv6Address`)
If more than one domain is provided, all of the domains are put into
``subjectAltName`` X.509 extension and first domain is set as the
subject CN. If only one domain is provided no ``subjectAltName``
extension is used, unless `force_san` is ``True``.
"""
assert domains or ips, "Must provide one or more hostnames or IPs for the cert."
builder = x509.CertificateBuilder()
builder = builder.serial_number(int(binascii.hexlify(os.urandom(16)), 16))
if extensions is not None:
for ext in extensions:
builder = builder.add_extension(ext.value, ext.critical)
if domains is None:
domains = []
if ips is None:
ips = []
builder = builder.add_extension(x509.BasicConstraints(ca=True, path_length=0), critical=True)
name_attrs = []
if len(domains) > 0:
name_attrs.append(x509.NameAttribute(
x509.OID_COMMON_NAME,
domains[0]
))
builder = builder.subject_name(x509.Name(name_attrs))
builder = builder.issuer_name(x509.Name(name_attrs))
sanlist: List[x509.GeneralName] = []
for address in domains:
sanlist.append(x509.DNSName(address))
for ip in ips:
sanlist.append(x509.IPAddress(ip))
if force_san or len(domains) > 1 or len(ips) > 0:
builder = builder.add_extension(
x509.SubjectAlternativeName(sanlist),
critical=False
)
builder = builder.not_valid_before(datetime.fromtimestamp(
0 if not_before is None else not_before
))
builder = builder.not_valid_after(datetime.fromtimestamp(validity))
cryptography_priv = key.to_cryptography_key()
if not isinstance(cryptography_priv, (DSAPrivateKey, RSAPrivateKey)):
raise errors.Error("key must be a private key")
cryptography_pub = cryptography_priv.public_key()
builder = builder.public_key(cryptography_pub)
cryptography_cert = builder.sign(cryptography_priv, SHA256())
return crypto.X509.from_cryptography(cryptography_cert)
def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None,
not_before: Optional[int] = None,
validity: int = (7 * 24 * 60 * 60), force_san: bool = True,
@ -386,7 +453,14 @@ def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None,
subject CN. If only one domain is provided no ``subjectAltName``
extension is used, unless `force_san` is ``True``.
.. deprecated: 2.10.0
"""
warnings.warn(
"acme.crypto_util.gen_ss_cert is deprecated and will be removed in the "
"next major release of Certbot. Please use "
"acme.crypto_util.make_self_signed_cert instead.", DeprecationWarning,
stacklevel=2
)
assert domains or ips, "Must provide one or more hostnames or IPs for the cert."
cert = crypto.X509()

View file

@ -6,11 +6,13 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
### Added
*
* Added `acme.crypto_util.make_self_signed_cert` to replace `acme.crypto_util.gen_ss_cert`, which
has the same behavior but accepts `cryptography`'s `x509.Extension` types.
### Changed
*
* Deprecated `acme.crypto_util.gen_ss_cert` due to its use of deprecated types from `pyOpenSSL` and
will be removed in the next major release.
### Fixed