From bbb7606fe1914e186443b85bf192f55200400285 Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Sat, 7 Nov 2015 18:17:35 +0000 Subject: [PATCH] Kill dvsni in acme --- acme/acme/challenges.py | 162 ----------------------------------- acme/acme/challenges_test.py | 155 --------------------------------- docs/contributing.rst | 4 +- 3 files changed, 2 insertions(+), 319 deletions(-) diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py index 5f97547ee..522e701e7 100644 --- a/acme/acme/challenges.py +++ b/acme/acme/challenges.py @@ -440,168 +440,6 @@ class TLSSNI01(KeyAuthorizationChallenge): return self.response(account_key).gen_cert(key=kwargs.get('cert_key')) -@Challenge.register # pylint: disable=too-many-ancestors -class DVSNI(_TokenDVChallenge): - """ACME "dvsni" challenge. - - :ivar bytes token: Random data, **not** base64-encoded. - - """ - typ = "dvsni" - - PORT = 443 - """Port to perform DVSNI challenge.""" - - def gen_response(self, account_key, alg=jose.RS256, **kwargs): - """Generate response. - - :param .JWK account_key: Private account key. - :rtype: .DVSNIResponse - - """ - return DVSNIResponse(validation=jose.JWS.sign( - payload=self.json_dumps(sort_keys=True).encode('utf-8'), - key=account_key, alg=alg, **kwargs)) - - -@ChallengeResponse.register -class DVSNIResponse(ChallengeResponse): - """ACME "dvsni" challenge response. - - :param bytes s: Random data, **not** base64-encoded. - - """ - typ = "dvsni" - - DOMAIN_SUFFIX = b".acme.invalid" - """Domain name suffix.""" - - PORT = DVSNI.PORT - """Port to perform DVSNI challenge.""" - - validation = jose.Field("validation", decoder=jose.JWS.from_json) - - @property - def z(self): # pylint: disable=invalid-name - """The ``z`` parameter. - - :rtype: bytes - - """ - # Instance of 'Field' has no 'signature' member - # pylint: disable=no-member - return hashlib.sha256(self.validation.signature.encode( - "signature").encode("utf-8")).hexdigest().encode() - - @property - def z_domain(self): - """Domain name for certificate subjectAltName. - - :rtype: bytes - - """ - z = self.z # pylint: disable=invalid-name - return z[:32] + b'.' + z[32:] + self.DOMAIN_SUFFIX - - @property - def chall(self): - """Get challenge encoded in the `validation` payload. - - :rtype: challenges.DVSNI - - """ - # pylint: disable=no-member - return DVSNI.json_loads(self.validation.payload.decode('utf-8')) - - def gen_cert(self, key=None, bits=2048): - """Generate DVSNI certificate. - - :param OpenSSL.crypto.PKey key: Optional private key used in - certificate generation. If not provided (``None``), then - fresh key will be generated. - :param int bits: Number of bits for newly generated key. - - :rtype: `tuple` of `OpenSSL.crypto.X509` and - `OpenSSL.crypto.PKey` - - """ - if key is None: - key = OpenSSL.crypto.PKey() - key.generate_key(OpenSSL.crypto.TYPE_RSA, bits) - return crypto_util.gen_ss_cert(key, [ - # z_domain is too big to fit into CN, hence first dummy domain - 'dummy', self.z_domain.decode()], force_san=True), key - - def probe_cert(self, domain, **kwargs): - """Probe DVSNI challenge certificate. - - :param unicode domain: - - """ - if "host" not in kwargs: - host = socket.gethostbyname(domain) - logging.debug('%s resolved to %s', domain, host) - kwargs["host"] = host - - kwargs.setdefault("port", self.PORT) - kwargs["name"] = self.z_domain - # TODO: try different methods? - # pylint: disable=protected-access - return crypto_util.probe_sni(**kwargs) - - def verify_cert(self, cert): - """Verify DVSNI challenge certificate.""" - # pylint: disable=protected-access - sans = crypto_util._pyopenssl_cert_or_req_san(cert) - logging.debug('Certificate %s. SANs: %s', cert.digest('sha1'), sans) - return self.z_domain.decode() in sans - - def simple_verify(self, chall, domain, account_public_key, - cert=None, **kwargs): - """Simple verify. - - Verify ``validation`` using ``account_public_key``, optionally - probe DVSNI certificate and check using `verify_cert`. - - :param .challenges.DVSNI chall: Corresponding challenge. - :param str domain: Domain name being validated. - :param JWK account_public_key: - :param OpenSSL.crypto.X509 cert: Optional certificate. If not - provided (``None``) certificate will be retrieved using - `probe_cert`. - - - :returns: ``True`` iff client's control of the domain has been - verified, ``False`` otherwise. - :rtype: bool - - """ - # pylint: disable=no-member - if not self.validation.verify(key=account_public_key): - return False - - # TODO: it's not checked that payload has exectly 2 fields! - try: - decoded_chall = self.chall - except jose.DeserializationError as error: - logger.debug(error, exc_info=True) - return False - - if decoded_chall.token != chall.token: - logger.debug("Wrong token: expected %r, found %r", - chall.token, decoded_chall.token) - return False - - if cert is None: - try: - cert = self.probe_cert(domain=domain, **kwargs) - except errors.Error as error: - logger.debug(error, exc_info=True) - return False - - return self.verify_cert(cert) - - @Challenge.register class RecoveryContact(ContinuityChallenge): """ACME "recoveryContact" challenge. diff --git a/acme/acme/challenges_test.py b/acme/acme/challenges_test.py index 3fcb01e4d..c4f3d6c61 100644 --- a/acme/acme/challenges_test.py +++ b/acme/acme/challenges_test.py @@ -320,161 +320,6 @@ class TLSSNI01Test(unittest.TestCase): mock_gen_cert.assert_called_once_with(key=mock.sentinel.cert_key) -class DVSNITest(unittest.TestCase): - - def setUp(self): - from acme.challenges import DVSNI - self.msg = DVSNI( - token=jose.b64decode('a82d5ff8ef740d12881f6d3c2277ab2e')) - self.jmsg = { - 'type': 'dvsni', - 'token': 'a82d5ff8ef740d12881f6d3c2277ab2e', - } - - def test_to_partial_json(self): - self.assertEqual(self.jmsg, self.msg.to_partial_json()) - - def test_from_json(self): - from acme.challenges import DVSNI - self.assertEqual(self.msg, DVSNI.from_json(self.jmsg)) - - def test_from_json_hashable(self): - from acme.challenges import DVSNI - hash(DVSNI.from_json(self.jmsg)) - - def test_from_json_invalid_token_length(self): - from acme.challenges import DVSNI - self.jmsg['token'] = jose.encode_b64jose(b'abcd') - self.assertRaises( - jose.DeserializationError, DVSNI.from_json, self.jmsg) - - def test_gen_response(self): - from acme.challenges import DVSNI - self.assertEqual(self.msg, DVSNI.json_loads( - self.msg.gen_response(KEY).validation.payload.decode())) - - -class DVSNIResponseTest(unittest.TestCase): - # pylint: disable=too-many-instance-attributes - - def setUp(self): - from acme.challenges import DVSNI - self.chall = DVSNI( - token=jose.b64decode(b'a82d5ff8ef740d12881f6d3c2277ab2e')) - - from acme.challenges import DVSNIResponse - self.validation = jose.JWS.sign( - payload=self.chall.json_dumps(sort_keys=True).encode(), - key=KEY, alg=jose.RS256) - self.msg = DVSNIResponse(validation=self.validation) - self.jmsg_to = { - 'resource': 'challenge', - 'type': 'dvsni', - 'validation': self.validation, - } - self.jmsg_from = { - 'resource': 'challenge', - 'type': 'dvsni', - 'validation': self.validation.to_json(), - } - - # pylint: disable=invalid-name - label1 = b'e2df3498860637c667fedadc5a8494ec' - label2 = b'09dcc75553c9b3bd73662b50e71b1e42' - self.z = label1 + label2 - self.z_domain = label1 + b'.' + label2 + b'.acme.invalid' - self.domain = 'foo.com' - - def test_z_and_domain(self): - self.assertEqual(self.z, self.msg.z) - self.assertEqual(self.z_domain, self.msg.z_domain) - - def test_to_partial_json(self): - self.assertEqual(self.jmsg_to, self.msg.to_partial_json()) - - def test_from_json(self): - from acme.challenges import DVSNIResponse - self.assertEqual(self.msg, DVSNIResponse.from_json(self.jmsg_from)) - - def test_from_json_hashable(self): - from acme.challenges import DVSNIResponse - hash(DVSNIResponse.from_json(self.jmsg_from)) - - @mock.patch('acme.challenges.socket.gethostbyname') - @mock.patch('acme.challenges.crypto_util.probe_sni') - def test_probe_cert(self, mock_probe_sni, mock_gethostbyname): - mock_gethostbyname.return_value = '127.0.0.1' - self.msg.probe_cert('foo.com') - mock_gethostbyname.assert_called_once_with('foo.com') - mock_probe_sni.assert_called_once_with( - host='127.0.0.1', port=self.msg.PORT, - name=self.z_domain) - - self.msg.probe_cert('foo.com', host='8.8.8.8') - mock_probe_sni.assert_called_with( - host='8.8.8.8', port=mock.ANY, name=mock.ANY) - - self.msg.probe_cert('foo.com', port=1234) - mock_probe_sni.assert_called_with( - host=mock.ANY, port=1234, name=mock.ANY) - - self.msg.probe_cert('foo.com', bar='baz') - mock_probe_sni.assert_called_with( - host=mock.ANY, port=mock.ANY, name=mock.ANY, bar='baz') - - self.msg.probe_cert('foo.com', name=b'xxx') - mock_probe_sni.assert_called_with( - host=mock.ANY, port=mock.ANY, - name=self.z_domain) - - def test_gen_verify_cert(self): - key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem') - cert, key2 = self.msg.gen_cert(key1) - self.assertEqual(key1, key2) - self.assertTrue(self.msg.verify_cert(cert)) - - def test_gen_verify_cert_gen_key(self): - cert, key = self.msg.gen_cert() - self.assertTrue(isinstance(key, OpenSSL.crypto.PKey)) - self.assertTrue(self.msg.verify_cert(cert)) - - def test_verify_bad_cert(self): - self.assertFalse(self.msg.verify_cert(test_util.load_cert('cert.pem'))) - - def test_simple_verify_wrong_account_key(self): - self.assertFalse(self.msg.simple_verify( - self.chall, self.domain, jose.JWKRSA.load( - test_util.load_vector('rsa256_key.pem')).public_key())) - - def test_simple_verify_wrong_payload(self): - for payload in b'', b'{}': - msg = self.msg.update(validation=jose.JWS.sign( - payload=payload, key=KEY, alg=jose.RS256)) - self.assertFalse(msg.simple_verify( - self.chall, self.domain, KEY.public_key())) - - def test_simple_verify_wrong_token(self): - msg = self.msg.update(validation=jose.JWS.sign( - payload=self.chall.update(token=(b'b' * 20)).json_dumps().encode(), - key=KEY, alg=jose.RS256)) - self.assertFalse(msg.simple_verify( - self.chall, self.domain, KEY.public_key())) - - @mock.patch('acme.challenges.DVSNIResponse.verify_cert', autospec=True) - def test_simple_verify(self, mock_verify_cert): - mock_verify_cert.return_value = mock.sentinel.verification - self.assertEqual(mock.sentinel.verification, self.msg.simple_verify( - self.chall, self.domain, KEY.public_key(), - cert=mock.sentinel.cert)) - mock_verify_cert.assert_called_once_with(self.msg, mock.sentinel.cert) - - @mock.patch('acme.challenges.DVSNIResponse.probe_cert') - def test_simple_verify_false_on_probe_error(self, mock_probe_cert): - mock_probe_cert.side_effect = errors.Error - self.assertFalse(self.msg.simple_verify( - self.chall, self.domain, KEY.public_key())) - - class RecoveryContactTest(unittest.TestCase): def setUp(self): diff --git a/docs/contributing.rst b/docs/contributing.rst index efc6c27ae..c71aefeec 100644 --- a/docs/contributing.rst +++ b/docs/contributing.rst @@ -151,7 +151,7 @@ certificate for some domain name by solving challenges received from the ACME server. From the protocol, there are essentially two different types of challenges. Challenges that must be solved by individual plugins in order to satisfy domain validation (subclasses -of `~.DVChallenge`, i.e. `~.challenges.DVSNI`, +of `~.DVChallenge`, i.e. `~.challenges.TLSSNI01`, `~.challenges.HTTP01`, `~.challenges.DNS`) and continuity specific challenges (subclasses of `~.ContinuityChallenge`, i.e. `~.challenges.RecoveryToken`, `~.challenges.RecoveryContact`, @@ -160,7 +160,7 @@ always handled by the `~.ContinuityAuthenticator`, while plugins are expected to handle `~.DVChallenge` types. Right now, we have two authenticator plugins, the `~.ApacheConfigurator` and the `~.StandaloneAuthenticator`. The Standalone and Apache -authenticators only solve the `~.challenges.DVSNI` challenge currently. +authenticators only solve the `~.challenges.TLSSNI01` challenge currently. (You can set which challenges your authenticator can handle through the :meth:`~.IAuthenticator.get_chall_pref`.