mirror of
https://github.com/certbot/certbot.git
synced 2026-06-08 00:02:14 -04:00
Remove SSLSocket and probe_sni (#10395)
Fixes https://github.com/certbot/certbot/issues/10381. Also cleans out lingering unused OpenSSL references that I happened to notice. I can move these to a different PR if requested. Fixes https://github.com/certbot/certbot/issues/10079 as well. ``` $ git grep PKey $ git grep "crypto.x509" $ ``` towncrier draft: ``` ## 99.99.0 - 2025-08-05 ### Changed - Removed final instances of pyopenssl x509 and PKey objects * Removed `acme.crypto_util.SSLSocket` * Removed `acme.crypto_util.probe_sni` ([#10079](https://github.com/certbot/certbot/issues/10079), [#10381](https://github.com/certbot/certbot/issues/10381)) - Removed a number of deprecated classes/interfaces * Removed `acme.challenges.TLSALPN01Response` * Removed `acme.challenges.TLSALPN01` * Removed `acme.standalone.TLSServer` * Removed `acme.standalone.TLSALPN01Server` ([#10274](https://github.com/certbot/certbot/issues/10274)) ```
This commit is contained in:
parent
15a145ac3f
commit
f39a584db4
7 changed files with 10 additions and 310 deletions
|
|
@ -6,7 +6,6 @@ import urllib.parse as urllib_parse
|
|||
|
||||
import josepy as jose
|
||||
from josepy.jwk import JWKEC
|
||||
import OpenSSL
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
|
|
|
|||
|
|
@ -27,73 +27,6 @@ class FormatTest(unittest.TestCase):
|
|||
assert Format.PEM.to_cryptography_encoding() == serialization.Encoding.PEM
|
||||
|
||||
|
||||
class SSLSocketAndProbeSNITest(unittest.TestCase):
|
||||
"""Tests for acme.crypto_util.SSLSocket/probe_sni."""
|
||||
|
||||
def setUp(self):
|
||||
self.cert = test_util.load_cert('rsa2048_cert.pem')
|
||||
key = test_util.load_pyopenssl_private_key('rsa2048_key.pem')
|
||||
# pylint: disable=protected-access
|
||||
certs = {b'foo': (key, self.cert)}
|
||||
|
||||
from acme.crypto_util import SSLSocket
|
||||
|
||||
class _TestServer(socketserver.TCPServer):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.socket = SSLSocket(self.socket, certs)
|
||||
|
||||
self.server = _TestServer(('', 0), socketserver.BaseRequestHandler)
|
||||
self.port = self.server.socket.getsockname()[1]
|
||||
self.server_thread = threading.Thread(
|
||||
target=self.server.handle_request)
|
||||
|
||||
def tearDown(self):
|
||||
if self.server_thread.is_alive():
|
||||
# The thread may have already terminated.
|
||||
self.server_thread.join() # pragma: no cover
|
||||
self.server.server_close()
|
||||
|
||||
def _probe(self, name):
|
||||
from acme.crypto_util import probe_sni
|
||||
return probe_sni(name, host='127.0.0.1', port=self.port)
|
||||
|
||||
def _start_server(self):
|
||||
self.server_thread.start()
|
||||
time.sleep(1) # TODO: avoid race conditions in other way
|
||||
|
||||
def test_probe_ok(self):
|
||||
self._start_server()
|
||||
assert self.cert == self._probe(b'foo')
|
||||
|
||||
def test_probe_not_recognized_name(self):
|
||||
self._start_server()
|
||||
with pytest.raises(errors.Error):
|
||||
self._probe(b'bar')
|
||||
|
||||
def test_probe_connection_error(self):
|
||||
self.server.server_close()
|
||||
original_timeout = socket.getdefaulttimeout()
|
||||
try:
|
||||
socket.setdefaulttimeout(1)
|
||||
with pytest.raises(errors.Error):
|
||||
self._probe(b'bar')
|
||||
finally:
|
||||
socket.setdefaulttimeout(original_timeout)
|
||||
|
||||
|
||||
class SSLSocketTest(unittest.TestCase):
|
||||
"""Tests for acme.crypto_util.SSLSocket."""
|
||||
|
||||
def test_ssl_socket_invalid_arguments(self):
|
||||
from acme.crypto_util import SSLSocket
|
||||
with pytest.raises(ValueError):
|
||||
_ = SSLSocket(None, {'sni': ('key', 'cert')},
|
||||
cert_selection=lambda _: None)
|
||||
with pytest.raises(ValueError):
|
||||
_ = SSLSocket(None)
|
||||
|
||||
|
||||
class MiscTests(unittest.TestCase):
|
||||
|
||||
def test_dump_cryptography_chain(self):
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ from cryptography.hazmat.backends import default_backend
|
|||
from cryptography.hazmat.primitives import serialization
|
||||
import josepy as jose
|
||||
from josepy.util import ComparableECKey
|
||||
from OpenSSL import crypto
|
||||
|
||||
|
||||
def load_vector(*names):
|
||||
|
|
@ -32,14 +31,6 @@ def _guess_loader(filename: str, loader_pem: Callable, loader_der: Callable) ->
|
|||
raise ValueError("Loader could not be recognized based on extension")
|
||||
|
||||
|
||||
def _guess_pyopenssl_loader(filename: str, loader_pem: int, loader_der: int) -> int:
|
||||
_, ext = os.path.splitext(filename)
|
||||
if ext.lower() == ".pem":
|
||||
return loader_pem
|
||||
else: # pragma: no cover
|
||||
raise ValueError("Loader could not be recognized based on extension")
|
||||
|
||||
|
||||
def load_cert(*names: str) -> x509.Certificate:
|
||||
"""Load certificate."""
|
||||
loader = _guess_loader(
|
||||
|
|
@ -68,10 +59,3 @@ def load_ecdsa_private_key(*names):
|
|||
serialization.load_der_private_key)
|
||||
return ComparableECKey(loader(
|
||||
load_vector(*names), password=None, backend=default_backend()))
|
||||
|
||||
|
||||
def load_pyopenssl_private_key(*names):
|
||||
"""Load pyOpenSSL private key."""
|
||||
loader = _guess_pyopenssl_loader(
|
||||
names[-1], crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1)
|
||||
return crypto.load_privatekey(loader, load_vector(*names))
|
||||
|
|
|
|||
|
|
@ -1,42 +1,23 @@
|
|||
"""Crypto utilities."""
|
||||
import contextlib
|
||||
import enum
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import ipaddress
|
||||
import logging
|
||||
import socket
|
||||
import typing
|
||||
from typing import Any
|
||||
from typing import Callable
|
||||
from typing import List
|
||||
from typing import Literal
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
import warnings
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import dsa, rsa, ec, ed25519, ed448, types
|
||||
from cryptography.hazmat.primitives.serialization import Encoding
|
||||
from OpenSSL import crypto
|
||||
from OpenSSL import SSL
|
||||
|
||||
from acme import errors
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Default SSL method selected here is the most compatible, while secure
|
||||
# SSL method: TLSv1_METHOD is only compatible with
|
||||
# TLSv1_METHOD, while TLS_method is compatible with all other
|
||||
# methods, including TLSv2_METHOD (read more at
|
||||
# https://docs.openssl.org/master/man3/SSL_CTX_new/#notes). _serve_sni
|
||||
# should be changed to use "set_options" to disable SSLv2 and SSLv3,
|
||||
# in case it's used for things other than probing/serving!
|
||||
_DEFAULT_SSL_METHOD = SSL.TLS_METHOD
|
||||
|
||||
|
||||
class Format(enum.IntEnum):
|
||||
"""File format to be used when parsing or serializing X.509 structures.
|
||||
|
|
@ -56,198 +37,6 @@ class Format(enum.IntEnum):
|
|||
return Encoding.PEM
|
||||
|
||||
|
||||
_KeyAndCert = Union[
|
||||
Tuple[crypto.PKey, crypto.X509],
|
||||
Tuple[types.CertificateIssuerPrivateKeyTypes, x509.Certificate],
|
||||
]
|
||||
|
||||
|
||||
class _DefaultCertSelection:
|
||||
def __init__(self, certs: Mapping[bytes, _KeyAndCert]):
|
||||
self.certs = certs
|
||||
|
||||
def __call__(self, connection: SSL.Connection) -> Optional[_KeyAndCert]:
|
||||
server_name = connection.get_servername()
|
||||
if server_name:
|
||||
return self.certs.get(server_name, None)
|
||||
return None # pragma: no cover
|
||||
|
||||
|
||||
class SSLSocket: # pylint: disable=too-few-public-methods
|
||||
"""SSL wrapper for sockets.
|
||||
|
||||
:ivar socket sock: Original wrapped socket.
|
||||
:ivar dict certs: Mapping from domain names (`bytes`) to
|
||||
`OpenSSL.crypto.X509`.
|
||||
:ivar method: See `OpenSSL.SSL.Context` for allowed values.
|
||||
:ivar cert_selection: Hook to select certificate for connection. If given,
|
||||
`certs` parameter would be ignored, and therefore must be empty.
|
||||
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
sock: socket.socket,
|
||||
certs: Optional[Mapping[bytes, _KeyAndCert]] = None,
|
||||
method: int = _DEFAULT_SSL_METHOD,
|
||||
cert_selection: Optional[
|
||||
Callable[
|
||||
[SSL.Connection],
|
||||
Optional[_KeyAndCert],
|
||||
]
|
||||
] = None,
|
||||
) -> None:
|
||||
warnings.warn("SSLSocket is deprecated and will be removed in an upcoming release",
|
||||
DeprecationWarning)
|
||||
self.sock = sock
|
||||
self.method = method
|
||||
if not cert_selection and not certs:
|
||||
raise ValueError("Neither cert_selection or certs specified.")
|
||||
if cert_selection and certs:
|
||||
raise ValueError("Both cert_selection and certs specified.")
|
||||
if cert_selection is None:
|
||||
cert_selection = _DefaultCertSelection(certs if certs else {})
|
||||
self.cert_selection = cert_selection
|
||||
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
return getattr(self.sock, name)
|
||||
|
||||
def _pick_certificate_cb(self, connection: SSL.Connection) -> None:
|
||||
"""SNI certificate callback.
|
||||
|
||||
This method will set a new OpenSSL context object for this
|
||||
connection when an incoming connection provides an SNI name
|
||||
(in order to serve the appropriate certificate, if any).
|
||||
|
||||
:param connection: The TLS connection object on which the SNI
|
||||
extension was received.
|
||||
:type connection: :class:`OpenSSL.Connection`
|
||||
|
||||
"""
|
||||
pair = self.cert_selection(connection)
|
||||
if pair is None:
|
||||
logger.debug("Certificate selection for server name %s failed, dropping SSL",
|
||||
connection.get_servername())
|
||||
return
|
||||
key, cert = pair
|
||||
new_context = SSL.Context(self.method)
|
||||
new_context.set_min_proto_version(SSL.TLS1_2_VERSION)
|
||||
new_context.use_privatekey(key)
|
||||
if isinstance(cert, x509.Certificate):
|
||||
cert = crypto.X509.from_cryptography(cert)
|
||||
new_context.use_certificate(cert)
|
||||
connection.set_context(new_context)
|
||||
|
||||
class FakeConnection:
|
||||
"""Fake OpenSSL.SSL.Connection."""
|
||||
|
||||
# pylint: disable=missing-function-docstring
|
||||
|
||||
def __init__(self, connection: SSL.Connection) -> None:
|
||||
self._wrapped = connection
|
||||
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
return getattr(self._wrapped, name)
|
||||
|
||||
def shutdown(self, *unused_args: Any) -> bool:
|
||||
# OpenSSL.SSL.Connection.shutdown doesn't accept any args
|
||||
try:
|
||||
return self._wrapped.shutdown()
|
||||
except SSL.Error as error: # pragma: no cover
|
||||
# We wrap the error so we raise the same error type as sockets
|
||||
# in the standard library. This is useful when this object is
|
||||
# used by code which expects a standard socket such as
|
||||
# socketserver in the standard library.
|
||||
#
|
||||
# We don't track code coverage in this "except" branch to avoid spurious CI failures
|
||||
# caused by missing test coverage. These aren't worth fixing because this entire
|
||||
# class has been deprecated. See https://github.com/certbot/certbot/issues/10284.
|
||||
raise OSError(error)
|
||||
|
||||
def accept(self) -> Tuple[FakeConnection, Any]: # pylint: disable=missing-function-docstring
|
||||
sock, addr = self.sock.accept()
|
||||
|
||||
try:
|
||||
context = SSL.Context(self.method)
|
||||
context.set_options(SSL.OP_NO_SSLv2)
|
||||
context.set_options(SSL.OP_NO_SSLv3)
|
||||
context.set_tlsext_servername_callback(self._pick_certificate_cb)
|
||||
|
||||
ssl_sock = self.FakeConnection(SSL.Connection(context, sock))
|
||||
ssl_sock.set_accept_state()
|
||||
|
||||
# This log line is especially desirable because without it requests to
|
||||
# our standalone TLSALPN server would not be logged.
|
||||
logger.debug("Performing handshake with %s", addr)
|
||||
try:
|
||||
ssl_sock.do_handshake()
|
||||
except SSL.Error as error:
|
||||
# _pick_certificate_cb might have returned without
|
||||
# creating SSL context (wrong server name)
|
||||
raise OSError(error)
|
||||
|
||||
return ssl_sock, addr
|
||||
except:
|
||||
# If we encounter any error, close the new socket before reraising
|
||||
# the exception.
|
||||
sock.close()
|
||||
raise
|
||||
|
||||
|
||||
def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, # pylint: disable=too-many-arguments
|
||||
method: int = _DEFAULT_SSL_METHOD,
|
||||
source_address: Tuple[str, int] = ('', 0)) -> x509.Certificate:
|
||||
"""Probe SNI server for SSL certificate.
|
||||
|
||||
:param bytes name: Byte string to send as the server name in the
|
||||
client hello message.
|
||||
:param bytes host: Host to connect to.
|
||||
:param int port: Port to connect to.
|
||||
:param int timeout: Timeout in seconds.
|
||||
:param method: See `OpenSSL.SSL.Context` for allowed values.
|
||||
:param tuple source_address: Enables multi-path probing (selection
|
||||
of source interface). See `socket.creation_connection` for more
|
||||
info. Available only in Python 2.7+.
|
||||
|
||||
:raises acme.errors.Error: In case of any problems.
|
||||
|
||||
:returns: SSL certificate presented by the server.
|
||||
:rtype: cryptography.x509.Certificate
|
||||
|
||||
"""
|
||||
warnings.warn("probe_sni is deprecated and will be removed in an upcoming release",
|
||||
DeprecationWarning)
|
||||
context = SSL.Context(method)
|
||||
context.set_timeout(timeout)
|
||||
|
||||
socket_kwargs = {'source_address': source_address}
|
||||
|
||||
try:
|
||||
logger.debug(
|
||||
"Attempting to connect to %s:%d%s.", host, port,
|
||||
" from {0}:{1}".format(
|
||||
source_address[0],
|
||||
source_address[1]
|
||||
) if any(source_address) else ""
|
||||
)
|
||||
socket_tuple: Tuple[bytes, int] = (host, port)
|
||||
sock = socket.create_connection(socket_tuple, **socket_kwargs) # type: ignore[arg-type]
|
||||
except OSError as error:
|
||||
raise errors.Error(error)
|
||||
|
||||
with contextlib.closing(sock) as client:
|
||||
client_ssl = SSL.Connection(context, client)
|
||||
client_ssl.set_connect_state()
|
||||
client_ssl.set_tlsext_host_name(name) # pyOpenSSL>=0.13
|
||||
try:
|
||||
client_ssl.do_handshake()
|
||||
client_ssl.shutdown()
|
||||
except SSL.Error as error:
|
||||
raise errors.Error(error)
|
||||
cert = client_ssl.get_peer_certificate()
|
||||
assert cert # Appease mypy. We would have crashed out by now if there was no certificate.
|
||||
return cert.to_cryptography()
|
||||
|
||||
|
||||
# Even *more* annoyingly, due to a mypy bug, we can't use Union[] types in
|
||||
# isinstance expressions without causing false mypy errors. So we have to
|
||||
# recreate the type collection as a tuple here. And no, typing.get_args doesn't
|
||||
|
|
@ -351,14 +140,14 @@ def get_names_from_subject_and_extensions(
|
|||
return dns_names
|
||||
else:
|
||||
# We only include the first CN, if there are multiple. This matches
|
||||
# the behavior of the previously implementation using pyOpenSSL.
|
||||
# the behavior of the previous implementation using pyOpenSSL.
|
||||
return [cns[0]] + [d for d in dns_names if d != cns[0]]
|
||||
|
||||
|
||||
def _cryptography_cert_or_req_san(
|
||||
cert_or_req: Union[x509.Certificate, x509.CertificateSigningRequest],
|
||||
) -> List[str]:
|
||||
"""Get Subject Alternative Names from certificate or CSR using pyOpenSSL.
|
||||
"""Get Subject Alternative Names from certificate or CSR using cryptography.
|
||||
|
||||
.. note:: Although this is `acme` internal API, it is used by
|
||||
`letsencrypt`.
|
||||
|
|
|
|||
3
newsfragments/10079.changed
Normal file
3
newsfragments/10079.changed
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
Removed final instances of pyopenssl x509 and PKey objects
|
||||
* Removed `acme.crypto_util.SSLSocket`
|
||||
* Removed `acme.crypto_util.probe_sni`
|
||||
3
newsfragments/10381.changed
Normal file
3
newsfragments/10381.changed
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
Removed final instances of pyopenssl x509 and PKey objects
|
||||
* Removed `acme.crypto_util.SSLSocket`
|
||||
* Removed `acme.crypto_util.probe_sni`
|
||||
15
pytest.ini
15
pytest.ini
|
|
@ -19,24 +19,13 @@
|
|||
# updated.
|
||||
# 3) Ignore DeprecationWarning for datetime.utcfromtimestamp() triggered
|
||||
# from dateutil. See https://github.com/dateutil/dateutil/issues/1314.
|
||||
# 4 & 5) The pyOpenSSL X509/PKey warnings are due to probe_sni and/or tests for
|
||||
# it. Resolving these warnings is being tracked by
|
||||
# https://github.com/certbot/certbot/issues/10079.
|
||||
# 6) Planning to remove SSLSocket. See
|
||||
# https://github.com/certbot/certbot/issues/10381
|
||||
# 7) Planning to remove support for checking OCSP via OpenSSL binary.
|
||||
# 4) Planning to remove support for checking OCSP via OpenSSL binary.
|
||||
# See https://github.com/certbot/certbot/issues/10291.
|
||||
# 13) Removing probe_sni from public acme, since it's only used in certbot-compatibility-test
|
||||
# after TLS-ALPN support is removed.
|
||||
# 14) Ignore our own DeprecationWarning about Python 3.9 soon to be dropped.
|
||||
# 5) Ignore our own DeprecationWarning about Python 3.9 soon to be dropped.
|
||||
filterwarnings =
|
||||
error
|
||||
ignore:.*rsyncdir:DeprecationWarning
|
||||
ignore:'urllib3.contrib.pyopenssl:DeprecationWarning:requests_toolbelt
|
||||
ignore:.*datetime.utcfromtimestamp\(\) is deprecated:DeprecationWarning:dateutil
|
||||
ignore:Passing pyOpenSSL X509 objects is deprecated:DeprecationWarning
|
||||
ignore:Passing pyOpenSSL PKey objects is deprecated:DeprecationWarning
|
||||
ignore:SSLSocket is deprecated:DeprecationWarning
|
||||
ignore:enforce_openssl_binary_usage parameter is deprecated:DeprecationWarning
|
||||
ignore:probe_sni is deprecated:DeprecationWarning
|
||||
ignore:Python 3.9 support will be dropped:DeprecationWarning
|
||||
|
|
|
|||
Loading…
Reference in a new issue