Fix resolution of challenge zone

The zone containing the TXT record to update may be different from
the one of the requested certificate domain, in case that record
is a CNAME for some other location (in which case the target of
the CNAME is specified using --dns-<plugin>-override-challenge).
This commit is contained in:
Thomas Quinot 2018-02-15 21:53:59 +01:00
parent 439425fc42
commit 195363c66c
27 changed files with 148 additions and 135 deletions

View file

@ -49,10 +49,10 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_cloudflare_client().add_txt_record(domain, validation_name, validation, self.ttl)
self._get_cloudflare_client().add_txt_record(validation_name, validation, self.ttl)
def _cleanup(self, domain, validation_name, validation):
self._get_cloudflare_client().del_txt_record(domain, validation_name, validation)
self._get_cloudflare_client().del_txt_record(validation_name, validation)
def _get_cloudflare_client(self):
return _CloudflareClient(self.credentials.conf('email'), self.credentials.conf('api-key'))
@ -66,18 +66,17 @@ class _CloudflareClient(object):
def __init__(self, email, api_key):
self.cf = CloudFlare.CloudFlare(email, api_key)
def add_txt_record(self, domain, record_name, record_content, record_ttl):
def add_txt_record(self, record_name, record_content, record_ttl):
"""
Add a TXT record using the supplied information.
:param str domain: The domain to use to look up the Cloudflare zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:param int record_ttl: The record TTL (number of seconds that the record may be cached).
:raises certbot.errors.PluginError: if an error occurs communicating with the Cloudflare API
"""
zone_id = self._find_zone_id(domain)
zone_id = self._find_zone_id(record_name)
data = {'type': 'TXT',
'name': record_name,
@ -94,7 +93,7 @@ class _CloudflareClient(object):
record_id = self._find_txt_record_id(zone_id, record_name, record_content)
logger.debug('Successfully added TXT record with record_id: %s', record_id)
def del_txt_record(self, domain, record_name, record_content):
def del_txt_record(self, record_name, record_content):
"""
Delete a TXT record using the supplied information.
@ -103,13 +102,12 @@ class _CloudflareClient(object):
Failures are logged, but not raised.
:param str domain: The domain to use to look up the Cloudflare zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
"""
try:
zone_id = self._find_zone_id(domain)
zone_id = self._find_zone_id(record_name)
except errors.PluginError as e:
logger.debug('Encountered error finding zone_id during deletion: %s', e)
return

View file

@ -35,7 +35,7 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic
def test_perform(self):
self.auth.perform([self.achall])
expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)]
expected = [mock.call.add_txt_record('_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
def test_cleanup(self):
@ -43,7 +43,7 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic
self.auth._attempt_cleanup = True
self.auth.cleanup([self.achall])
expected = [mock.call.del_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)]
expected = [mock.call.del_txt_record('_acme-challenge.'+DOMAIN, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
@ -65,7 +65,7 @@ class CloudflareClientTest(unittest.TestCase):
def test_add_txt_record(self):
self.cf.zones.get.return_value = [{'id': self.zone_id}]
self.cloudflare_client.add_txt_record(DOMAIN, self.record_name, self.record_content,
self.cloudflare_client.add_txt_record(self.record_name, self.record_content,
self.record_ttl)
self.cf.zones.dns_records.post.assert_called_with(self.zone_id, data=mock.ANY)
@ -85,7 +85,7 @@ class CloudflareClientTest(unittest.TestCase):
self.assertRaises(
errors.PluginError,
self.cloudflare_client.add_txt_record,
DOMAIN, self.record_name, self.record_content, self.record_ttl)
self.record_name, self.record_content, self.record_ttl)
def test_add_txt_record_error_during_zone_lookup(self):
self.cf.zones.get.side_effect = API_ERROR
@ -93,7 +93,7 @@ class CloudflareClientTest(unittest.TestCase):
self.assertRaises(
errors.PluginError,
self.cloudflare_client.add_txt_record,
DOMAIN, self.record_name, self.record_content, self.record_ttl)
self.record_name, self.record_content, self.record_ttl)
def test_add_txt_record_zone_not_found(self):
self.cf.zones.get.return_value = []
@ -101,13 +101,13 @@ class CloudflareClientTest(unittest.TestCase):
self.assertRaises(
errors.PluginError,
self.cloudflare_client.add_txt_record,
DOMAIN, self.record_name, self.record_content, self.record_ttl)
self.record_name, self.record_content, self.record_ttl)
def test_del_txt_record(self):
self.cf.zones.get.return_value = [{'id': self.zone_id}]
self.cf.zones.dns_records.get.return_value = [{'id': self.record_id}]
self.cloudflare_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.cloudflare_client.del_txt_record(self.record_name, self.record_content)
expected = [mock.call.zones.get(params=mock.ANY),
mock.call.zones.dns_records.get(self.zone_id, params=mock.ANY),
@ -124,14 +124,14 @@ class CloudflareClientTest(unittest.TestCase):
def test_del_txt_record_error_during_zone_lookup(self):
self.cf.zones.get.side_effect = API_ERROR
self.cloudflare_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.cloudflare_client.del_txt_record(self.record_name, self.record_content)
def test_del_txt_record_error_during_delete(self):
self.cf.zones.get.return_value = [{'id': self.zone_id}]
self.cf.zones.dns_records.get.return_value = [{'id': self.record_id}]
self.cf.zones.dns_records.delete.side_effect = API_ERROR
self.cloudflare_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.cloudflare_client.del_txt_record(self.record_name, self.record_content)
expected = [mock.call.zones.get(params=mock.ANY),
mock.call.zones.dns_records.get(self.zone_id, params=mock.ANY),
mock.call.zones.dns_records.delete(self.zone_id, self.record_id)]
@ -142,7 +142,7 @@ class CloudflareClientTest(unittest.TestCase):
self.cf.zones.get.return_value = [{'id': self.zone_id}]
self.cf.zones.dns_records.get.side_effect = API_ERROR
self.cloudflare_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.cloudflare_client.del_txt_record(self.record_name, self.record_content)
expected = [mock.call.zones.get(params=mock.ANY),
mock.call.zones.dns_records.get(self.zone_id, params=mock.ANY)]
@ -152,7 +152,7 @@ class CloudflareClientTest(unittest.TestCase):
self.cf.zones.get.return_value = [{'id': self.zone_id}]
self.cf.zones.dns_records.get.return_value = []
self.cloudflare_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.cloudflare_client.del_txt_record(self.record_name, self.record_content)
expected = [mock.call.zones.get(params=mock.ANY),
mock.call.zones.dns_records.get(self.zone_id, params=mock.ANY)]
@ -161,7 +161,7 @@ class CloudflareClientTest(unittest.TestCase):
def test_del_txt_record_no_zone(self):
self.cf.zones.get.return_value = [{'id': None}]
self.cloudflare_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.cloudflare_client.del_txt_record(self.record_name, self.record_content)
expected = [mock.call.zones.get(params=mock.ANY)]
self.assertEqual(expected, self.cf.mock_calls)

View file

@ -50,10 +50,10 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_cloudxns_client().add_txt_record(domain, validation_name, validation)
self._get_cloudxns_client().add_txt_record(validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
self._get_cloudxns_client().del_txt_record(domain, validation_name, validation)
self._get_cloudxns_client().del_txt_record(validation_name, validation)
def _get_cloudxns_client(self):
return _CloudXNSLexiconClient(self.credentials.conf('api-key'),

View file

@ -4,16 +4,11 @@ import os
import unittest
import mock
from requests.exceptions import HTTPError, RequestException
from certbot.plugins import dns_test_common
from certbot.plugins import dns_test_common_lexicon
from certbot.tests import util as test_util
DOMAIN_NOT_FOUND = Exception('No domain found')
GENERIC_ERROR = RequestException
LOGIN_ERROR = HTTPError('400 Client Error: ...')
API_KEY = 'foo'
SECRET = 'bar'

View file

@ -44,10 +44,10 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_digitalocean_client().add_txt_record(domain, validation_name, validation)
self._get_digitalocean_client().add_txt_record(validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
self._get_digitalocean_client().del_txt_record(domain, validation_name, validation)
self._get_digitalocean_client().del_txt_record(validation_name, validation)
def _get_digitalocean_client(self):
return _DigitalOceanClient(self.credentials.conf('token'))
@ -61,11 +61,10 @@ class _DigitalOceanClient(object):
def __init__(self, token):
self.manager = digitalocean.Manager(token=token)
def add_txt_record(self, domain_name, record_name, record_content):
def add_txt_record(self, record_name, record_content):
"""
Add a TXT record using the supplied information.
:param str domain_name: The domain to use to associate the record with.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:raises certbot.errors.PluginError: if an error occurs communicating with the DigitalOcean
@ -73,7 +72,7 @@ class _DigitalOceanClient(object):
"""
try:
domain = self._find_domain(domain_name)
domain = self._find_domain(record_name)
except digitalocean.Error as e:
hint = None
@ -98,7 +97,7 @@ class _DigitalOceanClient(object):
raise errors.PluginError('Error adding TXT record using the DigitalOcean API: {0}'
.format(e))
def del_txt_record(self, domain_name, record_name, record_content):
def del_txt_record(self, record_name, record_content):
"""
Delete a TXT record using the supplied information.
@ -107,13 +106,12 @@ class _DigitalOceanClient(object):
Failures are logged, but not raised.
:param str domain_name: The domain to use to associate the record with.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
"""
try:
domain = self._find_domain(domain_name)
domain = self._find_domain(record_name)
except digitalocean.Error as e:
logger.debug('Error finding domain using the DigitalOcean API: %s', e)
return

View file

@ -34,7 +34,7 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic
def test_perform(self):
self.auth.perform([self.achall])
expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)]
expected = [mock.call.add_txt_record('_acme-challenge.'+DOMAIN, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
def test_cleanup(self):
@ -42,7 +42,7 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic
self.auth._attempt_cleanup = True
self.auth.cleanup([self.achall])
expected = [mock.call.del_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)]
expected = [mock.call.del_txt_record('_acme-challenge.'+DOMAIN, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
@ -72,7 +72,7 @@ class DigitalOceanClientTest(unittest.TestCase):
self.manager.get_all_domains.return_value = [wrong_domain_mock, domain_mock]
self.digitalocean_client.add_txt_record(DOMAIN, self.record_name, self.record_content)
self.digitalocean_client.add_txt_record(self.record_name, self.record_content)
domain_mock.create_new_domain_record.assert_called_with(type='TXT',
name=self.record_prefix,
@ -83,14 +83,14 @@ class DigitalOceanClientTest(unittest.TestCase):
self.assertRaises(errors.PluginError,
self.digitalocean_client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
self.record_name, self.record_content)
def test_add_txt_record_error_finding_domain(self):
self.manager.get_all_domains.side_effect = API_ERROR
self.assertRaises(errors.PluginError,
self.digitalocean_client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
self.record_name, self.record_content)
def test_add_txt_record_error_creating_record(self):
domain_mock = mock.MagicMock()
@ -101,7 +101,7 @@ class DigitalOceanClientTest(unittest.TestCase):
self.assertRaises(errors.PluginError,
self.digitalocean_client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
self.record_name, self.record_content)
def test_del_txt_record(self):
first_record_mock = mock.MagicMock()
@ -127,7 +127,7 @@ class DigitalOceanClientTest(unittest.TestCase):
self.manager.get_all_domains.return_value = [domain_mock]
self.digitalocean_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.digitalocean_client.del_txt_record(self.record_name, self.record_content)
self.assertTrue(correct_record_mock.destroy.called)
@ -137,7 +137,7 @@ class DigitalOceanClientTest(unittest.TestCase):
def test_del_txt_record_error_finding_domain(self):
self.manager.get_all_domains.side_effect = API_ERROR
self.digitalocean_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.digitalocean_client.del_txt_record(self.record_name, self.record_content)
def test_del_txt_record_error_finding_record(self):
domain_mock = mock.MagicMock()
@ -146,7 +146,7 @@ class DigitalOceanClientTest(unittest.TestCase):
self.manager.get_all_domains.return_value = [domain_mock]
self.digitalocean_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.digitalocean_client.del_txt_record(self.record_name, self.record_content)
def test_del_txt_record_error_deleting_record(self):
record_mock = mock.MagicMock()
@ -161,7 +161,7 @@ class DigitalOceanClientTest(unittest.TestCase):
self.manager.get_all_domains.return_value = [domain_mock]
self.digitalocean_client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.digitalocean_client.del_txt_record(self.record_name, self.record_content)
if __name__ == "__main__":

View file

@ -48,10 +48,10 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_dnsimple_client().add_txt_record(domain, validation_name, validation)
self._get_dnsimple_client().add_txt_record(validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
self._get_dnsimple_client().del_txt_record(domain, validation_name, validation)
self._get_dnsimple_client().del_txt_record(validation_name, validation)
def _get_dnsimple_client(self):
return _DNSimpleLexiconClient(self.credentials.conf('token'), self.ttl)

View file

@ -33,7 +33,8 @@ class AuthenticatorTest(test_util.TempDirTestCase,
class DNSimpleLexiconClientTest(unittest.TestCase, dns_test_common_lexicon.BaseLexiconClientTest):
LOGIN_ERROR = HTTPError('401 Client Error: Unauthorized for url: ...')
def login_error(self, domain):
return HTTPError('401 Client Error: Unauthorized for url: {0}'.format(domain))
def setUp(self):
from certbot_dns_dnsimple.dns_dnsimple import _DNSimpleLexiconClient

View file

@ -52,10 +52,10 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_dnsmadeeasy_client().add_txt_record(domain, validation_name, validation)
self._get_dnsmadeeasy_client().add_txt_record(validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
self._get_dnsmadeeasy_client().del_txt_record(domain, validation_name, validation)
self._get_dnsmadeeasy_client().del_txt_record(validation_name, validation)
def _get_dnsmadeeasy_client(self):
return _DNSMadeEasyLexiconClient(self.credentials.conf('api-key'),

View file

@ -8,7 +8,6 @@ from requests.exceptions import HTTPError
from certbot.plugins import dns_test_common
from certbot.plugins import dns_test_common_lexicon
from certbot.plugins.dns_test_common import DOMAIN
from certbot.tests import util as test_util
API_KEY = 'foo'
@ -37,8 +36,12 @@ class AuthenticatorTest(test_util.TempDirTestCase,
class DNSMadeEasyLexiconClientTest(unittest.TestCase,
dns_test_common_lexicon.BaseLexiconClientTest):
DOMAIN_NOT_FOUND = HTTPError('404 Client Error: Not Found for url: {0}.'.format(DOMAIN))
LOGIN_ERROR = HTTPError('403 Client Error: Forbidden for url: {0}.'.format(DOMAIN))
def login_error(self, domain):
return HTTPError('403 Client Error: Forbidden for url: {0}.'.format(domain))
def domain_not_found(self, domain):
return HTTPError('404 Client Error: Not Found for url: {0}.'.format(domain))
def setUp(self):
from certbot_dns_dnsmadeeasy.dns_dnsmadeeasy import _DNSMadeEasyLexiconClient

View file

@ -51,10 +51,10 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_gehirn_client().add_txt_record(domain, validation_name, validation)
self._get_gehirn_client().add_txt_record(validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
self._get_gehirn_client().del_txt_record(domain, validation_name, validation)
self._get_gehirn_client().del_txt_record(validation_name, validation)
def _get_gehirn_client(self):
return _GehirnLexiconClient(

View file

@ -8,7 +8,6 @@ from requests.exceptions import HTTPError
from certbot.plugins import dns_test_common
from certbot.plugins import dns_test_common_lexicon
from certbot.plugins.dns_test_common import DOMAIN
from certbot.tests import util as test_util
API_TOKEN = '00000000-0000-0000-0000-000000000000'
@ -36,8 +35,12 @@ class AuthenticatorTest(test_util.TempDirTestCase,
class GehirnLexiconClientTest(unittest.TestCase, dns_test_common_lexicon.BaseLexiconClientTest):
DOMAIN_NOT_FOUND = HTTPError('404 Client Error: Not Found for url: {0}.'.format(DOMAIN))
LOGIN_ERROR = HTTPError('401 Client Error: Unauthorized for url: {0}.'.format(DOMAIN))
def domain_not_found(self, domain):
return HTTPError('404 Client Error: Not Found for url: {0}.'.format(domain))
def login_error(self, domain):
return HTTPError('401 Client Error: Unauthorized for url: {0}.'.format(domain))
def setUp(self):
from certbot_dns_gehirn.dns_gehirn import _GehirnLexiconClient

View file

@ -67,10 +67,10 @@ class Authenticator(dns_common.DNSAuthenticator):
dns_common.validate_file_permissions(self.conf('credentials'))
def _perform(self, domain, validation_name, validation):
self._get_google_client().add_txt_record(domain, validation_name, validation, self.ttl)
self._get_google_client().add_txt_record(validation_name, validation, self.ttl)
def _cleanup(self, domain, validation_name, validation):
self._get_google_client().del_txt_record(domain, validation_name, validation, self.ttl)
self._get_google_client().del_txt_record(validation_name, validation, self.ttl)
def _get_google_client(self):
return _GoogleClient(self.conf('credentials'))
@ -99,18 +99,17 @@ class _GoogleClient(object):
else:
self.dns = dns_api
def add_txt_record(self, domain, record_name, record_content, record_ttl):
def add_txt_record(self, record_name, record_content, record_ttl):
"""
Add a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:param int record_ttl: The record TTL (number of seconds that the record may be cached).
:raises certbot.errors.PluginError: if an error occurs communicating with the Google API
"""
zone_id = self._find_managed_zone_id(domain)
zone_id = self._find_managed_zone_id(record_name)
record_contents = self.get_existing_txt_rrset(zone_id, record_name)
if record_contents is None:
@ -165,11 +164,10 @@ class _GoogleClient(object):
raise errors.PluginError('Error communicating with the Google Cloud DNS API: {0}'
.format(e))
def del_txt_record(self, domain, record_name, record_content, record_ttl):
def del_txt_record(self, record_name, record_content, record_ttl):
"""
Delete a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:param int record_ttl: The record TTL (number of seconds that the record may be cached).
@ -177,7 +175,7 @@ class _GoogleClient(object):
"""
try:
zone_id = self._find_managed_zone_id(domain)
zone_id = self._find_managed_zone_id(record_name)
except errors.PluginError as e:
logger.warn('Error finding zone. Skipping cleanup.')
return

View file

@ -41,7 +41,7 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic
def test_perform(self):
self.auth.perform([self.achall])
expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)]
expected = [mock.call.add_txt_record('_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
def test_cleanup(self):
@ -49,7 +49,7 @@ class AuthenticatorTest(test_util.TempDirTestCase, dns_test_common.BaseAuthentic
self.auth._attempt_cleanup = True
self.auth.cleanup([self.achall])
expected = [mock.call.del_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)]
expected = [mock.call.del_txt_record('_acme-challenge.'+DOMAIN, mock.ANY, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
@mock.patch('httplib2.Http.request', side_effect=ServerNotFoundError)
@ -111,7 +111,7 @@ class GoogleClientTest(unittest.TestCase):
credential_mock.assert_called_once_with('/not/a/real/path.json', mock.ANY)
self.assertFalse(get_project_id_mock.called)
client.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
client.add_txt_record(self.record_name, self.record_content, self.record_ttl)
expected_body = {
"kind": "dns#change",
@ -138,7 +138,7 @@ class GoogleClientTest(unittest.TestCase):
changes.create.return_value.execute.return_value = {'status': 'pending', 'id': self.change}
changes.get.return_value.execute.return_value = {'status': 'done'}
client.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
client.add_txt_record(self.record_name, self.record_content, self.record_ttl)
changes.create.assert_called_with(body=mock.ANY,
managedZone=self.zone,
@ -157,7 +157,7 @@ class GoogleClientTest(unittest.TestCase):
mock_get_rrs = "certbot_dns_google.dns_google._GoogleClient.get_existing_txt_rrset"
with mock.patch(mock_get_rrs) as mock_rrs:
mock_rrs.return_value = ["sample-txt-contents"]
client.add_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
client.add_txt_record(self.record_name, self.record_content, self.record_ttl)
self.assertTrue(changes.create.called)
self.assertTrue("sample-txt-contents" in
changes.create.call_args_list[0][1]["body"]["deletions"][0]["rrdatas"])
@ -168,7 +168,7 @@ class GoogleClientTest(unittest.TestCase):
def test_add_txt_record_noop(self, unused_credential_mock):
client, changes = self._setUp_client_with_mock(
[{'managedZones': [{'id': self.zone}]}])
client.add_txt_record(DOMAIN, "_acme-challenge.example.org",
client.add_txt_record("_acme-challenge.example.org",
"example-txt-contents", self.record_ttl)
self.assertFalse(changes.create.called)
@ -179,7 +179,7 @@ class GoogleClientTest(unittest.TestCase):
client, unused_changes = self._setUp_client_with_mock(API_ERROR)
self.assertRaises(errors.PluginError, client.add_txt_record,
DOMAIN, self.record_name, self.record_content, self.record_ttl)
self.record_name, self.record_content, self.record_ttl)
@mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name')
@mock.patch('certbot_dns_google.dns_google.open',
@ -189,7 +189,7 @@ class GoogleClientTest(unittest.TestCase):
{'managedZones': []}])
self.assertRaises(errors.PluginError, client.add_txt_record,
DOMAIN, self.record_name, self.record_content, self.record_ttl)
self.record_name, self.record_content, self.record_ttl)
@mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name')
@mock.patch('certbot_dns_google.dns_google.open',
@ -199,7 +199,7 @@ class GoogleClientTest(unittest.TestCase):
changes.create.side_effect = API_ERROR
self.assertRaises(errors.PluginError, client.add_txt_record,
DOMAIN, self.record_name, self.record_content, self.record_ttl)
self.record_name, self.record_content, self.record_ttl)
@mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name')
@mock.patch('certbot_dns_google.dns_google.open',
@ -211,7 +211,7 @@ class GoogleClientTest(unittest.TestCase):
with mock.patch(mock_get_rrs) as mock_rrs:
mock_rrs.return_value = ["\"sample-txt-contents\"",
"\"example-txt-contents\""]
client.del_txt_record(DOMAIN, "_acme-challenge.example.org",
client.del_txt_record("_acme-challenge.example.org",
"example-txt-contents", self.record_ttl)
expected_body = {
@ -246,7 +246,7 @@ class GoogleClientTest(unittest.TestCase):
def test_del_txt_record_error_during_zone_lookup(self, unused_credential_mock):
client, unused_changes = self._setUp_client_with_mock(API_ERROR)
client.del_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
client.del_txt_record(self.record_name, self.record_content, self.record_ttl)
@mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name')
@mock.patch('certbot_dns_google.dns_google.open',
@ -255,7 +255,7 @@ class GoogleClientTest(unittest.TestCase):
client, unused_changes = self._setUp_client_with_mock([{'managedZones': []},
{'managedZones': []}])
client.del_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
client.del_txt_record(self.record_name, self.record_content, self.record_ttl)
@mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name')
@mock.patch('certbot_dns_google.dns_google.open',
@ -264,7 +264,7 @@ class GoogleClientTest(unittest.TestCase):
client, changes = self._setUp_client_with_mock([{'managedZones': [{'id': self.zone}]}])
changes.create.side_effect = API_ERROR
client.del_txt_record(DOMAIN, self.record_name, self.record_content, self.record_ttl)
client.del_txt_record(self.record_name, self.record_content, self.record_ttl)
@mock.patch('oauth2client.service_account.ServiceAccountCredentials.from_json_keyfile_name')
@mock.patch('certbot_dns_google.dns_google.open',

View file

@ -46,10 +46,10 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_linode_client().add_txt_record(domain, validation_name, validation)
self._get_linode_client().add_txt_record(validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
self._get_linode_client().del_txt_record(domain, validation_name, validation)
self._get_linode_client().del_txt_record(validation_name, validation)
def _get_linode_client(self):
return _LinodeLexiconClient(self.credentials.conf('key'))

View file

@ -30,7 +30,8 @@ class AuthenticatorTest(test_util.TempDirTestCase,
class LinodeLexiconClientTest(unittest.TestCase, dns_test_common_lexicon.BaseLexiconClientTest):
DOMAIN_NOT_FOUND = Exception('Domain not found')
def domain_not_found(self, domain):
return Exception('Domain not found')
def setUp(self):
from certbot_dns_linode.dns_linode import _LinodeLexiconClient

View file

@ -49,10 +49,10 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_luadns_client().add_txt_record(domain, validation_name, validation)
self._get_luadns_client().add_txt_record(validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
self._get_luadns_client().del_txt_record(domain, validation_name, validation)
self._get_luadns_client().del_txt_record(validation_name, validation)
def _get_luadns_client(self):
return _LuaDNSLexiconClient(self.credentials.conf('email'),

View file

@ -34,7 +34,8 @@ class AuthenticatorTest(test_util.TempDirTestCase,
class LuaDNSLexiconClientTest(unittest.TestCase, dns_test_common_lexicon.BaseLexiconClientTest):
LOGIN_ERROR = HTTPError("401 Client Error: Unauthorized for url: ...")
def login_error(self, domain):
return HTTPError("401 Client Error: Unauthorized for url: {0}".format(domain))
def setUp(self):
from certbot_dns_luadns.dns_luadns import _LuaDNSLexiconClient

View file

@ -48,10 +48,10 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_nsone_client().add_txt_record(domain, validation_name, validation)
self._get_nsone_client().add_txt_record(validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
self._get_nsone_client().del_txt_record(domain, validation_name, validation)
self._get_nsone_client().del_txt_record(validation_name, validation)
def _get_nsone_client(self):
return _NS1LexiconClient(self.credentials.conf('api-key'), self.ttl)

View file

@ -8,7 +8,6 @@ from requests.exceptions import HTTPError
from certbot.plugins import dns_test_common
from certbot.plugins import dns_test_common_lexicon
from certbot.plugins.dns_test_common import DOMAIN
from certbot.tests import util as test_util
API_KEY = 'foo'
@ -33,8 +32,11 @@ class AuthenticatorTest(test_util.TempDirTestCase,
class NS1LexiconClientTest(unittest.TestCase, dns_test_common_lexicon.BaseLexiconClientTest):
DOMAIN_NOT_FOUND = HTTPError('404 Client Error: Not Found for url: {0}.'.format(DOMAIN))
LOGIN_ERROR = HTTPError('401 Client Error: Unauthorized for url: {0}.'.format(DOMAIN))
def domain_not_found(self, domain):
return HTTPError('404 Client Error: Not Found for url: {0}.'.format(domain))
def login_error(self, domain):
return HTTPError('401 Client Error: Unauthorized for url: {0}.'.format(domain))
def setUp(self):
from certbot_dns_nsone.dns_nsone import _NS1LexiconClient

View file

@ -54,10 +54,10 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_ovh_client().add_txt_record(domain, validation_name, validation)
self._get_ovh_client().add_txt_record(validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
self._get_ovh_client().del_txt_record(domain, validation_name, validation)
self._get_ovh_client().del_txt_record(validation_name, validation)
def _get_ovh_client(self):
return _OVHLexiconClient(

View file

@ -41,8 +41,12 @@ class AuthenticatorTest(test_util.TempDirTestCase,
class OVHLexiconClientTest(unittest.TestCase, dns_test_common_lexicon.BaseLexiconClientTest):
DOMAIN_NOT_FOUND = Exception('Domain example.com not found')
LOGIN_ERROR = HTTPError('403 Client Error: Forbidden for url: https://eu.api.ovh.com/1.0/...')
def domain_not_found(self, domain):
return Exception('Domain {0} not found'.format(domain))
def login_error(self, domain):
return HTTPError('403 Client Error: Forbidden for url: https://eu.api.ovh.com/1.0/...')
def setUp(self):
from certbot_dns_ovh.dns_ovh import _OVHLexiconClient

View file

@ -52,12 +52,10 @@ class Authenticator(dns_common.DNSAuthenticator):
)
def _perform(self, domain, validation_name, validation):
self._get_sakuracloud_client().add_txt_record(
domain, validation_name, validation)
self._get_sakuracloud_client().add_txt_record(validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
self._get_sakuracloud_client().del_txt_record(
domain, validation_name, validation)
self._get_sakuracloud_client().del_txt_record(validation_name, validation)
def _get_sakuracloud_client(self):
return _SakuraCloudLexiconClient(

View file

@ -8,7 +8,6 @@ from requests.exceptions import HTTPError
from certbot.plugins import dns_test_common
from certbot.plugins import dns_test_common_lexicon
from certbot.plugins.dns_test_common import DOMAIN
from certbot.tests import util as test_util
API_TOKEN = '00000000-0000-0000-0000-000000000000'
@ -37,8 +36,12 @@ class AuthenticatorTest(test_util.TempDirTestCase,
class SakuraCloudLexiconClientTest(unittest.TestCase,
dns_test_common_lexicon.BaseLexiconClientTest):
DOMAIN_NOT_FOUND = HTTPError('404 Client Error: Not Found for url: {0}.'.format(DOMAIN))
LOGIN_ERROR = HTTPError('401 Client Error: Unauthorized for url: {0}.'.format(DOMAIN))
def domain_not_found(self, domain):
return HTTPError('404 Client Error: Not Found for url: {0}.'.format(domain))
def login_error(self, domain):
return HTTPError('401 Client Error: Unauthorized for url: {0}.'.format(domain))
def setUp(self):
from certbot_dns_sakuracloud.dns_sakuracloud import _SakuraCloudLexiconClient

View file

@ -18,16 +18,15 @@ class LexiconClient(object):
def __init__(self):
self.provider = None
def add_txt_record(self, domain, record_name, record_content):
def add_txt_record(self, record_name, record_content):
"""
Add a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:raises errors.PluginError: if an error occurs communicating with the DNS Provider API
"""
self._find_domain_id(domain)
self._find_domain_id(record_name)
try:
self.provider.create_record(type='TXT', name=record_name, content=record_content)
@ -35,17 +34,16 @@ class LexiconClient(object):
logger.debug('Encountered error adding TXT record: %s', e, exc_info=True)
raise errors.PluginError('Error adding TXT record: {0}'.format(e))
def del_txt_record(self, domain, record_name, record_content):
def del_txt_record(self, record_name, record_content):
"""
Delete a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
:param str record_name: The record name (typically beginning with '_acme-challenge.').
:param str record_content: The record content (typically the challenge validation).
:raises errors.PluginError: if an error occurs communicating with the DNS Provider API
"""
try:
self._find_domain_id(domain)
self._find_domain_id(record_name)
except errors.PluginError as e:
logger.debug('Encountered error finding domain_id during deletion: %s', e,
exc_info=True)

View file

@ -5,7 +5,7 @@ import mock
from requests.exceptions import HTTPError, RequestException
from certbot import errors
from certbot.plugins import dns_test_common
from certbot.plugins import dns_common, dns_test_common
from certbot.tests import util as test_util
DOMAIN = 'example.com'
@ -19,110 +19,120 @@ class BaseLexiconAuthenticatorTest(dns_test_common.BaseAuthenticatorTest):
def test_perform(self):
self.auth.perform([self.achall])
expected = [mock.call.add_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)]
expected = [mock.call.add_txt_record('_acme-challenge.'+DOMAIN, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
def test_cleanup(self):
self.auth._attempt_cleanup = True # _attempt_cleanup | pylint: disable=protected-access
self.auth.cleanup([self.achall])
expected = [mock.call.del_txt_record(DOMAIN, '_acme-challenge.'+DOMAIN, mock.ANY)]
expected = [mock.call.del_txt_record('_acme-challenge.'+DOMAIN, mock.ANY)]
self.assertEqual(expected, self.mock_client.mock_calls)
class BaseLexiconClientTest(object):
DOMAIN_NOT_FOUND = Exception('No domain found')
GENERIC_ERROR = RequestException
LOGIN_ERROR = HTTPError('400 Client Error: ...')
UNKNOWN_LOGIN_ERROR = HTTPError('500 Surprise! Error: ...')
record_prefix = "_acme-challenge"
record_name = record_prefix + "." + DOMAIN
record_content = "bar"
def domain_not_found(self, domain): #pylint: disable=unused-argument
"""Return expected for DOMAIN for found.
"""
return Exception('No domain found')
def login_error(self, domain): #pylint: disable=unused-argument
"""Return expected for login error for DOMAIN.
"""
return HTTPError('400 Client Error: ...')
def test_add_txt_record(self):
self.client.add_txt_record(DOMAIN, self.record_name, self.record_content)
self.client.add_txt_record(self.record_name, self.record_content)
self.provider_mock.create_record.assert_called_with(type='TXT',
name=self.record_name,
content=self.record_content)
def test_add_txt_record_try_twice_to_find_domain(self):
self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND, '']
self.provider_mock.authenticate.side_effect = [self.domain_not_found(self.record_name), '']
self.client.add_txt_record(DOMAIN, self.record_name, self.record_content)
self.client.add_txt_record(self.record_name, self.record_content)
self.provider_mock.create_record.assert_called_with(type='TXT',
name=self.record_name,
content=self.record_content)
def test_add_txt_record_fail_to_find_domain(self):
self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND,
self.DOMAIN_NOT_FOUND,
self.DOMAIN_NOT_FOUND,]
self.provider_mock.authenticate.side_effect = \
[self.domain_not_found(d)
for d in dns_common.base_domain_name_guesses(self.record_name)]
self.assertRaises(errors.PluginError,
self.client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
self.record_name, self.record_content)
def test_add_txt_record_fail_to_authenticate(self):
self.provider_mock.authenticate.side_effect = self.LOGIN_ERROR
self.provider_mock.authenticate.side_effect = self.login_error(self.record_name)
self.assertRaises(errors.PluginError,
self.client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
self.record_name, self.record_content)
def test_add_txt_record_fail_to_authenticate_with_unknown_error(self):
self.provider_mock.authenticate.side_effect = self.UNKNOWN_LOGIN_ERROR
self.assertRaises(errors.PluginError,
self.client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
self.record_name, self.record_content)
def test_add_txt_record_error_finding_domain(self):
self.provider_mock.authenticate.side_effect = self.GENERIC_ERROR
self.assertRaises(errors.PluginError,
self.client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
self.record_name, self.record_content)
def test_add_txt_record_error_adding_record(self):
self.provider_mock.create_record.side_effect = self.GENERIC_ERROR
self.assertRaises(errors.PluginError,
self.client.add_txt_record,
DOMAIN, self.record_name, self.record_content)
self.record_name, self.record_content)
def test_del_txt_record(self):
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.client.del_txt_record(self.record_name, self.record_content)
self.provider_mock.delete_record.assert_called_with(type='TXT',
name=self.record_name,
content=self.record_content)
def test_del_txt_record_fail_to_find_domain(self):
self.provider_mock.authenticate.side_effect = [self.DOMAIN_NOT_FOUND,
self.DOMAIN_NOT_FOUND,
self.DOMAIN_NOT_FOUND, ]
self.provider_mock.authenticate.side_effect = \
[self.domain_not_found(d)
for d in dns_common.base_domain_name_guesses(self.record_name)]
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.client.del_txt_record(self.record_name, self.record_content)
def test_del_txt_record_fail_to_authenticate(self):
self.provider_mock.authenticate.side_effect = self.LOGIN_ERROR
self.provider_mock.authenticate.side_effect = self.login_error(self.record_name)
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.client.del_txt_record(self.record_name, self.record_content)
def test_del_txt_record_fail_to_authenticate_with_unknown_error(self):
self.provider_mock.authenticate.side_effect = self.UNKNOWN_LOGIN_ERROR
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.client.del_txt_record(self.record_name, self.record_content)
def test_del_txt_record_error_finding_domain(self):
self.provider_mock.authenticate.side_effect = self.GENERIC_ERROR
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.client.del_txt_record(self.record_name, self.record_content)
def test_del_txt_record_error_deleting_record(self):
self.provider_mock.delete_record.side_effect = self.GENERIC_ERROR
self.client.del_txt_record(DOMAIN, self.record_name, self.record_content)
self.client.del_txt_record(self.record_name, self.record_content)

View file

@ -24,7 +24,7 @@ cover () {
elif [ "$1" = "certbot_dns_cloudflare" ]; then
min=98
elif [ "$1" = "certbot_dns_cloudxns" ]; then
min=99
min=98
elif [ "$1" = "certbot_dns_digitalocean" ]; then
min=98
elif [ "$1" = "certbot_dns_dnsimple" ]; then