Common code for Lexicon-based DNS authenticators (#4583)

Introduce abstract classes to provide base functionality for Lexicon-based DNS Authenticator plugins and corresponding test cases.
This commit is contained in:
Zach Shepherd 2017-05-18 14:05:47 -07:00 committed by Brad Warren
parent 04759095c2
commit 7da5381968
4 changed files with 257 additions and 0 deletions

View file

@ -0,0 +1,97 @@
"""Common code for DNS Authenticator Plugins built on Lexicon."""
import logging
from requests.exceptions import HTTPError, RequestException
from certbot import errors
from certbot.plugins import dns_common
logger = logging.getLogger(__name__)
class LexiconClient(object):
"""
Encapsulates all communication with a DNS provider via Lexicon.
"""
def __init__(self):
self.provider = None
def add_txt_record(self, domain, record_name, record_content):
"""
Add a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:raises errors.PluginError: if an error occurs communicating with the DNS Provider API
"""
self._find_domain_id(domain)
try:
self.provider.create_record(type='TXT', name=record_name, content=record_content)
except RequestException as e:
logger.debug('Encountered error adding TXT record: %s', e, exc_info=True)
raise errors.PluginError('Error adding TXT record: {0}'.format(e))
def del_txt_record(self, domain, record_name, record_content):
"""
Delete a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:raises errors.PluginError: if an error occurs communicating with the DNS Provider API
"""
try:
self._find_domain_id(domain)
except errors.PluginError as e:
logger.debug('Encountered error finding domain_id during deletion: %s', e,
exc_info=True)
return
try:
self.provider.delete_record(type='TXT', name=record_name, content=record_content)
except RequestException as e:
logger.debug('Encountered error deleting TXT record: %s', e, exc_info=True)
def _find_domain_id(self, domain):
"""
Find the domain_id for a given domain.
:param str domain: The domain for which to find the domain_id.
:raises errors.PluginError: if the domain_id cannot be found.
"""
domain_name_guesses = dns_common.base_domain_name_guesses(domain)
for domain_name in domain_name_guesses:
try:
self.provider.options['domain'] = domain_name
self.provider.authenticate()
return # If `authenticate` doesn't throw an exception, we've found the right name
except HTTPError as e:
result = self._handle_http_error(e, domain_name)
if result:
raise result
except Exception as e: # pylint: disable=broad-except
result = self._handle_general_error(e, domain_name)
if result:
raise result
raise errors.PluginError('Unable to determine zone identifier for {0} using zone names: {1}'
.format(domain, domain_name_guesses))
def _handle_http_error(self, e, domain_name):
return errors.PluginError('Error determining zone identifier for {0}: {1}.'
.format(domain_name, e))
def _handle_general_error(self, e, domain_name):
if not str(e).startswith('No domain found'):
return errors.PluginError('Unexpected error determining zone identifier for {0}: {1}'
.format(domain_name, e))

View file

@ -0,0 +1,27 @@
"""Tests for certbot.plugins.dns_common_lexicon."""
import unittest
import mock
from certbot.plugins import dns_common_lexicon
from certbot.plugins import dns_test_common_lexicon
class LexiconClientTest(unittest.TestCase, dns_test_common_lexicon.BaseLexiconClientTest):
class _FakeLexiconClient(dns_common_lexicon.LexiconClient):
pass
def setUp(self):
super(LexiconClientTest, self).setUp()
self.client = LexiconClientTest._FakeLexiconClient()
self.provider_mock = mock.MagicMock()
self.client.provider = self.provider_mock
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -0,0 +1,128 @@
"""Base test class for DNS authenticators built on Lexicon."""
import mock
from acme import jose
from requests.exceptions import HTTPError, RequestException
from certbot import errors
from certbot.plugins import dns_test_common
from certbot.tests import util as test_util
DOMAIN = 'example.com'
KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
# These classes are intended to be subclassed/mixed in, so not all members are defined.
# pylint: disable=no-member
class BaseLexiconAuthenticatorTest(dns_test_common.BaseAuthenticatorTest):
def test_perform(self):
self.auth.perform([self.achall])
expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
def test_cleanup(self):
self.auth._attempt_cleanup = True # _attempt_cleanup | pylint: disable=protected-access
self.auth.cleanup([self.achall])
expected = [mock.call.del_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
class BaseLexiconClientTest(object):
DOMAIN_NOT_FOUND = Exception('No domain found')
GENERIC_ERROR = RequestException
LOGIN_ERROR = HTTPError('400 Client Error: ...')
UNKNOWN_LOGIN_ERROR = HTTPError('500 Surprise! Error: ...')
record_prefix = "_acme-challenge"
record_name = record_prefix + "." + DOMAIN
record_content = "bar"
def test_add_txt_record(self):
self.client.add_txt_record(DOMAIN, self.record_name, self.record_content)
self.provider_mock.create_record.assert_called_with(type='TXT',
name=self.record_name,
content=self.record_content)
def test_add_txt_record_try_twice_to_find_domain(self):
self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND, '']
self.client.add_txt_record(DOMAIN, self.record_name, self.record_content)
self.provider_mock.create_record.assert_called_with(type='TXT',
name=self.record_name,
content=self.record_content)
def test_add_txt_record_fail_to_find_domain(self):
self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND,
self.DOMAIN_NOT_FOUND,
self.DOMAIN_NOT_FOUND,]
self.assertRaises(errors.PluginError,
self.client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
def test_add_txt_record_fail_to_authenticate(self):
self.provider_mock.authenticate.side_effect = self.LOGIN_ERROR
self.assertRaises(errors.PluginError,
self.client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
def test_add_txt_record_fail_to_authenticate_with_unknown_error(self):
self.provider_mock.authenticate.side_effect = self.UNKNOWN_LOGIN_ERROR
self.assertRaises(errors.PluginError,
self.client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
def test_add_txt_record_error_finding_domain(self):
self.provider_mock.authenticate.side_effect = self.GENERIC_ERROR
self.assertRaises(errors.PluginError,
self.client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
def test_add_txt_record_error_adding_record(self):
self.provider_mock.create_record.side_effect = self.GENERIC_ERROR
self.assertRaises(errors.PluginError,
self.client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
def test_del_txt_record(self):
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.provider_mock.delete_record.assert_called_with(type='TXT',
name=self.record_name,
content=self.record_content)
def test_del_txt_record_fail_to_find_domain(self):
self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND,
self.DOMAIN_NOT_FOUND,
self.DOMAIN_NOT_FOUND, ]
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
def test_del_txt_record_fail_to_authenticate(self):
self.provider_mock.authenticate.side_effect = self.LOGIN_ERROR
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
def test_del_txt_record_fail_to_authenticate_with_unknown_error(self):
self.provider_mock.authenticate.side_effect = self.UNKNOWN_LOGIN_ERROR
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
def test_del_txt_record_error_finding_domain(self):
self.provider_mock.authenticate.side_effect = self.GENERIC_ERROR
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
def test_del_txt_record_error_deleting_record(self):
self.provider_mock.delete_record.side_effect = self.GENERIC_ERROR
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)

View file

@ -0,0 +1,5 @@
:mod:`certbot.plugins.dns_common_lexicon`
-----------------------------------------
.. automodule:: certbot.plugins.dns_common_lexicon
:members: