Fully type all DNS plugins (#9125)

* Add types in all DNS plugins

* Order imports

* Fix type

* Update certbot-dns-route53/certbot_dns_route53/_internal/dns_route53.py

Co-authored-by: alexzorin <alex@zor.io>

* Clean up imports

Co-authored-by: alexzorin <alex@zor.io>
This commit is contained in:
Adrien Ferrand 2021-12-14 02:38:14 +01:00 committed by GitHub
parent cb3e1403cd
commit 89ccbccff0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 248 additions and 182 deletions

View file

@ -1,6 +1,7 @@
"""DNS Authenticator for Cloudflare."""
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
@ -26,20 +27,21 @@ class Authenticator(dns_common.DNSAuthenticator):
'DNS).')
ttl = 120
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 10) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='Cloudflare credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Cloudflare API.'
def _validate_credentials(self, credentials):
def _validate_credentials(self, credentials: CredentialsConfiguration) -> None:
token = credentials.conf('api-token')
email = credentials.conf('email')
key = credentials.conf('api-key')
@ -62,7 +64,7 @@ class Authenticator(dns_common.DNSAuthenticator):
'dns_cloudflare_email and dns_cloudflare_api_key are required.'
' (see {})'.format(credentials.confobj.filename, ACCOUNT_URL))
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'Cloudflare credentials INI file',
@ -70,13 +72,13 @@ class Authenticator(dns_common.DNSAuthenticator):
self._validate_credentials
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_cloudflare_client().add_txt_record(domain, validation_name, validation, self.ttl)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_cloudflare_client().del_txt_record(domain, validation_name, validation)
def _get_cloudflare_client(self):
def _get_cloudflare_client(self) -> "_CloudflareClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
if self.credentials.conf('api-token'):
@ -89,10 +91,11 @@ class _CloudflareClient:
Encapsulates all communication with the Cloudflare API.
"""
def __init__(self, email, api_key):
def __init__(self, email: Optional[str], api_key: str) -> None:
self.cf = CloudFlare.CloudFlare(email, api_key)
def add_txt_record(self, domain, record_name, record_content, record_ttl):
def add_txt_record(self, domain: str, record_name: str, record_content: str,
record_ttl: int) -> None:
"""
Add a TXT record using the supplied information.
@ -127,7 +130,7 @@ class _CloudflareClient:
record_id = self._find_txt_record_id(zone_id, record_name, record_content)
logger.debug('Successfully added TXT record with record_id: %s', record_id)
def del_txt_record(self, domain, record_name, record_content):
def del_txt_record(self, domain: str, record_name: str, record_content: str) -> None:
"""
Delete a TXT record using the supplied information.
@ -161,7 +164,7 @@ class _CloudflareClient:
else:
logger.debug('Zone not found; no cleanup needed.')
def _find_zone_id(self, domain):
def _find_zone_id(self, domain: str) -> str:
"""
Find the zone_id for a given domain.
@ -226,7 +229,8 @@ class _CloudflareClient:
'supplied Cloudflare account.'
.format(domain, zone_name_guesses))
def _find_txt_record_id(self, zone_id, record_name, record_content):
def _find_txt_record_id(self, zone_id: str, record_name: str,
record_content: str) -> Optional[str]:
"""
Find the record_id for a TXT record with the given name and content.

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for CloudXNS DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import cloudxns
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -23,20 +26,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using CloudXNS for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='CloudXNS credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the CloudXNS API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'CloudXNS credentials INI file',
@ -47,13 +51,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_cloudxns_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_cloudxns_client().del_txt_record(domain, validation_name, validation)
def _get_cloudxns_client(self):
def _get_cloudxns_client(self) -> "_CloudXNSLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _CloudXNSLexiconClient(self.credentials.conf('api-key'),
@ -66,7 +70,7 @@ class _CloudXNSLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the CloudXNS via Lexicon.
"""
def __init__(self, api_key, secret_key, ttl):
def __init__(self, api_key: str, secret_key: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('cloudxns', {
@ -78,10 +82,12 @@ class _CloudXNSLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = cloudxns.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
hint = None
if str(e).startswith('400 Client Error:'):
hint = 'Are your API key and Secret key values correct?'
return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}'
.format(domain_name, e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
return errors.PluginError(f'Error determining zone identifier for {domain_name}: '
f'{e}.{hint_disp}')

View file

@ -1,5 +1,7 @@
"""DNS Authenticator for DigitalOcean."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
import digitalocean
@ -21,20 +23,21 @@ class Authenticator(dns_common.DNSAuthenticator):
'using DigitalOcean for DNS).'
ttl = 30
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 10) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='DigitalOcean credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the DigitalOcean API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'DigitalOcean credentials INI file',
@ -43,14 +46,14 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_digitalocean_client().add_txt_record(domain, validation_name, validation,
self.ttl)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_digitalocean_client().del_txt_record(domain, validation_name, validation)
def _get_digitalocean_client(self):
def _get_digitalocean_client(self) -> "_DigitalOceanClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _DigitalOceanClient(self.credentials.conf('token'))
@ -61,11 +64,11 @@ class _DigitalOceanClient:
Encapsulates all communication with the DigitalOcean API.
"""
def __init__(self, token):
def __init__(self, token: str) -> None:
self.manager = digitalocean.Manager(token=token)
def add_txt_record(self, domain_name: str, record_name: str, record_content: str,
record_ttl: int):
record_ttl: int) -> None:
"""
Add a TXT record using the supplied information.
@ -104,7 +107,7 @@ class _DigitalOceanClient:
raise errors.PluginError('Error adding TXT record using the DigitalOcean API: {0}'
.format(e))
def del_txt_record(self, domain_name: str, record_name: str, record_content: str):
def del_txt_record(self, domain_name: str, record_name: str, record_content: str) -> None:
"""
Delete a TXT record using the supplied information.
@ -143,7 +146,7 @@ class _DigitalOceanClient:
logger.warning('Error deleting TXT record %s using the DigitalOcean API: %s',
record.id, e)
def _find_domain(self, domain_name):
def _find_domain(self, domain_name: str) -> digitalocean.Domain:
"""
Find the domain object for a given domain name.
@ -165,10 +168,10 @@ class _DigitalOceanClient:
logger.debug('Found base domain for %s using name %s', domain_name, guess)
return domain
raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.'
.format(domain_name, domain_name_guesses))
raise errors.PluginError(f'Unable to determine base domain for {domain_name} using names: '
f'{domain_name_guesses}.')
@staticmethod
def _compute_record_name(domain, full_record_name):
def _compute_record_name(domain: digitalocean.Domain, full_record_name: str) -> str:
# The domain, from DigitalOcean's point of view, is automatically appended.
return full_record_name.rpartition("." + domain.name)[0]

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for DNSimple DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import dnsimple
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -23,20 +26,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using DNSimple for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='DNSimple credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the DNSimple API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'DNSimple credentials INI file',
@ -45,13 +49,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_dnsimple_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_dnsimple_client().del_txt_record(domain, validation_name, validation)
def _get_dnsimple_client(self):
def _get_dnsimple_client(self) -> "_DNSimpleLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _DNSimpleLexiconClient(self.credentials.conf('token'), self.ttl)
@ -62,7 +66,7 @@ class _DNSimpleLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the DNSimple via Lexicon.
"""
def __init__(self, token, ttl):
def __init__(self, token: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('dnssimple', {
@ -73,10 +77,12 @@ class _DNSimpleLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = dnsimple.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> errors.PluginError:
hint = None
if str(e).startswith('401 Client Error: Unauthorized for url:'):
hint = 'Is your API token value correct?'
return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}'
.format(domain_name, e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
return errors.PluginError(f'Error determining zone identifier for {domain_name}: '
f'{e}.{hint_disp}')

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for DNS Made Easy DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import dnsmadeeasy
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -24,20 +27,21 @@ class Authenticator(dns_common.DNSAuthenticator):
'DNS).')
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=60)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 60) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='DNS Made Easy credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the DNS Made Easy API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'DNS Made Easy credentials INI file',
@ -49,13 +53,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_dnsmadeeasy_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_dnsmadeeasy_client().del_txt_record(domain, validation_name, validation)
def _get_dnsmadeeasy_client(self):
def _get_dnsmadeeasy_client(self) -> "_DNSMadeEasyLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _DNSMadeEasyLexiconClient(self.credentials.conf('api-key'),
@ -68,7 +72,7 @@ class _DNSMadeEasyLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the DNS Made Easy via Lexicon.
"""
def __init__(self, api_key, secret_key, ttl):
def __init__(self, api_key: str, secret_key: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('dnsmadeeasy', {
@ -80,7 +84,7 @@ class _DNSMadeEasyLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = dnsmadeeasy.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
if domain_name in str(e) and str(e).startswith('404 Client Error: Not Found for url:'):
return None
@ -88,5 +92,6 @@ class _DNSMadeEasyLexiconClient(dns_common_lexicon.LexiconClient):
if str(e).startswith('403 Client Error: Forbidden for url:'):
hint = 'Are your API key and Secret key values correct?'
return errors.PluginError('Error determining zone identifier: {0}.{1}'
.format(e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
return errors.PluginError(f'Error determining zone identifier: {e}.{hint_disp}')

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for Gehirn Infrastructure Service DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import gehirn
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -25,20 +28,21 @@ class Authenticator(dns_common.DNSAuthenticator):
'(if you are using Gehirn Infrastructure Service for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='Gehirn Infrastructure Service credentials file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Gehirn Infrastructure Service API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'Gehirn Infrastructure Service credentials file',
@ -50,13 +54,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_gehirn_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_gehirn_client().del_txt_record(domain, validation_name, validation)
def _get_gehirn_client(self):
def _get_gehirn_client(self) -> "_GehirnLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _GehirnLexiconClient(
@ -71,7 +75,7 @@ class _GehirnLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the Gehirn Infrastructure Service via Lexicon.
"""
def __init__(self, api_token, api_secret, ttl):
def __init__(self, api_token: str, api_secret: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('gehirn', {
@ -83,7 +87,7 @@ class _GehirnLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = gehirn.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
if domain_name in str(e) and (str(e).startswith('404 Client Error: Not Found for url:')):
return None # Expected errors when zone name guess is wrong
return super()._handle_http_error(e, domain_name)

View file

@ -1,6 +1,10 @@
"""DNS Authenticator for Google Cloud DNS."""
import json
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from googleapiclient import discovery
from googleapiclient import errors as googleapiclient_errors
@ -29,7 +33,8 @@ class Authenticator(dns_common.DNSAuthenticator):
ttl = 60
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 60) -> None:
super().add_parser_arguments(add, default_propagation_seconds=60)
add('credentials',
help=('Path to Google Cloud DNS service account JSON file. (See {0} for' +
@ -37,11 +42,11 @@ class Authenticator(dns_common.DNSAuthenticator):
'required permissions.)').format(ACCT_URL, PERMISSIONS_URL),
default=None)
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Google Cloud DNS API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
if self.conf('credentials') is None:
try:
# use project_id query to check for availability of google metadata server
@ -58,13 +63,13 @@ class Authenticator(dns_common.DNSAuthenticator):
dns_common.validate_file_permissions(self.conf('credentials'))
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_google_client().add_txt_record(domain, validation_name, validation, self.ttl)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_google_client().del_txt_record(domain, validation_name, validation, self.ttl)
def _get_google_client(self):
def _get_google_client(self) -> '_GoogleClient':
return _GoogleClient(self.conf('credentials'))
@ -73,7 +78,8 @@ class _GoogleClient:
Encapsulates all communication with the Google Cloud DNS API.
"""
def __init__(self, account_json=None, dns_api=None):
def __init__(self, account_json: Optional[str] = None,
dns_api: Optional[discovery.Resource] = None) -> None:
scopes = ['https://www.googleapis.com/auth/ndev.clouddns.readwrite']
if account_json is not None:
@ -95,7 +101,8 @@ class _GoogleClient:
else:
self.dns = dns_api
def add_txt_record(self, domain, record_name, record_content, record_ttl):
def add_txt_record(self, domain: str, record_name: str, record_content: str,
record_ttl: int) -> None:
"""
Add a TXT record using the supplied information.
@ -110,9 +117,9 @@ class _GoogleClient:
record_contents = self.get_existing_txt_rrset(zone_id, record_name)
if record_contents is None:
# If it wasn't possible to fetch the records at this label (missing .list permission),
# assume there aren't any (#5678). If there are actually records here, this will fail
# with HTTP 409/412 API errors.
# If it wasn't possible to fetch the records at this label (missing .list permission),
# assume there aren't any (#5678). If there are actually records here, this will fail
# with HTTP 409/412 API errors.
record_contents = {"rrdatas": []}
add_records = record_contents["rrdatas"][:]
@ -164,7 +171,8 @@ class _GoogleClient:
raise errors.PluginError('Error communicating with the Google Cloud DNS API: {0}'
.format(e))
def del_txt_record(self, domain, record_name, record_content, record_ttl):
def del_txt_record(self, domain: str, record_name: str, record_content: str,
record_ttl: int) -> None:
"""
Delete a TXT record using the supplied information.
@ -224,7 +232,7 @@ class _GoogleClient:
except googleapiclient_errors.Error as e:
logger.warning('Encountered error deleting TXT record: %s', e)
def get_existing_txt_rrset(self, zone_id, record_name):
def get_existing_txt_rrset(self, zone_id: str, record_name: str) -> Optional[Dict[str, Any]]:
"""
Get existing TXT records from the RRset for the record name.
@ -254,7 +262,7 @@ class _GoogleClient:
return response["rrsets"][0]
return None
def _find_managed_zone_id(self, domain):
def _find_managed_zone_id(self, domain: str) -> str:
"""
Find the managed zone for a given domain.
@ -286,7 +294,7 @@ class _GoogleClient:
.format(domain, zone_dns_name_guesses))
@staticmethod
def get_project_id():
def get_project_id() -> str:
"""
Query the google metadata service for the current project ID

View file

@ -1,6 +1,8 @@
"""DNS Authenticator for Linode."""
import logging
import re
from typing import Any
from typing import Callable
from typing import Optional
from typing import Union
@ -26,20 +28,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using Linode for DNS).'
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=120)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 120) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='Linode credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Linode API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'Linode credentials INI file',
@ -49,13 +52,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_linode_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_linode_client().del_txt_record(domain, validation_name, validation)
def _get_linode_client(self):
def _get_linode_client(self) -> '_LinodeLexiconClient':
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
api_key = self.credentials.conf('key')
@ -82,7 +85,7 @@ class _LinodeLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the Linode API.
"""
def __init__(self, api_key, api_version):
def __init__(self, api_key: str, api_version: int) -> None:
super().__init__()
self.api_version = api_version
@ -103,8 +106,8 @@ class _LinodeLexiconClient(dns_common_lexicon.LexiconClient):
raise errors.PluginError('Invalid api version specified: {0}. (Supported: 3, 4)'
.format(api_version))
def _handle_general_error(self, e, domain_name):
def _handle_general_error(self, e: Exception, domain_name: str) -> Optional[errors.PluginError]:
if not str(e).startswith('Domain not found'):
return errors.PluginError('Unexpected error determining zone identifier for {0}: {1}'
.format(domain_name, e))
return errors.PluginError('Unexpected error determining zone identifier '
f'for {domain_name}: {e}')
return None

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for LuaDNS DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import luadns
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -23,20 +26,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using LuaDNS for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='LuaDNS credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the LuaDNS API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'LuaDNS credentials INI file',
@ -46,13 +50,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_luadns_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_luadns_client().del_txt_record(domain, validation_name, validation)
def _get_luadns_client(self):
def _get_luadns_client(self) -> "_LuaDNSLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _LuaDNSLexiconClient(self.credentials.conf('email'),
@ -65,7 +69,7 @@ class _LuaDNSLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the LuaDNS via Lexicon.
"""
def __init__(self, email, token, ttl):
def __init__(self, email: str, token: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('luadns', {
@ -77,10 +81,12 @@ class _LuaDNSLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = luadns.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> errors.PluginError:
hint = None
if str(e).startswith('401 Client Error: Unauthorized for url:'):
hint = 'Are your email and API token values correct?'
return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}'
.format(domain_name, e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
return errors.PluginError(f'Error determining zone identifier for {domain_name}: '
f'{e}.{hint_disp}')

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for NS1 DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import nsone
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -23,20 +26,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using NS1 for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='NS1 credentials file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the NS1 API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'NS1 credentials file',
@ -45,13 +49,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_nsone_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_nsone_client().del_txt_record(domain, validation_name, validation)
def _get_nsone_client(self):
def _get_nsone_client(self) -> "_NS1LexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _NS1LexiconClient(self.credentials.conf('api-key'), self.ttl)
@ -62,7 +66,7 @@ class _NS1LexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the NS1 via Lexicon.
"""
def __init__(self, api_key, ttl):
def __init__(self, api_key: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('nsone', {
@ -73,13 +77,14 @@ class _NS1LexiconClient(dns_common_lexicon.LexiconClient):
self.provider = nsone.Provider(config)
def _handle_http_error(self, e, domain_name):
if domain_name in str(e) and (str(e).startswith('404 Client Error: Not Found for url:') or \
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
if domain_name in str(e) and (str(e).startswith('404 Client Error: Not Found for url:') or
str(e).startswith("400 Client Error: Bad Request for url:")):
return None # Expected errors when zone name guess is wrong
hint = None
if str(e).startswith('401 Client Error: Unauthorized for url:'):
hint = 'Is your API key correct?'
return errors.PluginError('Error determining zone identifier: {0}.{1}'
.format(e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
return errors.PluginError(f'Error determining zone identifier: {e}.{hint_disp}')

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for OVH DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import ovh
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -23,20 +26,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using OVH for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='OVH credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the OVH API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'OVH credentials INI file',
@ -51,13 +55,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_ovh_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_ovh_client().del_txt_record(domain, validation_name, validation)
def _get_ovh_client(self):
def _get_ovh_client(self) -> "_OVHLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _OVHLexiconClient(
@ -74,7 +78,8 @@ class _OVHLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the OVH API via Lexicon.
"""
def __init__(self, endpoint, application_key, application_secret, consumer_key, ttl):
def __init__(self, endpoint: str, application_key: str, application_secret: str,
consumer_key: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('ovh', {
@ -88,18 +93,20 @@ class _OVHLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = ovh.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> errors.PluginError:
hint = None
if str(e).startswith('400 Client Error:'):
hint = 'Is your Application Secret value correct?'
if str(e).startswith('403 Client Error:'):
hint = 'Are your Application Key and Consumer Key values correct?'
return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}'
.format(domain_name, e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
def _handle_general_error(self, e, domain_name):
return errors.PluginError(f'Error determining zone identifier for {domain_name}: '
f'{e}.{hint_disp}')
def _handle_general_error(self, e: Exception, domain_name: str) -> Optional[errors.PluginError]:
if domain_name in str(e) and str(e).endswith('not found'):
return
return None
super()._handle_general_error(e, domain_name)
return super()._handle_general_error(e, domain_name)

View file

@ -1,5 +1,7 @@
"""DNS Authenticator using RFC 2136 Dynamic Updates."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
import dns.flags
@ -42,20 +44,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using BIND for DNS).'
ttl = 120
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 60) -> None:
super().add_parser_arguments(add, default_propagation_seconds=60)
add('credentials', help='RFC 2136 credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'RFC 2136 Dynamic Updates.'
def _validate_credentials(self, credentials):
def _validate_credentials(self, credentials: CredentialsConfiguration) -> None:
server = credentials.conf('server')
if not is_ipaddress(server):
raise errors.PluginError("The configured target DNS server ({0}) is not a valid IPv4 "
@ -65,7 +68,7 @@ class Authenticator(dns_common.DNSAuthenticator):
if not self.ALGORITHMS.get(algorithm.upper()):
raise errors.PluginError("Unknown algorithm: {0}.".format(algorithm))
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'RFC 2136 credentials INI file',
@ -77,13 +80,13 @@ class Authenticator(dns_common.DNSAuthenticator):
self._validate_credentials
)
def _perform(self, _domain, validation_name, validation):
def _perform(self, _domain: str, validation_name: str, validation: str) -> None:
self._get_rfc2136_client().add_txt_record(validation_name, validation, self.ttl)
def _cleanup(self, _domain, validation_name, validation):
def _cleanup(self, _domain: str, validation_name: str, validation: str) -> None:
self._get_rfc2136_client().del_txt_record(validation_name, validation)
def _get_rfc2136_client(self):
def _get_rfc2136_client(self) -> "_RFC2136Client":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _RFC2136Client(self.credentials.conf('server'),
@ -98,8 +101,8 @@ class _RFC2136Client:
"""
Encapsulates all communication with the target DNS server.
"""
def __init__(self, server, port, key_name, key_secret, key_algorithm,
timeout=DEFAULT_NETWORK_TIMEOUT):
def __init__(self, server: str, port: int, key_name: str, key_secret: str,
key_algorithm: dns.name.Name, timeout: int = DEFAULT_NETWORK_TIMEOUT) -> None:
self.server = server
self.port = port
self.keyring = dns.tsigkeyring.from_text({
@ -108,7 +111,7 @@ class _RFC2136Client:
self.algorithm = key_algorithm
self._default_timeout = timeout
def add_txt_record(self, record_name, record_content, record_ttl):
def add_txt_record(self, record_name: str, record_content: str, record_ttl: int) -> None:
"""
Add a TXT record using the supplied information.
@ -143,7 +146,7 @@ class _RFC2136Client:
raise errors.PluginError('Received response from server: {0}'
.format(dns.rcode.to_text(rcode)))
def del_txt_record(self, record_name, record_content):
def del_txt_record(self, record_name: str, record_content: str) -> None:
"""
Delete a TXT record using the supplied information.
@ -178,7 +181,7 @@ class _RFC2136Client:
raise errors.PluginError('Received response from server: {0}'
.format(dns.rcode.to_text(rcode)))
def _find_domain(self, record_name):
def _find_domain(self, record_name: str) -> str:
"""
Find the closest domain with an SOA record for a given domain name.
@ -198,7 +201,7 @@ class _RFC2136Client:
raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.'
.format(record_name, domain_name_guesses))
def _query_soa(self, domain_name):
def _query_soa(self, domain_name: str) -> bool:
"""
Query a domain name for an authoritative SOA record.

View file

@ -2,6 +2,7 @@
import collections
import logging
import time
from typing import Any
from typing import DefaultDict
from typing import Dict
from typing import List
@ -10,7 +11,9 @@ import boto3
from botocore.exceptions import ClientError
from botocore.exceptions import NoCredentialsError
from acme.challenges import ChallengeResponse
from certbot import errors
from certbot.achallenges import AnnotatedChallenge
from certbot.plugins import dns_common
logger = logging.getLogger(__name__)
@ -32,21 +35,21 @@ class Authenticator(dns_common.DNSAuthenticator):
"DNS).")
ttl = 10
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.r53 = boto3.client("route53")
self._resource_records: DefaultDict[str, List[Dict[str, str]]] = collections.defaultdict(list)
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return "Solve a DNS01 challenge using AWS Route53"
def _setup_credentials(self):
def _setup_credentials(self) -> None:
pass
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
pass
def perform(self, achalls):
def perform(self, achalls: List[AnnotatedChallenge]) -> List[ChallengeResponse]:
self._attempt_cleanup = True
try:
@ -64,13 +67,13 @@ class Authenticator(dns_common.DNSAuthenticator):
raise errors.PluginError("\n".join([str(e), INSTRUCTIONS]))
return [achall.response(achall.account_key) for achall in achalls]
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
try:
self._change_txt_record("DELETE", validation_name, validation)
except (NoCredentialsError, ClientError) as e:
logger.debug('Encountered error during cleanup: %s', e, exc_info=True)
def _find_zone_id_for_domain(self, domain):
def _find_zone_id_for_domain(self, domain: str) -> str:
"""Find the zone id responsible a given FQDN.
That is, the id for the zone whose name is the longest parent of the
@ -100,7 +103,7 @@ class Authenticator(dns_common.DNSAuthenticator):
zones.sort(key=lambda z: len(z[0]), reverse=True)
return zones[0][1]
def _change_txt_record(self, action, validation_domain_name, validation):
def _change_txt_record(self, action: str, validation_domain_name: str, validation: str) -> str:
zone_id = self._find_zone_id_for_domain(validation_domain_name)
rrecords = self._resource_records[validation_domain_name]
@ -136,7 +139,7 @@ class Authenticator(dns_common.DNSAuthenticator):
)
return response["ChangeInfo"]["Id"]
def _wait_for_change(self, change_id):
def _wait_for_change(self, change_id: str) -> None:
"""Wait for a change to be propagated to all Route53 DNS servers.
https://docs.aws.amazon.com/Route53/latest/APIReference/API_GetChange.html
"""

View file

@ -1,4 +1,5 @@
"""Shim around `~certbot_dns_route53._internal.dns_route53` for backwards compatibility."""
from typing import Any
import warnings
from certbot_dns_route53._internal import dns_route53
@ -10,7 +11,7 @@ class Authenticator(dns_route53.Authenticator):
hidden = True
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
warnings.warn("The 'authenticator' module was renamed 'dns_route53'",
DeprecationWarning)
super().__init__(*args, **kwargs)

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for Sakura Cloud DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import sakuracloud
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -24,41 +27,40 @@ class Authenticator(dns_common.DNSAuthenticator):
'(if you are using Sakura Cloud for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(
add, default_propagation_seconds=90)
add('credentials', help='Sakura Cloud credentials file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Sakura Cloud API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'Sakura Cloud credentials file',
{
'api-token': \
'API token for Sakura Cloud API obtained from {0}'.format(APIKEY_URL),
'api-secret': \
'API secret for Sakura Cloud API obtained from {0}'.format(APIKEY_URL),
'api-token': f'API token for Sakura Cloud API obtained from {APIKEY_URL}',
'api-secret': f'API secret for Sakura Cloud API obtained from {APIKEY_URL}',
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_sakuracloud_client().add_txt_record(
domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_sakuracloud_client().del_txt_record(
domain, validation_name, validation)
def _get_sakuracloud_client(self):
def _get_sakuracloud_client(self) -> "_SakuraCloudLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _SakuraCloudLexiconClient(
@ -73,7 +75,7 @@ class _SakuraCloudLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the Sakura Cloud via Lexicon.
"""
def __init__(self, api_token, api_secret, ttl):
def __init__(self, api_token: str, api_secret: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('sakuracloud', {
@ -85,7 +87,7 @@ class _SakuraCloudLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = sakuracloud.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
if domain_name in str(e) and (str(e).startswith('404 Client Error: Not Found for url:')):
return None # Expected errors when zone name guess is wrong
return super()._handle_http_error(e, domain_name)

View file

@ -109,7 +109,7 @@ class LexiconClient:
raise errors.PluginError('Unable to determine zone identifier for {0} using zone names: {1}'
.format(domain, domain_name_guesses))
def _handle_http_error(self, e: HTTPError, domain_name: str) -> errors.PluginError:
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
return errors.PluginError('Error determining zone identifier for {0}: {1}.'
.format(domain_name, e))

View file

@ -20,8 +20,8 @@ install_and_test = python {toxinidir}/tools/install_and_test.py
dns_packages = certbot-dns-cloudflare certbot-dns-cloudxns certbot-dns-digitalocean certbot-dns-dnsimple certbot-dns-dnsmadeeasy certbot-dns-gehirn certbot-dns-google certbot-dns-linode certbot-dns-luadns certbot-dns-nsone certbot-dns-ovh certbot-dns-rfc2136 certbot-dns-route53 certbot-dns-sakuracloud
win_all_packages = acme[test] certbot[test] {[base]dns_packages} certbot-nginx
all_packages = {[base]win_all_packages} certbot-apache
fully_typed_source_paths = acme/acme certbot/certbot certbot-ci/certbot_integration_tests certbot-ci/snap_integration_tests certbot-ci/windows_installer_integration_tests certbot-compatibility-test/certbot_compatibility_test tests/lock_test.py
partially_typed_source_paths = certbot-apache/certbot_apache certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx
fully_typed_source_paths = acme/acme certbot/certbot certbot-ci/certbot_integration_tests certbot-ci/snap_integration_tests certbot-ci/windows_installer_integration_tests certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud tests/lock_test.py
partially_typed_source_paths = certbot-apache/certbot_apache certbot-nginx/certbot_nginx
[testenv]
passenv =