From e9b67ff6f9153292f29f97fc1ff7bc2356f817d0 Mon Sep 17 00:00:00 2001 From: Seth Schoen Date: Tue, 3 Feb 2015 17:59:35 -0800 Subject: [PATCH] Several more unit tests for StandaloneAuthenticator --- .../tests/standalone_authenticator_test.py | 137 +++++++++++++++++- 1 file changed, 129 insertions(+), 8 deletions(-) diff --git a/letsencrypt/client/tests/standalone_authenticator_test.py b/letsencrypt/client/tests/standalone_authenticator_test.py index aa614c22d..978550e4f 100644 --- a/letsencrypt/client/tests/standalone_authenticator_test.py +++ b/letsencrypt/client/tests/standalone_authenticator_test.py @@ -47,6 +47,7 @@ class TLSParseClientHelloTest(unittest.TestCase): # defective or surprising in distinct ways. (Each invalid TLS # record is invalid in its own way.) + class TLSGenerateServerHelloTest(unittest.TestCase): def test_tls_generate_server_hello(self): from letsencrypt.client.standalone_authenticator import \ @@ -142,17 +143,17 @@ class ClientSignalHandlerTest(unittest.TestCase): def test_client_signal_handler(self): import signal - self.assertEquals(self.authenticator.subproc_ready, False) - self.assertEquals(self.authenticator.subproc_inuse, False) - self.assertEquals(self.authenticator.subproc_cantbind, False) + self.assertFalse(self.authenticator.subproc_ready) + self.assertFalse(self.authenticator.subproc_inuse) + self.assertFalse(self.authenticator.subproc_cantbind) self.authenticator.client_signal_handler(signal.SIGIO, None) - self.assertEquals(self.authenticator.subproc_ready, True) + self.assertTrue(self.authenticator.subproc_ready) self.authenticator.client_signal_handler(signal.SIGUSR1, None) - self.assertEquals(self.authenticator.subproc_inuse, True) + self.assertTrue(self.authenticator.subproc_inuse) self.authenticator.client_signal_handler(signal.SIGUSR2, None) - self.assertEquals(self.authenticator.subproc_cantbind, True) + self.assertTrue(self.authenticator.subproc_cantbind) class SubprocSignalHandlerTest(unittest.TestCase): def setUp(self): @@ -183,6 +184,127 @@ class SubprocSignalHandlerTest(unittest.TestCase): self.assertEquals(mock_exit.call_count, 1) +class PerformTest(unittest.TestCase): + def setUp(self): + from letsencrypt.client.standalone_authenticator import \ + StandaloneAuthenticator + self.authenticator = StandaloneAuthenticator() + + def test_can_perform(self): + """What happens if start_listener() returns True.""" + from letsencrypt.client import le_util + RSA256_KEY = pkg_resources.resource_string(__name__, + 'testdata/rsa256_key.pem') + key = le_util.Key("something", RSA256_KEY) + chall1 = DvsniChall("foo.example.com", "whee", "foononce", key) + chall2 = DvsniChall("bar.example.com", "whee", "barnonce", key) + bad_chall = ("This", "Represents", "A Non-DVSNI", "Challenge") + self.authenticator.start_listener = mock.Mock() + self.authenticator.start_listener.return_value = True + result = self.authenticator.perform([chall1, chall2, bad_chall]) + self.assertEqual(len(self.authenticator.tasks), 2) + self.assertTrue( + self.authenticator.tasks.has_key("foononce.acme.invalid")) + self.assertTrue( + self.authenticator.tasks.has_key("barnonce.acme.invalid")) + self.assertIsInstance(result, list) + self.assertEqual(len(result), 3) + self.assertIsInstance(result[0], dict) + self.assertIsInstance(result[1], dict) + self.assertFalse(result[2]) + self.assertTrue(result[0].has_key("s")) + self.assertTrue(result[1].has_key("s")) + self.assertEqual(self.authenticator.start_listener.call_count, 1) + + def test_cannot_perform(self): + """What happens if start_listener() returns False.""" + from letsencrypt.client import le_util + RSA256_KEY = pkg_resources.resource_string(__name__, + 'testdata/rsa256_key.pem') + key = le_util.Key("something", RSA256_KEY) + chall1 = DvsniChall("foo.example.com", "whee", "foononce", key) + chall2 = DvsniChall("bar.example.com", "whee", "barnonce", key) + bad_chall = ("This", "Represents", "A Non-DVSNI", "Challenge") + self.authenticator.start_listener = mock.Mock() + self.authenticator.start_listener.return_value = False + result = self.authenticator.perform([chall1, chall2, bad_chall]) + self.assertEqual(len(self.authenticator.tasks), 2) + self.assertTrue( + self.authenticator.tasks.has_key("foononce.acme.invalid")) + self.assertTrue( + self.authenticator.tasks.has_key("barnonce.acme.invalid")) + self.assertIsInstance(result, list) + self.assertEqual(len(result), 3) + self.assertEqual(result, [None, None, False]) + self.assertEqual(self.authenticator.start_listener.call_count, 1) + +class StartListenerTest(unittest.TestCase): + def setUp(self): + from letsencrypt.client.standalone_authenticator import \ + StandaloneAuthenticator + self.authenticator = StandaloneAuthenticator() + + @mock.patch("letsencrypt.client.standalone_authenticator.Crypto.Random.atfork") + @mock.patch("letsencrypt.client.standalone_authenticator.os.fork") + def test_start_listener_fork_parent(self, mock_fork, mock_atfork): + self.authenticator.do_parent_process = mock.Mock() + mock_fork.return_value = 22222 + self.authenticator.start_listener(1717, "key") + self.assertEqual(self.authenticator.child_pid, 22222) + self.assertEqual(self.authenticator.do_parent_process.call_count, 1) + self.assertEqual(mock_atfork.call_count, 1) + + @mock.patch("letsencrypt.client.standalone_authenticator.Crypto.Random.atfork") + @mock.patch("letsencrypt.client.standalone_authenticator.os.fork") + def test_start_listener_fork_child(self, mock_fork, mock_atfork): + import os + self.authenticator.do_parent_process = mock.Mock() + self.authenticator.do_child_process = mock.Mock() + mock_fork.return_value = 0 + self.authenticator.start_listener(1717, "key") + self.assertEqual(self.authenticator.child_pid, os.getpid()) + self.assertEqual(self.authenticator.do_child_process.call_count, 1) + self.assertEqual(mock_atfork.call_count, 1) + +class DoParentProcessTest(unittest.TestCase): + def setUp(self): + from letsencrypt.client.standalone_authenticator import \ + StandaloneAuthenticator + self.authenticator = StandaloneAuthenticator() + + @mock.patch("letsencrypt.client.standalone_authenticator.signal.signal") + @mock.patch("letsencrypt.client.standalone_authenticator.zope.component.getUtility") + def test_do_parent_process_ok(self, mock_getUtility, mock_signal): + self.authenticator.subproc_ready = True + result = self.authenticator.do_parent_process(1717) + self.assertTrue(result) + self.assertEqual(mock_signal.call_count, 3) + + @mock.patch("letsencrypt.client.standalone_authenticator.signal.signal") + @mock.patch("letsencrypt.client.standalone_authenticator.zope.component.getUtility") + def test_do_parent_process_inuse(self, mock_getUtility, mock_signal): + self.authenticator.subproc_inuse = True + result = self.authenticator.do_parent_process(1717) + self.assertFalse(result) + self.assertEqual(mock_signal.call_count, 3) + + @mock.patch("letsencrypt.client.standalone_authenticator.signal.signal") + @mock.patch("letsencrypt.client.standalone_authenticator.zope.component.getUtility") + def test_do_parent_process_cantbind(self, mock_getUtility, mock_signal): + self.authenticator.subproc_cantbind = True + result = self.authenticator.do_parent_process(1717) + self.assertFalse(result) + self.assertEqual(mock_signal.call_count, 3) + + @mock.patch("letsencrypt.client.standalone_authenticator.signal.signal") + @mock.patch("letsencrypt.client.standalone_authenticator.zope.component.getUtility") + def test_do_parent_process_timeout(self, mock_getUtility, mock_signal): + # Times out in 5 seconds and returns False. + result = self.authenticator.do_parent_process(1717) + self.assertFalse(result) + self.assertEqual(mock_signal.call_count, 3) + + class CleanupTest(unittest.TestCase): def setUp(self): from letsencrypt.client.standalone_authenticator import \ @@ -212,5 +334,4 @@ if __name__ == '__main__': # TODO: Unit tests for the following functions -# def start_listener(self, port, key): -# def perform(self, chall_list): +# do_child_process