Merge branch 'master' into handle-merge-conflicts

This commit is contained in:
Brad Warren 2018-05-25 08:15:23 -07:00
commit 93d85920b9
86 changed files with 835 additions and 464 deletions

View file

@ -152,6 +152,7 @@ notifications:
irc:
channels:
- secure: "SGWZl3ownKx9xKVV2VnGt7DqkTmutJ89oJV9tjKhSs84kLijU6EYdPnllqISpfHMTxXflNZuxtGo0wTDYHXBuZL47w1O32W6nzuXdra5zC+i4sYQwYULUsyfOv9gJX8zWAULiK0Z3r0oho45U+FR5ZN6TPCidi8/eGU+EEPwaAw="
on_cancel: never
on_success: never
on_failure: always
use_notice: true

View file

@ -147,9 +147,9 @@ class KeyAuthorizationChallenge(_TokenChallenge):
:param response_cls: Subclass of `KeyAuthorizationChallengeResponse`
that will be used to generate `response`.
:param str typ: type of the challenge
"""
typ = NotImplemented
response_cls = NotImplemented
thumbprint_hash_function = (
KeyAuthorizationChallengeResponse.thumbprint_hash_function)

View file

@ -12,7 +12,9 @@ from six.moves import http_client # pylint: disable=import-error
import josepy as jose
import OpenSSL
import re
from requests_toolbelt.adapters.source import SourceAddressAdapter
import requests
from requests.adapters import HTTPAdapter
import sys
from acme import crypto_util
@ -857,9 +859,12 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
:param bool verify_ssl: Whether to verify certificates on SSL connections.
:param str user_agent: String to send as User-Agent header.
:param float timeout: Timeout for requests.
:param source_address: Optional source address to bind to when making requests.
:type source_address: str or tuple(str, int)
"""
def __init__(self, key, account=None, alg=jose.RS256, verify_ssl=True,
user_agent='acme-python', timeout=DEFAULT_NETWORK_TIMEOUT):
user_agent='acme-python', timeout=DEFAULT_NETWORK_TIMEOUT,
source_address=None):
# pylint: disable=too-many-arguments
self.key = key
self.account = account
@ -869,6 +874,13 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
self.user_agent = user_agent
self.session = requests.Session()
self._default_timeout = timeout
adapter = HTTPAdapter()
if source_address is not None:
adapter = SourceAddressAdapter(source_address)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
def __del__(self):
# Try to close the session, but don't show exceptions to the
@ -1018,7 +1030,7 @@ class ClientNetwork(object): # pylint: disable=too-many-instance-attributes
if response.headers.get("Content-Type") == DER_CONTENT_TYPE:
debug_content = base64.b64encode(response.content)
else:
debug_content = response.content
debug_content = response.content.decode("utf-8")
logger.debug('Received response:\nHTTP %d\n%s\n\n%s',
response.status_code,
"\n".join(["{0}: {1}".format(k, v)

View file

@ -1129,6 +1129,31 @@ class ClientNetworkWithMockedResponseTest(unittest.TestCase):
self.assertRaises(requests.exceptions.RequestException,
self.net.post, 'uri', obj=self.obj)
class ClientNetworkSourceAddressBindingTest(unittest.TestCase):
"""Tests that if ClientNetwork has a source IP set manually, the underlying library has
used the provided source address."""
def setUp(self):
self.source_address = "8.8.8.8"
def test_source_address_set(self):
from acme.client import ClientNetwork
net = ClientNetwork(key=None, alg=None, source_address=self.source_address)
for adapter in net.session.adapters.values():
self.assertTrue(self.source_address in adapter.source_address)
def test_behavior_assumption(self):
"""This is a test that guardrails the HTTPAdapter behavior so that if the default for
a Session() changes, the assumptions here aren't violated silently."""
from acme.client import ClientNetwork
# Source address not specified, so the default adapter type should be bound -- this
# test should fail if the default adapter type is changed by requests
net = ClientNetwork(key=None, alg=None)
session = requests.Session()
for scheme in session.adapters.keys():
client_network_adapter = net.session.adapters.get(scheme)
default_adapter = session.adapters.get(scheme)
self.assertEqual(client_network_adapter.__class__, default_adapter.__class__)
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -12,7 +12,8 @@ import josepy as jose
from acme import errors
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Callable, Text, Union
from acme.magic_typing import Callable, Union, Tuple, Optional
# pylint: enable=unused-import, no-name-in-module
logger = logging.getLogger(__name__)
@ -135,14 +136,23 @@ def probe_sni(name, host, port=443, timeout=300,
socket_kwargs = {'source_address': source_address}
host_protocol_agnostic = None if host == '::' or host == '0' else host
host_protocol_agnostic = host
if host == '::' or host == '0':
# https://github.com/python/typeshed/pull/2136
# while PR is not merged, we need to ignore
host_protocol_agnostic = None
try:
# pylint: disable=star-args
logger.debug("Attempting to connect to %s:%d%s.", host_protocol_agnostic, port,
" from {0}:{1}".format(source_address[0], source_address[1]) if \
socket_kwargs else "")
sock = socket.create_connection((host_protocol_agnostic, port), **socket_kwargs)
logger.debug(
"Attempting to connect to %s:%d%s.", host_protocol_agnostic, port,
" from {0}:{1}".format(
source_address[0],
source_address[1]
) if socket_kwargs else ""
)
socket_tuple = (host_protocol_agnostic, port) # type: Tuple[Optional[str], int]
sock = socket.create_connection(socket_tuple, **socket_kwargs) # type: ignore
except socket.error as error:
raise errors.Error(error)

View file

@ -8,6 +8,9 @@ class TypingClass(object):
try:
# mypy doesn't respect modifying sys.modules
from typing import * # pylint: disable=wildcard-import, unused-wildcard-import
from typing import * # pylint: disable=wildcard-import, unused-wildcard-import
# pylint: disable=unused-import
from typing import Collection, IO # type: ignore
# pylint: enable=unused-import
except ImportError:
sys.modules[__name__] = TypingClass()

View file

@ -145,6 +145,7 @@ STATUS_PROCESSING = Status('processing')
STATUS_VALID = Status('valid')
STATUS_INVALID = Status('invalid')
STATUS_REVOKED = Status('revoked')
STATUS_READY = Status('ready')
class IdentifierType(_Constant):
@ -284,7 +285,7 @@ class Registration(ResourceBody):
if phone is not None:
details.append(cls.phone_prefix + phone)
if email is not None:
details.append(cls.email_prefix + email)
details.extend([cls.email_prefix + mail for mail in email.split(',')])
kwargs['contact'] = tuple(details)
return cls(**kwargs)

View file

@ -83,9 +83,22 @@ class BaseDualNetworkedServers(object):
new_address = (server_address[0],) + (port,) + server_address[2:]
new_args = (new_address,) + remaining_args
server = ServerClass(*new_args, **kwargs) # pylint: disable=star-args
except socket.error:
logger.debug("Failed to bind to %s:%s using %s", new_address[0],
logger.debug(
"Successfully bound to %s:%s using %s", new_address[0],
new_address[1], "IPv6" if ip_version else "IPv4")
except socket.error:
if self.servers:
# Already bound using IPv6.
logger.debug(
"Certbot wasn't able to bind to %s:%s using %s, this " +
"is often expected due to the dual stack nature of " +
"IPv6 socket implementations.",
new_address[0], new_address[1],
"IPv6" if ip_version else "IPv4")
else:
logger.debug(
"Failed to bind to %s:%s using %s", new_address[0],
new_address[1], "IPv6" if ip_version else "IPv4")
else:
self.servers.append(server)
# If two servers are set up and port 0 was passed in, ensure we always

View file

@ -19,6 +19,7 @@ install_requires = [
'pyrfc3339',
'pytz',
'requests[security]>=2.4.1', # security extras added in 2.4.1
'requests-toolbelt>=0.3.0',
'setuptools',
'six>=1.9.0', # needed for python_2_unicode_compatible
]

View file

@ -13,11 +13,13 @@ import zope.component
import zope.interface
from acme import challenges
from acme.magic_typing import DefaultDict, Dict, List, Set # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot.achallenges import KeyAuthorizationAnnotatedChallenge # pylint: disable=unused-import
from certbot.plugins import common
from certbot.plugins.util import path_surgery
@ -150,14 +152,14 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
super(ApacheConfigurator, self).__init__(*args, **kwargs)
# Add name_server association dict
self.assoc = dict()
self.assoc = dict() # type: Dict[str, obj.VirtualHost]
# Outstanding challenges
self._chall_out = set()
self._chall_out = set() # type: Set[KeyAuthorizationAnnotatedChallenge]
# List of vhosts configured per wildcard domain on this run.
# used by deploy_cert() and enhance()
self._wildcard_vhosts = dict()
self._wildcard_vhosts = dict() # type: Dict[str, List[obj.VirtualHost]]
# Maps enhancements to vhosts we've enabled the enhancement for
self._enhanced_vhosts = defaultdict(set)
self._enhanced_vhosts = defaultdict(set) # type: DefaultDict[str, Set[obj.VirtualHost]]
# These will be set in the prepare function
self.parser = None
@ -659,7 +661,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
:rtype: set
"""
all_names = set()
all_names = set() # type: Set[str]
vhost_macro = []
@ -800,8 +802,8 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
# Search base config, and all included paths for VirtualHosts
file_paths = {}
internal_paths = defaultdict(set)
file_paths = {} # type: Dict[str, str]
internal_paths = defaultdict(set) # type: DefaultDict[str, Set[str]]
vhs = []
# Make a list of parser paths because the parser_paths
# dictionary may be modified during the loop.
@ -1239,7 +1241,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
if not self.parser.parsed_in_current(ssl_fp):
self.parser.parse_file(ssl_fp)
except IOError:
logger.fatal("Error writing/reading to file in make_vhost_ssl")
logger.critical("Error writing/reading to file in make_vhost_ssl", exc_info=True)
raise errors.PluginError("Unable to write/read in make_vhost_ssl")
if sift:
@ -1327,7 +1329,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
try:
span_val = self.aug.span(vhost.path)
except ValueError:
logger.fatal("Error while reading the VirtualHost %s from "
logger.critical("Error while reading the VirtualHost %s from "
"file %s", vhost.name, vhost.filep, exc_info=True)
raise errors.PluginError("Unable to read VirtualHost from file")
span_filep = span_val[0]
@ -1770,7 +1772,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
# There can be other RewriteRule directive lines in vhost config.
# rewrite_args_dict keys are directive ids and the corresponding value
# for each is a list of arguments to that directive.
rewrite_args_dict = defaultdict(list)
rewrite_args_dict = defaultdict(list) # type: DefaultDict[str, List[str]]
pat = r'(.*directive\[\d+\]).*'
for match in rewrite_path:
m = re.match(pat, match)
@ -1864,7 +1866,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
if ssl_vhost.aliases:
serveralias = "ServerAlias " + " ".join(ssl_vhost.aliases)
rewrite_rule_args = []
rewrite_rule_args = [] # type: List[str]
if self.get_version() >= (2, 3, 9):
rewrite_rule_args = constants.REWRITE_HTTPS_ARGS_WITH_END
else:

View file

@ -2,9 +2,10 @@
import logging
import os
from acme.magic_typing import Set # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot.plugins import common
from certbot_apache.obj import VirtualHost # pylint: disable=unused-import
logger = logging.getLogger(__name__)
@ -51,7 +52,7 @@ class ApacheHttp01(common.TLSSNI01):
self.challenge_dir = os.path.join(
self.configurator.config.work_dir,
"http_challenges")
self.moded_vhosts = set()
self.moded_vhosts = set() # type: Set[VirtualHost]
def perform(self):
"""Perform all HTTP-01 challenges."""

View file

@ -1,6 +1,7 @@
"""Module contains classes used by the Apache Configurator."""
import re
from acme.magic_typing import Set # pylint: disable=unused-import, no-name-in-module
from certbot.plugins import common
@ -140,7 +141,7 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
def get_names(self):
"""Return a set of all names."""
all_names = set()
all_names = set() # type: Set[str]
all_names.update(self.aliases)
# Strip out any scheme:// and <port> field from servername
if self.name is not None:
@ -251,7 +252,7 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
# already_found acts to keep everything very conservative.
# Don't allow multiple ip:ports in same set.
already_found = set()
already_found = set() # type: Set[str]
for addr in vhost.addrs:
for local_addr in self.addrs:

View file

@ -47,10 +47,10 @@ class CentOSParser(parser.ApacheParser):
self.sysconfig_filep = "/etc/sysconfig/httpd"
super(CentOSParser, self).__init__(*args, **kwargs)
def update_runtime_variables(self, *args, **kwargs):
def update_runtime_variables(self):
""" Override for update_runtime_variables for custom parsing """
# Opportunistic, works if SELinux not enforced
super(CentOSParser, self).update_runtime_variables(*args, **kwargs)
super(CentOSParser, self).update_runtime_variables()
self.parse_sysconfig_var()
def parse_sysconfig_var(self):

View file

@ -9,6 +9,7 @@ import sys
import six
from acme.magic_typing import Dict, List, Set # pylint: disable=unused-import, no-name-in-module
from certbot import errors
logger = logging.getLogger(__name__)
@ -38,9 +39,9 @@ class ApacheParser(object):
# issues with aug.load() after adding new files / defines to parse tree
self.configurator = configurator
self.modules = set()
self.parser_paths = {}
self.variables = {}
self.modules = set() # type: Set[str]
self.parser_paths = {} # type: Dict[str, List[str]]
self.variables = {} # type: Dict[str, str]
self.aug = aug
# Find configuration root and make sure augeas can parse it.
@ -119,7 +120,7 @@ class ApacheParser(object):
the iteration issue. Else... parse and enable mods at same time.
"""
mods = set()
mods = set() # type: Set[str]
matches = self.find_dir("LoadModule")
iterator = iter(matches)
# Make sure prev_size != cur_size for do: while: iteration
@ -408,7 +409,7 @@ class ApacheParser(object):
else:
arg_suffix = "/*[self::arg=~regexp('%s')]" % case_i(arg)
ordered_matches = []
ordered_matches = [] # type: List[str]
# TODO: Wildcards should be included in alphabetical order
# https://httpd.apache.org/docs/2.4/mod/core.html#include

View file

@ -353,14 +353,11 @@ class MultipleVhostsTest(util.ApacheTest):
self.config.parser.find_dir = mock_find_dir
mock_add.reset_mock()
self.config._add_dummy_ssl_directives(self.vh_truth[0]) # pylint: disable=protected-access
tried_to_add = []
for a in mock_add.call_args_list:
tried_to_add.append(a[0][1] == "Include" and
a[0][2] == self.config.mod_ssl_conf)
# Include shouldn't be added, as patched find_dir "finds" existing one
self.assertFalse(any(tried_to_add))
if a[0][1] == "Include" and a[0][2] == self.config.mod_ssl_conf:
self.fail("Include shouldn't be added, as patched find_dir 'finds' existing one") \
# pragma: no cover
def test_deploy_cert(self):
self.config.parser.modules.add("ssl_module")

View file

@ -4,12 +4,12 @@ import os
import unittest
from acme import challenges
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges
from certbot import errors
from certbot.tests import acme_util
from certbot_apache.tests import util
@ -23,7 +23,7 @@ class ApacheHttp01Test(util.ApacheTest):
super(ApacheHttp01Test, self).setUp(*args, **kwargs)
self.account_key = self.rsa512jwk
self.achalls = []
self.achalls = [] # type: List[achallenges.KeyAuthorizationAnnotatedChallenge]
vh_truth = util.get_vh_truth(
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
# Takes the vhosts for encryption-example.demo, certbot.demo, and

View file

@ -87,7 +87,6 @@ class ParserTest(ApacheTest):
def get_apache_configurator( # pylint: disable=too-many-arguments, too-many-locals
config_path, vhost_path,
config_dir, work_dir, version=(2, 4, 7),
conf=None,
os_info="generic",
conf_vhost_path=None):
"""Create an Apache Configurator with the specified options.
@ -133,10 +132,6 @@ def get_apache_configurator( # pylint: disable=too-many-arguments, too-many-loc
config_class = configurator.ApacheConfigurator
config = config_class(config=mock_le_config, name="apache",
version=version)
# This allows testing scripts to set it a bit more
# quickly
if conf is not None:
config.conf = conf # pragma: no cover
config.prepare()
return config

View file

@ -3,6 +3,7 @@
import os
import logging
from acme.magic_typing import Set # pylint: disable=unused-import, no-name-in-module
from certbot.plugins import common
from certbot.errors import PluginError, MissingCommandlineFlag
@ -93,7 +94,7 @@ class ApacheTlsSni01(common.TLSSNI01):
:rtype: set
"""
addrs = set()
addrs = set() # type: Set[obj.Addr]
config_text = "<IfModule mod_ssl.c>\n"
for achall in self.achalls:

View file

@ -1,2 +1,2 @@
acme[dev]==0.21.1
-e acme[dev]
certbot[dev]==0.21.1

View file

@ -9,7 +9,7 @@ version = '0.25.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.
install_requires = [
'acme>=0.21.1',
'acme>0.24.0',
'certbot>=0.21.1',
'mock',
'python-augeas',

View file

@ -15,6 +15,7 @@ from six.moves import xrange # pylint: disable=import-error,redefined-builtin
from acme import challenges
from acme import crypto_util
from acme import messages
from acme.magic_typing import List, Tuple # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges
from certbot import errors as le_errors
from certbot.tests import acme_util
@ -52,9 +53,8 @@ def test_authenticator(plugin, config, temp_dir):
try:
responses = plugin.perform(achalls)
except le_errors.Error as error:
logger.error("Performing challenges on %s caused an error:", config)
logger.exception(error)
except le_errors.Error:
logger.error("Performing challenges on %s caused an error:", config, exc_info=True)
return False
success = True
@ -82,9 +82,8 @@ def test_authenticator(plugin, config, temp_dir):
if success:
try:
plugin.cleanup(achalls)
except le_errors.Error as error:
logger.error("Challenge cleanup for %s caused an error:", config)
logger.exception(error)
except le_errors.Error:
logger.error("Challenge cleanup for %s caused an error:", config, exc_info=True)
success = False
if _dirs_are_unequal(config, backup):
@ -147,9 +146,8 @@ def test_deploy_cert(plugin, temp_dir, domains):
try:
plugin.deploy_cert(domain, cert_path, util.KEY_PATH, cert_path, cert_path)
plugin.save() # Needed by the Apache plugin
except le_errors.Error as error:
logger.error("**** Plugin failed to deploy certificate for %s:", domain)
logger.exception(error)
except le_errors.Error:
logger.error("**** Plugin failed to deploy certificate for %s:", domain, exc_info=True)
return False
if not _save_and_restart(plugin, "deployed"):
@ -179,7 +177,7 @@ def test_enhancements(plugin, domains):
"enhancements")
return False
domains_and_info = [(domain, []) for domain in domains]
domains_and_info = [(domain, []) for domain in domains] # type: List[Tuple[str, List[bool]]]
for domain, info in domains_and_info:
try:
@ -192,10 +190,9 @@ def test_enhancements(plugin, domains):
# Don't immediately fail because a redirect may already be enabled
logger.warning("*** Plugin failed to enable redirect for %s:", domain)
logger.warning("%s", error)
except le_errors.Error as error:
except le_errors.Error:
logger.error("*** An error occurred while enabling redirect for %s:",
domain)
logger.exception(error)
domain, exc_info=True)
if not _save_and_restart(plugin, "enhanced"):
return False
@ -222,9 +219,8 @@ def _save_and_restart(plugin, title=None):
plugin.save(title)
plugin.restart()
return True
except le_errors.Error as error:
logger.error("*** Plugin failed to save and restart server:")
logger.exception(error)
except le_errors.Error:
logger.error("*** Plugin failed to save and restart server:", exc_info=True)
return False
@ -232,9 +228,8 @@ def test_rollback(plugin, config, backup):
"""Tests the rollback checkpoints function"""
try:
plugin.rollback_checkpoints(1337)
except le_errors.Error as error:
logger.error("*** Plugin raised an exception during rollback:")
logger.exception(error)
except le_errors.Error:
logger.error("*** Plugin raised an exception during rollback:", exc_info=True)
return False
if _dirs_are_unequal(config, backup):
@ -263,21 +258,21 @@ def _dirs_are_unequal(dir1, dir2):
logger.error("The following files and directories are only "
"present in one directory")
if dircmp.left_only:
logger.error(dircmp.left_only)
logger.error(str(dircmp.left_only))
else:
logger.error(dircmp.right_only)
logger.error(str(dircmp.right_only))
return True
elif dircmp.common_funny or dircmp.funny_files:
logger.error("The following files and directories could not be "
"compared:")
if dircmp.common_funny:
logger.error(dircmp.common_funny)
logger.error(str(dircmp.common_funny))
else:
logger.error(dircmp.funny_files)
logger.error(str(dircmp.funny_files))
return True
elif dircmp.diff_files:
logger.error("The following files differ:")
logger.error(dircmp.diff_files)
logger.error(str(dircmp.diff_files))
return True
for subdir in dircmp.subdirs.itervalues():
@ -354,9 +349,8 @@ def main():
success = test_authenticator(plugin, config, temp_dir)
if success and args.install:
success = test_installer(args, plugin, config, temp_dir)
except errors.Error as error:
logger.error("Tests on %s raised:", config)
logger.exception(error)
except errors.Error:
logger.error("Tests on %s raised:", config, exc_info=True)
success = False
if success:

View file

@ -26,12 +26,12 @@ def create_le_config(parent_dir):
config = copy.deepcopy(constants.CLI_DEFAULTS)
le_dir = os.path.join(parent_dir, "certbot")
config["config_dir"] = os.path.join(le_dir, "config")
config["work_dir"] = os.path.join(le_dir, "work")
config["logs_dir"] = os.path.join(le_dir, "logs_dir")
os.makedirs(config["config_dir"])
os.mkdir(config["work_dir"])
os.mkdir(config["logs_dir"])
os.mkdir(le_dir)
for dir_name in ("config", "logs", "work"):
full_path = os.path.join(le_dir, dir_name)
os.mkdir(full_path)
full_name = dir_name + "_dir"
config[full_name] = full_path
config["domains"] = None

View file

@ -33,7 +33,7 @@ class Validator(object):
try:
presented_cert = crypto_util.probe_sni(name, host, port)
except acme_errors.Error as error:
logger.exception(error)
logger.exception(str(error))
return False
return presented_cert.digest("sha256") == cert.digest("sha256")
@ -86,8 +86,7 @@ class Validator(object):
return False
try:
_, max_age_value = max_age[0]
max_age_value = int(max_age_value)
max_age_value = int(max_age[0][1])
except ValueError:
logger.error("Server responded with invalid HSTS header field")
return False

View file

@ -11,6 +11,8 @@ from certbot import errors
from certbot import interfaces
from certbot.plugins import dns_common
from acme.magic_typing import DefaultDict, List, Dict # pylint: disable=unused-import, no-name-in-module
logger = logging.getLogger(__name__)
INSTRUCTIONS = (
@ -34,7 +36,7 @@ class Authenticator(dns_common.DNSAuthenticator):
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self.r53 = boto3.client("route53")
self._resource_records = collections.defaultdict(list)
self._resource_records = collections.defaultdict(list) # type: DefaultDict[str, List[Dict[str, str]]]
def more_info(self): # pylint: disable=missing-docstring,no-self-use
return "Solve a DNS01 challenge using AWS Route53"

View file

@ -1,2 +1,2 @@
acme[dev]==0.21.1
-e acme[dev]
certbot[dev]==0.21.1

View file

@ -8,7 +8,7 @@ version = '0.25.0.dev0'
# Remember to update local-oldest-requirements.txt when changing the minimum
# acme/certbot version.
install_requires = [
'acme>=0.21.1',
'acme>0.24.0',
'certbot>=0.21.1',
'boto3',
'mock',

View file

@ -28,6 +28,9 @@ from certbot_nginx import nginxparser
from certbot_nginx import parser
from certbot_nginx import tls_sni_01
from certbot_nginx import http_01
from certbot_nginx import obj # pylint: disable=unused-import
from acme.magic_typing import List, Dict, Set # pylint: disable=unused-import, no-name-in-module
logger = logging.getLogger(__name__)
@ -98,8 +101,8 @@ class NginxConfigurator(common.Installer):
# List of vhosts configured per wildcard domain on this run.
# used by deploy_cert() and enhance()
self._wildcard_vhosts = {}
self._wildcard_redirect_vhosts = {}
self._wildcard_vhosts = {} # type: Dict[str, List[obj.VirtualHost]]
self._wildcard_redirect_vhosts = {} # type: Dict[str, List[obj.VirtualHost]]
# Add number of outstanding challenges
self._chall_out = 0
@ -293,7 +296,7 @@ class NginxConfigurator(common.Installer):
("Cannot find a VirtualHost matching domain %s. "
"In order for Certbot to correctly perform the challenge "
"please add a corresponding server_name directive to your "
"nginx configuration: "
"nginx configuration for every domain on your certificate: "
"https://nginx.org/en/docs/http/server_names.html") % (target_name))
# Note: if we are enhancing with ocsp, vhost should already be ssl.
for vhost in vhosts:
@ -331,7 +334,7 @@ class NginxConfigurator(common.Installer):
def _vhost_from_duplicated_default(self, domain, port=None):
if self.new_vhost is None:
default_vhost = self._get_default_vhost(port)
default_vhost = self._get_default_vhost(port, domain)
self.new_vhost = self.parser.duplicate_vhost(default_vhost,
remove_singleton_listen_params=True)
self.new_vhost.names = set()
@ -347,7 +350,7 @@ class NginxConfigurator(common.Installer):
name_block[0].append(name)
self.parser.update_or_add_server_directives(vhost, name_block)
def _get_default_vhost(self, port):
def _get_default_vhost(self, port, domain):
vhost_list = self.parser.get_vhosts()
# if one has default_server set, return that one
default_vhosts = []
@ -364,7 +367,7 @@ class NginxConfigurator(common.Installer):
# TODO: present a list of vhosts for user to choose from
raise errors.MisconfigurationError("Could not automatically find a matching server"
" block. Set the `server_name` directive to use the Nginx installer.")
" block for %s. Set the `server_name` directive to use the Nginx installer." % domain)
def _get_ranked_matches(self, target_name):
"""Returns a ranked list of vhosts that match target_name.
@ -528,7 +531,7 @@ class NginxConfigurator(common.Installer):
:rtype: set
"""
all_names = set()
all_names = set() # type: Set[str]
for vhost in self.parser.get_vhosts():
all_names.update(vhost.names)
@ -824,7 +827,7 @@ class NginxConfigurator(common.Installer):
self.parser.add_server_directives(vhost,
stapling_directives)
except errors.MisconfigurationError as error:
logger.debug(error)
logger.debug(str(error))
raise errors.PluginError("An error occurred while enabling OCSP "
"stapling for {0}.".format(vhost.names))
@ -892,7 +895,7 @@ class NginxConfigurator(common.Installer):
universal_newlines=True)
text = proc.communicate()[1] # nginx prints output to stderr
except (OSError, ValueError) as error:
logger.debug(error, exc_info=True)
logger.debug(str(error), exc_info=True)
raise errors.PluginError(
"Unable to run %s -V" % self.conf('ctl'))

View file

@ -1,9 +1,14 @@
"""nginx plugin constants."""
import pkg_resources
import platform
if platform.system() in ('FreeBSD', 'Darwin'):
server_root_tmp = "/usr/local/etc/nginx"
else:
server_root_tmp = "/etc/nginx"
CLI_DEFAULTS = dict(
server_root="/etc/nginx",
server_root=server_root_tmp,
ctl="nginx",
)
"""CLI defaults."""

View file

@ -10,6 +10,7 @@ from certbot.plugins import common
from certbot_nginx import obj
from certbot_nginx import nginxparser
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
logger = logging.getLogger(__name__)
@ -113,7 +114,7 @@ class NginxHttp01(common.ChallengePerformer):
:returns: list of :class:`certbot_nginx.obj.Addr` to apply
:rtype: list
"""
addresses = []
addresses = [] # type: List[obj.Addr]
default_addr = "%s" % self.configurator.config.http01_port
ipv6_addr = "[::]:{0}".format(
self.configurator.config.http01_port)

View file

@ -248,7 +248,7 @@ class UnspacedList(list):
"""Recurse through the parse tree to figure out if any sublists are dirty"""
if self.dirty:
return True
return any((isinstance(x, list) and x.is_dirty() for x in self))
return any((isinstance(x, UnspacedList) and x.is_dirty() for x in self))
def _spaced_position(self, idx):
"Convert from indexes in the unspaced list to positions in the spaced one"

View file

@ -13,7 +13,7 @@ from certbot import errors
from certbot_nginx import obj
from certbot_nginx import nginxparser
from acme.magic_typing import Union, Dict, Set, Any, List, Tuple # pylint: disable=unused-import, no-name-in-module
logger = logging.getLogger(__name__)
@ -28,7 +28,7 @@ class NginxParser(object):
"""
def __init__(self, root):
self.parsed = {}
self.parsed = {} # type: Dict[str, Union[List, nginxparser.UnspacedList]]
self.root = os.path.abspath(root)
self.config_root = self._find_config_root()
@ -90,7 +90,7 @@ class NginxParser(object):
"""
servers = self._get_raw_servers()
addr_to_ssl = {}
addr_to_ssl = {} # type: Dict[Tuple[str, str], bool]
for filename in servers:
for server, _ in servers[filename]:
# Parse the server block to save addr info
@ -104,9 +104,10 @@ class NginxParser(object):
def _get_raw_servers(self):
# pylint: disable=cell-var-from-loop
# type: () -> Dict
"""Get a map of unparsed all server blocks
"""
servers = {}
servers = {} # type: Dict[str, Union[List, nginxparser.UnspacedList]]
for filename in self.parsed:
tree = self.parsed[filename]
servers[filename] = []
@ -727,9 +728,9 @@ def _parse_server_raw(server):
:rtype: dict
"""
parsed_server = {'addrs': set(),
'ssl': False,
'names': set()}
addrs = set() # type: Set[obj.Addr]
ssl = False # type: bool
names = set() # type: Set[str]
apply_ssl_to_all_addrs = False
@ -739,17 +740,21 @@ def _parse_server_raw(server):
if directive[0] == 'listen':
addr = obj.Addr.fromstring(" ".join(directive[1:]))
if addr:
parsed_server['addrs'].add(addr)
addrs.add(addr)
if addr.ssl:
parsed_server['ssl'] = True
ssl = True
elif directive[0] == 'server_name':
parsed_server['names'].update(x.strip('"\'') for x in directive[1:])
names.update(x.strip('"\'') for x in directive[1:])
elif _is_ssl_on_directive(directive):
parsed_server['ssl'] = True
ssl = True
apply_ssl_to_all_addrs = True
if apply_ssl_to_all_addrs:
for addr in parsed_server['addrs']:
for addr in addrs:
addr.ssl = True
return parsed_server
return {
'addrs': addrs,
'ssl': ssl,
'names': names
}

View file

@ -11,6 +11,7 @@ from certbot_nginx import nginxparser
from certbot_nginx import obj
from certbot_nginx import parser
from certbot_nginx.tests import util
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
class NginxParserTest(util.NginxTest): #pylint: disable=too-many-public-methods
@ -99,7 +100,7 @@ class NginxParserTest(util.NginxTest): #pylint: disable=too-many-public-methods
([[[0], [3], [4]], [[5], [3], [0]]], [])]
for mylist, result in mylists:
paths = []
paths = [] # type: List[List[int]]
parser._do_for_subarray(mylist,
lambda x: isinstance(x, list) and
len(x) >= 1 and

View file

@ -8,7 +8,9 @@ import zope.component
from acme import challenges
from acme import messages
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import DefaultDict, Dict, List, Set, Collection
# pylint: enable=unused-import, no-name-in-module
from certbot import achallenges
from certbot import errors
from certbot import error_handler
@ -117,7 +119,7 @@ class AuthHandler(object):
def _solve_challenges(self, aauthzrs):
"""Get Responses for challenges from authenticators."""
resp = []
resp = [] # type: Collection[acme.challenges.ChallengeResponse]
all_achalls = self._get_all_achalls(aauthzrs)
try:
if all_achalls:
@ -133,10 +135,9 @@ class AuthHandler(object):
def _get_all_achalls(self, aauthzrs):
"""Return all active challenges."""
all_achalls = []
all_achalls = [] # type: Collection[challenges.ChallengeResponse]
for aauthzr in aauthzrs:
all_achalls.extend(aauthzr.achalls)
return all_achalls
def _respond(self, aauthzrs, resp, best_effort):
@ -146,7 +147,8 @@ class AuthHandler(object):
"""
# TODO: chall_update is a dirty hack to get around acme-spec #105
chall_update = dict()
chall_update = dict() \
# type: Dict[int, List[achallenges.KeyAuthorizationAnnotatedChallenge]]
self._send_responses(aauthzrs, resp, chall_update)
# Check for updated status...
@ -198,7 +200,7 @@ class AuthHandler(object):
while indices_to_check and rounds < max_rounds:
# TODO: Use retry-after...
time.sleep(min_sleep)
all_failed_achalls = set()
all_failed_achalls = set() # type: Set[achallenges.KeyAuthorizationAnnotatedChallenge]
for index in indices_to_check:
comp_achalls, failed_achalls = self._handle_check(
aauthzrs, index, chall_update[index])
@ -424,7 +426,7 @@ def _find_smart_path(challbs, preferences, combinations):
# max_cost is now equal to sum(indices) + 1
best_combo = []
best_combo = None
# Set above completing all of the available challenges
best_combo_cost = max_cost
@ -479,7 +481,7 @@ def _report_no_chall_path(challbs):
msg += (
" You may need to use an authenticator "
"plugin that can do challenges over DNS.")
logger.fatal(msg)
logger.critical(msg)
raise errors.AuthorizationError(msg)
@ -522,11 +524,11 @@ def _report_failed_challs(failed_achalls):
:class:`certbot.achallenges.AnnotatedChallenge`.
"""
problems = dict()
problems = collections.defaultdict(list)\
# type: DefaultDict[str, List[achallenges.KeyAuthorizationAnnotatedChallenge]]
for achall in failed_achalls:
if achall.error:
problems.setdefault(achall.error.typ, []).append(achall)
problems[achall.error.typ].append(achall)
reporter = zope.component.getUtility(interfaces.IReporter)
for achalls in six.itervalues(problems):
reporter.add_message(

View file

@ -7,6 +7,7 @@ import re
import traceback
import zope.component
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import crypto_util
from certbot import errors
from certbot import interfaces
@ -226,7 +227,7 @@ def match_and_check_overlaps(cli_config, acceptable_matches, match_func, rv_func
def find_matches(candidate_lineage, return_value, acceptable_matches):
"""Returns a list of matches using _search_lineages."""
acceptable_matches = [func(candidate_lineage) for func in acceptable_matches]
acceptable_matches_rv = []
acceptable_matches_rv = [] # type: List[str]
for item in acceptable_matches:
if isinstance(item, list):
acceptable_matches_rv += item
@ -340,7 +341,7 @@ def _report_human_readable(config, parsed_certs):
def _describe_certs(config, parsed_certs, parse_failures):
"""Print information about the certs we know about"""
out = []
out = [] # type: List[str]
notify = out.append

View file

@ -12,10 +12,14 @@ import sys
import configargparse
import six
import zope.component
import zope.interface
from zope.interface import interfaces as zope_interfaces
from acme import challenges
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Any, Dict, Optional
# pylint: enable=unused-import, no-name-in-module
import certbot
@ -33,7 +37,7 @@ import certbot.plugins.selection as plugin_selection
logger = logging.getLogger(__name__)
# Global, to save us from a lot of argument passing within the scope of this module
helpful_parser = None
helpful_parser = None # type: Optional[HelpfulArgumentParser]
# For help strings, figure out how the user ran us.
# When invoked from letsencrypt-auto, sys.argv[0] is something like:
@ -196,17 +200,17 @@ def set_by_cli(var):
(CLI or config file) including if the user explicitly set it to the
default. Returns False if the variable was assigned a default value.
"""
detector = set_by_cli.detector
if detector is None:
detector = set_by_cli.detector # type: ignore
if detector is None and helpful_parser is not None:
# Setup on first run: `detector` is a weird version of config in which
# the default value of every attribute is wrangled to be boolean-false
plugins = plugins_disco.PluginsRegistry.find_all()
# reconstructed_args == sys.argv[1:], or whatever was passed to main()
reconstructed_args = helpful_parser.args + [helpful_parser.verb]
detector = set_by_cli.detector = prepare_and_parse_args(
detector = set_by_cli.detector = prepare_and_parse_args( # type: ignore
plugins, reconstructed_args, detect_defaults=True)
# propagate plugin requests: eg --standalone modifies config.authenticator
detector.authenticator, detector.installer = (
detector.authenticator, detector.installer = ( # type: ignore
plugin_selection.cli_plugin_requests(detector))
if not isinstance(getattr(detector, var), _Default):
@ -220,7 +224,10 @@ def set_by_cli(var):
return True
return False
# static housekeeping var
# functions attributed are not supported by mypy
# https://github.com/python/mypy/issues/2087
set_by_cli.detector = None # type: ignore
@ -236,8 +243,10 @@ def has_default_value(option, value):
:rtype: bool
"""
return (option in helpful_parser.defaults and
helpful_parser.defaults[option] == value)
if helpful_parser is not None:
return (option in helpful_parser.defaults and
helpful_parser.defaults[option] == value)
return False
def option_was_set(option, value):
@ -254,11 +263,12 @@ def option_was_set(option, value):
def argparse_type(variable):
"Return our argparse type function for a config variable (default: str)"
"""Return our argparse type function for a config variable (default: str)"""
# pylint: disable=protected-access
for action in helpful_parser.parser._actions:
if action.type is not None and action.dest == variable:
return action.type
if helpful_parser is not None:
for action in helpful_parser.parser._actions:
if action.type is not None and action.dest == variable:
return action.type
return str
def read_file(filename, mode="rb"):
@ -291,10 +301,12 @@ def flag_default(name):
def config_help(name, hidden=False):
"""Extract the help message for an `.IConfig` attribute."""
# pylint: disable=no-member
if hidden:
return argparse.SUPPRESS
else:
return interfaces.IConfig[name].__doc__
field = interfaces.IConfig.__getitem__(name) # type: zope.interface.interface.Attribute
return field.__doc__
class HelpfulArgumentGroup(object):
@ -418,7 +430,7 @@ VERB_HELP = [
}),
("enhance", {
"short": "Add security enhancements to your existing configuration",
"opts": ("Helps to harden the TLS configration by adding security enhancements "
"opts": ("Helps to harden the TLS configuration by adding security enhancements "
"to already existing configuration."),
"usage": "\n\n certbot enhance [options]\n\n"
}),
@ -473,7 +485,7 @@ class HelpfulArgumentParser(object):
HELP_TOPICS += list(self.VERBS) + self.COMMANDS_TOPICS + ["manage"]
plugin_names = list(plugins)
self.help_topics = HELP_TOPICS + plugin_names + [None]
self.help_topics = HELP_TOPICS + plugin_names + [None] # type: ignore
self.detect_defaults = detect_defaults
self.args = args
@ -492,8 +504,11 @@ class HelpfulArgumentParser(object):
short_usage = self._usage_string(plugins, self.help_arg)
self.visible_topics = self.determine_help_topics(self.help_arg)
self.groups = {} # elements are added by .add_group()
self.defaults = {} # elements are added by .parse_args()
# elements are added by .add_group()
self.groups = {} # type: Dict[str, argparse._ArgumentGroup]
# elements are added by .parse_args()
self.defaults = {} # type: Dict[str, Any]
self.parser = configargparse.ArgParser(
prog="certbot",
@ -805,7 +820,6 @@ class HelpfulArgumentParser(object):
if self.help_arg:
for v in verbs:
self.groups[topic].add_argument(v, help=VERB_HELP_MAP[v]["short"])
return HelpfulArgumentGroup(self, topic)
def add_plugin_args(self, plugins):
@ -1296,14 +1310,14 @@ def _paths_parser(helpful):
verb = helpful.help_arg
cph = "Path to where certificate is saved (with auth --csr), installed from, or revoked."
section = ["paths", "install", "revoke", "certonly", "manage"]
sections = ["paths", "install", "revoke", "certonly", "manage"]
if verb == "certonly":
add(section, "--cert-path", type=os.path.abspath,
add(sections, "--cert-path", type=os.path.abspath,
default=flag_default("auth_cert_path"), help=cph)
elif verb == "revoke":
add(section, "--cert-path", type=read_file, required=True, help=cph)
add(sections, "--cert-path", type=read_file, required=True, help=cph)
else:
add(section, "--cert-path", type=os.path.abspath, help=cph)
add(sections, "--cert-path", type=os.path.abspath, help=cph)
section = "paths"
if verb in ("install", "revoke"):

View file

@ -5,7 +5,9 @@ import os
import platform
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
# https://github.com/python/typeshed/blob/master/third_party/
# 2/cryptography/hazmat/primitives/asymmetric/rsa.pyi
from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key # type: ignore
import josepy as jose
import OpenSSL
import zope.component
@ -155,12 +157,16 @@ def register(config, account_storage, tos_cb=None):
if not config.dry_run:
logger.info("Registering without email!")
# If --dry-run is used, and there is no staging account, create one with no email.
if config.dry_run:
config.email = None
# Each new registration shall use a fresh new key
key = jose.JWKRSA(key=jose.ComparableRSAKey(
rsa.generate_private_key(
rsa_key = generate_private_key(
public_exponent=65537,
key_size=config.rsa_key_size,
backend=default_backend())))
backend=default_backend())
key = jose.JWKRSA(key=jose.ComparableRSAKey(rsa_key))
acme = acme_from_config_key(config, key)
# TODO: add phone?
regr = perform_registration(acme, config, tos_cb)
@ -179,8 +185,9 @@ def perform_registration(acme, config, tos_cb):
Actually register new account, trying repeatedly if there are email
problems
:param .IConfig config: Client configuration.
:param acme.client.Client client: ACME client object.
:param .IConfig config: Client configuration.
:param Callable tos_cb: a callback to handle Term of Service agreement.
:returns: Registration Resource.
:rtype: `acme.messages.RegistrationResource`
@ -605,8 +612,10 @@ def validate_key_csr(privkey, csr=None):
if csr.form == "der":
csr_obj = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, csr.data)
csr = util.CSR(csr.file, OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, csr_obj), "pem")
cert_buffer = OpenSSL.crypto.dump_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr_obj
)
csr = util.CSR(csr.file, cert_buffer, "pem")
# If CSR is provided, it must be readable and valid.
if csr.data and not crypto_util.valid_csr(csr.data):

View file

@ -8,15 +8,23 @@ import hashlib
import logging
import os
import OpenSSL
import pyrfc3339
import six
import zope.component
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.ec import ECDSA
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
# https://github.com/python/typeshed/tree/master/third_party/2/cryptography
from cryptography import x509 # type: ignore
from OpenSSL import crypto
from OpenSSL import SSL # type: ignore
from acme import crypto_util as acme_crypto_util
from acme.magic_typing import IO # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot import interfaces
from certbot import util
@ -47,7 +55,7 @@ def init_save_key(key_size, key_dir, keyname="key-certbot.pem"):
try:
key_pem = make_key(key_size)
except ValueError as err:
logger.exception(err)
logger.error("", exc_info=True)
raise err
config = zope.component.getUtility(interfaces.IConfig)
@ -111,11 +119,11 @@ def valid_csr(csr):
"""
try:
req = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr)
req = crypto.load_certificate_request(
crypto.FILETYPE_PEM, csr)
return req.verify(req.get_pubkey())
except OpenSSL.crypto.Error as error:
logger.debug(error, exc_info=True)
except crypto.Error:
logger.debug("", exc_info=True)
return False
@ -129,13 +137,13 @@ def csr_matches_pubkey(csr, privkey):
:rtype: bool
"""
req = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr)
pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey)
req = crypto.load_certificate_request(
crypto.FILETYPE_PEM, csr)
pkey = crypto.load_privatekey(crypto.FILETYPE_PEM, privkey)
try:
return req.verify(pkey)
except OpenSSL.crypto.Error as error:
logger.debug(error, exc_info=True)
except crypto.Error:
logger.debug("", exc_info=True)
return False
@ -145,26 +153,26 @@ def import_csr_file(csrfile, data):
:param str csrfile: CSR filename
:param str data: contents of the CSR file
:returns: (`OpenSSL.crypto.FILETYPE_PEM`,
:returns: (`crypto.FILETYPE_PEM`,
util.CSR object representing the CSR,
list of domains requested in the CSR)
:rtype: tuple
"""
PEM = OpenSSL.crypto.FILETYPE_PEM
load = OpenSSL.crypto.load_certificate_request
PEM = crypto.FILETYPE_PEM
load = crypto.load_certificate_request
try:
# Try to parse as DER first, then fall back to PEM.
csr = load(OpenSSL.crypto.FILETYPE_ASN1, data)
except OpenSSL.crypto.Error:
csr = load(crypto.FILETYPE_ASN1, data)
except crypto.Error:
try:
csr = load(PEM, data)
except OpenSSL.crypto.Error:
except crypto.Error:
raise errors.Error("Failed to parse CSR file: {0}".format(csrfile))
domains = _get_names_from_loaded_cert_or_req(csr)
# Internally we always use PEM, so re-encode as PEM before returning.
data_pem = OpenSSL.crypto.dump_certificate_request(PEM, csr)
data_pem = crypto.dump_certificate_request(PEM, csr)
return PEM, util.CSR(file=csrfile, data=data_pem, form="pem"), domains
@ -178,9 +186,9 @@ def make_key(bits):
"""
assert bits >= 1024 # XXX
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
return OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, bits)
return crypto.dump_privatekey(crypto.FILETYPE_PEM, key)
def valid_privkey(privkey):
@ -193,9 +201,9 @@ def valid_privkey(privkey):
"""
try:
return OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, privkey).check()
except (TypeError, OpenSSL.crypto.Error):
return crypto.load_privatekey(
crypto.FILETYPE_PEM, privkey).check()
except (TypeError, crypto.Error):
return False
@ -224,13 +232,27 @@ def verify_renewable_cert_sig(renewable_cert):
:raises errors.Error: If signature verification fails.
"""
try:
with open(renewable_cert.chain, 'rb') as chain:
chain, _ = pyopenssl_load_certificate(chain.read())
with open(renewable_cert.cert, 'rb') as cert:
cert = x509.load_pem_x509_certificate(cert.read(), default_backend())
hash_name = cert.signature_hash_algorithm.name
OpenSSL.crypto.verify(chain, cert.signature, cert.tbs_certificate_bytes, hash_name)
except (IOError, ValueError, OpenSSL.crypto.Error) as e:
with open(renewable_cert.chain, 'rb') as chain_file: # type: IO[bytes]
chain = x509.load_pem_x509_certificate(chain_file.read(), default_backend())
with open(renewable_cert.cert, 'rb') as cert_file: # type: IO[bytes]
cert = x509.load_pem_x509_certificate(cert_file.read(), default_backend())
pk = chain.public_key()
if isinstance(pk, RSAPublicKey):
# https://github.com/python/typeshed/blob/master/third_party/2/cryptography/hazmat/primitives/asymmetric/rsa.pyi
verifier = pk.verifier( # type: ignore
cert.signature, PKCS1v15(), cert.signature_hash_algorithm
)
verifier.update(cert.tbs_certificate_bytes)
verifier.verify()
elif isinstance(pk, EllipticCurvePublicKey):
verifier = pk.verifier(
cert.signature, ECDSA(cert.signature_hash_algorithm)
)
verifier.update(cert.tbs_certificate_bytes)
verifier.verify()
else:
raise errors.Error("Unsupported public key type")
except (IOError, ValueError, InvalidSignature) as e:
error_str = "verifying the signature of the cert located at {0} has failed. \
Details: {1}".format(renewable_cert.cert, e)
logger.exception(error_str)
@ -246,11 +268,11 @@ def verify_cert_matches_priv_key(cert_path, key_path):
:raises errors.Error: If they don't match.
"""
try:
context = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
context = SSL.Context(SSL.SSLv23_METHOD)
context.use_certificate_file(cert_path)
context.use_privatekey_file(key_path)
context.check_privatekey()
except (IOError, OpenSSL.SSL.Error) as e:
except (IOError, SSL.Error) as e:
error_str = "verifying the cert located at {0} matches the \
private key located at {1} has failed. \
Details: {2}".format(cert_path,
@ -267,12 +289,12 @@ def verify_fullchain(renewable_cert):
:raises errors.Error: If cert and chain do not combine to fullchain.
"""
try:
with open(renewable_cert.chain) as chain:
chain = chain.read()
with open(renewable_cert.cert) as cert:
cert = cert.read()
with open(renewable_cert.fullchain) as fullchain:
fullchain = fullchain.read()
with open(renewable_cert.chain) as chain_file: # type: IO[str]
chain = chain_file.read()
with open(renewable_cert.cert) as cert_file: # type: IO[str]
cert = cert_file.read()
with open(renewable_cert.fullchain) as fullchain_file: # type: IO[str]
fullchain = fullchain_file.read()
if (cert + chain) != fullchain:
error_str = "fullchain does not match cert + chain for {0}!"
error_str = error_str.format(renewable_cert.lineagename)
@ -294,43 +316,43 @@ def pyopenssl_load_certificate(data):
openssl_errors = []
for file_type in (OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1):
for file_type in (crypto.FILETYPE_PEM, crypto.FILETYPE_ASN1):
try:
return OpenSSL.crypto.load_certificate(file_type, data), file_type
except OpenSSL.crypto.Error as error: # TODO: other errors?
return crypto.load_certificate(file_type, data), file_type
except crypto.Error as error: # TODO: other errors?
openssl_errors.append(error)
raise errors.Error("Unable to load: {0}".format(",".join(
str(error) for error in openssl_errors)))
def _load_cert_or_req(cert_or_req_str, load_func,
typ=OpenSSL.crypto.FILETYPE_PEM):
typ=crypto.FILETYPE_PEM):
try:
return load_func(typ, cert_or_req_str)
except OpenSSL.crypto.Error as error:
logger.exception(error)
except crypto.Error:
logger.error("", exc_info=True)
raise
def _get_sans_from_cert_or_req(cert_or_req_str, load_func,
typ=OpenSSL.crypto.FILETYPE_PEM):
typ=crypto.FILETYPE_PEM):
# pylint: disable=protected-access
return acme_crypto_util._pyopenssl_cert_or_req_san(_load_cert_or_req(
cert_or_req_str, load_func, typ))
def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM):
def get_sans_from_cert(cert, typ=crypto.FILETYPE_PEM):
"""Get a list of Subject Alternative Names from a certificate.
:param str cert: Certificate (encoded).
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
:param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`
:returns: A list of Subject Alternative Names.
:rtype: list
"""
return _get_sans_from_cert_or_req(
cert, OpenSSL.crypto.load_certificate, typ)
cert, crypto.load_certificate, typ)
def _get_names_from_cert_or_req(cert_or_req, load_func, typ):
@ -343,24 +365,24 @@ def _get_names_from_loaded_cert_or_req(loaded_cert_or_req):
return acme_crypto_util._pyopenssl_cert_or_req_all_names(loaded_cert_or_req)
def get_names_from_cert(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
def get_names_from_cert(csr, typ=crypto.FILETYPE_PEM):
"""Get a list of domains from a cert, including the CN if it is set.
:param str cert: Certificate (encoded).
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
:param typ: `crypto.FILETYPE_PEM` or `crypto.FILETYPE_ASN1`
:returns: A list of domain names.
:rtype: list
"""
return _get_names_from_cert_or_req(
csr, OpenSSL.crypto.load_certificate, typ)
csr, crypto.load_certificate, typ)
def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
def dump_pyopenssl_chain(chain, filetype=crypto.FILETYPE_PEM):
"""Dump certificate chain into a bundle.
:param list chain: List of `OpenSSL.crypto.X509` (or wrapped in
:param list chain: List of `crypto.X509` (or wrapped in
:class:`josepy.util.ComparableX509`).
"""
@ -378,7 +400,7 @@ def notBefore(cert_path):
:rtype: :class:`datetime.datetime`
"""
return _notAfterBefore(cert_path, OpenSSL.crypto.X509.get_notBefore)
return _notAfterBefore(cert_path, crypto.X509.get_notBefore)
def notAfter(cert_path):
@ -390,15 +412,15 @@ def notAfter(cert_path):
:rtype: :class:`datetime.datetime`
"""
return _notAfterBefore(cert_path, OpenSSL.crypto.X509.get_notAfter)
return _notAfterBefore(cert_path, crypto.X509.get_notAfter)
def _notAfterBefore(cert_path, method):
"""Internal helper function for finding notbefore/notafter.
:param str cert_path: path to a cert in PEM format
:param function method: one of ``OpenSSL.crypto.X509.get_notBefore``
or ``OpenSSL.crypto.X509.get_notAfter``
:param function method: one of ``crypto.X509.get_notBefore``
or ``crypto.X509.get_notAfter``
:returns: the notBefore or notAfter value from the cert at cert_path
:rtype: :class:`datetime.datetime`
@ -406,7 +428,7 @@ def _notAfterBefore(cert_path, method):
"""
# pylint: disable=redefined-outer-name
with open(cert_path) as f:
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
x509 = crypto.load_certificate(crypto.FILETYPE_PEM,
f.read())
# pyopenssl always returns bytes
timestamp = method(x509)
@ -443,7 +465,7 @@ def cert_and_chain_from_fullchain(fullchain_pem):
:rtype: tuple
"""
cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, fullchain_pem)).decode()
cert = crypto.dump_certificate(crypto.FILETYPE_PEM,
crypto.load_certificate(crypto.FILETYPE_PEM, fullchain_pem)).decode()
chain = fullchain_pem[len(cert):].lstrip()
return (cert, chain)

View file

@ -41,8 +41,8 @@ def _want_subscription():
'Would you be willing to share your email address with the '
"Electronic Frontier Foundation, a founding partner of the Let's "
'Encrypt project and the non-profit organization that develops '
"Certbot? We'd like to send you email about EFF and our work to "
'encrypt the web, protect its users and defend digital rights.')
"Certbot? We'd like to send you email about our work encrypting "
"the web, EFF news, campaigns, and ways to support digital freedom. ")
display = zope.component.getUtility(interfaces.IDisplay)
return display.yesno(prompt, default=False)
@ -71,11 +71,14 @@ def _check_response(response):
"""
logger.debug('Received response:\n%s', response.content)
if response.ok:
if not response.json()['status']:
try:
response.raise_for_status()
if response.json()['status'] == False:
_report_failure('your e-mail address appears to be invalid')
else:
except requests.exceptions.HTTPError:
_report_failure()
except (ValueError, KeyError):
_report_failure('there was a problem with the server response')
def _report_failure(reason=None):

View file

@ -5,6 +5,10 @@ import os
import signal
import traceback
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Any, Callable, Dict, List, Union
# pylint: enable=unused-import, no-name-in-module
from certbot import errors
logger = logging.getLogger(__name__)
@ -56,9 +60,9 @@ class ErrorHandler(object):
def __init__(self, func=None, *args, **kwargs):
self.call_on_regular_exit = False
self.body_executed = False
self.funcs = []
self.prev_handlers = {}
self.received_signals = []
self.funcs = [] # type: List[Callable[[], Any]]
self.prev_handlers = {} # type: Dict[int, Union[int, None, Callable]]
self.received_signals = [] # type: List[int]
if func is not None:
self.register(func, *args, **kwargs)
@ -88,6 +92,7 @@ class ErrorHandler(object):
return retval
def register(self, func, *args, **kwargs):
# type: (Callable, *Any, **Any) -> None
"""Sets func to be run with the given arguments during cleanup.
:param function func: function to be called in case of an error
@ -101,9 +106,8 @@ class ErrorHandler(object):
while self.funcs:
try:
self.funcs[-1]()
except Exception as error: # pylint: disable=broad-except
logger.error("Encountered exception during recovery")
logger.exception(error)
except Exception: # pylint: disable=broad-except
logger.error("Encountered exception during recovery: ", exc_info=True)
self.funcs.pop()
def _set_signal_handlers(self):

View file

@ -6,6 +6,7 @@ import os
from subprocess import Popen, PIPE
from acme.magic_typing import Set, List # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot import util
@ -76,7 +77,8 @@ def pre_hook(config):
if cmd:
_run_pre_hook_if_necessary(cmd)
pre_hook.already = set() # type: ignore
executed_pre_hooks = set() # type: Set[str]
def _run_pre_hook_if_necessary(command):
@ -88,12 +90,12 @@ def _run_pre_hook_if_necessary(command):
:param str command: pre-hook to be run
"""
if command in pre_hook.already:
if command in executed_pre_hooks:
logger.info("Pre-hook command already run, skipping: %s", command)
else:
logger.info("Running pre-hook command: %s", command)
_run_hook(command)
pre_hook.already.add(command)
executed_pre_hooks.add(command)
def post_hook(config):
@ -127,7 +129,8 @@ def post_hook(config):
logger.info("Running post-hook command: %s", cmd)
_run_hook(cmd)
post_hook.eventually = [] # type: ignore
post_hooks = [] # type: List[str]
def _run_eventually(command):
@ -139,13 +142,13 @@ def _run_eventually(command):
:param str command: post-hook to register to be run
"""
if command not in post_hook.eventually:
post_hook.eventually.append(command)
if command not in post_hooks:
post_hooks.append(command)
def run_saved_post_hooks():
"""Run any post hooks that were saved up in the course of the 'renew' verb"""
for cmd in post_hook.eventually:
for cmd in post_hooks:
logger.info("Running post-hook command: %s", cmd)
_run_hook(cmd)

View file

@ -201,7 +201,9 @@ class IConfig(zope.interface.Interface):
"""
server = zope.interface.Attribute("ACME Directory Resource URI.")
email = zope.interface.Attribute(
"Email used for registration and recovery contact. (default: Ask)")
"Email used for registration and recovery contact. Use comma to "
"register multiple emails, ex: u1@example.com,u2@example.com. "
"(default: Ask).")
rsa_key_size = zope.interface.Attribute("Size of the RSA key.")
must_staple = zope.interface.Attribute(
"Adds the OCSP Must Staple extension to the certificate. "

View file

@ -191,9 +191,8 @@ class MemoryHandler(logging.handlers.MemoryHandler):
only happens when flush(force=True) is called.
"""
def __init__(self, target=None):
def __init__(self, target=None, capacity=10000):
# capacity doesn't matter because should_flush() is overridden
capacity = float('inf')
super(MemoryHandler, self).__init__(capacity, target=target)
def close(self):

View file

@ -11,6 +11,7 @@ import josepy as jose
import zope.component
from acme import errors as acme_errors
from acme.magic_typing import Union # pylint: disable=unused-import, no-name-in-module
import certbot
@ -324,7 +325,7 @@ def _find_lineage_for_domains_and_certname(config, domains, certname):
return "newcert", None
else:
raise errors.ConfigurationError("No certificate with name {0} found. "
"Use -d to specify domains, or run certbot --certificates to see "
"Use -d to specify domains, or run certbot certificates to see "
"possible certificate names.".format(certname))
def _get_added_removed(after, before):
@ -340,7 +341,10 @@ def _get_added_removed(after, before):
def _format_list(character, strings):
"""Format list with given character
"""
formatted = "{br}{ch} " + "{br}{ch} ".join(strings)
if len(strings) == 0:
formatted = "{br}(None)"
else:
formatted = "{br}{ch} " + "{br}{ch} ".join(strings)
return formatted.format(
ch=character,
br=os.linesep
@ -483,6 +487,21 @@ def _determine_account(config):
:raises errors.Error: If unable to register an account with ACME server
"""
def _tos_cb(terms_of_service):
if config.tos:
return True
msg = ("Please read the Terms of Service at {0}. You "
"must agree in order to register with the ACME "
"server at {1}".format(
terms_of_service, config.server))
obj = zope.component.getUtility(interfaces.IDisplay)
result = obj.yesno(msg, "Agree", "Cancel",
cli_flag="--agree-tos", force_interactive=True)
if not result:
raise errors.Error(
"Registration cannot proceed without accepting "
"Terms of Service.")
account_storage = account.AccountFileStorage(config)
acme = None
@ -497,28 +516,13 @@ def _determine_account(config):
else: # no account registered yet
if config.email is None and not config.register_unsafely_without_email:
config.email = display_ops.get_email()
def _tos_cb(terms_of_service):
if config.tos:
return True
msg = ("Please read the Terms of Service at {0}. You "
"must agree in order to register with the ACME "
"server at {1}".format(
terms_of_service, config.server))
obj = zope.component.getUtility(interfaces.IDisplay)
result = obj.yesno(msg, "Agree", "Cancel",
cli_flag="--agree-tos", force_interactive=True)
if not result:
raise errors.Error(
"Registration cannot proceed without accepting "
"Terms of Service.")
try:
acc, acme = client.register(
config, account_storage, tos_cb=_tos_cb)
except errors.MissingCommandlineFlag:
raise
except errors.Error as error:
logger.debug(error, exc_info=True)
except errors.Error:
logger.debug("", exc_info=True)
raise errors.Error(
"Unable to register an account with ACME server")
@ -731,8 +735,9 @@ def register(config, unused_plugins):
acc, acme = _determine_account(config)
cb_client = client.Client(config, acc, None, None, acme=acme)
# We rely on an exception to interrupt this process if it didn't work.
acc_contacts = ['mailto:' + email for email in config.email.split(',')]
acc.regr = cb_client.acme.update_registration(acc.regr.update(
body=acc.regr.body.update(contact=('mailto:' + config.email,))))
body=acc.regr.body.update(contact=acc_contacts)))
account_storage.save_regr(acc, cb_client.acme)
eff.handle_subscription(config)
add_msg("Your e-mail address was updated to {0}.".format(config.email))
@ -1268,7 +1273,8 @@ def set_displayer(config):
"""
if config.quiet:
config.noninteractive_mode = True
displayer = display_util.NoninteractiveDisplay(open(os.devnull, "w"))
displayer = display_util.NoninteractiveDisplay(open(os.devnull, "w")) \
# type: Union[None, display_util.NoninteractiveDisplay, display_util.FileDisplay]
elif config.noninteractive_mode:
displayer = display_util.NoninteractiveDisplay(sys.stdout)
else:

View file

@ -11,6 +11,8 @@ import zope.interface
from josepy import util as jose_util
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges # pylint: disable=unused-import
from certbot import constants
from certbot import crypto_util
from certbot import errors
@ -331,8 +333,8 @@ class ChallengePerformer(object):
def __init__(self, configurator):
self.configurator = configurator
self.achalls = []
self.indices = []
self.achalls = [] # type: List[achallenges.KeyAuthorizationAnnotatedChallenge]
self.indices = [] # type: List[int]
def add_chall(self, achall, idx=None):
"""Store challenge to be performed when perform() is called.

View file

@ -10,6 +10,7 @@ from collections import OrderedDict
import zope.interface
import zope.interface.verify
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
from certbot import constants
from certbot import errors
from certbot import interfaces
@ -189,7 +190,7 @@ class PluginsRegistry(collections.Mapping):
@classmethod
def find_all(cls):
"""Find plugins using setuptools entry points."""
plugins = {}
plugins = {} # type: Dict[str, PluginEntryPoint]
# pylint: disable=not-callable
entry_points = itertools.chain(
pkg_resources.iter_entry_points(

View file

@ -8,6 +8,7 @@ import pkg_resources
import six
import zope.interface
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot import interfaces
@ -250,7 +251,7 @@ class PluginsRegistryTest(unittest.TestCase):
self.plugin_ep.prepare.assert_called_once_with()
def test_prepare_order(self):
order = []
order = [] # type: List[str]
plugins = dict(
(c, mock.MagicMock(prepare=functools.partial(order.append, c)))
for c in string.ascii_letters)

View file

@ -5,7 +5,9 @@ import zope.component
import zope.interface
from acme import challenges
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges # pylint: disable=unused-import
from certbot import interfaces
from certbot import errors
from certbot import hooks
@ -98,7 +100,8 @@ when it receives a TLS ClientHello with the SNI extension set to
super(Authenticator, self).__init__(*args, **kwargs)
self.reverter = reverter.Reverter(self.config)
self.reverter.recovery_routine()
self.env = dict()
self.env = dict() \
# type: Dict[achallenges.KeyAuthorizationAnnotatedChallenge, Dict[str, str]]
self.tls_sni_01 = None
@classmethod

View file

@ -6,6 +6,7 @@ import unittest
import mock
import zope.component
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot.display import util as display_util
from certbot.tests import util as test_util
from certbot import interfaces
@ -47,7 +48,7 @@ class PickPluginTest(unittest.TestCase):
self.default = None
self.reg = mock.MagicMock()
self.question = "Question?"
self.ifaces = []
self.ifaces = [] # type: List[interfaces.IPlugin]
def _call(self):
from certbot.plugins.selection import pick_plugin

View file

@ -3,6 +3,8 @@ import argparse
import collections
import logging
import socket
# https://github.com/python/typeshed/blob/master/stdlib/2and3/socket.pyi
from socket import errno as socket_errors # type: ignore
import OpenSSL
import six
@ -10,7 +12,10 @@ import zope.interface
from acme import challenges
from acme import standalone as acme_standalone
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import DefaultDict, Dict, Set, Tuple, List, Type, TYPE_CHECKING
from certbot import achallenges # pylint: disable=unused-import
from certbot import errors
from certbot import interfaces
@ -18,6 +23,11 @@ from certbot.plugins import common
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
ServedType = DefaultDict[
acme_standalone.BaseDualNetworkedServers,
Set[achallenges.KeyAuthorizationAnnotatedChallenge]
]
class ServerManager(object):
"""Standalone servers manager.
@ -33,7 +43,7 @@ class ServerManager(object):
"""
def __init__(self, certs, http_01_resources):
self._instances = {}
self._instances = {} # type: Dict[int, acme_standalone.BaseDualNetworkedServers]
self.certs = certs
self.http_01_resources = http_01_resources
@ -59,7 +69,8 @@ class ServerManager(object):
address = (listenaddr, port)
try:
if challenge_type is challenges.TLSSNI01:
servers = acme_standalone.TLSSNI01DualNetworkedServers(address, self.certs)
servers = acme_standalone.TLSSNI01DualNetworkedServers(
address, self.certs) # type: acme_standalone.BaseDualNetworkedServers
else: # challenges.HTTP01
servers = acme_standalone.HTTP01DualNetworkedServers(
address, self.http_01_resources)
@ -103,7 +114,8 @@ class ServerManager(object):
return self._instances.copy()
SUPPORTED_CHALLENGES = [challenges.TLSSNI01, challenges.HTTP01]
SUPPORTED_CHALLENGES = [challenges.TLSSNI01, challenges.HTTP01] \
# type: List[Type[challenges.KeyAuthorizationChallenge]]
class SupportedChallengesAction(argparse.Action):
@ -179,14 +191,15 @@ class Authenticator(common.Plugin):
self.key = OpenSSL.crypto.PKey()
self.key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
self.served = collections.defaultdict(set)
self.served = collections.defaultdict(set) # type: ServedType
# Stuff below is shared across threads (i.e. servers read
# values, main thread writes). Due to the nature of CPython's
# GIL, the operations are safe, c.f.
# https://docs.python.org/2/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe
self.certs = {}
self.http_01_resources = set()
self.certs = {} # type: Dict[bytes, Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]]
self.http_01_resources = set() \
# type: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
self.servers = ServerManager(self.certs, self.http_01_resources)
@ -265,13 +278,13 @@ class Authenticator(common.Plugin):
def _handle_perform_error(error):
if error.socket_error.errno == socket.errno.EACCES:
if error.socket_error.errno == socket_errors.EACCES:
raise errors.PluginError(
"Could not bind TCP port {0} because you don't have "
"the appropriate permissions (for example, you "
"aren't running this program as "
"root).".format(error.port))
elif error.socket_error.errno == socket.errno.EADDRINUSE:
elif error.socket_error.errno == socket_errors.EADDRINUSE:
display = zope.component.getUtility(interfaces.IDisplay)
msg = (
"Could not bind TCP port {0} because it is already in "

View file

@ -2,12 +2,18 @@
import argparse
import socket
import unittest
# https://github.com/python/typeshed/blob/master/stdlib/2and3/socket.pyi
from socket import errno as socket_errors # type: ignore
import josepy as jose
import mock
import six
import OpenSSL.crypto # pylint: disable=unused-import
from acme import challenges
from acme import standalone as acme_standalone # pylint: disable=unused-import
from acme.magic_typing import Dict, Tuple, Set # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges
from certbot import errors
@ -21,8 +27,9 @@ class ServerManagerTest(unittest.TestCase):
def setUp(self):
from certbot.plugins.standalone import ServerManager
self.certs = {}
self.http_01_resources = {}
self.certs = {} # type: Dict[bytes, Tuple[OpenSSL.crypto.PKey, OpenSSL.crypto.X509]]
self.http_01_resources = {} \
# type: Set[acme_standalone.HTTP01RequestHandler.HTTP01Resource]
self.mgr = ServerManager(self.certs, self.http_01_resources)
def test_init(self):
@ -159,7 +166,7 @@ class AuthenticatorTest(unittest.TestCase):
@test_util.patch_get_utility()
def test_perform_eaddrinuse_retry(self, mock_get_utility):
mock_utility = mock_get_utility()
errno = socket.errno.EADDRINUSE
errno = socket_errors.EADDRINUSE
error = errors.StandaloneBindError(mock.MagicMock(errno=errno), -1)
self.auth.servers.run.side_effect = [error] + 2 * [mock.MagicMock()]
mock_yesno = mock_utility.yesno
@ -174,7 +181,7 @@ class AuthenticatorTest(unittest.TestCase):
mock_yesno = mock_utility.yesno
mock_yesno.return_value = False
errno = socket.errno.EADDRINUSE
errno = socket_errors.EADDRINUSE
self.assertRaises(errors.PluginError, self._fail_perform, errno)
self._assert_correct_yesno_call(mock_yesno)
@ -184,11 +191,11 @@ class AuthenticatorTest(unittest.TestCase):
self.assertFalse(yesno_kwargs.get("default", True))
def test_perform_eacces(self):
errno = socket.errno.EACCES
errno = socket_errors.EACCES
self.assertRaises(errors.PluginError, self._fail_perform, errno)
def test_perform_unexpected_socket_error(self):
errno = socket.errno.ENOTCONN
errno = socket_errors.ENOTCONN
self.assertRaises(
errors.StandaloneBindError, self._fail_perform, errno)

View file

@ -3,6 +3,7 @@ import json
import logging
import os
from acme.magic_typing import Any, Dict # pylint: disable=unused-import, no-name-in-module
from certbot import errors
logger = logging.getLogger(__name__)
@ -38,7 +39,7 @@ class PluginStorage(object):
:raises .errors.PluginStorageError: when unable to open or read the file
"""
data = dict()
data = dict() # type: Dict[str, Any]
filedata = ""
try:
with open(self._storagepath, 'r') as fh:

View file

@ -10,8 +10,12 @@ import six
import zope.component
import zope.interface
from acme import challenges
from acme import challenges # pylint: disable=unused-import
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Dict, Set, DefaultDict, List
# pylint: enable=unused-import, no-name-in-module
from certbot import achallenges # pylint: disable=unused-import
from certbot import cli
from certbot import errors
from certbot import interfaces
@ -64,10 +68,11 @@ to serve all files under specified web root ({0})."""
def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self.full_roots = {}
self.performed = collections.defaultdict(set)
self.full_roots = {} # type: Dict[str, str]
self.performed = collections.defaultdict(set) \
# type: DefaultDict[str, Set[achallenges.KeyAuthorizationAnnotatedChallenge]]
# stack of dirs successfully created by this authenticator
self._created_dirs = []
self._created_dirs = [] # type: List[str]
def prepare(self): # pylint: disable=missing-docstring
pass
@ -156,7 +161,6 @@ to serve all files under specified web root ({0})."""
" --help webroot for examples.")
for name, path in path_map.items():
self.full_roots[name] = os.path.join(path, challenges.HTTP01.URI_ROOT_PATH)
logger.debug("Creating root challenges validation dir at %s",
self.full_roots[name])
@ -207,7 +211,6 @@ to serve all files under specified web root ({0})."""
os.umask(old_umask)
self.performed[root_path].add(achall)
return response
def cleanup(self, achalls): # pylint: disable=missing-docstring
@ -219,7 +222,7 @@ to serve all files under specified web root ({0})."""
os.remove(validation_path)
self.performed[root_path].remove(achall)
not_removed = []
not_removed = [] # type: List[str]
while len(self._created_dirs) > 0:
path = self._created_dirs.pop()
try:

View file

@ -11,6 +11,8 @@ import zope.component
import OpenSSL
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import cli
from certbot import crypto_util
from certbot import errors
@ -59,8 +61,8 @@ def _reconstitute(config, full_path):
"""
try:
renewal_candidate = storage.RenewableCert(full_path, config)
except (errors.CertStorageError, IOError) as exc:
logger.warning(exc)
except (errors.CertStorageError, IOError):
logger.warning("", exc_info=True)
logger.warning("Renewal configuration file %s is broken. Skipping.", full_path)
logger.debug("Traceback was:\n%s", traceback.format_exc())
return None
@ -133,14 +135,15 @@ def _restore_plugin_configs(config, renewalparams):
# longer defined, stored copies of that parameter will be
# deserialized as strings by this logic even if they were
# originally meant to be some other type.
plugin_prefixes = [] # type: List[str]
if renewalparams["authenticator"] == "webroot":
_restore_webroot_config(config, renewalparams)
plugin_prefixes = []
else:
plugin_prefixes = [renewalparams["authenticator"]]
plugin_prefixes.append(renewalparams["authenticator"])
if renewalparams.get("installer", None) is not None:
if renewalparams.get("installer") is not None:
plugin_prefixes.append(renewalparams["installer"])
for plugin_prefix in set(plugin_prefixes):
plugin_prefix = plugin_prefix.replace('-', '_')
for config_item, config_value in six.iteritems(renewalparams):
@ -316,13 +319,13 @@ def report(msgs, category):
def _renew_describe_results(config, renew_successes, renew_failures,
renew_skipped, parse_failures):
out = []
out = [] # type: List[str]
notify = out.append
disp = zope.component.getUtility(interfaces.IDisplay)
def notify_error(err):
"""Notify and log errors."""
notify(err)
notify(str(err))
logger.error(err)
if config.dry_run:

View file

@ -82,8 +82,10 @@ class Reverter(object):
self._recover_checkpoint(self.config.temp_checkpoint_dir)
except errors.ReverterError:
# We have a partial or incomplete recovery
logger.fatal("Incomplete or failed recovery for %s",
self.config.temp_checkpoint_dir)
logger.critical(
"Incomplete or failed recovery for %s",
self.config.temp_checkpoint_dir,
)
raise errors.ReverterError("Unable to revert temporary config")
def rollback_checkpoints(self, rollback=1):
@ -123,7 +125,7 @@ class Reverter(object):
try:
self._recover_checkpoint(cp_dir)
except errors.ReverterError:
logger.fatal("Failed to load checkpoint during rollback")
logger.critical("Failed to load checkpoint during rollback")
raise errors.ReverterError(
"Unable to load checkpoint during rollback")
rollback -= 1
@ -181,7 +183,7 @@ class Reverter(object):
if for_logging:
return os.linesep.join(output)
zope.component.getUtility(interfaces.IDisplay).notification(
os.linesep.join(output), force_interactive=True)
os.linesep.join(output), force_interactive=True, pause=False)
def add_to_temp_checkpoint(self, save_files, save_notes):
"""Add files to temporary checkpoint.
@ -457,7 +459,7 @@ class Reverter(object):
self._recover_checkpoint(self.config.in_progress_dir)
except errors.ReverterError:
# We have a partial or incomplete recovery
logger.fatal("Incomplete or failed recovery for IN_PROGRESS "
logger.critical("Incomplete or failed recovery for IN_PROGRESS "
"checkpoint - %s",
self.config.in_progress_dir)
raise errors.ReverterError(
@ -494,7 +496,7 @@ class Reverter(object):
"Certbot probably shut down unexpectedly",
os.linesep, path)
except (IOError, OSError):
logger.fatal(
logger.critical(
"Unable to remove filepaths contained within %s", file_list)
raise errors.ReverterError(
"Unable to remove filepaths contained within "

View file

@ -10,6 +10,7 @@ import zope.component
from acme import challenges
from acme import client as acme_client
from acme import messages
from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
from certbot import achallenges
from certbot import errors
@ -354,12 +355,13 @@ class PollChallengesTest(unittest.TestCase):
acme_util.CHALLENGES, [messages.STATUS_PENDING] * 3, False), [])
]
self.chall_update = {}
self.chall_update = {} # type: Dict[int, achallenges.KeyAuthorizationAnnotatedChallenge]
for i, aauthzr in enumerate(self.aauthzrs):
self.chall_update[i] = [
challb_to_achall(challb, mock.Mock(key="dummy_key"), self.doms[i])
for challb in aauthzr.authzr.body.challenges]
@mock.patch("certbot.auth_handler.time")
def test_poll_challenges(self, unused_mock_time):
self.mock_net.poll.side_effect = self._mock_poll_solve_one_valid

View file

@ -495,7 +495,8 @@ class SetByCliTest(unittest.TestCase):
for v in ('manual', 'manual_auth_hook', 'manual_public_ip_logging_ok'):
self.assertTrue(_call_set_by_cli(v, args, verb))
cli.set_by_cli.detector = None
# https://github.com/python/mypy/issues/2087
cli.set_by_cli.detector = None # type: ignore
args = ['--manual-auth-hook', 'command']
for v in ('manual_auth_hook', 'manual_public_ip_logging_ok'):

View file

@ -12,7 +12,6 @@ from certbot import util
import certbot.tests.util as test_util
KEY = test_util.load_vector("rsa512_key.pem")
CSR_SAN = test_util.load_vector("csr-san_512.pem")
@ -92,6 +91,20 @@ class RegisterTest(test_util.ConfigTestCase):
mock_logger.info.assert_called_once_with(mock.ANY)
self.assertTrue(mock_handle.called)
@mock.patch("certbot.account.report_new_account")
@mock.patch("certbot.client.display_ops.get_email")
def test_dry_run_no_staging_account(self, _rep, mock_get_email):
"""Tests dry-run for no staging account, expect account created with no email"""
with mock.patch("certbot.client.acme_client.BackwardsCompatibleClientV2") as mock_client:
with mock.patch("certbot.eff.handle_subscription"):
with mock.patch("certbot.account.report_new_account"):
self.config.dry_run = True
self._call()
# check Certbot did not ask the user to provide an email
self.assertFalse(mock_get_email.called)
# check Certbot created an account with no email. Contact should return empty
self.assertFalse(mock_client().new_account_and_tos.call_args[0][0].contact)
def test_unsupported_error(self):
from acme import messages
msg = "Test"
@ -105,6 +118,7 @@ class RegisterTest(test_util.ConfigTestCase):
class ClientTestCommon(test_util.ConfigTestCase):
"""Common base class for certbot.client.Client tests."""
def setUp(self):
super(ClientTestCommon, self).setUp()
self.config.no_verify_ssl = False
@ -124,6 +138,7 @@ class ClientTestCommon(test_util.ConfigTestCase):
class ClientTest(ClientTestCommon):
"""Tests for certbot.client.Client."""
def setUp(self):
super(ClientTest, self).setUp()
@ -286,10 +301,10 @@ class ClientTest(ClientTestCommon):
@mock.patch('certbot.client.Client.obtain_certificate')
@mock.patch('certbot.storage.RenewableCert.new_lineage')
def test_obtain_and_enroll_certificate(self,
mock_storage, mock_obtain_certificate):
mock_storage, mock_obtain_certificate):
domains = ["*.example.com", "example.com"]
mock_obtain_certificate.return_value = (mock.MagicMock(),
mock.MagicMock(), mock.MagicMock(), None)
mock.MagicMock(), mock.MagicMock(), None)
self.client.config.dry_run = False
self.assertTrue(self.client.obtain_and_enroll_certificate(domains, "example_cert"))
@ -318,8 +333,8 @@ class ClientTest(ClientTestCommon):
candidate_fullchain_path = os.path.join(tmp_path, "chains", "fullchain.pem")
mock_parser.verb = "certonly"
mock_parser.args = ["--cert-path", candidate_cert_path,
"--chain-path", candidate_chain_path,
"--fullchain-path", candidate_fullchain_path]
"--chain-path", candidate_chain_path,
"--fullchain-path", candidate_fullchain_path]
cert_path, chain_path, fullchain_path = self.client.save_certificate(
cert_pem, chain_pem, candidate_cert_path, candidate_chain_path,
@ -407,6 +422,7 @@ class ClientTest(ClientTestCommon):
class EnhanceConfigTest(ClientTestCommon):
"""Tests for certbot.client.Client.enhance_config."""
def setUp(self):
super(EnhanceConfigTest, self).setUp()

View file

@ -21,6 +21,9 @@ CERT_PATH = test_util.vector_path('cert_512.pem')
CERT = test_util.load_vector('cert_512.pem')
SS_CERT_PATH = test_util.vector_path('cert_2048.pem')
SS_CERT = test_util.load_vector('cert_2048.pem')
P256_KEY = test_util.load_vector('nistp256_key.pem')
P256_CERT_PATH = test_util.vector_path('cert-nosans_nistp256.pem')
P256_CERT = test_util.load_vector('cert-nosans_nistp256.pem')
class InitSaveKeyTest(test_util.TempDirTestCase):
"""Tests for certbot.crypto_util.init_save_key."""
@ -217,6 +220,13 @@ class VerifyRenewableCertSigTest(VerifyCertSetup):
def test_cert_sig_match(self):
self.assertEqual(None, self._call(self.renewable_cert))
def test_cert_sig_match_ec(self):
renewable_cert = mock.MagicMock()
renewable_cert.cert = P256_CERT_PATH
renewable_cert.chain = P256_CERT_PATH
renewable_cert.privkey = P256_KEY
self.assertEqual(None, self._call(renewable_cert))
def test_cert_sig_mismatch(self):
self.bad_renewable_cert.cert = test_util.vector_path('cert_512_bad.pem')
self.assertRaises(errors.Error, self._call, self.bad_renewable_cert)

View file

@ -8,6 +8,7 @@ import unittest
import mock
from six.moves import reload_module # pylint: disable=import-error
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot.tests.util import TempDirTestCase
class CompleterTest(TempDirTestCase):
@ -21,7 +22,7 @@ class CompleterTest(TempDirTestCase):
if self.tempdir[-1] != os.sep:
self.tempdir += os.sep
self.paths = []
self.paths = [] # type: List[str]
# create some files and directories in temp_dir
for c in string.ascii_lowercase:
path = os.path.join(self.tempdir, c)

View file

@ -1,4 +1,5 @@
"""Tests for certbot.eff."""
import requests
import unittest
import mock
@ -118,11 +119,28 @@ class SubscribeTest(unittest.TestCase):
@test_util.patch_get_utility()
def test_not_ok(self, mock_get_utility):
self.response.ok = False
self.response.raise_for_status.side_effect = requests.exceptions.HTTPError
self._call() # pylint: disable=no-value-for-parameter
actual = self._get_reported_message(mock_get_utility)
unexpected_part = 'because'
self.assertFalse(unexpected_part in actual)
@test_util.patch_get_utility()
def test_response_not_json(self, mock_get_utility):
self.response.json.side_effect = ValueError()
self._call() # pylint: disable=no-value-for-parameter
actual = self._get_reported_message(mock_get_utility)
expected_part = 'problem'
self.assertTrue(expected_part in actual)
@test_util.patch_get_utility()
def test_response_json_missing_status_element(self, mock_get_utility):
self.json.clear()
self._call() # pylint: disable=no-value-for-parameter
actual = self._get_reported_message(mock_get_utility)
expected_part = 'problem'
self.assertTrue(expected_part in actual)
def _get_reported_message(self, mock_get_utility):
self.assertTrue(mock_get_utility().add_message.called)
return mock_get_utility().add_message.call_args[0][0]

View file

@ -6,6 +6,9 @@ import sys
import unittest
import mock
# pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Callable, Dict, Union
# pylint: enable=unused-import, no-name-in-module
def get_signals(signums):
@ -23,8 +26,7 @@ def set_signals(sig_handler_dict):
def signal_receiver(signums):
"""Context manager to catch signals"""
signals = []
prev_handlers = {}
prev_handlers = get_signals(signums)
prev_handlers = get_signals(signums) # type: Dict[int, Union[int, None, Callable]]
set_signals(dict((s, lambda s, _: signals.append(s)) for s in signums))
yield signals
set_signals(prev_handlers)

View file

@ -5,6 +5,7 @@ import unittest
import mock
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import errors
from certbot.tests import util
@ -106,8 +107,8 @@ class PreHookTest(HookTest):
super(PreHookTest, self).tearDown()
def _reset_pre_hook_already(self):
from certbot.hooks import pre_hook
pre_hook.already.clear()
from certbot.hooks import executed_pre_hooks
executed_pre_hooks.clear()
def test_certonly(self):
self.config.verb = "certonly"
@ -184,8 +185,8 @@ class PostHookTest(HookTest):
super(PostHookTest, self).tearDown()
def _reset_post_hook_eventually(self):
from certbot.hooks import post_hook
post_hook.eventually = []
from certbot.hooks import post_hooks
del post_hooks[:]
def test_certonly_and_run_with_hook(self):
for verb in ("certonly", "run",):
@ -238,8 +239,8 @@ class PostHookTest(HookTest):
self.assertEqual(self._get_eventually(), expected)
def _get_eventually(self):
from certbot.hooks import post_hook
return post_hook.eventually
from certbot.hooks import post_hooks
return post_hooks
class RunSavedPostHooksTest(HookTest):
@ -248,23 +249,23 @@ class RunSavedPostHooksTest(HookTest):
@classmethod
def _call(cls, *args, **kwargs):
from certbot.hooks import run_saved_post_hooks
return run_saved_post_hooks(*args, **kwargs)
return run_saved_post_hooks()
def _call_with_mock_execute_and_eventually(self, *args, **kwargs):
"""Call run_saved_post_hooks but mock out execute and eventually
certbot.hooks.post_hook.eventually is replaced with
certbot.hooks.post_hooks is replaced with
self.eventually. The mock execute object is returned rather than
the return value of run_saved_post_hooks.
"""
eventually_path = "certbot.hooks.post_hook.eventually"
eventually_path = "certbot.hooks.post_hooks"
with mock.patch(eventually_path, new=self.eventually):
return self._call_with_mock_execute(*args, **kwargs)
def setUp(self):
super(RunSavedPostHooksTest, self).setUp()
self.eventually = []
self.eventually = [] # type: List[str]
def test_empty(self):
self.assertFalse(self._call_with_mock_execute_and_eventually().called)

View file

@ -10,6 +10,7 @@ import mock
import six
from acme import messages
from acme.magic_typing import Optional # pylint: disable=unused-import, no-name-in-module
from certbot import constants
from certbot import errors
@ -21,9 +22,9 @@ class PreArgParseSetupTest(unittest.TestCase):
"""Tests for certbot.log.pre_arg_parse_setup."""
@classmethod
def _call(cls, *args, **kwargs):
def _call(cls, *args, **kwargs): # pylint: disable=unused-argument
from certbot.log import pre_arg_parse_setup
return pre_arg_parse_setup(*args, **kwargs)
return pre_arg_parse_setup()
@mock.patch('certbot.log.sys')
@mock.patch('certbot.log.pre_arg_parse_except_hook')
@ -38,16 +39,16 @@ class PreArgParseSetupTest(unittest.TestCase):
mock_root_logger.setLevel.assert_called_once_with(logging.DEBUG)
self.assertEqual(mock_root_logger.addHandler.call_count, 2)
MemoryHandler = logging.handlers.MemoryHandler
memory_handler = None
memory_handler = None # type: Optional[logging.handlers.MemoryHandler]
for call in mock_root_logger.addHandler.call_args_list:
handler = call[0][0]
if memory_handler is None and isinstance(handler, MemoryHandler):
if memory_handler is None and isinstance(handler, logging.handlers.MemoryHandler):
memory_handler = handler
target = memory_handler.target # type: ignore
else:
self.assertTrue(isinstance(handler, logging.StreamHandler))
self.assertTrue(
isinstance(memory_handler.target, logging.StreamHandler))
isinstance(target, logging.StreamHandler))
mock_register.assert_called_once_with(logging.shutdown)
mock_sys.excepthook(1, 2, 3)

View file

@ -16,12 +16,14 @@ import josepy as jose
import six
from six.moves import reload_module # pylint: disable=import-error
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from certbot import account
from certbot import cli
from certbot import constants
from certbot import configuration
from certbot import crypto_util
from certbot import errors
from certbot import interfaces # pylint: disable=unused-import
from certbot import main
from certbot import updater
from certbot import util
@ -600,14 +602,14 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
if mockisfile:
orig_open = os.path.isfile
def mock_isfile(fn, *args, **kwargs):
def mock_isfile(fn, *args, **kwargs): # pylint: disable=unused-argument
"""Mock os.path.isfile()"""
if (fn.endswith("cert") or
fn.endswith("chain") or
fn.endswith("privkey")):
return True
else:
return orig_open(fn, *args, **kwargs)
return orig_open(fn)
with mock.patch("os.path.isfile") as mock_if:
mock_if.side_effect = mock_isfile
@ -626,7 +628,8 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
toy_stdout = stdout if stdout else six.StringIO()
with mock.patch('certbot.main.sys.stdout', new=toy_stdout):
with mock.patch('certbot.main.sys.stderr') as stderr:
ret = main.main(args[:]) # NOTE: parser can alter its args!
with mock.patch("certbot.util.atexit"):
ret = main.main(args[:]) # NOTE: parser can alter its args!
return ret, toy_stdout, stderr
def test_no_flags(self):
@ -835,7 +838,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
@mock.patch('certbot.main.plugins_disco')
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_no_args(self, _det, mock_disco):
ifaces = []
ifaces = [] # type: List[interfaces.IPlugin]
plugins = mock_disco.PluginsRegistry.find_all()
stdout = six.StringIO()
@ -850,7 +853,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
@mock.patch('certbot.main.plugins_disco')
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_no_args_unprivileged(self, _det, mock_disco):
ifaces = []
ifaces = [] # type: List[interfaces.IPlugin]
plugins = mock_disco.PluginsRegistry.find_all()
def throw_error(directory, mode, uid, strict):
@ -872,7 +875,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
@mock.patch('certbot.main.plugins_disco')
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_init(self, _det, mock_disco):
ifaces = []
ifaces = [] # type: List[interfaces.IPlugin]
plugins = mock_disco.PluginsRegistry.find_all()
stdout = six.StringIO()
@ -890,7 +893,7 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
@mock.patch('certbot.main.plugins_disco')
@mock.patch('certbot.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_prepare(self, _det, mock_disco):
ifaces = []
ifaces = [] # type: List[interfaces.IPlugin]
plugins = mock_disco.PluginsRegistry.find_all()
stdout = six.StringIO()
@ -1039,9 +1042,8 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
mock_client.obtain_certificate.return_value = (mock_certr, 'chain',
mock_key, 'csr')
def write_msg(message, *args, **kwargs):
def write_msg(message, *args, **kwargs): # pylint: disable=unused-argument
"""Write message to stdout."""
_, _ = args, kwargs
stdout.write(message)
try:
@ -1433,7 +1435,9 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
mocked_storage = mock.MagicMock()
mocked_account.AccountFileStorage.return_value = mocked_storage
mocked_storage.find_all.return_value = ["an account"]
mocked_det.return_value = (mock.MagicMock(), "foo")
mock_acc = mock.MagicMock()
mock_regr = mock_acc.regr
mocked_det.return_value = (mock_acc, "foo")
cb_client = mock.MagicMock()
mocked_client.Client.return_value = cb_client
x = self._call_no_clientmock(
@ -1443,8 +1447,10 @@ class MainTest(test_util.ConfigTestCase): # pylint: disable=too-many-public-met
self.assertTrue(x[0] is None)
# and we got supposedly did update the registration from
# the server
self.assertTrue(
cb_client.acme.update_registration.called)
reg_arg = cb_client.acme.update_registration.call_args[0][0]
# Test the return value of .update() was used because
# the regr is immutable.
self.assertEqual(reg_arg, mock_regr.update())
# and we saved the updated registration on disk
self.assertTrue(mocked_storage.save_regr.called)
self.assertTrue(

View file

@ -12,7 +12,7 @@ class ReporterTest(unittest.TestCase):
from certbot import reporter
self.reporter = reporter.Reporter(mock.MagicMock(quiet=False))
self.old_stdout = sys.stdout
self.old_stdout = sys.stdout # type: ignore
sys.stdout = six.StringIO()
def tearDown(self):
@ -21,32 +21,32 @@ class ReporterTest(unittest.TestCase):
def test_multiline_message(self):
self.reporter.add_message("Line 1\nLine 2", self.reporter.LOW_PRIORITY)
self.reporter.print_messages()
output = sys.stdout.getvalue()
output = sys.stdout.getvalue() # type: ignore
self.assertTrue("Line 1\n" in output)
self.assertTrue("Line 2" in output)
def test_tty_print_empty(self):
sys.stdout.isatty = lambda: True
sys.stdout.isatty = lambda: True # type: ignore
self.test_no_tty_print_empty()
def test_no_tty_print_empty(self):
self.reporter.print_messages()
self.assertEqual(sys.stdout.getvalue(), "")
self.assertEqual(sys.stdout.getvalue(), "") # type: ignore
try:
raise ValueError
except ValueError:
self.reporter.print_messages()
self.assertEqual(sys.stdout.getvalue(), "")
self.assertEqual(sys.stdout.getvalue(), "") # type: ignore
def test_tty_successful_exit(self):
sys.stdout.isatty = lambda: True
sys.stdout.isatty = lambda: True # type: ignore
self._successful_exit_common()
def test_no_tty_successful_exit(self):
self._successful_exit_common()
def test_tty_unsuccessful_exit(self):
sys.stdout.isatty = lambda: True
sys.stdout.isatty = lambda: True # type: ignore
self._unsuccessful_exit_common()
def test_no_tty_unsuccessful_exit(self):
@ -55,7 +55,7 @@ class ReporterTest(unittest.TestCase):
def _successful_exit_common(self):
self._add_messages()
self.reporter.print_messages()
output = sys.stdout.getvalue()
output = sys.stdout.getvalue() # type: ignore
self.assertTrue("IMPORTANT NOTES:" in output)
self.assertTrue("High" in output)
self.assertTrue("Med" in output)
@ -67,7 +67,7 @@ class ReporterTest(unittest.TestCase):
raise ValueError
except ValueError:
self.reporter.print_messages()
output = sys.stdout.getvalue()
output = sys.stdout.getvalue() # type: ignore
self.assertTrue("IMPORTANT NOTES:" in output)
self.assertTrue("High" in output)
self.assertTrue("Med" not in output)

View file

@ -0,0 +1,11 @@
-----BEGIN CERTIFICATE-----
MIIBoDCCAUYCCQDCnzfUZ7TQdDAKBggqhkjOPQQDAjBYMQswCQYDVQQGEwJVUzER
MA8GA1UECAwITWljaGlnYW4xEjAQBgNVBAcMCUFubiBBcmJvcjEMMAoGA1UECgwD
RUZGMRQwEgYDVQQDDAtleGFtcGxlLmNvbTAeFw0xODA1MTUxNzIyMzlaFw0xODA2
MTQxNzIyMzlaMFgxCzAJBgNVBAYTAlVTMREwDwYDVQQIDAhNaWNoaWdhbjESMBAG
A1UEBwwJQW5uIEFyYm9yMQwwCgYDVQQKDANFRkYxFDASBgNVBAMMC2V4YW1wbGUu
Y29tMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEPPl0JauSZukvAUWv4l5VNLAY
QXhuPXYQBf4dVET3s0E5q9ZCbSe+pNUbko9F+TFkuc7XVjQPsfkDbh0I9nD0tzAK
BggqhkjOPQQDAgNIADBFAiEAv8S2GXmWJqZ+j3DBfm72E1YK+HkOf+TOUHsbVR+O
Z1oCIFWNt1SPdIgRp4QAyzVk2pcTF8jDNajEMLWETDtxgRvM
-----END CERTIFICATE-----

View file

@ -0,0 +1,8 @@
-----BEGIN CERTIFICATE REQUEST-----
MIIBFDCBugIBADBYMQswCQYDVQQGEwJVUzERMA8GA1UECAwITWljaGlnYW4xEjAQ
BgNVBAcMCUFubiBBcmJvcjEMMAoGA1UECgwDRUZGMRQwEgYDVQQDDAtleGFtcGxl
LmNvbTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABDz5dCWrkmbpLwFFr+JeVTSw
GEF4bj12EAX+HVRE97NBOavWQm0nvqTVG5KPRfkxZLnO11Y0D7H5A24dCPZw9Leg
ADAKBggqhkjOPQQDAgNJADBGAiEAuoZHrYA5sy2DRTdLAxJTBNHKFFKbtaGt+QaJ
A62qa8sCIQCUkSgSAiNaEnJ7r5fKphdjeORHqhpl6flYkLE3lGmGdg==
-----END CERTIFICATE REQUEST-----

View file

@ -0,0 +1,5 @@
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIOvXH384CyNNv2lfxvjc7hg2f7ScYoLvlk/VpINLJlGBoAoGCCqGSM49
AwEHoUQDQgAEPPl0JauSZukvAUWv4l5VNLAYQXhuPXYQBf4dVET3s0E5q9ZCbSe+
pNUbko9F+TFkuc7XVjQPsfkDbh0I9nD0tw==
-----END EC PRIVATE KEY-----

View file

@ -20,6 +20,7 @@ from collections import OrderedDict
import configargparse
from acme.magic_typing import Tuple, Union # pylint: disable=unused-import, no-name-in-module
from certbot import constants
from certbot import errors
from certbot import lock
@ -218,8 +219,12 @@ def safe_open(path, mode="w", chmod=None, buffering=None):
"""
# pylint: disable=star-args
open_args = () if chmod is None else (chmod,)
fdopen_args = () if buffering is None else (buffering,)
open_args = () # type: Union[Tuple[()], Tuple[int]]
if chmod is not None:
open_args = (chmod,)
fdopen_args = () # type: Union[Tuple[()], Tuple[int]]
if buffering is not None:
fdopen_args = (buffering,)
return os.fdopen(
os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR, *open_args),
mode, *fdopen_args)
@ -303,9 +308,8 @@ def get_filtered_names(all_names):
for name in all_names:
try:
filtered_names.add(enforce_le_validity(name))
except errors.ConfigurationError as error:
logger.debug('Not suggesting name "%s"', name)
logger.debug(error)
except errors.ConfigurationError:
logger.debug('Not suggesting name "%s"', name, exc_info=True)
return filtered_names

View file

@ -11,6 +11,12 @@ Developer Guide
Getting Started
===============
Certbot has the same :ref:`system requirements <system_requirements>` when set
up for development. While the section below will help you install Certbot and
its dependencies, Certbot needs to be run on a UNIX-like OS so if you're using
Windows, you'll need to set up a (virtual) machine running an OS such as Linux
and continue with these instructions on that UNIX-like OS.
Running a local copy of the client
----------------------------------
@ -26,36 +32,40 @@ If you're on macOS, we recommend you skip the rest of this section and instead
run Certbot in Docker. You can find instructions for how to do this :ref:`here
<docker-dev>`. If you're running on Linux, you can run the following commands to
install dependencies and set up a virtual environment where you can run
Certbot. You will need to repeat this when Certbot's dependencies change or when
a new plugin is introduced.
Certbot.
.. code-block:: shell
cd certbot
sudo ./certbot-auto --os-packages-only
./tools/venv.sh
./certbot-auto --debug --os-packages-only
tools/venv.sh
You can now run the copy of Certbot from git either by executing
``venv/bin/certbot``, or by activating the virtual environment. If you're
actively modifying and testing the code, you may want to run commands like this in
each shell where you're working:
If you have Python3 available and want to use it, run the ``venv3.sh`` script.
.. code-block:: shell
source ./venv/bin/activate
export SERVER=https://acme-staging-v02.api.letsencrypt.org/directory
source tests/integration/_common.sh
tools/venv3.sh
After that, your shell will be using the virtual environment, your copy of
Certbot will default to requesting test (staging) certificates, and you run the
client by typing `certbot` or `certbot_test`. The latter is an alias that
includes several flags useful for testing. For instance, it sets various output
directories to point to /tmp/, and uses non-privileged ports for challenges, so
root privileges are not required.
.. note:: You may need to repeat this when
Certbot's dependencies change or when a new plugin is introduced.
Activating a shell with `venv/bin/activate` sets environment variables so that
Python pulls in the correct versions of various packages needed by Certbot.
More information can be found in the `virtualenv docs`_.
You can now run the copy of Certbot from git either by executing
``venv/bin/certbot``, or by activating the virtual environment. You can do the
latter by running:
.. code-block:: shell
source venv/bin/activate
# or
source venv3/bin/activate
After running this command, ``certbot`` and development tools like ``ipdb``,
``ipython``, ``pytest``, and ``tox`` are available in the shell where you ran
the command. These tools are installed in the virtual environment and are kept
separate from your global Python installation. This works by setting
environment variables so the right executables are found and Python can pull in
the versions of various packages needed by Certbot. More information can be
found in the `virtualenv docs`_.
.. _`virtualenv docs`: https://virtualenv.pypa.io
@ -95,25 +105,24 @@ Once all the unittests pass, check for sufficient test coverage using
``tox -e cover``, and then check for code style with ``tox -e lint`` (all files)
or ``pylint --rcfile=.pylintrc path/to/file.py`` (single file at a time).
Once all of the above is successful, you may run the full test suite,
including integration tests, using ``tox``. We recommend running the
commands above first, because running all tests with ``tox`` is very
slow, and the large amount of ``tox`` output can make it hard to find
specific failures when they happen. Also note that the full test suite
will attempt to modify your system's Apache config if your user has sudo
permissions, so it should not be run on a production Apache server.
Once all of the above is successful, you may run the full test suite using
``tox --skip-missing-interpreters``. We recommend running the commands above
first, because running all tests like this is very slow, and the large amount
of output can make it hard to find specific failures when they happen.
If you have trouble getting the full ``tox`` suite to run locally, it is
generally sufficient to open a pull request and let Github and Travis run
integration tests for you.
.. warning:: The full test suite may attempt to modify your system's Apache
config if your user has sudo permissions, so it should not be run on a
production Apache server.
.. _integration:
Integration testing with the Boulder CA
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To run integration tests locally, you need Docker and docker-compose installed
and working. Fetch and start Boulder using:
Generally it is sufficient to open a pull request and let Github and Travis run
integration tests for you, however, if you want to run them locally you need
Docker and docker-compose installed and working. Fetch and start Boulder, Let's
Encrypt's ACME CA software, by using:
.. code-block:: shell
@ -316,14 +325,20 @@ Steps:
4. Run ``tox --skip-missing-interpreters`` to run the entire test suite
including coverage. The ``--skip-missing-interpreters`` argument ignores
missing versions of Python needed for running the tests. Fix any errors.
5. If your code touches communication with an ACME server/Boulder, you
should run the integration tests, see `integration`_.
6. Submit the PR.
7. Did your tests pass on Travis? If they didn't, fix any errors.
5. Submit the PR.
6. Did your tests pass on Travis? If they didn't, fix any errors.
Asking for help
===============
If you have any questions while working on a Certbot issue, don't hesitate to
ask for help! You can do this in the #letsencrypt-dev IRC channel on Freenode.
If you don't already have an IRC client set up, we recommend you join using
`Riot <https://riot.im/app/#/room/#freenode_#letsencrypt-dev:matrix.org>`_.
Updating certbot-auto and letsencrypt-auto
==========================================
Updating the scripts
--------------------
Developers should *not* modify the ``certbot-auto`` and ``letsencrypt-auto`` files
@ -386,8 +401,8 @@ Running the client with Docker
==============================
You can use Docker Compose to quickly set up an environment for running and
testing Certbot. This is especially useful for macOS users. To install Docker
Compose, follow the instructions at https://docs.docker.com/compose/install/.
testing Certbot. To install Docker Compose, follow the instructions at
https://docs.docker.com/compose/install/.
.. note:: Linux users can simply run ``pip install docker-compose`` to get
Docker Compose after installing Docker Engine and activating your shell as
@ -420,38 +435,23 @@ OS-level dependencies can be installed like so:
.. code-block:: shell
letsencrypt-auto-source/letsencrypt-auto --os-packages-only
./certbot-auto --debug --os-packages-only
In general...
* ``sudo`` is required as a suggested way of running privileged process
* `Python`_ 2.7 is required
* `Python`_ 2.7 or 3.4+ is required
* `Augeas`_ is required for the Python bindings
* ``virtualenv`` and ``pip`` are used for managing other python library
dependencies
* ``virtualenv`` is used for managing other Python library dependencies
.. _Python: https://wiki.python.org/moin/BeginnersGuide/Download
.. _Augeas: http://augeas.net/
.. _Virtualenv: https://virtualenv.pypa.io
Debian
------
For squeeze you will need to:
- Use ``virtualenv --no-site-packages -p python`` instead of ``-p python2``.
FreeBSD
-------
Packages can be installed on FreeBSD using ``pkg``,
or any other port-management tool (``portupgrade``, ``portmanager``, etc.)
from the pre-built package or can be built and installed from ports.
Either way will ensure proper installation of all the dependencies required
for the package.
FreeBSD by default uses ``tcsh``. In order to activate virtualenv (see
above), you will need a compatible shell, e.g. ``pkg install bash &&
bash``.

View file

@ -19,18 +19,21 @@ your system.
.. _certbot.eff.org: https://certbot.eff.org
.. _system_requirements:
System Requirements
===================
Certbot currently requires Python 2.7, or 3.4+. By default, it requires
root access in order to write to ``/etc/letsencrypt``,
``/var/log/letsencrypt``, ``/var/lib/letsencrypt``; to bind to ports 80 and 443
(if you use the ``standalone`` plugin) and to read and modify webserver
configurations (if you use the ``apache`` or ``nginx`` plugins). If none of
these apply to you, it is theoretically possible to run without root privileges,
but for most users who want to avoid running an ACME client as root, either
`letsencrypt-nosudo <https://github.com/diafygi/letsencrypt-nosudo>`_ or
`simp_le <https://github.com/zenhack/simp_le>`_ are more appropriate choices.
Certbot currently requires Python 2.7 or 3.4+ running on a UNIX-like operating
system. By default, it requires root access in order to write to
``/etc/letsencrypt``, ``/var/log/letsencrypt``, ``/var/lib/letsencrypt``; to
bind to ports 80 and 443 (if you use the ``standalone`` plugin) and to read and
modify webserver configurations (if you use the ``apache`` or ``nginx``
plugins). If none of these apply to you, it is theoretically possible to run
without root privileges, but for most users who want to avoid running an ACME
client as root, either `letsencrypt-nosudo
<https://github.com/diafygi/letsencrypt-nosudo>`_ or `simp_le
<https://github.com/zenhack/simp_le>`_ are more appropriate choices.
The Apache plugin currently requires an OS with augeas version 1.0; currently `it
supports

View file

@ -170,6 +170,14 @@ one of the options shown below on the command line.
It must still be possible for your machine to accept inbound connections from
the Internet on the specified port using each requested domain name.
By default, Certbot first attempts to bind to the port for all interfaces using
IPv6 and then bind to that port using IPv4; Certbot continues so long as at
least one bind succeeds. On most Linux systems, IPv4 traffic will be routed to
the bound IPv6 port and the failure during the second bind is expected.
Use ``--<challenge-type>-address`` to explicitly tell Certbot which interface
(and protocol) to bind.
.. note:: The ``--standalone-supported-challenges`` option has been
deprecated since ``certbot`` version 0.9.0.
@ -601,7 +609,7 @@ commands into your individual environment.
.. note:: ``certbot renew`` exit status will only be 1 if a renewal attempt failed.
This means ``certbot renew`` exit status will be 0 if no certificate needs to be updated.
If you write a custom script and expect to run a command only after a certificate was actually renewed
you will need to use the ``--post-hook`` since the exit status will be 0 both on successful renewal
you will need to use the ``--deploy-hook`` since the exit status will be 0 both on successful renewal
and when renewal is not necessary.
.. _renewal-config-file:

View file

@ -1055,9 +1055,11 @@ cffi==1.10.0 \
--hash=sha256:5576644b859197da7bbd8f8c7c2fb5dcc6cd505cadb42992d5f104c013f8a214 \
--hash=sha256:b3b02911eb1f6ada203b0763ba924234629b51586f72a21faacc638269f4ced5
ConfigArgParse==0.12.0 \
--hash=sha256:28cd7d67669651f2a4518367838c49539457504584a139709b2b8f6c208ef339
--hash=sha256:28cd7d67669651f2a4518367838c49539457504584a139709b2b8f6c208ef339 \
--no-binary ConfigArgParse
configobj==5.0.6 \
--hash=sha256:a2f5650770e1c87fb335af19a9b7eb73fc05ccf22144eb68db7d00cd2bcb0902
--hash=sha256:a2f5650770e1c87fb335af19a9b7eb73fc05ccf22144eb68db7d00cd2bcb0902 \
--no-binary configobj
cryptography==2.0.2 \
--hash=sha256:187ae17358436d2c760f28c2aeb02fefa3f37647a9c5b6f7f7c3e83cd1c5a972 \
--hash=sha256:19e43a13bbf52028dd1e810c803f2ad8880d0692d772f98d42e1eaf34bdee3d6 \
@ -1112,7 +1114,8 @@ mock==1.3.0 \
--hash=sha256:3f573a18be94de886d1191f27c168427ef693e8dcfcecf95b170577b2eb69cbb \
--hash=sha256:1e247dbecc6ce057299eb7ee019ad68314bb93152e81d9a6110d35f4d5eca0f6
ordereddict==1.1 \
--hash=sha256:1c35b4ac206cef2d24816c89f89cf289dd3d38cf7c449bb3fab7bf6d43f01b1f
--hash=sha256:1c35b4ac206cef2d24816c89f89cf289dd3d38cf7c449bb3fab7bf6d43f01b1f \
--no-binary ordereddict
packaging==16.8 \
--hash=sha256:99276dc6e3a7851f32027a68f1095cd3f77c148091b092ea867a351811cfe388 \
--hash=sha256:5d50835fdf0a7edf0b55e311b7c887786504efea1177abd7e69329a8e5ea619e
@ -1138,7 +1141,8 @@ pyRFC3339==1.0 \
--hash=sha256:eea31835c56e2096af4363a5745a784878a61d043e247d3a6d6a0a32a9741f56 \
--hash=sha256:8dfbc6c458b8daba1c0f3620a8c78008b323a268b27b7359e92a4ae41325f535
python-augeas==0.5.0 \
--hash=sha256:67d59d66cdba8d624e0389b87b2a83a176f21f16a87553b50f5703b23f29bac2
--hash=sha256:67d59d66cdba8d624e0389b87b2a83a176f21f16a87553b50f5703b23f29bac2 \
--no-binary python-augeas
pytz==2015.7 \
--hash=sha256:3abe6a6d3fc2fbbe4c60144211f45da2edbe3182a6f6511af6bbba0598b1f992 \
--hash=sha256:939ef9c1e1224d980405689a97ffcf7828c56d1517b31d73464356c1f2b7769e \
@ -1166,9 +1170,11 @@ unittest2==1.1.0 \
--hash=sha256:13f77d0875db6d9b435e1d4f41e74ad4cc2eb6e1d5c824996092b3430f088bb8 \
--hash=sha256:22882a0e418c284e1f718a822b3b022944d53d2d908e1690b319a9d3eb2c0579
zope.component==4.2.2 \
--hash=sha256:282c112b55dd8e3c869a3571f86767c150ab1284a9ace2bdec226c592acaf81a
--hash=sha256:282c112b55dd8e3c869a3571f86767c150ab1284a9ace2bdec226c592acaf81a \
--no-binary zope.component
zope.event==4.1.0 \
--hash=sha256:dc7a59a2fd91730d3793131a5d261b29e93ec4e2a97f1bc487ce8defee2fe786
--hash=sha256:dc7a59a2fd91730d3793131a5d261b29e93ec4e2a97f1bc487ce8defee2fe786 \
--no-binary zope.event
zope.interface==4.1.3 \
--hash=sha256:f07b631f7a601cd8cbd3332d54f43142c7088a83299f859356f08d1d4d4259b3 \
--hash=sha256:de5cca083b9439d8002fb76bbe6b4998c5a5a721fab25b84298967f002df4c94 \
@ -1187,6 +1193,9 @@ zope.interface==4.1.3 \
--hash=sha256:928138365245a0e8869a5999fbcc2a45475a0a6ed52a494d60dbdc540335fedd \
--hash=sha256:0d841ba1bb840eea0e6489dc5ecafa6125554971f53b5acb87764441e61bceba \
--hash=sha256:b09c8c1d47b3531c400e0195697f1414a63221de6ef478598a4f1460f7d9a392
requests-toolbelt==0.8.0 \
--hash=sha256:42c9c170abc2cacb78b8ab23ac957945c7716249206f90874651971a4acff237 \
--hash=sha256:f6a531936c6fa4c6cfce1b9c10d5c4f498d16528d2a54a22ca00011205a187b5
# Contains the requirements for the letsencrypt package.
#

View file

@ -59,9 +59,11 @@ cffi==1.10.0 \
--hash=sha256:5576644b859197da7bbd8f8c7c2fb5dcc6cd505cadb42992d5f104c013f8a214 \
--hash=sha256:b3b02911eb1f6ada203b0763ba924234629b51586f72a21faacc638269f4ced5
ConfigArgParse==0.12.0 \
--hash=sha256:28cd7d67669651f2a4518367838c49539457504584a139709b2b8f6c208ef339
--hash=sha256:28cd7d67669651f2a4518367838c49539457504584a139709b2b8f6c208ef339 \
--no-binary ConfigArgParse
configobj==5.0.6 \
--hash=sha256:a2f5650770e1c87fb335af19a9b7eb73fc05ccf22144eb68db7d00cd2bcb0902
--hash=sha256:a2f5650770e1c87fb335af19a9b7eb73fc05ccf22144eb68db7d00cd2bcb0902 \
--no-binary configobj
cryptography==2.0.2 \
--hash=sha256:187ae17358436d2c760f28c2aeb02fefa3f37647a9c5b6f7f7c3e83cd1c5a972 \
--hash=sha256:19e43a13bbf52028dd1e810c803f2ad8880d0692d772f98d42e1eaf34bdee3d6 \
@ -116,7 +118,8 @@ mock==1.3.0 \
--hash=sha256:3f573a18be94de886d1191f27c168427ef693e8dcfcecf95b170577b2eb69cbb \
--hash=sha256:1e247dbecc6ce057299eb7ee019ad68314bb93152e81d9a6110d35f4d5eca0f6
ordereddict==1.1 \
--hash=sha256:1c35b4ac206cef2d24816c89f89cf289dd3d38cf7c449bb3fab7bf6d43f01b1f
--hash=sha256:1c35b4ac206cef2d24816c89f89cf289dd3d38cf7c449bb3fab7bf6d43f01b1f \
--no-binary ordereddict
packaging==16.8 \
--hash=sha256:99276dc6e3a7851f32027a68f1095cd3f77c148091b092ea867a351811cfe388 \
--hash=sha256:5d50835fdf0a7edf0b55e311b7c887786504efea1177abd7e69329a8e5ea619e
@ -142,7 +145,8 @@ pyRFC3339==1.0 \
--hash=sha256:eea31835c56e2096af4363a5745a784878a61d043e247d3a6d6a0a32a9741f56 \
--hash=sha256:8dfbc6c458b8daba1c0f3620a8c78008b323a268b27b7359e92a4ae41325f535
python-augeas==0.5.0 \
--hash=sha256:67d59d66cdba8d624e0389b87b2a83a176f21f16a87553b50f5703b23f29bac2
--hash=sha256:67d59d66cdba8d624e0389b87b2a83a176f21f16a87553b50f5703b23f29bac2 \
--no-binary python-augeas
pytz==2015.7 \
--hash=sha256:3abe6a6d3fc2fbbe4c60144211f45da2edbe3182a6f6511af6bbba0598b1f992 \
--hash=sha256:939ef9c1e1224d980405689a97ffcf7828c56d1517b31d73464356c1f2b7769e \
@ -170,9 +174,11 @@ unittest2==1.1.0 \
--hash=sha256:13f77d0875db6d9b435e1d4f41e74ad4cc2eb6e1d5c824996092b3430f088bb8 \
--hash=sha256:22882a0e418c284e1f718a822b3b022944d53d2d908e1690b319a9d3eb2c0579
zope.component==4.2.2 \
--hash=sha256:282c112b55dd8e3c869a3571f86767c150ab1284a9ace2bdec226c592acaf81a
--hash=sha256:282c112b55dd8e3c869a3571f86767c150ab1284a9ace2bdec226c592acaf81a \
--no-binary zope.component
zope.event==4.1.0 \
--hash=sha256:dc7a59a2fd91730d3793131a5d261b29e93ec4e2a97f1bc487ce8defee2fe786
--hash=sha256:dc7a59a2fd91730d3793131a5d261b29e93ec4e2a97f1bc487ce8defee2fe786 \
--no-binary zope.event
zope.interface==4.1.3 \
--hash=sha256:f07b631f7a601cd8cbd3332d54f43142c7088a83299f859356f08d1d4d4259b3 \
--hash=sha256:de5cca083b9439d8002fb76bbe6b4998c5a5a721fab25b84298967f002df4c94 \
@ -191,3 +197,6 @@ zope.interface==4.1.3 \
--hash=sha256:928138365245a0e8869a5999fbcc2a45475a0a6ed52a494d60dbdc540335fedd \
--hash=sha256:0d841ba1bb840eea0e6489dc5ecafa6125554971f53b5acb87764441e61bceba \
--hash=sha256:b09c8c1d47b3531c400e0195697f1414a63221de6ef478598a4f1460f7d9a392
requests-toolbelt==0.8.0 \
--hash=sha256:42c9c170abc2cacb78b8ab23ac957945c7716249206f90874651971a4acff237 \
--hash=sha256:f6a531936c6fa4c6cfce1b9c10d5c4f498d16528d2a54a22ca00011205a187b5

View file

@ -16,6 +16,8 @@ import textwrap
import six
from letshelp_certbot.magic_typing import List # pylint: disable=unused-import, no-name-in-module
_DESCRIPTION = """
Let's Help is a simple script you can run to help out the Certbot
project. Since Certbot will support automatically configuring HTTPS on
@ -87,7 +89,8 @@ def copy_config(server_root, temp_dir):
:rtype: `tuple` of `list` of `str`
"""
copied_files, copied_dirs = [], []
copied_files = [] # type: List[str]
copied_dirs = [] # type: List[str]
dir_len = len(os.path.dirname(server_root))
for config_path, config_dirs, config_files in os.walk(server_root):

View file

@ -203,13 +203,19 @@ class LetsHelpApacheTest(unittest.TestCase):
tempdir_path, "config.tar.gz"))
tempdir = tar.next()
self.assertTrue(tempdir.isdir())
self.assertEqual(tempdir.name, ".")
if tempdir is None:
self.fail("Invalid tarball!") # pragma: no cover
else:
self.assertTrue(tempdir.isdir())
self.assertEqual(tempdir.name, ".")
testdir = tar.next()
self.assertTrue(testdir.isdir())
self.assertEqual(os.path.basename(testdir.name),
testdir_basename)
if testdir is None:
self.fail("Invalid tarball!") # pragma: no cover
else:
self.assertTrue(testdir.isdir())
self.assertEqual(os.path.basename(testdir.name),
testdir_basename)
self.assertEqual(tar.next(), None)

View file

@ -0,0 +1,16 @@
"""Shim class to not have to depend on typing module in prod."""
import sys
class TypingClass(object):
"""Ignore import errors by getting anything"""
def __getattr__(self, name):
return None
try:
# mypy doesn't respect modifying sys.modules
from typing import * # pylint: disable=wildcard-import, unused-wildcard-import
# pylint: disable=unused-import
from typing import Collection, IO # type: ignore
# pylint: enable=unused-import
except ImportError:
sys.modules[__name__] = TypingClass()

View file

@ -0,0 +1,41 @@
"""Tests for letshelp_certbot.magic_typing."""
import sys
import unittest
import mock
class MagicTypingTest(unittest.TestCase):
"""Tests for letshelp_certbot.magic_typing."""
def test_import_success(self):
try:
import typing as temp_typing
except ImportError: # pragma: no cover
temp_typing = None # pragma: no cover
typing_class_mock = mock.MagicMock()
text_mock = mock.MagicMock()
typing_class_mock.Text = text_mock
sys.modules['typing'] = typing_class_mock
if 'letshelp_certbot.magic_typing' in sys.modules:
del sys.modules['letshelp_certbot.magic_typing'] # pragma: no cover
from letshelp_certbot.magic_typing import Text # pylint: disable=no-name-in-module
self.assertEqual(Text, text_mock)
del sys.modules['letshelp_certbot.magic_typing']
sys.modules['typing'] = temp_typing
def test_import_failure(self):
try:
import typing as temp_typing
except ImportError: # pragma: no cover
temp_typing = None # pragma: no cover
sys.modules['typing'] = None
if 'letshelp_certbot.magic_typing' in sys.modules:
del sys.modules['letshelp_certbot.magic_typing'] # pragma: no cover
from letshelp_certbot.magic_typing import Text # pylint: disable=no-name-in-module
self.assertTrue(Text is None)
del sys.modules['letshelp_certbot.magic_typing']
sys.modules['typing'] = temp_typing
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -1,6 +1,10 @@
[mypy]
python_version = 2.7
ignore_missing_imports = True
[mypy-acme.*]
check_untyped_defs = True
ignore_missing_imports = True
python_version = 2.7
[mypy-acme.magic_typing_test]
ignore_errors = True
[mypy-letshelp_certbot.magic_typing_test]
ignore_errors = True

View file

@ -34,7 +34,7 @@ version = meta['version']
# specified here to avoid masking the more specific request requirements in
# acme. See https://github.com/pypa/pip/issues/988 for more info.
install_requires = [
'acme>=0.22.1',
'acme>0.24.0',
# We technically need ConfigArgParse 0.10.0 for Python 2.6 support, but
# saying so here causes a runtime error against our temporary fork of 0.9.3
# in which we added 2.6 support (see #2243), so we relax the requirement.

View file

@ -11,22 +11,20 @@ if [ ! -d ${BOULDERPATH} ]; then
fi
cd ${BOULDERPATH}
FAKE_DNS=$(ifconfig docker0 | grep "inet addr:" | cut -d: -f2 | awk '{ print $1}')
[ -z "$FAKE_DNS" ] && FAKE_DNS=$(ifconfig docker0 | grep "inet " | xargs | cut -d ' ' -f 2)
[ -z "$FAKE_DNS" ] && FAKE_DNS=$(ip addr show dev docker0 | grep "inet " | xargs | cut -d ' ' -f 2 | cut -d '/' -f 1)
[ -z "$FAKE_DNS" ] && echo Unable to find the IP for docker0 && exit 1
sed -i "s/FAKE_DNS: .*/FAKE_DNS: ${FAKE_DNS}/" docker-compose.yml
sed -i "s/FAKE_DNS: .*/FAKE_DNS: 10.77.77.1/" docker-compose.yml
# If we're testing against ACMEv2, we need to use a newer boulder config for
# now. See https://github.com/letsencrypt/boulder#quickstart.
if [ "$BOULDER_INTEGRATION" = "v2" ]; then
sed -i 's/BOULDER_CONFIG_DIR: .*/BOULDER_CONFIG_DIR: test\/config-next/' docker-compose.yml
fi
docker-compose up -d
docker-compose up -d boulder
set +x # reduce verbosity while waiting for boulder
until curl http://localhost:4000/directory 2>/dev/null; do
echo waiting for boulder
sleep 1
for n in `seq 1 150` ; do
if curl http://localhost:4000/directory 2>/dev/null; then
break
else
sleep 1
fi
done
if ! curl http://localhost:4000/directory 2>/dev/null; then
echo "timed out waiting for boulder to start"
exit 1
fi

View file

@ -191,7 +191,14 @@ for dir in $renewal_hooks_dirs; do
exit 1
fi
done
common register --update-registration --email example@example.org
common unregister
common register --email ex1@domain.org,ex2@domain.org
common register --update-registration --email ex1@domain.org
common register --update-registration --email ex1@domain.org,ex2@domain.org
common plugins --init --prepare | grep webroot

View file

@ -198,7 +198,7 @@ def report_failure(err_msg, out, err):
:param str err: stderr output
"""
logger.fatal(err_msg)
logger.critical(err_msg)
log_output(logging.INFO, out, err)
sys.exit(err_msg)

View file

@ -30,7 +30,7 @@ josepy==1.0.1
logger==1.4
logilab-common==1.4.1
MarkupSafe==1.0
mypy==0.580
mypy==0.600
ndg-httpsclient==0.3.2
oauth2client==2.0.0
pathlib2==2.3.0

View file

@ -12,12 +12,18 @@ else
pip_install="$(dirname $0)/pip_install_editable.sh"
fi
temp_cwd=$(mktemp -d)
trap "rm -rf $temp_cwd" EXIT
set -x
for requirement in "$@" ; do
$pip_install $requirement
pkg=$(echo $requirement | cut -f1 -d\[) # remove any extras such as [dev]
pkg=$(echo "$pkg" | tr - _ ) # convert package names to Python import names
if [ $pkg = "." ]; then
pkg="certbot"
fi
cd "$temp_cwd"
pytest --numprocesses auto --quiet --pyargs $pkg
cd -
done

View file

@ -58,7 +58,6 @@ commands =
{[base]install_and_test} {[base]all_packages}
python tests/lock_test.py
setenv =
PYTHONPATH = {toxinidir}
PYTHONHASHSEED = 0
[testenv:py27-oldest]
@ -122,8 +121,8 @@ commands =
[testenv:mypy]
basepython = python3
commands =
{[base]pip_install} .[dev3]
{[base]install_packages}
{[base]pip_install} .[dev3]
mypy {[base]source_paths}
[testenv:apacheconftest]