From f9b0d8d0bf632d14def5ff6f5837cb33579a7b5e Mon Sep 17 00:00:00 2001 From: Seth Schoen Date: Wed, 4 Feb 2015 22:17:11 -0800 Subject: [PATCH] Add unit tests for listener child process --- .../tests/standalone_authenticator_test.py | 108 +++++++++++++++++- 1 file changed, 104 insertions(+), 4 deletions(-) diff --git a/letsencrypt/client/tests/standalone_authenticator_test.py b/letsencrypt/client/tests/standalone_authenticator_test.py index 978550e4f..97ccaa5a5 100644 --- a/letsencrypt/client/tests/standalone_authenticator_test.py +++ b/letsencrypt/client/tests/standalone_authenticator_test.py @@ -8,6 +8,33 @@ import pkg_resources from letsencrypt.client.challenge_util import DvsniChall +# ErrorAfter/CallableExhausted from +# http://igorsobreira.com/2013/03/17/testing-infinite-loops.html +# to allow interrupting infinite loop under test after one +# iteration. + +class ErrorAfter_socket_accept(object): + """ + Callable that will raise `CallableExhausted` + exception after `limit` calls, modified to also return + a tuple simulating the return values of a socket.accept() + call + """ + def __init__(self, limit): + self.limit = limit + self.calls = 0 + + def __call__(self): + self.calls += 1 + if self.calls > self.limit: + raise CallableExhausted + # Modified here for a single use as socket.accept() + return (mock.MagicMock(), "ignored") + +class CallableExhausted(Exception): + pass + + class PackAndUnpackTests(unittest.TestCase): def test_pack_and_unpack_bytes(self): from letsencrypt.client.standalone_authenticator import \ @@ -305,6 +332,83 @@ class DoParentProcessTest(unittest.TestCase): self.assertEqual(mock_signal.call_count, 3) +class DoChildProcessTest(unittest.TestCase): + def setUp(self): + from letsencrypt.client.standalone_authenticator import \ + StandaloneAuthenticator + from letsencrypt.client.challenge_util import dvsni_gen_cert + from letsencrypt.client import le_util + import OpenSSL.crypto + self.authenticator = StandaloneAuthenticator() + r = "x" * 32 + name, r_b64 = "example.com", le_util.jose_b64encode(r) + RSA256_KEY = pkg_resources.resource_string(__name__, + 'testdata/rsa256_key.pem') + nonce, key = "abcdef", le_util.Key("foo", RSA256_KEY) + self.key = key + self.cert = dvsni_gen_cert(name, r_b64, nonce, key)[0] + self.authenticator.private_key = OpenSSL.crypto.load_privatekey( + OpenSSL.crypto.FILETYPE_PEM, key.pem) + self.authenticator.tasks = {"abcdef.acme.invalid": self.cert} + self.authenticator.parent_pid = 12345 + + @mock.patch("letsencrypt.client.standalone_authenticator.socket.socket") + @mock.patch("letsencrypt.client.standalone_authenticator.os.kill") + @mock.patch("letsencrypt.client.standalone_authenticator.sys.exit") + def test_do_child_process_cantbind1(self, mock_exit, mock_kill, mock_socket): + import socket, signal + mock_exit.side_effect = IndentationError("subprocess would exit here") + eaccess = socket.error(socket.errno.EACCES, "Permission denied") + sample_socket = mock.MagicMock() + sample_socket.bind.side_effect = eaccess + mock_socket.return_value = sample_socket + # Using the IndentationError as an error that cannot easily be + # generated at runtime, to indicate the behavior of sys.exit has + # taken effect without actually causing the test process to exit. + # (Just replacing it with a no-op causes logic errors because the + # do_child_process code assumes that calling sys.exit() will + # cause subsequent code not to be executed.) + with self.assertRaises(IndentationError): + result = self.authenticator.do_child_process(1717, self.key) + self.assertEqual(mock_exit.call_count, 1) + mock_kill.assert_called_once_with(12345, signal.SIGUSR2) + + @mock.patch("letsencrypt.client.standalone_authenticator.socket.socket") + @mock.patch("letsencrypt.client.standalone_authenticator.os.kill") + @mock.patch("letsencrypt.client.standalone_authenticator.sys.exit") + def test_do_child_process_cantbind2(self, mock_exit, mock_kill, mock_socket): + import socket, signal + mock_exit.side_effect = IndentationError("subprocess would exit here") + eaccess = socket.error(socket.errno.EADDRINUSE, "Port already in use") + sample_socket = mock.MagicMock() + sample_socket.bind.side_effect = eaccess + mock_socket.return_value = sample_socket + with self.assertRaises(IndentationError): + result = self.authenticator.do_child_process(1717, self.key) + self.assertEqual(mock_exit.call_count, 1) + mock_kill.assert_called_once_with(12345, signal.SIGUSR1) + + @mock.patch("letsencrypt.client.standalone_authenticator.OpenSSL.SSL.Connection") + @mock.patch("letsencrypt.client.standalone_authenticator.socket.socket") + @mock.patch("letsencrypt.client.standalone_authenticator.os.kill") + def test_do_child_process_success(self, mock_kill, mock_socket, mock_Connection): + import socket, signal + sample_socket = mock.MagicMock() + sample_socket.accept.side_effect = ErrorAfter_socket_accept(2) + mock_socket.return_value = sample_socket + mock_Connection.return_value = mock.MagicMock() + with self.assertRaises(CallableExhausted): + result = self.authenticator.do_child_process(1717, self.key) + mock_socket.assert_called_once_with() + sample_socket.bind.assert_called_once_with(("0.0.0.0", 1717)) + sample_socket.listen.assert_called_once_with(1) + self.assertEqual(sample_socket.accept.call_count, 3) + mock_kill.assert_called_once_with(12345, signal.SIGIO) + # TODO: We could have some tests about the fact that the listener + # asks OpenSSL to negotiate a TLS connection (and correctly + # sets the SNI callback function). + + class CleanupTest(unittest.TestCase): def setUp(self): from letsencrypt.client.standalone_authenticator import \ @@ -331,7 +435,3 @@ class CleanupTest(unittest.TestCase): if __name__ == '__main__': unittest.main() - - -# TODO: Unit tests for the following functions -# do_child_process