Stop IDisplay AssertionErrors (#4010)

Fixes #3996.

I'm pretty confident this PR solves the problem. I've audited all calls to IDisplay methods and the assertions done in certbot.display.util are now done in all our unit tests.

With that said, it wouldn't hurt to have someone else double check I didn't miss anything. The easiest way to do this is to grep for IDisplay in our code and ensure all calls to IDisplay methods are valid. This means every method call other than notification (because a notification call is always OK) either provides a value for default or force_interactive. This is defined in interfaces.py.

I've also been considering removing the assertion that's been causing us trouble here from our release. The only argument I have for not doing so is it may hinder 3rd party plugin development. When they use IDisplay, they have the same problem as we do with prompting users without a TTY. Not keeping this assertion in makes it more likely they won't notice the issue and Certbot will crash on an unsuspecting user.

With that said, none of our known 3rd party plugins use IDisplay at all.

* Provide force_interactive in _get_certname

* Use force_interactive when asking for webroot

* Factor IDisplay assertion into it's own function

* Add util.patch_get_utility()

* Allow custom path to patch_get_utiity

* Change GetEmailTest to use patch_get_utility

* Use new_callable to create new objects

* Modify tests to use patch_get_utility

* Improve FreezableMock documentation

* Add user facing error to TTY magic

* Comment out assert_valid_call

* Add test_input_assertion_fail2()
This commit is contained in:
Brad Warren 2017-01-10 16:25:33 -08:00 committed by GitHub
parent d8e72ee3bf
commit 9c9004aff1
17 changed files with 212 additions and 87 deletions

View file

@ -13,6 +13,7 @@ from certbot import achallenges
from certbot import errors
from certbot.tests import acme_util
from certbot.tests import util as certbot_util
from certbot_apache import configurator
from certbot_apache import parser
@ -97,7 +98,7 @@ class MultipleVhostsTest(util.ApacheTest):
# Weak test..
ApacheConfigurator.add_parser_arguments(mock.MagicMock())
@mock.patch("zope.component.getUtility")
@certbot_util.patch_get_utility()
def test_get_all_names(self, mock_getutility):
mock_getutility.notification = mock.MagicMock(return_value=True)
names = self.config.get_all_names()
@ -105,7 +106,7 @@ class MultipleVhostsTest(util.ApacheTest):
["certbot.demo", "ocspvhost.com", "encryption-example.demo",
"ip-172-30-0-17", "*.blue.purple.com"]))
@mock.patch("zope.component.getUtility")
@certbot_util.patch_get_utility()
@mock.patch("certbot_apache.configurator.socket.gethostbyaddr")
def test_get_all_names_addrs(self, mock_gethost, mock_getutility):
mock_gethost.side_effect = [("google.com", "", ""), socket.error]
@ -1117,7 +1118,7 @@ class MultipleVhostsTest(util.ApacheTest):
not_rewriterule = "NotRewriteRule ^ ..."
self.assertFalse(self.config._sift_rewrite_rule(not_rewriterule))
@mock.patch("certbot_apache.configurator.zope.component.getUtility")
@certbot_util.patch_get_utility()
def test_make_vhost_ssl_with_existing_rewrite_rule(self, mock_get_utility):
self.config.parser.modules.add("rewrite_module")
@ -1146,7 +1147,7 @@ class MultipleVhostsTest(util.ApacheTest):
mock_get_utility().add_message.assert_called_once_with(mock.ANY,
mock.ANY)
@mock.patch("certbot_apache.configurator.zope.component.getUtility")
@certbot_util.patch_get_utility()
def test_make_vhost_ssl_with_existing_rewrite_conds(self, mock_get_utility):
self.config.parser.modules.add("rewrite_module")

View file

@ -1,12 +1,13 @@
"""Test certbot_apache.display_ops."""
import sys
import unittest
import mock
import zope.component
from certbot import errors
from certbot.display import util as display_util
from certbot import errors
from certbot.tests import util as certbot_util
from certbot_apache import obj
@ -17,8 +18,6 @@ class SelectVhostTest(unittest.TestCase):
"""Tests for certbot_apache.display_ops.select_vhost."""
def setUp(self):
zope.component.provideUtility(display_util.FileDisplay(sys.stdout,
False))
self.base_dir = "/example_path"
self.vhosts = util.get_vh_truth(
self.base_dir, "debian_apache_2_4/multiple_vhosts")
@ -28,12 +27,12 @@ class SelectVhostTest(unittest.TestCase):
from certbot_apache.display_ops import select_vhost
return select_vhost("example.com", vhosts)
@mock.patch("certbot_apache.display_ops.zope.component.getUtility")
@certbot_util.patch_get_utility()
def test_successful_choice(self, mock_util):
mock_util().menu.return_value = (display_util.OK, 3)
self.assertEqual(self.vhosts[3], self._call(self.vhosts))
@mock.patch("certbot_apache.display_ops.zope.component.getUtility")
@certbot_util.patch_get_utility()
def test_noninteractive(self, mock_util):
mock_util().menu.side_effect = errors.MissingCommandlineFlag("no vhost default")
try:
@ -41,7 +40,7 @@ class SelectVhostTest(unittest.TestCase):
except errors.MissingCommandlineFlag as e:
self.assertTrue("vhost ambiguity" in e.message)
@mock.patch("certbot_apache.display_ops.zope.component.getUtility")
@certbot_util.patch_get_utility()
def test_more_info_cancel(self, mock_util):
mock_util().menu.side_effect = [
(display_util.HELP, 1),
@ -56,7 +55,7 @@ class SelectVhostTest(unittest.TestCase):
self.assertEqual(self._call([]), None)
@mock.patch("certbot_apache.display_ops.display_util")
@mock.patch("certbot_apache.display_ops.zope.component.getUtility")
@certbot_util.patch_get_utility()
@mock.patch("certbot_apache.display_ops.logger")
def test_small_display(self, mock_logger, mock_util, mock_display_util):
mock_display_util.WIDTH = 20
@ -65,7 +64,7 @@ class SelectVhostTest(unittest.TestCase):
self.assertEqual(mock_logger.debug.call_count, 1)
@mock.patch("certbot_apache.display_ops.zope.component.getUtility")
@certbot_util.patch_get_utility()
def test_multiple_names(self, mock_util):
mock_util().menu.return_value = (display_util.OK, 5)

View file

@ -156,7 +156,8 @@ def _get_certname(config, verb):
if not choices:
raise errors.Error("No existing certificates found.")
code, index = disp.menu("Which certificate would you like to {0}?".format(verb),
choices, ok_label="Select", flag="--cert-name")
choices, ok_label="Select", flag="--cert-name",
force_interactive=True)
if code != display_util.OK or not index in range(0, len(choices)):
raise errors.Error("User ended interaction.")
certname = choices[index]

View file

@ -253,14 +253,16 @@ class FileDisplay(object):
:rtype: bool
"""
msg = "Invalid IDisplay call for this prompt:\n{0}".format(prompt)
if cli_flag:
msg += ("\nYou can set an answer to "
"this prompt with the {0} flag".format(cli_flag))
assert default is not None or force_interactive, msg
# assert_valid_call(prompt, default, cli_flag, force_interactive)
if self._can_interact(force_interactive):
return False
elif default is None:
msg = "Unable to get an answer for the question:\n{0}".format(prompt)
if cli_flag:
msg += (
"\nYou can provide an answer on the "
"command line with the {0} flag.".format(cli_flag))
raise errors.Error(msg)
else:
logger.debug(
"Falling back to default %s for the prompt:\n%s",
@ -403,6 +405,24 @@ class FileDisplay(object):
return OK, selection
def assert_valid_call(prompt, default, cli_flag, force_interactive):
"""Verify that provided arguments is a valid IDisplay call.
:param str prompt: prompt for the user
:param default: default answer to prompt
:param str cli_flag: command line option for setting an answer
to this question
:param bool force_interactive: if interactivity is forced by the
IDisplay call
"""
msg = "Invalid IDisplay call for this prompt:\n{0}".format(prompt)
if cli_flag:
msg += ("\nYou can set an answer to "
"this prompt with the {0} flag".format(cli_flag))
assert default is not None or force_interactive, msg
@zope.interface.implementer(interfaces.IDisplay)
class NoninteractiveDisplay(object):
"""An iDisplay implementation that never asks for interactive user input"""

View file

@ -8,7 +8,9 @@ import mock
from acme import challenges
from certbot import errors
from certbot.tests import acme_util
from certbot.tests import util as test_util
class AuthenticatorTest(unittest.TestCase):
@ -42,12 +44,12 @@ class AuthenticatorTest(unittest.TestCase):
self.assertEqual(self.auth.get_chall_pref('example.org'),
[challenges.HTTP01, challenges.DNS01])
@mock.patch('certbot.plugins.manual.zope.component.getUtility')
@test_util.patch_get_utility()
def test_ip_logging_not_ok(self, mock_get_utility):
mock_get_utility().yesno.return_value = False
self.assertRaises(errors.PluginError, self.auth.perform, [])
@mock.patch('certbot.plugins.manual.zope.component.getUtility')
@test_util.patch_get_utility()
def test_ip_logging_ok(self, mock_get_utility):
mock_get_utility().yesno.return_value = True
self.auth.perform([])
@ -75,7 +77,7 @@ class AuthenticatorTest(unittest.TestCase):
self.auth.env[self.http_achall.domain]['CERTBOT_AUTH_OUTPUT'],
http_expected)
@mock.patch('certbot.plugins.manual.zope.component.getUtility')
@test_util.patch_get_utility()
def test_manual_perform(self, mock_get_utility):
self.config.manual_public_ip_logging_ok = True
self.assertEqual(

View file

@ -169,7 +169,7 @@ class AuthenticatorTest(unittest.TestCase):
mock_util.already_listening.assert_called_once_with(port, False)
mock_util.already_listening.reset_mock()
@mock.patch("certbot.plugins.standalone.zope.component.getUtility")
@test_util.patch_get_utility()
def test_perform(self, unused_mock_get_utility):
achalls = self._get_achalls()
@ -177,7 +177,7 @@ class AuthenticatorTest(unittest.TestCase):
self.assertEqual(mock.sentinel.responses, self.auth.perform(achalls))
self.auth.perform2.assert_called_once_with(achalls)
@mock.patch("certbot.plugins.standalone.zope.component.getUtility")
@test_util.patch_get_utility()
def _test_perform_bind_errors(self, errno, achalls, mock_get_utility):
port = get_open_port()
def _perform2(unused_achalls):

View file

@ -51,7 +51,7 @@ class AlreadyListeningTestNoPsutil(AlreadyListeningTest):
return super(
AlreadyListeningTestNoPsutil, cls)._call(*args, **kwargs)
@mock.patch("certbot.plugins.util.zope.component.getUtility")
@test_util.patch_get_utility()
def test_ports_available(self, mock_getutil):
# Ensure we don't get error
with mock.patch("socket.socket.bind"):
@ -59,7 +59,7 @@ class AlreadyListeningTestNoPsutil(AlreadyListeningTest):
self.assertFalse(self._call(80, True))
self.assertEqual(mock_getutil.call_count, 0)
@mock.patch("certbot.plugins.util.zope.component.getUtility")
@test_util.patch_get_utility()
def test_ports_blocked(self, mock_getutil):
with mock.patch("certbot.plugins.util.socket.socket.bind") as mock_bind:
mock_bind.side_effect = socket.error
@ -77,7 +77,7 @@ class AlreadyListeningTestPsutil(AlreadyListeningTest):
"""Tests for certbot.plugins.already_listening."""
@mock.patch("certbot.plugins.util.psutil.net_connections")
@mock.patch("certbot.plugins.util.psutil.Process")
@mock.patch("certbot.plugins.util.zope.component.getUtility")
@test_util.patch_get_utility()
def test_race_condition(self, mock_get_utility, mock_process, mock_net):
# This tests a race condition, or permission problem, or OS
# incompatibility in which, for some reason, no process name can be
@ -103,7 +103,7 @@ class AlreadyListeningTestPsutil(AlreadyListeningTest):
@mock.patch("certbot.plugins.util.psutil.net_connections")
@mock.patch("certbot.plugins.util.psutil.Process")
@mock.patch("certbot.plugins.util.zope.component.getUtility")
@test_util.patch_get_utility()
def test_not_listening(self, mock_get_utility, mock_process, mock_net):
from psutil._common import sconn
conns = [
@ -121,7 +121,7 @@ class AlreadyListeningTestPsutil(AlreadyListeningTest):
@mock.patch("certbot.plugins.util.psutil.net_connections")
@mock.patch("certbot.plugins.util.psutil.Process")
@mock.patch("certbot.plugins.util.zope.component.getUtility")
@test_util.patch_get_utility()
def test_listening_ipv4(self, mock_get_utility, mock_process, mock_net):
from psutil._common import sconn
conns = [
@ -142,7 +142,7 @@ class AlreadyListeningTestPsutil(AlreadyListeningTest):
@mock.patch("certbot.plugins.util.psutil.net_connections")
@mock.patch("certbot.plugins.util.psutil.Process")
@mock.patch("certbot.plugins.util.zope.component.getUtility")
@test_util.patch_get_utility()
def test_listening_ipv6(self, mock_get_utility, mock_process, mock_net):
from psutil._common import sconn
conns = [

View file

@ -110,12 +110,13 @@ to serve all files under specified web root ({0})."""
def _prompt_with_webroot_list(self, domain, known_webroots):
display = zope.component.getUtility(interfaces.IDisplay)
path_flag = "--" + self.option_name("path")
while True:
code, index = display.menu(
"Select the webroot for {0}:".format(domain),
["Enter a new webroot"] + known_webroots,
help_label="Help", cli_flag="--" + self.option_name("path"))
help_label="Help", cli_flag=path_flag, force_interactive=True)
if code == display_util.CANCEL:
raise errors.PluginError(
"Every requested domain must have a "

View file

@ -61,7 +61,7 @@ class AuthenticatorTest(unittest.TestCase):
def test_prepare(self):
self.auth.prepare() # shouldn't raise any exceptions
@mock.patch("certbot.plugins.webroot.zope.component.getUtility")
@test_util.patch_get_utility()
def test_webroot_from_list(self, mock_get_utility):
self.config.webroot_path = []
self.config.webroot_map = {"otherthing.com": self.path}
@ -78,7 +78,7 @@ class AuthenticatorTest(unittest.TestCase):
self.assertEqual(self.config.webroot_map[self.achall.domain],
self.path)
@mock.patch("certbot.plugins.webroot.zope.component.getUtility")
@test_util.patch_get_utility()
def test_webroot_from_list_help_and_cancel(self, mock_get_utility):
self.config.webroot_path = []
self.config.webroot_map = {"otherthing.com": self.path}
@ -95,7 +95,7 @@ class AuthenticatorTest(unittest.TestCase):
webroot in call[0][1]
for webroot in six.itervalues(self.config.webroot_map)))
@mock.patch("certbot.plugins.webroot.zope.component.getUtility")
@test_util.patch_get_utility()
def test_new_webroot(self, mock_get_utility):
self.config.webroot_path = []
self.config.webroot_map = {}

View file

@ -15,6 +15,7 @@ from certbot import errors
from certbot import util
from certbot.tests import acme_util
from certbot.tests import util as test_util
class ChallengeFactoryTest(unittest.TestCase):
@ -251,7 +252,7 @@ class PollChallengesTest(unittest.TestCase):
self.assertEqual(authzr.body.status, messages.STATUS_PENDING)
@mock.patch("certbot.auth_handler.time")
@mock.patch("certbot.auth_handler.zope.component.getUtility")
@test_util.patch_get_utility()
def test_poll_challenges_failure(self, unused_mock_time, unused_mock_zope):
self.mock_net.poll.side_effect = self._mock_poll_solve_one_invalid
self.assertRaises(
@ -412,7 +413,7 @@ class ReportFailedChallsTest(unittest.TestCase):
domain="foo.bar",
account_key="key")
@mock.patch("certbot.auth_handler.zope.component.getUtility")
@test_util.patch_get_utility()
def test_same_error_and_domain(self, mock_zope):
from certbot import auth_handler
@ -421,7 +422,7 @@ class ReportFailedChallsTest(unittest.TestCase):
self.assertTrue(len(call_list) == 1)
self.assertTrue("Domain: example.com\nType: tls\nDetail: detail" in call_list[0][0][0])
@mock.patch("certbot.auth_handler.zope.component.getUtility")
@test_util.patch_get_utility()
def test_different_errors_and_domains(self, mock_zope):
from certbot import auth_handler

View file

@ -115,7 +115,7 @@ class UpdateLiveSymlinksTest(BaseCertManagerTest):
class DeleteTest(storage_test.BaseRenewableCertTest):
"""Tests for certbot.cert_manager.delete
"""
@mock.patch('zope.component.getUtility')
@test_util.patch_get_utility()
@mock.patch('certbot.cert_manager.lineage_for_certname')
@mock.patch('certbot.storage.delete_files')
def test_delete(self, mock_delete_files, mock_lineage_for_certname, unused_get_utility):
@ -135,14 +135,14 @@ class CertificatesTest(BaseCertManagerTest):
return certificates(*args, **kwargs)
@mock.patch('certbot.cert_manager.logger')
@mock.patch('zope.component.getUtility')
@test_util.patch_get_utility()
def test_certificates_parse_fail(self, mock_utility, mock_logger):
self._certificates(self.cli_config)
self.assertTrue(mock_logger.warning.called) #pylint: disable=no-member
self.assertTrue(mock_utility.called)
@mock.patch('certbot.cert_manager.logger')
@mock.patch('zope.component.getUtility')
@test_util.patch_get_utility()
def test_certificates_quiet(self, mock_utility, mock_logger):
self.cli_config.quiet = True
self._certificates(self.cli_config)
@ -150,7 +150,7 @@ class CertificatesTest(BaseCertManagerTest):
self.assertTrue(mock_logger.warning.called) #pylint: disable=no-member
@mock.patch('certbot.cert_manager.logger')
@mock.patch('zope.component.getUtility')
@test_util.patch_get_utility()
@mock.patch("certbot.storage.RenewableCert")
@mock.patch('certbot.cert_manager._report_human_readable')
def test_certificates_parse_success(self, mock_report, mock_renewable_cert,
@ -163,7 +163,7 @@ class CertificatesTest(BaseCertManagerTest):
self.assertTrue(mock_renewable_cert.called)
@mock.patch('certbot.cert_manager.logger')
@mock.patch('zope.component.getUtility')
@test_util.patch_get_utility()
def test_certificates_no_files(self, mock_utility, mock_logger):
tempdir = tempfile.mkdtemp()
@ -348,7 +348,7 @@ class RenameLineageTest(BaseCertManagerTest):
return cert_manager.rename_lineage(*args, **kwargs)
@mock.patch('certbot.storage.renewal_conf_files')
@mock.patch('certbot.main.zope.component.getUtility')
@test_util.patch_get_utility()
def test_no_certname(self, mock_get_utility, mock_renewal_conf_files):
mock_config = mock.Mock(certname=None, new_certname="two")
@ -365,7 +365,7 @@ class RenameLineageTest(BaseCertManagerTest):
util_mock.menu.return_value = (display_util.OK, -1)
self.assertRaises(errors.Error, self._call, mock_config)
@mock.patch('certbot.main.zope.component.getUtility')
@test_util.patch_get_utility()
def test_no_new_certname(self, mock_get_utility):
mock_config = mock.Mock(certname="one", new_certname=None)
@ -379,7 +379,7 @@ class RenameLineageTest(BaseCertManagerTest):
mock_get_utility.return_value = util_mock
self.assertRaises(errors.Error, self._call, mock_config)
@mock.patch('certbot.main.zope.component.getUtility')
@test_util.patch_get_utility()
@mock.patch('certbot.cert_manager.lineage_for_certname')
def test_no_existing_certname(self, mock_lineage_for_certname, unused_get_utility):
mock_config = mock.Mock(certname="one", new_certname="two")
@ -387,7 +387,7 @@ class RenameLineageTest(BaseCertManagerTest):
self.assertRaises(errors.ConfigurationError,
self._call, mock_config)
@mock.patch('certbot.main.zope.component.getUtility')
@test_util.patch_get_utility()
@mock.patch("certbot.storage.RenewableCert._check_symlinks")
def test_rename_cert(self, mock_check, unused_get_utility):
mock_check.return_value = True
@ -398,7 +398,7 @@ class RenameLineageTest(BaseCertManagerTest):
self.assertTrue(updated_lineage is not None)
self.assertEqual(updated_lineage.lineagename, mock_config.new_certname)
@mock.patch('certbot.main.zope.component.getUtility')
@test_util.patch_get_utility()
@mock.patch("certbot.storage.RenewableCert._check_symlinks")
def test_rename_cert_interactive_certname(self, mock_check, mock_get_utility):
mock_check.return_value = True
@ -413,7 +413,7 @@ class RenameLineageTest(BaseCertManagerTest):
self.assertTrue(updated_lineage is not None)
self.assertEqual(updated_lineage.lineagename, mock_config.new_certname)
@mock.patch('certbot.main.zope.component.getUtility')
@test_util.patch_get_utility()
@mock.patch("certbot.storage.RenewableCert._check_symlinks")
def test_rename_cert_bad_new_certname(self, mock_check, unused_get_utility):
mock_check.return_value = True

View file

@ -323,7 +323,7 @@ class ClientTest(ClientTestCommon):
["foo.bar"], "key", "cert", "chain", "fullchain")
installer.recovery_routine.assert_called_once_with()
@mock.patch("certbot.client.zope.component.getUtility")
@test_util.patch_get_utility()
def test_deploy_certificate_restart_failure(self, mock_get_utility):
installer = mock.MagicMock()
installer.restart.side_effect = [errors.PluginError, None]
@ -335,7 +335,7 @@ class ClientTest(ClientTestCommon):
installer.rollback_checkpoints.assert_called_once_with()
self.assertEqual(installer.restart.call_count, 2)
@mock.patch("certbot.client.zope.component.getUtility")
@test_util.patch_get_utility()
def test_deploy_certificate_restart_failure2(self, mock_get_utility):
installer = mock.MagicMock()
installer.restart.side_effect = errors.PluginError
@ -438,7 +438,7 @@ class EnhanceConfigTest(ClientTestCommon):
def _test_error(self):
self.config.redirect = True
with mock.patch("certbot.client.zope.component.getUtility") as mock_gu:
with test_util.patch_get_utility() as mock_gu:
self.assertRaises(
errors.PluginError, self._test_with_all_supported)
self.assertEqual(mock_gu().add_message.call_count, 1)

View file

@ -13,7 +13,6 @@ from acme import messages
from certbot import account
from certbot import errors
from certbot import interfaces
from certbot.display import util as display_util
@ -26,59 +25,66 @@ KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
class GetEmailTest(unittest.TestCase):
"""Tests for certbot.display.ops.get_email."""
def setUp(self):
mock_display = mock.MagicMock()
self.input = mock_display.input
zope.component.provideUtility(mock_display, interfaces.IDisplay)
@classmethod
def _call(cls, **kwargs):
from certbot.display.ops import get_email
return get_email(**kwargs)
def test_cancel_none(self):
self.input.return_value = (display_util.CANCEL, "foo@bar.baz")
@test_util.patch_get_utility("certbot.display.ops.z_util")
def test_cancel_none(self, mock_get_utility):
mock_input = mock_get_utility().input
mock_input.return_value = (display_util.CANCEL, "foo@bar.baz")
self.assertRaises(errors.Error, self._call)
self.assertRaises(errors.Error, self._call, optional=False)
def test_ok_safe(self):
self.input.return_value = (display_util.OK, "foo@bar.baz")
@test_util.patch_get_utility("certbot.display.ops.z_util")
def test_ok_safe(self, mock_get_utility):
mock_input = mock_get_utility().input
mock_input.return_value = (display_util.OK, "foo@bar.baz")
with mock.patch("certbot.display.ops.util.safe_email") as mock_safe_email:
mock_safe_email.return_value = True
self.assertTrue(self._call() is "foo@bar.baz")
def test_ok_not_safe(self):
self.input.return_value = (display_util.OK, "foo@bar.baz")
@test_util.patch_get_utility("certbot.display.ops.z_util")
def test_ok_not_safe(self, mock_get_utility):
mock_input = mock_get_utility().input
mock_input.return_value = (display_util.OK, "foo@bar.baz")
with mock.patch("certbot.display.ops.util.safe_email") as mock_safe_email:
mock_safe_email.side_effect = [False, True]
self.assertTrue(self._call() is "foo@bar.baz")
def test_invalid_flag(self):
@test_util.patch_get_utility("certbot.display.ops.z_util")
def test_invalid_flag(self, mock_get_utility):
invalid_txt = "There seem to be problems"
self.input.return_value = (display_util.OK, "foo@bar.baz")
mock_input = mock_get_utility().input
mock_input.return_value = (display_util.OK, "foo@bar.baz")
with mock.patch("certbot.display.ops.util.safe_email") as mock_safe_email:
mock_safe_email.return_value = True
self._call()
self.assertTrue(invalid_txt not in self.input.call_args[0][0])
self.assertTrue(invalid_txt not in mock_input.call_args[0][0])
self._call(invalid=True)
self.assertTrue(invalid_txt in self.input.call_args[0][0])
self.assertTrue(invalid_txt in mock_input.call_args[0][0])
def test_optional_flag(self):
self.input.return_value = (display_util.OK, "foo@bar.baz")
@test_util.patch_get_utility("certbot.display.ops.z_util")
def test_optional_flag(self, mock_get_utility):
mock_input = mock_get_utility().input
mock_input.return_value = (display_util.OK, "foo@bar.baz")
with mock.patch("certbot.display.ops.util.safe_email") as mock_safe_email:
mock_safe_email.side_effect = [False, True]
self._call(optional=False)
for call in self.input.call_args_list:
for call in mock_input.call_args_list:
self.assertTrue(
"--register-unsafely-without-email" not in call[0][0])
def test_optional_invalid_unsafe(self):
@test_util.patch_get_utility("certbot.display.ops.z_util")
def test_optional_invalid_unsafe(self, mock_get_utility):
invalid_txt = "There seem to be problems"
self.input.return_value = (display_util.OK, "foo@bar.baz")
mock_input = mock_get_utility().input
mock_input.return_value = (display_util.OK, "foo@bar.baz")
with mock.patch("certbot.display.ops.util.safe_email") as mock_safe_email:
mock_safe_email.side_effect = [False, True]
self._call(invalid=True)
self.assertTrue(invalid_txt in self.input.call_args[0][0])
self.assertTrue(invalid_txt in mock_input.call_args[0][0])
class ChooseAccountTest(unittest.TestCase):

View file

@ -93,9 +93,16 @@ class FileOutputDisplayTest(unittest.TestCase):
self.assertEqual(input_, default)
def test_input_assertion_fail(self):
self.assertRaises(AssertionError, self._force_noninteractive,
# If the call to util.assert_valid_call is commented out, an
# error.Error is raised, otherwise, an AssertionError is raised.
self.assertRaises(Exception, self._force_noninteractive,
self.displayer.input, "message", cli_flag="--flag")
def test_input_assertion_fail2(self):
with mock.patch("certbot.display.util.assert_valid_call"):
self.assertRaises(errors.Error, self._force_noninteractive,
self.displayer.input, "msg", cli_flag="--flag")
def test_yesno(self):
with mock.patch("six.moves.input", return_value="Yes"):
self.assertTrue(self.displayer.yesno(

View file

@ -100,8 +100,7 @@ class ObtainCertTest(unittest.TestCase):
"""Tests for certbot.main.obtain_cert."""
def setUp(self):
self.get_utility_patch = mock.patch(
'certbot.main.zope.component.getUtility')
self.get_utility_patch = test_util.patch_get_utility()
self.mock_get_utility = self.get_utility_patch.start()
def tearDown(self):
@ -713,7 +712,7 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods
args += '-d foo.bar -a standalone certonly'.split()
self._call(args)
@mock.patch('certbot.main.zope.component.getUtility')
@test_util.patch_get_utility()
def test_certonly_dry_run_new_request_success(self, mock_get_utility):
mock_client = mock.MagicMock()
mock_client.obtain_and_enroll_certificate.return_value = None
@ -726,7 +725,7 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods
self.assertEqual(mock_get_utility().add_message.call_count, 1)
@mock.patch('certbot.crypto_util.notAfter')
@mock.patch('certbot.main.zope.component.getUtility')
@test_util.patch_get_utility()
def test_certonly_new_request_success(self, mock_get_utility, mock_notAfter):
cert_path = '/etc/letsencrypt/live/foo.bar'
date = '1970-01-01'
@ -770,8 +769,7 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods
mock_fdc.return_value = (mock_lineage, None)
with mock.patch('certbot.main._init_le_client') as mock_init:
mock_init.return_value = mock_client
get_utility_path = 'certbot.main.zope.component.getUtility'
with mock.patch(get_utility_path) as mock_get_utility:
with test_util.patch_get_utility() as mock_get_utility:
with mock.patch('certbot.main.renewal.OpenSSL') as mock_ssl:
mock_latest = mock.MagicMock()
mock_latest.get_issuer.return_value = "Fake fake"
@ -1000,7 +998,7 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods
args=['renew', '--post-hook', 'echo hello world'])
self.assertTrue('No hooks were run.' in stdout.getvalue())
@mock.patch('certbot.main.zope.component.getUtility')
@test_util.patch_get_utility()
@mock.patch('certbot.main._find_lineage_for_domains_and_certname')
@mock.patch('certbot.main._init_le_client')
def test_certonly_reinstall(self, mock_init, mock_renewal, mock_get_utility):
@ -1021,8 +1019,7 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods
mock_client.save_certificate.return_value = cert_path, None, None
with mock.patch('certbot.main._init_le_client') as mock_init:
mock_init.return_value = mock_client
get_utility_path = 'certbot.main.zope.component.getUtility'
with mock.patch(get_utility_path) as mock_get_utility:
with test_util.patch_get_utility() as mock_get_utility:
chain_path = '/etc/letsencrypt/live/example.com/chain.pem'
full_path = '/etc/letsencrypt/live/example.com/fullchain.pem'
args = ('-a standalone certonly --csr {0} --cert-path {1} '
@ -1121,7 +1118,7 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods
self.assertTrue("--register-unsafely-without-email" in x[0])
@mock.patch('certbot.main.display_ops.get_email')
@mock.patch('certbot.main.zope.component.getUtility')
@test_util.patch_get_utility()
def test_update_registration_with_email(self, mock_utility, mock_email):
email = "user@example.com"
mock_email.return_value = email

View file

@ -11,6 +11,8 @@ import six
from certbot import errors
from certbot.tests import util as test_util
class ReverterCheckpointLocalTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes, too-many-public-methods
@ -375,7 +377,7 @@ class TestFullCheckpointsReverter(unittest.TestCase):
self.assertEqual(read_in(self.config2), "directive-dir2")
self.assertFalse(os.path.isfile(config3))
@mock.patch("certbot.reverter.zope.component.getUtility")
@test_util.patch_get_utility()
def test_view_config_changes(self, mock_output):
"""This is not strict as this is subject to change."""
self._setup_three_checkpoints()

View file

@ -10,6 +10,7 @@ import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import mock
import OpenSSL
from acme import errors
@ -17,8 +18,11 @@ from acme import jose
from acme import util
from certbot import constants
from certbot import interfaces
from certbot import storage
from certbot.display import util as display_util
def vector_path(*names):
"""Path to a test vector."""
@ -158,3 +162,87 @@ def make_lineage(self, testfile):
line.replace('MAGICDIR', self.config_dir) for line in src)
return conf_path
def patch_get_utility(target='zope.component.getUtility'):
"""Patch zope.component.getUtility to use a special mock IDisplay.
The mock IDisplay works like a regular mock object, except it also
also asserts that methods are called with valid arguments.
:param str target: path to patch
:returns: mock zope.component.getUtility
:rtype: mock.MagicMock
"""
return mock.patch(target, new_callable=_create_get_utility_mock)
class FreezableMock(object):
"""Mock object with the ability to freeze attributes.
This class works like a regular mock.MagicMock object, except
attributes and behavior can be set and frozen so they cannot be
changed during tests.
If a func argument is provided to the constructor, this function
is called first when an instance of FreezableMock is called,
followed by the usual behavior defined by MagicMock. The return
value of func is ignored.
"""
def __init__(self, frozen=False, func=None):
self._frozen_set = set() if frozen else set(('freeze',))
self._func = func
self._mock = mock.MagicMock()
self._frozen = frozen
def freeze(self):
"""Freeze object preventing further changes."""
self._frozen = True
def __call__(self, *args, **kwargs):
if self._func is not None:
self._func(*args, **kwargs)
return self._mock(*args, **kwargs)
def __getattribute__(self, name):
if name == '_frozen':
try:
return object.__getattribute__(self, name)
except AttributeError:
return False
elif name == '_frozen_set' or name in self._frozen_set:
return object.__getattribute__(self, name)
else:
return getattr(object.__getattribute__(self, '_mock'), name)
def __setattr__(self, name, value):
if self._frozen:
return setattr(self._mock, name, value)
elif name != '_frozen_set':
self._frozen_set.add(name)
return object.__setattr__(self, name, value)
def _create_get_utility_mock():
display = FreezableMock()
for name in interfaces.IDisplay.names(): # pylint: disable=no-member
if name != 'notification':
frozen_mock = FreezableMock(frozen=True, func=_assert_valid_call)
setattr(display, name, frozen_mock)
display.freeze()
return mock.MagicMock(return_value=display)
def _assert_valid_call(*args, **kwargs):
assert_args = [args[0] if args else kwargs['message']]
assert_kwargs = {}
assert_kwargs['default'] = kwargs.get('default', None)
assert_kwargs['cli_flag'] = kwargs.get('cli_flag', None)
assert_kwargs['force_interactive'] = kwargs.get('force_interactive', False)
# pylint: disable=star-args
display_util.assert_valid_call(*assert_args, **assert_kwargs)