mirror of
https://github.com/certbot/certbot.git
synced 2026-06-06 15:22:38 -04:00
Add unit tests for listener child process
This commit is contained in:
parent
e9b67ff6f9
commit
f9b0d8d0bf
1 changed files with 104 additions and 4 deletions
|
|
@ -8,6 +8,33 @@ import pkg_resources
|
|||
from letsencrypt.client.challenge_util import DvsniChall
|
||||
|
||||
|
||||
# ErrorAfter/CallableExhausted from
|
||||
# http://igorsobreira.com/2013/03/17/testing-infinite-loops.html
|
||||
# to allow interrupting infinite loop under test after one
|
||||
# iteration.
|
||||
|
||||
class ErrorAfter_socket_accept(object):
|
||||
"""
|
||||
Callable that will raise `CallableExhausted`
|
||||
exception after `limit` calls, modified to also return
|
||||
a tuple simulating the return values of a socket.accept()
|
||||
call
|
||||
"""
|
||||
def __init__(self, limit):
|
||||
self.limit = limit
|
||||
self.calls = 0
|
||||
|
||||
def __call__(self):
|
||||
self.calls += 1
|
||||
if self.calls > self.limit:
|
||||
raise CallableExhausted
|
||||
# Modified here for a single use as socket.accept()
|
||||
return (mock.MagicMock(), "ignored")
|
||||
|
||||
class CallableExhausted(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class PackAndUnpackTests(unittest.TestCase):
|
||||
def test_pack_and_unpack_bytes(self):
|
||||
from letsencrypt.client.standalone_authenticator import \
|
||||
|
|
@ -305,6 +332,83 @@ class DoParentProcessTest(unittest.TestCase):
|
|||
self.assertEqual(mock_signal.call_count, 3)
|
||||
|
||||
|
||||
class DoChildProcessTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from letsencrypt.client.standalone_authenticator import \
|
||||
StandaloneAuthenticator
|
||||
from letsencrypt.client.challenge_util import dvsni_gen_cert
|
||||
from letsencrypt.client import le_util
|
||||
import OpenSSL.crypto
|
||||
self.authenticator = StandaloneAuthenticator()
|
||||
r = "x" * 32
|
||||
name, r_b64 = "example.com", le_util.jose_b64encode(r)
|
||||
RSA256_KEY = pkg_resources.resource_string(__name__,
|
||||
'testdata/rsa256_key.pem')
|
||||
nonce, key = "abcdef", le_util.Key("foo", RSA256_KEY)
|
||||
self.key = key
|
||||
self.cert = dvsni_gen_cert(name, r_b64, nonce, key)[0]
|
||||
self.authenticator.private_key = OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, key.pem)
|
||||
self.authenticator.tasks = {"abcdef.acme.invalid": self.cert}
|
||||
self.authenticator.parent_pid = 12345
|
||||
|
||||
@mock.patch("letsencrypt.client.standalone_authenticator.socket.socket")
|
||||
@mock.patch("letsencrypt.client.standalone_authenticator.os.kill")
|
||||
@mock.patch("letsencrypt.client.standalone_authenticator.sys.exit")
|
||||
def test_do_child_process_cantbind1(self, mock_exit, mock_kill, mock_socket):
|
||||
import socket, signal
|
||||
mock_exit.side_effect = IndentationError("subprocess would exit here")
|
||||
eaccess = socket.error(socket.errno.EACCES, "Permission denied")
|
||||
sample_socket = mock.MagicMock()
|
||||
sample_socket.bind.side_effect = eaccess
|
||||
mock_socket.return_value = sample_socket
|
||||
# Using the IndentationError as an error that cannot easily be
|
||||
# generated at runtime, to indicate the behavior of sys.exit has
|
||||
# taken effect without actually causing the test process to exit.
|
||||
# (Just replacing it with a no-op causes logic errors because the
|
||||
# do_child_process code assumes that calling sys.exit() will
|
||||
# cause subsequent code not to be executed.)
|
||||
with self.assertRaises(IndentationError):
|
||||
result = self.authenticator.do_child_process(1717, self.key)
|
||||
self.assertEqual(mock_exit.call_count, 1)
|
||||
mock_kill.assert_called_once_with(12345, signal.SIGUSR2)
|
||||
|
||||
@mock.patch("letsencrypt.client.standalone_authenticator.socket.socket")
|
||||
@mock.patch("letsencrypt.client.standalone_authenticator.os.kill")
|
||||
@mock.patch("letsencrypt.client.standalone_authenticator.sys.exit")
|
||||
def test_do_child_process_cantbind2(self, mock_exit, mock_kill, mock_socket):
|
||||
import socket, signal
|
||||
mock_exit.side_effect = IndentationError("subprocess would exit here")
|
||||
eaccess = socket.error(socket.errno.EADDRINUSE, "Port already in use")
|
||||
sample_socket = mock.MagicMock()
|
||||
sample_socket.bind.side_effect = eaccess
|
||||
mock_socket.return_value = sample_socket
|
||||
with self.assertRaises(IndentationError):
|
||||
result = self.authenticator.do_child_process(1717, self.key)
|
||||
self.assertEqual(mock_exit.call_count, 1)
|
||||
mock_kill.assert_called_once_with(12345, signal.SIGUSR1)
|
||||
|
||||
@mock.patch("letsencrypt.client.standalone_authenticator.OpenSSL.SSL.Connection")
|
||||
@mock.patch("letsencrypt.client.standalone_authenticator.socket.socket")
|
||||
@mock.patch("letsencrypt.client.standalone_authenticator.os.kill")
|
||||
def test_do_child_process_success(self, mock_kill, mock_socket, mock_Connection):
|
||||
import socket, signal
|
||||
sample_socket = mock.MagicMock()
|
||||
sample_socket.accept.side_effect = ErrorAfter_socket_accept(2)
|
||||
mock_socket.return_value = sample_socket
|
||||
mock_Connection.return_value = mock.MagicMock()
|
||||
with self.assertRaises(CallableExhausted):
|
||||
result = self.authenticator.do_child_process(1717, self.key)
|
||||
mock_socket.assert_called_once_with()
|
||||
sample_socket.bind.assert_called_once_with(("0.0.0.0", 1717))
|
||||
sample_socket.listen.assert_called_once_with(1)
|
||||
self.assertEqual(sample_socket.accept.call_count, 3)
|
||||
mock_kill.assert_called_once_with(12345, signal.SIGIO)
|
||||
# TODO: We could have some tests about the fact that the listener
|
||||
# asks OpenSSL to negotiate a TLS connection (and correctly
|
||||
# sets the SNI callback function).
|
||||
|
||||
|
||||
class CleanupTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
from letsencrypt.client.standalone_authenticator import \
|
||||
|
|
@ -331,7 +435,3 @@ class CleanupTest(unittest.TestCase):
|
|||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
|
||||
# TODO: Unit tests for the following functions
|
||||
# do_child_process
|
||||
|
|
|
|||
Loading…
Reference in a new issue