From 7da53819682fe1b320e0e2890d4cc4be0a637303 Mon Sep 17 00:00:00 2001 From: Zach Shepherd Date: Thu, 18 May 2017 14:05:47 -0700 Subject: [PATCH] Common code for Lexicon-based DNS authenticators (#4583) Introduce abstract classes to provide base functionality for Lexicon-based DNS Authenticator plugins and corresponding test cases. --- certbot/plugins/dns_common_lexicon.py | 97 ++++++++++++++++ certbot/plugins/dns_common_lexicon_test.py | 27 +++++ certbot/plugins/dns_test_common_lexicon.py | 128 +++++++++++++++++++++ docs/api/plugins/dns_common_lexicon.rst | 5 + 4 files changed, 257 insertions(+) create mode 100644 certbot/plugins/dns_common_lexicon.py create mode 100644 certbot/plugins/dns_common_lexicon_test.py create mode 100644 certbot/plugins/dns_test_common_lexicon.py create mode 100644 docs/api/plugins/dns_common_lexicon.rst diff --git a/certbot/plugins/dns_common_lexicon.py b/certbot/plugins/dns_common_lexicon.py new file mode 100644 index 000000000..7a97fc950 --- /dev/null +++ b/certbot/plugins/dns_common_lexicon.py @@ -0,0 +1,97 @@ +"""Common code for DNS Authenticator Plugins built on Lexicon.""" + +import logging + +from requests.exceptions import HTTPError, RequestException + +from certbot import errors +from certbot.plugins import dns_common + +logger = logging.getLogger(__name__) + + +class LexiconClient(object): + """ + Encapsulates all communication with a DNS provider via Lexicon. + """ + + def __init__(self): + self.provider = None + + def add_txt_record(self, domain, record_name, record_content): + """ + Add a TXT record using the supplied information. + + :param str domain: The domain to use to look up the managed zone. + :param str record_name: The record name (typically beginning with '_acme-challenge.'). + :param str record_content: The record content (typically the challenge validation). + :raises errors.PluginError: if an error occurs communicating with the DNS Provider API + """ + self._find_domain_id(domain) + + try: + self.provider.create_record(type='TXT', name=record_name, content=record_content) + except RequestException as e: + logger.debug('Encountered error adding TXT record: %s', e, exc_info=True) + raise errors.PluginError('Error adding TXT record: {0}'.format(e)) + + def del_txt_record(self, domain, record_name, record_content): + """ + Delete a TXT record using the supplied information. + + :param str domain: The domain to use to look up the managed zone. + :param str record_name: The record name (typically beginning with '_acme-challenge.'). + :param str record_content: The record content (typically the challenge validation). + :raises errors.PluginError: if an error occurs communicating with the DNS Provider API + """ + try: + self._find_domain_id(domain) + except errors.PluginError as e: + logger.debug('Encountered error finding domain_id during deletion: %s', e, + exc_info=True) + return + + try: + self.provider.delete_record(type='TXT', name=record_name, content=record_content) + except RequestException as e: + logger.debug('Encountered error deleting TXT record: %s', e, exc_info=True) + + def _find_domain_id(self, domain): + """ + Find the domain_id for a given domain. + + :param str domain: The domain for which to find the domain_id. + :raises errors.PluginError: if the domain_id cannot be found. + """ + + domain_name_guesses = dns_common.base_domain_name_guesses(domain) + + for domain_name in domain_name_guesses: + try: + self.provider.options['domain'] = domain_name + + self.provider.authenticate() + + return # If `authenticate` doesn't throw an exception, we've found the right name + except HTTPError as e: + result = self._handle_http_error(e, domain_name) + + if result: + raise result + except Exception as e: # pylint: disable=broad-except + result = self._handle_general_error(e, domain_name) + + if result: + raise result + + raise errors.PluginError('Unable to determine zone identifier for {0} using zone names: {1}' + .format(domain, domain_name_guesses)) + + def _handle_http_error(self, e, domain_name): + return errors.PluginError('Error determining zone identifier for {0}: {1}.' + .format(domain_name, e)) + + def _handle_general_error(self, e, domain_name): + if not str(e).startswith('No domain found'): + return errors.PluginError('Unexpected error determining zone identifier for {0}: {1}' + .format(domain_name, e)) diff --git a/certbot/plugins/dns_common_lexicon_test.py b/certbot/plugins/dns_common_lexicon_test.py new file mode 100644 index 000000000..986362ca9 --- /dev/null +++ b/certbot/plugins/dns_common_lexicon_test.py @@ -0,0 +1,27 @@ +"""Tests for certbot.plugins.dns_common_lexicon.""" + +import unittest + +import mock + +from certbot.plugins import dns_common_lexicon +from certbot.plugins import dns_test_common_lexicon + + +class LexiconClientTest(unittest.TestCase, dns_test_common_lexicon.BaseLexiconClientTest): + + class _FakeLexiconClient(dns_common_lexicon.LexiconClient): + pass + + def setUp(self): + super(LexiconClientTest, self).setUp() + + self.client = LexiconClientTest._FakeLexiconClient() + self.provider_mock = mock.MagicMock() + + self.client.provider = self.provider_mock + + + +if __name__ == "__main__": + unittest.main() # pragma: no cover diff --git a/certbot/plugins/dns_test_common_lexicon.py b/certbot/plugins/dns_test_common_lexicon.py new file mode 100644 index 000000000..f9c5735e8 --- /dev/null +++ b/certbot/plugins/dns_test_common_lexicon.py @@ -0,0 +1,128 @@ +"""Base test class for DNS authenticators built on Lexicon.""" + +import mock +from acme import jose +from requests.exceptions import HTTPError, RequestException + +from certbot import errors +from certbot.plugins import dns_test_common +from certbot.tests import util as test_util + +DOMAIN = 'example.com' +KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem")) + +# These classes are intended to be subclassed/mixed in, so not all members are defined. +# pylint: disable=no-member + +class BaseLexiconAuthenticatorTest(dns_test_common.BaseAuthenticatorTest): + + def test_perform(self): + self.auth.perform([self.achall]) + + expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)] + self.assertEqual(expected, self.mock_client.mock_calls) + + def test_cleanup(self): + self.auth._attempt_cleanup = True # _attempt_cleanup | pylint: disable=protected-access + self.auth.cleanup([self.achall]) + + expected = [mock.call.del_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)] + self.assertEqual(expected, self.mock_client.mock_calls) + + +class BaseLexiconClientTest(object): + DOMAIN_NOT_FOUND = Exception('No domain found') + GENERIC_ERROR = RequestException + LOGIN_ERROR = HTTPError('400 Client Error: ...') + UNKNOWN_LOGIN_ERROR = HTTPError('500 Surprise! Error: ...') + + record_prefix = "_acme-challenge" + record_name = record_prefix + "." + DOMAIN + record_content = "bar" + + def test_add_txt_record(self): + self.client.add_txt_record(DOMAIN, self.record_name, self.record_content) + + self.provider_mock.create_record.assert_called_with(type='TXT', + name=self.record_name, + content=self.record_content) + + def test_add_txt_record_try_twice_to_find_domain(self): + self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND, ''] + + self.client.add_txt_record(DOMAIN, self.record_name, self.record_content) + + self.provider_mock.create_record.assert_called_with(type='TXT', + name=self.record_name, + content=self.record_content) + + def test_add_txt_record_fail_to_find_domain(self): + self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND, + self.DOMAIN_NOT_FOUND, + self.DOMAIN_NOT_FOUND,] + + self.assertRaises(errors.PluginError, + self.client.add_txt_record, + DOMAIN, self.record_name, self.record_content) + + def test_add_txt_record_fail_to_authenticate(self): + self.provider_mock.authenticate.side_effect = self.LOGIN_ERROR + + self.assertRaises(errors.PluginError, + self.client.add_txt_record, + DOMAIN, self.record_name, self.record_content) + + def test_add_txt_record_fail_to_authenticate_with_unknown_error(self): + self.provider_mock.authenticate.side_effect = self.UNKNOWN_LOGIN_ERROR + + self.assertRaises(errors.PluginError, + self.client.add_txt_record, + DOMAIN, self.record_name, self.record_content) + + def test_add_txt_record_error_finding_domain(self): + self.provider_mock.authenticate.side_effect = self.GENERIC_ERROR + + self.assertRaises(errors.PluginError, + self.client.add_txt_record, + DOMAIN, self.record_name, self.record_content) + + def test_add_txt_record_error_adding_record(self): + self.provider_mock.create_record.side_effect = self.GENERIC_ERROR + + self.assertRaises(errors.PluginError, + self.client.add_txt_record, + DOMAIN, self.record_name, self.record_content) + + def test_del_txt_record(self): + self.client.del_txt_record(DOMAIN, self.record_name, self.record_content) + + self.provider_mock.delete_record.assert_called_with(type='TXT', + name=self.record_name, + content=self.record_content) + + def test_del_txt_record_fail_to_find_domain(self): + self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND, + self.DOMAIN_NOT_FOUND, + self.DOMAIN_NOT_FOUND, ] + + self.client.del_txt_record(DOMAIN, self.record_name, self.record_content) + + def test_del_txt_record_fail_to_authenticate(self): + self.provider_mock.authenticate.side_effect = self.LOGIN_ERROR + + self.client.del_txt_record(DOMAIN, self.record_name, self.record_content) + + def test_del_txt_record_fail_to_authenticate_with_unknown_error(self): + self.provider_mock.authenticate.side_effect = self.UNKNOWN_LOGIN_ERROR + + self.client.del_txt_record(DOMAIN, self.record_name, self.record_content) + + def test_del_txt_record_error_finding_domain(self): + self.provider_mock.authenticate.side_effect = self.GENERIC_ERROR + + self.client.del_txt_record(DOMAIN, self.record_name, self.record_content) + + def test_del_txt_record_error_deleting_record(self): + self.provider_mock.delete_record.side_effect = self.GENERIC_ERROR + + self.client.del_txt_record(DOMAIN, self.record_name, self.record_content) diff --git a/docs/api/plugins/dns_common_lexicon.rst b/docs/api/plugins/dns_common_lexicon.rst new file mode 100644 index 000000000..a48166828 --- /dev/null +++ b/docs/api/plugins/dns_common_lexicon.rst @@ -0,0 +1,5 @@ +:mod:`certbot.plugins.dns_common_lexicon` +----------------------------------------- + +.. automodule:: certbot.plugins.dns_common_lexicon + :members: