Simple DVSNI verification

This commit is contained in:
Jakub Warmuz 2015-06-19 14:37:41 +00:00
parent 2f2137ef6b
commit ccc6a3212b
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
5 changed files with 124 additions and 1 deletions

View file

@ -4,13 +4,18 @@ import functools
import hashlib
import logging
import os
import socket
import OpenSSL
import requests
from acme import crypto_util
from acme import interfaces
from acme import jose
from acme import other
from letsencrypt import crypto_util as le_crypto_util
logger = logging.getLogger(__name__)
@ -192,6 +197,18 @@ class DVSNI(DVChallenge):
"""Domain name used in SNI."""
return binascii.hexlify(self.nonce) + self.DOMAIN_SUFFIX
def probe_cert(self, domain, **kwargs):
"""Probe DVSNI challenge certificate."""
host = socket.gethostbyname(domain)
logging.debug('%s resolved to %s', domain, host)
kwargs.setdefault("port", self.PORT)
kwargs.setdefault("host", host)
kwargs["server_hostname"] = self.nonce_domain
# TODO: try different methods?
# pylint: disable=protected-access
return crypto_util._probe_sni(**kwargs)
@ChallengeResponse.register
class DVSNIResponse(ChallengeResponse):
@ -231,6 +248,39 @@ class DVSNIResponse(ChallengeResponse):
"""Domain name for certificate subjectAltName."""
return self.z(chall) + self.DOMAIN_SUFFIX
def simple_verify(self, chall, domain, key, **kwargs):
"""Verify DVSNI.
:param .challenges.DVSNI chall: Corresponding challenge.
:param str domain: Domain name being validated.
:param OpenSSL.crypto.PKey key: Public key for the key pair
being authorized. If ``None`` key verification is not
performed!
:returns: ``True`` iff client's control of the domain has been
verified, ``False`` otherwise.
:rtype: bool
"""
cert = chall.probe_cert(domain=domain, **kwargs)
# TODO: check "It is a valid self-signed certificate" and
# return False if not
# pylint: disable=protected-access
sans = le_crypto_util._pyopenssl_cert_or_req_san(cert)
logging.debug('Certificate %s. SANs: %s', cert.digest('sha1'), sans)
key_filetype = OpenSSL.crypto.FILETYPE_ASN1
if key is None:
logging.warn('No key verification is performed')
keys_match = key is None or OpenSSL.crypto.dump_privatekey(
key_filetype, key) == OpenSSL.crypto.dump_privatekey(
key_filetype, cert.get_pubkey())
return (keys_match and domain in sans and
self.z_domain(chall) in sans)
@Challenge.register
class RecoveryContact(ContinuityChallenge):
"""ACME "recoveryContact" challenge."""

View file

@ -181,6 +181,33 @@ class DVSNITest(unittest.TestCase):
self.assertRaises(
jose.DeserializationError, DVSNI.from_json, self.jmsg)
@mock.patch('acme.challenges.socket.gethostbyname')
@mock.patch('acme.challenges.crypto_util._probe_sni')
def test_probe_cert(self, mock_probe_sni, mock_gethostbyname):
mock_gethostbyname.return_value = '127.0.0.1'
self.msg.probe_cert('foo.com')
mock_gethostbyname.assert_called_once_with('foo.com')
mock_probe_sni.assert_called_once_with(
host='127.0.0.1', port=self.msg.PORT,
server_hostname='a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid')
self.msg.probe_cert('foo.com', host='8.8.8.8')
mock_probe_sni.assert_called_with(
host='8.8.8.8', port=mock.ANY, server_hostname=mock.ANY)
self.msg.probe_cert('foo.com', port=1234)
mock_probe_sni.assert_called_with(
host=mock.ANY, port=1234, server_hostname=mock.ANY)
self.msg.probe_cert('foo.com', bar='baz')
mock_probe_sni.assert_called_with(
host=mock.ANY, port=mock.ANY, server_hostname=mock.ANY, bar='baz')
self.msg.probe_cert('foo.com', server_hostname='xxx')
mock_probe_sni.assert_called_with(
host=mock.ANY, port=mock.ANY,
server_hostname='a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid')
class DVSNIResponseTest(unittest.TestCase):
@ -218,6 +245,13 @@ class DVSNIResponseTest(unittest.TestCase):
from acme.challenges import DVSNIResponse
hash(DVSNIResponse.from_json(self.jmsg))
def test_simple_verify(self): # TODO
chall = mock.MagicMock()
chall.probe_cert.return_value = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, test_util.load_vector('cert.pem'))
self.assertFalse(self.msg.simple_verify(chall, "example.com", key=None))
# TODO: key not None
class RecoveryContactTest(unittest.TestCase):

19
acme/acme/crypto_util.py Normal file
View file

@ -0,0 +1,19 @@
"""Crypto utilities."""
import socket
import OpenSSL
def _probe_sni(server_hostname, host, port, timeout=10,
method=OpenSSL.SSL.SSLv23_METHOD):
sock = socket.create_connection((host, port), source_address=('0', 0))
context = OpenSSL.SSL.Context(method)
context.set_timeout(timeout)
connection = OpenSSL.SSL.Connection(context, sock)
connection.set_tlsext_host_name(server_hostname) # pyOpenSSL>=0.13
connection.set_connect_state()
connection.do_handshake()
cert = connection.get_peer_certificate()
sock.close()
# TODO: shutdown()
return cert

View file

@ -0,0 +1,20 @@
"""Tests for acme.crypto_util."""
import socket
import unittest
import OpenSSL
class ProbeSNITest(unittest.TestCase):
"""Tests for acme.crypto_util._probe_sni."""
def test_it(self):
from acme.crypto_util import _probe_sni
# TODO: mock this out
cert = _probe_sni(
"google.com", socket.gethostbyname("google.com"), port=443)
self.assertTrue(isinstance(cert, OpenSSL.crypto.X509))
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -11,7 +11,7 @@ install_requires = [
'pyrfc3339',
'ndg-httpsclient', # urllib3 InsecurePlatformWarning (#304)
'pyasn1', # urllib3 InsecurePlatformWarning (#304)
'PyOpenSSL',
'PyOpenSSL>=0.13', # Connection.set_tlsext_host_name
'pytz',
'requests',
'werkzeug',