Move DNS resolver to separate module to decouple dependencies and testing.

This commit is contained in:
Wilfried Teiken 2016-01-06 01:11:24 -05:00
parent 57c265c7f3
commit dc743fb57c
4 changed files with 84 additions and 57 deletions

View file

@ -15,7 +15,6 @@ from acme import fields
from acme import jose
from acme import other
logger = logging.getLogger(__name__)
@ -242,7 +241,13 @@ class DNS01Response(KeyAuthorizationChallengeResponse):
validation = chall.validation(account_public_key)
logger.debug("Verifying %s at %s...", chall.typ, validation_domain_name)
txt_records = txt_records_for_name(validation_domain_name)
try:
from acme import dns_resolver
txt_records = dns_resolver.txt_records_for_name(
validation_domain_name)
except ImportError as error:
raise ImportError("Local validation for 'dns-01' challenges "
"requires 'dnspython'")
exists = validation in txt_records
if not exists:
logger.debug("Key authorization from response (%r) doesn't match "
@ -701,24 +706,3 @@ class DNSResponse(ChallengeResponse):
"""
return chall.check_validation(self.validation, account_public_key)
def txt_records_for_name(name):
"""Resolve the name and return the TXT records.
:param unicode name: Domain name being verified.
:returns: A list of txt records, if empty the name could not be resolved
:rtype: list of unicode
"""
try:
import dns.resolver
import dns.exception
dns_response = dns.resolver.query(name, 'TXT')
except ImportError as error:
raise ImportError("Local validation for 'dns-01' challenges requires "
"'dnspython'")
except dns.exception.DNSException as error:
logger.error("Unable to resolve %s: %s", name, str(error))
return []
return [txt_rec for rdata in dns_response for txt_rec in rdata.strings]

View file

@ -1,7 +1,6 @@
"""Tests for acme.challenges."""
import unittest
import dns.rrset
import mock
import OpenSSL
import requests
@ -94,16 +93,6 @@ class DNS01ResponseTest(unittest.TestCase):
self.chall = DNS01(token=(b'x' * 16))
self.response = self.chall.response(KEY)
def create_txt_response(self, name, txt_records):
"""
Returns an RRSet containing the 'txt_records' as the result of a DNS
query for 'name'.
This takes advantage of the fact that an Answer object mostly behaves
like an RRset.
"""
return dns.rrset.from_text_list(name, 60, "IN", "TXT", txt_records)
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
@ -119,36 +108,25 @@ class DNS01ResponseTest(unittest.TestCase):
key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
self.response.simple_verify(self.chall, "local", key2.public_key())
@mock.patch("acme.challenges.dns.resolver.query")
def test_simple_verify_good_validation(self, mock_dns):
mock_dns.return_value = self.create_txt_response(
self.chall.validation_domain_name("local"),
[self.chall.validation(KEY.public_key())])
@mock.patch("acme.dns_resolver.txt_records_for_name")
def test_simple_verify_good_validation(self, mock_resolver):
mock_resolver.return_value = [self.chall.validation(KEY.public_key())]
self.assertTrue(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
mock_dns.assert_called_once_with(
self.chall.validation_domain_name("local"), "TXT")
mock_resolver.assert_called_once_with(
self.chall.validation_domain_name("local"))
@mock.patch("acme.challenges.dns.resolver.query")
def test_simple_verify_good_validation_multiple_txts(self, mock_dns):
mock_dns.return_value = self.create_txt_response(
self.chall.validation_domain_name("local"),
["!", self.chall.validation(KEY.public_key())])
@mock.patch("acme.dns_resolver.txt_records_for_name")
def test_simple_verify_good_validation_multiple_txts(self, mock_resolver):
mock_resolver.return_value = ["!", self.chall.validation(KEY.public_key())]
self.assertTrue(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
mock_dns.assert_called_once_with(
self.chall.validation_domain_name("local"), "TXT")
mock_resolver.assert_called_once_with(
self.chall.validation_domain_name("local"))
@mock.patch("acme.challenges.dns.resolver.query")
@mock.patch("acme.dns_resolver.txt_records_for_name")
def test_simple_verify_bad_validation(self, mock_dns):
mock_dns.return_value = self.create_txt_response(
self.chall.validation_domain_name("local"), ["!"])
self.assertFalse(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
@mock.patch("acme.challenges.dns.resolver.query")
def test_simple_verify_connection_error(self, mock_dns):
mock_dns.side_effect = dns.exception.DNSException
mock_dns.return_value = ["!"]
self.assertFalse(self.response.simple_verify(
self.chall, "local", KEY.public_key()))

28
acme/acme/dns_resolver.py Normal file
View file

@ -0,0 +1,28 @@
"""DNS Resolver for ACME client.
Required only for local validation of 'dns-01' challenges.
"""
import logging
import dns.resolver
import dns.exception
logger = logging.getLogger(__name__)
def txt_records_for_name(name):
"""Resolve the name and return the TXT records.
:param unicode name: Domain name being verified.
:returns: A list of txt records, if empty the name could not be resolved
:rtype: list of unicode
"""
try:
dns_response = dns.resolver.query(name, 'TXT')
except ImportError as error:
raise ImportError("Local validation for 'dns-01' challenges requires "
"'dnspython'")
except dns.exception.DNSException as error:
logger.error("Unable to resolve %s: %s", name, str(error))
return []
return [txt_rec for rdata in dns_response for txt_rec in rdata.strings]

View file

@ -0,0 +1,37 @@
"""Tests for acme.dns_resolver."""
import unittest
import dns
import mock
from acme import dns_resolver
class TxtRecordsForNameTest(unittest.TestCase):
def create_txt_response(self, name, txt_records):
"""
Returns an RRSet containing the 'txt_records' as the result of a DNS
query for 'name'.
This takes advantage of the fact that an Answer object mostly behaves
like an RRset.
"""
return dns.rrset.from_text_list(name, 60, "IN", "TXT", txt_records)
@mock.patch("acme.dns_resolver.dns.resolver.query")
def test_txt_records_for_name_test_with_single_response(self, mock_dns):
mock_dns.return_value = self.create_txt_response('name', ['response'])
self.assertEqual(['response'],
dns_resolver.txt_records_for_name('name'))
@mock.patch("acme.dns_resolver.dns.resolver.query")
def test_txt_records_for_name_with_multiple_responses(self, mock_dns):
mock_dns.return_value = self.create_txt_response(
'name', ['response1', 'response2'])
self.assertEqual(['response1', 'response2'],
dns_resolver.txt_records_for_name('name'))
@mock.patch("acme.dns_resolver.dns.resolver.query")
def test_txt_records_for_name_domain_not_found(self, mock_dns):
mock_dns.side_effect = dns.exception.DNSException
self.assertEquals([], dns_resolver.txt_records_for_name('name'))