plugins.disco

This commit is contained in:
Jakub Warmuz 2015-03-30 12:33:49 +00:00
parent cd0b99ae5d
commit 8faa877c45
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
4 changed files with 170 additions and 28 deletions

View file

@ -4,6 +4,9 @@ import pkg_resources
from letsencrypt.acme import challenges
SETUPTOOLS_PLUGINS_ENTRY_POINT = "letsencrypt.plugins"
"""Setuptools entry point group name for plugins."""
S_SIZE = 32
"""Size (in bytes) of secret base64-encoded octet string "s" used in
challenges."""

View file

@ -10,6 +10,28 @@ from letsencrypt.client.display import util as display_util
util = zope.component.getUtility # pylint: disable=invalid-name
def choose_plugin(prepared, question):
descs = [plugin.description if error is None
else "%s (Misconfigured)" % plugin.description
for (plugin, error) in prepared]
while True:
code, index = util(interfaces.IDisplay).menu(
question, descs, help_label="More Info")
if code == display_util.OK:
return prepared[index][0]
elif code == display_util.HELP:
if prepared[index][1] is not None:
msg = "Reported Error: %s" % prepared[index][1]
else:
msg = prepared[index][0].more_info()
util(interfaces.IDisplay).notification(
msg, height=display_util.HEIGHT)
else:
return
def choose_authenticator(auths, errs):
"""Allow the user to choose their authenticator.

View file

@ -5,16 +5,13 @@ import zope.interface
# pylint: disable=too-few-public-methods
class IAuthenticator(zope.interface.Interface):
"""Generic Let's Encrypt Authenticator.
class IPlugin(zope.interface.Interface):
"""Let's Encrypt plugin."""
Class represents all possible tools processes that have the
ability to perform challenges and attain a certificate.
"""
description = zope.interface.Attribute("Short plugin description")
def prepare():
"""Prepare the authenticator.
"""Prepare the plugin.
Finish up any additional initialization.
@ -25,6 +22,23 @@ class IAuthenticator(zope.interface.Interface):
"""
def more_info():
"""Human-readable string to help the user.
Should describe the steps taken and any relevant info to help the user
decide which plugin to use.
"""
class IAuthenticator(IPlugin):
"""Generic Let's Encrypt Authenticator.
Class represents all possible tools processes that have the
ability to perform challenges and attain a certificate.
"""
def get_chall_pref(domain):
"""Return list of challenge preferences.
@ -70,14 +84,6 @@ class IAuthenticator(zope.interface.Interface):
"""
def more_info():
"""Human-readable string to help the user.
Should describe the steps taken and any relevant info to help the user
decide which Authenticator to use.
"""
class IConfig(zope.interface.Interface):
"""Let's Encrypt user-supplied configuration.
@ -124,25 +130,13 @@ class IConfig(zope.interface.Interface):
"Contains standard Apache SSL directives.")
class IInstaller(zope.interface.Interface):
class IInstaller(IPlugin):
"""Generic Let's Encrypt Installer Interface.
Represents any server that an X509 certificate can be placed.
"""
def prepare():
"""Prepare the installer.
Finish up any additional initialization.
:raises letsencrypt.client.errors.LetsEncryptMisconfigurationError`:
when full initialization cannot be completed.
:raises letsencrypt.errors.LetsEncryptNoInstallationError`:
when the necessary programs/files cannot be located.
"""
def get_all_names():
"""Returns all names that may be authenticated."""

View file

@ -0,0 +1,123 @@
"""Utilities for plugins discovery and selection."""
import collections
import logging
import pkg_resources
import zope.interface
from letsencrypt.client import constants
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client.display import ops as display_ops
def name_plugins(plugins):
# TODO: actually make it unambiguous...
names = {}
for plugin_cls, entry_points in plugins.iteritems():
entry_point = next(iter(entry_points)) # entry_points.peek()
names[plugin_cls] = entry_point.name
return names
def find_plugins():
"""Find plugins using setuptools entry points."""
plugins = collections.defaultdict(set)
for entry_point in pkg_resources.iter_entry_points(
constants.SETUPTOOLS_PLUGINS_ENTRY_POINT):
plugin_cls = entry_point.load()
plugins[plugin_cls].add(entry_point)
return plugins
def filter_plugins(plugins, *ifaces_groups):
"""Filter plugins based on interfaces."""
return dict(
(plugin_cls, entry_points)
for plugin_cls, entry_points in plugins.iteritems()
if not ifaces_groups or any(
all(iface.implementedBy(plugin_cls) for iface in ifaces)
for ifaces in ifaces_groups))
def verify_plugins(initialized, ifaces):
"""Verify plugin objects."""
verified = {}
for plugin_cls, plugin in initialized.iteritems():
verifies = True
for iface in ifaces: # zope.interface.providedBy(plugin)
try:
zope.interface.verify.verifyObject(iface, plugin)
except zope.interface.exceptions.BrokenImplementation:
if iface.implementedBy(plugin_cls):
logging.debug(
"%s implements %s but object does "
"not verify", plugin_cls, iface.__name__)
verifies = False
break
if verifies:
verified[plugin_cls] = plugin
return verified
def prepare_plugins(initialized):
"""Prepare plugins."""
prepared = {}
for plugin_cls, plugin in initialized.iteritems():
error = None
try:
plugin.prepare()
except errors.LetsEncryptMisconfigurationError as error:
logging.debug("Misconfigured %s: %s", plugin, error)
except errors.LetsEncryptNoInstallationError as error:
logging.debug("No installation (%s): %s", plugin, error)
continue
prepared[plugin_cls] = (plugin, error)
return prepared # succefully prepared + misconfigured
def pick_plugin(config, default, ifaces, question):
plugins = find_plugins()
names = name_plugins(plugins)
if default is not None:
filtered = [names[default]]
else:
filtered = filter_plugins(plugins, ifaces)
initialized = dict((plugin_cls, plugin_cls(config))
for plugin_cls in filtered)
verified = verify_plugins(initialized, ifaces)
prepared = prepare_plugins(initialized)
if len(prepared) > 1:
logging.debug("Multiple candidate plugins: %s", prepared)
return display_ops.choose_plugin(prepared.values(), question)
elif len(prepared) == 1:
logging.debug("Single candidate plugin: %s", prepared)
return prepared.values()[0]
else:
logging.debug("No candidate plugin")
return None
def pick_authenticator(config, default):
"""Pick authentication plugin."""
return pick_plugin(
config, default, (interfaces.IAuthenticator,),
"How would you like to authenticate with Let's Encrypt CA?")
def pick_installer(config, default):
"""Pick installer plugin."""
return pick_plugin(config, default, (interfaces.IInstaller,),
"How would you like to install certificates?")
def pick_configurator(config, default):
"""Pick configurator plugin."""
return pick_plugin(
config, default, (interfaces.IAuthenticator, interfaces.IInstaller),
"How would you like to install certificates?")