mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
Reimplement tls-alpn-01 in acme (#6886)
This PR is the first part of work described in #6724. It reintroduces the tls-alpn-01 challenge in `acme` module, that was introduced by #5894 and reverted by #6100. The reason it was removed in the past is because some tests showed that with `1.0.2` branch of OpenSSL, the self-signed certificate containing the authorization key is sent to the requester even if the ALPN protocol `acme-tls/1` was not declared as supported by the requester during the TLS handshake. However recent discussions lead to the conclusion that this behavior was not a security issue, because first it is coherent with the behavior with servers that do not support ALPN at all, and second it cannot make a tls-alpn-01 challenge be validated in this kind of corner case. On top of the original modifications given by #5894, I merged the code to be up-to-date with our `master`, and fixed tests to match recent evolution about not displaying the `keyAuthorization` in the deserialized JSON form of an ACME challenge. I also move the logic to verify if ALPN is available on the current system, and so that the tls-alpn-01 challenge can be used, to a dedicated static function `is_available` in `acme.challenge.TLSALPN01`. This function is used in the related tests to skip them, and will be used in the future from Certbot plugins to trigger or not the logic related to tls-alpn-01, depending on the OpenSSL version available to Python. * Reimplement TLS-ALPN-01 challenge and standalone TLS-ALPN server from #5894. * Setup a class method to check if tls-alpn-01 is supported. * Add potential missing parameter in validation for tls-alpn * Improve comments * Make a class private * Handle old versions of openssl that do not terminate the handshake when they should do. * Add changelog * Explicitly close the TLS connection by the book. * Remove unused exception * Fix lint
This commit is contained in:
parent
2fd85a4f36
commit
07abe7a8d6
9 changed files with 425 additions and 47 deletions
|
|
@ -1,14 +1,20 @@
|
|||
"""ACME Identifier Validation Challenges."""
|
||||
import abc
|
||||
import codecs
|
||||
import functools
|
||||
import hashlib
|
||||
import logging
|
||||
import socket
|
||||
|
||||
from cryptography.hazmat.primitives import hashes # type: ignore
|
||||
import josepy as jose
|
||||
import requests
|
||||
import six
|
||||
from OpenSSL import SSL # type: ignore # https://github.com/python/typeshed/issues/2052
|
||||
from OpenSSL import crypto
|
||||
|
||||
from acme import crypto_util
|
||||
from acme import errors
|
||||
from acme import fields
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -362,29 +368,163 @@ class HTTP01(KeyAuthorizationChallenge):
|
|||
|
||||
@ChallengeResponse.register
|
||||
class TLSALPN01Response(KeyAuthorizationChallengeResponse):
|
||||
"""ACME TLS-ALPN-01 challenge response.
|
||||
|
||||
This class only allows initiating a TLS-ALPN-01 challenge returned from the
|
||||
CA. Full support for responding to TLS-ALPN-01 challenges by generating and
|
||||
serving the expected response certificate is not currently provided.
|
||||
"""
|
||||
"""ACME tls-alpn-01 challenge response."""
|
||||
typ = "tls-alpn-01"
|
||||
|
||||
PORT = 443
|
||||
"""Verification port as defined by the protocol.
|
||||
|
||||
@Challenge.register
|
||||
You can override it (e.g. for testing) by passing ``port`` to
|
||||
`simple_verify`.
|
||||
|
||||
"""
|
||||
|
||||
ID_PE_ACME_IDENTIFIER_V1 = b"1.3.6.1.5.5.7.1.30.1"
|
||||
ACME_TLS_1_PROTOCOL = "acme-tls/1"
|
||||
|
||||
@property
|
||||
def h(self):
|
||||
"""Hash value stored in challenge certificate"""
|
||||
return hashlib.sha256(self.key_authorization.encode('utf-8')).digest()
|
||||
|
||||
def gen_cert(self, domain, key=None, bits=2048):
|
||||
"""Generate tls-alpn-01 certificate.
|
||||
|
||||
:param unicode domain: Domain verified by the challenge.
|
||||
:param OpenSSL.crypto.PKey key: Optional private key used in
|
||||
certificate generation. If not provided (``None``), then
|
||||
fresh key will be generated.
|
||||
:param int bits: Number of bits for newly generated key.
|
||||
|
||||
:rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey`
|
||||
|
||||
"""
|
||||
if key is None:
|
||||
key = crypto.PKey()
|
||||
key.generate_key(crypto.TYPE_RSA, bits)
|
||||
|
||||
|
||||
der_value = b"DER:" + codecs.encode(self.h, 'hex')
|
||||
acme_extension = crypto.X509Extension(self.ID_PE_ACME_IDENTIFIER_V1,
|
||||
critical=True, value=der_value)
|
||||
|
||||
return crypto_util.gen_ss_cert(key, [domain], force_san=True,
|
||||
extensions=[acme_extension]), key
|
||||
|
||||
def probe_cert(self, domain, host=None, port=None):
|
||||
"""Probe tls-alpn-01 challenge certificate.
|
||||
|
||||
:param unicode domain: domain being validated, required.
|
||||
:param string host: IP address used to probe the certificate.
|
||||
:param int port: Port used to probe the certificate.
|
||||
|
||||
"""
|
||||
if host is None:
|
||||
host = socket.gethostbyname(domain)
|
||||
logger.debug('%s resolved to %s', domain, host)
|
||||
if port is None:
|
||||
port = self.PORT
|
||||
|
||||
return crypto_util.probe_sni(host=host, port=port, name=domain,
|
||||
alpn_protocols=[self.ACME_TLS_1_PROTOCOL])
|
||||
|
||||
def verify_cert(self, domain, cert):
|
||||
"""Verify tls-alpn-01 challenge certificate.
|
||||
|
||||
:param unicode domain: Domain name being validated.
|
||||
:param OpensSSL.crypto.X509 cert: Challenge certificate.
|
||||
|
||||
:returns: Whether the certificate was successfully verified.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
# pylint: disable=protected-access
|
||||
names = crypto_util._pyopenssl_cert_or_req_all_names(cert)
|
||||
logger.debug('Certificate %s. SANs: %s', cert.digest('sha256'), names)
|
||||
if len(names) != 1 or names[0].lower() != domain.lower():
|
||||
return False
|
||||
|
||||
for i in range(cert.get_extension_count()):
|
||||
ext = cert.get_extension(i)
|
||||
# FIXME: assume this is the ACME extension. Currently there is no
|
||||
# way to get full OID of an unknown extension from pyopenssl.
|
||||
if ext.get_short_name() == b'UNDEF':
|
||||
data = ext.get_data()
|
||||
return data == self.h
|
||||
|
||||
return False
|
||||
|
||||
# pylint: disable=too-many-arguments
|
||||
def simple_verify(self, chall, domain, account_public_key,
|
||||
cert=None, host=None, port=None):
|
||||
"""Simple verify.
|
||||
|
||||
Verify ``validation`` using ``account_public_key``, optionally
|
||||
probe tls-alpn-01 certificate and check using `verify_cert`.
|
||||
|
||||
:param .challenges.TLSALPN01 chall: Corresponding challenge.
|
||||
:param str domain: Domain name being validated.
|
||||
:param JWK account_public_key:
|
||||
:param OpenSSL.crypto.X509 cert: Optional certificate. If not
|
||||
provided (``None``) certificate will be retrieved using
|
||||
`probe_cert`.
|
||||
:param string host: IP address used to probe the certificate.
|
||||
:param int port: Port used to probe the certificate.
|
||||
|
||||
|
||||
:returns: ``True`` if and only if client's control of the domain has been verified.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
if not self.verify(chall, account_public_key):
|
||||
logger.debug("Verification of key authorization in response failed")
|
||||
return False
|
||||
|
||||
if cert is None:
|
||||
try:
|
||||
cert = self.probe_cert(domain=domain, host=host, port=port)
|
||||
except errors.Error as error:
|
||||
logger.debug(str(error), exc_info=True)
|
||||
return False
|
||||
|
||||
return self.verify_cert(domain, cert)
|
||||
|
||||
|
||||
@Challenge.register # pylint: disable=too-many-ancestors
|
||||
class TLSALPN01(KeyAuthorizationChallenge):
|
||||
"""ACME tls-alpn-01 challenge.
|
||||
|
||||
This class simply allows parsing the TLS-ALPN-01 challenge returned from
|
||||
the CA. Full TLS-ALPN-01 support is not currently provided.
|
||||
|
||||
"""
|
||||
typ = "tls-alpn-01"
|
||||
"""ACME tls-alpn-01 challenge."""
|
||||
response_cls = TLSALPN01Response
|
||||
typ = response_cls.typ
|
||||
|
||||
def validation(self, account_key, **kwargs):
|
||||
"""Generate validation for the challenge."""
|
||||
raise NotImplementedError()
|
||||
"""Generate validation.
|
||||
|
||||
:param JWK account_key:
|
||||
:param unicode domain: Domain verified by the challenge.
|
||||
:param OpenSSL.crypto.PKey cert_key: Optional private key used
|
||||
in certificate generation. If not provided (``None``), then
|
||||
fresh key will be generated.
|
||||
|
||||
:rtype: `tuple` of `OpenSSL.crypto.X509` and `OpenSSL.crypto.PKey`
|
||||
|
||||
"""
|
||||
return self.response(account_key).gen_cert(
|
||||
key=kwargs.get('cert_key'),
|
||||
domain=kwargs.get('domain'))
|
||||
|
||||
@staticmethod
|
||||
def is_supported():
|
||||
"""
|
||||
Check if TLS-ALPN-01 challenge is supported on this machine.
|
||||
This implies that a recent version of OpenSSL is installed (>= 1.0.2),
|
||||
or a recent cryptography version shipped with the OpenSSL library is installed.
|
||||
|
||||
:returns: ``True`` if TLS-ALPN-01 is supported on this machine, ``False`` otherwise.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
return (hasattr(SSL.Connection, "set_alpn_protos")
|
||||
and hasattr(SSL.Context, "set_alpn_select_callback"))
|
||||
|
||||
|
||||
@Challenge.register
|
||||
|
|
|
|||
|
|
@ -27,19 +27,41 @@ logger = logging.getLogger(__name__)
|
|||
_DEFAULT_SSL_METHOD = SSL.SSLv23_METHOD # type: ignore
|
||||
|
||||
|
||||
class SSLSocket(object):
|
||||
class _DefaultCertSelection(object):
|
||||
def __init__(self, certs):
|
||||
self.certs = certs
|
||||
|
||||
def __call__(self, connection):
|
||||
server_name = connection.get_servername()
|
||||
return self.certs.get(server_name, None)
|
||||
|
||||
|
||||
class SSLSocket(object): # pylint: disable=too-few-public-methods
|
||||
"""SSL wrapper for sockets.
|
||||
|
||||
:ivar socket sock: Original wrapped socket.
|
||||
:ivar dict certs: Mapping from domain names (`bytes`) to
|
||||
`OpenSSL.crypto.X509`.
|
||||
:ivar method: See `OpenSSL.SSL.Context` for allowed values.
|
||||
:ivar alpn_selection: Hook to select negotiated ALPN protocol for
|
||||
connection.
|
||||
:ivar cert_selection: Hook to select certificate for connection. If given,
|
||||
`certs` parameter would be ignored, and therefore must be empty.
|
||||
|
||||
"""
|
||||
def __init__(self, sock, certs, method=_DEFAULT_SSL_METHOD):
|
||||
def __init__(self, sock, certs=None,
|
||||
method=_DEFAULT_SSL_METHOD, alpn_selection=None,
|
||||
cert_selection=None):
|
||||
self.sock = sock
|
||||
self.certs = certs
|
||||
self.alpn_selection = alpn_selection
|
||||
self.method = method
|
||||
if not cert_selection and not certs:
|
||||
raise ValueError("Neither cert_selection or certs specified.")
|
||||
if cert_selection and certs:
|
||||
raise ValueError("Both cert_selection and certs specified.")
|
||||
if cert_selection is None:
|
||||
cert_selection = _DefaultCertSelection(certs)
|
||||
self.cert_selection = cert_selection
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.sock, name)
|
||||
|
|
@ -56,18 +78,19 @@ class SSLSocket(object):
|
|||
:type connection: :class:`OpenSSL.Connection`
|
||||
|
||||
"""
|
||||
server_name = connection.get_servername()
|
||||
try:
|
||||
key, cert = self.certs[server_name]
|
||||
except KeyError:
|
||||
logger.debug("Server name (%s) not recognized, dropping SSL",
|
||||
server_name)
|
||||
pair = self.cert_selection(connection)
|
||||
if pair is None:
|
||||
logger.debug("Certificate selection for server name %s failed, dropping SSL",
|
||||
connection.get_servername())
|
||||
return
|
||||
key, cert = pair
|
||||
new_context = SSL.Context(self.method)
|
||||
new_context.set_options(SSL.OP_NO_SSLv2)
|
||||
new_context.set_options(SSL.OP_NO_SSLv3)
|
||||
new_context.use_privatekey(key)
|
||||
new_context.use_certificate(cert)
|
||||
if self.alpn_selection is not None:
|
||||
new_context.set_alpn_select_callback(self.alpn_selection)
|
||||
connection.set_context(new_context)
|
||||
|
||||
class FakeConnection(object):
|
||||
|
|
@ -92,6 +115,8 @@ class SSLSocket(object):
|
|||
context.set_options(SSL.OP_NO_SSLv2)
|
||||
context.set_options(SSL.OP_NO_SSLv3)
|
||||
context.set_tlsext_servername_callback(self._pick_certificate_cb)
|
||||
if self.alpn_selection is not None:
|
||||
context.set_alpn_select_callback(self.alpn_selection)
|
||||
|
||||
ssl_sock = self.FakeConnection(SSL.Connection(context, sock))
|
||||
ssl_sock.set_accept_state()
|
||||
|
|
@ -107,8 +132,9 @@ class SSLSocket(object):
|
|||
return ssl_sock, addr
|
||||
|
||||
|
||||
def probe_sni(name, host, port=443, timeout=300,
|
||||
method=_DEFAULT_SSL_METHOD, source_address=('', 0)):
|
||||
def probe_sni(name, host, port=443, timeout=300, # pylint: disable=too-many-arguments
|
||||
method=_DEFAULT_SSL_METHOD, source_address=('', 0),
|
||||
alpn_protocols=None):
|
||||
"""Probe SNI server for SSL certificate.
|
||||
|
||||
:param bytes name: Byte string to send as the server name in the
|
||||
|
|
@ -120,6 +146,8 @@ def probe_sni(name, host, port=443, timeout=300,
|
|||
:param tuple source_address: Enables multi-path probing (selection
|
||||
of source interface). See `socket.creation_connection` for more
|
||||
info. Available only in Python 2.7+.
|
||||
:param alpn_protocols: Protocols to request using ALPN.
|
||||
:type alpn_protocols: `list` of `bytes`
|
||||
|
||||
:raises acme.errors.Error: In case of any problems.
|
||||
|
||||
|
|
@ -149,6 +177,8 @@ def probe_sni(name, host, port=443, timeout=300,
|
|||
client_ssl = SSL.Connection(context, client)
|
||||
client_ssl.set_connect_state()
|
||||
client_ssl.set_tlsext_host_name(name) # pyOpenSSL>=0.13
|
||||
if alpn_protocols is not None:
|
||||
client_ssl.set_alpn_protos(alpn_protocols)
|
||||
try:
|
||||
client_ssl.do_handshake()
|
||||
client_ssl.shutdown()
|
||||
|
|
@ -239,12 +269,14 @@ def _pyopenssl_cert_or_req_san(cert_or_req):
|
|||
|
||||
|
||||
def gen_ss_cert(key, domains, not_before=None,
|
||||
validity=(7 * 24 * 60 * 60), force_san=True):
|
||||
validity=(7 * 24 * 60 * 60), force_san=True, extensions=None):
|
||||
"""Generate new self-signed certificate.
|
||||
|
||||
:type domains: `list` of `unicode`
|
||||
:param OpenSSL.crypto.PKey key:
|
||||
:param bool force_san:
|
||||
:param extensions: List of additional extensions to include in the cert.
|
||||
:type extensions: `list` of `OpenSSL.crypto.X509Extension`
|
||||
|
||||
If more than one domain is provided, all of the domains are put into
|
||||
``subjectAltName`` X.509 extension and first domain is set as the
|
||||
|
|
@ -257,10 +289,13 @@ def gen_ss_cert(key, domains, not_before=None,
|
|||
cert.set_serial_number(int(binascii.hexlify(os.urandom(16)), 16))
|
||||
cert.set_version(2)
|
||||
|
||||
extensions = [
|
||||
if extensions is None:
|
||||
extensions = []
|
||||
|
||||
extensions.append(
|
||||
crypto.X509Extension(
|
||||
b"basicConstraints", True, b"CA:TRUE, pathlen:0"),
|
||||
]
|
||||
)
|
||||
|
||||
cert.get_subject().CN = domains[0]
|
||||
# TODO: what to put into cert.get_subject()?
|
||||
|
|
|
|||
|
|
@ -33,7 +33,14 @@ class TLSServer(socketserver.TCPServer):
|
|||
|
||||
def _wrap_sock(self):
|
||||
self.socket = crypto_util.SSLSocket(
|
||||
self.socket, certs=self.certs, method=self.method)
|
||||
self.socket, cert_selection=self._cert_selection,
|
||||
alpn_selection=getattr(self, '_alpn_selection', None),
|
||||
method=self.method)
|
||||
|
||||
def _cert_selection(self, connection): # pragma: no cover
|
||||
"""Callback selecting certificate for connection."""
|
||||
server_name = connection.get_servername()
|
||||
return self.certs.get(server_name, None)
|
||||
|
||||
def server_bind(self):
|
||||
self._wrap_sock()
|
||||
|
|
@ -120,6 +127,40 @@ class BaseDualNetworkedServers(object):
|
|||
self.threads = []
|
||||
|
||||
|
||||
class TLSALPN01Server(TLSServer, ACMEServerMixin):
|
||||
"""TLSALPN01 Server."""
|
||||
|
||||
ACME_TLS_1_PROTOCOL = b"acme-tls/1"
|
||||
|
||||
def __init__(self, server_address, certs, challenge_certs, ipv6=False):
|
||||
TLSServer.__init__(
|
||||
self, server_address, _BaseRequestHandlerWithLogging, certs=certs,
|
||||
ipv6=ipv6)
|
||||
self.challenge_certs = challenge_certs
|
||||
|
||||
def _cert_selection(self, connection):
|
||||
# TODO: We would like to serve challenge cert only if asked for it via
|
||||
# ALPN. To do this, we need to retrieve the list of protos from client
|
||||
# hello, but this is currently impossible with openssl [0], and ALPN
|
||||
# negotiation is done after cert selection.
|
||||
# Therefore, currently we always return challenge cert, and terminate
|
||||
# handshake in alpn_selection() if ALPN protos are not what we expect.
|
||||
# [0] https://github.com/openssl/openssl/issues/4952
|
||||
server_name = connection.get_servername()
|
||||
logger.debug("Serving challenge cert for server name %s", server_name)
|
||||
return self.challenge_certs.get(server_name, None)
|
||||
|
||||
def _alpn_selection(self, _connection, alpn_protos):
|
||||
"""Callback to select alpn protocol."""
|
||||
if len(alpn_protos) == 1 and alpn_protos[0] == self.ACME_TLS_1_PROTOCOL:
|
||||
logger.debug("Agreed on %s ALPN", self.ACME_TLS_1_PROTOCOL)
|
||||
return self.ACME_TLS_1_PROTOCOL
|
||||
logger.debug("Cannot agree on ALPN proto. Got: %s", str(alpn_protos))
|
||||
# Explicitly close the connection now, by returning an empty string.
|
||||
# See https://www.pyopenssl.org/en/stable/api/ssl.html#OpenSSL.SSL.Context.set_alpn_select_callback # pylint: disable=line-too-long
|
||||
return b""
|
||||
|
||||
|
||||
class HTTPServer(BaseHTTPServer.HTTPServer):
|
||||
"""Generic HTTP Server."""
|
||||
|
||||
|
|
@ -222,3 +263,16 @@ class HTTP01RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
|
|||
"""
|
||||
return functools.partial(
|
||||
cls, simple_http_resources=simple_http_resources)
|
||||
|
||||
|
||||
class _BaseRequestHandlerWithLogging(socketserver.BaseRequestHandler):
|
||||
"""BaseRequestHandler with logging."""
|
||||
|
||||
def log_message(self, format, *args): # pylint: disable=redefined-builtin
|
||||
"""Log arbitrary message."""
|
||||
logger.debug("%s - - %s", self.client_address[0], format % args)
|
||||
|
||||
def handle(self):
|
||||
"""Handle request."""
|
||||
self.log_message("Incoming request")
|
||||
socketserver.BaseRequestHandler.handle(self)
|
||||
|
|
|
|||
|
|
@ -2,10 +2,13 @@
|
|||
import unittest
|
||||
|
||||
import josepy as jose
|
||||
import OpenSSL
|
||||
import mock
|
||||
import requests
|
||||
from six.moves.urllib import parse as urllib_parse
|
||||
|
||||
from acme import errors
|
||||
|
||||
import test_util
|
||||
|
||||
CERT = test_util.load_comparable_cert('cert.pem')
|
||||
|
|
@ -256,30 +259,87 @@ class HTTP01Test(unittest.TestCase):
|
|||
class TLSALPN01ResponseTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
from acme.challenges import TLSALPN01Response
|
||||
self.msg = TLSALPN01Response(key_authorization=u'foo')
|
||||
from acme.challenges import TLSALPN01
|
||||
self.chall = TLSALPN01(
|
||||
token=jose.b64decode(b'a82d5ff8ef740d12881f6d3c2277ab2e'))
|
||||
self.domain = u'example.com'
|
||||
self.domain2 = u'example2.com'
|
||||
|
||||
self.response = self.chall.response(KEY)
|
||||
self.jmsg = {
|
||||
'resource': 'challenge',
|
||||
'type': 'tls-alpn-01',
|
||||
'keyAuthorization': u'foo',
|
||||
'keyAuthorization': self.response.key_authorization,
|
||||
}
|
||||
|
||||
from acme.challenges import TLSALPN01
|
||||
self.chall = TLSALPN01(token=(b'x' * 16))
|
||||
self.response = self.chall.response(KEY)
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual({k: v for k, v in self.jmsg.items() if k != 'keyAuthorization'},
|
||||
self.msg.to_partial_json())
|
||||
self.response.to_partial_json())
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import TLSALPN01Response
|
||||
self.assertEqual(self.msg, TLSALPN01Response.from_json(self.jmsg))
|
||||
self.assertEqual(self.response, TLSALPN01Response.from_json(self.jmsg))
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import TLSALPN01Response
|
||||
hash(TLSALPN01Response.from_json(self.jmsg))
|
||||
|
||||
def test_gen_verify_cert(self):
|
||||
key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem')
|
||||
cert, key2 = self.response.gen_cert(self.domain, key1)
|
||||
self.assertEqual(key1, key2)
|
||||
self.assertTrue(self.response.verify_cert(self.domain, cert))
|
||||
|
||||
def test_gen_verify_cert_gen_key(self):
|
||||
cert, key = self.response.gen_cert(self.domain)
|
||||
self.assertTrue(isinstance(key, OpenSSL.crypto.PKey))
|
||||
self.assertTrue(self.response.verify_cert(self.domain, cert))
|
||||
|
||||
def test_verify_bad_cert(self):
|
||||
self.assertFalse(self.response.verify_cert(self.domain,
|
||||
test_util.load_cert('cert.pem')))
|
||||
|
||||
def test_verify_bad_domain(self):
|
||||
key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem')
|
||||
cert, key2 = self.response.gen_cert(self.domain, key1)
|
||||
self.assertEqual(key1, key2)
|
||||
self.assertFalse(self.response.verify_cert(self.domain2, cert))
|
||||
|
||||
def test_simple_verify_bad_key_authorization(self):
|
||||
key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
|
||||
self.response.simple_verify(self.chall, "local", key2.public_key())
|
||||
|
||||
@mock.patch('acme.challenges.TLSALPN01Response.verify_cert', autospec=True)
|
||||
def test_simple_verify(self, mock_verify_cert):
|
||||
mock_verify_cert.return_value = mock.sentinel.verification
|
||||
self.assertEqual(
|
||||
mock.sentinel.verification, self.response.simple_verify(
|
||||
self.chall, self.domain, KEY.public_key(),
|
||||
cert=mock.sentinel.cert))
|
||||
mock_verify_cert.assert_called_once_with(
|
||||
self.response, self.domain, mock.sentinel.cert)
|
||||
|
||||
@mock.patch('acme.challenges.socket.gethostbyname')
|
||||
@mock.patch('acme.challenges.crypto_util.probe_sni')
|
||||
def test_probe_cert(self, mock_probe_sni, mock_gethostbyname):
|
||||
mock_gethostbyname.return_value = '127.0.0.1'
|
||||
self.response.probe_cert('foo.com')
|
||||
mock_gethostbyname.assert_called_once_with('foo.com')
|
||||
mock_probe_sni.assert_called_once_with(
|
||||
host='127.0.0.1', port=self.response.PORT, name='foo.com',
|
||||
alpn_protocols=['acme-tls/1'])
|
||||
|
||||
self.response.probe_cert('foo.com', host='8.8.8.8')
|
||||
mock_probe_sni.assert_called_with(
|
||||
host='8.8.8.8', port=mock.ANY, name='foo.com',
|
||||
alpn_protocols=['acme-tls/1'])
|
||||
|
||||
@mock.patch('acme.challenges.TLSALPN01Response.probe_cert')
|
||||
def test_simple_verify_false_on_probe_error(self, mock_probe_cert):
|
||||
mock_probe_cert.side_effect = errors.Error
|
||||
self.assertFalse(self.response.simple_verify(
|
||||
self.chall, self.domain, KEY.public_key()))
|
||||
|
||||
|
||||
class TLSALPN01Test(unittest.TestCase):
|
||||
|
||||
|
|
@ -309,8 +369,13 @@ class TLSALPN01Test(unittest.TestCase):
|
|||
self.assertRaises(
|
||||
jose.DeserializationError, TLSALPN01.from_json, self.jmsg)
|
||||
|
||||
def test_validation(self):
|
||||
self.assertRaises(NotImplementedError, self.msg.validation, KEY)
|
||||
@mock.patch('acme.challenges.TLSALPN01Response.gen_cert')
|
||||
def test_validation(self, mock_gen_cert):
|
||||
mock_gen_cert.return_value = ('cert', 'key')
|
||||
self.assertEqual(('cert', 'key'), self.msg.validation(
|
||||
KEY, cert_key=mock.sentinel.cert_key, domain=mock.sentinel.domain))
|
||||
mock_gen_cert.assert_called_once_with(key=mock.sentinel.cert_key,
|
||||
domain=mock.sentinel.domain)
|
||||
|
||||
|
||||
class DNSTest(unittest.TestCase):
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import test_util
|
|||
class SSLSocketAndProbeSNITest(unittest.TestCase):
|
||||
"""Tests for acme.crypto_util.SSLSocket/probe_sni."""
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.cert = test_util.load_comparable_cert('rsa2048_cert.pem')
|
||||
key = test_util.load_pyopenssl_private_key('rsa2048_key.pem')
|
||||
|
|
@ -32,7 +31,8 @@ class SSLSocketAndProbeSNITest(unittest.TestCase):
|
|||
# six.moves.* | pylint: disable=attribute-defined-outside-init,no-init
|
||||
|
||||
def server_bind(self): # pylint: disable=missing-docstring
|
||||
self.socket = SSLSocket(socket.socket(), certs=certs)
|
||||
self.socket = SSLSocket(socket.socket(),
|
||||
certs)
|
||||
socketserver.TCPServer.server_bind(self)
|
||||
|
||||
self.server = _TestServer(('', 0), socketserver.BaseRequestHandler)
|
||||
|
|
@ -73,6 +73,18 @@ class SSLSocketAndProbeSNITest(unittest.TestCase):
|
|||
socket.setdefaulttimeout(original_timeout)
|
||||
|
||||
|
||||
class SSLSocketTest(unittest.TestCase):
|
||||
"""Tests for acme.crypto_util.SSLSocket."""
|
||||
|
||||
def test_ssl_socket_invalid_arguments(self):
|
||||
from acme.crypto_util import SSLSocket
|
||||
with self.assertRaises(ValueError):
|
||||
_ = SSLSocket(None, {'sni': ('key', 'cert')},
|
||||
cert_selection=lambda _: None)
|
||||
with self.assertRaises(ValueError):
|
||||
_ = SSLSocket(None)
|
||||
|
||||
|
||||
class PyOpenSSLCertOrReqAllNamesTest(unittest.TestCase):
|
||||
"""Test for acme.crypto_util._pyopenssl_cert_or_req_all_names."""
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,10 @@ from six.moves import http_client # pylint: disable=import-error
|
|||
from six.moves import socketserver # type: ignore # pylint: disable=import-error
|
||||
|
||||
from acme import challenges
|
||||
from acme import crypto_util
|
||||
from acme import errors
|
||||
from acme.magic_typing import Set # pylint: disable=unused-import, no-name-in-module
|
||||
|
||||
import test_util
|
||||
|
||||
|
||||
|
|
@ -84,6 +87,59 @@ class HTTP01ServerTest(unittest.TestCase):
|
|||
self.assertFalse(self._test_http01(add=False))
|
||||
|
||||
|
||||
@unittest.skipIf(not challenges.TLSALPN01.is_supported(), "pyOpenSSL too old")
|
||||
class TLSALPN01ServerTest(unittest.TestCase):
|
||||
"""Test for acme.standalone.TLSALPN01Server."""
|
||||
|
||||
def setUp(self):
|
||||
self.certs = {b'localhost': (
|
||||
test_util.load_pyopenssl_private_key('rsa2048_key.pem'),
|
||||
test_util.load_cert('rsa2048_cert.pem'),
|
||||
)}
|
||||
# Use different certificate for challenge.
|
||||
self.challenge_certs = {b'localhost': (
|
||||
test_util.load_pyopenssl_private_key('rsa1024_key.pem'),
|
||||
test_util.load_cert('rsa1024_cert.pem'),
|
||||
)}
|
||||
from acme.standalone import TLSALPN01Server
|
||||
self.server = TLSALPN01Server(("localhost", 0), certs=self.certs,
|
||||
challenge_certs=self.challenge_certs)
|
||||
# pylint: disable=no-member
|
||||
self.thread = threading.Thread(target=self.server.serve_forever)
|
||||
self.thread.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.server.shutdown() # pylint: disable=no-member
|
||||
self.thread.join()
|
||||
|
||||
# TODO: This is not implemented yet, see comments in standalone.py
|
||||
# def test_certs(self):
|
||||
# host, port = self.server.socket.getsockname()[:2]
|
||||
# cert = crypto_util.probe_sni(
|
||||
# b'localhost', host=host, port=port, timeout=1)
|
||||
# # Expect normal cert when connecting without ALPN.
|
||||
# self.assertEqual(jose.ComparableX509(cert),
|
||||
# jose.ComparableX509(self.certs[b'localhost'][1]))
|
||||
|
||||
def test_challenge_certs(self):
|
||||
host, port = self.server.socket.getsockname()[:2]
|
||||
cert = crypto_util.probe_sni(
|
||||
b'localhost', host=host, port=port, timeout=1,
|
||||
alpn_protocols=[b"acme-tls/1"])
|
||||
# Expect challenge cert when connecting with ALPN.
|
||||
self.assertEqual(
|
||||
jose.ComparableX509(cert),
|
||||
jose.ComparableX509(self.challenge_certs[b'localhost'][1])
|
||||
)
|
||||
|
||||
def test_bad_alpn(self):
|
||||
host, port = self.server.socket.getsockname()[:2]
|
||||
with self.assertRaises(errors.Error):
|
||||
crypto_util.probe_sni(
|
||||
b'localhost', host=host, port=port, timeout=1,
|
||||
alpn_protocols=[b"bad-alpn"])
|
||||
|
||||
|
||||
class BaseDualNetworkedServersTest(unittest.TestCase):
|
||||
"""Test for acme.standalone.BaseDualNetworkedServers."""
|
||||
|
||||
|
|
@ -138,7 +194,6 @@ class BaseDualNetworkedServersTest(unittest.TestCase):
|
|||
class HTTP01DualNetworkedServersTest(unittest.TestCase):
|
||||
"""Tests for acme.standalone.HTTP01DualNetworkedServers."""
|
||||
|
||||
|
||||
def setUp(self):
|
||||
self.account_key = jose.JWK.load(
|
||||
test_util.load_vector('rsa1024_key.pem'))
|
||||
|
|
|
|||
6
acme/tests/testdata/README
vendored
6
acme/tests/testdata/README
vendored
|
|
@ -10,6 +10,8 @@ and for the CSR:
|
|||
|
||||
openssl req -key rsa2048_key.pem -new -subj '/CN=example.com' -outform DER > csr.der
|
||||
|
||||
and for the certificate:
|
||||
and for the certificates:
|
||||
|
||||
openssl req -key rsa2047_key.pem -new -subj '/CN=example.com' -x509 -outform DER > cert.der
|
||||
openssl req -key rsa2048_key.pem -new -subj '/CN=example.com' -x509 -outform DER > cert.der
|
||||
openssl req -key rsa2048_key.pem -new -subj '/CN=example.com' -x509 > rsa2048_cert.pem
|
||||
openssl req -key rsa1024_key.pem -new -subj '/CN=example.com' -x509 > rsa1024_cert.pem
|
||||
|
|
|
|||
13
acme/tests/testdata/rsa1024_cert.pem
vendored
Normal file
13
acme/tests/testdata/rsa1024_cert.pem
vendored
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIIB/TCCAWagAwIBAgIJAOyRIBs3QT8QMA0GCSqGSIb3DQEBCwUAMBYxFDASBgNV
|
||||
BAMMC2V4YW1wbGUuY29tMB4XDTE4MDQyMzEwMzE0NFoXDTE4MDUyMzEwMzE0NFow
|
||||
FjEUMBIGA1UEAwwLZXhhbXBsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJ
|
||||
AoGBAJqJ87R8aVwByONxgQA9hwgvQd/QqI1r1UInXhEF2VnEtZGtUWLi100IpIqr
|
||||
Mq4qusDwNZ3g8cUPtSkvJGs89djoajMDIJP7lQUEKUYnYrI0q755Tr/DgLWSk7iW
|
||||
l5ezym0VzWUD0/xXUz8yRbNMTjTac80rS5SZk2ja2wWkYlRJAgMBAAGjUzBRMB0G
|
||||
A1UdDgQWBBSsaX0IVZ4XXwdeffVAbG7gnxSYjTAfBgNVHSMEGDAWgBSsaX0IVZ4X
|
||||
XwdeffVAbG7gnxSYjTAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4GB
|
||||
ADe7SVmvGH2nkwVfONk8TauRUDkePN1CJZKFb2zW1uO9ANJ2v5Arm/OQp0BG/xnI
|
||||
Djw/aLTNVESF89oe15dkrUErtcaF413MC1Ld5lTCaJLHLGqDKY69e02YwRuxW7jY
|
||||
qarpt7k7aR5FbcfO5r4V/FK/Gvp4Dmoky8uap7SJIW6x
|
||||
-----END CERTIFICATE-----
|
||||
|
|
@ -11,6 +11,8 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
|
|||
the `manual` plugin: `CERTBOT_REMAINING_CHALLENGES` is equal to the number of challenges
|
||||
remaining after the current challenge, `CERTBOT_ALL_DOMAINS` is a comma-separated list
|
||||
of all domains challenged for the current certificate.
|
||||
* Added TLS-ALPN-01 challenge support in the `acme` library. Support of this
|
||||
challenge in the Certbot client is planned to be added in a future release.
|
||||
|
||||
### Changed
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue