diff --git a/docs/api/client/standalone_authenticator.rst b/docs/api/client/standalone_authenticator.rst new file mode 100644 index 000000000..d05f4f057 --- /dev/null +++ b/docs/api/client/standalone_authenticator.rst @@ -0,0 +1,5 @@ +:mod:`letsencrypt.client.standalone_authenticator` +-------------------------------------------------- + +.. automodule:: letsencrypt.client.standalone_authenticator + :members: diff --git a/docs/using.rst b/docs/using.rst index 441bf1623..d5b008670 100644 --- a/docs/using.rst +++ b/docs/using.rst @@ -24,7 +24,8 @@ Ubuntu :: sudo apt-get install python python-setuptools python-virtualenv python-dev \ - gcc swig dialog libaugeas0 libssl-dev ca-certificates + gcc swig dialog libaugeas0 libssl-dev libffi-dev \ + ca-certificates Mac OSX diff --git a/letsencrypt/client/apache/configurator.py b/letsencrypt/client/apache/configurator.py index bafe94f54..1c8bbbc14 100644 --- a/letsencrypt/client/apache/configurator.py +++ b/letsencrypt/client/apache/configurator.py @@ -457,11 +457,11 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator): ssl_addr_p = self.aug.match( addr_match % (ssl_fp, parser.case_i('VirtualHost'))) - for i in range(len(ssl_addr_p)): + for addr in ssl_addr_p: old_addr = obj.Addr.fromstring( - str(self.aug.get(ssl_addr_p[i]))) + str(self.aug.get(addr))) ssl_addr = old_addr.get_addr_obj("443") - self.aug.set(ssl_addr_p[i], str(ssl_addr)) + self.aug.set(addr, str(ssl_addr)) ssl_addrs.add(ssl_addr) # Add directives diff --git a/letsencrypt/client/apache/parser.py b/letsencrypt/client/apache/parser.py index 0a5eff97c..b713c8f6a 100644 --- a/letsencrypt/client/apache/parser.py +++ b/letsencrypt/client/apache/parser.py @@ -77,13 +77,12 @@ class ApacheParser(object): """ self.aug.set(aug_conf_path + "/directive[last() + 1]", directive) - if type(arg) is not list: - self.aug.set(aug_conf_path + "/directive[last()]/arg", arg) + if isinstance(arg, list): + for i, value in enumerate(arg, 1): + self.aug.set( + "%s/directive[last()]/arg[%d]" % (aug_conf_path, i), value) else: - for i in range(len(arg)): - self.aug.set("%s/directive[last()]/arg[%d]" % - (aug_conf_path, (i+1)), - arg[i]) + self.aug.set(aug_conf_path + "/directive[last()]/arg", arg) def find_dir(self, directive, arg=None, start=None): """Finds directive in the configuration. @@ -96,7 +95,7 @@ class ApacheParser(object): Note: Augeas is inherently case sensitive while Apache is case insensitive. Augeas 1.0 allows case insensitive regexes like - regexp(/Listen/, 'i'), however the version currently supported + regexp(/Listen/, "i"), however the version currently supported by Ubuntu 0.10 does not. Thus I have included my own case insensitive transformation by calling case_i() on everything to maintain compatibility. @@ -119,10 +118,11 @@ class ApacheParser(object): # No regexp code # if arg is None: # matches = self.aug.match(start + - # "//*[self::directive='"+directive+"']/arg") + # "//*[self::directive='" + directive + "']/arg") # else: # matches = self.aug.match(start + - # "//*[self::directive='" + directive+"']/* [self::arg='" + arg + "']") + # "//*[self::directive='" + directive + + # "']/* [self::arg='" + arg + "']") # includes = self.aug.match(start + # "//* [self::directive='Include']/* [label()='arg']") @@ -313,8 +313,8 @@ class ApacheParser(object): self.root + "/*/*/*.augsave", self.root + "/*/*/*~"] - for i in range(len(excl)): - self.aug.set("/augeas/load/Httpd/excl[%d]" % (i+1), excl[i]) + for i, excluded in enumerate(excl, 1): + self.aug.set("/augeas/load/Httpd/excl[%d]" % i, excluded) self.aug.load() diff --git a/letsencrypt/client/client.py b/letsencrypt/client/client.py index 196d260bd..45ed93c89 100644 --- a/letsencrypt/client/client.py +++ b/letsencrypt/client/client.py @@ -44,7 +44,6 @@ class Client(object): :type config: :class:`~letsencrypt.client.interfaces.IConfig` """ - zope.interface.implements(interfaces.IAuthenticator) def __init__(self, config, authkey, dv_auth, installer): """Initialize a client. @@ -388,14 +387,6 @@ def init_csr(privkey, names, cert_dir): return le_util.CSR(csr_filename, csr_der, "der") - -def csr_pem_to_der(csr): - """Convert pem CSR to der.""" - - csr_obj = M2Crypto.X509.load_request_string(csr.data) - return le_util.CSR(csr.file, csr_obj.as_der(), "der") - - # This should be controlled by commandline parameters def determine_authenticator(config): """Returns a valid IAuthenticator. diff --git a/letsencrypt/client/constants.py b/letsencrypt/client/constants.py index e30a4b725..291506940 100644 --- a/letsencrypt/client/constants.py +++ b/letsencrypt/client/constants.py @@ -36,7 +36,7 @@ List of expected options parameters: APACHE_MOD_SSL_CONF = pkg_resources.resource_filename( - 'letsencrypt.client.apache', 'options-ssl.conf') + "letsencrypt.client.apache", "options-ssl.conf") """Path to the Apache mod_ssl config file found in the Let's Encrypt distribution.""" diff --git a/letsencrypt/client/interfaces.py b/letsencrypt/client/interfaces.py index 8ae995d4f..9fcd95c6a 100644 --- a/letsencrypt/client/interfaces.py +++ b/letsencrypt/client/interfaces.py @@ -31,7 +31,7 @@ class IAuthenticator(zope.interface.Interface): :param list chall_list: List of namedtuple types defined in :mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.). - :returns: Challenge responses or if it cannot be completed then: + :returns: ACME Challenge responses or if it cannot be completed then: ``None`` Authenticator can perform challenge, but can't at this time diff --git a/letsencrypt/client/reverter.py b/letsencrypt/client/reverter.py index 3f008fc38..0f808b7cf 100644 --- a/letsencrypt/client/reverter.py +++ b/letsencrypt/client/reverter.py @@ -424,7 +424,8 @@ class Reverter(object): # It is possible save checkpoints faster than 1 per second resulting in # collisions in the naming convention. cur_time = time.time() - for _ in range(10): + + for _ in xrange(10): final_dir = os.path.join(self.config.backup_dir, str(cur_time)) try: os.rename(self.config.in_progress_dir, final_dir) diff --git a/letsencrypt/client/standalone_authenticator.py b/letsencrypt/client/standalone_authenticator.py old mode 100755 new mode 100644 index a1b1daa58..81c3e381f --- a/letsencrypt/client/standalone_authenticator.py +++ b/letsencrypt/client/standalone_authenticator.py @@ -1,12 +1,4 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -"""An authenticator that doesn't rely on any existing server program. - -This authenticator creates its own ephemeral TCP listener on the specified -port in order to respond to incoming DVSNI challenges from the certificate -authority.""" - +"""Standalone authenticator.""" import os import signal import socket @@ -26,11 +18,14 @@ from letsencrypt.client import interfaces class StandaloneAuthenticator(object): # pylint: disable=too-many-instance-attributes - """The StandaloneAuthenticator class itself. + """Standalone authenticator. - This authenticator can be invoked by the Let's Encrypt client - according to the IAuthenticator API interface. It creates a local - TCP listener on a specified port and satisfies DVSNI challenges.""" + This authenticator creates its own ephemeral TCP listener on the + specified port in order to respond to incoming DVSNI challenges from + the certificate authority. Therefore, it does not rely on any + existing server program. + + """ zope.interface.implements(interfaces.IAuthenticator) def __init__(self): @@ -49,10 +44,12 @@ class StandaloneAuthenticator(object): This handler receives inter-process communication from the child process in the form of Unix signals. - :param int sig: Which signal the process received.""" - # subprocess → client READY : SIGIO - # subprocess → client INUSE : SIGUSR1 - # subprocess → client CANTBIND: SIGUSR2 + :param int sig: Which signal the process received. + + """ + # subprocess to client READY: SIGIO + # subprocess to client INUSE: SIGUSR1 + # subprocess to client CANTBIND: SIGUSR2 if sig == signal.SIGIO: self.subproc_state = "ready" elif sig == signal.SIGUSR1: @@ -69,8 +66,10 @@ class StandaloneAuthenticator(object): This handler receives inter-process communication from the parent process in the form of Unix signals. - :param int sig: Which signal the process received.""" - # client → subprocess CLEANUP : SIGINT + :param int sig: Which signal the process received. + + """ + # client to subprocess CLEANUP : SIGINT if sig == signal.SIGINT: try: self.ssl_conn.shutdown() @@ -91,6 +90,7 @@ class StandaloneAuthenticator(object): # reported here and none of them should impede us from # exiting as gracefully as possible. pass + os.kill(self.parent_pid, signal.SIGUSR1) sys.exit(0) @@ -101,18 +101,20 @@ class StandaloneAuthenticator(object): connection when an incoming connection provides an SNI name (in order to serve the appropriate certificate, if any). - :param OpenSSL.Connection connection: The TLS connection object - on which the SNI extension was received.""" + :param connection: The TLS connection object on which the SNI + extension was received. + :type connection: :class:`OpenSSL.Connection` + """ sni_name = connection.get_servername() if sni_name in self.tasks: pem_cert = self.tasks[sni_name] else: # TODO: Should we really present a certificate if we get an - # unexpected SNI name? Or should we just disconnect? + # unexpected SNI name? Or should we just disconnect? pem_cert = self.tasks.values()[0] - cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, - pem_cert) + cert = OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, pem_cert) new_ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD) new_ctx.set_verify(OpenSSL.SSL.VERIFY_NONE, lambda: False) new_ctx.use_certificate(cert) @@ -122,32 +124,36 @@ class StandaloneAuthenticator(object): def do_parent_process(self, port, delay_amount=5): """Perform the parent process side of the TCP listener task. - This should only be called by start_listener(). We will wait - up to delay_amount seconds to hear from the child process via - a signal. + This should only be called by :meth:`start_listener`. We will + wait up to delay_amount seconds to hear from the child process + via a signal. :param int port: Which TCP port to bind. :param float delay_amount: How long in seconds to wait for the - subprocess to notify us whether it succeeded. + subprocess to notify us whether it succeeded. - :returns: True or False according to whether we were notified - that the child process succeeded or failed in binding the port.""" + :returns: ``True`` or ``False`` according to whether we were notified + that the child process succeeded or failed in binding the port. + :rtype: bool + """ signal.signal(signal.SIGIO, self.client_signal_handler) signal.signal(signal.SIGUSR1, self.client_signal_handler) signal.signal(signal.SIGUSR2, self.client_signal_handler) + display = zope.component.getUtility(interfaces.IDisplay) + start_time = time.time() while time.time() < start_time + delay_amount: if self.subproc_state == "ready": return True - if self.subproc_state == "inuse": + elif self.subproc_state == "inuse": display.generic_notification( "Could not bind TCP port {0} because it is already in " "use it is already in use by another process on this " "system (such as a web server).".format(port)) return False - if self.subproc_state == "cantbind": + elif self.subproc_state == "cantbind": display.generic_notification( "Could not bind TCP port {0} because you don't have " "the appropriate permissions (for example, you " @@ -155,23 +161,28 @@ class StandaloneAuthenticator(object): "root).".format(port)) return False time.sleep(0.1) + display.generic_notification( "Subprocess unexpectedly timed out while trying to bind TCP " "port {0}.".format(port)) + return False def do_child_process(self, port, key): """Perform the child process side of the TCP listener task. - This should only be called by start_listener(). + This should only be called by :meth:`start_listener`. Normally does not return; instead, the child process exits from within this function or from within the child process signal handler. :param int port: Which TCP port to bind. - :param le_util.Key key: The private key to use to respond to - DVSNI challenge requests.""" + :param key: The private key to use to respond to DVSNI challenge + requests. + :type key: `letsencrypt.client.le_util.Key` + + """ signal.signal(signal.SIGINT, self.subproc_signal_handler) self.sock = socket.socket() try: @@ -217,14 +228,20 @@ class StandaloneAuthenticator(object): self.ssl_conn.close() def start_listener(self, port, key): - """Create a child process which will start a TCP listener on the + """Start listener. + + Create a child process which will start a TCP listener on the specified port to perform the specified DVSNI challenges. :param int port: The TCP port to bind. - :param le_util.Key key: The private key to use to respond to - DVSNI challenge requests. - :returns: True or False to indicate success or failure creating - the subprocess. + :param key: The private key to use to respond to DVSNI challenge + requests. + :type key: :class:`letsencrypt.client.le_util.Key` + + :returns: ``True`` or ``False`` to indicate success or failure creating + the subprocess. + :rtype: bool + """ fork_result = os.fork() Crypto.Random.atfork() @@ -243,33 +260,38 @@ class StandaloneAuthenticator(object): # IAuthenticator method implementations follow - def get_chall_pref(self, unused_domain): - # pylint: disable=no-self-use - """IAuthenticator interface method get_chall_pref. + def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use + """Get challenge preferences. + IAuthenticator interface method get_chall_pref. Return a list of challenge types that this authenticator can perform for this domain. In the case of the StandaloneAuthenticator, the only challenge type that can ever be performed is dvsni. - :returns: A list containing only 'dvsni'.""" + :returns: A list containing only 'dvsni'. + + """ return ["dvsni"] def perform(self, chall_list): - """IAuthenticator interface method perform. + """Perform the challenge. - Attempt to perform the - specified challenges, returning the status of each. For the - StandaloneAuthenticator, because there is no convenient way to add - additional requests, this should only be invoked once; subsequent - invocations are an error. To perform validations for multiple - independent sets of domains, a separate StandaloneAuthenticator - should be instantiated. + .. warning:: + For the StandaloneAuthenticator, because there is no convenient + way to add additional requests, this should only be invoked + once; subsequent invocations are an error. To perform + validations for multiple independent sets of domains, a separate + StandaloneAuthenticator should be instantiated. - :param list chall_list: A list of the the challenge objects to - be attempted by this authenticator. - :returns: A list in the same order containing, in each position, - the successfully configured challenge, False, or None.""" + :param list chall_list: List of namedtuple types defined in + :mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.) + + :returns: ACME Challenge DVSNI responses following IAuthenticator + interface. + :rtype: :class:`list` of :class`dict` + + """ if self.child_pid or self.tasks: # We should not be willing to continue with perform # if there were existing pending challenges. @@ -305,17 +327,14 @@ class StandaloneAuthenticator(object): return results_if_failure def cleanup(self, chall_list): - """IAuthenticator interface method cleanup. + """Clean up. - Remove each of the specified challenges from the list of - challenges that still need to be performed. (In the case of - the StandaloneAuthenticator, if some challenges are removed - from the list, the authenticator socket will still respond to - those challenges.) Once all challenges have been removed from - the list, the listener is deactivated and stops listening. + If some challenges are removed from the list, the authenticator + socket will still respond to those challenges. Once all + challenges have been removed from the list, the listener is + deactivated and stops listening. - :param list chall_list: A list of the the challenge objects to - be deactivated.""" + """ # Remove this from pending tasks list for chall in chall_list: assert isinstance(chall, challenge_util.DvsniChall) diff --git a/letsencrypt/client/tests/acme_util.py b/letsencrypt/client/tests/acme_util.py index 2c7baecc1..3fc6565b4 100644 --- a/letsencrypt/client/tests/acme_util.py +++ b/letsencrypt/client/tests/acme_util.py @@ -80,7 +80,6 @@ def gen_combos(challs): """Generate natural combinations for challs.""" dv_chall = [] renewal_chall = [] - combos = [] for i, chall in enumerate(challs): if chall["type"] in constants.DV_CHALLENGES: @@ -89,8 +88,5 @@ def gen_combos(challs): renewal_chall.append(i) # Gen combos for 1 of each type - for i in range(len(dv_chall)): - for j in range(len(renewal_chall)): - combos.append([i, j]) - - return combos + return [[i, j] for i in xrange(len(dv_chall)) + for j in xrange(len(renewal_chall))] diff --git a/letsencrypt/client/tests/apache/dvsni_test.py b/letsencrypt/client/tests/apache/dvsni_test.py index 7fbce9cbb..fee93eb1a 100644 --- a/letsencrypt/client/tests/apache/dvsni_test.py +++ b/letsencrypt/client/tests/apache/dvsni_test.py @@ -10,6 +10,8 @@ from letsencrypt.client import challenge_util from letsencrypt.client import constants from letsencrypt.client import le_util +from letsencrypt.client.apache.obj import Addr + from letsencrypt.client.tests.apache import util @@ -134,7 +136,6 @@ class DvsniPerformTest(util.ApacheTest): self.assertEqual(responses[i]["s"], "randomS%d" % i) def test_mod_config(self): - from letsencrypt.client.apache.obj import Addr for chall in self.challs: self.sni.add_chall(chall) v_addr1 = [Addr(("1.2.3.4", "443")), Addr(("5.6.7.8", "443"))] diff --git a/letsencrypt/client/tests/apache/obj_test.py b/letsencrypt/client/tests/apache/obj_test.py index f78e83bb4..0dccd3afb 100644 --- a/letsencrypt/client/tests/apache/obj_test.py +++ b/letsencrypt/client/tests/apache/obj_test.py @@ -64,3 +64,7 @@ class VirtualHostTest(unittest.TestCase): self.assertEqual(vhost1b, self.vhost1) self.assertEqual(str(vhost1b), str(self.vhost1)) self.assertNotEqual(vhost1b, 1234) + + +if __name__ == "__main__": + unittest.main() diff --git a/letsencrypt/client/tests/auth_handler_test.py b/letsencrypt/client/tests/auth_handler_test.py index 615dfa88b..f102202f8 100644 --- a/letsencrypt/client/tests/auth_handler_test.py +++ b/letsencrypt/client/tests/auth_handler_test.py @@ -174,7 +174,7 @@ class SatisfyChallengesTest(unittest.TestCase): self.assertEqual(len(self.handler.dv_c), 5) self.assertEqual(len(self.handler.client_c), 5) - for i in range(5): + for i in xrange(5): dom = str(i) self.assertEqual( self.handler.responses[dom], @@ -476,7 +476,7 @@ class PathSatisfiedTest(unittest.TestCase): def gen_auth_resp(chall_list): """Generate a dummy authorization response.""" - return ["%s%s" % (type(chall).__name__, chall.domain) + return ["%s%s" % (chall.__class__.__name__, chall.domain) for chall in chall_list] diff --git a/letsencrypt/client/tests/challenge_util_test.py b/letsencrypt/client/tests/challenge_util_test.py index 7c6d1035f..c7848a213 100644 --- a/letsencrypt/client/tests/challenge_util_test.py +++ b/letsencrypt/client/tests/challenge_util_test.py @@ -51,3 +51,7 @@ class DvsniGenCertTest(unittest.TestCase): def _call(cls, name, r_b64, nonce, key): from letsencrypt.client.challenge_util import dvsni_gen_cert return dvsni_gen_cert(name, r_b64, nonce, key) + + +if __name__ == "__main__": + unittest.main() diff --git a/letsencrypt/client/tests/client_authenticator_test.py b/letsencrypt/client/tests/client_authenticator_test.py index b2eff7d28..83a7d50d8 100644 --- a/letsencrypt/client/tests/client_authenticator_test.py +++ b/letsencrypt/client/tests/client_authenticator_test.py @@ -3,6 +3,9 @@ import unittest import mock +from letsencrypt.client import challenge_util +from letsencrypt.client import errors + class PerformTest(unittest.TestCase): """Test client perform function.""" @@ -16,33 +19,27 @@ class PerformTest(unittest.TestCase): name="rec_token_perform", side_effect=gen_client_resp) def test_rec_token1(self): - from letsencrypt.client.challenge_util import RecTokenChall - token = RecTokenChall("0") - + token = challenge_util.RecTokenChall("0") responses = self.auth.perform([token]) - self.assertEqual(responses, ["RecTokenChall0"]) def test_rec_token5(self): - from letsencrypt.client.challenge_util import RecTokenChall tokens = [] - for i in range(5): - tokens.append(RecTokenChall(str(i))) + for i in xrange(5): + tokens.append(challenge_util.RecTokenChall(str(i))) responses = self.auth.perform(tokens) self.assertEqual(len(responses), 5) - for i in range(5): + for i in xrange(5): self.assertEqual(responses[i], "RecTokenChall%d" % i) def test_unexpected(self): - from letsencrypt.client.challenge_util import DvsniChall - from letsencrypt.client.errors import LetsEncryptClientAuthError - - unexpected = DvsniChall("0", "rb64", "123", "invalid_key") + unexpected = challenge_util.DvsniChall( + "0", "rb64", "123", "invalid_key") self.assertRaises( - LetsEncryptClientAuthError, self.auth.perform, [unexpected]) + errors.LetsEncryptClientAuthError, self.auth.perform, [unexpected]) class CleanupTest(unittest.TestCase): @@ -57,9 +54,8 @@ class CleanupTest(unittest.TestCase): self.auth.rec_token.cleanup = self.mock_cleanup def test_rec_token2(self): - from letsencrypt.client.challenge_util import RecTokenChall - token1 = RecTokenChall("0") - token2 = RecTokenChall("1") + token1 = challenge_util.RecTokenChall("0") + token2 = challenge_util.RecTokenChall("1") self.auth.cleanup([token1, token2]) @@ -67,20 +63,16 @@ class CleanupTest(unittest.TestCase): [mock.call(token1), mock.call(token2)]) def test_unexpected(self): - from letsencrypt.client.challenge_util import DvsniChall - from letsencrypt.client.challenge_util import RecTokenChall - from letsencrypt.client.errors import LetsEncryptClientAuthError + token = challenge_util.RecTokenChall("0") + unexpected = challenge_util.DvsniChall("0", "rb64", "123", "dummy_key") - token = RecTokenChall("0") - unexpected = DvsniChall("0", "rb64", "123", "dummy_key") - - self.assertRaises( - LetsEncryptClientAuthError, self.auth.cleanup, [token, unexpected]) + self.assertRaises(errors.LetsEncryptClientAuthError, + self.auth.cleanup, [token, unexpected]) def gen_client_resp(chall): """Generate a dummy response.""" - return "%s%s" % (type(chall).__name__, chall.domain) + return "%s%s" % (chall.__class__.__name__, chall.domain) if __name__ == '__main__': diff --git a/letsencrypt/client/tests/recovery_token_test.py b/letsencrypt/client/tests/recovery_token_test.py index d3d82e8ad..2476b0c34 100644 --- a/letsencrypt/client/tests/recovery_token_test.py +++ b/letsencrypt/client/tests/recovery_token_test.py @@ -6,6 +6,8 @@ import tempfile import mock +from letsencrypt.client import challenge_util + class RecoveryTokenTest(unittest.TestCase): def setUp(self): @@ -31,32 +33,31 @@ class RecoveryTokenTest(unittest.TestCase): self.assertTrue(self.rec_token.requires_human("example3.com")) def test_cleanup(self): - from letsencrypt.client.challenge_util import RecTokenChall self.rec_token.store_token("example3.com", 333) self.assertFalse(self.rec_token.requires_human("example3.com")) - self.rec_token.cleanup(RecTokenChall("example3.com")) + self.rec_token.cleanup(challenge_util.RecTokenChall("example3.com")) self.assertTrue(self.rec_token.requires_human("example3.com")) # Shouldn't throw an error - self.rec_token.cleanup(RecTokenChall("example4.com")) + self.rec_token.cleanup(challenge_util.RecTokenChall("example4.com")) def test_perform_stored(self): - from letsencrypt.client.challenge_util import RecTokenChall self.rec_token.store_token("example4.com", 444) - response = self.rec_token.perform(RecTokenChall("example4.com")) + response = self.rec_token.perform( + challenge_util.RecTokenChall("example4.com")) self.assertEqual(response, {"type": "recoveryToken", "token": "444"}) @mock.patch("letsencrypt.client.recovery_token.zope.component.getUtility") def test_perform_not_stored(self, mock_input): - from letsencrypt.client.challenge_util import RecTokenChall - mock_input().generic_input.side_effect = [(0, "555"), (1, "000")] - response = self.rec_token.perform(RecTokenChall("example5.com")) + response = self.rec_token.perform( + challenge_util.RecTokenChall("example5.com")) self.assertEqual(response, {"type": "recoveryToken", "token": "555"}) - response = self.rec_token.perform(RecTokenChall("example6.com")) + response = self.rec_token.perform( + challenge_util.RecTokenChall("example6.com")) self.assertTrue(response is None) diff --git a/letsencrypt/client/tests/standalone_authenticator_test.py b/letsencrypt/client/tests/standalone_authenticator_test.py index e28ce2c45..a6208e817 100644 --- a/letsencrypt/client/tests/standalone_authenticator_test.py +++ b/letsencrypt/client/tests/standalone_authenticator_test.py @@ -1,14 +1,11 @@ -#!/usr/bin/env python - -"""Tests for standalone_authenticator.py.""" -import mock -import unittest - +"""Tests for letsencrypt.client.standalone_authenticator.""" import os import pkg_resources import signal import socket +import unittest +import mock import OpenSSL.crypto import OpenSSL.SSL @@ -22,7 +19,7 @@ from letsencrypt.client import le_util # after one iteration, based on. # http://igorsobreira.com/2013/03/17/testing-infinite-loops.html -class SocketAcceptOnlyNTimes(object): +class _SocketAcceptOnlyNTimes(object): # pylint: disable=too-few-public-methods """ Callable that will raise `CallableExhausted` @@ -41,6 +38,7 @@ class SocketAcceptOnlyNTimes(object): # Modified here for a single use as socket.accept() return (mock.MagicMock(), "ignored") + class CallableExhausted(Exception): # pylint: disable=too-few-public-methods """Exception raised when a method is called more than the @@ -67,7 +65,7 @@ class SNICallbackTest(unittest.TestCase): self.authenticator = StandaloneAuthenticator() name, r_b64 = "example.com", jose.b64encode("x" * 32) test_key = pkg_resources.resource_string( - __name__, 'testdata/rsa256_key.pem') + __name__, "testdata/rsa256_key.pem") nonce, key = "abcdef", le_util.Key("foo", test_key) self.cert = challenge_util.dvsni_gen_cert(name, r_b64, nonce, key)[0] private_key = OpenSSL.crypto.load_privatekey( @@ -191,7 +189,7 @@ class PerformTest(unittest.TestCase): def test_can_perform(self): """What happens if start_listener() returns True.""" test_key = pkg_resources.resource_string( - __name__, 'testdata/rsa256_key.pem') + __name__, "testdata/rsa256_key.pem") key = le_util.Key("something", test_key) chall1 = challenge_util.DvsniChall( "foo.example.com", "whee", "foononce", key) @@ -218,7 +216,7 @@ class PerformTest(unittest.TestCase): def test_cannot_perform(self): """What happens if start_listener() returns False.""" test_key = pkg_resources.resource_string( - __name__, 'testdata/rsa256_key.pem') + __name__, "testdata/rsa256_key.pem") key = le_util.Key("something", test_key) chall1 = challenge_util.DvsniChall( "foo.example.com", "whee", "foononce", key) @@ -349,7 +347,7 @@ class DoChildProcessTest(unittest.TestCase): self.authenticator = StandaloneAuthenticator() name, r_b64 = "example.com", jose.b64encode("x" * 32) test_key = pkg_resources.resource_string( - __name__, 'testdata/rsa256_key.pem') + __name__, "testdata/rsa256_key.pem") nonce, key = "abcdef", le_util.Key("foo", test_key) self.key = key self.cert = challenge_util.dvsni_gen_cert(name, r_b64, nonce, key)[0] @@ -414,10 +412,10 @@ class DoChildProcessTest(unittest.TestCase): "OpenSSL.SSL.Connection") @mock.patch("letsencrypt.client.standalone_authenticator.socket.socket") @mock.patch("letsencrypt.client.standalone_authenticator.os.kill") - def test_do_child_process_success(self, mock_kill, mock_socket, - mock_connection): + def test_do_child_process_success( + self, mock_kill, mock_socket, mock_connection): sample_socket = mock.MagicMock() - sample_socket.accept.side_effect = SocketAcceptOnlyNTimes(2) + sample_socket.accept.side_effect = _SocketAcceptOnlyNTimes(2) mock_socket.return_value = sample_socket mock_connection.return_value = mock.MagicMock() self.assertRaises( @@ -459,5 +457,5 @@ class CleanupTest(unittest.TestCase): self.assertRaises(ValueError, self.authenticator.cleanup, [chall]) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()