acme: deprecate gen_ss_cert in favor of make_self_signed_cert (#10097)

gen_ss_cert()'s signature contains deprecated pyOpenSSL API, so here we
deprecate it in favor of a new function that does the same thing, except
with only cryptography types: make_self_signed_cert
This commit is contained in:
Will Greenberg 2025-01-15 18:38:10 -08:00 committed by GitHub
parent 680729655e
commit 7e87acee3c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 264 additions and 51 deletions

View file

@ -8,6 +8,8 @@ import threading
import time
from typing import List
import unittest
from unittest import mock
import warnings
import josepy as jose
import OpenSSL
@ -187,6 +189,92 @@ class PyOpenSSLCertOrReqSANTest(unittest.TestCase):
['chicago-cubs.venafi.example', 'cubs.venafi.example']
class GenMakeSelfSignedCertTest(unittest.TestCase):
"""Test for make_self_signed_cert."""
def setUp(self):
self.cert_count = 5
self.serial_num: List[int] = []
self.privkey = rsa.generate_private_key(public_exponent=65537, key_size=2048)
def test_sn_collisions(self):
from acme.crypto_util import make_self_signed_cert
for _ in range(self.cert_count):
cert = make_self_signed_cert(self.privkey, ['dummy'], force_san=True,
ips=[ipaddress.ip_address("10.10.10.10")])
self.serial_num.append(cert.serial_number)
assert len(set(self.serial_num)) >= self.cert_count
def test_no_ips(self):
from acme.crypto_util import make_self_signed_cert
cert = make_self_signed_cert(self.privkey, ['dummy'])
@mock.patch("acme.crypto_util._now")
def test_expiry_times(self, mock_now):
from acme.crypto_util import make_self_signed_cert
from datetime import datetime, timedelta, timezone
not_before = 1736200830
validity = 100
not_before_dt = datetime.fromtimestamp(not_before)
validity_td = timedelta(validity)
not_after_dt = not_before_dt + validity_td
cert = make_self_signed_cert(
self.privkey,
['dummy'],
not_before=not_before_dt,
validity=validity_td,
)
# TODO: This should be `not_valid_before_utc` once we raise the minimum
# cryptography version.
# https://github.com/certbot/certbot/issues/10105
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
message='Properties that return.*datetime object'
)
self.assertEqual(cert.not_valid_before, not_before_dt)
self.assertEqual(cert.not_valid_after, not_after_dt)
now = not_before + 1
now_dt = datetime.fromtimestamp(now)
mock_now.return_value = now_dt.replace(tzinfo=timezone.utc)
valid_after_now_dt = now_dt + validity_td
cert = make_self_signed_cert(
self.privkey,
['dummy'],
validity=validity_td,
)
with warnings.catch_warnings():
warnings.filterwarnings(
'ignore',
message='Properties that return.*datetime object'
)
self.assertEqual(cert.not_valid_before, now_dt)
self.assertEqual(cert.not_valid_after, valid_after_now_dt)
def test_no_name(self):
from acme.crypto_util import make_self_signed_cert
with pytest.raises(AssertionError):
make_self_signed_cert(self.privkey, ips=[ipaddress.ip_address("1.1.1.1")])
make_self_signed_cert(self.privkey)
def test_extensions(self):
from acme.crypto_util import make_self_signed_cert
extension_type = x509.TLSFeature([x509.TLSFeatureType.status_request])
extension = x509.Extension(
x509.TLSFeature.oid,
False,
extension_type
)
cert = make_self_signed_cert(
self.privkey,
ips=[ipaddress.ip_address("1.1.1.1")],
extensions=[extension]
)
self.assertIn(extension, cert.extensions)
class GenSsCertTest(unittest.TestCase):
"""Test for gen_ss_cert (generation of self-signed cert)."""
@ -199,18 +287,27 @@ class GenSsCertTest(unittest.TestCase):
def test_sn_collisions(self):
from acme.crypto_util import gen_ss_cert
for _ in range(self.cert_count):
cert = gen_ss_cert(self.key, ['dummy'], force_san=True,
ips=[ipaddress.ip_address("10.10.10.10")])
self.serial_num.append(cert.get_serial_number())
assert len(set(self.serial_num)) >= self.cert_count
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
for _ in range(self.cert_count):
cert = gen_ss_cert(self.key, ['dummy'], force_san=True,
ips=[ipaddress.ip_address("10.10.10.10")])
self.serial_num.append(cert.get_serial_number())
assert len(set(self.serial_num)) >= self.cert_count
def test_no_name(self):
from acme.crypto_util import gen_ss_cert
with pytest.raises(AssertionError):
gen_ss_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")])
gen_ss_cert(self.key)
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
with pytest.raises(AssertionError):
gen_ss_cert(self.key, ips=[ipaddress.ip_address("1.1.1.1")])
gen_ss_cert(self.key)
def test_no_ips(self):
from acme.crypto_util import gen_ss_cert
with warnings.catch_warnings():
warnings.simplefilter("ignore", DeprecationWarning)
gen_ss_cert(self.key, ['dummy'])
class MakeCSRTest(unittest.TestCase):

View file

@ -1,6 +1,5 @@
"""ACME Identifier Validation Challenges."""
import abc
import codecs
import functools
import hashlib
import logging
@ -436,12 +435,22 @@ class TLSALPN01Response(KeyAuthorizationChallengeResponse):
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, bits)
der_value = b"DER:" + codecs.encode(self.h, 'hex')
acme_extension = crypto.X509Extension(self.ID_PE_ACME_IDENTIFIER_V1,
critical=True, value=der_value)
oid = x509.ObjectIdentifier(self.ID_PE_ACME_IDENTIFIER_V1.decode())
acme_extension = x509.Extension(
oid,
critical=True,
value=x509.UnrecognizedExtension(oid, self.h)
)
return crypto_util.gen_ss_cert(key, [domain], force_san=True,
extensions=[acme_extension]), key
cryptography_key = key.to_cryptography_key()
assert isinstance(cryptography_key, crypto_util.CertificateIssuerPrivateKeyTypesTpl)
cert = crypto_util.make_self_signed_cert(
cryptography_key,
[domain],
force_san=True,
extensions=[acme_extension]
)
return crypto.X509.from_cryptography(cert), key
def probe_cert(self, domain: str, host: Optional[str] = None,
port: Optional[int] = None) -> crypto.X509:

View file

@ -2,6 +2,7 @@
import binascii
import contextlib
import enum
from datetime import datetime, timedelta, timezone
import ipaddress
import logging
import os
@ -25,6 +26,7 @@ from OpenSSL import crypto
from OpenSSL import SSL
from acme import errors
import warnings
logger = logging.getLogger(__name__)
@ -240,6 +242,35 @@ def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, #
return cert
# Annoyingly, we can't directly use cryptography's equivalent Union[] type for
# our type signatures since they're only public API in 40.0.x+, which is too new
# for some Certbot # distribution channels. Once we bump our oldest cryptography
# version past 40.0.x, usage of this type can be replaced with:
# cryptography.hazmat.primitives.asymmetric.types.CertificateIssuerPrivateKeyTypes
CertificateIssuerPrivateKeyTypes = Union[
dsa.DSAPrivateKey,
rsa.RSAPrivateKey,
ec.EllipticCurvePrivateKey,
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
]
# Even *more* annoyingly, due to a mypy bug, we can't use Union[] types in
# isinstance expressions without causing false mypy errors. So we have to
# recreate the type collection as a tuple here. And no, typing.get_args doesn't
# work due to another mypy bug.
#
# mypy issues:
# * https://github.com/python/mypy/issues/17680
# * https://github.com/python/mypy/issues/15106
CertificateIssuerPrivateKeyTypesTpl = (
dsa.DSAPrivateKey,
rsa.RSAPrivateKey,
ec.EllipticCurvePrivateKey,
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
)
def make_csr(
private_key_pem: bytes,
domains: Optional[Union[Set[str], List[str]]] = None,
@ -262,18 +293,7 @@ def make_csr(
"""
private_key = serialization.load_pem_private_key(private_key_pem, password=None)
# There are a few things that aren't valid for x509 signing. mypy
# complains if we don't check.
if not isinstance(
private_key,
(
dsa.DSAPrivateKey,
rsa.RSAPrivateKey,
ec.EllipticCurvePrivateKey,
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
),
):
if not isinstance(private_key, CertificateIssuerPrivateKeyTypesTpl):
raise ValueError(f"Invalid private key type: {type(private_key)}")
if domains is None:
domains = []
@ -371,6 +391,85 @@ def _pyopenssl_cert_or_req_san(cert_or_req: Union[crypto.X509, crypto.X509Req])
return san_ext.value.get_values_for_type(x509.DNSName)
# Helper function that can be mocked in unit tests
def _now() -> datetime:
return datetime.now(tz=timezone.utc)
def make_self_signed_cert(private_key: CertificateIssuerPrivateKeyTypes,
domains: Optional[List[str]] = None,
not_before: Optional[datetime] = None,
validity: Optional[timedelta] = None, force_san: bool = True,
extensions: Optional[List[x509.Extension]] = None,
ips: Optional[List[Union[ipaddress.IPv4Address,
ipaddress.IPv6Address]]] = None
) -> x509.Certificate:
"""Generate new self-signed certificate.
:param buffer private_key_pem: Private key, in PEM PKCS#8 format.
:type domains: `list` of `str`
:param int not_before: A datetime after which the cert is valid. If no
timezone is specified, UTC is assumed
:type not_before: `datetime.datetime`
:param validity: Duration for which the cert will be valid. Defaults to 1
week
:type validity: `datetime.timedelta`
:param buffer private_key_pem: One of `CertificateIssuerPrivateKeyTypes`
:param bool force_san:
:param extensions: List of additional extensions to include in the cert.
:type extensions: `list` of `x509.Extension[x509.ExtensionType]`
:type ips: `list` of (`ipaddress.IPv4Address` or `ipaddress.IPv6Address`)
If more than one domain is provided, all of the domains are put into
``subjectAltName`` X.509 extension and first domain is set as the
subject CN. If only one domain is provided no ``subjectAltName``
extension is used, unless `force_san` is ``True``.
"""
assert domains or ips, "Must provide one or more hostnames or IPs for the cert."
builder = x509.CertificateBuilder()
builder = builder.serial_number(x509.random_serial_number())
if extensions is not None:
for ext in extensions:
builder = builder.add_extension(ext.value, ext.critical)
if domains is None:
domains = []
if ips is None:
ips = []
builder = builder.add_extension(x509.BasicConstraints(ca=True, path_length=0), critical=True)
name_attrs = []
if len(domains) > 0:
name_attrs.append(x509.NameAttribute(
x509.OID_COMMON_NAME,
domains[0]
))
builder = builder.subject_name(x509.Name(name_attrs))
builder = builder.issuer_name(x509.Name(name_attrs))
sanlist: List[x509.GeneralName] = []
for address in domains:
sanlist.append(x509.DNSName(address))
for ip in ips:
sanlist.append(x509.IPAddress(ip))
if force_san or len(domains) > 1 or len(ips) > 0:
builder = builder.add_extension(
x509.SubjectAlternativeName(sanlist),
critical=False
)
if not_before is None:
not_before = _now()
if validity is None:
validity = timedelta(seconds=7 * 24 * 60 * 60)
builder = builder.not_valid_before(not_before)
builder = builder.not_valid_after(not_before + validity)
public_key = private_key.public_key()
builder = builder.public_key(public_key)
return builder.sign(private_key, hashes.SHA256())
def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None,
not_before: Optional[int] = None,
validity: int = (7 * 24 * 60 * 60), force_san: bool = True,
@ -391,7 +490,14 @@ def gen_ss_cert(key: crypto.PKey, domains: Optional[List[str]] = None,
subject CN. If only one domain is provided no ``subjectAltName``
extension is used, unless `force_san` is ``True``.
.. deprecated: 2.10.0
"""
warnings.warn(
"acme.crypto_util.gen_ss_cert is deprecated and will be removed in the "
"next major release of Certbot. Please use "
"acme.crypto_util.make_self_signed_cert instead.", DeprecationWarning,
stacklevel=2
)
assert domains or ips, "Must provide one or more hostnames or IPs for the cert."
cert = crypto.X509()

View file

@ -147,7 +147,7 @@ def test_installer(args: argparse.Namespace, plugin: common.Proxy, config: str,
def test_deploy_cert(plugin: common.Proxy, temp_dir: str, domains: List[str]) -> bool:
"""Tests deploy_cert returning True if the tests are successful"""
cert = crypto_util.gen_ss_cert(util.KEY, domains).to_cryptography()
cert = crypto_util.make_self_signed_cert(util.KEY, domains)
cert_path = os.path.join(temp_dir, "cert.pem")
with open(cert_path, "wb") as f:
f.write(cert.public_bytes(serialization.Encoding.PEM))

View file

@ -14,8 +14,8 @@ from certbot_compatibility_test import errors
_KEY_BASE = "rsa2048_key.pem"
KEY_PATH = test_util.vector_path(_KEY_BASE)
KEY = test_util.load_pyopenssl_private_key(_KEY_BASE)
JWK = jose.JWKRSA(key=test_util.load_rsa_private_key(_KEY_BASE))
KEY = test_util.load_rsa_private_key_pem(_KEY_BASE)
JWK = jose.JWKRSA(key=test_util.load_jose_rsa_private_key_pem(_KEY_BASE))
IP_REGEX = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")

View file

@ -34,12 +34,12 @@ class Validator:
name = name if isinstance(name, bytes) else name.encode()
try:
presented_cert = crypto_util.probe_sni(name, host, port)
presented_cert = crypto_util.probe_sni(name, host, port).to_cryptography()
except acme_errors.Error as error:
logger.exception(str(error))
return False
return presented_cert.to_cryptography() == cert
return presented_cert == cert
def redirect(self, name: str, port: int = 80,
headers: Optional[Mapping[str, str]] = None) -> bool:

View file

@ -22,7 +22,8 @@ from typing import Tuple
from typing import Type
from typing import Union
import OpenSSL
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from acme import challenges
from acme import crypto_util as acme_crypto_util
@ -696,14 +697,16 @@ class NginxConfigurator(common.Configurator):
# TODO: generate only once
tmp_dir = os.path.join(self.config.work_dir, "snakeoil")
le_key = crypto_util.generate_key(
key_size=2048, key_dir=tmp_dir, keyname="key.pem",
key_type='rsa', key_size=2048, key_dir=tmp_dir, keyname="key.pem",
strict_permissions=self.config.strict_permissions)
assert le_key.file is not None
key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, le_key.pem)
cert = acme_crypto_util.gen_ss_cert(key, domains=[socket.gethostname()])
cert_pem = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert)
cryptography_key = serialization.load_pem_private_key(le_key.pem, password=None)
assert isinstance(cryptography_key, rsa.RSAPrivateKey)
cert = acme_crypto_util.make_self_signed_cert(
cryptography_key,
domains=[socket.gethostname()]
)
cert_pem = cert.public_bytes(serialization.Encoding.PEM)
cert_file, cert_path = util.unique_file(
os.path.join(tmp_dir, "cert.pem"), mode="wb")
with cert_file:

View file

@ -12,7 +12,7 @@ from certbot._internal import auth_handler
from certbot.tests import util
JWK = jose.JWK.load(util.load_vector('rsa512_key.pem'))
KEY = util.load_rsa_private_key('rsa512_key.pem')
KEY = util.load_jose_rsa_private_key_pem('rsa512_key.pem')
# Challenges
HTTP01 = challenges.HTTP01(

View file

@ -126,7 +126,12 @@ def load_comparable_csr(*names: str) -> jose.ComparableX509:
return jose.ComparableX509(load_csr(*names))
def load_rsa_private_key(*names: str) -> jose.ComparableRSAKey:
def load_jose_rsa_private_key_pem(*names: str) -> jose.ComparableRSAKey:
"""Load RSA private key wrapped in jose.ComparableRSAKey"""
return jose.ComparableRSAKey(load_rsa_private_key_pem(*names))
def load_rsa_private_key_pem(*names: str) -> RSAPrivateKey:
"""Load RSA private key."""
loader = _guess_loader(names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
loader_fn: Callable[..., Any]
@ -134,16 +139,9 @@ def load_rsa_private_key(*names: str) -> jose.ComparableRSAKey:
loader_fn = serialization.load_pem_private_key
else:
loader_fn = serialization.load_der_private_key
return jose.ComparableRSAKey(
cast(RSAPrivateKey,
loader_fn(load_vector(*names), password=None, backend=default_backend())))
def load_pyopenssl_private_key(*names: str) -> crypto.PKey:
"""Load pyOpenSSL private key."""
loader = _guess_loader(
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
return crypto.load_privatekey(loader, load_vector(*names))
key = loader_fn(load_vector(*names), password=None, backend=default_backend())
assert isinstance(key, RSAPrivateKey)
return key
def make_lineage(config_dir: str, testfile: str, ec: bool = True) -> str: