Add a timeout to prompts (#4601)

* Add input_with_timeout

* use input_with_timeout
This commit is contained in:
Brad Warren 2017-05-01 14:55:31 -07:00 committed by GitHub
parent 5ca8f7c5b9
commit 79d5c890c3
2 changed files with 95 additions and 23 deletions

View file

@ -1,10 +1,10 @@
"""Certbot display."""
import logging
import os
import textwrap
import select
import sys
import textwrap
import six
import zope.interface
from certbot import constants
@ -51,6 +51,37 @@ def _wrap_lines(msg):
return os.linesep.join(fixed_l)
def input_with_timeout(prompt=None, timeout=36000.0):
"""Get user input with a timeout.
Behaves the same as six.moves.input, however, an error is raised if
a user doesn't answer after timeout seconds. The default timeout
value was chosen to place it just under 12 hours for users following
our advice and running Certbot twice a day.
:param str prompt: prompt to provide for input
:param float timeout: maximum number of seconds to wait for input
:returns: user response
:rtype: str
:raises errors.Error if no answer is given before the timeout
"""
if prompt:
sys.stdout.write(prompt)
sys.stdout.flush()
# select can only be used like this on UNIX
rlist, _, _ = select.select([sys.stdin], [], [], timeout)
if not rlist:
raise errors.Error(
"Timed out waiting for answer to prompt '{0}'".format(prompt))
return rlist[0].readline().rstrip('\n')
@zope.interface.implementer(interfaces.IDisplay)
class FileDisplay(object):
"""File-based display."""
@ -83,7 +114,7 @@ class FileDisplay(object):
line=os.linesep, frame=side_frame, msg=message))
if pause:
if self._can_interact(force_interactive):
six.moves.input("Press Enter to Continue")
input_with_timeout("Press Enter to Continue")
else:
logger.debug("Not pausing for user confirmation")
@ -140,7 +171,7 @@ class FileDisplay(object):
if self._return_default(message, default, cli_flag, force_interactive):
return OK, default
ans = six.moves.input(
ans = input_with_timeout(
textwrap.fill(
"%s (Enter 'c' to cancel): " % message,
80,
@ -182,7 +213,7 @@ class FileDisplay(object):
os.linesep, frame=side_frame, msg=message))
while True:
ans = six.moves.input("{yes}/{no}: ".format(
ans = input_with_timeout("{yes}/{no}: ".format(
yes=_parens_around_char(yes_label),
no=_parens_around_char(no_label)))
@ -388,7 +419,7 @@ class FileDisplay(object):
input_msg = ("Press 1 [enter] to confirm the selection "
"(press 'c' to cancel): ")
while selection < 1:
ans = six.moves.input(input_msg)
ans = input_with_timeout(input_msg)
if ans.startswith("c") or ans.startswith("C"):
return CANCEL, -1
try:

View file

@ -1,8 +1,10 @@
"""Test :mod:`certbot.display.util`."""
import inspect
import os
import socket
import unittest
import six
import mock
from certbot import errors
@ -15,6 +17,39 @@ CHOICES = [("First", "Description1"), ("Second", "Description2")]
TAGS = ["tag1", "tag2", "tag3"]
TAGS_CHOICES = [("1", "tag1"), ("2", "tag2"), ("3", "tag3")]
class InputWithTimeoutTest(unittest.TestCase):
"""Tests for certbot.display.util.input_with_timeout."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.display.util import input_with_timeout
return input_with_timeout(*args, **kwargs)
def setUp(self):
self.expected_msg = "foo bar"
self.stdin = six.StringIO(self.expected_msg + "\n")
def test_input(self, prompt=None):
with mock.patch("certbot.display.util.select.select") as mock_select:
mock_select.return_value = ([self.stdin], [], [],)
self.assertEqual(display_util.input_with_timeout(prompt),
self.expected_msg)
@mock.patch("certbot.display.util.sys.stdout")
def test_input_with_prompt(self, mock_stdout):
prompt = "test prompt: "
self.test_input(prompt)
mock_stdout.write.assert_called_once_with(prompt)
mock_stdout.flush.assert_called_once_with()
def test_timeout(self):
stdin = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
stdin.bind(('', 0))
stdin.listen(1)
with mock.patch("certbot.display.util.sys.stdin", stdin):
self.assertRaises(errors.Error, self._call, timeout=0.001)
class FileOutputDisplayTest(unittest.TestCase):
"""Test stdout display.
@ -35,7 +70,8 @@ class FileOutputDisplayTest(unittest.TestCase):
self.assertTrue("message" in string)
def test_notification_pause(self):
with mock.patch("six.moves.input", return_value="enter"):
input_with_timeout = "certbot.display.util.input_with_timeout"
with mock.patch(input_with_timeout, return_value="enter"):
self.displayer.notification("message", force_interactive=True)
self.assertTrue("message" in self.mock_stdout.write.call_args[0][0])
@ -72,13 +108,15 @@ class FileOutputDisplayTest(unittest.TestCase):
self.assertEqual(result, (display_util.OK, default))
def test_input_cancel(self):
with mock.patch("six.moves.input", return_value="c"):
input_with_timeout = "certbot.display.util.input_with_timeout"
with mock.patch(input_with_timeout, return_value="c"):
code, _ = self.displayer.input("message", force_interactive=True)
self.assertTrue(code, display_util.CANCEL)
def test_input_normal(self):
with mock.patch("six.moves.input", return_value="domain.com"):
input_with_timeout = "certbot.display.util.input_with_timeout"
with mock.patch(input_with_timeout, return_value="domain.com"):
code, input_ = self.displayer.input("message", force_interactive=True)
self.assertEqual(code, display_util.OK)
@ -104,23 +142,24 @@ class FileOutputDisplayTest(unittest.TestCase):
self.displayer.input, "msg", cli_flag="--flag")
def test_yesno(self):
with mock.patch("six.moves.input", return_value="Yes"):
input_with_timeout = "certbot.display.util.input_with_timeout"
with mock.patch(input_with_timeout, return_value="Yes"):
self.assertTrue(self.displayer.yesno(
"message", force_interactive=True))
with mock.patch("six.moves.input", return_value="y"):
with mock.patch(input_with_timeout, return_value="y"):
self.assertTrue(self.displayer.yesno(
"message", force_interactive=True))
with mock.patch("six.moves.input", side_effect=["maybe", "y"]):
with mock.patch(input_with_timeout, side_effect=["maybe", "y"]):
self.assertTrue(self.displayer.yesno(
"message", force_interactive=True))
with mock.patch("six.moves.input", return_value="No"):
with mock.patch(input_with_timeout, return_value="No"):
self.assertFalse(self.displayer.yesno(
"message", force_interactive=True))
with mock.patch("six.moves.input", side_effect=["cancel", "n"]):
with mock.patch(input_with_timeout, side_effect=["cancel", "n"]):
self.assertFalse(self.displayer.yesno(
"message", force_interactive=True))
with mock.patch("six.moves.input", return_value="a"):
with mock.patch(input_with_timeout, return_value="a"):
self.assertTrue(self.displayer.yesno(
"msg", yes_label="Agree", force_interactive=True))
@ -128,7 +167,7 @@ class FileOutputDisplayTest(unittest.TestCase):
self.assertTrue(self._force_noninteractive(
self.displayer.yesno, "message", default=True))
@mock.patch("certbot.display.util.six.moves.input")
@mock.patch("certbot.display.util.input_with_timeout")
def test_checklist_valid(self, mock_input):
mock_input.return_value = "2 1"
code, tag_list = self.displayer.checklist(
@ -136,21 +175,21 @@ class FileOutputDisplayTest(unittest.TestCase):
self.assertEqual(
(code, set(tag_list)), (display_util.OK, set(["tag1", "tag2"])))
@mock.patch("certbot.display.util.six.moves.input")
@mock.patch("certbot.display.util.input_with_timeout")
def test_checklist_empty(self, mock_input):
mock_input.return_value = ""
code, tag_list = self.displayer.checklist("msg", TAGS, force_interactive=True)
self.assertEqual(
(code, set(tag_list)), (display_util.OK, set(["tag1", "tag2", "tag3"])))
@mock.patch("certbot.display.util.six.moves.input")
@mock.patch("certbot.display.util.input_with_timeout")
def test_checklist_miss_valid(self, mock_input):
mock_input.side_effect = ["10", "tag1 please", "1"]
ret = self.displayer.checklist("msg", TAGS, force_interactive=True)
self.assertEqual(ret, (display_util.OK, ["tag1"]))
@mock.patch("certbot.display.util.six.moves.input")
@mock.patch("certbot.display.util.input_with_timeout")
def test_checklist_miss_quit(self, mock_input):
mock_input.side_effect = ["10", "c"]
@ -182,7 +221,7 @@ class FileOutputDisplayTest(unittest.TestCase):
self.displayer._scrub_checklist_input(list_, TAGS))
self.assertEqual(set_tags, exp[i])
@mock.patch("certbot.display.util.six.moves.input")
@mock.patch("certbot.display.util.input_with_timeout")
def test_directory_select(self, mock_input):
# pylint: disable=star-args
args = ["msg", "/var/www/html", "--flag", True]
@ -246,11 +285,12 @@ class FileOutputDisplayTest(unittest.TestCase):
def test_get_valid_int_ans_valid(self):
# pylint: disable=protected-access
with mock.patch("six.moves.input", return_value="1"):
input_with_timeout = "certbot.display.util.input_with_timeout"
with mock.patch(input_with_timeout, return_value="1"):
self.assertEqual(
self.displayer._get_valid_int_ans(1), (display_util.OK, 1))
ans = "2"
with mock.patch("six.moves.input", return_value=ans):
with mock.patch(input_with_timeout, return_value=ans):
self.assertEqual(
self.displayer._get_valid_int_ans(3),
(display_util.OK, int(ans)))
@ -262,8 +302,9 @@ class FileOutputDisplayTest(unittest.TestCase):
["4", "one", "C"],
["c"],
]
input_with_timeout = "certbot.display.util.input_with_timeout"
for ans in answers:
with mock.patch("six.moves.input", side_effect=ans):
with mock.patch(input_with_timeout, side_effect=ans):
self.assertEqual(
self.displayer._get_valid_int_ans(3),
(display_util.CANCEL, -1))