Enhance display.util to support input validation (#4372)

* display: support validation of user input

To avoid each caller of `display.input` and `display.directory_select`
needing to implement validation logic, this allows for a validator to be
supplied as a part of the call.

Following the existing pattern from `webroot`, this validator is expected
to throw a `Error` when it encounters invalid input. The user then
receives a `notification` is re-prompted.

Testing Done:
 * tox -e py27
 * tox -e lint

* plugins: update webroot to use display's validation functionality

This change updates the webroot plugin to use the now-built-in validation
functionality in display, reducing duplicated code.

Testing Done:
 * tox -e py27
 * tox -e lint

* display: move validation logic to ops

To avoid adding complexity to `IDisplay` methods, move validation logic
to helper methods in `display.ops`.

Testing Done:
 * tox -e py27
 * tox -e lint
This commit is contained in:
Zach Shepherd 2017-04-24 17:36:00 -07:00 committed by Brad Warren
parent b079d0da2f
commit b41472afce
5 changed files with 148 additions and 30 deletions

View file

@ -289,3 +289,60 @@ def _gen_https_names(domains):
domains[-1])
return ""
def _get_validated(method, validator, message, default=None, **kwargs):
if default is not None:
try:
validator(default)
except errors.Error as error:
logger.debug('Encountered invalid default value "%s" when prompting for "%s"',
default,
message,
exc_info=True)
raise AssertionError('Invalid default "{0}"'.format(default))
while True:
code, raw = method(message, default=default, **kwargs)
if code == display_util.OK:
try:
validator(raw)
return code, raw
except errors.Error as error:
logger.debug('Validator rejected "%s" when prompting for "%s"',
raw,
message,
exc_info=True)
zope.component.getUtility(interfaces.IDisplay).notification(str(error), pause=False)
else:
return code, raw
def validated_input(validator, *args, **kwargs):
"""Like `~certbot.interfaces.IDisplay.input`, but with validation.
:param callable validator: A method which will be called on the
supplied input. If the method raises a `errors.Error`, its
text will be displayed and the user will be re-prompted.
:param list *args: Arguments to be passed to `~certbot.interfaces.IDisplay.input`
:param dict **kwargs: Arguments to be passed to `~certbot.interfaces.IDisplay.input`
:return: as `~certbot.interfaces.IDisplay.input`
:rtype: tuple
"""
return _get_validated(zope.component.getUtility(interfaces.IDisplay).input,
validator, *args, **kwargs)
def validated_directory(validator, *args, **kwargs):
"""Like `~certbot.interfaces.IDisplay.directory_select`, but with validation.
:param callable validator: A method which will be called on the
supplied input. If the method raises a `errors.Error`, its
text will be displayed and the user will be re-prompted.
:param list *args: Arguments to be passed to `~certbot.interfaces.IDisplay.directory_select`
:param dict **kwargs: Arguments to be passed to `~certbot.interfaces.IDisplay.directory_select`
:return: as `~certbot.interfaces.IDisplay.directory_select`
:rtype: tuple
"""
return _get_validated(zope.component.getUtility(interfaces.IDisplay).directory_select,
validator, *args, **kwargs)

View file

@ -123,7 +123,6 @@ class FileDisplay(object):
def input(self, message, default=None,
cli_flag=None, force_interactive=False, **unused_kwargs):
# pylint: disable=no-self-use
"""Accept input from the user.
:param str message: message to display to the user

View file

@ -16,6 +16,7 @@ from certbot import cli
from certbot import errors
from certbot import interfaces
from certbot.display import util as display_util
from certbot.display import ops
from certbot.plugins import common
@ -136,22 +137,17 @@ to serve all files under specified web root ({0})."""
return None if index == 0 else known_webroots[index - 1]
def _prompt_for_new_webroot(self, domain):
display = zope.component.getUtility(interfaces.IDisplay)
while True:
code, webroot = display.directory_select(
"Input the webroot for {0}:".format(domain),
force_interactive=True)
if code == display_util.HELP:
# Displaying help is not currently implemented
return None
elif code == display_util.CANCEL:
return None
else: # code == display_util.OK
try:
return _validate_webroot(webroot)
except errors.PluginError as error:
display.notification(str(error), pause=False)
code, webroot = ops.validated_directory(
_validate_webroot,
"Input the webroot for {0}:".format(domain),
force_interactive=True)
if code == display_util.HELP:
# Displaying help is not currently implemented
return None
elif code == display_util.CANCEL or code == display_util.ESC:
return None
else: # code == display_util.OK
return _validate_webroot(webroot)
def _create_challenge_dirs(self):
path_map = self.conf("map")

View file

@ -100,22 +100,16 @@ class AuthenticatorTest(unittest.TestCase):
self.config.webroot_path = []
self.config.webroot_map = {}
imaginary_dir = os.path.join(os.sep, "imaginary", "dir")
mock_display = mock_get_utility()
mock_display.menu.return_value = (display_util.OK, 0,)
mock_display.directory_select.side_effect = (
(display_util.HELP, -1,), (display_util.CANCEL, -1,),
(display_util.OK, imaginary_dir,), (display_util.OK, self.path,),)
self.auth.perform([self.achall])
with mock.patch('certbot.display.ops.validated_directory') as m:
m.side_effect = ((display_util.HELP, -1),
(display_util.CANCEL, -1),
(display_util.OK, self.path,))
self.assertTrue(mock_display.notification.called)
for call in mock_display.notification.call_args_list:
self.assertTrue(imaginary_dir in call[0][0])
self.auth.perform([self.achall])
self.assertTrue(mock_display.directory_select.called)
for call in mock_display.directory_select.call_args_list:
self.assertTrue(self.achall.domain in call[0][0])
self.assertEqual(self.config.webroot_map[self.achall.domain], self.path)
def test_perform_missing_root(self):
self.config.webroot_path = None

View file

@ -14,6 +14,7 @@ from certbot import account
from certbot import errors
from certbot.display import util as display_util
from certbot.display import ops
import certbot.tests.util as test_util
@ -112,7 +113,6 @@ class ChooseAccountTest(test_util.TempDirTestCase):
@classmethod
def _call(cls, accounts):
from certbot.display import ops
return ops.choose_account(accounts)
@test_util.patch_get_utility("certbot.display.ops.z_util")
@ -405,5 +405,77 @@ class SuccessRevocationTest(unittest.TestCase):
os.linesep), pause=False)
self.assertTrue(path in mock_util().notification.call_args[0][0])
class ValidatorTests(unittest.TestCase):
"""Tests for `validated_input` and `validated_directory`."""
__ERROR = "Must be non-empty"
valid_input = "asdf"
valid_directory = "/var/www/html"
@staticmethod
def __validator(m):
if m == "":
raise errors.PluginError(ValidatorTests.__ERROR)
@test_util.patch_get_utility()
def test_input_blank_with_validator(self, mock_util):
mock_util().input.side_effect = [(display_util.OK, ""),
(display_util.OK, ""),
(display_util.OK, ""),
(display_util.OK, self.valid_input)]
returned = ops.validated_input(self.__validator, "message", force_interactive=True)
self.assertEqual(ValidatorTests.__ERROR, mock_util().notification.call_args[0][0])
self.assertEqual(returned, (display_util.OK, self.valid_input))
@test_util.patch_get_utility()
def test_input_validation_with_default(self, mock_util):
mock_util().input.side_effect = [(display_util.OK, self.valid_input)]
returned = ops.validated_input(self.__validator, "msg", default="other")
self.assertEqual(returned, (display_util.OK, self.valid_input))
@test_util.patch_get_utility()
def test_input_validation_with_bad_default(self, mock_util):
mock_util().input.side_effect = [(display_util.OK, self.valid_input)]
self.assertRaises(AssertionError,
ops.validated_input,
self.__validator, "msg", default="")
@test_util.patch_get_utility()
def test_input_cancel_with_validator(self, mock_util):
mock_util().input.side_effect = [(display_util.CANCEL, "")]
code, unused_raw = ops.validated_input(self.__validator, "message", force_interactive=True)
self.assertEqual(code, display_util.CANCEL)
@test_util.patch_get_utility()
def test_directory_select_validation(self, mock_util):
mock_util().directory_select.side_effect = [(display_util.OK, ""),
(display_util.OK, self.valid_directory)]
returned = ops.validated_directory(self.__validator, "msg", force_interactive=True)
self.assertEqual(ValidatorTests.__ERROR, mock_util().notification.call_args[0][0])
self.assertEqual(returned, (display_util.OK, self.valid_directory))
@test_util.patch_get_utility()
def test_directory_select_validation_with_default(self, mock_util):
mock_util().directory_select.side_effect = [(display_util.OK, self.valid_directory)]
returned = ops.validated_directory(self.__validator, "msg", default="other")
self.assertEqual(returned, (display_util.OK, self.valid_directory))
@test_util.patch_get_utility()
def test_directory_select_validation_with_bad_default(self, mock_util):
mock_util().directory_select.side_effect = [(display_util.OK, self.valid_directory)]
self.assertRaises(AssertionError,
ops.validated_directory,
self.__validator, "msg", default="")
if __name__ == "__main__":
unittest.main() # pragma: no cover