Merge remote-tracking branch 'upstream/master' into typed-jose-fields

This commit is contained in:
Adrien Ferrand 2022-01-02 00:34:08 +01:00
commit 1706828f13
54 changed files with 760 additions and 510 deletions

View file

@ -26,58 +26,30 @@ jobs:
CERTBOT_NO_PIN: 1
linux-external-mock:
TOXENV: external-mock
linux-boulder-v1-integration-certbot-oldest:
PYTHON_VERSION: 3.6
TOXENV: integration-certbot-oldest
ACME_SERVER: boulder-v1
linux-boulder-v2-integration-certbot-oldest:
PYTHON_VERSION: 3.6
TOXENV: integration-certbot-oldest
ACME_SERVER: boulder-v2
linux-boulder-v1-integration-nginx-oldest:
PYTHON_VERSION: 3.6
TOXENV: integration-nginx-oldest
ACME_SERVER: boulder-v1
linux-boulder-v2-integration-nginx-oldest:
PYTHON_VERSION: 3.6
TOXENV: integration-nginx-oldest
ACME_SERVER: boulder-v2
linux-boulder-v1-py36-integration:
PYTHON_VERSION: 3.6
TOXENV: integration
ACME_SERVER: boulder-v1
linux-boulder-v2-py36-integration:
PYTHON_VERSION: 3.6
TOXENV: integration
ACME_SERVER: boulder-v2
linux-boulder-v1-py37-integration:
PYTHON_VERSION: 3.7
TOXENV: integration
ACME_SERVER: boulder-v1
linux-boulder-v2-py37-integration:
PYTHON_VERSION: 3.7
TOXENV: integration
ACME_SERVER: boulder-v2
linux-boulder-v1-py38-integration:
PYTHON_VERSION: 3.8
TOXENV: integration
ACME_SERVER: boulder-v1
linux-boulder-v2-py38-integration:
PYTHON_VERSION: 3.8
TOXENV: integration
ACME_SERVER: boulder-v2
linux-boulder-v1-py39-integration:
PYTHON_VERSION: 3.9
TOXENV: integration
ACME_SERVER: boulder-v1
linux-boulder-v2-py39-integration:
PYTHON_VERSION: 3.9
TOXENV: integration
ACME_SERVER: boulder-v2
linux-boulder-v1-py310-integration:
PYTHON_VERSION: 3.10
TOXENV: integration
ACME_SERVER: boulder-v1
linux-boulder-v2-py310-integration:
PYTHON_VERSION: 3.10
TOXENV: integration

View file

@ -54,8 +54,12 @@ jobs:
done
displayName: Run integration tests for Docker images
- job: installer_build
pool:
vmImage: vs2017-win2016
strategy:
matrix:
win-2019:
vmImage: windows-2019
win-2022:
vmImage: windows-2022
steps:
- task: UsePythonVersion@0
inputs:
@ -87,17 +91,11 @@ jobs:
matrix:
win2019:
imageName: windows-2019
win2016:
imageName: vs2017-win2016
win2022:
imageName: windows-2022
pool:
vmImage: $(imageName)
steps:
- powershell: |
if ($PSVersionTable.PSVersion.Major -ne 5) {
throw "Powershell version is not 5.x"
}
condition: eq(variables['imageName'], 'vs2017-win2016')
displayName: Check Powershell 5.x is used in vs2017-win2016
- task: UsePythonVersion@0
inputs:
versionSpec: 3.9

View file

@ -4,24 +4,24 @@ jobs:
PYTHON_VERSION: 3.10
strategy:
matrix:
macos-py36:
macos-py36-cover:
IMAGE_NAME: macOS-10.15
PYTHON_VERSION: 3.6
TOXENV: py36
macos-py310:
TOXENV: py36-cover
macos-py310-cover:
IMAGE_NAME: macOS-10.15
PYTHON_VERSION: 3.10
TOXENV: py39
TOXENV: py310-cover
windows-py36:
IMAGE_NAME: vs2017-win2016
IMAGE_NAME: windows-2019
PYTHON_VERSION: 3.6
TOXENV: py36-win
windows-py39-cover:
IMAGE_NAME: vs2017-win2016
IMAGE_NAME: windows-2019
PYTHON_VERSION: 3.9
TOXENV: py39-cover-win
windows-integration-certbot:
IMAGE_NAME: vs2017-win2016
IMAGE_NAME: windows-2019
PYTHON_VERSION: 3.9
TOXENV: integration-certbot
linux-oldest-tests-1:

View file

@ -3,7 +3,7 @@ stages:
jobs:
- job: prepare
pool:
vmImage: vs2017-win2016
vmImage: win2019
steps:
# If we change the output filename from `release_notes.md`, it should also be changed in tools/create_github_release.py
- bash: |

View file

@ -172,8 +172,7 @@ def parse_includes(apachectl):
:rtype: list of str
"""
inc_cmd = [apachectl, "-t", "-D",
"DUMP_INCLUDES"]
inc_cmd = [apachectl, "-t", "-D", "DUMP_INCLUDES"]
return parse_from_subprocess(inc_cmd, r"\(.*\) (.*)")
@ -188,8 +187,7 @@ def parse_modules(apachectl):
:rtype: list of str
"""
mod_cmd = [apachectl, "-t", "-D",
"DUMP_MODULES"]
mod_cmd = [apachectl, "-t", "-D", "DUMP_MODULES"]
return parse_from_subprocess(mod_cmd, r"(.*)_module")

View file

@ -14,7 +14,8 @@ class ApacheParserNode(interfaces.ParserNode):
"""
def __init__(self, **kwargs):
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(kwargs) # pylint: disable=unused-variable
ancestor, dirty, filepath, metadata = util.parsernode_kwargs(
kwargs) # pylint: disable=unused-variable
super().__init__(**kwargs)
self.ancestor = ancestor
self.filepath = filepath

View file

@ -388,7 +388,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
"""
Deletes a ParserNode from the sequence of children, and raises an
exception if it's unable to do so.
:param AugeasParserNode: child: A node to delete.
:param AugeasParserNode child: A node to delete.
"""
if not self.parser.aug.remove(child.metadata["augeaspath"]):
@ -531,7 +531,7 @@ class AugeasBlockNode(AugeasDirectiveNode):
position
)
return (insert_path, resulting_path, before)
return insert_path, resulting_path, before
interfaces.CommentNode.register(AugeasCommentNode)

View file

@ -154,7 +154,7 @@ class ApacheConfigurator(common.Configurator):
# So for old versions of Apache we pick a configuration without this option.
min_openssl_version = util.parse_loose_version('1.0.2l')
openssl_version = self.openssl_version(warn_on_no_mod_ssl)
if self.version < (2, 4, 11) or not openssl_version or\
if self.version < (2, 4, 11) or not openssl_version or \
util.parse_loose_version(openssl_version) < min_openssl_version:
return apache_util.find_ssl_apache_conf("old")
return apache_util.find_ssl_apache_conf("current")
@ -470,15 +470,16 @@ class ApacheConfigurator(common.Configurator):
"""Initializes the ParserNode parser root instance."""
if HAS_APACHECONFIG:
apache_vars = {}
apache_vars["defines"] = apache_util.parse_defines(self.options.ctl)
apache_vars["includes"] = apache_util.parse_includes(self.options.ctl)
apache_vars["modules"] = apache_util.parse_modules(self.options.ctl)
apache_vars = {
"defines": apache_util.parse_defines(self.options.ctl),
"includes": apache_util.parse_includes(self.options.ctl),
"modules": apache_util.parse_modules(self.options.ctl),
}
metadata["apache_vars"] = apache_vars
with open(self.parser.loc["root"]) as f:
with apacheconfig.make_loader(writable=True,
**apacheconfig.flavors.NATIVE_APACHE) as loader:
**apacheconfig.flavors.NATIVE_APACHE) as loader:
metadata["ac_ast"] = loader.loads(f.read())
return dualparser.DualBlockNode(
@ -627,7 +628,7 @@ class ApacheConfigurator(common.Configurator):
# If we haven't managed to enable mod_ssl by this point, error out
if "ssl_module" not in self.parser.modules:
raise errors.MisconfigurationError("Could not find ssl_module; "
"not installing certificate.")
"not installing certificate.")
# Add directives and remove duplicates
self._add_dummy_ssl_directives(vhost.path)
@ -928,7 +929,7 @@ class ApacheConfigurator(common.Configurator):
# Get last ServerName as each overwrites the previous
servername = self.parser.get_arg(servername_match[-1])
return (servername, serveraliases)
return servername, serveraliases
def _add_servernames(self, host):
"""Helper function for get_virtual_hosts().
@ -976,7 +977,7 @@ class ApacheConfigurator(common.Configurator):
is_ssl = True
filename = apache_util.get_file_path(
self.parser.aug.get("/augeas/files%s/path" % apache_util.get_file_path(path)))
self.parser.aug.get(f"/augeas/files{apache_util.get_file_path(path)}/path"))
if filename is None:
return None
@ -1031,7 +1032,7 @@ class ApacheConfigurator(common.Configurator):
for vhost_path in list(self.parser.parser_paths):
paths = self.parser.aug.match(
("/files%s//*[label()=~regexp('%s')]" %
(vhost_path, parser.case_i("VirtualHost"))))
(vhost_path, parser.case_i("VirtualHost"))))
paths = [path for path in paths if
"virtualhost" in os.path.basename(path).lower()]
for path in paths:
@ -1128,10 +1129,8 @@ class ApacheConfigurator(common.Configurator):
:type host: :class:`~certbot_apache.obj.VirtualHost`
"""
servername_match = vhost.node.find_directives("ServerName",
exclude=False)
serveralias_match = vhost.node.find_directives("ServerAlias",
exclude=False)
servername_match = vhost.node.find_directives("ServerName", exclude=False)
serveralias_match = vhost.node.find_directives("ServerAlias", exclude=False)
servername = None
if servername_match:
@ -1143,7 +1142,6 @@ class ApacheConfigurator(common.Configurator):
vhost.aliases.add(serveralias)
vhost.name = servername
def is_name_vhost(self, target_addr):
"""Returns if vhost is a name based vhost
@ -1210,7 +1208,7 @@ class ApacheConfigurator(common.Configurator):
# If HTTPS requested for nonstandard port, add service definition
if https and port != "443":
port_service = "%s %s" % (port, "https")
port_service = f"{port} https"
else:
port_service = port
@ -1240,7 +1238,7 @@ class ApacheConfigurator(common.Configurator):
_, ip = listen[::-1].split(":", 1)
ip = ip[::-1]
if "%s:%s" % (ip, port_service) not in listen_dirs and (
"%s:%s" % (ip, port_service) not in listen_dirs):
"%s:%s" % (ip, port_service) not in listen_dirs):
listen_dirs.add("%s:%s" % (ip, port_service))
if https:
self._add_listens_https(listen_dirs, listens, port)
@ -1262,15 +1260,15 @@ class ApacheConfigurator(common.Configurator):
# We have wildcard, skip the rest
self.parser.add_dir(parser.get_aug_path(self.parser.loc["listen"]),
"Listen", port)
self.save_notes += "Added Listen %s directive to %s\n" % (
port, self.parser.loc["listen"])
self.save_notes += (
f"Added Listen {port} directive to {self.parser.loc['listen']}\n"
)
else:
for listen in new_listens:
self.parser.add_dir(parser.get_aug_path(
self.parser.loc["listen"]), "Listen", listen.split(" "))
self.save_notes += ("Added Listen %s directive to "
"%s\n") % (listen,
self.parser.loc["listen"])
self.save_notes += (f"Added Listen {listen} directive to "
f"{self.parser.loc['listen']}\n")
def _add_listens_https(self, listens, listens_orig, port):
"""Helper method for ensure_listen to figure out which new
@ -1283,7 +1281,7 @@ class ApacheConfigurator(common.Configurator):
# Add service definition for non-standard ports
if port != "443":
port_service = "%s %s" % (port, "https")
port_service = f"{port} https"
else:
port_service = port
@ -1294,16 +1292,16 @@ class ApacheConfigurator(common.Configurator):
self.parser.add_dir_to_ifmodssl(
parser.get_aug_path(self.parser.loc["listen"]),
"Listen", port_service.split(" "))
self.save_notes += "Added Listen %s directive to %s\n" % (
port_service, self.parser.loc["listen"])
self.save_notes += (
f"Added Listen {port_service} directive to {self.parser.loc['listen']}\n"
)
else:
for listen in new_listens:
self.parser.add_dir_to_ifmodssl(
parser.get_aug_path(self.parser.loc["listen"]),
"Listen", listen.split(" "))
self.save_notes += ("Added Listen %s directive to "
"%s\n") % (listen,
self.parser.loc["listen"])
self.save_notes += (f"Added Listen {listen} directive to "
f"{self.parser.loc['listen']}\n")
def _has_port_already(self, listens, port):
"""Helper method for prepare_server_https to find out if user
@ -1368,8 +1366,8 @@ class ApacheConfigurator(common.Configurator):
ssl_fp = self._get_ssl_vhost_path(avail_fp)
orig_matches = self.parser.aug.match("/files%s//* [label()=~regexp('%s')]" %
(self._escape(ssl_fp),
parser.case_i("VirtualHost")))
(self._escape(ssl_fp),
parser.case_i("VirtualHost")))
self._copy_create_ssl_vhost_skeleton(nonssl_vhost, ssl_fp)
@ -1377,8 +1375,8 @@ class ApacheConfigurator(common.Configurator):
self.parser.aug.load()
# Get Vhost augeas path for new vhost
new_matches = self.parser.aug.match("/files%s//* [label()=~regexp('%s')]" %
(self._escape(ssl_fp),
parser.case_i("VirtualHost")))
(self._escape(ssl_fp),
parser.case_i("VirtualHost")))
vh_p = self._get_new_vh_path(orig_matches, new_matches)
@ -1612,7 +1610,7 @@ class ApacheConfigurator(common.Configurator):
span_val = self.parser.aug.span(vhost.path)
except ValueError:
logger.critical("Error while reading the VirtualHost %s from "
"file %s", vhost.name, vhost.filep, exc_info=True)
"file %s", vhost.name, vhost.filep, exc_info=True)
raise errors.PluginError("Unable to read VirtualHost from file")
span_filep = span_val[0]
span_start = span_val[5]
@ -1742,9 +1740,8 @@ class ApacheConfigurator(common.Configurator):
for test_vh in self.vhosts:
if (vhost.filep != test_vh.filep and
any(test_addr in addrs for
test_addr in test_vh.addrs) and
not self.is_name_vhost(addr)):
any(test_addr in addrs for
test_addr in test_vh.addrs) and not self.is_name_vhost(addr)):
self.add_name_vhost(addr)
logger.info("Enabling NameVirtualHosts on %s", addr)
need_to_save = True
@ -1940,10 +1937,7 @@ class ApacheConfigurator(common.Configurator):
Searches AutoHSTS managed VirtualHosts that belong to the lineage.
Matches the private key path.
"""
return bool(
self.parser.find_dir("SSLCertificateKeyFile",
lineage.key_path, vhost.path))
return bool(self.parser.find_dir("SSLCertificateKeyFile", lineage.key_path, vhost.path))
def _enable_ocsp_stapling(self, ssl_vhost, unused_options):
"""Enables OCSP Stapling
@ -1981,7 +1975,7 @@ class ApacheConfigurator(common.Configurator):
# Check if there's an existing SSLUseStapling directive on.
use_stapling_aug_path = self.parser.find_dir("SSLUseStapling",
"on", start=ssl_vhost.path)
"on", start=ssl_vhost.path)
if not use_stapling_aug_path:
self.parser.add_dir(ssl_vhost.path, "SSLUseStapling", "on")
@ -1989,20 +1983,20 @@ class ApacheConfigurator(common.Configurator):
# Check if there's an existing SSLStaplingCache directive.
stapling_cache_aug_path = self.parser.find_dir('SSLStaplingCache',
None, ssl_vhost_aug_path)
None, ssl_vhost_aug_path)
# We'll simply delete the directive, so that we'll have a
# consistent OCSP cache path.
if stapling_cache_aug_path:
self.parser.aug.remove(
re.sub(r"/\w*$", "", stapling_cache_aug_path[0]))
re.sub(r"/\w*$", "", stapling_cache_aug_path[0]))
self.parser.add_dir_to_ifmodssl(ssl_vhost_aug_path,
"SSLStaplingCache",
["shmcb:/var/run/apache2/stapling_cache(128000)"])
"SSLStaplingCache",
["shmcb:/var/run/apache2/stapling_cache(128000)"])
msg = "OCSP Stapling was enabled on SSL Vhost: %s.\n"%(
ssl_vhost.filep)
ssl_vhost.filep)
self.save_notes += msg
self.save()
logger.info(msg)
@ -2073,7 +2067,7 @@ class ApacheConfigurator(common.Configurator):
for match in header_path:
if re.search(pat, self.parser.aug.get(match).lower()):
raise errors.PluginEnhancementAlreadyPresent(
"Existing %s header" % (header_substring))
"Existing %s header" % header_substring)
def _enable_redirect(self, ssl_vhost, unused_options):
"""Redirect all equivalent HTTP traffic to ssl_vhost.
@ -2159,10 +2153,10 @@ class ApacheConfigurator(common.Configurator):
def _set_https_redirection_rewrite_rule(self, vhost):
if self.get_version() >= (2, 3, 9):
self.parser.add_dir(vhost.path, "RewriteRule",
constants.REWRITE_HTTPS_ARGS_WITH_END)
constants.REWRITE_HTTPS_ARGS_WITH_END)
else:
self.parser.add_dir(vhost.path, "RewriteRule",
constants.REWRITE_HTTPS_ARGS)
constants.REWRITE_HTTPS_ARGS)
def _verify_no_certbot_redirect(self, vhost):
"""Checks to see if a redirect was already installed by certbot.
@ -2234,7 +2228,7 @@ class ApacheConfigurator(common.Configurator):
"""
rewrite_engine_path_list = self.parser.find_dir("RewriteEngine", "on",
start=vhost.path)
start=vhost.path)
if rewrite_engine_path_list:
for re_path in rewrite_engine_path_list:
# A RewriteEngine directive may also be included in per
@ -2285,22 +2279,19 @@ class ApacheConfigurator(common.Configurator):
else:
rewrite_rule_args = constants.REWRITE_HTTPS_ARGS
return ("<VirtualHost %s>\n"
"%s \n"
"%s \n"
"ServerSignature Off\n"
"\n"
"RewriteEngine On\n"
"RewriteRule %s\n"
"\n"
"ErrorLog %s/redirect.error.log\n"
"LogLevel warn\n"
"</VirtualHost>\n"
% (" ".join(str(addr) for
addr in self._get_proposed_addrs(ssl_vhost)),
servername, serveralias,
" ".join(rewrite_rule_args),
self.options.logs_root))
return (
f"<VirtualHost {' '.join(str(addr) for addr in self._get_proposed_addrs(ssl_vhost))}>\n"
f"{servername} \n"
f"{serveralias} \n"
f"ServerSignature Off\n"
f"\n"
f"RewriteEngine On\n"
f"RewriteRule {' '.join(rewrite_rule_args)}\n"
"\n"
f"ErrorLog {self.options.logs_root}/redirect.error.log\n"
f"LogLevel warn\n"
f"</VirtualHost>\n"
)
def _write_out_redirect(self, ssl_vhost, text):
# This is the default name
@ -2409,11 +2400,13 @@ class ApacheConfigurator(common.Configurator):
generic fashion.
"""
mod_message = ("Apache needs to have module \"{0}\" active for the " +
"requested installation options. Unfortunately Certbot is unable " +
"to install or enable it for you. Please install the module, and " +
"run Certbot again.")
raise errors.MisconfigurationError(mod_message.format(mod_name))
mod_message = (
f"Apache needs to have module \"{mod_name}\" active for the "
"requested installation options. Unfortunately Certbot is unable "
"to install or enable it for you. Please install the module, and "
"run Certbot again."
)
raise errors.MisconfigurationError(mod_message)
def restart(self):
"""Runs a config test and reloads the Apache server.
@ -2645,7 +2638,7 @@ class ApacheConfigurator(common.Configurator):
self.parser.add_dir(ssl_vhost.path, "Header", hsts_header)
note_msg = ("Adding gradually increasing HSTS header with initial value "
"of {0} to VirtualHost in {1}\n".format(
initial_maxage, ssl_vhost.filep))
initial_maxage, ssl_vhost.filep))
self.save_notes += note_msg
# Save the current state to pluginstorage

View file

@ -108,17 +108,17 @@ def _vhost_menu(domain, vhosts):
try:
code, tag = display_util.menu(
"We were unable to find a vhost with a ServerName "
"or Address of {0}.{1}Which virtual host would you "
"like to choose?".format(domain, os.linesep),
f"We were unable to find a vhost with a ServerName "
f"or Address of {domain}.{os.linesep}Which virtual host would you "
f"like to choose?",
choices, force_interactive=True)
except errors.MissingCommandlineFlag:
msg = (
"Encountered vhost ambiguity when trying to find a vhost for "
"{0} but was unable to ask for user "
"guidance in non-interactive mode. Certbot may need "
"vhosts to be explicitly labelled with ServerName or "
"ServerAlias directives.".format(domain))
f"Encountered vhost ambiguity when trying to find a vhost for "
f"{domain} but was unable to ask for user "
f"guidance in non-interactive mode. Certbot may need "
f"vhosts to be explicitly labelled with ServerName or "
f"ServerAlias directives.")
logger.error(msg)
raise errors.MissingCommandlineFlag(msg)

View file

@ -102,7 +102,7 @@ For this reason the internal representation of data should not ignore the case.
import abc
class ParserNode(object, metaclass=abc.ABCMeta):
class ParserNode(metaclass=abc.ABCMeta):
"""
ParserNode is the basic building block of the tree of such nodes,
representing the structure of the configuration. It is largely meant to keep
@ -239,9 +239,9 @@ class CommentNode(ParserNode, metaclass=abc.ABCMeta):
:type dirty: bool
"""
super().__init__(ancestor=kwargs['ancestor'],
dirty=kwargs.get('dirty', False),
filepath=kwargs['filepath'],
metadata=kwargs.get('metadata', {})) # pragma: no cover
dirty=kwargs.get('dirty', False),
filepath=kwargs['filepath'],
metadata=kwargs.get('metadata', {})) # pragma: no cover
class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
@ -303,9 +303,9 @@ class DirectiveNode(ParserNode, metaclass=abc.ABCMeta):
"""
super().__init__(ancestor=kwargs['ancestor'],
dirty=kwargs.get('dirty', False),
filepath=kwargs['filepath'],
metadata=kwargs.get('metadata', {})) # pragma: no cover
dirty=kwargs.get('dirty', False),
filepath=kwargs['filepath'],
metadata=kwargs.get('metadata', {})) # pragma: no cover
@abc.abstractmethod
def set_parameters(self, parameters):

View file

@ -21,7 +21,7 @@ class Addr(common.Addr):
return False
def __repr__(self):
return "certbot_apache._internal.obj.Addr(" + repr(self.tup) + ")"
return f"certbot_apache._internal.obj.Addr({repr(self.tup)})"
def __hash__(self): # pylint: disable=useless-super-delegation
# Python 3 requires explicit overridden for __hash__ if __eq__ or
@ -147,34 +147,24 @@ class VirtualHost:
def __str__(self):
return (
"File: {filename}\n"
"Vhost path: {vhpath}\n"
"Addresses: {addrs}\n"
"Name: {name}\n"
"Aliases: {aliases}\n"
"TLS Enabled: {tls}\n"
"Site Enabled: {active}\n"
"mod_macro Vhost: {modmacro}".format(
filename=self.filep,
vhpath=self.path,
addrs=", ".join(str(addr) for addr in self.addrs),
name=self.name if self.name is not None else "",
aliases=", ".join(name for name in self.aliases),
tls="Yes" if self.ssl else "No",
active="Yes" if self.enabled else "No",
modmacro="Yes" if self.modmacro else "No"))
f"File: {self.filep}\n"
f"Vhost path: {self.path}\n"
f"Addresses: {', '.join(str(addr) for addr in self.addrs)}\n"
f"Name: {self.name if self.name is not None else ''}\n"
f"Aliases: {', '.join(name for name in self.aliases)}\n"
f"TLS Enabled: {'Yes' if self.ssl else 'No'}\n"
f"Site Enabled: {'Yes' if self.enabled else 'No'}\n"
f"mod_macro Vhost: {'Yes' if self.modmacro else 'No'}"
)
def display_repr(self):
"""Return a representation of VHost to be used in dialog"""
return (
"File: {filename}\n"
"Addresses: {addrs}\n"
"Names: {names}\n"
"HTTPS: {https}\n".format(
filename=self.filep,
addrs=", ".join(str(addr) for addr in self.addrs),
names=", ".join(self.get_names()),
https="Yes" if self.ssl else "No"))
f"File: {self.filep}\n"
f"Addresses: {', '.join(str(addr) for addr in self.addrs)}\n"
f"Names: {', '.join(self.get_names())}\n"
f"HTTPS: {'Yes' if self.ssl else 'No'}\n"
)
def __eq__(self, other):
if isinstance(other, self.__class__):

View file

@ -512,7 +512,7 @@ def test_default_curve_type(context: IntegrationTestsContext) -> None:
# Curve name, Curve class, ACME servers to skip
('secp256r1', SECP256R1, []),
('secp384r1', SECP384R1, []),
('secp521r1', SECP521R1, ['boulder-v1', 'boulder-v2'])]
('secp521r1', SECP521R1, ['boulder-v2'])]
)
def test_ecdsa_curves(context: IntegrationTestsContext, curve: str, curve_cls: Type[EllipticCurve],
skip_servers: Iterable[str]) -> None:
@ -689,9 +689,6 @@ def test_revoke_multiple_lineages(context: IntegrationTestsContext) -> None:
def test_wildcard_certificates(context: IntegrationTestsContext) -> None:
"""Test wildcard certificate issuance."""
if context.acme_server == 'boulder-v1':
pytest.skip('Wildcard certificates are not supported on ACME v1')
certname = context.get_domain('wild')
context.certbot([

View file

@ -21,9 +21,9 @@ def pytest_addoption(parser):
:param parser: current pytest parser that will be used on the CLI
"""
parser.addoption('--acme-server', default='pebble',
choices=['boulder-v1', 'boulder-v2', 'pebble'],
help='select the ACME server to use (boulder-v1, boulder-v2, '
'pebble), defaulting to pebble')
choices=['boulder-v2', 'pebble'],
help='select the ACME server to use (boulder-v2, pebble), '
'defaulting to pebble')
parser.addoption('--dns-server', default='challtestsrv',
choices=['bind', 'challtestsrv'],
help='select the DNS server to use (bind, challtestsrv), '

View file

@ -47,7 +47,7 @@ class ACMEServer:
http_01_port: int = DEFAULT_HTTP_01_PORT) -> None:
"""
Create an ACMEServer instance.
:param str acme_server: the type of acme server used (boulder-v1, boulder-v2 or pebble)
:param str acme_server: the type of acme server used (boulder-v2 or pebble)
:param list nodes: list of node names that will be setup by pytest xdist
:param bool http_proxy: if False do not start the HTTP proxy
:param bool stdout: if True stream all subprocesses stdout to standard stdout
@ -130,8 +130,7 @@ class ACMEServer:
if acme_server == 'pebble':
acme_xdist['directory_url'] = PEBBLE_DIRECTORY_URL
else: # boulder
acme_xdist['directory_url'] = BOULDER_V2_DIRECTORY_URL \
if acme_server == 'boulder-v2' else BOULDER_V1_DIRECTORY_URL
acme_xdist['directory_url'] = BOULDER_V2_DIRECTORY_URL
acme_xdist['http_port'] = {
node: port for (node, port) in # pylint: disable=unnecessary-comprehension
@ -267,9 +266,9 @@ def main() -> None:
parser = argparse.ArgumentParser(
description='CLI tool to start a local instance of Pebble or Boulder CA server.')
parser.add_argument('--server-type', '-s',
choices=['pebble', 'boulder-v1', 'boulder-v2'], default='pebble',
help='type of CA server to start: can be Pebble or Boulder '
'(in ACMEv1 or ACMEv2 mode), Pebble is used if not set.')
choices=['pebble', 'boulder-v2'], default='pebble',
help='type of CA server to start: can be Pebble or Boulder. '
'Pebble is used if not set.')
parser.add_argument('--dns-server', '-d',
help='specify the DNS server as `IP:PORT` to use by '
'Pebble; if not specified, a local mock DNS server will be used to '

View file

@ -2,7 +2,6 @@
DEFAULT_HTTP_01_PORT = 5002
TLS_ALPN_01_PORT = 5001
CHALLTESTSRV_PORT = 8055
BOULDER_V1_DIRECTORY_URL = 'http://localhost:4000/directory'
BOULDER_V2_DIRECTORY_URL = 'http://localhost:4001/directory'
PEBBLE_DIRECTORY_URL = 'https://localhost:14000/dir'
PEBBLE_MANAGEMENT_URL = 'https://localhost:15000'

View file

@ -1,21 +1,26 @@
"""Provides a common base for Apache proxies"""
import argparse
import os
import shutil
import subprocess
from typing import Set
from typing import Tuple
from unittest import mock
from certbot import errors as le_errors, configuration
from certbot import util as certbot_util
from certbot_apache._internal import entrypoint
from certbot_compatibility_test import errors
from certbot_compatibility_test import util
from certbot_compatibility_test.configurators import common as configurators_common
from certbot import configuration
from certbot import errors as le_errors
from certbot import util as certbot_util
class Proxy(configurators_common.Proxy):
"""A common base for Apache test configurators"""
def __init__(self, args):
def __init__(self, args: argparse.Namespace) -> None:
"""Initializes the plugin with the given command line args"""
super().__init__(args)
self.le_config.apache_le_vhost_ext = "-le-ssl.conf"
@ -27,7 +32,7 @@ class Proxy(configurators_common.Proxy):
mock_display.side_effect = le_errors.PluginError(
"Unable to determine vhost")
def load_config(self):
def load_config(self) -> str:
"""Loads the next configuration for the plugin to test"""
config = super().load_config()
self._all_names, self._test_names = _get_names(config)
@ -47,7 +52,7 @@ class Proxy(configurators_common.Proxy):
return config
def _prepare_configurator(self):
def _prepare_configurator(self) -> None:
"""Prepares the Apache plugin for testing"""
for k in entrypoint.ENTRYPOINT.OS_DEFAULTS.__dict__.keys():
setattr(self.le_config, "apache_" + k,
@ -58,13 +63,13 @@ class Proxy(configurators_common.Proxy):
name="apache")
self._configurator.prepare()
def cleanup_from_tests(self):
def cleanup_from_tests(self) -> None:
"""Performs any necessary cleanup from running plugin tests"""
super().cleanup_from_tests()
mock.patch.stopall()
def _get_server_root(config):
def _get_server_root(config: str) -> str:
"""Returns the server root directory in config"""
subdirs = [
name for name in os.listdir(config)
@ -76,7 +81,7 @@ def _get_server_root(config):
return os.path.join(config, subdirs[0].rstrip())
def _get_names(config):
def _get_names(config: str) -> Tuple[Set[str], Set[str]]:
"""Returns all and testable domain names in config"""
all_names = set()
non_ip_names = set()

View file

@ -1,15 +1,28 @@
"""Provides a common base for configurator proxies"""
from abc import abstractmethod
import argparse
import logging
import os
import shutil
import tempfile
from typing import Iterable
from typing import List
from typing import Optional
from typing import overload
from typing import Set
from typing import Tuple
from typing import Type
from typing import Union
from certbot._internal import constants
from certbot_compatibility_test import interfaces
from certbot_compatibility_test import errors
from certbot_compatibility_test import interfaces
from certbot_compatibility_test import util
from acme import challenges
from acme.challenges import Challenge
from certbot._internal import constants
from certbot.achallenges import AnnotatedChallenge
logger = logging.getLogger(__name__)
@ -17,10 +30,10 @@ class Proxy(interfaces.ConfiguratorProxy):
"""A common base for compatibility test configurators"""
@classmethod
def add_parser_arguments(cls, parser):
def add_parser_arguments(cls, parser: argparse.ArgumentParser) -> None:
"""Adds command line arguments needed by the plugin"""
def __init__(self, args):
def __init__(self, args: argparse.Namespace) -> None:
"""Initializes the plugin with the given command line args"""
super().__init__(args)
self._temp_dir = tempfile.mkdtemp()
@ -37,25 +50,34 @@ class Proxy(interfaces.ConfiguratorProxy):
self.http_port = 80
self.https_port = 443
self._configurator: interfaces.Configurator
self._all_names = None
self._test_names = None
self._all_names: Optional[Set[str]] = None
self._test_names: Optional[Set[str]] = None
def has_more_configs(self):
def has_more_configs(self) -> bool:
"""Returns true if there are more configs to test"""
return bool(self._configs)
@abstractmethod
def cleanup_from_tests(self):
def cleanup_from_tests(self) -> None:
"""Performs any necessary cleanup from running plugin tests"""
def load_config(self):
def load_config(self) -> str:
"""Returns the next config directory to be tested"""
shutil.rmtree(self.le_config.work_dir, ignore_errors=True)
backup = os.path.join(self.le_config.work_dir, constants.BACKUP_DIR)
os.makedirs(backup)
return self._configs.pop()
def copy_certs_and_keys(self, cert_path, key_path, chain_path=None):
@overload
def copy_certs_and_keys(self, cert_path: str, key_path: str,
chain_path: str) -> Tuple[str, str, str]: ...
@overload
def copy_certs_and_keys(self, cert_path: str, key_path: str,
chain_path: Optional[str]) -> Tuple[str, str, Optional[str]]: ...
def copy_certs_and_keys(self, cert_path: str, key_path: str,
chain_path: Optional[str] = None) -> Tuple[str, str, Optional[str]]:
"""Copies certs and keys into the temporary directory"""
cert_and_key_dir = os.path.join(self._temp_dir, "certs_and_keys")
if not os.path.isdir(cert_and_key_dir):
@ -72,68 +94,67 @@ class Proxy(interfaces.ConfiguratorProxy):
return cert, key, chain
def get_all_names_answer(self):
def get_all_names_answer(self) -> Set[str]:
"""Returns the set of domain names that the plugin should find"""
if self._all_names:
return self._all_names
raise errors.Error("No configuration file loaded")
def get_testable_domain_names(self):
def get_testable_domain_names(self) -> Set[str]:
"""Returns the set of domain names that can be tested against"""
if self._test_names:
return self._test_names
return {"example.com"}
def deploy_cert(self, domain, cert_path, key_path, chain_path=None,
fullchain_path=None):
def deploy_cert(self, domain: str, cert_path: str, key_path: str, chain_path: str,
fullchain_path: str) -> None:
"""Installs cert"""
cert_path, key_path, chain_path = self.copy_certs_and_keys(
cert_path, key_path, chain_path)
cert_path, key_path, chain_path = self.copy_certs_and_keys(cert_path, key_path, chain_path)
if not self._configurator:
raise ValueError("Configurator plugin is not set.")
self._configurator.deploy_cert(
domain, cert_path, key_path, chain_path, fullchain_path)
def cleanup(self, achalls):
def cleanup(self, achalls: List[AnnotatedChallenge]) -> None:
self._configurator.cleanup(achalls)
def config_test(self):
def config_test(self) -> None:
self._configurator.config_test()
def enhance(self, domain, enhancement, options = None):
def enhance(self, domain: str, enhancement: str,
options: Optional[Union[List[str], str]] = None) -> None:
self._configurator.enhance(domain, enhancement, options)
def get_all_names(self):
def get_all_names(self) -> Iterable[str]:
return self._configurator.get_all_names()
def get_chall_pref(self, domain):
def get_chall_pref(self, domain: str) -> Iterable[Type[Challenge]]:
return self._configurator.get_chall_pref(domain)
@classmethod
def inject_parser_options(cls, parser, name):
def inject_parser_options(cls, parser: argparse.ArgumentParser, name: str) -> None:
pass
def more_info(self):
def more_info(self) -> str:
return self._configurator.more_info()
def perform(self, achalls):
def perform(self, achalls: List[AnnotatedChallenge]) -> List[challenges.ChallengeResponse]:
return self._configurator.perform(achalls)
def prepare(self):
def prepare(self) -> None:
self._configurator.prepare()
def recovery_routine(self):
def recovery_routine(self) -> None:
self._configurator.recovery_routine()
def restart(self):
def restart(self) -> None:
self._configurator.restart()
def rollback_checkpoints(self, rollback = 1):
def rollback_checkpoints(self, rollback: int = 1) -> None:
self._configurator.rollback_checkpoints(rollback)
def save(self, title = None, temporary = False):
def save(self, title: Optional[str] = None, temporary: bool = False) -> None:
self._configurator.save(title, temporary)
def supported_enhancements(self):
def supported_enhancements(self) -> List[str]:
return self._configurator.supported_enhancements()

View file

@ -4,8 +4,8 @@ import shutil
import subprocess
from typing import cast
from typing import Set
from typing import Tuple
from certbot import configuration
from certbot_compatibility_test import errors
from certbot_compatibility_test import interfaces
from certbot_compatibility_test import util
@ -13,11 +13,13 @@ from certbot_compatibility_test.configurators import common as configurators_com
from certbot_nginx._internal import configurator
from certbot_nginx._internal import constants
from certbot import configuration
class Proxy(configurators_common.Proxy):
"""A common base for Nginx test configurators"""
def load_config(self):
def load_config(self) -> str:
"""Loads the next configuration for the plugin to test"""
config = super().load_config()
self._all_names, self._test_names = _get_names(config)
@ -40,7 +42,7 @@ class Proxy(configurators_common.Proxy):
return config
def _prepare_configurator(self):
def _prepare_configurator(self) -> None:
"""Prepares the Nginx plugin for testing"""
for k in constants.CLI_DEFAULTS:
setattr(self.le_config, "nginx_" + k, constants.os_constant(k))
@ -50,11 +52,11 @@ class Proxy(configurators_common.Proxy):
config=conf, name="nginx"))
self._configurator.prepare()
def cleanup_from_tests(self):
def cleanup_from_tests(self) -> None:
"""Performs any necessary cleanup from running plugin tests"""
def _get_server_root(config):
def _get_server_root(config: str) -> str:
"""Returns the server root directory in config"""
subdirs = [
name for name in os.listdir(config)
@ -66,7 +68,7 @@ def _get_server_root(config):
return os.path.join(config, subdirs[0].rstrip())
def _get_names(config):
def _get_names(config: str) -> Tuple[Set[str], Set[str]]:
"""Returns all and testable domain names in config"""
all_names: Set[str] = set()
for root, _dirs, files in os.walk(config):
@ -77,7 +79,7 @@ def _get_names(config):
return all_names, non_ip_names
def _get_server_names(root, filename):
def _get_server_names(root: str, filename: str) -> Set[str]:
"""Returns all names in a config file path"""
all_names = set()
with open(os.path.join(root, filename)) as f:

View file

@ -1,8 +1,12 @@
"""Certbot compatibility test interfaces"""
from abc import ABCMeta
from abc import abstractmethod
import argparse
from typing import cast
from typing import Set
from certbot import interfaces
from certbot.configuration import NamespaceConfig
class PluginProxy(interfaces.Plugin, metaclass=ABCMeta):
@ -16,16 +20,16 @@ class PluginProxy(interfaces.Plugin, metaclass=ABCMeta):
@classmethod
@abstractmethod
def add_parser_arguments(cls, parser):
def add_parser_arguments(cls, parser: argparse.ArgumentParser) -> None:
"""Adds command line arguments needed by the parser"""
@abstractmethod
def __init__(self, args):
def __init__(self, args: argparse.Namespace) -> None:
"""Initializes the plugin with the given command line args"""
super().__init__(args, 'proxy')
super().__init__(cast(NamespaceConfig, args), 'proxy')
@abstractmethod
def cleanup_from_tests(self):
def cleanup_from_tests(self) -> None:
"""Performs any necessary cleanup from running plugin tests.
This is guaranteed to be called before the program exits.
@ -33,15 +37,15 @@ class PluginProxy(interfaces.Plugin, metaclass=ABCMeta):
"""
@abstractmethod
def has_more_configs(self):
def has_more_configs(self) -> bool:
"""Returns True if there are more configs to test"""
@abstractmethod
def load_config(self):
def load_config(self) -> str:
"""Loads the next config and returns its name"""
@abstractmethod
def get_testable_domain_names(self):
def get_testable_domain_names(self) -> Set[str]:
"""Returns the domain names that can be used in testing"""
@ -53,7 +57,7 @@ class InstallerProxy(PluginProxy, interfaces.Installer, metaclass=ABCMeta):
"""Wraps a Certbot installer"""
@abstractmethod
def get_all_names_answer(self):
def get_all_names_answer(self) -> Set[str]:
"""Returns all names that should be found by the installer"""

View file

@ -5,13 +5,26 @@ import filecmp
import logging
import os
import shutil
import socket
import sys
import tempfile
import time
from typing import Any
from typing import Dict
from typing import Generator
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
import OpenSSL
from certbot_compatibility_test import errors
from certbot_compatibility_test import util
from certbot_compatibility_test import validator
from certbot_compatibility_test.configurators import common
from certbot_compatibility_test.configurators.apache import common as a_common
from certbot_compatibility_test.configurators.nginx import common as n_common
from OpenSSL import crypto
from urllib3.util import connection
from acme import challenges
@ -19,14 +32,9 @@ from acme import crypto_util
from acme import messages
from certbot import achallenges
from certbot import errors as le_errors
from certbot.display import util as display_util
from certbot._internal.display import obj as display_obj
from certbot.display import util as display_util
from certbot.tests import acme_util
from certbot_compatibility_test import errors
from certbot_compatibility_test import util
from certbot_compatibility_test import validator
from certbot_compatibility_test.configurators.apache import common as a_common
from certbot_compatibility_test.configurators.nginx import common as n_common
DESCRIPTION = """
Tests Certbot plugins against different server configurations. It is
@ -35,13 +43,13 @@ tests that the plugin supports are performed.
"""
PLUGINS = {"apache": a_common.Proxy, "nginx": n_common.Proxy}
PLUGINS: Dict[str, Type[common.Proxy]] = {"apache": a_common.Proxy, "nginx": n_common.Proxy}
logger = logging.getLogger(__name__)
def test_authenticator(plugin, config, temp_dir):
def test_authenticator(plugin: common.Proxy, config: str, temp_dir: str) -> bool:
"""Tests authenticator, returning True if the tests are successful"""
backup = _create_backup(config, temp_dir)
@ -96,7 +104,7 @@ def test_authenticator(plugin, config, temp_dir):
return success
def _create_achalls(plugin):
def _create_achalls(plugin: common.Proxy) -> List[achallenges.AnnotatedChallenge]:
"""Returns a list of annotated challenges to test on plugin"""
achalls = []
names = plugin.get_testable_domain_names()
@ -117,7 +125,8 @@ def _create_achalls(plugin):
return achalls
def test_installer(args, plugin, config, temp_dir):
def test_installer(args: argparse.Namespace, plugin: common.Proxy, config: str,
temp_dir: str) -> bool:
"""Tests plugin as an installer"""
backup = _create_backup(config, temp_dir)
@ -137,13 +146,12 @@ def test_installer(args, plugin, config, temp_dir):
return names_match and success and good_rollback
def test_deploy_cert(plugin, temp_dir, domains):
def test_deploy_cert(plugin: common.Proxy, temp_dir: str, domains: List[str]) -> bool:
"""Tests deploy_cert returning True if the tests are successful"""
cert = crypto_util.gen_ss_cert(util.KEY, domains)
cert_path = os.path.join(temp_dir, "cert.pem")
with open(cert_path, "wb") as f:
f.write(OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert))
f.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
for domain in domains:
try:
@ -171,7 +179,7 @@ def test_deploy_cert(plugin, temp_dir, domains):
return success
def test_enhancements(plugin, domains):
def test_enhancements(plugin: common.Proxy, domains: Iterable[str]) -> bool:
"""Tests supported enhancements returning True if successful"""
supported = plugin.supported_enhancements()
@ -216,7 +224,7 @@ def test_enhancements(plugin, domains):
return success
def _save_and_restart(plugin, title=None):
def _save_and_restart(plugin: common.Proxy, title: Optional[str] = None) -> bool:
"""Saves and restart the plugin, returning True if no errors occurred"""
try:
plugin.save(title)
@ -227,7 +235,7 @@ def _save_and_restart(plugin, title=None):
return False
def test_rollback(plugin, config, backup):
def test_rollback(plugin: common.Proxy, config: str, backup: str) -> bool:
"""Tests the rollback checkpoints function"""
try:
plugin.rollback_checkpoints(1337)
@ -242,7 +250,7 @@ def test_rollback(plugin, config, backup):
return True
def _create_backup(config, temp_dir):
def _create_backup(config: str, temp_dir: str) -> str:
"""Creates a backup of config in temp_dir"""
backup = os.path.join(temp_dir, "backup")
shutil.rmtree(backup, ignore_errors=True)
@ -251,7 +259,7 @@ def _create_backup(config, temp_dir):
return backup
def _dirs_are_unequal(dir1, dir2):
def _dirs_are_unequal(dir1: str, dir2: str) -> bool:
"""Returns True if dir1 and dir2 are unequal"""
dircmps = [filecmp.dircmp(dir1, dir2)]
while dircmps:
@ -283,7 +291,7 @@ def _dirs_are_unequal(dir1, dir2):
return False
def get_args():
def get_args() -> argparse.Namespace:
"""Returns parsed command line arguments."""
parser = argparse.ArgumentParser(
description=DESCRIPTION,
@ -320,7 +328,7 @@ def get_args():
return args
def setup_logging(args):
def setup_logging(args: argparse.Namespace) -> None:
"""Prepares logging for the program"""
handler = logging.StreamHandler()
@ -329,13 +337,13 @@ def setup_logging(args):
root_logger.addHandler(handler)
def setup_display():
def setup_display() -> None:
""""Prepares a display utility instance for the Certbot plugins """
displayer = display_util.NoninteractiveDisplay(sys.stdout)
display_obj.set_display(displayer)
def main():
def main() -> None:
"""Main test script execution."""
args = get_args()
setup_logging(args)
@ -379,11 +387,12 @@ def main():
@contextlib.contextmanager
def _fake_dns_resolution(resolved_ip):
def _fake_dns_resolution(resolved_ip: str) -> Generator[None, None, None]:
"""Monkey patch urllib3 to make any hostname be resolved to the provided IP"""
_original_create_connection = connection.create_connection
def _patched_create_connection(address, *args, **kwargs):
def _patched_create_connection(address: Tuple[str, str],
*args: Any, **kwargs: Any) -> socket.socket:
_, port = address
return _original_create_connection((resolved_ip, port), *args, **kwargs)

View file

@ -6,11 +6,11 @@ import re
import shutil
import tarfile
from certbot_compatibility_test import errors
import josepy as jose
from certbot._internal import constants
from certbot.tests import util as test_util
from certbot_compatibility_test import errors
_KEY_BASE = "rsa2048_key.pem"
KEY_PATH = test_util.vector_path(_KEY_BASE)
@ -19,7 +19,7 @@ JWK = jose.JWKRSA(key=test_util.load_rsa_private_key(_KEY_BASE))
IP_REGEX = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
def create_le_config(parent_dir):
def create_le_config(parent_dir: str) -> argparse.Namespace:
"""Sets up LE dirs in parent_dir and returns the config dict"""
config = copy.deepcopy(constants.CLI_DEFAULTS)
@ -36,7 +36,7 @@ def create_le_config(parent_dir):
return argparse.Namespace(**config)
def extract_configs(configs, parent_dir):
def extract_configs(configs: str, parent_dir: str) -> str:
"""Extracts configs to a new dir under parent_dir and returns it"""
config_dir = os.path.join(parent_dir, "configs")

View file

@ -2,7 +2,11 @@
import logging
import socket
from typing import cast
from typing import Mapping
from typing import Optional
from typing import Union
from OpenSSL import crypto
import requests
from acme import crypto_util
@ -14,10 +18,12 @@ logger = logging.getLogger(__name__)
class Validator:
"""Collection of functions to test a live webserver's configuration"""
def certificate(self, cert, name, alt_host=None, port=443):
def certificate(self, cert: crypto.X509, name: Union[str, bytes],
alt_host: Optional[str] = None, port: int = 443) -> bool:
"""Verifies the certificate presented at name is cert"""
if alt_host is None:
host = socket.gethostbyname(name).encode()
# In fact, socket.gethostbyname accepts both bytes and str, but types do not know that.
host = socket.gethostbyname(cast(str, name)).encode()
elif isinstance(alt_host, bytes):
host = alt_host
else:
@ -31,9 +37,10 @@ class Validator:
return False
# Despite documentation saying that bytes are expected for digest(), we must provide a str.
return presented_cert.digest(cast(bytes, "sha256")) == cert.digest("sha256")
return presented_cert.digest(cast(bytes, "sha256")) == cert.digest(cast(bytes, "sha256"))
def redirect(self, name, port=80, headers=None):
def redirect(self, name: str, port: int = 80,
headers: Optional[Mapping[str, str]] = None) -> bool:
"""Test whether webserver redirects to secure connection."""
url = "http://{0}:{1}".format(name, port)
if headers:
@ -54,7 +61,8 @@ class Validator:
return True
def any_redirect(self, name, port=80, headers=None):
def any_redirect(self, name: str, port: int = 80,
headers: Optional[Mapping[str, str]] = None) -> bool:
"""Test whether webserver redirects."""
url = "http://{0}:{1}".format(name, port)
if headers:
@ -64,7 +72,7 @@ class Validator:
return response.status_code in range(300, 309)
def hsts(self, name):
def hsts(self, name: str) -> bool:
"""Test for HTTP Strict Transport Security header"""
headers = requests.get("https://" + name).headers
hsts_header = headers.get("strict-transport-security")
@ -93,6 +101,6 @@ class Validator:
return True
def ocsp_stapling(self, name):
def ocsp_stapling(self, name: str) -> None:
"""Verify ocsp stapling for domain."""
raise NotImplementedError()

View file

@ -1,122 +1,126 @@
"""Tests for certbot_compatibility_test.validator."""
from typing import cast
from typing import Mapping
from typing import Optional
import unittest
from unittest import mock
import OpenSSL
from certbot_compatibility_test import validator
from OpenSSL import crypto
import requests
from acme import errors as acme_errors
from certbot_compatibility_test import validator
class ValidatorTest(unittest.TestCase):
def setUp(self):
def setUp(self) -> None:
self.validator = validator.Validator()
@mock.patch(
"certbot_compatibility_test.validator.crypto_util.probe_sni")
def test_certificate_success(self, mock_probe_sni):
cert = OpenSSL.crypto.X509()
def test_certificate_success(self, mock_probe_sni: mock.MagicMock) -> None:
cert = crypto.X509()
mock_probe_sni.return_value = cert
self.assertTrue(self.validator.certificate(
cert, "test.com", "127.0.0.1"))
@mock.patch(
"certbot_compatibility_test.validator.crypto_util.probe_sni")
def test_certificate_error(self, mock_probe_sni):
cert = OpenSSL.crypto.X509()
def test_certificate_error(self, mock_probe_sni: mock.MagicMock) -> None:
cert = crypto.X509()
mock_probe_sni.side_effect = [acme_errors.Error]
self.assertFalse(self.validator.certificate(
cert, "test.com", "127.0.0.1"))
@mock.patch(
"certbot_compatibility_test.validator.crypto_util.probe_sni")
def test_certificate_failure(self, mock_probe_sni):
cert = OpenSSL.crypto.X509()
def test_certificate_failure(self, mock_probe_sni: mock.MagicMock) -> None:
cert = crypto.X509()
cert.set_serial_number(1337)
mock_probe_sni.return_value = OpenSSL.crypto.X509()
mock_probe_sni.return_value = crypto.X509()
self.assertFalse(self.validator.certificate(
cert, "test.com", "127.0.0.1"))
@mock.patch("certbot_compatibility_test.validator.requests.get")
def test_successful_redirect(self, mock_get_request):
def test_successful_redirect(self, mock_get_request: mock.MagicMock) -> None:
mock_get_request.return_value = create_response(
301, {"location": "https://test.com"})
self.assertTrue(self.validator.redirect("test.com"))
@mock.patch("certbot_compatibility_test.validator.requests.get")
def test_redirect_with_headers(self, mock_get_request):
def test_redirect_with_headers(self, mock_get_request: mock.MagicMock) -> None:
mock_get_request.return_value = create_response(
301, {"location": "https://test.com"})
self.assertTrue(self.validator.redirect(
"test.com", headers={"Host": "test.com"}))
@mock.patch("certbot_compatibility_test.validator.requests.get")
def test_redirect_missing_location(self, mock_get_request):
def test_redirect_missing_location(self, mock_get_request: mock.MagicMock) -> None:
mock_get_request.return_value = create_response(301)
self.assertFalse(self.validator.redirect("test.com"))
@mock.patch("certbot_compatibility_test.validator.requests.get")
def test_redirect_wrong_status_code(self, mock_get_request):
def test_redirect_wrong_status_code(self, mock_get_request: mock.MagicMock) -> None:
mock_get_request.return_value = create_response(
201, {"location": "https://test.com"})
self.assertFalse(self.validator.redirect("test.com"))
@mock.patch("certbot_compatibility_test.validator.requests.get")
def test_redirect_wrong_redirect_code(self, mock_get_request):
def test_redirect_wrong_redirect_code(self, mock_get_request: mock.MagicMock) -> None:
mock_get_request.return_value = create_response(
303, {"location": "https://test.com"})
self.assertFalse(self.validator.redirect("test.com"))
@mock.patch("certbot_compatibility_test.validator.requests.get")
def test_hsts_empty(self, mock_get_request):
def test_hsts_empty(self, mock_get_request: mock.MagicMock) -> None:
mock_get_request.return_value = create_response(
headers={"strict-transport-security": ""})
self.assertFalse(self.validator.hsts("test.com"))
@mock.patch("certbot_compatibility_test.validator.requests.get")
def test_hsts_malformed(self, mock_get_request):
def test_hsts_malformed(self, mock_get_request: mock.MagicMock) -> None:
mock_get_request.return_value = create_response(
headers={"strict-transport-security": "sdfal"})
self.assertFalse(self.validator.hsts("test.com"))
@mock.patch("certbot_compatibility_test.validator.requests.get")
def test_hsts_bad_max_age(self, mock_get_request):
def test_hsts_bad_max_age(self, mock_get_request: mock.MagicMock) -> None:
mock_get_request.return_value = create_response(
headers={"strict-transport-security": "max-age=not-an-int"})
self.assertFalse(self.validator.hsts("test.com"))
@mock.patch("certbot_compatibility_test.validator.requests.get")
def test_hsts_expire(self, mock_get_request):
def test_hsts_expire(self, mock_get_request: mock.MagicMock) -> None:
mock_get_request.return_value = create_response(
headers={"strict-transport-security": "max-age=3600"})
self.assertFalse(self.validator.hsts("test.com"))
@mock.patch("certbot_compatibility_test.validator.requests.get")
def test_hsts(self, mock_get_request):
def test_hsts(self, mock_get_request: mock.MagicMock) -> None:
mock_get_request.return_value = create_response(
headers={"strict-transport-security": "max-age=31536000"})
self.assertTrue(self.validator.hsts("test.com"))
@mock.patch("certbot_compatibility_test.validator.requests.get")
def test_hsts_include_subdomains(self, mock_get_request):
def test_hsts_include_subdomains(self, mock_get_request: mock.MagicMock) -> None:
mock_get_request.return_value = create_response(
headers={"strict-transport-security":
"max-age=31536000;includeSubDomains"})
self.assertTrue(self.validator.hsts("test.com"))
def test_ocsp_stapling(self):
def test_ocsp_stapling(self) -> None:
self.assertRaises(
NotImplementedError, self.validator.ocsp_stapling, "test.com")
def create_response(status_code=200, headers=None):
def create_response(status_code: int = 200,
headers: Optional[Mapping[str, str]] = None) -> requests.Response:
"""Creates a requests.Response object for testing"""
response = requests.Response()
response.status_code = status_code
if headers:
response.headers = headers
response.headers = cast(requests.models.CaseInsensitiveDict, headers)
return response

View file

@ -1,6 +1,7 @@
"""DNS Authenticator for Cloudflare."""
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import List
from typing import Optional
@ -26,20 +27,21 @@ class Authenticator(dns_common.DNSAuthenticator):
'DNS).')
ttl = 120
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 10) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='Cloudflare credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Cloudflare API.'
def _validate_credentials(self, credentials):
def _validate_credentials(self, credentials: CredentialsConfiguration) -> None:
token = credentials.conf('api-token')
email = credentials.conf('email')
key = credentials.conf('api-key')
@ -62,7 +64,7 @@ class Authenticator(dns_common.DNSAuthenticator):
'dns_cloudflare_email and dns_cloudflare_api_key are required.'
' (see {})'.format(credentials.confobj.filename, ACCOUNT_URL))
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'Cloudflare credentials INI file',
@ -70,13 +72,13 @@ class Authenticator(dns_common.DNSAuthenticator):
self._validate_credentials
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_cloudflare_client().add_txt_record(domain, validation_name, validation, self.ttl)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_cloudflare_client().del_txt_record(domain, validation_name, validation)
def _get_cloudflare_client(self):
def _get_cloudflare_client(self) -> "_CloudflareClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
if self.credentials.conf('api-token'):
@ -89,10 +91,11 @@ class _CloudflareClient:
Encapsulates all communication with the Cloudflare API.
"""
def __init__(self, email, api_key):
def __init__(self, email: Optional[str], api_key: str) -> None:
self.cf = CloudFlare.CloudFlare(email, api_key)
def add_txt_record(self, domain, record_name, record_content, record_ttl):
def add_txt_record(self, domain: str, record_name: str, record_content: str,
record_ttl: int) -> None:
"""
Add a TXT record using the supplied information.
@ -127,7 +130,7 @@ class _CloudflareClient:
record_id = self._find_txt_record_id(zone_id, record_name, record_content)
logger.debug('Successfully added TXT record with record_id: %s', record_id)
def del_txt_record(self, domain, record_name, record_content):
def del_txt_record(self, domain: str, record_name: str, record_content: str) -> None:
"""
Delete a TXT record using the supplied information.
@ -161,7 +164,7 @@ class _CloudflareClient:
else:
logger.debug('Zone not found; no cleanup needed.')
def _find_zone_id(self, domain):
def _find_zone_id(self, domain: str) -> str:
"""
Find the zone_id for a given domain.
@ -226,7 +229,8 @@ class _CloudflareClient:
'supplied Cloudflare account.'
.format(domain, zone_name_guesses))
def _find_txt_record_id(self, zone_id, record_name, record_content):
def _find_txt_record_id(self, zone_id: str, record_name: str,
record_content: str) -> Optional[str]:
"""
Find the record_id for a TXT record with the given name and content.

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for CloudXNS DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import cloudxns
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -23,20 +26,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using CloudXNS for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='CloudXNS credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the CloudXNS API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'CloudXNS credentials INI file',
@ -47,13 +51,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_cloudxns_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_cloudxns_client().del_txt_record(domain, validation_name, validation)
def _get_cloudxns_client(self):
def _get_cloudxns_client(self) -> "_CloudXNSLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _CloudXNSLexiconClient(self.credentials.conf('api-key'),
@ -66,7 +70,7 @@ class _CloudXNSLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the CloudXNS via Lexicon.
"""
def __init__(self, api_key, secret_key, ttl):
def __init__(self, api_key: str, secret_key: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('cloudxns', {
@ -78,10 +82,12 @@ class _CloudXNSLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = cloudxns.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
hint = None
if str(e).startswith('400 Client Error:'):
hint = 'Are your API key and Secret key values correct?'
return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}'
.format(domain_name, e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
return errors.PluginError(f'Error determining zone identifier for {domain_name}: '
f'{e}.{hint_disp}')

View file

@ -1,5 +1,7 @@
"""DNS Authenticator for DigitalOcean."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
import digitalocean
@ -21,20 +23,21 @@ class Authenticator(dns_common.DNSAuthenticator):
'using DigitalOcean for DNS).'
ttl = 30
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 10) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='DigitalOcean credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the DigitalOcean API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'DigitalOcean credentials INI file',
@ -43,14 +46,14 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_digitalocean_client().add_txt_record(domain, validation_name, validation,
self.ttl)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_digitalocean_client().del_txt_record(domain, validation_name, validation)
def _get_digitalocean_client(self):
def _get_digitalocean_client(self) -> "_DigitalOceanClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _DigitalOceanClient(self.credentials.conf('token'))
@ -61,11 +64,11 @@ class _DigitalOceanClient:
Encapsulates all communication with the DigitalOcean API.
"""
def __init__(self, token):
def __init__(self, token: str) -> None:
self.manager = digitalocean.Manager(token=token)
def add_txt_record(self, domain_name: str, record_name: str, record_content: str,
record_ttl: int):
record_ttl: int) -> None:
"""
Add a TXT record using the supplied information.
@ -104,7 +107,7 @@ class _DigitalOceanClient:
raise errors.PluginError('Error adding TXT record using the DigitalOcean API: {0}'
.format(e))
def del_txt_record(self, domain_name: str, record_name: str, record_content: str):
def del_txt_record(self, domain_name: str, record_name: str, record_content: str) -> None:
"""
Delete a TXT record using the supplied information.
@ -143,7 +146,7 @@ class _DigitalOceanClient:
logger.warning('Error deleting TXT record %s using the DigitalOcean API: %s',
record.id, e)
def _find_domain(self, domain_name):
def _find_domain(self, domain_name: str) -> digitalocean.Domain:
"""
Find the domain object for a given domain name.
@ -165,10 +168,10 @@ class _DigitalOceanClient:
logger.debug('Found base domain for %s using name %s', domain_name, guess)
return domain
raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.'
.format(domain_name, domain_name_guesses))
raise errors.PluginError(f'Unable to determine base domain for {domain_name} using names: '
f'{domain_name_guesses}.')
@staticmethod
def _compute_record_name(domain, full_record_name):
def _compute_record_name(domain: digitalocean.Domain, full_record_name: str) -> str:
# The domain, from DigitalOcean's point of view, is automatically appended.
return full_record_name.rpartition("." + domain.name)[0]

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for DNSimple DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import dnsimple
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -23,20 +26,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using DNSimple for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='DNSimple credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the DNSimple API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'DNSimple credentials INI file',
@ -45,13 +49,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_dnsimple_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_dnsimple_client().del_txt_record(domain, validation_name, validation)
def _get_dnsimple_client(self):
def _get_dnsimple_client(self) -> "_DNSimpleLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _DNSimpleLexiconClient(self.credentials.conf('token'), self.ttl)
@ -62,7 +66,7 @@ class _DNSimpleLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the DNSimple via Lexicon.
"""
def __init__(self, token, ttl):
def __init__(self, token: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('dnssimple', {
@ -73,10 +77,12 @@ class _DNSimpleLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = dnsimple.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> errors.PluginError:
hint = None
if str(e).startswith('401 Client Error: Unauthorized for url:'):
hint = 'Is your API token value correct?'
return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}'
.format(domain_name, e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
return errors.PluginError(f'Error determining zone identifier for {domain_name}: '
f'{e}.{hint_disp}')

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for DNS Made Easy DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import dnsmadeeasy
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -24,20 +27,21 @@ class Authenticator(dns_common.DNSAuthenticator):
'DNS).')
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=60)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 60) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='DNS Made Easy credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the DNS Made Easy API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'DNS Made Easy credentials INI file',
@ -49,13 +53,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_dnsmadeeasy_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_dnsmadeeasy_client().del_txt_record(domain, validation_name, validation)
def _get_dnsmadeeasy_client(self):
def _get_dnsmadeeasy_client(self) -> "_DNSMadeEasyLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _DNSMadeEasyLexiconClient(self.credentials.conf('api-key'),
@ -68,7 +72,7 @@ class _DNSMadeEasyLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the DNS Made Easy via Lexicon.
"""
def __init__(self, api_key, secret_key, ttl):
def __init__(self, api_key: str, secret_key: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('dnsmadeeasy', {
@ -80,7 +84,7 @@ class _DNSMadeEasyLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = dnsmadeeasy.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
if domain_name in str(e) and str(e).startswith('404 Client Error: Not Found for url:'):
return None
@ -88,5 +92,6 @@ class _DNSMadeEasyLexiconClient(dns_common_lexicon.LexiconClient):
if str(e).startswith('403 Client Error: Forbidden for url:'):
hint = 'Are your API key and Secret key values correct?'
return errors.PluginError('Error determining zone identifier: {0}.{1}'
.format(e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
return errors.PluginError(f'Error determining zone identifier: {e}.{hint_disp}')

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for Gehirn Infrastructure Service DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import gehirn
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -25,20 +28,21 @@ class Authenticator(dns_common.DNSAuthenticator):
'(if you are using Gehirn Infrastructure Service for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='Gehirn Infrastructure Service credentials file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Gehirn Infrastructure Service API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'Gehirn Infrastructure Service credentials file',
@ -50,13 +54,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_gehirn_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_gehirn_client().del_txt_record(domain, validation_name, validation)
def _get_gehirn_client(self):
def _get_gehirn_client(self) -> "_GehirnLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _GehirnLexiconClient(
@ -71,7 +75,7 @@ class _GehirnLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the Gehirn Infrastructure Service via Lexicon.
"""
def __init__(self, api_token, api_secret, ttl):
def __init__(self, api_token: str, api_secret: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('gehirn', {
@ -83,7 +87,7 @@ class _GehirnLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = gehirn.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
if domain_name in str(e) and (str(e).startswith('404 Client Error: Not Found for url:')):
return None # Expected errors when zone name guess is wrong
return super()._handle_http_error(e, domain_name)

View file

@ -1,6 +1,10 @@
"""DNS Authenticator for Google Cloud DNS."""
import json
import logging
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from googleapiclient import discovery
from googleapiclient import errors as googleapiclient_errors
@ -29,7 +33,8 @@ class Authenticator(dns_common.DNSAuthenticator):
ttl = 60
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 60) -> None:
super().add_parser_arguments(add, default_propagation_seconds=60)
add('credentials',
help=('Path to Google Cloud DNS service account JSON file. (See {0} for' +
@ -37,11 +42,11 @@ class Authenticator(dns_common.DNSAuthenticator):
'required permissions.)').format(ACCT_URL, PERMISSIONS_URL),
default=None)
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Google Cloud DNS API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
if self.conf('credentials') is None:
try:
# use project_id query to check for availability of google metadata server
@ -58,13 +63,13 @@ class Authenticator(dns_common.DNSAuthenticator):
dns_common.validate_file_permissions(self.conf('credentials'))
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_google_client().add_txt_record(domain, validation_name, validation, self.ttl)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_google_client().del_txt_record(domain, validation_name, validation, self.ttl)
def _get_google_client(self):
def _get_google_client(self) -> '_GoogleClient':
return _GoogleClient(self.conf('credentials'))
@ -73,7 +78,8 @@ class _GoogleClient:
Encapsulates all communication with the Google Cloud DNS API.
"""
def __init__(self, account_json=None, dns_api=None):
def __init__(self, account_json: Optional[str] = None,
dns_api: Optional[discovery.Resource] = None) -> None:
scopes = ['https://www.googleapis.com/auth/ndev.clouddns.readwrite']
if account_json is not None:
@ -95,7 +101,8 @@ class _GoogleClient:
else:
self.dns = dns_api
def add_txt_record(self, domain, record_name, record_content, record_ttl):
def add_txt_record(self, domain: str, record_name: str, record_content: str,
record_ttl: int) -> None:
"""
Add a TXT record using the supplied information.
@ -110,9 +117,9 @@ class _GoogleClient:
record_contents = self.get_existing_txt_rrset(zone_id, record_name)
if record_contents is None:
# If it wasn't possible to fetch the records at this label (missing .list permission),
# assume there aren't any (#5678). If there are actually records here, this will fail
# with HTTP 409/412 API errors.
# If it wasn't possible to fetch the records at this label (missing .list permission),
# assume there aren't any (#5678). If there are actually records here, this will fail
# with HTTP 409/412 API errors.
record_contents = {"rrdatas": []}
add_records = record_contents["rrdatas"][:]
@ -164,7 +171,8 @@ class _GoogleClient:
raise errors.PluginError('Error communicating with the Google Cloud DNS API: {0}'
.format(e))
def del_txt_record(self, domain, record_name, record_content, record_ttl):
def del_txt_record(self, domain: str, record_name: str, record_content: str,
record_ttl: int) -> None:
"""
Delete a TXT record using the supplied information.
@ -224,7 +232,7 @@ class _GoogleClient:
except googleapiclient_errors.Error as e:
logger.warning('Encountered error deleting TXT record: %s', e)
def get_existing_txt_rrset(self, zone_id, record_name):
def get_existing_txt_rrset(self, zone_id: str, record_name: str) -> Optional[Dict[str, Any]]:
"""
Get existing TXT records from the RRset for the record name.
@ -254,7 +262,7 @@ class _GoogleClient:
return response["rrsets"][0]
return None
def _find_managed_zone_id(self, domain):
def _find_managed_zone_id(self, domain: str) -> str:
"""
Find the managed zone for a given domain.
@ -286,7 +294,7 @@ class _GoogleClient:
.format(domain, zone_dns_name_guesses))
@staticmethod
def get_project_id():
def get_project_id() -> str:
"""
Query the google metadata service for the current project ID

View file

@ -1,6 +1,8 @@
"""DNS Authenticator for Linode."""
import logging
import re
from typing import Any
from typing import Callable
from typing import Optional
from typing import Union
@ -26,20 +28,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using Linode for DNS).'
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=120)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 120) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='Linode credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Linode API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'Linode credentials INI file',
@ -49,13 +52,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_linode_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_linode_client().del_txt_record(domain, validation_name, validation)
def _get_linode_client(self):
def _get_linode_client(self) -> '_LinodeLexiconClient':
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
api_key = self.credentials.conf('key')
@ -82,7 +85,7 @@ class _LinodeLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the Linode API.
"""
def __init__(self, api_key, api_version):
def __init__(self, api_key: str, api_version: int) -> None:
super().__init__()
self.api_version = api_version
@ -103,8 +106,8 @@ class _LinodeLexiconClient(dns_common_lexicon.LexiconClient):
raise errors.PluginError('Invalid api version specified: {0}. (Supported: 3, 4)'
.format(api_version))
def _handle_general_error(self, e, domain_name):
def _handle_general_error(self, e: Exception, domain_name: str) -> Optional[errors.PluginError]:
if not str(e).startswith('Domain not found'):
return errors.PluginError('Unexpected error determining zone identifier for {0}: {1}'
.format(domain_name, e))
return errors.PluginError('Unexpected error determining zone identifier '
f'for {domain_name}: {e}')
return None

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for LuaDNS DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import luadns
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -23,20 +26,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using LuaDNS for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='LuaDNS credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the LuaDNS API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'LuaDNS credentials INI file',
@ -46,13 +50,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_luadns_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_luadns_client().del_txt_record(domain, validation_name, validation)
def _get_luadns_client(self):
def _get_luadns_client(self) -> "_LuaDNSLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _LuaDNSLexiconClient(self.credentials.conf('email'),
@ -65,7 +69,7 @@ class _LuaDNSLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the LuaDNS via Lexicon.
"""
def __init__(self, email, token, ttl):
def __init__(self, email: str, token: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('luadns', {
@ -77,10 +81,12 @@ class _LuaDNSLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = luadns.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> errors.PluginError:
hint = None
if str(e).startswith('401 Client Error: Unauthorized for url:'):
hint = 'Are your email and API token values correct?'
return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}'
.format(domain_name, e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
return errors.PluginError(f'Error determining zone identifier for {domain_name}: '
f'{e}.{hint_disp}')

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for NS1 DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import nsone
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -23,20 +26,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using NS1 for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='NS1 credentials file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the NS1 API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'NS1 credentials file',
@ -45,13 +49,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_nsone_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_nsone_client().del_txt_record(domain, validation_name, validation)
def _get_nsone_client(self):
def _get_nsone_client(self) -> "_NS1LexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _NS1LexiconClient(self.credentials.conf('api-key'), self.ttl)
@ -62,7 +66,7 @@ class _NS1LexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the NS1 via Lexicon.
"""
def __init__(self, api_key, ttl):
def __init__(self, api_key: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('nsone', {
@ -73,13 +77,14 @@ class _NS1LexiconClient(dns_common_lexicon.LexiconClient):
self.provider = nsone.Provider(config)
def _handle_http_error(self, e, domain_name):
if domain_name in str(e) and (str(e).startswith('404 Client Error: Not Found for url:') or \
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
if domain_name in str(e) and (str(e).startswith('404 Client Error: Not Found for url:') or
str(e).startswith("400 Client Error: Bad Request for url:")):
return None # Expected errors when zone name guess is wrong
hint = None
if str(e).startswith('401 Client Error: Unauthorized for url:'):
hint = 'Is your API key correct?'
return errors.PluginError('Error determining zone identifier: {0}.{1}'
.format(e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
return errors.PluginError(f'Error determining zone identifier: {e}.{hint_disp}')

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for OVH DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import ovh
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -23,20 +26,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using OVH for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
super().add_parser_arguments(add, default_propagation_seconds=30)
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(add, default_propagation_seconds)
add('credentials', help='OVH credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the OVH API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'OVH credentials INI file',
@ -51,13 +55,13 @@ class Authenticator(dns_common.DNSAuthenticator):
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_ovh_client().add_txt_record(domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_ovh_client().del_txt_record(domain, validation_name, validation)
def _get_ovh_client(self):
def _get_ovh_client(self) -> "_OVHLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _OVHLexiconClient(
@ -74,7 +78,8 @@ class _OVHLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the OVH API via Lexicon.
"""
def __init__(self, endpoint, application_key, application_secret, consumer_key, ttl):
def __init__(self, endpoint: str, application_key: str, application_secret: str,
consumer_key: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('ovh', {
@ -88,18 +93,20 @@ class _OVHLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = ovh.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> errors.PluginError:
hint = None
if str(e).startswith('400 Client Error:'):
hint = 'Is your Application Secret value correct?'
if str(e).startswith('403 Client Error:'):
hint = 'Are your Application Key and Consumer Key values correct?'
return errors.PluginError('Error determining zone identifier for {0}: {1}.{2}'
.format(domain_name, e, ' ({0})'.format(hint) if hint else ''))
hint_disp = f' ({hint})' if hint else ''
def _handle_general_error(self, e, domain_name):
return errors.PluginError(f'Error determining zone identifier for {domain_name}: '
f'{e}.{hint_disp}')
def _handle_general_error(self, e: Exception, domain_name: str) -> Optional[errors.PluginError]:
if domain_name in str(e) and str(e).endswith('not found'):
return
return None
super()._handle_general_error(e, domain_name)
return super()._handle_general_error(e, domain_name)

View file

@ -1,5 +1,7 @@
"""DNS Authenticator using RFC 2136 Dynamic Updates."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
import dns.flags
@ -42,20 +44,21 @@ class Authenticator(dns_common.DNSAuthenticator):
description = 'Obtain certificates using a DNS TXT record (if you are using BIND for DNS).'
ttl = 120
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 60) -> None:
super().add_parser_arguments(add, default_propagation_seconds=60)
add('credentials', help='RFC 2136 credentials INI file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'RFC 2136 Dynamic Updates.'
def _validate_credentials(self, credentials):
def _validate_credentials(self, credentials: CredentialsConfiguration) -> None:
server = credentials.conf('server')
if not is_ipaddress(server):
raise errors.PluginError("The configured target DNS server ({0}) is not a valid IPv4 "
@ -65,7 +68,7 @@ class Authenticator(dns_common.DNSAuthenticator):
if not self.ALGORITHMS.get(algorithm.upper()):
raise errors.PluginError("Unknown algorithm: {0}.".format(algorithm))
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'RFC 2136 credentials INI file',
@ -77,13 +80,13 @@ class Authenticator(dns_common.DNSAuthenticator):
self._validate_credentials
)
def _perform(self, _domain, validation_name, validation):
def _perform(self, _domain: str, validation_name: str, validation: str) -> None:
self._get_rfc2136_client().add_txt_record(validation_name, validation, self.ttl)
def _cleanup(self, _domain, validation_name, validation):
def _cleanup(self, _domain: str, validation_name: str, validation: str) -> None:
self._get_rfc2136_client().del_txt_record(validation_name, validation)
def _get_rfc2136_client(self):
def _get_rfc2136_client(self) -> "_RFC2136Client":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _RFC2136Client(self.credentials.conf('server'),
@ -98,8 +101,8 @@ class _RFC2136Client:
"""
Encapsulates all communication with the target DNS server.
"""
def __init__(self, server, port, key_name, key_secret, key_algorithm,
timeout=DEFAULT_NETWORK_TIMEOUT):
def __init__(self, server: str, port: int, key_name: str, key_secret: str,
key_algorithm: dns.name.Name, timeout: int = DEFAULT_NETWORK_TIMEOUT) -> None:
self.server = server
self.port = port
self.keyring = dns.tsigkeyring.from_text({
@ -108,7 +111,7 @@ class _RFC2136Client:
self.algorithm = key_algorithm
self._default_timeout = timeout
def add_txt_record(self, record_name, record_content, record_ttl):
def add_txt_record(self, record_name: str, record_content: str, record_ttl: int) -> None:
"""
Add a TXT record using the supplied information.
@ -143,7 +146,7 @@ class _RFC2136Client:
raise errors.PluginError('Received response from server: {0}'
.format(dns.rcode.to_text(rcode)))
def del_txt_record(self, record_name, record_content):
def del_txt_record(self, record_name: str, record_content: str) -> None:
"""
Delete a TXT record using the supplied information.
@ -178,7 +181,7 @@ class _RFC2136Client:
raise errors.PluginError('Received response from server: {0}'
.format(dns.rcode.to_text(rcode)))
def _find_domain(self, record_name):
def _find_domain(self, record_name: str) -> str:
"""
Find the closest domain with an SOA record for a given domain name.
@ -198,7 +201,7 @@ class _RFC2136Client:
raise errors.PluginError('Unable to determine base domain for {0} using names: {1}.'
.format(record_name, domain_name_guesses))
def _query_soa(self, domain_name):
def _query_soa(self, domain_name: str) -> bool:
"""
Query a domain name for an authoritative SOA record.

View file

@ -2,6 +2,7 @@
import collections
import logging
import time
from typing import Any
from typing import DefaultDict
from typing import Dict
from typing import List
@ -10,7 +11,9 @@ import boto3
from botocore.exceptions import ClientError
from botocore.exceptions import NoCredentialsError
from acme.challenges import ChallengeResponse
from certbot import errors
from certbot.achallenges import AnnotatedChallenge
from certbot.plugins import dns_common
logger = logging.getLogger(__name__)
@ -32,21 +35,21 @@ class Authenticator(dns_common.DNSAuthenticator):
"DNS).")
ttl = 10
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.r53 = boto3.client("route53")
self._resource_records: DefaultDict[str, List[Dict[str, str]]] = collections.defaultdict(list)
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return "Solve a DNS01 challenge using AWS Route53"
def _setup_credentials(self):
def _setup_credentials(self) -> None:
pass
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
pass
def perform(self, achalls):
def perform(self, achalls: List[AnnotatedChallenge]) -> List[ChallengeResponse]:
self._attempt_cleanup = True
try:
@ -64,13 +67,13 @@ class Authenticator(dns_common.DNSAuthenticator):
raise errors.PluginError("\n".join([str(e), INSTRUCTIONS]))
return [achall.response(achall.account_key) for achall in achalls]
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
try:
self._change_txt_record("DELETE", validation_name, validation)
except (NoCredentialsError, ClientError) as e:
logger.debug('Encountered error during cleanup: %s', e, exc_info=True)
def _find_zone_id_for_domain(self, domain):
def _find_zone_id_for_domain(self, domain: str) -> str:
"""Find the zone id responsible a given FQDN.
That is, the id for the zone whose name is the longest parent of the
@ -100,7 +103,7 @@ class Authenticator(dns_common.DNSAuthenticator):
zones.sort(key=lambda z: len(z[0]), reverse=True)
return zones[0][1]
def _change_txt_record(self, action, validation_domain_name, validation):
def _change_txt_record(self, action: str, validation_domain_name: str, validation: str) -> str:
zone_id = self._find_zone_id_for_domain(validation_domain_name)
rrecords = self._resource_records[validation_domain_name]
@ -136,7 +139,7 @@ class Authenticator(dns_common.DNSAuthenticator):
)
return response["ChangeInfo"]["Id"]
def _wait_for_change(self, change_id):
def _wait_for_change(self, change_id: str) -> None:
"""Wait for a change to be propagated to all Route53 DNS servers.
https://docs.aws.amazon.com/Route53/latest/APIReference/API_GetChange.html
"""

View file

@ -1,4 +1,5 @@
"""Shim around `~certbot_dns_route53._internal.dns_route53` for backwards compatibility."""
from typing import Any
import warnings
from certbot_dns_route53._internal import dns_route53
@ -10,7 +11,7 @@ class Authenticator(dns_route53.Authenticator):
hidden = True
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
warnings.warn("The 'authenticator' module was renamed 'dns_route53'",
DeprecationWarning)
super().__init__(*args, **kwargs)

View file

@ -1,8 +1,11 @@
"""DNS Authenticator for Sakura Cloud DNS."""
import logging
from typing import Any
from typing import Callable
from typing import Optional
from lexicon.providers import sakuracloud
from requests import HTTPError
from certbot import errors
from certbot.plugins import dns_common
@ -24,41 +27,40 @@ class Authenticator(dns_common.DNSAuthenticator):
'(if you are using Sakura Cloud for DNS).'
ttl = 60
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.credentials: Optional[CredentialsConfiguration] = None
@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
def add_parser_arguments(cls, add: Callable[..., None],
default_propagation_seconds: int = 30) -> None:
super().add_parser_arguments(
add, default_propagation_seconds=90)
add('credentials', help='Sakura Cloud credentials file.')
def more_info(self): # pylint: disable=missing-function-docstring
def more_info(self) -> str:
return 'This plugin configures a DNS TXT record to respond to a dns-01 challenge using ' + \
'the Sakura Cloud API.'
def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
'credentials',
'Sakura Cloud credentials file',
{
'api-token': \
'API token for Sakura Cloud API obtained from {0}'.format(APIKEY_URL),
'api-secret': \
'API secret for Sakura Cloud API obtained from {0}'.format(APIKEY_URL),
'api-token': f'API token for Sakura Cloud API obtained from {APIKEY_URL}',
'api-secret': f'API secret for Sakura Cloud API obtained from {APIKEY_URL}',
}
)
def _perform(self, domain, validation_name, validation):
def _perform(self, domain: str, validation_name: str, validation: str) -> None:
self._get_sakuracloud_client().add_txt_record(
domain, validation_name, validation)
def _cleanup(self, domain, validation_name, validation):
def _cleanup(self, domain: str, validation_name: str, validation: str) -> None:
self._get_sakuracloud_client().del_txt_record(
domain, validation_name, validation)
def _get_sakuracloud_client(self):
def _get_sakuracloud_client(self) -> "_SakuraCloudLexiconClient":
if not self.credentials: # pragma: no cover
raise errors.Error("Plugin has not been prepared.")
return _SakuraCloudLexiconClient(
@ -73,7 +75,7 @@ class _SakuraCloudLexiconClient(dns_common_lexicon.LexiconClient):
Encapsulates all communication with the Sakura Cloud via Lexicon.
"""
def __init__(self, api_token, api_secret, ttl):
def __init__(self, api_token: str, api_secret: str, ttl: int) -> None:
super().__init__()
config = dns_common_lexicon.build_lexicon_config('sakuracloud', {
@ -85,7 +87,7 @@ class _SakuraCloudLexiconClient(dns_common_lexicon.LexiconClient):
self.provider = sakuracloud.Provider(config)
def _handle_http_error(self, e, domain_name):
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
if domain_name in str(e) and (str(e).startswith('404 Client Error: Not Found for url:')):
return None # Expected errors when zone name guess is wrong
return super()._handle_http_error(e, domain_name)

View file

@ -6,7 +6,9 @@ Certbot adheres to [Semantic Versioning](https://semver.org/).
### Added
*
* Added `show_account` subcommand, which will fetch the account information
from the ACME server and show the account details (account URL and, if
applicable, email address or addresses)
### Changed

View file

@ -43,6 +43,7 @@ manage your account:
register Create an ACME account
unregister Deactivate an ACME account
update_account Update an ACME account
show_account Display account details
--agree-tos Agree to the ACME server's Subscriber Agreement
-m EMAIL Email address for important account notifications
"""

View file

@ -56,6 +56,7 @@ class HelpfulArgumentParser:
"plugins": main.plugins_cmd,
"register": main.register,
"update_account": main.update_account,
"show_account": main.show_account,
"unregister": main.unregister,
"renew": main.renew,
"revoke": main.revoke,
@ -71,11 +72,11 @@ class HelpfulArgumentParser:
self.notify = display_obj.NoninteractiveDisplay(sys.stdout).notification
# List of topics for which additional help can be provided
HELP_TOPICS = ["all", "security", "paths", "automation", "testing"]
HELP_TOPICS: List[Optional[str]] = ["all", "security", "paths", "automation", "testing"]
HELP_TOPICS += list(self.VERBS) + self.COMMANDS_TOPICS + ["manage"]
plugin_names = list(plugins)
self.help_topics = HELP_TOPICS + plugin_names + [None] # type: ignore
plugin_names: List[Optional[str]] = list(plugins)
self.help_topics: List[Optional[str]] = HELP_TOPICS + plugin_names + [None]
self.detect_defaults = detect_defaults
self.args = args
@ -319,7 +320,8 @@ class HelpfulArgumentParser:
self.verb = "run"
def prescan_for_flag(self, flag: str, possible_arguments: Iterable[str]) -> Union[str, bool]:
def prescan_for_flag(self, flag: str, possible_arguments: Iterable[Optional[str]]
) -> Union[str, bool]:
"""Checks cli input for flags.
Check for a flag, which accepts a fixed set of possible arguments, in
@ -376,7 +378,7 @@ class HelpfulArgumentParser:
if self.detect_defaults:
kwargs = self.modify_kwargs_for_default_detection(**kwargs)
if isinstance(topic, str) and self.visible_topics[topic]:
if not isinstance(topic, bool) and self.visible_topics[topic]:
if topic in self.groups:
group = self.groups[topic]
group.add_argument(*args, **kwargs)
@ -471,7 +473,8 @@ class HelpfulArgumentParser:
description=plugin_ep.long_description)
plugin_ep.plugin_cls.inject_parser_options(parser_or_group, name)
def determine_help_topics(self, chosen_topic: Union[str, bool]) -> Dict[str, bool]:
def determine_help_topics(self, chosen_topic: Union[str, bool]
) -> Dict[Optional[str], bool]:
"""
The user may have requested help on a topic, return a dict of which

View file

@ -46,5 +46,5 @@ def _paths_parser(helpful: "helpful.HelpfulArgumentParser") -> None:
help=config_help("work_dir"))
add("paths", "--logs-dir", default=flag_default("logs_dir"),
help="Logs directory.")
add("paths", "--server", default=flag_default("server"),
add(["paths", "show_account"], "--server", default=flag_default("server"),
help=config_help("server"))

View file

@ -97,6 +97,11 @@ VERB_HELP = [
"to already existing configuration."),
"usage": "\n\n certbot enhance [options]\n\n"
}),
("show_account", {
"short": "Show account details from an ACME server",
"opts": 'Options useful for the "show_account" subcommand:',
"usage": "\n\n certbot show_account [options]\n\n"
}),
]

View file

@ -811,7 +811,7 @@ def unregister(config: configuration.NamespaceConfig,
accounts = account_storage.find_all()
if not accounts:
return "Could not find existing account to deactivate."
return f"Could not find existing account for server {config.server}."
prompt = ("Are you sure you would like to irrevocably deactivate "
"your account?")
wants_deactivate = display_util.yesno(prompt, yes_label='Deactivate', no_label='Abort',
@ -846,7 +846,7 @@ def register(config: configuration.NamespaceConfig,
:param unused_plugins: List of plugins (deprecated)
:type unused_plugins: plugins_disco.PluginsRegistry
:returns: `None` or a string indicating and error
:returns: `None` or a string indicating an error
:rtype: None or str
"""
@ -877,7 +877,7 @@ def update_account(config: configuration.NamespaceConfig,
:param unused_plugins: List of plugins (deprecated)
:type unused_plugins: plugins_disco.PluginsRegistry
:returns: `None` or a string indicating and error
:returns: `None` or a string indicating an error
:rtype: None or str
"""
@ -887,7 +887,7 @@ def update_account(config: configuration.NamespaceConfig,
accounts = account_storage.find_all()
if not accounts:
return "Could not find an existing account to update."
return f"Could not find an existing account for server {config.server}."
if config.email is None and not config.register_unsafely_without_email:
config.email = display_ops.get_email(optional=False)
@ -921,6 +921,53 @@ def update_account(config: configuration.NamespaceConfig,
return None
def show_account(config: configuration.NamespaceConfig,
unused_plugins: plugins_disco.PluginsRegistry) -> Optional[str]:
"""Fetch account info from the ACME server and show it to the user.
:param config: Configuration object
:type config: configuration.NamespaceConfig
:param unused_plugins: List of plugins (deprecated)
:type unused_plugins: plugins_disco.PluginsRegistry
:returns: `None` or a string indicating an error
:rtype: None or str
"""
# Portion of _determine_account logic to see whether accounts already
# exist or not.
account_storage = account.AccountFileStorage(config)
accounts = account_storage.find_all()
if not accounts:
return f"Could not find an existing account for server {config.server}."
acc, acme = _determine_account(config)
cb_client = client.Client(config, acc, None, None, acme=acme)
if not cb_client.acme:
raise errors.Error("ACME client is not set.")
regr = cb_client.acme.query_registration(acc.regr)
output = [f"Account details for server {config.server}:",
f" Account URL: {regr.uri}"]
emails = []
for contact in regr.body.contact:
if contact.startswith('mailto:'):
emails.append(contact[7:])
output.append(" Email contact{}: {}".format(
"s" if len(emails) > 1 else "",
", ".join(emails) if len(emails) > 0 else "none"))
display_util.notify("\n".join(output))
return None
def _cert_name_from_config_or_lineage(config: configuration.NamespaceConfig,
lineage: Optional[storage.RenewableCert]) -> Optional[str]:
if lineage:

View file

@ -109,7 +109,7 @@ class LexiconClient:
raise errors.PluginError('Unable to determine zone identifier for {0} using zone names: {1}'
.format(domain, domain_name_guesses))
def _handle_http_error(self, e: HTTPError, domain_name: str) -> errors.PluginError:
def _handle_http_error(self, e: HTTPError, domain_name: str) -> Optional[errors.PluginError]:
return errors.PluginError('Error determining zone identifier for {0}: {1}.'
.format(domain_name, e))

View file

@ -31,13 +31,11 @@ running:
git clone https://github.com/certbot/certbot
If you're on macOS, we recommend you skip the rest of this section and instead
run Certbot in Docker. You can find instructions for how to do this :ref:`here
<docker-dev>`. If you're running on Linux, you can run the following commands to
If you're running on a UNIX-like OS, you can run the following commands to
install dependencies and set up a virtual environment where you can run
Certbot.
Install the OS system dependencies required to run Certbot.
Install and configure the OS system dependencies required to run Certbot.
.. code-block:: shell
@ -50,6 +48,14 @@ Install the OS system dependencies required to run Certbot.
# NB2: RHEL-based distributions use python3X-devel instead of python3-devel (e.g. python36-devel)
sudo dnf install python3-devel gcc augeas-libs openssl-devel libffi-devel \
redhat-rpm-config ca-certificates openssl
# For macOS installations with Homebrew already installed and configured
# NB: If you also run `brew install python` you don't need the ~/lib
# directory created below, however, Certbot's Apache plugin won't work
# if you use Python installed from other sources such as pyenv or the
# version provided by Apple.
brew install augeas
mkdir ~/lib
ln -s $(brew --prefix)/lib/libaugeas* ~/lib
Set up the Python virtual environment that will host your Certbot local instance.
@ -105,6 +111,10 @@ You can test your code in several ways:
- running the `automated integration`_ tests
- running an *ad hoc* `manual integration`_ test
.. note:: Running integration tests does not currently work on macOS. See
https://github.com/certbot/certbot/issues/6959. In the meantime, we
recommend developers on macOS open a PR to run integration tests.
.. _automated unit:
Running automated unit tests

View file

@ -1151,7 +1151,8 @@ certbot will begin rotating logs once there are 1000 logs in the log directory.
Meaning that once 1000 files are in ``/var/log/letsencrypt`` Certbot will delete
the oldest one to make room for new logs. The number of subsequent logs can be
changed by passing the desired number to the command line flag
``--max-log-backups``.
``--max-log-backups``. Setting this flag to 0 disables log rotation entirely,
causing certbot to always append to the same log file.
.. note:: Some distributions, including Debian and Ubuntu, disable
certbot's internal log rotation in favor of a more traditional

View file

@ -1595,10 +1595,11 @@ class UnregisterTest(unittest.TestCase):
self.mocks['client'].Client.return_value = cb_client
config = mock.MagicMock()
config.server = "https://acme.example.com/directory"
unused_plugins = mock.MagicMock()
res = main.unregister(config, unused_plugins)
m = "Could not find existing account to deactivate."
m = "Could not find existing account for server https://acme.example.com/directory."
self.assertEqual(res, m)
self.assertIs(cb_client.acme.deactivate_registration.called, False)
@ -2025,7 +2026,8 @@ class UpdateAccountTest(test_util.ConfigTestCase):
mock_storage.find_all.return_value = []
self.mocks['account'].AccountFileStorage.return_value = mock_storage
self.assertEqual(self._call(['update_account', '--email', 'user@example.org']),
'Could not find an existing account to update.')
'Could not find an existing account for server'
' https://acme-v02.api.letsencrypt.org/directory.')
def test_update_account_remove_email(self):
"""Test that --register-unsafely-without-email is handled as no email"""
@ -2070,5 +2072,106 @@ class UpdateAccountTest(test_util.ConfigTestCase):
'Your e-mail address was updated to user@example.com,user@example.org.')
class ShowAccountTest(test_util.ConfigTestCase):
"""Tests for certbot._internal.main.show_account"""
def setUp(self):
patches = {
'account': mock.patch('certbot._internal.main.account'),
'atexit': mock.patch('certbot.util.atexit'),
'client': mock.patch('certbot._internal.main.client'),
'determine_account': mock.patch('certbot._internal.main._determine_account'),
'notify': mock.patch('certbot._internal.main.display_util.notify'),
'util': test_util.patch_display_util()
}
self.mocks = { k: patches[k].start() for k in patches }
for patch in patches.values():
self.addCleanup(patch.stop)
return super().setUp()
def _call(self, args):
with mock.patch('certbot._internal.main.sys.stdout'), \
mock.patch('certbot._internal.main.sys.stderr'):
args = ['--config-dir', self.config.config_dir,
'--work-dir', self.config.work_dir,
'--logs-dir', self.config.logs_dir, '--text'] + args
return main.main(args[:]) # NOTE: parser can alter its args!
def _prepare_mock_account(self):
mock_storage = mock.MagicMock()
mock_account = mock.MagicMock()
mock_regr = mock.MagicMock()
mock_storage.find_all.return_value = [mock_account]
self.mocks['account'].AccountFileStorage.return_value = mock_storage
mock_account.regr.body = mock_regr.body
self.mocks['determine_account'].return_value = (mock_account, mock.MagicMock())
def _test_show_account(self, contact):
self._prepare_mock_account()
mock_client = mock.MagicMock()
mock_regr = mock.MagicMock()
mock_regr.body.contact = contact
mock_regr.uri = 'https://www.letsencrypt-demo.org/acme/reg/1'
mock_regr.body.key.thumbprint.return_value = b'foobarbaz'
mock_client.acme.query_registration.return_value = mock_regr
self.mocks['client'].Client.return_value = mock_client
args = ['show_account']
self._call(args)
self.assertEqual(mock_client.acme.query_registration.call_count, 1)
def test_no_existing_accounts(self):
"""Test that no existing account is handled correctly"""
mock_storage = mock.MagicMock()
mock_storage.find_all.return_value = []
self.mocks['account'].AccountFileStorage.return_value = mock_storage
self.assertEqual(self._call(['show_account']),
'Could not find an existing account for server'
' https://acme-v02.api.letsencrypt.org/directory.')
def test_no_existing_client(self):
"""Test that issues with the ACME client are handled correctly"""
self._prepare_mock_account()
mock_client = mock.MagicMock()
mock_client.acme = None
self.mocks['client'].Client.return_value = mock_client
try:
self._call(['show_account'])
except errors.Error as e:
self.assertEqual('ACME client is not set.', str(e))
def test_no_contacts(self):
self._test_show_account(())
self.assertEqual(self.mocks['notify'].call_count, 1)
self.mocks['notify'].assert_has_calls([
mock.call('Account details for server https://acme-v02.api.letsencr'
'ypt.org/directory:\n Account URL: https://www.letsencry'
'pt-demo.org/acme/reg/1\n Email contact: none')])
def test_single_email(self):
contact = ('mailto:foo@example.com',)
self._test_show_account(contact)
self.assertEqual(self.mocks['notify'].call_count, 1)
self.mocks['notify'].assert_has_calls([
mock.call('Account details for server https://acme-v02.api.letsencr'
'ypt.org/directory:\n Account URL: https://www.letsencry'
'pt-demo.org/acme/reg/1\n Email contact: foo@example.com')])
def test_double_email(self):
contact = ('mailto:foo@example.com', 'mailto:bar@example.com')
self._test_show_account(contact)
self.assertEqual(self.mocks['notify'].call_count, 1)
self.mocks['notify'].assert_has_calls([
mock.call('Account details for server https://acme-v02.api.letsencr'
'ypt.org/directory:\n Account URL: https://www.letsencry'
'pt-demo.org/acme/reg/1\n Email contacts: foo@example.com, bar@example.com')])
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -6,8 +6,8 @@
targets:
#-----------------------------------------------------------------------------
#Ubuntu
- ami: ami-0f2e2c076f4c2f941
name: ubuntu20.10
- ami: ami-0c2d5393cb5b518f6
name: ubuntu21.10
type: ubuntu
virt: hvm
user: ubuntu

View file

@ -3,8 +3,8 @@
targets:
#-----------------------------------------------------------------------------
#Ubuntu
- ami: ami-0f2e2c076f4c2f941
name: ubuntu20.10
- ami: ami-0c2d5393cb5b518f6
name: ubuntu21.10
type: ubuntu
virt: hvm
user: ubuntu

View file

@ -9,6 +9,10 @@ import shutil
import subprocess
import sys
import tempfile
from typing import Iterable
from typing import List
from typing import Sequence
from typing import Tuple
from cryptography import x509
from cryptography.hazmat.backends import default_backend
@ -24,7 +28,7 @@ from certbot.tests import util as test_util
logger = logging.getLogger(__name__)
def main():
def main() -> None:
"""Run the lock tests."""
dirs, base_cmd = set_up()
for subcommand in ('certonly', 'install', 'renew', 'run',):
@ -33,7 +37,7 @@ def main():
logger.info('Lock test ran successfully.')
def set_up():
def set_up() -> Tuple[List[str], List[str]]:
"""Prepare tests to be run.
Logging is set up and temporary directories are set up to contain a
@ -65,7 +69,7 @@ def set_up():
return dirs, command
def set_up_dirs():
def set_up_dirs() -> Tuple[str, str, str, str]:
"""Set up directories for tests.
A temporary directory is created to contain the config, log, work,
@ -96,7 +100,7 @@ def set_up_dirs():
return config_dir, logs_dir, work_dir, nginx_dir
def set_up_nginx_dir(root_path):
def set_up_nginx_dir(root_path: str) -> None:
"""Create a basic Nginx configuration in nginx_dir.
:param str root_path: where the Nginx server root should be placed
@ -113,7 +117,7 @@ def set_up_nginx_dir(root_path):
f.write(check_call(['/bin/sh', conf_script, root_path, key_path, cert_path]))
def set_up_command(config_dir, logs_dir, work_dir, nginx_dir):
def set_up_command(config_dir: str, logs_dir: str, work_dir: str, nginx_dir: str) -> List[str]:
"""Build the Certbot command to run for testing.
You can test different subcommands by appending the desired command
@ -137,7 +141,7 @@ def set_up_command(config_dir, logs_dir, work_dir, nginx_dir):
config_dir, logs_dir, work_dir, nginx_dir).split())
def setup_certificate(workspace):
def setup_certificate(workspace: str) -> Tuple[str, str]:
"""Generate a self-signed certificate for nginx.
:param workspace: path of folder where to put the certificate
:return: tuple containing the key path and certificate path
@ -181,7 +185,7 @@ def setup_certificate(workspace):
return key_path, cert_path
def test_command(command, directories):
def test_command(command: Sequence[str], directories: Iterable[str]) -> None:
"""Assert Certbot acquires locks in a specific order.
command is run repeatedly testing that Certbot acquires locks on
@ -198,7 +202,7 @@ def test_command(command, directories):
dir_lock.release()
def check_error(command, dir_path):
def check_error(command: Sequence[str], dir_path: str) -> None:
"""Run command and verify it fails to acquire the lock for dir_path.
:param str command: certbot command to run
@ -223,7 +227,7 @@ def check_error(command, dir_path):
report_failure(err_msg, out, err)
def check_call(args):
def check_call(args: Sequence[str]) -> str:
"""Simple imitation of subprocess.check_call.
This function is only available in subprocess in Python 2.7+.
@ -240,7 +244,7 @@ def check_call(args):
return out
def report_failure(err_msg, out, err):
def report_failure(err_msg: str, out: str, err: str) -> None:
"""Report a subprocess failure and exit.
:param str err_msg: error message to report
@ -253,7 +257,7 @@ def report_failure(err_msg, out, err):
sys.exit(err_msg)
def subprocess_call(args):
def subprocess_call(args: Sequence[str]) -> Tuple[int, str, str]:
"""Run a command with subprocess and return the result.
:param list args: program and it's arguments to be run
@ -270,7 +274,7 @@ def subprocess_call(args):
return process.returncode, out, err
def log_output(level, out, err):
def log_output(level: int, out: str, err: str) -> None:
"""Logs stdout and stderr output at the requested level.
:param int level: logging level to use

View file

@ -19,6 +19,14 @@ Setup:
Run:
python tools/finish_release.py ~/.ssh/githubpat.txt
Testing:
This script can be safely run between releases. When this is done, the script
should execute successfully until the final step when it tries to set draft
equal to false on the GitHub release. This step should fail because a published
release with that name already exists.
"""
import argparse

View file

@ -14,7 +14,7 @@ DEFAULT_PACKAGES = [
'certbot_dns_sakuracloud', 'certbot_nginx']
COVER_THRESHOLDS = {
'certbot': {'linux': 95, 'windows': 96},
'certbot': {'linux': 94, 'windows': 96},
'acme': {'linux': 100, 'windows': 99},
'certbot_apache': {'linux': 100, 'windows': 100},
'certbot_dns_cloudflare': {'linux': 98, 'windows': 98},

View file

@ -20,8 +20,8 @@ install_and_test = python {toxinidir}/tools/install_and_test.py
dns_packages = certbot-dns-cloudflare certbot-dns-cloudxns certbot-dns-digitalocean certbot-dns-dnsimple certbot-dns-dnsmadeeasy certbot-dns-gehirn certbot-dns-google certbot-dns-linode certbot-dns-luadns certbot-dns-nsone certbot-dns-ovh certbot-dns-rfc2136 certbot-dns-route53 certbot-dns-sakuracloud
win_all_packages = acme[test] certbot[test] {[base]dns_packages} certbot-nginx
all_packages = {[base]win_all_packages} certbot-apache
fully_typed_source_paths = acme/acme certbot/certbot certbot-ci/certbot_integration_tests certbot-ci/snap_integration_tests certbot-ci/windows_installer_integration_tests
partially_typed_source_paths = certbot-apache/certbot_apache certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx tests/lock_test.py
fully_typed_source_paths = acme/acme certbot/certbot certbot-ci/certbot_integration_tests certbot-ci/snap_integration_tests certbot-ci/windows_installer_integration_tests certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud tests/lock_test.py
partially_typed_source_paths = certbot-apache/certbot_apache certbot-nginx/certbot_nginx
[testenv]
passenv =