Allow using cryptography certs and keys in the standalone plugin (#10133)

Co-authored-by: Brad Warren <bmw@users.noreply.github.com>
This commit is contained in:
Alex Gaynor 2025-01-16 17:16:45 -05:00 committed by GitHub
parent ed972a130f
commit e050fe91a3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 48 additions and 34 deletions

View file

@ -11,6 +11,8 @@ from unittest import mock
import josepy as jose
import pytest
import requests
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from acme import challenges
from acme import crypto_util
@ -116,13 +118,13 @@ class TLSALPN01ServerTest(unittest.TestCase):
def setUp(self):
self.certs = {b'localhost': (
test_util.load_pyopenssl_private_key('rsa2048_key.pem'),
test_util.load_cert('rsa2048_cert.pem'),
serialization.load_pem_private_key(test_util.load_vector('rsa2048_key.pem'), password=None),
x509.load_pem_x509_certificate(test_util.load_vector('rsa2048_cert.pem')),
)}
# Use different certificate for challenge.
self.challenge_certs = {b'localhost': (
test_util.load_pyopenssl_private_key('rsa4096_key.pem'),
test_util.load_cert('rsa4096_cert.pem'),
serialization.load_pem_private_key(test_util.load_vector('rsa4096_key.pem'), password=None),
x509.load_pem_x509_certificate(test_util.load_vector('rsa4096_cert.pem')),
)}
from acme.standalone import TLSALPN01Server
self.server = TLSALPN01Server(("localhost", 0), certs=self.certs,
@ -151,8 +153,7 @@ class TLSALPN01ServerTest(unittest.TestCase):
b'localhost', host=host, port=port, timeout=1,
alpn_protocols=[b"acme-tls/1"])
# Expect challenge cert when connecting with ALPN.
assert jose.ComparableX509(cert) == \
jose.ComparableX509(self.challenge_certs[b'localhost'][1])
assert cert.to_cryptography() == self.challenge_certs[b'localhost'][1]
def test_bad_alpn(self):
host, port = self.server.socket.getsockname()[:2]

View file

@ -20,7 +20,7 @@ from typing import Union
from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dsa, rsa, ec, ed25519, ed448
from cryptography.hazmat.primitives.asymmetric import dsa, rsa, ec, ed25519, ed448, types
import josepy as jose
from OpenSSL import crypto
from OpenSSL import SSL
@ -58,17 +58,22 @@ class Format(enum.IntEnum):
return serialization.Encoding.PEM
_KeyAndCert = Union[
Tuple[crypto.PKey, crypto.X509],
Tuple[types.CertificateIssuerPrivateKeyTypes, x509.Certificate],
]
class _DefaultCertSelection:
def __init__(self, certs: Mapping[bytes, Tuple[crypto.PKey, crypto.X509]]):
def __init__(self, certs: Mapping[bytes, _KeyAndCert]):
self.certs = certs
def __call__(self, connection: SSL.Connection) -> Optional[Tuple[crypto.PKey, crypto.X509]]:
def __call__(self, connection: SSL.Connection) -> Optional[_KeyAndCert]:
server_name = connection.get_servername()
if server_name:
return self.certs.get(server_name, None)
return None # pragma: no cover
class SSLSocket: # pylint: disable=too-few-public-methods
"""SSL wrapper for sockets.
@ -82,14 +87,19 @@ class SSLSocket: # pylint: disable=too-few-public-methods
`certs` parameter would be ignored, and therefore must be empty.
"""
def __init__(self, sock: socket.socket,
certs: Optional[Mapping[bytes, Tuple[crypto.PKey, crypto.X509]]] = None,
method: int = _DEFAULT_SSL_METHOD,
alpn_selection: Optional[Callable[[SSL.Connection, List[bytes]], bytes]] = None,
cert_selection: Optional[Callable[[SSL.Connection],
Optional[Tuple[crypto.PKey,
crypto.X509]]]] = None
) -> None:
def __init__(
self,
sock: socket.socket,
certs: Optional[Mapping[bytes, _KeyAndCert]] = None,
method: int = _DEFAULT_SSL_METHOD,
alpn_selection: Optional[Callable[[SSL.Connection, List[bytes]], bytes]] = None,
cert_selection: Optional[
Callable[
[SSL.Connection],
Optional[_KeyAndCert],
]
] = None,
) -> None:
self.sock = sock
self.alpn_selection = alpn_selection
self.method = method
@ -231,7 +241,7 @@ def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, #
client_ssl.set_connect_state()
client_ssl.set_tlsext_host_name(name) # pyOpenSSL>=0.13
if alpn_protocols is not None:
client_ssl.set_alpn_protos(alpn_protocols)
client_ssl.set_alpn_protos(list(alpn_protocols))
try:
client_ssl.do_handshake()
client_ssl.shutdown()

View file

@ -16,7 +16,6 @@ from typing import Set
from typing import Tuple
from typing import Type
from OpenSSL import crypto
from OpenSSL import SSL
from acme import challenges
@ -46,7 +45,7 @@ class TLSServer(socketserver.TCPServer):
method=self.method))
def _cert_selection(self, connection: SSL.Connection
) -> Optional[Tuple[crypto.PKey, crypto.X509]]: # pragma: no cover
) -> Optional[crypto_util._KeyAndCert]: # pragma: no cover
"""Callback selecting certificate for connection."""
server_name = connection.get_servername()
if server_name:
@ -152,8 +151,8 @@ class TLSALPN01Server(TLSServer, ACMEServerMixin):
ACME_TLS_1_PROTOCOL = b"acme-tls/1"
def __init__(self, server_address: Tuple[str, int],
certs: List[Tuple[crypto.PKey, crypto.X509]],
challenge_certs: Mapping[bytes, Tuple[crypto.PKey, crypto.X509]],
certs: List[crypto_util._KeyAndCert],
challenge_certs: Mapping[bytes, crypto_util._KeyAndCert],
ipv6: bool = False) -> None:
# We don't need to implement a request handler here because the work
# (including logging) is being done by wrapped socket set up in the
@ -163,8 +162,7 @@ class TLSALPN01Server(TLSServer, ACMEServerMixin):
ipv6=ipv6)
self.challenge_certs = challenge_certs
def _cert_selection(self, connection: SSL.Connection) -> Optional[Tuple[crypto.PKey,
crypto.X509]]:
def _cert_selection(self, connection: SSL.Connection) -> Optional[crypto_util._KeyAndCert]:
# TODO: We would like to serve challenge cert only if asked for it via
# ALPN. To do this, we need to retrieve the list of protos from client
# hello, but this is currently impossible with openssl [0], and ALPN

View file

@ -12,10 +12,9 @@ from typing import Mapping
from typing import Set
from typing import Tuple
from typing import Type
from typing import Union
from typing import TYPE_CHECKING
from OpenSSL import crypto
from acme import challenges
from acme import standalone as acme_standalone
from certbot import achallenges
@ -24,6 +23,10 @@ from certbot import interfaces
from certbot.display import util as display_util
from certbot.plugins import common
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import types
from OpenSSL import crypto
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
@ -32,6 +35,11 @@ if TYPE_CHECKING:
Set[achallenges.AnnotatedChallenge]
]
_KeyAndCert = Union[
Tuple[crypto.PKey, crypto.X509],
Tuple[types.CertificateIssuerPrivateKeyTypes, x509.Certificate],
]
class ServerManager:
"""Standalone servers manager.
@ -46,7 +54,7 @@ class ServerManager:
will serve the same URLs!
"""
def __init__(self, certs: Mapping[bytes, Tuple[crypto.PKey, crypto.X509]],
def __init__(self, certs: Mapping[bytes, _KeyAndCert],
http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
) -> None:
self._instances: Dict[int, acme_standalone.HTTP01DualNetworkedServers] = {}
@ -136,7 +144,7 @@ running. HTTP challenge only (wildcards not supported)."""
# values, main thread writes). Due to the nature of CPython's
# GIL, the operations are safe, c.f.
# https://docs.python.org/2/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe
self.certs: Mapping[bytes, Tuple[crypto.PKey, crypto.X509]] = {}
self.certs: Mapping[bytes, _KeyAndCert] = {}
self.http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource] = set()
self.servers = ServerManager(self.certs, self.http_01_resources)

View file

@ -9,7 +9,6 @@ import unittest
from unittest import mock
import josepy as jose
import OpenSSL.crypto
import pytest
from acme import challenges
@ -24,8 +23,8 @@ class ServerManagerTest(unittest.TestCase):
"""Tests for certbot._internal.plugins.standalone.ServerManager."""
def setUp(self):
from certbot._internal.plugins.standalone import ServerManager
self.certs: Dict[bytes, Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]] = {}
from certbot._internal.plugins.standalone import ServerManager, _KeyAndCert
self.certs: Dict[bytes, _KeyAndCert] = {}
self.http_01_resources: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource] = {}
self.mgr = ServerManager(self.certs, self.http_01_resources)

View file

@ -77,7 +77,6 @@ test_extras = [
'setuptools',
'tox',
'types-httplib2',
'types-pyOpenSSL',
'types-pyRFC3339',
'types-pytz',
'types-pywin32',

View file

@ -165,7 +165,6 @@ trove-classifiers==2025.1.10.15 ; python_version >= "3.9" and python_version < "
twine==6.0.1 ; python_version >= "3.9" and python_version < "4.0"
types-cffi==1.16.0.20241221 ; python_version >= "3.9" and python_version < "4.0"
types-httplib2==0.22.0.20241221 ; python_version >= "3.9" and python_version < "4.0"
types-pyopenssl==24.1.0.20240722 ; python_version >= "3.9" and python_version < "4.0"
types-pyrfc3339==2.0.1.20241107 ; python_version >= "3.9" and python_version < "4.0"
types-python-dateutil==2.9.0.20241206 ; python_version >= "3.9" and python_version < "4.0"
types-pytz==2024.2.0.20241221 ; python_version >= "3.9" and python_version < "4.0"