Somewhat usable CLI+disco

This commit is contained in:
Jakub Warmuz 2015-05-02 08:53:06 +00:00
parent 19cff00835
commit c9a7172388
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
5 changed files with 89 additions and 86 deletions

View file

@ -59,7 +59,8 @@ def _common_run(args, config, acc, authenticator, installer):
doms = args.domains
if not doms:
return None
sys.exit("Please specify --domains, or --installer that will "
"help in domain names autodiscovery")
acme = client.Client(config, acc, authenticator, installer)
@ -73,10 +74,10 @@ def _common_run(args, config, acc, authenticator, installer):
except errors.LetsEncryptClientError:
return None
return acme, doms, authkey
return acme, doms
def run(args, config):
def run(args, config, plugins):
"""Obtain a certificate and install."""
acc = _account_init(args, config)
if acc is None:
@ -89,63 +90,61 @@ def run(args, config):
if args.authenticator is not None or args.installer is not None:
installer = plugins_disco.pick_installer(
config, args.installer)
config, args.installer, plugins)
authenticator = plugins_disco.pick_authenticator(
config, args.authenticator)
config, args.authenticator, plugins)
else:
# TODO: this assume that user doesn't want to pick authenticator
# and installer separately...
authenticator = installer = plugins_disco.pick_configurator(
config, args.configurator)
config, args.configurator, plugins)
if installer is None or authenticator is None:
return "Configurator could not be determined"
acme, auth, installer, doms, auth_key = _common_run(args, config, acc)
cert_file, chain_file = acme.obtain_certificate(doms)
acme.deploy_certificate(doms, authkey, cert_file, chain_file)
acme, doms = _common_run(args, config, acc, authenticator, installer)
cert_path, chain_path = acme.obtain_certificate(doms)
acme.deploy_certificate(doms, acc.key, cert_path, chain_path)
acme.enhance_config(doms, args.redirect)
def auth(args, config):
def auth(args, config, plugins):
"""Obtain a certificate (no install)."""
acc = _account_init(args, config)
if acc is None:
return None
authenticator = plugins_disco.pick_authenticator(config, args.authenticator)
authenticator = plugins_disco.pick_authenticator(config, args.authenticator, plugins)
if authenticator is None:
return "Authenticator could not be determined"
if args.installer is not None:
installer = plugins_disco.pick_installer(config, args.installer)
installer = plugins_disco.pick_installer(config, args.installer, plugins)
else:
installer = None
if args.domains is None:
if args.installer is not None:
return ("--domains not set and provided --installer does not "
"help in autodiscovery")
else:
return ("Please specify --domains, or --installer that will "
"help in domain names autodiscovery")
acme, doms, _ = _common_run(
args, config, authenticator=authenticator, installer=None)
acme, doms = _common_run(
args, config, acc, authenticator=authenticator, installer=None)
acme.obtain_certificate(doms)
def install(args, config):
def install(args, config, plugins):
"""Install (no auth)."""
installer = plugins_disco.pick_installer(config, args.installer)
acc = _account_init(args, config)
if acc is None:
return None
installer = plugins_disco.pick_installer(config, args.installer, plugins)
if installer is None:
return "Installer could not be determined"
acme, doms, authkey = _common_run(
args, config, authenticator=None, installer=installer)
assert args.cert_file is not None and args.chain_file is not None
acme.deploy_certificate(doms, authkey, args.cert_file, args.chain_file)
acme, doms = _common_run(
args, config, acc, authenticator=None, installer=installer)
assert args.cert_path is not None and args.chain_path is not None
acme.deploy_certificate(doms, acc.key, args.cert_path, args.chain_path)
acme.enhance_config(doms, args.redirect)
def revoke(args, config):
def revoke(args, config, plugins):
"""Revoke."""
if args.rev_cert is None and args.rev_key is None:
return "At least one of --certificate or --key is required"
@ -156,18 +155,17 @@ def revoke(args, config):
#client.revoke(config, args.no_confirm, args.rev_cert, args.rev_key)
def rollback(args, config):
def rollback(args, config, plugins):
"""Rollback."""
client.rollback(args.checkpoints, config)
def config_changes(args, config):
def config_changes(args, config, plugins):
"""View config changes.
View checkpoints and associated configuration changes.
"""
print args, config
client.config_changes(config)
@ -195,9 +193,8 @@ def _print_plugins(plugins):
print # whitespace between plugins
def plugins(args, config):
def plugins_cmd(args, config, plugins):
"""List plugins."""
plugins = plugins_disco.PluginRegistry.find_all()
logging.debug("Discovered plugins: %s", plugins)
ifaces = [] if args.ifaces is None else args.ifaces
@ -243,7 +240,7 @@ def config_help(name):
return interfaces.IConfig[name].__doc__
def create_parser():
def create_parser(plugins):
"""Create parser."""
parser = configargparse.ArgParser(
description=__doc__,
@ -278,7 +275,7 @@ def create_parser():
parser_rollback = add_subparser("rollback", rollback)
parrser_config_changes = add_subparser("config_changes", config_changes)
parser_plugins = add_subparser("plugins", plugins)
parser_plugins = add_subparser("plugins", plugins_cmd)
parser_plugins.add_argument("--init", action="store_true")
parser_plugins.add_argument("--prepare", action="store_true")
parser_plugins.add_argument(
@ -288,12 +285,10 @@ def create_parser():
"--installers", action="append_const", dest="ifaces",
const=interfaces.IInstaller)
parser_run.add_argument("--configurator")
for subparser in parser_run, parser_auth:
subparser.add_argument("-a", "--authenticator")
for subparser in parser_run, parser_auth, parser_install:
# parser_auth uses --installer for domains autodiscovery
subparser.add_argument("-i", "--installer")
parser.add_argument("--configurator")
parser.add_argument("-a", "--authenticator")
parser.add_argument("-i", "--installer")
# positional arg shadows --domains, instead of appending, and
# --domains is useful, because it can be stored in config
#for subparser in parser_run, parser_auth, parser_install:
@ -328,10 +323,9 @@ def create_parser():
paths_parser(parser.add_argument_group("paths"))
# TODO: plugin_parser should be called for every detected plugin
for name, plugin_cls in [
("apache", apache_configurator.ApacheConfigurator),
("nginx", nginx_configurator.NginxConfigurator)]:
plugin_cls.inject_parser_options(parser.add_argument_group(name), name)
for name, plugin_ep in plugins.iteritems():
plugin_ep.plugin_cls.inject_parser_options(
parser.add_argument_group(name), name)
return parser
@ -362,7 +356,8 @@ def paths_parser(parser):
def main(args=sys.argv[1:]):
"""Command line argument parsing and main script execution."""
# note: arg parser internally handles --help (and exits afterwards)
args = create_parser().parse_args(args)
plugins = plugins_disco.PluginRegistry.find_all()
args = create_parser(plugins).parse_args(args)
config = configuration.NamespaceConfig(args)
# Displayer
@ -391,7 +386,7 @@ def main(args=sys.argv[1:]):
# "{0}Root is required to run letsencrypt. Please use sudo.{0}"
# .format(os.linesep))
return args.func(args, config)
return args.func(args, config, plugins)
if __name__ == "__main__":

View file

@ -1,4 +1,5 @@
"""Contains UI methods for LE user operations."""
import logging
import os
import zope.component
@ -11,13 +12,13 @@ util = zope.component.getUtility # pylint: disable=invalid-name
def choose_plugin(prepared, question):
descs = [plugin.description if error is None
else "%s (Misconfigured)" % plugin.description
for (plugin, error) in prepared]
opts = [plugin_ep.name_with_description if error is None
else "%s (Misconfigured)" % plugin_ep.name_with_description
for (plugin_ep, error) in prepared]
while True:
code, index = util(interfaces.IDisplay).menu(
question, descs, help_label="More Info")
question, opts, help_label="More Info")
if code == display_util.OK:
return prepared[index][0]
@ -25,11 +26,11 @@ def choose_plugin(prepared, question):
if prepared[index][1] is not None:
msg = "Reported Error: %s" % prepared[index][1]
else:
msg = prepared[index][0].more_info()
msg = prepared[index][0].init().more_info()
util(interfaces.IDisplay).notification(
msg, height=display_util.HEIGHT)
else:
return
return None
def choose_authenticator(auths, errs):
@ -98,6 +99,7 @@ def choose_names(installer):
"""
if installer is None:
logging.debug("No installer, picking names manually")
return _choose_names_manually()
names = list(installer.get_all_names())

View file

@ -13,6 +13,7 @@ from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import augeas_configurator
from letsencrypt.client import constants as core_constants
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import le_util
@ -949,11 +950,11 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
"""
uid = os.geteuid()
le_util.make_or_verify_dir(
self.config.config_dir, constants.CONFIG_DIRS_MODE, uid)
self.config.config_dir, core_constants.CONFIG_DIRS_MODE, uid)
le_util.make_or_verify_dir(
self.config.work_dir, constants.CONFIG_DIRS_MODE, uid)
self.config.work_dir, core_constants.CONFIG_DIRS_MODE, uid)
le_util.make_or_verify_dir(
self.config.backup_dir, constants.CONFIG_DIRS_MODE, uid)
self.config.backup_dir, core_constants.CONFIG_DIRS_MODE, uid)
def get_version(self):
"""Return version of Apache Server.

View file

@ -44,6 +44,10 @@ class PluginEntryPoint(object):
def __repr__(self):
return 'PluginEntryPoint#{0}'.format(self.name)
@property
def name_with_description(self):
return "{0} ({1})".format(self.name, self.plugin_cls.description)
class PluginRegistry(collections.Mapping):
"""Plugin registry."""
@ -66,8 +70,8 @@ class PluginRegistry(collections.Mapping):
def filter(self, *ifaces_groups):
"""Filter plugins based on interfaces."""
return type(self)(dict(
plugin_ep
for plugin_ep in self.plugins.iteritems()
(name, plugin_ep)
for name, plugin_ep in self.plugins.iteritems()
if not ifaces_groups or any(
all(iface.implementedBy(plugin_ep.plugin_cls)
for iface in ifaces)
@ -110,59 +114,59 @@ def prepare_plugins(initialized):
"""Prepare plugins."""
prepared = {}
for plugin_cls, plugin in initialized.iteritems():
for name, plugin_ep in initialized.iteritems():
error = None
try:
plugin.prepare()
plugin_ep.init().prepare()
except errors.LetsEncryptMisconfigurationError as error:
logging.debug("Misconfigured %s: %s", plugin, error)
logging.debug("Misconfigured %s: %s", plugin_ep, error)
except errors.LetsEncryptNoInstallationError as error:
logging.debug("No installation (%s): %s", plugin, error)
logging.debug("No installation (%s): %s", plugin_ep, error)
continue
prepared[plugin_cls] = (plugin, error)
prepared[name] = (plugin_ep, error)
return prepared # succefully prepared + misconfigured
def pick_plugin(config, default, ifaces, question):
plugins = find_plugins()
names = name_plugins(plugins)
def pick_plugin(config, default, plugins, ifaces, question):
if default is not None:
filtered = [names[default]]
filtered = {default: plugins[default]}
else:
filtered = filter_plugins(plugins, ifaces)
filtered = plugins.filter(ifaces)
initialized = dict((plugin_cls, plugin_cls(config))
for plugin_cls in filtered)
verified = verify_plugins(initialized, ifaces)
prepared = prepare_plugins(initialized)
for plugin_ep in plugins.itervalues():
plugin_ep.init(config)
verified = verify_plugins(filtered, ifaces)
prepared = prepare_plugins(filtered)
if len(prepared) > 1:
logging.debug("Multiple candidate plugins: %s", prepared)
return display_ops.choose_plugin(prepared.values(), question)
return display_ops.choose_plugin(prepared.values(), question).init()
elif len(prepared) == 1:
logging.debug("Single candidate plugin: %s", prepared)
return prepared.values()[0]
plugin_ep = prepared.values()[0][0]
logging.debug("Single candidate plugin: %s", plugin_ep)
return plugin_ep.init()
else:
logging.debug("No candidate plugin")
return None
def pick_authenticator(config, default):
def pick_authenticator(config, default, plugins):
"""Pick authentication plugin."""
return pick_plugin(
config, default, (interfaces.IAuthenticator,),
config, default, plugins, (interfaces.IAuthenticator,),
"How would you like to authenticate with Let's Encrypt CA?")
def pick_installer(config, default):
def pick_installer(config, default, plugins):
"""Pick installer plugin."""
return pick_plugin(config, default, (interfaces.IInstaller,),
return pick_plugin(config, default, plugins, (interfaces.IInstaller,),
"How would you like to install certificates?")
def pick_configurator(config, default):
def pick_configurator(config, default, plugins):
"""Pick configurator plugin."""
return pick_plugin(
config, default, (interfaces.IAuthenticator, interfaces.IInstaller),
config, default, plugins,
(interfaces.IAuthenticator, interfaces.IInstaller),
"How would you like to install certificates?")

View file

@ -12,6 +12,7 @@ import zope.interface
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import constants as core_constants
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import le_util
@ -364,11 +365,11 @@ class NginxConfigurator(common.Plugin):
"""
uid = os.geteuid()
le_util.make_or_verify_dir(
self.config.work_dir, constants.CONFIG_DIRS_MODE, uid)
self.config.work_dir, core_constants.CONFIG_DIRS_MODE, uid)
le_util.make_or_verify_dir(
self.config.backup_dir, constants.CONFIG_DIRS_MODE, uid)
self.config.backup_dir, core_constants.CONFIG_DIRS_MODE, uid)
le_util.make_or_verify_dir(
self.config.config_dir, constants.CONFIG_DIRS_MODE, uid)
self.config.config_dir, core_constants.CONFIG_DIRS_MODE, uid)
def get_version(self):
"""Return version of Nginx Server.