mirror of
https://github.com/certbot/certbot.git
synced 2026-06-04 06:15:36 -04:00
Initial verison of DNS-01 implementation
This commit is contained in:
parent
ffd0d5aa56
commit
55ca1b484f
3 changed files with 173 additions and 0 deletions
|
|
@ -1,5 +1,8 @@
|
|||
"""ACME Identifier Validation Challenges."""
|
||||
import abc
|
||||
import base64
|
||||
import dns.resolver
|
||||
import dns.exception
|
||||
import functools
|
||||
import hashlib
|
||||
import logging
|
||||
|
|
@ -215,6 +218,82 @@ class KeyAuthorizationChallenge(_TokenDVChallenge):
|
|||
self.validation(account_key, *args, **kwargs))
|
||||
|
||||
|
||||
@ChallengeResponse.register
|
||||
class DNS01Response(KeyAuthorizationChallengeResponse):
|
||||
"""ACME "dns-01" challenge response."""
|
||||
typ = "dns-01"
|
||||
|
||||
def simple_verify(self, chall, domain, account_public_key):
|
||||
"""Simple verify.
|
||||
|
||||
:param challenges.DNS01 chall: Corresponding challenge.
|
||||
:param unicode domain: Domain name being verified.
|
||||
:param account_public_key: Public key for the key pair
|
||||
being authorized. If ``None`` key verification is not
|
||||
performed!
|
||||
:param JWK account_public_key:
|
||||
|
||||
:returns: ``True`` iff validation is successful, ``False``
|
||||
otherwise.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
if not self.verify(chall, account_public_key):
|
||||
logger.debug("Verification of key authorization in response failed")
|
||||
return False
|
||||
|
||||
validation_name = chall.validation_domain_name(domain)
|
||||
validation = chall.validation(account_public_key)
|
||||
logger.debug("Verifying %s at %s...", chall.typ, validation_name)
|
||||
txt_records = []
|
||||
try:
|
||||
dns_response = dns.resolver.query(validation_name, 'TXT')
|
||||
for rdata in dns_response:
|
||||
for txt_record in rdata.strings:
|
||||
txt_records.append(txt_record)
|
||||
except dns.exception.DNSException as error:
|
||||
logger.error("Unable to resolve %s: %s", validation_name, error)
|
||||
return False
|
||||
|
||||
for txt_record in txt_records:
|
||||
if txt_record == validation:
|
||||
return True
|
||||
|
||||
logger.debug("Key authorization from response (%r) doesn't match any "
|
||||
"DNS response in %r", self.key_authorization, txt_records)
|
||||
return False
|
||||
|
||||
@Challenge.register # pylint: disable=too-many-ancestors
|
||||
class DNS01(KeyAuthorizationChallenge):
|
||||
"""ACME "dns-01" challenge."""
|
||||
|
||||
response_cls = DNS01Response
|
||||
typ = response_cls.typ
|
||||
|
||||
LABEL = "_acme-challenge"
|
||||
"""Label clients prepend to the domain name being validated."""
|
||||
|
||||
def validation(self, account_key, **unused_kwargs):
|
||||
"""Generate validation.
|
||||
|
||||
:param JWK account_key:
|
||||
:rtype: unicode
|
||||
|
||||
"""
|
||||
key_authorization = self.key_authorization(account_key)
|
||||
# FIXME Once boulder response according to the spec this needs to be fixed
|
||||
# return base64.b64encode(hashlib.sha256(key_authorization).digest())
|
||||
return hashlib.sha256(key_authorization).hexdigest()
|
||||
|
||||
def validation_domain_name(self, name):
|
||||
"""Domain name for TXT validation record.
|
||||
|
||||
:param unicode name: Domain name being validated.
|
||||
|
||||
"""
|
||||
return "{0}.{1}".format(self.LABEL, name)
|
||||
|
||||
|
||||
@ChallengeResponse.register
|
||||
class HTTP01Response(KeyAuthorizationChallengeResponse):
|
||||
"""ACME http-01 challenge response."""
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
"""Tests for acme.challenges."""
|
||||
import unittest
|
||||
|
||||
import dns.rrset
|
||||
import mock
|
||||
import OpenSSL
|
||||
import requests
|
||||
|
|
@ -76,6 +77,98 @@ class KeyAuthorizationChallengeResponseTest(unittest.TestCase):
|
|||
key_authorization='.foo.oKGqedy-b-acd5eoybm2f-NVFxvyOoET5CNy3xnv8WY')
|
||||
self.assertFalse(response.verify(self.chall, KEY.public_key()))
|
||||
|
||||
class DNS01ResponseTest(unittest.TestCase):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
def setUp(self):
|
||||
from acme.challenges import DNS01Response
|
||||
self.msg = DNS01Response(key_authorization=u'foo')
|
||||
self.jmsg = {
|
||||
'resource': 'challenge',
|
||||
'type': 'dns-01',
|
||||
'keyAuthorization': u'foo',
|
||||
}
|
||||
|
||||
from acme.challenges import DNS01
|
||||
self.chall = DNS01(token=(b'x' * 16))
|
||||
self.response = self.chall.response(KEY)
|
||||
|
||||
# This takes advantage of the fact that an answer object mostly behaves like
|
||||
# an RRset
|
||||
def create_txt_response(self, name, txt_record):
|
||||
return dns.rrset.from_text(name, 60, "IN", "TXT", txt_record)
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jmsg, self.msg.to_partial_json())
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import DNS01Response
|
||||
self.assertEqual(
|
||||
self.msg, DNS01Response.from_json(self.jmsg))
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import DNS01Response
|
||||
hash(DNS01Response.from_json(self.jmsg))
|
||||
|
||||
def test_simple_verify_bad_key_authorization(self):
|
||||
key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
|
||||
self.response.simple_verify(self.chall, "local", key2.public_key())
|
||||
|
||||
@mock.patch("acme.challenges.dns.resolver.query")
|
||||
def test_simple_verify_good_validation(self, mock_dns):
|
||||
mock_dns.return_value = self.create_txt_response(
|
||||
self.chall.validation_domain_name("local"),
|
||||
self.chall.validation(KEY.public_key()))
|
||||
self.assertTrue(self.response.simple_verify(
|
||||
self.chall, "local", KEY.public_key()))
|
||||
mock_dns.assert_called_once_with(
|
||||
self.chall.validation_domain_name("local"), "TXT")
|
||||
|
||||
@mock.patch("acme.challenges.dns.resolver.query")
|
||||
def test_simple_verify_bad_validation(self, mock_dns):
|
||||
mock_dns.return_value = self.create_txt_response(
|
||||
self.chall.validation_domain_name("local"), "!")
|
||||
self.assertFalse(self.response.simple_verify(
|
||||
self.chall, "local", KEY.public_key()))
|
||||
|
||||
@mock.patch("acme.challenges.dns.resolver.query")
|
||||
def test_simple_verify_connection_error(self, mock_dns):
|
||||
mock_dns.side_effect = dns.exception.DNSException
|
||||
self.assertFalse(self.response.simple_verify(
|
||||
self.chall, "local", KEY.public_key()))
|
||||
|
||||
class DNS01Test(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
from acme.challenges import DNS01
|
||||
self.msg = DNS01(
|
||||
token=jose.decode_b64jose(
|
||||
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA'))
|
||||
self.jmsg = {
|
||||
'type': 'dns-01',
|
||||
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
|
||||
}
|
||||
|
||||
def test_validation_domain_name(self):
|
||||
self.assertEqual('_acme-challenge.www.example.com',
|
||||
self.msg.validation_domain_name('www.example.com'))
|
||||
|
||||
def test_validation(self):
|
||||
self.assertEqual(
|
||||
"ac06bb8888382b6cbaddfbd48427f2f1d3f55e5ef0121990ab4a02853704dd99",
|
||||
self.msg.validation(KEY))
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jmsg, self.msg.to_partial_json())
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.challenges import DNS01
|
||||
self.assertEqual(self.msg, DNS01.from_json(self.jmsg))
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.challenges import DNS01
|
||||
hash(DNS01.from_json(self.jmsg))
|
||||
|
||||
|
||||
class HTTP01ResponseTest(unittest.TestCase):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ install_requires = [
|
|||
'cryptography>=0.8',
|
||||
# Connection.set_tlsext_host_name (>=0.13), X509Req.get_extensions (>=0.15)
|
||||
'PyOpenSSL>=0.15',
|
||||
'dnspython',
|
||||
'pyrfc3339',
|
||||
'pytz',
|
||||
'requests',
|
||||
|
|
|
|||
Loading…
Reference in a new issue