Merge pull request #2652 from letsencrypt/split-plugin-selection

Move plugin selection from cli.py and display/ops.py into plugins/selection.py
This commit is contained in:
bmw 2016-03-24 18:27:24 -07:00
commit e5876ea162
9 changed files with 462 additions and 440 deletions

View file

@ -21,9 +21,8 @@ from letsencrypt import errors
from letsencrypt import interfaces
from letsencrypt import le_util
from letsencrypt.display import ops as display_ops
from letsencrypt.plugins import disco as plugins_disco
import letsencrypt.plugins.selection as plugin_selection
logger = logging.getLogger(__name__)
@ -33,9 +32,10 @@ helpful_parser = None
# For help strings, figure out how the user ran us.
# When invoked from letsencrypt-auto, sys.argv[0] is something like:
# "/home/user/.local/share/letsencrypt/bin/letsencrypt"
# Note that this won't work if the user set VENV_PATH or XDG_DATA_HOME before running
# letsencrypt-auto (and sudo stops us from seeing if they did), so it should only be used
# for purposes where inability to detect letsencrypt-auto fails safely
# Note that this won't work if the user set VENV_PATH or XDG_DATA_HOME before
# running letsencrypt-auto (and sudo stops us from seeing if they did), so it
# should only be used for purposes where inability to detect letsencrypt-auto
# fails safely
fragment = os.path.join(".local", "share", "letsencrypt")
cli_command = "letsencrypt-auto" if fragment in sys.argv[0] else "letsencrypt"
@ -99,147 +99,6 @@ def usage_strings(plugins):
return USAGE % (apache_doc, nginx_doc), SHORT_USAGE
def diagnose_configurator_problem(cfg_type, requested, plugins):
"""
Raise the most helpful error message about a plugin being unavailable
:param str cfg_type: either "installer" or "authenticator"
:param str requested: the plugin that was requested
:param .PluginsRegistry plugins: available plugins
:raises error.PluginSelectionError: if there was a problem
"""
if requested:
if requested not in plugins:
msg = "The requested {0} plugin does not appear to be installed".format(requested)
else:
msg = ("The {0} plugin is not working; there may be problems with "
"your existing configuration.\nThe error was: {1!r}"
.format(requested, plugins[requested].problem))
elif cfg_type == "installer":
if os.path.exists("/etc/debian_version"):
# Debian... installers are at least possible
msg = ('No installers seem to be present and working on your system; '
'fix that or try running letsencrypt with the "certonly" command')
else:
# XXX update this logic as we make progress on #788 and nginx support
msg = ('No installers are available on your OS yet; try running '
'"letsencrypt-auto certonly" to get a cert you can install manually')
else:
msg = "{0} could not be determined or is not installed".format(cfg_type)
raise errors.PluginSelectionError(msg)
def set_configurator(previously, now):
"""
Setting configurators multiple ways is okay, as long as they all agree
:param str previously: previously identified request for the installer/authenticator
:param str requested: the request currently being processed
"""
if not now:
# we're not actually setting anything
return previously
if previously:
if previously != now:
msg = "Too many flags setting configurators/installers/authenticators {0} -> {1}"
raise errors.PluginSelectionError(msg.format(repr(previously), repr(now)))
return now
def cli_plugin_requests(config):
"""
Figure out which plugins the user requested with CLI and config options
:returns: (requested authenticator string or None, requested installer string or None)
:rtype: tuple
"""
req_inst = req_auth = config.configurator
req_inst = set_configurator(req_inst, config.installer)
req_auth = set_configurator(req_auth, config.authenticator)
if config.nginx:
req_inst = set_configurator(req_inst, "nginx")
req_auth = set_configurator(req_auth, "nginx")
if config.apache:
req_inst = set_configurator(req_inst, "apache")
req_auth = set_configurator(req_auth, "apache")
if config.standalone:
req_auth = set_configurator(req_auth, "standalone")
if config.webroot:
req_auth = set_configurator(req_auth, "webroot")
if config.manual:
req_auth = set_configurator(req_auth, "manual")
logger.debug("Requested authenticator %s and installer %s", req_auth, req_inst)
return req_auth, req_inst
noninstaller_plugins = ["webroot", "manual", "standalone"]
def choose_configurator_plugins(config, plugins, verb):
"""
Figure out which configurator we're going to use, modifies
config.authenticator and config.istaller strings to reflect that choice if
necessary.
:raises errors.PluginSelectionError if there was a problem
:returns: (an `IAuthenticator` or None, an `IInstaller` or None)
:rtype: tuple
"""
req_auth, req_inst = cli_plugin_requests(config)
# Which plugins do we need?
if verb == "run":
need_inst = need_auth = True
if req_auth in noninstaller_plugins and not req_inst:
msg = ('With the {0} plugin, you probably want to use the "certonly" command, eg:{1}'
'{1} {2} certonly --{0}{1}{1}'
'(Alternatively, add a --installer flag. See https://eff.org/letsencrypt-plugins'
'{1} and "--help plugins" for more information.)'.format(
req_auth, os.linesep, cli_command))
raise errors.MissingCommandlineFlag(msg)
else:
need_inst = need_auth = False
if verb == "certonly":
need_auth = True
if verb == "install":
need_inst = True
if config.authenticator:
logger.warn("Specifying an authenticator doesn't make sense in install mode")
# Try to meet the user's request and/or ask them to pick plugins
authenticator = installer = None
if verb == "run" and req_auth == req_inst:
# Unless the user has explicitly asked for different auth/install,
# only consider offering a single choice
authenticator = installer = display_ops.pick_configurator(config, req_inst, plugins)
else:
if need_inst or req_inst:
installer = display_ops.pick_installer(config, req_inst, plugins)
if need_auth:
authenticator = display_ops.pick_authenticator(config, req_auth, plugins)
logger.debug("Selected authenticator %s and installer %s", authenticator, installer)
# Report on any failures
if need_inst and not installer:
diagnose_configurator_problem("installer", req_inst, plugins)
if need_auth and not authenticator:
diagnose_configurator_problem("authenticator", req_auth, plugins)
record_chosen_plugins(config, plugins, authenticator, installer)
return installer, authenticator
def record_chosen_plugins(config, plugins, auth, inst):
"Update the config entries to reflect the plugins we actually selected."
cn = config.namespace
cn.authenticator = plugins.find_init(auth).name if auth else "None"
cn.installer = plugins.find_init(inst).name if inst else "None"
def set_by_cli(var):
"""
Return True if a particular config variable has been set by the user
@ -256,7 +115,7 @@ def set_by_cli(var):
detector = set_by_cli.detector = prepare_and_parse_args(
plugins, reconstructed_args, detect_defaults=True)
# propagate plugin requests: eg --standalone modifies config.authenticator
auth, inst = cli_plugin_requests(detector)
auth, inst = plugin_selection.cli_plugin_requests(detector)
detector.authenticator = auth if auth else ""
detector.installer = inst if inst else ""
logger.debug("Default Detector is %r", detector)
@ -321,7 +180,7 @@ def flag_default(name):
def config_help(name, hidden=False):
"""Help message for `.IConfig` attribute."""
"""Extract the help message for an `.IConfig` attribute."""
if hidden:
return argparse.SUPPRESS
else:

View file

@ -27,6 +27,7 @@ from letsencrypt import storage
from letsencrypt.display import ops as display_ops
from letsencrypt.display import enhancements
from letsencrypt.plugins import selection as plugin_selection
logger = logging.getLogger(__name__)
@ -534,7 +535,7 @@ def rollback(default_installer, checkpoints, config, plugins):
"""
# Misconfigurations are only a slight problems... allow the user to rollback
installer = display_ops.pick_installer(
installer = plugin_selection.pick_installer(
config, default_installer, plugins, question="Which installer "
"should be used for rollback?")

View file

@ -9,130 +9,10 @@ from letsencrypt import interfaces
from letsencrypt import le_util
from letsencrypt.display import util as display_util
logger = logging.getLogger(__name__)
# Define a helper function to avoid verbose code
util = zope.component.getUtility
def choose_plugin(prepared, question):
"""Allow the user to choose their plugin.
:param list prepared: List of `~.PluginEntryPoint`.
:param str question: Question to be presented to the user.
:returns: Plugin entry point chosen by the user.
:rtype: `~.PluginEntryPoint`
"""
opts = [plugin_ep.description_with_name +
(" [Misconfigured]" if plugin_ep.misconfigured else "")
for plugin_ep in prepared]
while True:
disp = util(interfaces.IDisplay)
code, index = disp.menu(question, opts, help_label="More Info")
if code == display_util.OK:
plugin_ep = prepared[index]
if plugin_ep.misconfigured:
util(interfaces.IDisplay).notification(
"The selected plugin encountered an error while parsing "
"your server configuration and cannot be used. The error "
"was:\n\n{0}".format(plugin_ep.prepare()),
height=display_util.HEIGHT, pause=False)
else:
return plugin_ep
elif code == display_util.HELP:
if prepared[index].misconfigured:
msg = "Reported Error: %s" % prepared[index].prepare()
else:
msg = prepared[index].init().more_info()
util(interfaces.IDisplay).notification(
msg, height=display_util.HEIGHT)
else:
return None
def pick_plugin(config, default, plugins, question, ifaces):
"""Pick plugin.
:param letsencrypt.interfaces.IConfig: Configuration
:param str default: Plugin name supplied by user or ``None``.
:param letsencrypt.plugins.disco.PluginsRegistry plugins:
All plugins registered as entry points.
:param str question: Question to be presented to the user in case
multiple candidates are found.
:param list ifaces: Interfaces that plugins must provide.
:returns: Initialized plugin.
:rtype: IPlugin
"""
if default is not None:
# throw more UX-friendly error if default not in plugins
filtered = plugins.filter(lambda p_ep: p_ep.name == default)
else:
if config.noninteractive_mode:
# it's really bad to auto-select the single available plugin in
# non-interactive mode, because an update could later add a second
# available plugin
raise errors.MissingCommandlineFlag(
"Missing command line flags. For non-interactive execution, "
"you will need to specify a plugin on the command line. Run "
"with '--help plugins' to see a list of options, and see "
"https://eff.org/letsencrypt-plugins for more detail on what "
"the plugins do and how to use them.")
filtered = plugins.visible().ifaces(ifaces)
filtered.init(config)
verified = filtered.verify(ifaces)
verified.prepare()
prepared = verified.available()
if len(prepared) > 1:
logger.debug("Multiple candidate plugins: %s", prepared)
plugin_ep = choose_plugin(prepared.values(), question)
if plugin_ep is None:
return None
else:
return plugin_ep.init()
elif len(prepared) == 1:
plugin_ep = prepared.values()[0]
logger.debug("Single candidate plugin: %s", plugin_ep)
if plugin_ep.misconfigured:
return None
return plugin_ep.init()
else:
logger.debug("No candidate plugin")
return None
def pick_authenticator(
config, default, plugins, question="How would you "
"like to authenticate with the Let's Encrypt CA?"):
"""Pick authentication plugin."""
return pick_plugin(
config, default, plugins, question, (interfaces.IAuthenticator,))
def pick_installer(config, default, plugins,
question="How would you like to install certificates?"):
"""Pick installer plugin."""
return pick_plugin(
config, default, plugins, question, (interfaces.IInstaller,))
def pick_configurator(
config, default, plugins,
question="How would you like to authenticate and install "
"certificates?"):
"""Pick configurator plugin."""
return pick_plugin(
config, default, plugins, question,
(interfaces.IAuthenticator, interfaces.IInstaller))
z_util = zope.component.getUtility
def get_email(more=False, invalid=False):
@ -182,7 +62,7 @@ def choose_account(accounts):
# Note this will get more complicated once we start recording authorizations
labels = [acc.slug for acc in accounts]
code, index = util(interfaces.IDisplay).menu(
code, index = z_util(interfaces.IDisplay).menu(
"Please choose an account", labels)
if code == display_util.OK:
return accounts[index]
@ -208,7 +88,7 @@ def choose_names(installer):
names = get_valid_domains(domains)
if not names:
manual = util(interfaces.IDisplay).yesno(
manual = z_util(interfaces.IDisplay).yesno(
"No names were found in your configuration files.{0}You should "
"specify ServerNames in your config files in order to allow for "
"accurate installation of your certificate.{0}"
@ -256,7 +136,7 @@ def _filter_names(names):
:rtype: tuple
"""
code, names = util(interfaces.IDisplay).checklist(
code, names = z_util(interfaces.IDisplay).checklist(
"Which names would you like to activate HTTPS for?",
tags=names, cli_flag="--domains")
return code, [str(s) for s in names]
@ -265,7 +145,7 @@ def _filter_names(names):
def _choose_names_manually():
"""Manually input names for those without an installer."""
code, input_ = util(interfaces.IDisplay).input(
code, input_ = z_util(interfaces.IDisplay).input(
"Please enter in your domain name(s) (comma and/or space separated) ",
cli_flag="--domains")
@ -300,7 +180,7 @@ def _choose_names_manually():
if retry_message:
# We had error in input
retry = util(interfaces.IDisplay).yesno(retry_message)
retry = z_util(interfaces.IDisplay).yesno(retry_message)
if retry:
return _choose_names_manually()
else:
@ -316,7 +196,7 @@ def success_installation(domains):
:param list domains: domain names which were enabled
"""
util(interfaces.IDisplay).notification(
z_util(interfaces.IDisplay).notification(
"Congratulations! You have successfully enabled {0}{1}{1}"
"You should test your configuration at:{1}{2}".format(
_gen_https_names(domains),
@ -335,7 +215,7 @@ def success_renewal(domains, action):
:param str action: can be "reinstall" or "renew"
"""
util(interfaces.IDisplay).notification(
z_util(interfaces.IDisplay).notification(
"Your existing certificate has been successfully {3}ed, and the "
"new certificate has been installed.{1}{1}"
"The new certificate covers the following domains: {0}{1}{1}"

View file

@ -32,6 +32,7 @@ from letsencrypt import storage
from letsencrypt.display import util as display_util, ops as display_ops
from letsencrypt.plugins import disco as plugins_disco
from letsencrypt.plugins import selection as plug_sel
logger = logging.getLogger(__name__)
@ -406,7 +407,7 @@ def install(config, plugins):
# this function ...
try:
installer, _ = cli.choose_configurator_plugins(config, plugins, "install")
installer, _ = plug_sel.choose_configurator_plugins(config, plugins, "install")
except errors.PluginSelectionError as e:
return e.message
@ -481,7 +482,7 @@ def run(config, plugins): # pylint: disable=too-many-branches,too-many-locals
# TODO: Make run as close to auth + install as possible
# Possible difficulties: config.csr was hacked into auth
try:
installer, authenticator = cli.choose_configurator_plugins(config, plugins, "run")
installer, authenticator = plug_sel.choose_configurator_plugins(config, plugins, "run")
except errors.PluginSelectionError as e:
return e.message
@ -511,7 +512,7 @@ def obtain_cert(config, plugins, lineage=None):
# pylint: disable=too-many-locals
try:
# installers are used in auth mode to determine domain names
installer, authenticator = cli.choose_configurator_plugins(config, plugins, "certonly")
installer, authenticator = plug_sel.choose_configurator_plugins(config, plugins, "certonly")
except errors.PluginSelectionError as e:
logger.info("Could not choose appropriate plugin: %s", e)
raise

View file

@ -0,0 +1,273 @@
"""Decide which plugins to use for authentication & installation"""
from __future__ import print_function
import os
import logging
import zope.component
from letsencrypt import errors
from letsencrypt import interfaces
from letsencrypt.display import util as display_util
logger = logging.getLogger(__name__)
z_util = zope.component.getUtility
def pick_configurator(
config, default, plugins,
question="How would you like to authenticate and install "
"certificates?"):
"""Pick configurator plugin."""
return pick_plugin(
config, default, plugins, question,
(interfaces.IAuthenticator, interfaces.IInstaller))
def pick_installer(config, default, plugins,
question="How would you like to install certificates?"):
"""Pick installer plugin."""
return pick_plugin(
config, default, plugins, question, (interfaces.IInstaller,))
def pick_authenticator(
config, default, plugins, question="How would you "
"like to authenticate with the Let's Encrypt CA?"):
"""Pick authentication plugin."""
return pick_plugin(
config, default, plugins, question, (interfaces.IAuthenticator,))
def pick_plugin(config, default, plugins, question, ifaces):
"""Pick plugin.
:param letsencrypt.interfaces.IConfig: Configuration
:param str default: Plugin name supplied by user or ``None``.
:param letsencrypt.plugins.disco.PluginsRegistry plugins:
All plugins registered as entry points.
:param str question: Question to be presented to the user in case
multiple candidates are found.
:param list ifaces: Interfaces that plugins must provide.
:returns: Initialized plugin.
:rtype: IPlugin
"""
if default is not None:
# throw more UX-friendly error if default not in plugins
filtered = plugins.filter(lambda p_ep: p_ep.name == default)
else:
if config.noninteractive_mode:
# it's really bad to auto-select the single available plugin in
# non-interactive mode, because an update could later add a second
# available plugin
raise errors.MissingCommandlineFlag(
"Missing command line flags. For non-interactive execution, "
"you will need to specify a plugin on the command line. Run "
"with '--help plugins' to see a list of options, and see "
"https://eff.org/letsencrypt-plugins for more detail on what "
"the plugins do and how to use them.")
filtered = plugins.visible().ifaces(ifaces)
filtered.init(config)
verified = filtered.verify(ifaces)
verified.prepare()
prepared = verified.available()
if len(prepared) > 1:
logger.debug("Multiple candidate plugins: %s", prepared)
plugin_ep = choose_plugin(prepared.values(), question)
if plugin_ep is None:
return None
else:
return plugin_ep.init()
elif len(prepared) == 1:
plugin_ep = prepared.values()[0]
logger.debug("Single candidate plugin: %s", plugin_ep)
if plugin_ep.misconfigured:
return None
return plugin_ep.init()
else:
logger.debug("No candidate plugin")
return None
def choose_plugin(prepared, question):
"""Allow the user to choose their plugin.
:param list prepared: List of `~.PluginEntryPoint`.
:param str question: Question to be presented to the user.
:returns: Plugin entry point chosen by the user.
:rtype: `~.PluginEntryPoint`
"""
opts = [plugin_ep.description_with_name +
(" [Misconfigured]" if plugin_ep.misconfigured else "")
for plugin_ep in prepared]
while True:
disp = z_util(interfaces.IDisplay)
code, index = disp.menu(question, opts, help_label="More Info")
if code == display_util.OK:
plugin_ep = prepared[index]
if plugin_ep.misconfigured:
z_util(interfaces.IDisplay).notification(
"The selected plugin encountered an error while parsing "
"your server configuration and cannot be used. The error "
"was:\n\n{0}".format(plugin_ep.prepare()),
height=display_util.HEIGHT, pause=False)
else:
return plugin_ep
elif code == display_util.HELP:
if prepared[index].misconfigured:
msg = "Reported Error: %s" % prepared[index].prepare()
else:
msg = prepared[index].init().more_info()
z_util(interfaces.IDisplay).notification(
msg, height=display_util.HEIGHT)
else:
return None
noninstaller_plugins = ["webroot", "manual", "standalone"]
def record_chosen_plugins(config, plugins, auth, inst):
"Update the config entries to reflect the plugins we actually selected."
cn = config.namespace
cn.authenticator = plugins.find_init(auth).name if auth else "None"
cn.installer = plugins.find_init(inst).name if inst else "None"
def choose_configurator_plugins(config, plugins, verb):
"""
Figure out which configurator we're going to use, modifies
config.authenticator and config.installer strings to reflect that choice if
necessary.
:raises errors.PluginSelectionError if there was a problem
:returns: (an `IAuthenticator` or None, an `IInstaller` or None)
:rtype: tuple
"""
req_auth, req_inst = cli_plugin_requests(config)
# Which plugins do we need?
if verb == "run":
need_inst = need_auth = True
from letsencrypt.cli import cli_command
if req_auth in noninstaller_plugins and not req_inst:
msg = ('With the {0} plugin, you probably want to use the "certonly" command, eg:{1}'
'{1} {2} certonly --{0}{1}{1}'
'(Alternatively, add a --installer flag. See https://eff.org/letsencrypt-plugins'
'{1} and "--help plugins" for more information.)'.format(
req_auth, os.linesep, cli_command))
raise errors.MissingCommandlineFlag(msg)
else:
need_inst = need_auth = False
if verb == "certonly":
need_auth = True
if verb == "install":
need_inst = True
if config.authenticator:
logger.warn("Specifying an authenticator doesn't make sense in install mode")
# Try to meet the user's request and/or ask them to pick plugins
authenticator = installer = None
if verb == "run" and req_auth == req_inst:
# Unless the user has explicitly asked for different auth/install,
# only consider offering a single choice
authenticator = installer = pick_configurator(config, req_inst, plugins)
else:
if need_inst or req_inst:
installer = pick_installer(config, req_inst, plugins)
if need_auth:
authenticator = pick_authenticator(config, req_auth, plugins)
logger.debug("Selected authenticator %s and installer %s", authenticator, installer)
# Report on any failures
if need_inst and not installer:
diagnose_configurator_problem("installer", req_inst, plugins)
if need_auth and not authenticator:
diagnose_configurator_problem("authenticator", req_auth, plugins)
record_chosen_plugins(config, plugins, authenticator, installer)
return installer, authenticator
def set_configurator(previously, now):
"""
Setting configurators multiple ways is okay, as long as they all agree
:param str previously: previously identified request for the installer/authenticator
:param str requested: the request currently being processed
"""
if not now:
# we're not actually setting anything
return previously
if previously:
if previously != now:
msg = "Too many flags setting configurators/installers/authenticators {0} -> {1}"
raise errors.PluginSelectionError(msg.format(repr(previously), repr(now)))
return now
def cli_plugin_requests(config):
"""
Figure out which plugins the user requested with CLI and config options
:returns: (requested authenticator string or None, requested installer string or None)
:rtype: tuple
"""
req_inst = req_auth = config.configurator
req_inst = set_configurator(req_inst, config.installer)
req_auth = set_configurator(req_auth, config.authenticator)
if config.nginx:
req_inst = set_configurator(req_inst, "nginx")
req_auth = set_configurator(req_auth, "nginx")
if config.apache:
req_inst = set_configurator(req_inst, "apache")
req_auth = set_configurator(req_auth, "apache")
if config.standalone:
req_auth = set_configurator(req_auth, "standalone")
if config.webroot:
req_auth = set_configurator(req_auth, "webroot")
if config.manual:
req_auth = set_configurator(req_auth, "manual")
logger.debug("Requested authenticator %s and installer %s", req_auth, req_inst)
return req_auth, req_inst
def diagnose_configurator_problem(cfg_type, requested, plugins):
"""
Raise the most helpful error message about a plugin being unavailable
:param str cfg_type: either "installer" or "authenticator"
:param str requested: the plugin that was requested
:param .PluginsRegistry plugins: available plugins
:raises error.PluginSelectionError: if there was a problem
"""
if requested:
if requested not in plugins:
msg = "The requested {0} plugin does not appear to be installed".format(requested)
else:
msg = ("The {0} plugin is not working; there may be problems with "
"your existing configuration.\nThe error was: {1!r}"
.format(requested, plugins[requested].problem))
elif cfg_type == "installer":
if os.path.exists("/etc/debian_version"):
# Debian... installers are at least possible
msg = ('No installers seem to be present and working on your system; '
'fix that or try running letsencrypt with the "certonly" command')
else:
# XXX update this logic as we make progress on #788 and nginx support
msg = ('No installers are available on your OS yet; try running '
'"letsencrypt-auto certonly" to get a cert you can install manually')
else:
msg = "{0} could not be determined or is not installed".format(cfg_type)
raise errors.PluginSelectionError(msg)

View file

@ -0,0 +1,149 @@
"""Tests for letsenecrypt.plugins.selection"""
import sys
import unittest
import mock
import zope.component
from letsencrypt.display import util as display_util
from letsencrypt import interfaces
class ConveniencePickPluginTest(unittest.TestCase):
"""Tests for letsencrypt.plugins.selection.pick_*."""
def _test(self, fun, ifaces):
config = mock.Mock()
default = mock.Mock()
plugins = mock.Mock()
with mock.patch("letsencrypt.plugins.selection.pick_plugin") as mock_p:
mock_p.return_value = "foo"
self.assertEqual("foo", fun(config, default, plugins, "Question?"))
mock_p.assert_called_once_with(
config, default, plugins, "Question?", ifaces)
def test_authenticator(self):
from letsencrypt.plugins.selection import pick_authenticator
self._test(pick_authenticator, (interfaces.IAuthenticator,))
def test_installer(self):
from letsencrypt.plugins.selection import pick_installer
self._test(pick_installer, (interfaces.IInstaller,))
def test_configurator(self):
from letsencrypt.plugins.selection import pick_configurator
self._test(pick_configurator,
(interfaces.IAuthenticator, interfaces.IInstaller))
class PickPluginTest(unittest.TestCase):
"""Tests for letsencrypt.plugins.selection.pick_plugin."""
def setUp(self):
self.config = mock.Mock(noninteractive_mode=False)
self.default = None
self.reg = mock.MagicMock()
self.question = "Question?"
self.ifaces = []
def _call(self):
from letsencrypt.plugins.selection import pick_plugin
return pick_plugin(self.config, self.default, self.reg,
self.question, self.ifaces)
def test_default_provided(self):
self.default = "foo"
self._call()
self.assertEqual(1, self.reg.filter.call_count)
def test_no_default(self):
self._call()
self.assertEqual(1, self.reg.visible().ifaces.call_count)
def test_no_candidate(self):
self.assertTrue(self._call() is None)
def test_single(self):
plugin_ep = mock.MagicMock()
plugin_ep.init.return_value = "foo"
plugin_ep.misconfigured = False
self.reg.visible().ifaces().verify().available.return_value = {
"bar": plugin_ep}
self.assertEqual("foo", self._call())
def test_single_misconfigured(self):
plugin_ep = mock.MagicMock()
plugin_ep.init.return_value = "foo"
plugin_ep.misconfigured = True
self.reg.visible().ifaces().verify().available.return_value = {
"bar": plugin_ep}
self.assertTrue(self._call() is None)
def test_multiple(self):
plugin_ep = mock.MagicMock()
plugin_ep.init.return_value = "foo"
self.reg.visible().ifaces().verify().available.return_value = {
"bar": plugin_ep,
"baz": plugin_ep,
}
with mock.patch("letsencrypt.plugins.selection.choose_plugin") as mock_choose:
mock_choose.return_value = plugin_ep
self.assertEqual("foo", self._call())
mock_choose.assert_called_once_with(
[plugin_ep, plugin_ep], self.question)
def test_choose_plugin_none(self):
self.reg.visible().ifaces().verify().available.return_value = {
"bar": None,
"baz": None,
}
with mock.patch("letsencrypt.plugins.selection.choose_plugin") as mock_choose:
mock_choose.return_value = None
self.assertTrue(self._call() is None)
class ChoosePluginTest(unittest.TestCase):
"""Tests for letsencrypt.plugins.selection.choose_plugin."""
def setUp(self):
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
self.mock_apache = mock.Mock(
description_with_name="a", misconfigured=True)
self.mock_stand = mock.Mock(
description_with_name="s", misconfigured=False)
self.mock_stand.init().more_info.return_value = "standalone"
self.plugins = [
self.mock_apache,
self.mock_stand,
]
def _call(self):
from letsencrypt.plugins.selection import choose_plugin
return choose_plugin(self.plugins, "Question?")
@mock.patch("letsencrypt.plugins.selection.z_util")
def test_selection(self, mock_util):
mock_util().menu.side_effect = [(display_util.OK, 0),
(display_util.OK, 1)]
self.assertEqual(self.mock_stand, self._call())
self.assertEqual(mock_util().notification.call_count, 1)
@mock.patch("letsencrypt.plugins.selection.z_util")
def test_more_info(self, mock_util):
mock_util().menu.side_effect = [
(display_util.HELP, 0),
(display_util.HELP, 1),
(display_util.OK, 1),
]
self.assertEqual(self.mock_stand, self._call())
self.assertEqual(mock_util().notification.call_count, 2)
@mock.patch("letsencrypt.plugins.selection.z_util")
def test_no_choice(self, mock_util):
mock_util().menu.return_value = (display_util.CANCEL, 0)
self.assertTrue(self._call() is None)

View file

@ -203,12 +203,12 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self.assertEqual(args.chain_path, os.path.abspath(chain))
self.assertEqual(args.fullchain_path, os.path.abspath(fullchain))
@mock.patch('letsencrypt.main.cli.record_chosen_plugins')
@mock.patch('letsencrypt.main.cli.display_ops')
def test_installer_selection(self, mock_display_ops, _rec):
@mock.patch('letsencrypt.main.plug_sel.record_chosen_plugins')
@mock.patch('letsencrypt.main.plug_sel.pick_installer')
def test_installer_selection(self, mock_pick_installer, _rec):
self._call(['install', '--domains', 'foo.bar', '--cert-path', 'cert',
'--key-path', 'key', '--chain-path', 'chain'])
self.assertEqual(mock_display_ops.pick_installer.call_count, 1)
self.assertEqual(mock_pick_installer.call_count, 1)
@mock.patch('letsencrypt.le_util.exe_exists')
def test_configurator_selection(self, mock_exe_exists):
@ -499,7 +499,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self._webroot_map_test(simple_map, "/tmp2", "eg2.com,eg.com", expected_map, domains)
# test inclusion of interactively specified domains in the webroot map
with mock.patch('letsencrypt.cli.display_ops.choose_names') as mock_choose:
with mock.patch('letsencrypt.display.ops.choose_names') as mock_choose:
mock_choose.return_value = domains
expected_map["eg2.com"] = "/tmp"
self._webroot_map_test(None, "/tmp", None, expected_map, domains)

View file

@ -460,9 +460,8 @@ class RollbackTest(unittest.TestCase):
@classmethod
def _call(cls, checkpoints, side_effect):
from letsencrypt.client import rollback
with mock.patch("letsencrypt.client"
".display_ops.pick_installer") as mock_pick_installer:
mock_pick_installer.side_effect = side_effect
with mock.patch("letsencrypt.client.plugin_selection.pick_installer") as mpi:
mpi.side_effect = side_effect
rollback(None, checkpoints, {}, mock.MagicMock())
def test_no_problems(self):

View file

@ -22,146 +22,6 @@ from letsencrypt.tests import test_util
KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
class ChoosePluginTest(unittest.TestCase):
"""Tests for letsencrypt.display.ops.choose_plugin."""
def setUp(self):
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
self.mock_apache = mock.Mock(
description_with_name="a", misconfigured=True)
self.mock_stand = mock.Mock(
description_with_name="s", misconfigured=False)
self.mock_stand.init().more_info.return_value = "standalone"
self.plugins = [
self.mock_apache,
self.mock_stand,
]
def _call(self):
from letsencrypt.display.ops import choose_plugin
return choose_plugin(self.plugins, "Question?")
@mock.patch("letsencrypt.display.ops.util")
def test_selection(self, mock_util):
mock_util().menu.side_effect = [(display_util.OK, 0),
(display_util.OK, 1)]
self.assertEqual(self.mock_stand, self._call())
self.assertEqual(mock_util().notification.call_count, 1)
@mock.patch("letsencrypt.display.ops.util")
def test_more_info(self, mock_util):
mock_util().menu.side_effect = [
(display_util.HELP, 0),
(display_util.HELP, 1),
(display_util.OK, 1),
]
self.assertEqual(self.mock_stand, self._call())
self.assertEqual(mock_util().notification.call_count, 2)
@mock.patch("letsencrypt.display.ops.util")
def test_no_choice(self, mock_util):
mock_util().menu.return_value = (display_util.CANCEL, 0)
self.assertTrue(self._call() is None)
class PickPluginTest(unittest.TestCase):
"""Tests for letsencrypt.display.ops.pick_plugin."""
def setUp(self):
self.config = mock.Mock(noninteractive_mode=False)
self.default = None
self.reg = mock.MagicMock()
self.question = "Question?"
self.ifaces = []
def _call(self):
from letsencrypt.display.ops import pick_plugin
return pick_plugin(self.config, self.default, self.reg,
self.question, self.ifaces)
def test_default_provided(self):
self.default = "foo"
self._call()
self.assertEqual(1, self.reg.filter.call_count)
def test_no_default(self):
self._call()
self.assertEqual(1, self.reg.visible().ifaces.call_count)
def test_no_candidate(self):
self.assertTrue(self._call() is None)
def test_single(self):
plugin_ep = mock.MagicMock()
plugin_ep.init.return_value = "foo"
plugin_ep.misconfigured = False
self.reg.visible().ifaces().verify().available.return_value = {
"bar": plugin_ep}
self.assertEqual("foo", self._call())
def test_single_misconfigured(self):
plugin_ep = mock.MagicMock()
plugin_ep.init.return_value = "foo"
plugin_ep.misconfigured = True
self.reg.visible().ifaces().verify().available.return_value = {
"bar": plugin_ep}
self.assertTrue(self._call() is None)
def test_multiple(self):
plugin_ep = mock.MagicMock()
plugin_ep.init.return_value = "foo"
self.reg.visible().ifaces().verify().available.return_value = {
"bar": plugin_ep,
"baz": plugin_ep,
}
with mock.patch("letsencrypt.display.ops.choose_plugin") as mock_choose:
mock_choose.return_value = plugin_ep
self.assertEqual("foo", self._call())
mock_choose.assert_called_once_with(
[plugin_ep, plugin_ep], self.question)
def test_choose_plugin_none(self):
self.reg.visible().ifaces().verify().available.return_value = {
"bar": None,
"baz": None,
}
with mock.patch("letsencrypt.display.ops.choose_plugin") as mock_choose:
mock_choose.return_value = None
self.assertTrue(self._call() is None)
class ConveniencePickPluginTest(unittest.TestCase):
"""Tests for letsencrypt.display.ops.pick_*."""
def _test(self, fun, ifaces):
config = mock.Mock()
default = mock.Mock()
plugins = mock.Mock()
with mock.patch("letsencrypt.display.ops.pick_plugin") as mock_p:
mock_p.return_value = "foo"
self.assertEqual("foo", fun(config, default, plugins, "Question?"))
mock_p.assert_called_once_with(
config, default, plugins, "Question?", ifaces)
def test_authenticator(self):
from letsencrypt.display.ops import pick_authenticator
self._test(pick_authenticator, (interfaces.IAuthenticator,))
def test_installer(self):
from letsencrypt.display.ops import pick_installer
self._test(pick_installer, (interfaces.IInstaller,))
def test_configurator(self):
from letsencrypt.display.ops import pick_configurator
self._test(pick_configurator, (
interfaces.IAuthenticator, interfaces.IInstaller))
class GetEmailTest(unittest.TestCase):
"""Tests for letsencrypt.display.ops.get_email."""
@ -241,17 +101,17 @@ class ChooseAccountTest(unittest.TestCase):
from letsencrypt.display import ops
return ops.choose_account(accounts)
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_one(self, mock_util):
mock_util().menu.return_value = (display_util.OK, 0)
self.assertEqual(self._call([self.acc1]), self.acc1)
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_two(self, mock_util):
mock_util().menu.return_value = (display_util.OK, 1)
self.assertEqual(self._call([self.acc1, self.acc2]), self.acc2)
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_cancel(self, mock_util):
mock_util().menu.return_value = (display_util.CANCEL, 1)
self.assertTrue(self._call([self.acc1, self.acc2]) is None)
@ -339,12 +199,12 @@ class ChooseNamesTest(unittest.TestCase):
self._call(None)
self.assertEqual(mock_manual.call_count, 1)
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_no_installer_cancel(self, mock_util):
mock_util().input.return_value = (display_util.CANCEL, [])
self.assertEqual(self._call(None), [])
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_no_names_choose(self, mock_util):
self.mock_install().get_all_names.return_value = set()
mock_util().yesno.return_value = True
@ -355,14 +215,14 @@ class ChooseNamesTest(unittest.TestCase):
self.assertEqual(mock_util().input.call_count, 1)
self.assertEqual(actual_doms, [domain])
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_no_names_quit(self, mock_util):
self.mock_install().get_all_names.return_value = set()
mock_util().yesno.return_value = False
self.assertEqual(self._call(self.mock_install), [])
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_filter_names_valid_return(self, mock_util):
self.mock_install.get_all_names.return_value = set(["example.com"])
mock_util().checklist.return_value = (display_util.OK, ["example.com"])
@ -371,14 +231,14 @@ class ChooseNamesTest(unittest.TestCase):
self.assertEqual(names, ["example.com"])
self.assertEqual(mock_util().checklist.call_count, 1)
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_filter_names_nothing_selected(self, mock_util):
self.mock_install.get_all_names.return_value = set(["example.com"])
mock_util().checklist.return_value = (display_util.OK, [])
self.assertEqual(self._call(self.mock_install), [])
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_filter_names_cancel(self, mock_util):
self.mock_install.get_all_names.return_value = set(["example.com"])
mock_util().checklist.return_value = (
@ -397,7 +257,7 @@ class ChooseNamesTest(unittest.TestCase):
self.assertEqual(get_valid_domains(all_invalid), [])
self.assertEqual(len(get_valid_domains(two_valid)), 2)
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_choose_manually(self, mock_util):
from letsencrypt.display.ops import _choose_names_manually
# No retry
@ -445,7 +305,7 @@ class SuccessInstallationTest(unittest.TestCase):
from letsencrypt.display.ops import success_installation
success_installation(names)
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_success_installation(self, mock_util):
mock_util().notification.return_value = None
names = ["example.com", "abc.com"]
@ -467,7 +327,7 @@ class SuccessRenewalTest(unittest.TestCase):
from letsencrypt.display.ops import success_renewal
success_renewal(names, "renew")
@mock.patch("letsencrypt.display.ops.util")
@mock.patch("letsencrypt.display.ops.z_util")
def test_success_renewal(self, mock_util):
mock_util().notification.return_value = None
names = ["example.com", "abc.com"]