Kill dvsni in acme

This commit is contained in:
Jakub Warmuz 2015-11-07 18:17:35 +00:00
parent 3ac4df4f48
commit bbb7606fe1
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
3 changed files with 2 additions and 319 deletions

View file

@ -440,168 +440,6 @@ class TLSSNI01(KeyAuthorizationChallenge):
return self.response(account_key).gen_cert(key=kwargs.get('cert_key'))
@Challenge.register # pylint: disable=too-many-ancestors
class DVSNI(_TokenDVChallenge):
"""ACME "dvsni" challenge.
:ivar bytes token: Random data, **not** base64-encoded.
"""
typ = "dvsni"
PORT = 443
"""Port to perform DVSNI challenge."""
def gen_response(self, account_key, alg=jose.RS256, **kwargs):
"""Generate response.
:param .JWK account_key: Private account key.
:rtype: .DVSNIResponse
"""
return DVSNIResponse(validation=jose.JWS.sign(
payload=self.json_dumps(sort_keys=True).encode('utf-8'),
key=account_key, alg=alg, **kwargs))
@ChallengeResponse.register
class DVSNIResponse(ChallengeResponse):
"""ACME "dvsni" challenge response.
:param bytes s: Random data, **not** base64-encoded.
"""
typ = "dvsni"
DOMAIN_SUFFIX = b".acme.invalid"
"""Domain name suffix."""
PORT = DVSNI.PORT
"""Port to perform DVSNI challenge."""
validation = jose.Field("validation", decoder=jose.JWS.from_json)
@property
def z(self): # pylint: disable=invalid-name
"""The ``z`` parameter.
:rtype: bytes
"""
# Instance of 'Field' has no 'signature' member
# pylint: disable=no-member
return hashlib.sha256(self.validation.signature.encode(
"signature").encode("utf-8")).hexdigest().encode()
@property
def z_domain(self):
"""Domain name for certificate subjectAltName.
:rtype: bytes
"""
z = self.z # pylint: disable=invalid-name
return z[:32] + b'.' + z[32:] + self.DOMAIN_SUFFIX
@property
def chall(self):
"""Get challenge encoded in the `validation` payload.
:rtype: challenges.DVSNI
"""
# pylint: disable=no-member
return DVSNI.json_loads(self.validation.payload.decode('utf-8'))
def gen_cert(self, key=None, bits=2048):
"""Generate DVSNI certificate.
:param OpenSSL.crypto.PKey key: Optional private key used in
certificate generation. If not provided (``None``), then
fresh key will be generated.
:param int bits: Number of bits for newly generated key.
:rtype: `tuple` of `OpenSSL.crypto.X509` and
`OpenSSL.crypto.PKey`
"""
if key is None:
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
return crypto_util.gen_ss_cert(key, [
# z_domain is too big to fit into CN, hence first dummy domain
'dummy', self.z_domain.decode()], force_san=True), key
def probe_cert(self, domain, **kwargs):
"""Probe DVSNI challenge certificate.
:param unicode domain:
"""
if "host" not in kwargs:
host = socket.gethostbyname(domain)
logging.debug('%s resolved to %s', domain, host)
kwargs["host"] = host
kwargs.setdefault("port", self.PORT)
kwargs["name"] = self.z_domain
# TODO: try different methods?
# pylint: disable=protected-access
return crypto_util.probe_sni(**kwargs)
def verify_cert(self, cert):
"""Verify DVSNI challenge certificate."""
# pylint: disable=protected-access
sans = crypto_util._pyopenssl_cert_or_req_san(cert)
logging.debug('Certificate %s. SANs: %s', cert.digest('sha1'), sans)
return self.z_domain.decode() in sans
def simple_verify(self, chall, domain, account_public_key,
cert=None, **kwargs):
"""Simple verify.
Verify ``validation`` using ``account_public_key``, optionally
probe DVSNI certificate and check using `verify_cert`.
:param .challenges.DVSNI chall: Corresponding challenge.
:param str domain: Domain name being validated.
:param JWK account_public_key:
:param OpenSSL.crypto.X509 cert: Optional certificate. If not
provided (``None``) certificate will be retrieved using
`probe_cert`.
:returns: ``True`` iff client's control of the domain has been
verified, ``False`` otherwise.
:rtype: bool
"""
# pylint: disable=no-member
if not self.validation.verify(key=account_public_key):
return False
# TODO: it's not checked that payload has exectly 2 fields!
try:
decoded_chall = self.chall
except jose.DeserializationError as error:
logger.debug(error, exc_info=True)
return False
if decoded_chall.token != chall.token:
logger.debug("Wrong token: expected %r, found %r",
chall.token, decoded_chall.token)
return False
if cert is None:
try:
cert = self.probe_cert(domain=domain, **kwargs)
except errors.Error as error:
logger.debug(error, exc_info=True)
return False
return self.verify_cert(cert)
@Challenge.register
class RecoveryContact(ContinuityChallenge):
"""ACME "recoveryContact" challenge.

View file

@ -320,161 +320,6 @@ class TLSSNI01Test(unittest.TestCase):
mock_gen_cert.assert_called_once_with(key=mock.sentinel.cert_key)
class DVSNITest(unittest.TestCase):
def setUp(self):
from acme.challenges import DVSNI
self.msg = DVSNI(
token=jose.b64decode('a82d5ff8ef740d12881f6d3c2277ab2e'))
self.jmsg = {
'type': 'dvsni',
'token': 'a82d5ff8ef740d12881f6d3c2277ab2e',
}
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import DVSNI
self.assertEqual(self.msg, DVSNI.from_json(self.jmsg))
def test_from_json_hashable(self):
from acme.challenges import DVSNI
hash(DVSNI.from_json(self.jmsg))
def test_from_json_invalid_token_length(self):
from acme.challenges import DVSNI
self.jmsg['token'] = jose.encode_b64jose(b'abcd')
self.assertRaises(
jose.DeserializationError, DVSNI.from_json, self.jmsg)
def test_gen_response(self):
from acme.challenges import DVSNI
self.assertEqual(self.msg, DVSNI.json_loads(
self.msg.gen_response(KEY).validation.payload.decode()))
class DVSNIResponseTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes
def setUp(self):
from acme.challenges import DVSNI
self.chall = DVSNI(
token=jose.b64decode(b'a82d5ff8ef740d12881f6d3c2277ab2e'))
from acme.challenges import DVSNIResponse
self.validation = jose.JWS.sign(
payload=self.chall.json_dumps(sort_keys=True).encode(),
key=KEY, alg=jose.RS256)
self.msg = DVSNIResponse(validation=self.validation)
self.jmsg_to = {
'resource': 'challenge',
'type': 'dvsni',
'validation': self.validation,
}
self.jmsg_from = {
'resource': 'challenge',
'type': 'dvsni',
'validation': self.validation.to_json(),
}
# pylint: disable=invalid-name
label1 = b'e2df3498860637c667fedadc5a8494ec'
label2 = b'09dcc75553c9b3bd73662b50e71b1e42'
self.z = label1 + label2
self.z_domain = label1 + b'.' + label2 + b'.acme.invalid'
self.domain = 'foo.com'
def test_z_and_domain(self):
self.assertEqual(self.z, self.msg.z)
self.assertEqual(self.z_domain, self.msg.z_domain)
def test_to_partial_json(self):
self.assertEqual(self.jmsg_to, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import DVSNIResponse
self.assertEqual(self.msg, DVSNIResponse.from_json(self.jmsg_from))
def test_from_json_hashable(self):
from acme.challenges import DVSNIResponse
hash(DVSNIResponse.from_json(self.jmsg_from))
@mock.patch('acme.challenges.socket.gethostbyname')
@mock.patch('acme.challenges.crypto_util.probe_sni')
def test_probe_cert(self, mock_probe_sni, mock_gethostbyname):
mock_gethostbyname.return_value = '127.0.0.1'
self.msg.probe_cert('foo.com')
mock_gethostbyname.assert_called_once_with('foo.com')
mock_probe_sni.assert_called_once_with(
host='127.0.0.1', port=self.msg.PORT,
name=self.z_domain)
self.msg.probe_cert('foo.com', host='8.8.8.8')
mock_probe_sni.assert_called_with(
host='8.8.8.8', port=mock.ANY, name=mock.ANY)
self.msg.probe_cert('foo.com', port=1234)
mock_probe_sni.assert_called_with(
host=mock.ANY, port=1234, name=mock.ANY)
self.msg.probe_cert('foo.com', bar='baz')
mock_probe_sni.assert_called_with(
host=mock.ANY, port=mock.ANY, name=mock.ANY, bar='baz')
self.msg.probe_cert('foo.com', name=b'xxx')
mock_probe_sni.assert_called_with(
host=mock.ANY, port=mock.ANY,
name=self.z_domain)
def test_gen_verify_cert(self):
key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem')
cert, key2 = self.msg.gen_cert(key1)
self.assertEqual(key1, key2)
self.assertTrue(self.msg.verify_cert(cert))
def test_gen_verify_cert_gen_key(self):
cert, key = self.msg.gen_cert()
self.assertTrue(isinstance(key, OpenSSL.crypto.PKey))
self.assertTrue(self.msg.verify_cert(cert))
def test_verify_bad_cert(self):
self.assertFalse(self.msg.verify_cert(test_util.load_cert('cert.pem')))
def test_simple_verify_wrong_account_key(self):
self.assertFalse(self.msg.simple_verify(
self.chall, self.domain, jose.JWKRSA.load(
test_util.load_vector('rsa256_key.pem')).public_key()))
def test_simple_verify_wrong_payload(self):
for payload in b'', b'{}':
msg = self.msg.update(validation=jose.JWS.sign(
payload=payload, key=KEY, alg=jose.RS256))
self.assertFalse(msg.simple_verify(
self.chall, self.domain, KEY.public_key()))
def test_simple_verify_wrong_token(self):
msg = self.msg.update(validation=jose.JWS.sign(
payload=self.chall.update(token=(b'b' * 20)).json_dumps().encode(),
key=KEY, alg=jose.RS256))
self.assertFalse(msg.simple_verify(
self.chall, self.domain, KEY.public_key()))
@mock.patch('acme.challenges.DVSNIResponse.verify_cert', autospec=True)
def test_simple_verify(self, mock_verify_cert):
mock_verify_cert.return_value = mock.sentinel.verification
self.assertEqual(mock.sentinel.verification, self.msg.simple_verify(
self.chall, self.domain, KEY.public_key(),
cert=mock.sentinel.cert))
mock_verify_cert.assert_called_once_with(self.msg, mock.sentinel.cert)
@mock.patch('acme.challenges.DVSNIResponse.probe_cert')
def test_simple_verify_false_on_probe_error(self, mock_probe_cert):
mock_probe_cert.side_effect = errors.Error
self.assertFalse(self.msg.simple_verify(
self.chall, self.domain, KEY.public_key()))
class RecoveryContactTest(unittest.TestCase):
def setUp(self):

View file

@ -151,7 +151,7 @@ certificate for some domain name by solving challenges received from
the ACME server. From the protocol, there are essentially two
different types of challenges. Challenges that must be solved by
individual plugins in order to satisfy domain validation (subclasses
of `~.DVChallenge`, i.e. `~.challenges.DVSNI`,
of `~.DVChallenge`, i.e. `~.challenges.TLSSNI01`,
`~.challenges.HTTP01`, `~.challenges.DNS`) and continuity specific
challenges (subclasses of `~.ContinuityChallenge`,
i.e. `~.challenges.RecoveryToken`, `~.challenges.RecoveryContact`,
@ -160,7 +160,7 @@ always handled by the `~.ContinuityAuthenticator`, while plugins are
expected to handle `~.DVChallenge` types.
Right now, we have two authenticator plugins, the `~.ApacheConfigurator`
and the `~.StandaloneAuthenticator`. The Standalone and Apache
authenticators only solve the `~.challenges.DVSNI` challenge currently.
authenticators only solve the `~.challenges.TLSSNI01` challenge currently.
(You can set which challenges your authenticator can handle through the
:meth:`~.IAuthenticator.get_chall_pref`.