Full coverage and lint for disco/disco_test

This commit is contained in:
Jakub Warmuz 2015-05-03 14:27:22 +00:00
parent 1878db3416
commit 138a9e9f01
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
4 changed files with 130 additions and 48 deletions

View file

@ -176,7 +176,7 @@ def plugins_cmd(args, config, plugins): # TODO: Use IDiplay rathern than print
logging.debug("Discovered plugins: %s", plugins)
ifaces = [] if args.ifaces is None else args.ifaces
filtered = plugins.filter_ifaces(*((iface,) for iface in ifaces))
filtered = plugins.ifaces(*((iface,) for iface in ifaces))
logging.debug("Filtered plugins: %r", filtered)
if not args.init and not args.prepare:

View file

@ -43,7 +43,7 @@ def _pick_plugin(config, default, plugins, question, ifaces):
# throw more UX-friendly error if default not in plugins
filtered = plugins.filter(lambda p_ep: p_ep.name == default)
else:
filtered = plugins.filter_ifaces(ifaces)
filtered = plugins.ifaces(ifaces)
filtered.init(config)
verified = filtered.verify(ifaces)

View file

@ -35,6 +35,13 @@ class PluginEntryPoint(object):
"""Name with description. Handy for UI."""
return "{0} ({1})".format(self.name, self.plugin_cls.description)
def ifaces(self, *ifaces_groups):
"""Does plugin implements specified interface groups?"""
return not ifaces_groups or any(
all(iface.implementedBy(self.plugin_cls)
for iface in ifaces)
for ifaces in ifaces_groups)
@property
def initialized(self):
"""Has the plugin been initialized already?"""
@ -104,7 +111,7 @@ class PluginEntryPoint(object):
"Description: {0}".format(self.plugin_cls.description),
"Interfaces: {0}".format(", ".join(
iface.__name__ for iface in zope.interface.implementedBy(
self.plugin_cls))),
self.plugin_cls))),
"Entry point: {0}".format(self.entry_point),
]
@ -131,13 +138,23 @@ class PluginsRegistry(collections.Mapping):
plugin_ep = PluginEntryPoint(entry_point)
assert plugin_ep.name not in plugins, (
"PREFIX_FREE_DISTRIBUTIONS messed up")
# providedBy | pylint: disable=no-member
if interfaces.IPluginFactory.providedBy(plugin_ep.plugin_cls):
plugins[plugin_ep.name] = plugin_ep
else:
else: # pragma: no cover
logging.warning("Plugin entry point %s does not provide "
"IPluginFactory, skipping", plugin_ep)
return cls(plugins)
def __getitem__(self, name):
return self.plugins[name]
def __iter__(self):
return iter(self.plugins)
def __len__(self):
return len(self.plugins)
def init(self, config):
"""Initialize all plugins in the registry."""
return [plugin_ep.init(config) for plugin_ep
@ -148,18 +165,17 @@ class PluginsRegistry(collections.Mapping):
return type(self)(dict((name, plugin_ep) for name, plugin_ep
in self.plugins.iteritems() if pred(plugin_ep)))
def filter_ifaces(self, *ifaces_groups):
def ifaces(self, *ifaces_groups):
"""Filter plugins based on interfaces."""
return self.filter(lambda plugin_ep: not ifaces_groups or any(
all(iface.implementedBy(plugin_ep.plugin_cls)
for iface in ifaces)
for ifaces in ifaces_groups))
# pylint: disable=star-args
return self.filter(lambda p_ep: p_ep.ifaces(*ifaces_groups))
def verify(self, ifaces):
"""Filter plugins based on verification."""
return self.filter(lambda p_ep: p_ep.verify(ifaces))
def prepare(self):
"""Prepare all plugins in the registry."""
return [plugin_ep.prepare() for plugin_ep in self.plugins.itervalues()]
def available(self):
@ -171,16 +187,7 @@ class PluginsRegistry(collections.Mapping):
return "{0}({1!r})".format(
self.__class__.__name__, set(self.plugins.itervalues()))
def __getitem__(self, name):
return self.plugins[name]
def __iter__(self):
return iter(self.plugins)
def __len__(self):
return len(self.plugins)
def __str__(self):
if not self.plugins:
return "No plugins"
return "\n\n".join(map(str, self.plugins.itervalues()))
return "\n\n".join(str(p_ep) for p_ep in self.plugins.itervalues())

View file

@ -6,8 +6,15 @@ import mock
import zope.interface
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client.plugins.standalone import authenticator
EP_SA = pkg_resources.EntryPoint(
"sa", "letsencrypt.client.plugins.standalone.authenticator",
attrs=("StandaloneAuthenticator",),
dist=mock.MagicMock(key="letsencrypt"))
class PluginEntryPointTest(unittest.TestCase):
"""Tests for letsencrypt.client.plugins.disco.PluginEntryPoint."""
@ -24,14 +31,8 @@ class PluginEntryPointTest(unittest.TestCase):
self.ep3 = pkg_resources.EntryPoint(
"ep3", "a.ep3", dist=mock.MagicMock(key="p3"))
# something we can load()/require(), TODO: use mock
self.ep_sa = pkg_resources.EntryPoint(
"sa", "letsencrypt.client.plugins.standalone.authenticator",
attrs=('StandaloneAuthenticator',),
dist=mock.MagicMock(key="letsencrypt"))
from letsencrypt.client.plugins.disco import PluginEntryPoint
self.plugin_ep = PluginEntryPoint(self.ep_sa)
self.plugin_ep = PluginEntryPoint(EP_SA)
def test_entry_point_to_plugin_name(self):
from letsencrypt.client.plugins.disco import PluginEntryPoint
@ -41,7 +42,7 @@ class PluginEntryPointTest(unittest.TestCase):
self.ep1prim: "p2:ep1",
self.ep2: "p2:ep2",
self.ep3: "p3:ep3",
self.ep_sa: "sa",
EP_SA: "sa",
}
for entry_point, name in names.iteritems():
@ -52,12 +53,18 @@ class PluginEntryPointTest(unittest.TestCase):
self.assertTrue(
self.plugin_ep.name_with_description.startswith("sa ("))
def test_ifaces(self):
self.assertTrue(self.plugin_ep.ifaces((interfaces.IAuthenticator,)))
self.assertFalse(self.plugin_ep.ifaces((interfaces.IInstaller,)))
self.assertFalse(self.plugin_ep.ifaces((
interfaces.IInstaller, interfaces.IAuthenticator)))
def test__init__(self):
self.assertFalse(self.plugin_ep.initialized)
self.assertFalse(self.plugin_ep.prepared)
self.assertFalse(self.plugin_ep.misconfigured)
self.assertFalse(self.plugin_ep.available)
self.assertTrue(self.plugin_ep.entry_point is self.ep_sa)
self.assertTrue(self.plugin_ep.entry_point is EP_SA)
self.assertEqual("sa", self.plugin_ep.name)
self.assertTrue(
@ -80,24 +87,26 @@ class PluginEntryPointTest(unittest.TestCase):
self.assertFalse(self.plugin_ep.available)
def test_verify(self):
i1 = mock.MagicMock(__name__="i1")
i2 = mock.MagicMock(__name__="i2")
i3 = mock.MagicMock(__name__="i3")
iface1 = mock.MagicMock(__name__="iface1")
iface2 = mock.MagicMock(__name__="iface2")
iface3 = mock.MagicMock(__name__="iface3")
# pylint: disable=protected-access
self.plugin_ep._initialized = plugin = mock.MagicMock()
exceptions = zope.interface.exceptions
with mock.patch("letsencrypt.client.plugins.disco.zope.interface") as mock_zope:
with mock.patch("letsencrypt.client.plugins."
"disco.zope.interface") as mock_zope:
mock_zope.exceptions = exceptions
def verify_object(iface, obj):
def verify_object(iface, obj): # pylint: disable=missing-docstring
assert obj is plugin
assert iface is i1 or iface is i2 or iface is i3
if iface is i3:
assert iface is iface1 or iface is iface2 or iface is iface3
if iface is iface3:
raise mock_zope.exceptions.BrokenImplementation(None, None)
mock_zope.verify.verifyObject.side_effect = verify_object
self.assertTrue(self.plugin_ep.verify((i1,)))
self.assertTrue(self.plugin_ep.verify((i1, i2)))
self.assertFalse(self.plugin_ep.verify((i3,)))
self.assertFalse(self.plugin_ep.verify((i1, i3)))
self.assertTrue(self.plugin_ep.verify((iface1,)))
self.assertTrue(self.plugin_ep.verify((iface1, iface2)))
self.assertFalse(self.plugin_ep.verify((iface3,)))
self.assertFalse(self.plugin_ep.verify((iface1, iface3)))
def test_prepare(self):
config = mock.MagicMock()
@ -105,11 +114,14 @@ class PluginEntryPointTest(unittest.TestCase):
self.plugin_ep.prepare()
self.assertTrue(self.plugin_ep.prepared)
self.assertFalse(self.plugin_ep.misconfigured)
str(self.plugin_ep) # output doesn't matter that much, just jest if it runs
# output doesn't matter that much, just test if it runs
str(self.plugin_ep)
def test_prepare_misconfigured(self):
plugin = mock.MagicMock()
plugin.prepare.side_effect = errors.LetsEncryptMisconfigurationError
# pylint: disable=protected-access
self.plugin_ep._initialized = plugin
self.assertTrue(isinstance(self.plugin_ep.prepare(),
errors.LetsEncryptMisconfigurationError))
@ -120,6 +132,7 @@ class PluginEntryPointTest(unittest.TestCase):
def test_prepare_no_installation(self):
plugin = mock.MagicMock()
plugin.prepare.side_effect = errors.LetsEncryptNoInstallationError
# pylint: disable=protected-access
self.plugin_ep._initialized = plugin
self.assertTrue(isinstance(self.plugin_ep.prepare(),
errors.LetsEncryptNoInstallationError))
@ -136,19 +149,81 @@ class PluginsRegistryTest(unittest.TestCase):
def setUp(self):
from letsencrypt.client.plugins.disco import PluginsRegistry
# TODO: mock out pkg_resources.iter_entry_points
self.plugins = PluginsRegistry.find_all()
self.plugin_ep = mock.MagicMock(name="mock")
self.plugins = {"mock": self.plugin_ep}
self.reg = PluginsRegistry(self.plugins)
def test_find_all(self):
from letsencrypt.client.plugins.disco import PluginsRegistry
with mock.patch("letsencrypt.client.plugins.disco"
".pkg_resources") as mock_pkg:
mock_pkg.iter_entry_points.return_value = iter([EP_SA])
plugins = PluginsRegistry.find_all()
self.assertTrue(plugins["sa"].plugin_cls
is authenticator.StandaloneAuthenticator)
self.assertTrue(plugins["sa"].entry_point is EP_SA)
def test_getitem(self):
self.assertEqual(self.plugin_ep, self.reg["mock"])
def test_iter(self):
self.assertEqual(["mock"], list(self.reg))
def test_len(self):
self.assertEqual(1, len(self.reg))
self.plugins.clear()
self.assertEqual(0, len(self.reg))
def test_init(self):
self.assertTrue(self.plugins["standalone"].plugin_cls
is authenticator.StandaloneAuthenticator)
self.plugin_ep.init.return_value = "baz"
self.assertEqual(["baz"], self.reg.init("bar"))
self.plugin_ep.init.assert_called_once_with("bar")
def test_id_filter(self):
filtered = self.plugins.filter(lambda _: True)
self.assertEqual(len(self.plugins), len(filtered))
def test_filter(self):
self.plugins.update({
"foo": "bar",
"bar": "foo",
"baz": "boo",
})
self.assertEqual(
{"foo": "bar", "baz": "boo"},
self.reg.filter(lambda p_ep: str(p_ep).startswith("b")))
def test_ifaces(self):
self.plugin_ep.ifaces.return_value = True
self.assertEqual(self.plugins, self.reg.ifaces().plugins)
self.plugin_ep.ifaces.return_value = False
self.assertEqual({}, self.reg.ifaces().plugins)
def test_verify(self):
self.plugin_ep.verify.return_value = True
self.assertEqual(
self.plugins, self.reg.verify(mock.MagicMock()).plugins)
self.plugin_ep.verify.return_value = False
self.assertEqual({}, self.reg.verify(mock.MagicMock()).plugins)
def test_prepare(self):
self.plugin_ep.prepare.return_value = "baz"
self.assertEqual(["baz"], self.reg.prepare())
self.plugin_ep.prepare.assert_called_once_with()
def test_available(self):
self.plugin_ep.available = True
self.assertEqual(self.plugins, self.reg.available().plugins)
self.plugin_ep.available = False
self.assertEqual({}, self.reg.available().plugins)
def test_repr(self):
repr(self.plugins)
self.plugin_ep.__repr__ = lambda _: "PluginEntryPoint#mock"
self.assertEqual("PluginsRegistry(set([PluginEntryPoint#mock]))",
repr(self.reg))
def test_str(self):
self.plugin_ep.__str__ = lambda _: "Mock"
self.plugins["foo"] = "Mock"
self.assertEqual("Mock\n\nMock", str(self.reg))
self.plugins.clear()
self.assertEqual("No plugins", str(self.reg))
if __name__ == "__main__":