From dbf5d086bda683679a12f70b70d91aa2de04166b Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Sat, 22 Aug 2015 15:41:37 +0000 Subject: [PATCH] v4 DNS challenge --- acme/acme/challenges.py | 94 +++++++++++++++++++++++++++++++++++- acme/acme/challenges_test.py | 77 ++++++++++++++++++++++++++--- acme/acme/client_test.py | 10 ++-- acme/acme/messages_test.py | 8 +-- 4 files changed, 173 insertions(+), 16 deletions(-) diff --git a/acme/acme/challenges.py b/acme/acme/challenges.py index a2235b61e..13186cc4f 100644 --- a/acme/acme/challenges.py +++ b/acme/acme/challenges.py @@ -514,10 +514,100 @@ class DNS(DVChallenge): """ typ = "dns" - token = jose.Field("token") + + LABEL = "_acme-challenge" + """Label clients prepend to the domain name being validated.""" + + TOKEN_SIZE = 128 / 8 # Based on the entropy value from the spec + """Minimum size of the :attr:`token` in bytes.""" + + token = jose.Field( + "token", encoder=jose.encode_b64jose, decoder=functools.partial( + jose.decode_b64jose, size=TOKEN_SIZE, minimum=True)) + + def gen_validation(self, account_key, alg=jose.RS256, **kwargs): + """Generate validation. + + :param .JWK account_key: Private account key. + :param .JWA alg: + + :returns: This challenge wrapped in `.JWS` + :rtype: .JWS + + """ + return jose.JWS.sign( + payload=self.json_dumps(sort_keys=True).encode('utf-8'), + key=account_key, alg=alg, **kwargs) + + def check_validation(self, validation, account_public_key): + """Check validation. + + :param validation + :type account_public_key: + `~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey` + or + `~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey` + or + `~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey` + wrapped in `.ComparableKey` + + :rtype: bool + + """ + if not validation.verify(key=account_public_key): + return False + try: + return self == self.json_loads( + validation.payload.decode('utf-8')) + except jose.DeserializationError as error: + logger.debug("Checking validation for DNS failed: %s", error) + return False + + def gen_response(self, account_key, **kwargs): + """Generate response. + + :param .JWK account_key: Private account key. + :param .JWA alg: + + :rtype: DNSResponse + + """ + return DNSResponse(validation=self.gen_validation( + self, account_key, **kwargs)) + + def validation_domain_name(self, name): + """Domain name for TXT validation record. + + :param unicode name: Domain name being validated. + + """ + return "{0}.{1}".format(self.LABEL, name) @ChallengeResponse.register class DNSResponse(ChallengeResponse): - """ACME "dns" challenge response.""" + """ACME "dns" challenge response. + + :param JWS validation: + + """ typ = "dns" + + validation = jose.Field("validation", decoder=jose.JWS.from_json) + + def check_validation(self, chall, account_public_key): + """Check validation. + + :param challenges.DNS chall: + :type account_public_key: + `~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey` + or + `~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey` + or + `~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey` + wrapped in `.ComparableKey` + + :rtype: bool + + """ + return chall.check_validation(self.validation, account_public_key) diff --git a/acme/acme/challenges_test.py b/acme/acme/challenges_test.py index d123eca20..06f5dffe1 100644 --- a/acme/acme/challenges_test.py +++ b/acme/acme/challenges_test.py @@ -570,9 +570,15 @@ class ProofOfPossessionResponseTest(unittest.TestCase): class DNSTest(unittest.TestCase): def setUp(self): + self.account_key = jose.JWKRSA.load( + test_util.load_vector('rsa512_key.pem')) from acme.challenges import DNS - self.msg = DNS(token='17817c66b60ce2e4012dfad92657527a') - self.jmsg = {'type': 'dns', 'token': '17817c66b60ce2e4012dfad92657527a'} + self.msg = DNS(token=jose.b64decode( + b'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA')) + self.jmsg = { + 'type': 'dns', + 'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA', + } def test_to_partial_json(self): self.assertEqual(self.jmsg, self.msg.to_partial_json()) @@ -585,27 +591,84 @@ class DNSTest(unittest.TestCase): from acme.challenges import DNS hash(DNS.from_json(self.jmsg)) + def test_gen_check_validation(self): + self.assertTrue(self.msg.check_validation( + self.msg.gen_validation(self.account_key), + self.account_key.public_key())) + + def test_gen_check_validation_wrong_key(self): + key2 = jose.JWKRSA.load(test_util.load_vector('rsa1024_key.pem')) + self.assertFalse(self.msg.check_validation( + self.msg.gen_validation(self.account_key), key2.public_key())) + + def test_check_validation_wrong_payload(self): + validations = tuple( + jose.JWS.sign(payload=payload, alg=jose.RS256, key=self.account_key) + for payload in (b'', b'{}') + ) + for validation in validations: + self.assertFalse(self.msg.check_validation( + validation, self.account_key.public_key())) + + def test_check_validation_wrong_fields(self): + bad_validation = jose.JWS.sign( + payload=self.msg.update(token=b'x' * 20).json_dumps().encode('utf-8'), + alg=jose.RS256, key=self.account_key) + self.assertFalse(self.msg.check_validation( + bad_validation, self.account_key.public_key())) + + def test_gen_response(self): + with mock.patch('acme.challenges.DNS.gen_validation') as mock_gen: + mock_gen.return_value = mock.sentinel.validation + response = self.msg.gen_response(self.account_key) + from acme.challenges import DNSResponse + self.assertTrue(isinstance(response, DNSResponse)) + self.assertEqual(response.validation, mock.sentinel.validation) + + def test_validation_domain_name(self): + self.assertEqual( + '_acme-challenge.le.wtf', self.msg.validation_domain_name('le.wtf')) + class DNSResponseTest(unittest.TestCase): def setUp(self): + self.key = jose.JWKRSA(key=KEY) + + from acme.challenges import DNS + self.chall = DNS(token=jose.b64decode( + b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA")) + self.validation = jose.JWS.sign( + payload=self.chall.json_dumps(sort_keys=True).encode(), + key=self.key, alg=jose.RS256) + from acme.challenges import DNSResponse - self.msg = DNSResponse() - self.jmsg = { + self.msg = DNSResponse(validation=self.validation) + self.jmsg_to = { 'resource': 'challenge', 'type': 'dns', + 'validation': self.validation, + } + self.jmsg_from = { + 'resource': 'challenge', + 'type': 'dns', + 'validation': self.validation.to_json(), } def test_to_partial_json(self): - self.assertEqual(self.jmsg, self.msg.to_partial_json()) + self.assertEqual(self.jmsg_to, self.msg.to_partial_json()) def test_from_json(self): from acme.challenges import DNSResponse - self.assertEqual(self.msg, DNSResponse.from_json(self.jmsg)) + self.assertEqual(self.msg, DNSResponse.from_json(self.jmsg_from)) def test_from_json_hashable(self): from acme.challenges import DNSResponse - hash(DNSResponse.from_json(self.jmsg)) + hash(DNSResponse.from_json(self.jmsg_from)) + + def test_check_validation(self): + self.assertTrue( + self.msg.check_validation(self.chall, self.key.public_key())) if __name__ == '__main__': diff --git a/acme/acme/client_test.py b/acme/acme/client_test.py index dcc0832e3..ed0c6f65a 100644 --- a/acme/acme/client_test.py +++ b/acme/acme/client_test.py @@ -55,7 +55,8 @@ class ClientTest(unittest.TestCase): authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1' challb = messages.ChallengeBody( uri=(authzr_uri + '/1'), status=messages.STATUS_VALID, - chall=challenges.DNS(token='foo')) + chall=challenges.DNS(token=jose.b64decode( + 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'))) self.challr = messages.ChallengeResource( body=challb, authzr_uri=authzr_uri) self.authz = messages.Authorization( @@ -155,7 +156,7 @@ class ClientTest(unittest.TestCase): self.response.links['up'] = {'url': self.challr.authzr_uri} self.response.json.return_value = self.challr.body.to_json() - chall_response = challenges.DNSResponse() + chall_response = challenges.DNSResponse(validation=None) self.client.answer_challenge(self.challr.body, chall_response) @@ -164,8 +165,9 @@ class ClientTest(unittest.TestCase): self.challr.body.update(uri='foo'), chall_response) def test_answer_challenge_missing_next(self): - self.assertRaises(errors.ClientError, self.client.answer_challenge, - self.challr.body, challenges.DNSResponse()) + self.assertRaises( + errors.ClientError, self.client.answer_challenge, + self.challr.body, challenges.DNSResponse(validation=None)) def test_retry_after_date(self): self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT' diff --git a/acme/acme/messages_test.py b/acme/acme/messages_test.py index 481c2e2a3..608ada2c2 100644 --- a/acme/acme/messages_test.py +++ b/acme/acme/messages_test.py @@ -185,7 +185,8 @@ class ChallengeBodyTest(unittest.TestCase): """Tests for acme.messages.ChallengeBody.""" def setUp(self): - self.chall = challenges.DNS(token='foo') + self.chall = challenges.DNS(token=jose.b64decode( + 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA')) from acme.messages import ChallengeBody from acme.messages import Error @@ -201,7 +202,7 @@ class ChallengeBodyTest(unittest.TestCase): 'uri': 'http://challb', 'status': self.status, 'type': 'dns', - 'token': 'foo', + 'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA', 'error': error, } self.jobj_from = self.jobj_to.copy() @@ -224,7 +225,8 @@ class ChallengeBodyTest(unittest.TestCase): hash(ChallengeBody.from_json(self.jobj_from)) def test_proxy(self): - self.assertEqual('foo', self.challb.token) + self.assertEqual(jose.b64decode( + 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'), self.challb.token) class AuthorizationTest(unittest.TestCase):