mirror of
https://github.com/certbot/certbot.git
synced 2026-06-04 06:15:36 -04:00
commit
a150828a79
4 changed files with 173 additions and 16 deletions
|
|
@ -514,10 +514,100 @@ class DNS(DVChallenge):
|
|||
|
||||
"""
|
||||
typ = "dns"
|
||||
token = jose.Field("token")
|
||||
|
||||
LABEL = "_acme-challenge"
|
||||
"""Label clients prepend to the domain name being validated."""
|
||||
|
||||
TOKEN_SIZE = 128 / 8 # Based on the entropy value from the spec
|
||||
"""Minimum size of the :attr:`token` in bytes."""
|
||||
|
||||
token = jose.Field(
|
||||
"token", encoder=jose.encode_b64jose, decoder=functools.partial(
|
||||
jose.decode_b64jose, size=TOKEN_SIZE, minimum=True))
|
||||
|
||||
def gen_validation(self, account_key, alg=jose.RS256, **kwargs):
|
||||
"""Generate validation.
|
||||
|
||||
:param .JWK account_key: Private account key.
|
||||
:param .JWA alg:
|
||||
|
||||
:returns: This challenge wrapped in `.JWS`
|
||||
:rtype: .JWS
|
||||
|
||||
"""
|
||||
return jose.JWS.sign(
|
||||
payload=self.json_dumps(sort_keys=True).encode('utf-8'),
|
||||
key=account_key, alg=alg, **kwargs)
|
||||
|
||||
def check_validation(self, validation, account_public_key):
|
||||
"""Check validation.
|
||||
|
||||
:param validation
|
||||
:type account_public_key:
|
||||
`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`
|
||||
or
|
||||
`~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey`
|
||||
or
|
||||
`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey`
|
||||
wrapped in `.ComparableKey`
|
||||
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
if not validation.verify(key=account_public_key):
|
||||
return False
|
||||
try:
|
||||
return self == self.json_loads(
|
||||
validation.payload.decode('utf-8'))
|
||||
except jose.DeserializationError as error:
|
||||
logger.debug("Checking validation for DNS failed: %s", error)
|
||||
return False
|
||||
|
||||
def gen_response(self, account_key, **kwargs):
|
||||
"""Generate response.
|
||||
|
||||
:param .JWK account_key: Private account key.
|
||||
:param .JWA alg:
|
||||
|
||||
:rtype: DNSResponse
|
||||
|
||||
"""
|
||||
return DNSResponse(validation=self.gen_validation(
|
||||
self, account_key, **kwargs))
|
||||
|
||||
def validation_domain_name(self, name):
|
||||
"""Domain name for TXT validation record.
|
||||
|
||||
:param unicode name: Domain name being validated.
|
||||
|
||||
"""
|
||||
return "{0}.{1}".format(self.LABEL, name)
|
||||
|
||||
|
||||
@ChallengeResponse.register
|
||||
class DNSResponse(ChallengeResponse):
|
||||
"""ACME "dns" challenge response."""
|
||||
"""ACME "dns" challenge response.
|
||||
|
||||
:param JWS validation:
|
||||
|
||||
"""
|
||||
typ = "dns"
|
||||
|
||||
validation = jose.Field("validation", decoder=jose.JWS.from_json)
|
||||
|
||||
def check_validation(self, chall, account_public_key):
|
||||
"""Check validation.
|
||||
|
||||
:param challenges.DNS chall:
|
||||
:type account_public_key:
|
||||
`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`
|
||||
or
|
||||
`~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey`
|
||||
or
|
||||
`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey`
|
||||
wrapped in `.ComparableKey`
|
||||
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
return chall.check_validation(self.validation, account_public_key)
|
||||
|
|
|
|||
|
|
@ -570,9 +570,15 @@ class ProofOfPossessionResponseTest(unittest.TestCase):
|
|||
class DNSTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.account_key = jose.JWKRSA.load(
|
||||
test_util.load_vector('rsa512_key.pem'))
|
||||
from acme.challenges import DNS
|
||||
self.msg = DNS(token='17817c66b60ce2e4012dfad92657527a')
|
||||
self.jmsg = {'type': 'dns', 'token': '17817c66b60ce2e4012dfad92657527a'}
|
||||
self.msg = DNS(token=jose.b64decode(
|
||||
b'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'))
|
||||
self.jmsg = {
|
||||
'type': 'dns',
|
||||
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
|
||||
}
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jmsg, self.msg.to_partial_json())
|
||||
|
|
@ -585,27 +591,84 @@ class DNSTest(unittest.TestCase):
|
|||
from acme.challenges import DNS
|
||||
hash(DNS.from_json(self.jmsg))
|
||||
|
||||
def test_gen_check_validation(self):
|
||||
self.assertTrue(self.msg.check_validation(
|
||||
self.msg.gen_validation(self.account_key),
|
||||
self.account_key.public_key()))
|
||||
|
||||
def test_gen_check_validation_wrong_key(self):
|
||||
key2 = jose.JWKRSA.load(test_util.load_vector('rsa1024_key.pem'))
|
||||
self.assertFalse(self.msg.check_validation(
|
||||
self.msg.gen_validation(self.account_key), key2.public_key()))
|
||||
|
||||
def test_check_validation_wrong_payload(self):
|
||||
validations = tuple(
|
||||
jose.JWS.sign(payload=payload, alg=jose.RS256, key=self.account_key)
|
||||
for payload in (b'', b'{}')
|
||||
)
|
||||
for validation in validations:
|
||||
self.assertFalse(self.msg.check_validation(
|
||||
validation, self.account_key.public_key()))
|
||||
|
||||
def test_check_validation_wrong_fields(self):
|
||||
bad_validation = jose.JWS.sign(
|
||||
payload=self.msg.update(token=b'x' * 20).json_dumps().encode('utf-8'),
|
||||
alg=jose.RS256, key=self.account_key)
|
||||
self.assertFalse(self.msg.check_validation(
|
||||
bad_validation, self.account_key.public_key()))
|
||||
|
||||
def test_gen_response(self):
|
||||
with mock.patch('acme.challenges.DNS.gen_validation') as mock_gen:
|
||||
mock_gen.return_value = mock.sentinel.validation
|
||||
response = self.msg.gen_response(self.account_key)
|
||||
from acme.challenges import DNSResponse
|
||||
self.assertTrue(isinstance(response, DNSResponse))
|
||||
self.assertEqual(response.validation, mock.sentinel.validation)
|
||||
|
||||
def test_validation_domain_name(self):
|
||||
self.assertEqual(
|
||||
'_acme-challenge.le.wtf', self.msg.validation_domain_name('le.wtf'))
|
||||
|
||||
|
||||
class DNSResponseTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.key = jose.JWKRSA(key=KEY)
|
||||
|
||||
from acme.challenges import DNS
|
||||
self.chall = DNS(token=jose.b64decode(
|
||||
b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA"))
|
||||
self.validation = jose.JWS.sign(
|
||||
payload=self.chall.json_dumps(sort_keys=True).encode(),
|
||||
key=self.key, alg=jose.RS256)
|
||||
|
||||
from acme.challenges import DNSResponse
|
||||
self.msg = DNSResponse()
|
||||
self.jmsg = {
|
||||
self.msg = DNSResponse(validation=self.validation)
|
||||
self.jmsg_to = {
|
||||
'resource': 'challenge',
|
||||
'type': 'dns',
|
||||
'validation': self.validation,
|
||||
}
|
||||
self.jmsg_from = {
|
||||
'resource': 'challenge',
|
||||
'type': 'dns',
|
||||
'validation': self.validation.to_json(),
|
||||
}
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jmsg, self.msg.to_partial_json())
|
||||
self.assertEqual(self.jmsg_to, self.msg.to_partial_json())
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import DNSResponse
|
||||
self.assertEqual(self.msg, DNSResponse.from_json(self.jmsg))
|
||||
self.assertEqual(self.msg, DNSResponse.from_json(self.jmsg_from))
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import DNSResponse
|
||||
hash(DNSResponse.from_json(self.jmsg))
|
||||
hash(DNSResponse.from_json(self.jmsg_from))
|
||||
|
||||
def test_check_validation(self):
|
||||
self.assertTrue(
|
||||
self.msg.check_validation(self.chall, self.key.public_key()))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
|||
|
|
@ -59,7 +59,8 @@ class ClientTest(unittest.TestCase):
|
|||
authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1'
|
||||
challb = messages.ChallengeBody(
|
||||
uri=(authzr_uri + '/1'), status=messages.STATUS_VALID,
|
||||
chall=challenges.DNS(token='foo'))
|
||||
chall=challenges.DNS(token=jose.b64decode(
|
||||
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA')))
|
||||
self.challr = messages.ChallengeResource(
|
||||
body=challb, authzr_uri=authzr_uri)
|
||||
self.authz = messages.Authorization(
|
||||
|
|
@ -166,7 +167,7 @@ class ClientTest(unittest.TestCase):
|
|||
self.response.links['up'] = {'url': self.challr.authzr_uri}
|
||||
self.response.json.return_value = self.challr.body.to_json()
|
||||
|
||||
chall_response = challenges.DNSResponse()
|
||||
chall_response = challenges.DNSResponse(validation=None)
|
||||
|
||||
self.client.answer_challenge(self.challr.body, chall_response)
|
||||
|
||||
|
|
@ -175,8 +176,9 @@ class ClientTest(unittest.TestCase):
|
|||
self.challr.body.update(uri='foo'), chall_response)
|
||||
|
||||
def test_answer_challenge_missing_next(self):
|
||||
self.assertRaises(errors.ClientError, self.client.answer_challenge,
|
||||
self.challr.body, challenges.DNSResponse())
|
||||
self.assertRaises(
|
||||
errors.ClientError, self.client.answer_challenge,
|
||||
self.challr.body, challenges.DNSResponse(validation=None))
|
||||
|
||||
def test_retry_after_date(self):
|
||||
self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT'
|
||||
|
|
|
|||
|
|
@ -225,7 +225,8 @@ class ChallengeBodyTest(unittest.TestCase):
|
|||
"""Tests for acme.messages.ChallengeBody."""
|
||||
|
||||
def setUp(self):
|
||||
self.chall = challenges.DNS(token='foo')
|
||||
self.chall = challenges.DNS(token=jose.b64decode(
|
||||
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'))
|
||||
|
||||
from acme.messages import ChallengeBody
|
||||
from acme.messages import Error
|
||||
|
|
@ -241,7 +242,7 @@ class ChallengeBodyTest(unittest.TestCase):
|
|||
'uri': 'http://challb',
|
||||
'status': self.status,
|
||||
'type': 'dns',
|
||||
'token': 'foo',
|
||||
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
|
||||
'error': error,
|
||||
}
|
||||
self.jobj_from = self.jobj_to.copy()
|
||||
|
|
@ -263,7 +264,8 @@ class ChallengeBodyTest(unittest.TestCase):
|
|||
hash(ChallengeBody.from_json(self.jobj_from))
|
||||
|
||||
def test_proxy(self):
|
||||
self.assertEqual('foo', self.challb.token)
|
||||
self.assertEqual(jose.b64decode(
|
||||
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'), self.challb.token)
|
||||
|
||||
|
||||
class AuthorizationTest(unittest.TestCase):
|
||||
|
|
|
|||
Loading…
Reference in a new issue