diff --git a/certbot/display/util.py b/certbot/display/util.py index 4d69f1263..7e797e71a 100644 --- a/certbot/display/util.py +++ b/certbot/display/util.py @@ -1,10 +1,10 @@ """Certbot display.""" import logging import os -import textwrap +import select import sys +import textwrap -import six import zope.interface from certbot import constants @@ -51,6 +51,37 @@ def _wrap_lines(msg): return os.linesep.join(fixed_l) + +def input_with_timeout(prompt=None, timeout=36000.0): + """Get user input with a timeout. + + Behaves the same as six.moves.input, however, an error is raised if + a user doesn't answer after timeout seconds. The default timeout + value was chosen to place it just under 12 hours for users following + our advice and running Certbot twice a day. + + :param str prompt: prompt to provide for input + :param float timeout: maximum number of seconds to wait for input + + :returns: user response + :rtype: str + + :raises errors.Error if no answer is given before the timeout + + """ + if prompt: + sys.stdout.write(prompt) + sys.stdout.flush() + + # select can only be used like this on UNIX + rlist, _, _ = select.select([sys.stdin], [], [], timeout) + if not rlist: + raise errors.Error( + "Timed out waiting for answer to prompt '{0}'".format(prompt)) + + return rlist[0].readline().rstrip('\n') + + @zope.interface.implementer(interfaces.IDisplay) class FileDisplay(object): """File-based display.""" @@ -83,7 +114,7 @@ class FileDisplay(object): line=os.linesep, frame=side_frame, msg=message)) if pause: if self._can_interact(force_interactive): - six.moves.input("Press Enter to Continue") + input_with_timeout("Press Enter to Continue") else: logger.debug("Not pausing for user confirmation") @@ -140,7 +171,7 @@ class FileDisplay(object): if self._return_default(message, default, cli_flag, force_interactive): return OK, default - ans = six.moves.input( + ans = input_with_timeout( textwrap.fill( "%s (Enter 'c' to cancel): " % message, 80, @@ -182,7 +213,7 @@ class FileDisplay(object): os.linesep, frame=side_frame, msg=message)) while True: - ans = six.moves.input("{yes}/{no}: ".format( + ans = input_with_timeout("{yes}/{no}: ".format( yes=_parens_around_char(yes_label), no=_parens_around_char(no_label))) @@ -388,7 +419,7 @@ class FileDisplay(object): input_msg = ("Press 1 [enter] to confirm the selection " "(press 'c' to cancel): ") while selection < 1: - ans = six.moves.input(input_msg) + ans = input_with_timeout(input_msg) if ans.startswith("c") or ans.startswith("C"): return CANCEL, -1 try: diff --git a/certbot/tests/display/util_test.py b/certbot/tests/display/util_test.py index f4d69b50d..a872917ec 100644 --- a/certbot/tests/display/util_test.py +++ b/certbot/tests/display/util_test.py @@ -1,8 +1,10 @@ """Test :mod:`certbot.display.util`.""" import inspect import os +import socket import unittest +import six import mock from certbot import errors @@ -15,6 +17,39 @@ CHOICES = [("First", "Description1"), ("Second", "Description2")] TAGS = ["tag1", "tag2", "tag3"] TAGS_CHOICES = [("1", "tag1"), ("2", "tag2"), ("3", "tag3")] + +class InputWithTimeoutTest(unittest.TestCase): + """Tests for certbot.display.util.input_with_timeout.""" + @classmethod + def _call(cls, *args, **kwargs): + from certbot.display.util import input_with_timeout + return input_with_timeout(*args, **kwargs) + + def setUp(self): + self.expected_msg = "foo bar" + self.stdin = six.StringIO(self.expected_msg + "\n") + + def test_input(self, prompt=None): + with mock.patch("certbot.display.util.select.select") as mock_select: + mock_select.return_value = ([self.stdin], [], [],) + self.assertEqual(display_util.input_with_timeout(prompt), + self.expected_msg) + + @mock.patch("certbot.display.util.sys.stdout") + def test_input_with_prompt(self, mock_stdout): + prompt = "test prompt: " + self.test_input(prompt) + mock_stdout.write.assert_called_once_with(prompt) + mock_stdout.flush.assert_called_once_with() + + def test_timeout(self): + stdin = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + stdin.bind(('', 0)) + stdin.listen(1) + with mock.patch("certbot.display.util.sys.stdin", stdin): + self.assertRaises(errors.Error, self._call, timeout=0.001) + + class FileOutputDisplayTest(unittest.TestCase): """Test stdout display. @@ -35,7 +70,8 @@ class FileOutputDisplayTest(unittest.TestCase): self.assertTrue("message" in string) def test_notification_pause(self): - with mock.patch("six.moves.input", return_value="enter"): + input_with_timeout = "certbot.display.util.input_with_timeout" + with mock.patch(input_with_timeout, return_value="enter"): self.displayer.notification("message", force_interactive=True) self.assertTrue("message" in self.mock_stdout.write.call_args[0][0]) @@ -72,13 +108,15 @@ class FileOutputDisplayTest(unittest.TestCase): self.assertEqual(result, (display_util.OK, default)) def test_input_cancel(self): - with mock.patch("six.moves.input", return_value="c"): + input_with_timeout = "certbot.display.util.input_with_timeout" + with mock.patch(input_with_timeout, return_value="c"): code, _ = self.displayer.input("message", force_interactive=True) self.assertTrue(code, display_util.CANCEL) def test_input_normal(self): - with mock.patch("six.moves.input", return_value="domain.com"): + input_with_timeout = "certbot.display.util.input_with_timeout" + with mock.patch(input_with_timeout, return_value="domain.com"): code, input_ = self.displayer.input("message", force_interactive=True) self.assertEqual(code, display_util.OK) @@ -104,23 +142,24 @@ class FileOutputDisplayTest(unittest.TestCase): self.displayer.input, "msg", cli_flag="--flag") def test_yesno(self): - with mock.patch("six.moves.input", return_value="Yes"): + input_with_timeout = "certbot.display.util.input_with_timeout" + with mock.patch(input_with_timeout, return_value="Yes"): self.assertTrue(self.displayer.yesno( "message", force_interactive=True)) - with mock.patch("six.moves.input", return_value="y"): + with mock.patch(input_with_timeout, return_value="y"): self.assertTrue(self.displayer.yesno( "message", force_interactive=True)) - with mock.patch("six.moves.input", side_effect=["maybe", "y"]): + with mock.patch(input_with_timeout, side_effect=["maybe", "y"]): self.assertTrue(self.displayer.yesno( "message", force_interactive=True)) - with mock.patch("six.moves.input", return_value="No"): + with mock.patch(input_with_timeout, return_value="No"): self.assertFalse(self.displayer.yesno( "message", force_interactive=True)) - with mock.patch("six.moves.input", side_effect=["cancel", "n"]): + with mock.patch(input_with_timeout, side_effect=["cancel", "n"]): self.assertFalse(self.displayer.yesno( "message", force_interactive=True)) - with mock.patch("six.moves.input", return_value="a"): + with mock.patch(input_with_timeout, return_value="a"): self.assertTrue(self.displayer.yesno( "msg", yes_label="Agree", force_interactive=True)) @@ -128,7 +167,7 @@ class FileOutputDisplayTest(unittest.TestCase): self.assertTrue(self._force_noninteractive( self.displayer.yesno, "message", default=True)) - @mock.patch("certbot.display.util.six.moves.input") + @mock.patch("certbot.display.util.input_with_timeout") def test_checklist_valid(self, mock_input): mock_input.return_value = "2 1" code, tag_list = self.displayer.checklist( @@ -136,21 +175,21 @@ class FileOutputDisplayTest(unittest.TestCase): self.assertEqual( (code, set(tag_list)), (display_util.OK, set(["tag1", "tag2"]))) - @mock.patch("certbot.display.util.six.moves.input") + @mock.patch("certbot.display.util.input_with_timeout") def test_checklist_empty(self, mock_input): mock_input.return_value = "" code, tag_list = self.displayer.checklist("msg", TAGS, force_interactive=True) self.assertEqual( (code, set(tag_list)), (display_util.OK, set(["tag1", "tag2", "tag3"]))) - @mock.patch("certbot.display.util.six.moves.input") + @mock.patch("certbot.display.util.input_with_timeout") def test_checklist_miss_valid(self, mock_input): mock_input.side_effect = ["10", "tag1 please", "1"] ret = self.displayer.checklist("msg", TAGS, force_interactive=True) self.assertEqual(ret, (display_util.OK, ["tag1"])) - @mock.patch("certbot.display.util.six.moves.input") + @mock.patch("certbot.display.util.input_with_timeout") def test_checklist_miss_quit(self, mock_input): mock_input.side_effect = ["10", "c"] @@ -182,7 +221,7 @@ class FileOutputDisplayTest(unittest.TestCase): self.displayer._scrub_checklist_input(list_, TAGS)) self.assertEqual(set_tags, exp[i]) - @mock.patch("certbot.display.util.six.moves.input") + @mock.patch("certbot.display.util.input_with_timeout") def test_directory_select(self, mock_input): # pylint: disable=star-args args = ["msg", "/var/www/html", "--flag", True] @@ -246,11 +285,12 @@ class FileOutputDisplayTest(unittest.TestCase): def test_get_valid_int_ans_valid(self): # pylint: disable=protected-access - with mock.patch("six.moves.input", return_value="1"): + input_with_timeout = "certbot.display.util.input_with_timeout" + with mock.patch(input_with_timeout, return_value="1"): self.assertEqual( self.displayer._get_valid_int_ans(1), (display_util.OK, 1)) ans = "2" - with mock.patch("six.moves.input", return_value=ans): + with mock.patch(input_with_timeout, return_value=ans): self.assertEqual( self.displayer._get_valid_int_ans(3), (display_util.OK, int(ans))) @@ -262,8 +302,9 @@ class FileOutputDisplayTest(unittest.TestCase): ["4", "one", "C"], ["c"], ] + input_with_timeout = "certbot.display.util.input_with_timeout" for ans in answers: - with mock.patch("six.moves.input", side_effect=ans): + with mock.patch(input_with_timeout, side_effect=ans): self.assertEqual( self.displayer._get_valid_int_ans(3), (display_util.CANCEL, -1))