mirror of
https://github.com/certbot/certbot.git
synced 2026-06-08 00:02:14 -04:00
acme: remove deprecated TLS-ALPN challenge functionality (#10378)
Fixes #10274 --------- Co-authored-by: ohemorange <erica@eff.org> Co-authored-by: Brad Warren <bmw@users.noreply.github.com> Co-authored-by: Brad Warren <bmw@eff.org>
This commit is contained in:
parent
f55ea6e70f
commit
15a145ac3f
14 changed files with 34 additions and 552 deletions
|
|
@ -263,126 +263,6 @@ class HTTP01Test(unittest.TestCase):
|
|||
assert not self.msg.update(token=b'..').good_token
|
||||
|
||||
|
||||
class TLSALPN01ResponseTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
from acme.challenges import TLSALPN01
|
||||
self.chall = TLSALPN01(
|
||||
token=jose.b64decode(b'a82d5ff8ef740d12881f6d3c2277ab2e'))
|
||||
self.domain = u'example.com'
|
||||
self.domain2 = u'example2.com'
|
||||
|
||||
self.response = self.chall.response(KEY)
|
||||
self.jmsg = {
|
||||
'resource': 'challenge',
|
||||
'type': 'tls-alpn-01',
|
||||
'keyAuthorization': self.response.key_authorization,
|
||||
}
|
||||
|
||||
def test_to_partial_json(self):
|
||||
assert {} == self.response.to_partial_json()
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import TLSALPN01Response
|
||||
assert self.response == TLSALPN01Response.from_json(self.jmsg)
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import TLSALPN01Response
|
||||
hash(TLSALPN01Response.from_json(self.jmsg))
|
||||
|
||||
def test_gen_verify_cert(self):
|
||||
key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem')
|
||||
cert, key2 = self.response.gen_cert(self.domain, key1)
|
||||
assert key1 == key2
|
||||
assert self.response.verify_cert(self.domain, cert)
|
||||
|
||||
def test_gen_verify_cert_gen_key(self):
|
||||
cert, key = self.response.gen_cert(self.domain)
|
||||
assert isinstance(key, OpenSSL.crypto.PKey)
|
||||
assert self.response.verify_cert(self.domain, cert)
|
||||
|
||||
def test_verify_bad_cert(self):
|
||||
assert not self.response.verify_cert(self.domain,
|
||||
test_util.load_cert('cert.pem'))
|
||||
|
||||
def test_verify_bad_domain(self):
|
||||
key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem')
|
||||
cert, key2 = self.response.gen_cert(self.domain, key1)
|
||||
assert key1 == key2
|
||||
assert not self.response.verify_cert(self.domain2, cert)
|
||||
|
||||
def test_simple_verify_bad_key_authorization(self):
|
||||
key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
|
||||
self.response.simple_verify(self.chall, "local", key2.public_key())
|
||||
|
||||
@mock.patch('acme.challenges.TLSALPN01Response.verify_cert', autospec=True)
|
||||
def test_simple_verify(self, mock_verify_cert):
|
||||
mock_verify_cert.return_value = mock.sentinel.verification
|
||||
assert mock.sentinel.verification == self.response.simple_verify(
|
||||
self.chall, self.domain, KEY.public_key(),
|
||||
cert=mock.sentinel.cert)
|
||||
mock_verify_cert.assert_called_once_with(
|
||||
self.response, self.domain, mock.sentinel.cert)
|
||||
|
||||
@mock.patch('acme.challenges.socket.gethostbyname')
|
||||
@mock.patch('acme.challenges.crypto_util.probe_sni')
|
||||
def test_probe_cert(self, mock_probe_sni, mock_gethostbyname):
|
||||
mock_gethostbyname.return_value = '127.0.0.1'
|
||||
self.response.probe_cert('foo.com')
|
||||
mock_gethostbyname.assert_called_once_with('foo.com')
|
||||
mock_probe_sni.assert_called_once_with(
|
||||
host=b'127.0.0.1', port=self.response.PORT, name=b'foo.com',
|
||||
alpn_protocols=[b'acme-tls/1'])
|
||||
|
||||
self.response.probe_cert('foo.com', host='8.8.8.8')
|
||||
mock_probe_sni.assert_called_with(
|
||||
host=b'8.8.8.8', port=mock.ANY, name=b'foo.com',
|
||||
alpn_protocols=[b'acme-tls/1'])
|
||||
|
||||
@mock.patch('acme.challenges.TLSALPN01Response.probe_cert')
|
||||
def test_simple_verify_false_on_probe_error(self, mock_probe_cert):
|
||||
mock_probe_cert.side_effect = errors.Error
|
||||
assert not self.response.simple_verify(
|
||||
self.chall, self.domain, KEY.public_key())
|
||||
|
||||
|
||||
class TLSALPN01Test(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
from acme.challenges import TLSALPN01
|
||||
self.msg = TLSALPN01(
|
||||
token=jose.b64decode('a82d5ff8ef740d12881f6d3c2277ab2e'))
|
||||
self.jmsg = {
|
||||
'type': 'tls-alpn-01',
|
||||
'token': 'a82d5ff8ef740d12881f6d3c2277ab2e',
|
||||
}
|
||||
|
||||
def test_to_partial_json(self):
|
||||
assert self.jmsg == self.msg.to_partial_json()
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import TLSALPN01
|
||||
assert self.msg == TLSALPN01.from_json(self.jmsg)
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import TLSALPN01
|
||||
hash(TLSALPN01.from_json(self.jmsg))
|
||||
|
||||
def test_from_json_invalid_token_length(self):
|
||||
from acme.challenges import TLSALPN01
|
||||
self.jmsg['token'] = jose.encode_b64jose(b'abcd')
|
||||
with pytest.raises(jose.DeserializationError):
|
||||
TLSALPN01.from_json(self.jmsg)
|
||||
|
||||
@mock.patch('acme.challenges.TLSALPN01Response.gen_cert')
|
||||
def test_validation(self, mock_gen_cert):
|
||||
mock_gen_cert.return_value = ('cert', 'key')
|
||||
assert ('cert', 'key') == self.msg.validation(
|
||||
KEY, cert_key=mock.sentinel.cert_key, domain=mock.sentinel.domain)
|
||||
mock_gen_cert.assert_called_once_with(key=mock.sentinel.cert_key,
|
||||
domain=mock.sentinel.domain)
|
||||
|
||||
|
||||
class DNSTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
|
|
|||
|
|
@ -20,24 +20,6 @@ from acme import errors
|
|||
from acme._internal.tests import test_util
|
||||
|
||||
|
||||
class TLSServerTest(unittest.TestCase):
|
||||
"""Tests for acme.standalone.TLSServer."""
|
||||
|
||||
|
||||
def test_bind(self): # pylint: disable=no-self-use
|
||||
from acme.standalone import TLSServer
|
||||
server = TLSServer(
|
||||
('', 0), socketserver.BaseRequestHandler, bind_and_activate=True)
|
||||
server.server_close()
|
||||
|
||||
def test_ipv6(self):
|
||||
if socket.has_ipv6:
|
||||
from acme.standalone import TLSServer
|
||||
server = TLSServer(
|
||||
('', 0), socketserver.BaseRequestHandler, bind_and_activate=True, ipv6=True)
|
||||
server.server_close()
|
||||
|
||||
|
||||
class HTTP01ServerTest(unittest.TestCase):
|
||||
"""Tests for acme.standalone.HTTP01Server."""
|
||||
|
||||
|
|
@ -112,57 +94,6 @@ class HTTP01ServerTest(unittest.TestCase):
|
|||
assert not is_hung, 'Server shutdown should not be hung'
|
||||
|
||||
|
||||
@unittest.skipIf(not challenges.TLSALPN01.is_supported(), "pyOpenSSL too old")
|
||||
class TLSALPN01ServerTest(unittest.TestCase):
|
||||
"""Test for acme.standalone.TLSALPN01Server."""
|
||||
|
||||
def setUp(self):
|
||||
self.certs = {b'localhost': (
|
||||
serialization.load_pem_private_key(test_util.load_vector('rsa2048_key.pem'), password=None),
|
||||
x509.load_pem_x509_certificate(test_util.load_vector('rsa2048_cert.pem')),
|
||||
)}
|
||||
# Use different certificate for challenge.
|
||||
self.challenge_certs = {b'localhost': (
|
||||
serialization.load_pem_private_key(test_util.load_vector('rsa4096_key.pem'), password=None),
|
||||
x509.load_pem_x509_certificate(test_util.load_vector('rsa4096_cert.pem')),
|
||||
)}
|
||||
from acme.standalone import TLSALPN01Server
|
||||
self.server = TLSALPN01Server(("localhost", 0), certs=self.certs,
|
||||
challenge_certs=self.challenge_certs)
|
||||
# pylint: disable=no-member
|
||||
self.thread = threading.Thread(target=self.server.serve_forever)
|
||||
self.thread.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.server.shutdown() # pylint: disable=no-member
|
||||
self.thread.join()
|
||||
self.server.server_close()
|
||||
|
||||
# TODO: This is not implemented yet, see comments in standalone.py
|
||||
# def test_certs(self):
|
||||
# host, port = self.server.socket.getsockname()[:2]
|
||||
# cert = crypto_util.probe_sni(
|
||||
# b'localhost', host=host, port=port, timeout=1)
|
||||
# # Expect normal cert when connecting without ALPN.
|
||||
# self.assertEqual(cert,
|
||||
# self.certs[b'localhost'][1])
|
||||
|
||||
def test_challenge_certs(self):
|
||||
host, port = self.server.socket.getsockname()[:2]
|
||||
cert = crypto_util.probe_sni(
|
||||
b'localhost', host=host, port=port, timeout=1,
|
||||
alpn_protocols=[b"acme-tls/1"])
|
||||
# Expect challenge cert when connecting with ALPN.
|
||||
assert cert == self.challenge_certs[b'localhost'][1]
|
||||
|
||||
def test_bad_alpn(self):
|
||||
host, port = self.server.socket.getsockname()[:2]
|
||||
with pytest.raises(errors.Error):
|
||||
crypto_util.probe_sni(
|
||||
b'localhost', host=host, port=port, timeout=1,
|
||||
alpn_protocols=[b"bad-alpn"])
|
||||
|
||||
|
||||
class BaseDualNetworkedServersTest(unittest.TestCase):
|
||||
"""Test for acme.standalone.BaseDualNetworkedServers."""
|
||||
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@ import abc
|
|||
import functools
|
||||
import hashlib
|
||||
import logging
|
||||
import socket
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
from typing import Dict
|
||||
|
|
@ -13,18 +12,11 @@ from typing import Tuple
|
|||
from typing import Type
|
||||
from typing import TypeVar
|
||||
from typing import Union
|
||||
import warnings
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
import josepy as jose
|
||||
from OpenSSL import crypto
|
||||
from OpenSSL import SSL
|
||||
import requests
|
||||
|
||||
from acme import crypto_util
|
||||
from acme import errors
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
GenericChallenge = TypeVar('GenericChallenge', bound='Challenge')
|
||||
|
|
@ -398,211 +390,6 @@ class HTTP01(KeyAuthorizationChallenge):
|
|||
return self.key_authorization(account_key)
|
||||
|
||||
|
||||
@ChallengeResponse.register
|
||||
class TLSALPN01Response(KeyAuthorizationChallengeResponse):
|
||||
"""ACME tls-alpn-01 challenge response.
|
||||
|
||||
.. deprecated:: 4.1.0
|
||||
|
||||
"""
|
||||
typ = "tls-alpn-01"
|
||||
|
||||
PORT = 443
|
||||
"""Verification port as defined by the protocol.
|
||||
|
||||
You can override it (e.g. for testing) by passing ``port`` to
|
||||
`simple_verify`.
|
||||
|
||||
"""
|
||||
|
||||
ID_PE_ACME_IDENTIFIER_V1 = b"1.3.6.1.5.5.7.1.30.1"
|
||||
ACME_TLS_1_PROTOCOL = b"acme-tls/1"
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
warnings.warn("TLSALPN01Response is deprecated and will be removed in an "
|
||||
"upcoming certbot major version update", DeprecationWarning)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
@property
|
||||
def h(self) -> bytes:
|
||||
"""Hash value stored in challenge certificate"""
|
||||
return hashlib.sha256(self.key_authorization.encode('utf-8')).digest()
|
||||
|
||||
def gen_cert(self, domain: str, key: Optional[crypto.PKey] = None, bits: int = 2048
|
||||
) -> Tuple[x509.Certificate, crypto.PKey]:
|
||||
"""Generate tls-alpn-01 certificate.
|
||||
|
||||
:param str domain: Domain verified by the challenge.
|
||||
:param OpenSSL.crypto.PKey key: Optional private key used in
|
||||
certificate generation. If not provided (``None``), then
|
||||
fresh key will be generated.
|
||||
:param int bits: Number of bits for newly generated key.
|
||||
|
||||
:rtype: `tuple` of `x509.Certificate` and `OpenSSL.crypto.PKey`
|
||||
|
||||
"""
|
||||
if key is None:
|
||||
key = crypto.PKey()
|
||||
key.generate_key(crypto.TYPE_RSA, bits)
|
||||
|
||||
oid = x509.ObjectIdentifier(self.ID_PE_ACME_IDENTIFIER_V1.decode())
|
||||
acme_extension = x509.Extension(
|
||||
oid,
|
||||
critical=True,
|
||||
value=x509.UnrecognizedExtension(oid, self.h)
|
||||
)
|
||||
|
||||
cryptography_key = key.to_cryptography_key()
|
||||
assert isinstance(cryptography_key, crypto_util.CertificateIssuerPrivateKeyTypesTpl)
|
||||
cert = crypto_util.make_self_signed_cert(
|
||||
cryptography_key,
|
||||
[domain],
|
||||
force_san=True,
|
||||
extensions=[acme_extension]
|
||||
)
|
||||
return cert, key
|
||||
|
||||
def probe_cert(self, domain: str, host: Optional[str] = None,
|
||||
port: Optional[int] = None) -> x509.Certificate:
|
||||
"""Probe tls-alpn-01 challenge certificate.
|
||||
|
||||
:param str domain: domain being validated, required.
|
||||
:param str host: IP address used to probe the certificate.
|
||||
:param int port: Port used to probe the certificate.
|
||||
|
||||
"""
|
||||
if host is None:
|
||||
host = socket.gethostbyname(domain)
|
||||
logger.debug('%s resolved to %s', domain, host)
|
||||
if port is None:
|
||||
port = self.PORT
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings(
|
||||
'ignore',
|
||||
message='alpn_protocols parameter is deprecated'
|
||||
)
|
||||
return crypto_util.probe_sni(host=host.encode(), port=port, name=domain.encode(),
|
||||
alpn_protocols=[self.ACME_TLS_1_PROTOCOL])
|
||||
|
||||
def verify_cert(self, domain: str, cert: x509.Certificate, ) -> bool:
|
||||
"""Verify tls-alpn-01 challenge certificate.
|
||||
|
||||
:param str domain: Domain name being validated.
|
||||
:param cert: Challenge certificate.
|
||||
:type cert: `cryptography.x509.Certificate`
|
||||
|
||||
:returns: Whether the certificate was successfully verified.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
names = crypto_util.get_names_from_subject_and_extensions(
|
||||
cert.subject, cert.extensions
|
||||
)
|
||||
logger.debug(
|
||||
"Certificate %s. SANs: %s", cert.fingerprint(hashes.SHA256()), names
|
||||
)
|
||||
if len(names) != 1 or names[0].lower() != domain.lower():
|
||||
return False
|
||||
|
||||
try:
|
||||
ext = cert.extensions.get_extension_for_oid(
|
||||
x509.ObjectIdentifier(self.ID_PE_ACME_IDENTIFIER_V1.decode())
|
||||
)
|
||||
except x509.ExtensionNotFound:
|
||||
return False
|
||||
|
||||
# This is for the type checker.
|
||||
assert isinstance(ext.value, x509.UnrecognizedExtension)
|
||||
return ext.value.value == self.h
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
def simple_verify(self, chall: 'TLSALPN01', domain: str, account_public_key: jose.JWK,
|
||||
cert: Optional[x509.Certificate] = None, host: Optional[str] = None,
|
||||
port: Optional[int] = None) -> bool:
|
||||
"""Simple verify.
|
||||
|
||||
Verify ``validation`` using ``account_public_key``, optionally
|
||||
probe tls-alpn-01 certificate and check using `verify_cert`.
|
||||
|
||||
:param .challenges.TLSALPN01 chall: Corresponding challenge.
|
||||
:param str domain: Domain name being validated.
|
||||
:param JWK account_public_key:
|
||||
:param x509.Certificate cert: Optional certificate. If not
|
||||
provided (``None``) certificate will be retrieved using
|
||||
`probe_cert`.
|
||||
:param string host: IP address used to probe the certificate.
|
||||
:param int port: Port used to probe the certificate.
|
||||
|
||||
|
||||
:returns: ``True`` if and only if client's control of the domain has been verified.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
if not self.verify(chall, account_public_key):
|
||||
logger.debug("Verification of key authorization in response failed")
|
||||
return False
|
||||
|
||||
if cert is None:
|
||||
try:
|
||||
cert = self.probe_cert(domain=domain, host=host, port=port)
|
||||
except errors.Error as error:
|
||||
logger.debug(str(error), exc_info=True)
|
||||
return False
|
||||
|
||||
return self.verify_cert(domain, cert)
|
||||
|
||||
|
||||
@Challenge.register # pylint: disable=too-many-ancestors
|
||||
class TLSALPN01(KeyAuthorizationChallenge):
|
||||
"""ACME tls-alpn-01 challenge.
|
||||
|
||||
.. deprecated:: 4.1.0
|
||||
|
||||
"""
|
||||
response_cls = TLSALPN01Response
|
||||
typ = response_cls.typ
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
warnings.warn("TLSALPN01 is deprecated and will be removed in an "
|
||||
"upcoming certbot major version update", DeprecationWarning)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def validation(self, account_key: jose.JWK,
|
||||
**kwargs: Any) -> Tuple[x509.Certificate, crypto.PKey]:
|
||||
"""Generate validation.
|
||||
|
||||
:param JWK account_key:
|
||||
:param str domain: Domain verified by the challenge.
|
||||
:param OpenSSL.crypto.PKey cert_key: Optional private key used
|
||||
in certificate generation. If not provided (``None``), then
|
||||
fresh key will be generated.
|
||||
|
||||
:rtype: `tuple` of `x509.Certificate` and `OpenSSL.crypto.PKey`
|
||||
|
||||
"""
|
||||
# TODO: Remove cast when response() is generic.
|
||||
return cast(TLSALPN01Response, self.response(account_key)).gen_cert(
|
||||
key=kwargs.get('cert_key'),
|
||||
domain=cast(str, kwargs.get('domain')))
|
||||
|
||||
@staticmethod
|
||||
def is_supported() -> bool:
|
||||
"""
|
||||
Check if TLS-ALPN-01 challenge is supported on this machine.
|
||||
This implies that a recent version of OpenSSL is installed (>= 1.0.2),
|
||||
or a recent cryptography version shipped with the OpenSSL library is installed.
|
||||
|
||||
:returns: ``True`` if TLS-ALPN-01 is supported on this machine, ``False`` otherwise.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
warnings.warn("TLSALPN01 is deprecated and will be removed in an "
|
||||
"upcoming certbot major version update", DeprecationWarning)
|
||||
return (hasattr(SSL.Connection, "set_alpn_protos")
|
||||
and hasattr(SSL.Context, "set_alpn_select_callback"))
|
||||
|
||||
|
||||
@Challenge.register
|
||||
class DNS(_TokenChallenge):
|
||||
"""ACME "dns" challenge."""
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ from typing import List
|
|||
from typing import Literal
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
from typing import Sequence
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
from typing import Union
|
||||
|
|
@ -81,8 +80,6 @@ class SSLSocket: # pylint: disable=too-few-public-methods
|
|||
:ivar dict certs: Mapping from domain names (`bytes`) to
|
||||
`OpenSSL.crypto.X509`.
|
||||
:ivar method: See `OpenSSL.SSL.Context` for allowed values.
|
||||
:ivar alpn_selection: Hook to select negotiated ALPN protocol for
|
||||
connection.
|
||||
:ivar cert_selection: Hook to select certificate for connection. If given,
|
||||
`certs` parameter would be ignored, and therefore must be empty.
|
||||
|
||||
|
|
@ -92,7 +89,6 @@ class SSLSocket: # pylint: disable=too-few-public-methods
|
|||
sock: socket.socket,
|
||||
certs: Optional[Mapping[bytes, _KeyAndCert]] = None,
|
||||
method: int = _DEFAULT_SSL_METHOD,
|
||||
alpn_selection: Optional[Callable[[SSL.Connection, List[bytes]], bytes]] = None,
|
||||
cert_selection: Optional[
|
||||
Callable[
|
||||
[SSL.Connection],
|
||||
|
|
@ -103,7 +99,6 @@ class SSLSocket: # pylint: disable=too-few-public-methods
|
|||
warnings.warn("SSLSocket is deprecated and will be removed in an upcoming release",
|
||||
DeprecationWarning)
|
||||
self.sock = sock
|
||||
self.alpn_selection = alpn_selection
|
||||
self.method = method
|
||||
if not cert_selection and not certs:
|
||||
raise ValueError("Neither cert_selection or certs specified.")
|
||||
|
|
@ -140,8 +135,6 @@ class SSLSocket: # pylint: disable=too-few-public-methods
|
|||
if isinstance(cert, x509.Certificate):
|
||||
cert = crypto.X509.from_cryptography(cert)
|
||||
new_context.use_certificate(cert)
|
||||
if self.alpn_selection is not None:
|
||||
new_context.set_alpn_select_callback(self.alpn_selection)
|
||||
connection.set_context(new_context)
|
||||
|
||||
class FakeConnection:
|
||||
|
|
@ -178,8 +171,6 @@ class SSLSocket: # pylint: disable=too-few-public-methods
|
|||
context.set_options(SSL.OP_NO_SSLv2)
|
||||
context.set_options(SSL.OP_NO_SSLv3)
|
||||
context.set_tlsext_servername_callback(self._pick_certificate_cb)
|
||||
if self.alpn_selection is not None:
|
||||
context.set_alpn_select_callback(self.alpn_selection)
|
||||
|
||||
ssl_sock = self.FakeConnection(SSL.Connection(context, sock))
|
||||
ssl_sock.set_accept_state()
|
||||
|
|
@ -203,8 +194,8 @@ class SSLSocket: # pylint: disable=too-few-public-methods
|
|||
|
||||
|
||||
def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, # pylint: disable=too-many-arguments
|
||||
method: int = _DEFAULT_SSL_METHOD, source_address: Tuple[str, int] = ('', 0),
|
||||
alpn_protocols: Optional[Sequence[bytes]] = None) -> x509.Certificate:
|
||||
method: int = _DEFAULT_SSL_METHOD,
|
||||
source_address: Tuple[str, int] = ('', 0)) -> x509.Certificate:
|
||||
"""Probe SNI server for SSL certificate.
|
||||
|
||||
:param bytes name: Byte string to send as the server name in the
|
||||
|
|
@ -216,8 +207,6 @@ def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, #
|
|||
:param tuple source_address: Enables multi-path probing (selection
|
||||
of source interface). See `socket.creation_connection` for more
|
||||
info. Available only in Python 2.7+.
|
||||
:param alpn_protocols: Protocols to request using ALPN.
|
||||
:type alpn_protocols: `Sequence` of `bytes`
|
||||
|
||||
:raises acme.errors.Error: In case of any problems.
|
||||
|
||||
|
|
@ -249,10 +238,6 @@ def probe_sni(name: bytes, host: bytes, port: int = 443, timeout: int = 300, #
|
|||
client_ssl = SSL.Connection(context, client)
|
||||
client_ssl.set_connect_state()
|
||||
client_ssl.set_tlsext_host_name(name) # pyOpenSSL>=0.13
|
||||
if alpn_protocols is not None:
|
||||
client_ssl.set_alpn_protos(list(alpn_protocols))
|
||||
warnings.warn("alpn_protocols parameter is deprecated and will be removed in an "
|
||||
"upcoming certbot major version update", DeprecationWarning)
|
||||
try:
|
||||
client_ssl.do_handshake()
|
||||
client_ssl.shutdown()
|
||||
|
|
|
|||
|
|
@ -10,64 +10,17 @@ import socket
|
|||
import socketserver
|
||||
import threading
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
from typing import List
|
||||
from typing import Mapping
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
from typing import Tuple
|
||||
from typing import Type
|
||||
import warnings
|
||||
|
||||
from OpenSSL import SSL
|
||||
|
||||
from acme import challenges
|
||||
from acme import crypto_util
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TLSServer(socketserver.TCPServer):
|
||||
"""Generic TLS Server
|
||||
|
||||
.. deprecated:: 4.1.0
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
warnings.warn("TLSServer is deprecated and will be removed in an upcoming release",
|
||||
DeprecationWarning)
|
||||
self.ipv6 = kwargs.pop("ipv6", False)
|
||||
if self.ipv6:
|
||||
self.address_family = socket.AF_INET6
|
||||
else:
|
||||
self.address_family = socket.AF_INET
|
||||
self.certs = kwargs.pop("certs", {})
|
||||
self.method = kwargs.pop("method", crypto_util._DEFAULT_SSL_METHOD)
|
||||
self.allow_reuse_address = kwargs.pop("allow_reuse_address", True)
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def _wrap_sock(self) -> None:
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings('ignore', 'SSLSocket is deprecated')
|
||||
self.socket = cast(socket.socket, crypto_util.SSLSocket(
|
||||
self.socket, cert_selection=self._cert_selection,
|
||||
alpn_selection=getattr(self, '_alpn_selection', None),
|
||||
method=self.method))
|
||||
|
||||
def _cert_selection(self, connection: SSL.Connection
|
||||
) -> Optional[crypto_util._KeyAndCert]: # pragma: no cover
|
||||
"""Callback selecting certificate for connection."""
|
||||
server_name = connection.get_servername()
|
||||
if server_name:
|
||||
return self.certs.get(server_name, None)
|
||||
return None
|
||||
|
||||
def server_bind(self) -> None:
|
||||
self._wrap_sock()
|
||||
return socketserver.TCPServer.server_bind(self)
|
||||
|
||||
|
||||
class ACMEServerMixin:
|
||||
"""ACME server common settings mixin."""
|
||||
# TODO: c.f. #858
|
||||
|
|
@ -156,56 +109,6 @@ class BaseDualNetworkedServers:
|
|||
self.threads = []
|
||||
|
||||
|
||||
class TLSALPN01Server(TLSServer, ACMEServerMixin):
|
||||
"""TLSALPN01 Server.
|
||||
|
||||
.. deprecated:: 4.1.0
|
||||
|
||||
"""
|
||||
|
||||
ACME_TLS_1_PROTOCOL = b"acme-tls/1"
|
||||
|
||||
def __init__(self, server_address: Tuple[str, int],
|
||||
certs: List[crypto_util._KeyAndCert],
|
||||
challenge_certs: Mapping[bytes, crypto_util._KeyAndCert],
|
||||
ipv6: bool = False) -> None:
|
||||
warnings.warn("TLSALPN01Server is deprecated and will be removed in an "
|
||||
"upcoming certbot major version update", DeprecationWarning)
|
||||
# We don't need to implement a request handler here because the work
|
||||
# (including logging) is being done by wrapped socket set up in the
|
||||
# parent TLSServer class.
|
||||
with warnings.catch_warnings():
|
||||
warnings.filterwarnings("ignore", "TLSServer is deprecated")
|
||||
TLSServer.__init__(
|
||||
self, server_address, socketserver.BaseRequestHandler, certs=certs,
|
||||
ipv6=ipv6)
|
||||
self.challenge_certs = challenge_certs
|
||||
|
||||
def _cert_selection(self, connection: SSL.Connection) -> Optional[crypto_util._KeyAndCert]:
|
||||
# TODO: We would like to serve challenge cert only if asked for it via
|
||||
# ALPN. To do this, we need to retrieve the list of protos from client
|
||||
# hello, but this is currently impossible with openssl [0], and ALPN
|
||||
# negotiation is done after cert selection.
|
||||
# Therefore, currently we always return challenge cert, and terminate
|
||||
# handshake in alpn_selection() if ALPN protos are not what we expect.
|
||||
# [0] https://github.com/openssl/openssl/issues/4952
|
||||
server_name = connection.get_servername()
|
||||
if server_name:
|
||||
logger.debug("Serving challenge cert for server name %s", server_name)
|
||||
return self.challenge_certs[server_name]
|
||||
return None # pragma: no cover
|
||||
|
||||
def _alpn_selection(self, _connection: SSL.Connection, alpn_protos: List[bytes]) -> bytes:
|
||||
"""Callback to select alpn protocol."""
|
||||
if len(alpn_protos) == 1 and alpn_protos[0] == self.ACME_TLS_1_PROTOCOL:
|
||||
logger.debug("Agreed on %s ALPN", self.ACME_TLS_1_PROTOCOL)
|
||||
return self.ACME_TLS_1_PROTOCOL
|
||||
logger.debug("Cannot agree on ALPN proto. Got: %s", str(alpn_protos))
|
||||
# Explicitly close the connection now, by returning an empty string.
|
||||
# See https://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.Context.set_alpn_select_callback # pylint: disable=line-too-long
|
||||
return b""
|
||||
|
||||
|
||||
class HTTPServer(BaseHTTPServer.HTTPServer):
|
||||
"""Generic HTTP Server."""
|
||||
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ class IntegrationTestsContext:
|
|||
acme_xdist = request.config.acme_xdist # type: ignore[attr-defined]
|
||||
|
||||
self.directory_url = acme_xdist['directory_url']
|
||||
self.tls_alpn_01_port = acme_xdist['https_port'][self.worker_id]
|
||||
self.https_port = acme_xdist['https_port'][self.worker_id]
|
||||
self.http_01_port = acme_xdist['http_port'][self.worker_id]
|
||||
self.other_port = acme_xdist['other_port'][self.worker_id]
|
||||
# Challtestsrv REST API, that exposes entrypoints to register new DNS entries,
|
||||
|
|
@ -78,7 +78,7 @@ class IntegrationTestsContext:
|
|||
command = ['--authenticator', 'standalone', '--installer', 'null']
|
||||
command.extend(args)
|
||||
return certbot_call.certbot_test(
|
||||
command, self.directory_url, self.http_01_port, self.tls_alpn_01_port,
|
||||
command, self.directory_url, self.http_01_port, self.https_port,
|
||||
self.config_dir, self.workspace, force_renew=force_renew)
|
||||
|
||||
def get_domain(self, subdomain: str = 'le') -> str:
|
||||
|
|
|
|||
|
|
@ -96,17 +96,14 @@ def test_prepare_plugins(context: IntegrationTestsContext) -> None:
|
|||
|
||||
def test_http_01(context: IntegrationTestsContext) -> None:
|
||||
"""Test the HTTP-01 challenge using standalone plugin."""
|
||||
# We start a server listening on the port for the
|
||||
# TLS-SNI challenge to prevent regressions in #3601.
|
||||
with misc.create_http_server(context.tls_alpn_01_port):
|
||||
certname = context.get_domain('le2')
|
||||
context.certbot([
|
||||
'--domains', certname, '--preferred-challenges', 'http-01', 'run',
|
||||
'--cert-name', certname,
|
||||
'--pre-hook', misc.echo('wtf_pre', context.hook_probe),
|
||||
'--post-hook', misc.echo('wtf_post', context.hook_probe),
|
||||
'--deploy-hook', misc.echo('deploy', context.hook_probe),
|
||||
])
|
||||
certname = context.get_domain('le2')
|
||||
context.certbot([
|
||||
'--domains', certname, '--preferred-challenges', 'http-01', 'run',
|
||||
'--cert-name', certname,
|
||||
'--pre-hook', misc.echo('wtf_pre', context.hook_probe),
|
||||
'--post-hook', misc.echo('wtf_post', context.hook_probe),
|
||||
'--deploy-hook', misc.echo('deploy', context.hook_probe),
|
||||
])
|
||||
|
||||
assert_hook_execution(context.hook_probe, 'deploy')
|
||||
assert_saved_renew_hook(context.config_dir, certname)
|
||||
|
|
|
|||
|
|
@ -46,12 +46,12 @@ class IntegrationTestsContext(certbot_context.IntegrationTestsContext):
|
|||
'--nginx-server-root', self.nginx_root]
|
||||
command.extend(args)
|
||||
return certbot_call.certbot_test(
|
||||
command, self.directory_url, self.http_01_port, self.tls_alpn_01_port,
|
||||
command, self.directory_url, self.http_01_port, self.https_port,
|
||||
self.config_dir, self.workspace, force_renew=True)
|
||||
|
||||
def _start_nginx(self, default_server: bool) -> subprocess.Popen[bytes]:
|
||||
self.nginx_config = config.construct_nginx_config(
|
||||
self.nginx_root, self.webroot, self.http_01_port, self.tls_alpn_01_port,
|
||||
self.nginx_root, self.webroot, self.http_01_port, self.https_port,
|
||||
self.other_port, default_server, wtf_prefix=self.worker_id)
|
||||
with open(self.nginx_config_path, 'w') as file:
|
||||
file.write(self.nginx_config)
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@ def test_certificate_deployment(certname_pattern: str, params: List[str],
|
|||
context.certbot_test_nginx(command)
|
||||
|
||||
lineage = domains.split(',')[0]
|
||||
server_cert = ssl.get_server_certificate(('localhost', context.tls_alpn_01_port))
|
||||
server_cert = ssl.get_server_certificate(('localhost', context.https_port))
|
||||
with open(os.path.join(
|
||||
context.workspace, 'conf/live/{0}/cert.pem'.format(lineage)), 'r'
|
||||
) as file:
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ class IntegrationTestsContext(certbot_context.IntegrationTestsContext):
|
|||
command = ['--authenticator', 'dns-rfc2136', '--dns-rfc2136-propagation-seconds', '2']
|
||||
command.extend(args)
|
||||
return certbot_call.certbot_test(
|
||||
command, self.directory_url, self.http_01_port, self.tls_alpn_01_port,
|
||||
command, self.directory_url, self.http_01_port, self.https_port,
|
||||
self.config_dir, self.workspace, force_renew=True)
|
||||
|
||||
@contextmanager
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ from certbot_integration_tests.utils.constants import *
|
|||
|
||||
|
||||
def certbot_test(certbot_args: List[str], directory_url: Optional[str], http_01_port: int,
|
||||
tls_alpn_01_port: int, config_dir: str, workspace: str,
|
||||
https_port: int, config_dir: str, workspace: str,
|
||||
force_renew: bool = True) -> Tuple[str, str]:
|
||||
"""
|
||||
Invoke the certbot executable available in PATH in a test context for the given args.
|
||||
|
|
@ -25,14 +25,14 @@ def certbot_test(certbot_args: List[str], directory_url: Optional[str], http_01_
|
|||
:param list certbot_args: the arguments to pass to the certbot executable
|
||||
:param str directory_url: URL of the ACME directory server to use
|
||||
:param int http_01_port: port for the HTTP-01 challenges
|
||||
:param int tls_alpn_01_port: port for the TLS-ALPN-01 challenges
|
||||
:param int https_port: port Nginx expects will serve HTTPS
|
||||
:param str config_dir: certbot configuration directory to use
|
||||
:param str workspace: certbot current directory to use
|
||||
:param bool force_renew: set False to not force renew existing certificates (default: True)
|
||||
:return: stdout and stderr as strings
|
||||
:rtype: `tuple` of `str`
|
||||
"""
|
||||
command, env = _prepare_args_env(certbot_args, directory_url, http_01_port, tls_alpn_01_port,
|
||||
command, env = _prepare_args_env(certbot_args, directory_url, http_01_port, https_port,
|
||||
config_dir, workspace, force_renew)
|
||||
|
||||
proc = subprocess.run(command, stdout=subprocess.PIPE,
|
||||
|
|
@ -83,7 +83,7 @@ def _prepare_environ(workspace: str) -> Dict[str, str]:
|
|||
|
||||
|
||||
def _prepare_args_env(certbot_args: List[str], directory_url: Optional[str], http_01_port: int,
|
||||
tls_alpn_01_port: int, config_dir: str, workspace: str,
|
||||
https_port: int, config_dir: str, workspace: str,
|
||||
force_renew: bool) -> Tuple[List[str], Dict[str, str]]:
|
||||
|
||||
new_environ = _prepare_environ(workspace)
|
||||
|
|
@ -99,7 +99,7 @@ def _prepare_args_env(certbot_args: List[str], directory_url: Optional[str], htt
|
|||
'certbot',
|
||||
'--no-verify-ssl',
|
||||
'--http-01-port', str(http_01_port),
|
||||
'--https-port', str(tls_alpn_01_port),
|
||||
'--https-port', str(https_port),
|
||||
'--config-dir', config_dir,
|
||||
'--work-dir', os.path.join(workspace, 'work'),
|
||||
'--logs-dir', os.path.join(workspace, 'logs'),
|
||||
|
|
@ -126,7 +126,7 @@ def main() -> None:
|
|||
# Default config is pebble
|
||||
directory_url = os.environ.get('SERVER', PEBBLE_DIRECTORY_URL)
|
||||
http_01_port = int(os.environ.get('HTTP_01_PORT', DEFAULT_HTTP_01_PORT))
|
||||
tls_alpn_01_port = int(os.environ.get('TLS_ALPN_01_PORT', TLS_ALPN_01_PORT))
|
||||
https_port = int(os.environ.get('HTTPS_PORT', HTTPS_PORT))
|
||||
|
||||
# Execution of certbot in a self-contained workspace
|
||||
workspace = os.environ.get('WORKSPACE', os.path.join(os.getcwd(), '.certbot_test_workspace'))
|
||||
|
|
@ -138,7 +138,7 @@ def main() -> None:
|
|||
config_dir = os.path.join(workspace, 'conf')
|
||||
|
||||
# Invoke certbot in test mode, without capturing output so users see directly the outcome.
|
||||
command, env = _prepare_args_env(args, directory_url, http_01_port, tls_alpn_01_port,
|
||||
command, env = _prepare_args_env(args, directory_url, http_01_port, https_port,
|
||||
config_dir, workspace, False)
|
||||
subprocess.check_call(command, universal_newlines=True, cwd=workspace, env=env)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
"""Some useful constants to use throughout certbot-ci integration tests"""
|
||||
DEFAULT_HTTP_01_PORT = 5002
|
||||
TLS_ALPN_01_PORT = 5001
|
||||
HTTPS_PORT = 5001
|
||||
CHALLTESTSRV_PORT = 8055
|
||||
PEBBLE_DIRECTORY_URL = 'https://localhost:14000/dir'
|
||||
PEBBLE_MANAGEMENT_URL = 'https://localhost:15000'
|
||||
|
|
|
|||
5
newsfragments/10274.changed
Normal file
5
newsfragments/10274.changed
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
Removed a number of deprecated classes/interfaces
|
||||
* Removed `acme.challenges.TLSALPN01Response`
|
||||
* Removed `acme.challenges.TLSALPN01`
|
||||
* Removed `acme.standalone.TLSServer`
|
||||
* Removed `acme.standalone.TLSALPN01Server`
|
||||
16
pytest.ini
16
pytest.ini
|
|
@ -19,13 +19,12 @@
|
|||
# updated.
|
||||
# 3) Ignore DeprecationWarning for datetime.utcfromtimestamp() triggered
|
||||
# from dateutil. See https://github.com/dateutil/dateutil/issues/1314.
|
||||
# 4 & 5) The pyOpenSSL X509/PKey warnings are due to TLS-ALPN-01 support.
|
||||
# Resolving these warnings is being tracked by
|
||||
# 4 & 5) The pyOpenSSL X509/PKey warnings are due to probe_sni and/or tests for
|
||||
# it. Resolving these warnings is being tracked by
|
||||
# https://github.com/certbot/certbot/issues/10079.
|
||||
# 6 - 11) Planning to remove unused TLS-ALPN support in acme.
|
||||
# See https://github.com/certbot/certbot/issues/10266 and
|
||||
# https://github.com/certbot/certbot/pull/10294.
|
||||
# 12) Planning to remove support for checking OCSP via OpenSSL binary.
|
||||
# 6) Planning to remove SSLSocket. See
|
||||
# https://github.com/certbot/certbot/issues/10381
|
||||
# 7) Planning to remove support for checking OCSP via OpenSSL binary.
|
||||
# See https://github.com/certbot/certbot/issues/10291.
|
||||
# 13) Removing probe_sni from public acme, since it's only used in certbot-compatibility-test
|
||||
# after TLS-ALPN support is removed.
|
||||
|
|
@ -37,12 +36,7 @@ filterwarnings =
|
|||
ignore:.*datetime.utcfromtimestamp\(\) is deprecated:DeprecationWarning:dateutil
|
||||
ignore:Passing pyOpenSSL X509 objects is deprecated:DeprecationWarning
|
||||
ignore:Passing pyOpenSSL PKey objects is deprecated:DeprecationWarning
|
||||
ignore:alpn_protocols parameter is deprecated:DeprecationWarning
|
||||
ignore:SSLSocket is deprecated:DeprecationWarning
|
||||
ignore:TLSALPN01Server is deprecated:DeprecationWarning
|
||||
ignore:TLSALPN01Response is deprecated:DeprecationWarning
|
||||
ignore:TLSALPN01 is deprecated:DeprecationWarning
|
||||
ignore:TLSServer is deprecated:DeprecationWarning
|
||||
ignore:enforce_openssl_binary_usage parameter is deprecated:DeprecationWarning
|
||||
ignore:probe_sni is deprecated:DeprecationWarning
|
||||
ignore:Python 3.9 support will be dropped:DeprecationWarning
|
||||
|
|
|
|||
Loading…
Reference in a new issue