v4 DNS challenge

This commit is contained in:
Jakub Warmuz 2015-08-22 15:41:37 +00:00
parent c3941b1a8d
commit dbf5d086bd
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
4 changed files with 173 additions and 16 deletions

View file

@ -514,10 +514,100 @@ class DNS(DVChallenge):
"""
typ = "dns"
token = jose.Field("token")
LABEL = "_acme-challenge"
"""Label clients prepend to the domain name being validated."""
TOKEN_SIZE = 128 / 8 # Based on the entropy value from the spec
"""Minimum size of the :attr:`token` in bytes."""
token = jose.Field(
"token", encoder=jose.encode_b64jose, decoder=functools.partial(
jose.decode_b64jose, size=TOKEN_SIZE, minimum=True))
def gen_validation(self, account_key, alg=jose.RS256, **kwargs):
"""Generate validation.
:param .JWK account_key: Private account key.
:param .JWA alg:
:returns: This challenge wrapped in `.JWS`
:rtype: .JWS
"""
return jose.JWS.sign(
payload=self.json_dumps(sort_keys=True).encode('utf-8'),
key=account_key, alg=alg, **kwargs)
def check_validation(self, validation, account_public_key):
"""Check validation.
:param validation
:type account_public_key:
`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey`
wrapped in `.ComparableKey`
:rtype: bool
"""
if not validation.verify(key=account_public_key):
return False
try:
return self == self.json_loads(
validation.payload.decode('utf-8'))
except jose.DeserializationError as error:
logger.debug("Checking validation for DNS failed: %s", error)
return False
def gen_response(self, account_key, **kwargs):
"""Generate response.
:param .JWK account_key: Private account key.
:param .JWA alg:
:rtype: DNSResponse
"""
return DNSResponse(validation=self.gen_validation(
self, account_key, **kwargs))
def validation_domain_name(self, name):
"""Domain name for TXT validation record.
:param unicode name: Domain name being validated.
"""
return "{0}.{1}".format(self.LABEL, name)
@ChallengeResponse.register
class DNSResponse(ChallengeResponse):
"""ACME "dns" challenge response."""
"""ACME "dns" challenge response.
:param JWS validation:
"""
typ = "dns"
validation = jose.Field("validation", decoder=jose.JWS.from_json)
def check_validation(self, chall, account_public_key):
"""Check validation.
:param challenges.DNS chall:
:type account_public_key:
`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey`
wrapped in `.ComparableKey`
:rtype: bool
"""
return chall.check_validation(self.validation, account_public_key)

View file

@ -570,9 +570,15 @@ class ProofOfPossessionResponseTest(unittest.TestCase):
class DNSTest(unittest.TestCase):
def setUp(self):
self.account_key = jose.JWKRSA.load(
test_util.load_vector('rsa512_key.pem'))
from acme.challenges import DNS
self.msg = DNS(token='17817c66b60ce2e4012dfad92657527a')
self.jmsg = {'type': 'dns', 'token': '17817c66b60ce2e4012dfad92657527a'}
self.msg = DNS(token=jose.b64decode(
b'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'))
self.jmsg = {
'type': 'dns',
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
}
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
@ -585,27 +591,84 @@ class DNSTest(unittest.TestCase):
from acme.challenges import DNS
hash(DNS.from_json(self.jmsg))
def test_gen_check_validation(self):
self.assertTrue(self.msg.check_validation(
self.msg.gen_validation(self.account_key),
self.account_key.public_key()))
def test_gen_check_validation_wrong_key(self):
key2 = jose.JWKRSA.load(test_util.load_vector('rsa1024_key.pem'))
self.assertFalse(self.msg.check_validation(
self.msg.gen_validation(self.account_key), key2.public_key()))
def test_check_validation_wrong_payload(self):
validations = tuple(
jose.JWS.sign(payload=payload, alg=jose.RS256, key=self.account_key)
for payload in (b'', b'{}')
)
for validation in validations:
self.assertFalse(self.msg.check_validation(
validation, self.account_key.public_key()))
def test_check_validation_wrong_fields(self):
bad_validation = jose.JWS.sign(
payload=self.msg.update(token=b'x' * 20).json_dumps().encode('utf-8'),
alg=jose.RS256, key=self.account_key)
self.assertFalse(self.msg.check_validation(
bad_validation, self.account_key.public_key()))
def test_gen_response(self):
with mock.patch('acme.challenges.DNS.gen_validation') as mock_gen:
mock_gen.return_value = mock.sentinel.validation
response = self.msg.gen_response(self.account_key)
from acme.challenges import DNSResponse
self.assertTrue(isinstance(response, DNSResponse))
self.assertEqual(response.validation, mock.sentinel.validation)
def test_validation_domain_name(self):
self.assertEqual(
'_acme-challenge.le.wtf', self.msg.validation_domain_name('le.wtf'))
class DNSResponseTest(unittest.TestCase):
def setUp(self):
self.key = jose.JWKRSA(key=KEY)
from acme.challenges import DNS
self.chall = DNS(token=jose.b64decode(
b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA"))
self.validation = jose.JWS.sign(
payload=self.chall.json_dumps(sort_keys=True).encode(),
key=self.key, alg=jose.RS256)
from acme.challenges import DNSResponse
self.msg = DNSResponse()
self.jmsg = {
self.msg = DNSResponse(validation=self.validation)
self.jmsg_to = {
'resource': 'challenge',
'type': 'dns',
'validation': self.validation,
}
self.jmsg_from = {
'resource': 'challenge',
'type': 'dns',
'validation': self.validation.to_json(),
}
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
self.assertEqual(self.jmsg_to, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import DNSResponse
self.assertEqual(self.msg, DNSResponse.from_json(self.jmsg))
self.assertEqual(self.msg, DNSResponse.from_json(self.jmsg_from))
def test_from_json_hashable(self):
from acme.challenges import DNSResponse
hash(DNSResponse.from_json(self.jmsg))
hash(DNSResponse.from_json(self.jmsg_from))
def test_check_validation(self):
self.assertTrue(
self.msg.check_validation(self.chall, self.key.public_key()))
if __name__ == '__main__':

View file

@ -55,7 +55,8 @@ class ClientTest(unittest.TestCase):
authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1'
challb = messages.ChallengeBody(
uri=(authzr_uri + '/1'), status=messages.STATUS_VALID,
chall=challenges.DNS(token='foo'))
chall=challenges.DNS(token=jose.b64decode(
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA')))
self.challr = messages.ChallengeResource(
body=challb, authzr_uri=authzr_uri)
self.authz = messages.Authorization(
@ -155,7 +156,7 @@ class ClientTest(unittest.TestCase):
self.response.links['up'] = {'url': self.challr.authzr_uri}
self.response.json.return_value = self.challr.body.to_json()
chall_response = challenges.DNSResponse()
chall_response = challenges.DNSResponse(validation=None)
self.client.answer_challenge(self.challr.body, chall_response)
@ -164,8 +165,9 @@ class ClientTest(unittest.TestCase):
self.challr.body.update(uri='foo'), chall_response)
def test_answer_challenge_missing_next(self):
self.assertRaises(errors.ClientError, self.client.answer_challenge,
self.challr.body, challenges.DNSResponse())
self.assertRaises(
errors.ClientError, self.client.answer_challenge,
self.challr.body, challenges.DNSResponse(validation=None))
def test_retry_after_date(self):
self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT'

View file

@ -185,7 +185,8 @@ class ChallengeBodyTest(unittest.TestCase):
"""Tests for acme.messages.ChallengeBody."""
def setUp(self):
self.chall = challenges.DNS(token='foo')
self.chall = challenges.DNS(token=jose.b64decode(
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'))
from acme.messages import ChallengeBody
from acme.messages import Error
@ -201,7 +202,7 @@ class ChallengeBodyTest(unittest.TestCase):
'uri': 'http://challb',
'status': self.status,
'type': 'dns',
'token': 'foo',
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
'error': error,
}
self.jobj_from = self.jobj_to.copy()
@ -224,7 +225,8 @@ class ChallengeBodyTest(unittest.TestCase):
hash(ChallengeBody.from_json(self.jobj_from))
def test_proxy(self):
self.assertEqual('foo', self.challb.token)
self.assertEqual(jose.b64decode(
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'), self.challb.token)
class AuthorizationTest(unittest.TestCase):