Merge pull request #2061 from wteiken/add_dns01_challenge

Add dns-01 challenge support to the ACME client
This commit is contained in:
Brad Warren 2016-08-01 14:45:21 -07:00 committed by GitHub
commit eff181c68c
6 changed files with 253 additions and 10 deletions

View file

@ -14,7 +14,6 @@ from acme import crypto_util
from acme import fields
from acme import jose
logger = logging.getLogger(__name__)
@ -206,6 +205,74 @@ class KeyAuthorizationChallenge(_TokenChallenge):
self.validation(account_key, *args, **kwargs))
@ChallengeResponse.register
class DNS01Response(KeyAuthorizationChallengeResponse):
"""ACME dns-01 challenge response."""
typ = "dns-01"
def simple_verify(self, chall, domain, account_public_key):
"""Simple verify.
:param challenges.DNS01 chall: Corresponding challenge.
:param unicode domain: Domain name being verified.
:param JWK account_public_key: Public key for the key pair
being authorized.
:returns: ``True`` iff validation with the TXT records resolved from a
DNS server is successful.
:rtype: bool
"""
if not self.verify(chall, account_public_key):
logger.debug("Verification of key authorization in response failed")
return False
validation_domain_name = chall.validation_domain_name(domain)
validation = chall.validation(account_public_key)
logger.debug("Verifying %s at %s...", chall.typ, validation_domain_name)
try:
from acme import dns_resolver
except ImportError: # pragma: no cover
raise errors.Error("Local validation for 'dns-01' challenges "
"requires 'dnspython'")
txt_records = dns_resolver.txt_records_for_name(validation_domain_name)
exists = validation in txt_records
if not exists:
logger.debug("Key authorization from response (%r) doesn't match "
"any DNS response in %r", self.key_authorization,
txt_records)
return exists
@Challenge.register # pylint: disable=too-many-ancestors
class DNS01(KeyAuthorizationChallenge):
"""ACME dns-01 challenge."""
response_cls = DNS01Response
typ = response_cls.typ
LABEL = "_acme-challenge"
"""Label clients prepend to the domain name being validated."""
def validation(self, account_key, **unused_kwargs):
"""Generate validation.
:param JWK account_key:
:rtype: unicode
"""
return jose.b64encode(hashlib.sha256(self.key_authorization(
account_key).encode("utf-8")).digest()).decode()
def validation_domain_name(self, name):
"""Domain name for TXT validation record.
:param unicode name: Domain name being validated.
"""
return "{0}.{1}".format(self.LABEL, name)
@ChallengeResponse.register
class HTTP01Response(KeyAuthorizationChallengeResponse):
"""ACME http-01 challenge response."""
@ -231,8 +298,8 @@ class HTTP01Response(KeyAuthorizationChallengeResponse):
being authorized.
:param int port: Port used in the validation.
:returns: ``True`` iff validation is successful, ``False``
otherwise.
:returns: ``True`` iff validation with the files currently served by the
HTTP server is successful.
:rtype: bool
"""
@ -410,7 +477,7 @@ class TLSSNI01Response(KeyAuthorizationChallengeResponse):
:returns: ``True`` iff client's control of the domain has been
verified, ``False`` otherwise.
verified.
:rtype: bool
"""

View file

@ -77,6 +77,93 @@ class KeyAuthorizationChallengeResponseTest(unittest.TestCase):
self.assertFalse(response.verify(self.chall, KEY.public_key()))
class DNS01ResponseTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes
def setUp(self):
from acme.challenges import DNS01Response
self.msg = DNS01Response(key_authorization=u'foo')
self.jmsg = {
'resource': 'challenge',
'type': 'dns-01',
'keyAuthorization': u'foo',
}
from acme.challenges import DNS01
self.chall = DNS01(token=(b'x' * 16))
self.response = self.chall.response(KEY)
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import DNS01Response
self.assertEqual(self.msg, DNS01Response.from_json(self.jmsg))
def test_from_json_hashable(self):
from acme.challenges import DNS01Response
hash(DNS01Response.from_json(self.jmsg))
def test_simple_verify_bad_key_authorization(self):
key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
self.response.simple_verify(self.chall, "local", key2.public_key())
@mock.patch("acme.dns_resolver.txt_records_for_name")
def test_simple_verify_good_validation(self, mock_resolver):
mock_resolver.return_value = [self.chall.validation(KEY.public_key())]
self.assertTrue(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
mock_resolver.assert_called_once_with(
self.chall.validation_domain_name("local"))
@mock.patch("acme.dns_resolver.txt_records_for_name")
def test_simple_verify_good_validation_multiple_txts(self, mock_resolver):
mock_resolver.return_value = [
"!", self.chall.validation(KEY.public_key())]
self.assertTrue(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
mock_resolver.assert_called_once_with(
self.chall.validation_domain_name("local"))
@mock.patch("acme.dns_resolver.txt_records_for_name")
def test_simple_verify_bad_validation(self, mock_dns):
mock_dns.return_value = ["!"]
self.assertFalse(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
class DNS01Test(unittest.TestCase):
def setUp(self):
from acme.challenges import DNS01
self.msg = DNS01(token=jose.decode_b64jose(
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA'))
self.jmsg = {
'type': 'dns-01',
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
}
def test_validation_domain_name(self):
self.assertEqual('_acme-challenge.www.example.com',
self.msg.validation_domain_name('www.example.com'))
def test_validation(self):
self.assertEqual(
"rAa7iIg4K2y63fvUhCfy8dP1Xl7wEhmQq0oChTcE3Zk",
self.msg.validation(KEY))
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import DNS01
self.assertEqual(self.msg, DNS01.from_json(self.jmsg))
def test_from_json_hashable(self):
from acme.challenges import DNS01
hash(DNS01.from_json(self.jmsg))
class HTTP01ResponseTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes

30
acme/acme/dns_resolver.py Normal file
View file

@ -0,0 +1,30 @@
"""DNS Resolver for ACME client.
Required only for local validation of 'dns-01' challenges.
"""
import logging
import dns.resolver
import dns.exception
logger = logging.getLogger(__name__)
def txt_records_for_name(name):
"""Resolve the name and return the TXT records.
:param unicode name: Domain name being verified.
:returns: A list of txt records, if empty the name could not be resolved
:rtype: list of unicode
"""
try:
dns_response = dns.resolver.query(name, 'TXT')
except dns.resolver.NXDOMAIN as error:
return []
except dns.exception.DNSException as error:
logger.error("Error resolving %s: %s", name, str(error))
return []
return [txt_rec.decode("utf-8") for rdata in dns_response
for txt_rec in rdata.strings]

View file

@ -0,0 +1,53 @@
"""Tests for acme.dns_resolver."""
import unittest
import mock
from acme import dns_resolver
try:
import dns
except ImportError: # pragma: no cover
dns = None
def create_txt_response(name, txt_records):
"""
Returns an RRSet containing the 'txt_records' as the result of a DNS
query for 'name'.
This takes advantage of the fact that an Answer object mostly behaves
like an RRset.
"""
return dns.rrset.from_text_list(name, 60, "IN", "TXT", txt_records)
class TxtRecordsForNameTest(unittest.TestCase):
@mock.patch("acme.dns_resolver.dns.resolver.query")
def test_txt_records_for_name_with_single_response(self, mock_dns):
mock_dns.return_value = create_txt_response('name', ['response'])
self.assertEqual(['response'],
dns_resolver.txt_records_for_name('name'))
@mock.patch("acme.dns_resolver.dns.resolver.query")
def test_txt_records_for_name_with_multiple_responses(self, mock_dns):
mock_dns.return_value = create_txt_response(
'name', ['response1', 'response2'])
self.assertEqual(['response1', 'response2'],
dns_resolver.txt_records_for_name('name'))
@mock.patch("acme.dns_resolver.dns.resolver.query")
def test_txt_records_for_name_domain_not_found(self, mock_dns):
mock_dns.side_effect = dns.resolver.NXDOMAIN
self.assertEquals([], dns_resolver.txt_records_for_name('name'))
@mock.patch("acme.dns_resolver.dns.resolver.query")
def test_txt_records_for_name_domain_other_error(self, mock_dns):
mock_dns.side_effect = dns.exception.DNSException
self.assertEquals([], dns_resolver.txt_records_for_name('name'))
def run(self, result=None):
if dns is None: # pragma: no cover
print(self, "... SKIPPING, no dnspython available")
return
super(TxtRecordsForNameTest, self).run(result)

View file

@ -35,6 +35,11 @@ if sys.version_info < (2, 7):
else:
install_requires.append('mock')
# dnspython 1.12 is required to support both Python 2 and Python 3.
dns_extras = [
'dnspython>=1.12',
]
dev_extras = [
'nose',
'pep8',
@ -76,6 +81,7 @@ setup(
include_package_data=True,
install_requires=install_requires,
extras_require={
'dns': dns_extras,
'dev': dev_extras,
'docs': docs_extras,
},

12
tox.ini
View file

@ -13,7 +13,7 @@ envlist = py{26,33,34,35},cover,lint
# packages installed separately to ensure that downstream deps problems
# are detected, c.f. #1002
commands =
pip install -e acme[dev]
pip install -e acme[dns,dev]
nosetests -v acme
pip install -e .[dev]
nosetests -v certbot
@ -38,23 +38,23 @@ deps =
[testenv:py33]
commands =
pip install -e acme[dev]
pip install -e acme[dns,dev]
nosetests -v acme
[testenv:py34]
commands =
pip install -e acme[dev]
pip install -e acme[dns,dev]
nosetests -v acme
[testenv:py35]
commands =
pip install -e acme[dev]
pip install -e acme[dns,dev]
nosetests -v acme
[testenv:cover]
basepython = python2.7
commands =
pip install -e acme[dev] -e .[dev] -e certbot-apache -e certbot-nginx -e letshelp-certbot
pip install -e acme[dns,dev] -e .[dev] -e certbot-apache -e certbot-nginx -e letshelp-certbot
./tox.cover.sh
[testenv:lint]
@ -64,7 +64,7 @@ basepython = python2.7
# duplicate code checking; if one of the commands fails, others will
# continue, but tox return code will reflect previous error
commands =
pip install -q -e acme[dev] -e .[dev] -e certbot-apache -e certbot-nginx -e certbot-compatibility-test -e letshelp-certbot
pip install -q -e acme[dns,dev] -e .[dev] -e certbot-apache -e certbot-nginx -e certbot-compatibility-test -e letshelp-certbot
./pep8.travis.sh
pylint --reports=n --rcfile=.pylintrc certbot
pylint --reports=n --rcfile=acme/.pylintrc acme/acme