mirror of
https://github.com/certbot/certbot.git
synced 2026-06-06 23:32:06 -04:00
# Conflicts: # acme/acme/client.py # acme/acme/crypto_util.py # acme/acme/standalone.py # certbot-apache/certbot_apache/configurator.py # certbot-apache/certbot_apache/parser.py # certbot-apache/certbot_apache/tests/tls_sni_01_test.py # certbot-apache/certbot_apache/tests/util.py # certbot-apache/certbot_apache/tls_sni_01.py # certbot-nginx/certbot_nginx/configurator.py # certbot-nginx/certbot_nginx/parser.py # certbot-nginx/certbot_nginx/tests/util.py # certbot/account.py # certbot/cert_manager.py # certbot/cli.py # certbot/configuration.py # certbot/main.py # certbot/ocsp.py # certbot/plugins/dns_common_lexicon.py # certbot/plugins/standalone.py # certbot/plugins/util.py # certbot/plugins/webroot.py # certbot/tests/auth_handler_test.py # certbot/tests/cert_manager_test.py # certbot/tests/display/util_test.py # certbot/tests/main_test.py # certbot/tests/util.py # certbot/util.py # tox.ini
168 lines
6.5 KiB
Python
168 lines
6.5 KiB
Python
"""DNS Authenticator for DigitalOcean."""
|
|
import logging
|
|
|
|
import digitalocean
|
|
import zope.interface
|
|
|
|
from certbot import errors
|
|
from certbot import interfaces
|
|
from certbot.plugins import dns_common
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@zope.interface.implementer(interfaces.IAuthenticator)
|
|
@zope.interface.provider(interfaces.IPluginFactory)
|
|
class Authenticator(dns_common.DNSAuthenticator):
|
|
"""DNS Authenticator for DigitalOcean
|
|
|
|
This Authenticator uses the DigitalOcean API to fulfill a dns-01 challenge.
|
|
"""
|
|
|
|
description = 'Obtain certs using a DNS TXT record (if you are using DigitalOcean for DNS).'
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super(Authenticator, self).__init__(*args, **kwargs)
|
|
self.credentials = None
|
|
|
|
@classmethod
|
|
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
|
|
super(Authenticator, cls).add_parser_arguments(add)
|
|
add('credentials', help='DigitalOcean credentials INI file.')
|
|
|
|
def more_info(self): # pylint: disable=missing-docstring,no-self-use
|
|
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
|
|
'the DigitalOcean API.'
|
|
|
|
def _setup_credentials(self):
|
|
self.credentials = self._configure_credentials(
|
|
'credentials',
|
|
'DigitalOcean credentials INI file',
|
|
{
|
|
'token': 'API token for DigitalOcean account'
|
|
}
|
|
)
|
|
|
|
def _perform(self, domain, validation_name, validation):
|
|
self._get_digitalocean_client().add_txt_record(domain, validation_name, validation)
|
|
|
|
def _cleanup(self, domain, validation_name, validation):
|
|
self._get_digitalocean_client().del_txt_record(domain, validation_name, validation)
|
|
|
|
def _get_digitalocean_client(self):
|
|
return _DigitalOceanClient(self.credentials.conf('token'))
|
|
|
|
|
|
class _DigitalOceanClient(object):
|
|
"""
|
|
Encapsulates all communication with the DigitalOcean API.
|
|
"""
|
|
|
|
def __init__(self, token):
|
|
self.manager = digitalocean.Manager(token=token)
|
|
|
|
def add_txt_record(self, domain_name, record_name, record_content):
|
|
"""
|
|
Add a TXT record using the supplied information.
|
|
|
|
:param str domain_name: The domain to use to associate the record with.
|
|
:param str record_name: The record name (typically beginning with '_acme-challenge.').
|
|
:param str record_content: The record content (typically the challenge validation).
|
|
:raises certbot.errors.PluginError: if an error occurs communicating with the DigitalOcean
|
|
API
|
|
"""
|
|
|
|
try:
|
|
domain = self._find_domain(domain_name)
|
|
except digitalocean.Error as e:
|
|
hint = None
|
|
|
|
if str(e).startswith("Unable to authenticate"):
|
|
hint = 'Did you provide a valid API token?'
|
|
|
|
logger.debug('Error finding domain using the DigitalOcean API: %s', e)
|
|
raise errors.PluginError('Error finding domain using the DigitalOcean API: {0}{1}'
|
|
.format(e, ' ({0})'.format(hint) if hint else ''))
|
|
|
|
try:
|
|
result = domain.create_new_domain_record(
|
|
type='TXT',
|
|
name=self._compute_record_name(domain, record_name),
|
|
data=record_content)
|
|
|
|
record_id = result['domain_record']['id']
|
|
|
|
logger.debug('Successfully added TXT record with id: %d', record_id)
|
|
except digitalocean.Error as e:
|
|
logger.debug('Error adding TXT record using the DigitalOcean API: %s', e)
|
|
raise errors.PluginError('Error adding TXT record using the DigitalOcean API: {0}'
|
|
.format(e))
|
|
|
|
def del_txt_record(self, domain_name, record_name, record_content):
|
|
"""
|
|
Delete a TXT record using the supplied information.
|
|
|
|
Note that both the record's name and content are used to ensure that similar records
|
|
created concurrently (e.g., due to concurrent invocations of this plugin) are not deleted.
|
|
|
|
Failures are logged, but not raised.
|
|
|
|
:param str domain_name: The domain to use to associate the record with.
|
|
:param str record_name: The record name (typically beginning with '_acme-challenge.').
|
|
:param str record_content: The record content (typically the challenge validation).
|
|
"""
|
|
|
|
try:
|
|
domain = self._find_domain(domain_name)
|
|
except digitalocean.Error as e:
|
|
logger.debug('Error finding domain using the DigitalOcean API: %s', e)
|
|
return
|
|
|
|
try:
|
|
domain_records = domain.get_records()
|
|
|
|
matching_records = [record for record in domain_records
|
|
if record.type == 'TXT'
|
|
and record.name == self._compute_record_name(domain, record_name)
|
|
and record.data == record_content]
|
|
except digitalocean.Error as e:
|
|
logger.debug('Error getting DNS records using the DigitalOcean API: %s', e)
|
|
return
|
|
|
|
for record in matching_records:
|
|
try:
|
|
logger.debug('Removing TXT record with id: %s', record.id)
|
|
record.destroy()
|
|
except digitalocean.Error as e:
|
|
logger.warning('Error deleting TXT record %s using the DigitalOcean API: %s',
|
|
record.id, e)
|
|
|
|
def _find_domain(self, domain_name):
|
|
"""
|
|
Find the domain object for a given domain name.
|
|
|
|
:param str domain_name: The domain name for which to find the corresponding Domain.
|
|
:returns: The Domain, if found.
|
|
:rtype: `~digitalocean.Domain`
|
|
:raises certbot.errors.PluginError: if no matching Domain is found.
|
|
"""
|
|
|
|
domain_name_guesses = dns_common.base_domain_name_guesses(domain_name)
|
|
|
|
domains = self.manager.get_all_domains()
|
|
|
|
for guess in domain_name_guesses:
|
|
matches = [domain for domain in domains if domain.name == guess]
|
|
|
|
if matches:
|
|
domain = matches[0]
|
|
logger.debug('Found base domain for %s using name %s', domain_name, guess)
|
|
return domain
|
|
|
|
raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.'
|
|
.format(domain_name, domain_name_guesses))
|
|
|
|
@staticmethod
|
|
def _compute_record_name(domain, full_record_name):
|
|
# The domain, from DigitalOcean's point of view, is automatically appended.
|
|
return full_record_name.rpartition("." + domain.name)[0]
|