100% coverage for manual test mode and related code.

This commit is contained in:
Jakub Warmuz 2015-07-19 11:02:17 +00:00
parent 09e658c486
commit 9e2682a025
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
3 changed files with 61 additions and 10 deletions

View file

@ -153,22 +153,22 @@ class AuthHandler(object):
"""
active_achalls = []
for achall, resp in itertools.izip(achalls, resps):
# XXX: make sure that all achalls, including those
# corresponding to None or False returned from
# Authenticator are removed from the queue and thus avoid
# infinite loop
active_achalls.append(achall)
# Don't send challenges for None and False authenticator responses
if resp is not None and resp:
self.acme.answer_challenge(achall.challb, resp)
# TODO: answer_challenge returns challr, with URI,
# that can be used in _find_updated_challr
# comparisons...
active_achalls.append(achall)
if achall.domain in chall_update:
chall_update[achall.domain].append(achall)
else:
chall_update[achall.domain] = [achall]
else: # resp is None or not resp
# XXX: make sure that achalls corresponding to None or
# False returned from Authenticator are removed from
# the queue and thus avoid infinite loop
active_achalls.append(achall)
return active_achalls

View file

@ -89,6 +89,7 @@ s.serve_forever()" """
else self.HTTPS_TEMPLATE)
self._root = (tempfile.mkdtemp() if self.conf("test-mode")
else "/tmp/letsencrypt")
self._httpd = None
@classmethod
def add_parser_arguments(cls, add):
@ -136,7 +137,6 @@ binary for temporary key/certificate generation.""".replace("\n", "")
if self.conf("test-mode"):
logger.debug("Test mode. Executing the manual command: %s", command)
try:
# pylint: disable=attribute-defined-outside-init
self._httpd = subprocess.Popen(
command,
# don't care about setting stdout and stderr,
@ -146,13 +146,13 @@ binary for temporary key/certificate generation.""".replace("\n", "")
preexec_fn=os.setsid)
except OSError as error: # ValueError should not happen!
logging.debug(
"Couldn't execute manual command", error, exc_info=True)
"Couldn't execute manual command: %s", error, exc_info=True)
return False
logger.debug("Manual command running as PID %s.", self._httpd.pid)
# give it some time to bootstrap, before we try to verify
# (cert generation in case of simpleHttpS might take time)
time.sleep(4) # XXX
if self._httpd.poll():
if self._httpd.poll() is not None:
raise errors.Error("Couldn't execute manual command")
else:
self._notify_and_wait(self.MESSAGE_TEMPLATE.format(
@ -164,7 +164,8 @@ binary for temporary key/certificate generation.""".replace("\n", "")
achall.challb, achall.domain, self.config.simple_http_port):
return response
else:
if self.conf("test-mode") and self._httpd.poll():
if self.conf("test-mode") and self._httpd.poll() is not None:
# simply verify cause command failure...
return False
return None
@ -178,6 +179,8 @@ binary for temporary key/certificate generation.""".replace("\n", "")
def cleanup(self, achalls):
# pylint: disable=missing-docstring,no-self-use,unused-argument
if self.conf("test-mode"):
assert self._httpd is not None, (
"cleanup() must be called after perform()")
if self._httpd.poll() is None:
logger.debug("Terminating manual command process")
os.killpg(self._httpd.pid, signal.SIGTERM)

View file

@ -1,4 +1,5 @@
"""Tests for letsencrypt.plugins.manual."""
import signal
import unittest
import mock
@ -6,6 +7,8 @@ import mock
from acme import challenges
from letsencrypt import achallenges
from letsencrypt import errors
from letsencrypt.tests import acme_util
@ -21,6 +24,12 @@ class ManualAuthenticatorTest(unittest.TestCase):
self.achalls = [achallenges.SimpleHTTP(
challb=acme_util.SIMPLE_HTTP, domain="foo.com", key=None)]
config_test_mode = mock.MagicMock(
no_simple_http_tls=True, simple_http_port=4430,
manual_test_mode=True)
self.auth_test_mode = ManualAuthenticator(
config=config_test_mode, name="manual")
def test_more_info(self):
self.assertTrue(isinstance(self.auth.more_info(), str))
@ -52,6 +61,45 @@ class ManualAuthenticatorTest(unittest.TestCase):
mock_verify.return_value = False
self.assertEqual([None], self.auth.perform(self.achalls))
@mock.patch("letsencrypt.plugins.manual.subprocess.Popen", autospec=True)
def test_perform_test_command_oserror(self, mock_popen):
mock_popen.side_effect = OSError
self.assertEqual([False], self.auth_test_mode.perform(self.achalls))
@mock.patch("letsencrypt.plugins.manual.time.sleep", autospec=True)
@mock.patch("letsencrypt.plugins.manual.subprocess.Popen", autospec=True)
def test_perform_test_command_run_failure(
self, mock_popen, unused_mock_sleep):
mock_popen.poll.return_value = 10
mock_popen.return_value.pid = 1234
self.assertRaises(
errors.Error, self.auth_test_mode.perform, self.achalls)
@mock.patch("letsencrypt.plugins.manual.time.sleep", autospec=True)
@mock.patch("acme.challenges.SimpleHTTPResponse.simple_verify",
autospec=True)
@mock.patch("letsencrypt.plugins.manual.subprocess.Popen", autospec=True)
def test_perform_test_mode(self, mock_popen, mock_verify, mock_sleep):
mock_popen.return_value.poll.side_effect = [None, 10]
mock_popen.return_value.pid = 1234
mock_verify.return_value = False
self.assertEqual([False], self.auth_test_mode.perform(self.achalls))
self.assertEqual(1, mock_sleep.call_count)
def test_cleanup_test_mode_already_terminated(self):
# pylint: disable=protected-access
self.auth_test_mode._httpd = httpd = mock.Mock()
httpd.poll.return_value = 0
self.auth_test_mode.cleanup(self.achalls)
@mock.patch("letsencrypt.plugins.manual.os.killpg", autospec=True)
def test_cleanup_test_mode_kills_still_running(self, mock_killpg):
# pylint: disable=protected-access
self.auth_test_mode._httpd = httpd = mock.Mock(pid=1234)
httpd.poll.return_value = None
self.auth_test_mode.cleanup(self.achalls)
mock_killpg.assert_called_once_with(1234, signal.SIGTERM)
if __name__ == "__main__":
unittest.main() # pragma: no cover