Merge remote-tracking branch 'github/letsencrypt/master' into acme

Conflicts:
	letsencrypt/client/tests/acme_util.py
This commit is contained in:
Jakub Warmuz 2015-02-12 09:56:53 +00:00
commit a57574cbba
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
17 changed files with 163 additions and 150 deletions

View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.client.standalone_authenticator`
--------------------------------------------------
.. automodule:: letsencrypt.client.standalone_authenticator
:members:

View file

@ -24,7 +24,8 @@ Ubuntu
::
sudo apt-get install python python-setuptools python-virtualenv python-dev \
gcc swig dialog libaugeas0 libssl-dev ca-certificates
gcc swig dialog libaugeas0 libssl-dev libffi-dev \
ca-certificates
Mac OSX

View file

@ -457,11 +457,11 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
ssl_addr_p = self.aug.match(
addr_match % (ssl_fp, parser.case_i('VirtualHost')))
for i in range(len(ssl_addr_p)):
for addr in ssl_addr_p:
old_addr = obj.Addr.fromstring(
str(self.aug.get(ssl_addr_p[i])))
str(self.aug.get(addr)))
ssl_addr = old_addr.get_addr_obj("443")
self.aug.set(ssl_addr_p[i], str(ssl_addr))
self.aug.set(addr, str(ssl_addr))
ssl_addrs.add(ssl_addr)
# Add directives

View file

@ -77,13 +77,12 @@ class ApacheParser(object):
"""
self.aug.set(aug_conf_path + "/directive[last() + 1]", directive)
if type(arg) is not list:
self.aug.set(aug_conf_path + "/directive[last()]/arg", arg)
if isinstance(arg, list):
for i, value in enumerate(arg, 1):
self.aug.set(
"%s/directive[last()]/arg[%d]" % (aug_conf_path, i), value)
else:
for i in range(len(arg)):
self.aug.set("%s/directive[last()]/arg[%d]" %
(aug_conf_path, (i+1)),
arg[i])
self.aug.set(aug_conf_path + "/directive[last()]/arg", arg)
def find_dir(self, directive, arg=None, start=None):
"""Finds directive in the configuration.
@ -96,7 +95,7 @@ class ApacheParser(object):
Note: Augeas is inherently case sensitive while Apache is case
insensitive. Augeas 1.0 allows case insensitive regexes like
regexp(/Listen/, 'i'), however the version currently supported
regexp(/Listen/, "i"), however the version currently supported
by Ubuntu 0.10 does not. Thus I have included my own case insensitive
transformation by calling case_i() on everything to maintain
compatibility.
@ -119,10 +118,11 @@ class ApacheParser(object):
# No regexp code
# if arg is None:
# matches = self.aug.match(start +
# "//*[self::directive='"+directive+"']/arg")
# "//*[self::directive='" + directive + "']/arg")
# else:
# matches = self.aug.match(start +
# "//*[self::directive='" + directive+"']/* [self::arg='" + arg + "']")
# "//*[self::directive='" + directive +
# "']/* [self::arg='" + arg + "']")
# includes = self.aug.match(start +
# "//* [self::directive='Include']/* [label()='arg']")
@ -313,8 +313,8 @@ class ApacheParser(object):
self.root + "/*/*/*.augsave",
self.root + "/*/*/*~"]
for i in range(len(excl)):
self.aug.set("/augeas/load/Httpd/excl[%d]" % (i+1), excl[i])
for i, excluded in enumerate(excl, 1):
self.aug.set("/augeas/load/Httpd/excl[%d]" % i, excluded)
self.aug.load()

View file

@ -44,7 +44,6 @@ class Client(object):
:type config: :class:`~letsencrypt.client.interfaces.IConfig`
"""
zope.interface.implements(interfaces.IAuthenticator)
def __init__(self, config, authkey, dv_auth, installer):
"""Initialize a client.
@ -388,14 +387,6 @@ def init_csr(privkey, names, cert_dir):
return le_util.CSR(csr_filename, csr_der, "der")
def csr_pem_to_der(csr):
"""Convert pem CSR to der."""
csr_obj = M2Crypto.X509.load_request_string(csr.data)
return le_util.CSR(csr.file, csr_obj.as_der(), "der")
# This should be controlled by commandline parameters
def determine_authenticator(config):
"""Returns a valid IAuthenticator.

View file

@ -36,7 +36,7 @@ List of expected options parameters:
APACHE_MOD_SSL_CONF = pkg_resources.resource_filename(
'letsencrypt.client.apache', 'options-ssl.conf')
"letsencrypt.client.apache", "options-ssl.conf")
"""Path to the Apache mod_ssl config file found in the Let's Encrypt
distribution."""

View file

@ -31,7 +31,7 @@ class IAuthenticator(zope.interface.Interface):
:param list chall_list: List of namedtuple types defined in
:mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.).
:returns: Challenge responses or if it cannot be completed then:
:returns: ACME Challenge responses or if it cannot be completed then:
``None``
Authenticator can perform challenge, but can't at this time

View file

@ -424,7 +424,8 @@ class Reverter(object):
# It is possible save checkpoints faster than 1 per second resulting in
# collisions in the naming convention.
cur_time = time.time()
for _ in range(10):
for _ in xrange(10):
final_dir = os.path.join(self.config.backup_dir, str(cur_time))
try:
os.rename(self.config.in_progress_dir, final_dir)

149
letsencrypt/client/standalone_authenticator.py Executable file → Normal file
View file

@ -1,12 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""An authenticator that doesn't rely on any existing server program.
This authenticator creates its own ephemeral TCP listener on the specified
port in order to respond to incoming DVSNI challenges from the certificate
authority."""
"""Standalone authenticator."""
import os
import signal
import socket
@ -26,11 +18,14 @@ from letsencrypt.client import interfaces
class StandaloneAuthenticator(object):
# pylint: disable=too-many-instance-attributes
"""The StandaloneAuthenticator class itself.
"""Standalone authenticator.
This authenticator can be invoked by the Let's Encrypt client
according to the IAuthenticator API interface. It creates a local
TCP listener on a specified port and satisfies DVSNI challenges."""
This authenticator creates its own ephemeral TCP listener on the
specified port in order to respond to incoming DVSNI challenges from
the certificate authority. Therefore, it does not rely on any
existing server program.
"""
zope.interface.implements(interfaces.IAuthenticator)
def __init__(self):
@ -49,10 +44,12 @@ class StandaloneAuthenticator(object):
This handler receives inter-process communication from the
child process in the form of Unix signals.
:param int sig: Which signal the process received."""
# subprocess → client READY : SIGIO
# subprocess → client INUSE : SIGUSR1
# subprocess → client CANTBIND: SIGUSR2
:param int sig: Which signal the process received.
"""
# subprocess to client READY: SIGIO
# subprocess to client INUSE: SIGUSR1
# subprocess to client CANTBIND: SIGUSR2
if sig == signal.SIGIO:
self.subproc_state = "ready"
elif sig == signal.SIGUSR1:
@ -69,8 +66,10 @@ class StandaloneAuthenticator(object):
This handler receives inter-process communication from the parent
process in the form of Unix signals.
:param int sig: Which signal the process received."""
# client → subprocess CLEANUP : SIGINT
:param int sig: Which signal the process received.
"""
# client to subprocess CLEANUP : SIGINT
if sig == signal.SIGINT:
try:
self.ssl_conn.shutdown()
@ -91,6 +90,7 @@ class StandaloneAuthenticator(object):
# reported here and none of them should impede us from
# exiting as gracefully as possible.
pass
os.kill(self.parent_pid, signal.SIGUSR1)
sys.exit(0)
@ -101,18 +101,20 @@ class StandaloneAuthenticator(object):
connection when an incoming connection provides an SNI name
(in order to serve the appropriate certificate, if any).
:param OpenSSL.Connection connection: The TLS connection object
on which the SNI extension was received."""
:param connection: The TLS connection object on which the SNI
extension was received.
:type connection: :class:`OpenSSL.Connection`
"""
sni_name = connection.get_servername()
if sni_name in self.tasks:
pem_cert = self.tasks[sni_name]
else:
# TODO: Should we really present a certificate if we get an
# unexpected SNI name? Or should we just disconnect?
# unexpected SNI name? Or should we just disconnect?
pem_cert = self.tasks.values()[0]
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
pem_cert)
cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, pem_cert)
new_ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
new_ctx.set_verify(OpenSSL.SSL.VERIFY_NONE, lambda: False)
new_ctx.use_certificate(cert)
@ -122,32 +124,36 @@ class StandaloneAuthenticator(object):
def do_parent_process(self, port, delay_amount=5):
"""Perform the parent process side of the TCP listener task.
This should only be called by start_listener(). We will wait
up to delay_amount seconds to hear from the child process via
a signal.
This should only be called by :meth:`start_listener`. We will
wait up to delay_amount seconds to hear from the child process
via a signal.
:param int port: Which TCP port to bind.
:param float delay_amount: How long in seconds to wait for the
subprocess to notify us whether it succeeded.
subprocess to notify us whether it succeeded.
:returns: True or False according to whether we were notified
that the child process succeeded or failed in binding the port."""
:returns: ``True`` or ``False`` according to whether we were notified
that the child process succeeded or failed in binding the port.
:rtype: bool
"""
signal.signal(signal.SIGIO, self.client_signal_handler)
signal.signal(signal.SIGUSR1, self.client_signal_handler)
signal.signal(signal.SIGUSR2, self.client_signal_handler)
display = zope.component.getUtility(interfaces.IDisplay)
start_time = time.time()
while time.time() < start_time + delay_amount:
if self.subproc_state == "ready":
return True
if self.subproc_state == "inuse":
elif self.subproc_state == "inuse":
display.generic_notification(
"Could not bind TCP port {0} because it is already in "
"use it is already in use by another process on this "
"system (such as a web server).".format(port))
return False
if self.subproc_state == "cantbind":
elif self.subproc_state == "cantbind":
display.generic_notification(
"Could not bind TCP port {0} because you don't have "
"the appropriate permissions (for example, you "
@ -155,23 +161,28 @@ class StandaloneAuthenticator(object):
"root).".format(port))
return False
time.sleep(0.1)
display.generic_notification(
"Subprocess unexpectedly timed out while trying to bind TCP "
"port {0}.".format(port))
return False
def do_child_process(self, port, key):
"""Perform the child process side of the TCP listener task.
This should only be called by start_listener().
This should only be called by :meth:`start_listener`.
Normally does not return; instead, the child process exits from
within this function or from within the child process signal
handler.
:param int port: Which TCP port to bind.
:param le_util.Key key: The private key to use to respond to
DVSNI challenge requests."""
:param key: The private key to use to respond to DVSNI challenge
requests.
:type key: `letsencrypt.client.le_util.Key`
"""
signal.signal(signal.SIGINT, self.subproc_signal_handler)
self.sock = socket.socket()
try:
@ -217,14 +228,20 @@ class StandaloneAuthenticator(object):
self.ssl_conn.close()
def start_listener(self, port, key):
"""Create a child process which will start a TCP listener on the
"""Start listener.
Create a child process which will start a TCP listener on the
specified port to perform the specified DVSNI challenges.
:param int port: The TCP port to bind.
:param le_util.Key key: The private key to use to respond to
DVSNI challenge requests.
:returns: True or False to indicate success or failure creating
the subprocess.
:param key: The private key to use to respond to DVSNI challenge
requests.
:type key: :class:`letsencrypt.client.le_util.Key`
:returns: ``True`` or ``False`` to indicate success or failure creating
the subprocess.
:rtype: bool
"""
fork_result = os.fork()
Crypto.Random.atfork()
@ -243,33 +260,38 @@ class StandaloneAuthenticator(object):
# IAuthenticator method implementations follow
def get_chall_pref(self, unused_domain):
# pylint: disable=no-self-use
"""IAuthenticator interface method get_chall_pref.
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
"""Get challenge preferences.
IAuthenticator interface method get_chall_pref.
Return a list of challenge types that this authenticator
can perform for this domain. In the case of the
StandaloneAuthenticator, the only challenge type that can ever
be performed is dvsni.
:returns: A list containing only 'dvsni'."""
:returns: A list containing only 'dvsni'.
"""
return ["dvsni"]
def perform(self, chall_list):
"""IAuthenticator interface method perform.
"""Perform the challenge.
Attempt to perform the
specified challenges, returning the status of each. For the
StandaloneAuthenticator, because there is no convenient way to add
additional requests, this should only be invoked once; subsequent
invocations are an error. To perform validations for multiple
independent sets of domains, a separate StandaloneAuthenticator
should be instantiated.
.. warning::
For the StandaloneAuthenticator, because there is no convenient
way to add additional requests, this should only be invoked
once; subsequent invocations are an error. To perform
validations for multiple independent sets of domains, a separate
StandaloneAuthenticator should be instantiated.
:param list chall_list: A list of the the challenge objects to
be attempted by this authenticator.
:returns: A list in the same order containing, in each position,
the successfully configured challenge, False, or None."""
:param list chall_list: List of namedtuple types defined in
:mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.)
:returns: ACME Challenge DVSNI responses following IAuthenticator
interface.
:rtype: :class:`list` of :class`dict`
"""
if self.child_pid or self.tasks:
# We should not be willing to continue with perform
# if there were existing pending challenges.
@ -305,17 +327,14 @@ class StandaloneAuthenticator(object):
return results_if_failure
def cleanup(self, chall_list):
"""IAuthenticator interface method cleanup.
"""Clean up.
Remove each of the specified challenges from the list of
challenges that still need to be performed. (In the case of
the StandaloneAuthenticator, if some challenges are removed
from the list, the authenticator socket will still respond to
those challenges.) Once all challenges have been removed from
the list, the listener is deactivated and stops listening.
If some challenges are removed from the list, the authenticator
socket will still respond to those challenges. Once all
challenges have been removed from the list, the listener is
deactivated and stops listening.
:param list chall_list: A list of the the challenge objects to
be deactivated."""
"""
# Remove this from pending tasks list
for chall in chall_list:
assert isinstance(chall, challenge_util.DvsniChall)

View file

@ -80,7 +80,6 @@ def gen_combos(challs):
"""Generate natural combinations for challs."""
dv_chall = []
renewal_chall = []
combos = []
for i, chall in enumerate(challs):
if chall["type"] in constants.DV_CHALLENGES:
@ -89,8 +88,5 @@ def gen_combos(challs):
renewal_chall.append(i)
# Gen combos for 1 of each type
for i in range(len(dv_chall)):
for j in range(len(renewal_chall)):
combos.append([i, j])
return combos
return [[i, j] for i in xrange(len(dv_chall))
for j in xrange(len(renewal_chall))]

View file

@ -10,6 +10,8 @@ from letsencrypt.client import challenge_util
from letsencrypt.client import constants
from letsencrypt.client import le_util
from letsencrypt.client.apache.obj import Addr
from letsencrypt.client.tests.apache import util
@ -134,7 +136,6 @@ class DvsniPerformTest(util.ApacheTest):
self.assertEqual(responses[i]["s"], "randomS%d" % i)
def test_mod_config(self):
from letsencrypt.client.apache.obj import Addr
for chall in self.challs:
self.sni.add_chall(chall)
v_addr1 = [Addr(("1.2.3.4", "443")), Addr(("5.6.7.8", "443"))]

View file

@ -64,3 +64,7 @@ class VirtualHostTest(unittest.TestCase):
self.assertEqual(vhost1b, self.vhost1)
self.assertEqual(str(vhost1b), str(self.vhost1))
self.assertNotEqual(vhost1b, 1234)
if __name__ == "__main__":
unittest.main()

View file

@ -174,7 +174,7 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(len(self.handler.dv_c), 5)
self.assertEqual(len(self.handler.client_c), 5)
for i in range(5):
for i in xrange(5):
dom = str(i)
self.assertEqual(
self.handler.responses[dom],
@ -476,7 +476,7 @@ class PathSatisfiedTest(unittest.TestCase):
def gen_auth_resp(chall_list):
"""Generate a dummy authorization response."""
return ["%s%s" % (type(chall).__name__, chall.domain)
return ["%s%s" % (chall.__class__.__name__, chall.domain)
for chall in chall_list]

View file

@ -51,3 +51,7 @@ class DvsniGenCertTest(unittest.TestCase):
def _call(cls, name, r_b64, nonce, key):
from letsencrypt.client.challenge_util import dvsni_gen_cert
return dvsni_gen_cert(name, r_b64, nonce, key)
if __name__ == "__main__":
unittest.main()

View file

@ -3,6 +3,9 @@ import unittest
import mock
from letsencrypt.client import challenge_util
from letsencrypt.client import errors
class PerformTest(unittest.TestCase):
"""Test client perform function."""
@ -16,33 +19,27 @@ class PerformTest(unittest.TestCase):
name="rec_token_perform", side_effect=gen_client_resp)
def test_rec_token1(self):
from letsencrypt.client.challenge_util import RecTokenChall
token = RecTokenChall("0")
token = challenge_util.RecTokenChall("0")
responses = self.auth.perform([token])
self.assertEqual(responses, ["RecTokenChall0"])
def test_rec_token5(self):
from letsencrypt.client.challenge_util import RecTokenChall
tokens = []
for i in range(5):
tokens.append(RecTokenChall(str(i)))
for i in xrange(5):
tokens.append(challenge_util.RecTokenChall(str(i)))
responses = self.auth.perform(tokens)
self.assertEqual(len(responses), 5)
for i in range(5):
for i in xrange(5):
self.assertEqual(responses[i], "RecTokenChall%d" % i)
def test_unexpected(self):
from letsencrypt.client.challenge_util import DvsniChall
from letsencrypt.client.errors import LetsEncryptClientAuthError
unexpected = DvsniChall("0", "rb64", "123", "invalid_key")
unexpected = challenge_util.DvsniChall(
"0", "rb64", "123", "invalid_key")
self.assertRaises(
LetsEncryptClientAuthError, self.auth.perform, [unexpected])
errors.LetsEncryptClientAuthError, self.auth.perform, [unexpected])
class CleanupTest(unittest.TestCase):
@ -57,9 +54,8 @@ class CleanupTest(unittest.TestCase):
self.auth.rec_token.cleanup = self.mock_cleanup
def test_rec_token2(self):
from letsencrypt.client.challenge_util import RecTokenChall
token1 = RecTokenChall("0")
token2 = RecTokenChall("1")
token1 = challenge_util.RecTokenChall("0")
token2 = challenge_util.RecTokenChall("1")
self.auth.cleanup([token1, token2])
@ -67,20 +63,16 @@ class CleanupTest(unittest.TestCase):
[mock.call(token1), mock.call(token2)])
def test_unexpected(self):
from letsencrypt.client.challenge_util import DvsniChall
from letsencrypt.client.challenge_util import RecTokenChall
from letsencrypt.client.errors import LetsEncryptClientAuthError
token = challenge_util.RecTokenChall("0")
unexpected = challenge_util.DvsniChall("0", "rb64", "123", "dummy_key")
token = RecTokenChall("0")
unexpected = DvsniChall("0", "rb64", "123", "dummy_key")
self.assertRaises(
LetsEncryptClientAuthError, self.auth.cleanup, [token, unexpected])
self.assertRaises(errors.LetsEncryptClientAuthError,
self.auth.cleanup, [token, unexpected])
def gen_client_resp(chall):
"""Generate a dummy response."""
return "%s%s" % (type(chall).__name__, chall.domain)
return "%s%s" % (chall.__class__.__name__, chall.domain)
if __name__ == '__main__':

View file

@ -6,6 +6,8 @@ import tempfile
import mock
from letsencrypt.client import challenge_util
class RecoveryTokenTest(unittest.TestCase):
def setUp(self):
@ -31,32 +33,31 @@ class RecoveryTokenTest(unittest.TestCase):
self.assertTrue(self.rec_token.requires_human("example3.com"))
def test_cleanup(self):
from letsencrypt.client.challenge_util import RecTokenChall
self.rec_token.store_token("example3.com", 333)
self.assertFalse(self.rec_token.requires_human("example3.com"))
self.rec_token.cleanup(RecTokenChall("example3.com"))
self.rec_token.cleanup(challenge_util.RecTokenChall("example3.com"))
self.assertTrue(self.rec_token.requires_human("example3.com"))
# Shouldn't throw an error
self.rec_token.cleanup(RecTokenChall("example4.com"))
self.rec_token.cleanup(challenge_util.RecTokenChall("example4.com"))
def test_perform_stored(self):
from letsencrypt.client.challenge_util import RecTokenChall
self.rec_token.store_token("example4.com", 444)
response = self.rec_token.perform(RecTokenChall("example4.com"))
response = self.rec_token.perform(
challenge_util.RecTokenChall("example4.com"))
self.assertEqual(response, {"type": "recoveryToken", "token": "444"})
@mock.patch("letsencrypt.client.recovery_token.zope.component.getUtility")
def test_perform_not_stored(self, mock_input):
from letsencrypt.client.challenge_util import RecTokenChall
mock_input().generic_input.side_effect = [(0, "555"), (1, "000")]
response = self.rec_token.perform(RecTokenChall("example5.com"))
response = self.rec_token.perform(
challenge_util.RecTokenChall("example5.com"))
self.assertEqual(response, {"type": "recoveryToken", "token": "555"})
response = self.rec_token.perform(RecTokenChall("example6.com"))
response = self.rec_token.perform(
challenge_util.RecTokenChall("example6.com"))
self.assertTrue(response is None)

View file

@ -1,14 +1,11 @@
#!/usr/bin/env python
"""Tests for standalone_authenticator.py."""
import mock
import unittest
"""Tests for letsencrypt.client.standalone_authenticator."""
import os
import pkg_resources
import signal
import socket
import unittest
import mock
import OpenSSL.crypto
import OpenSSL.SSL
@ -22,7 +19,7 @@ from letsencrypt.client import le_util
# after one iteration, based on.
# http://igorsobreira.com/2013/03/17/testing-infinite-loops.html
class SocketAcceptOnlyNTimes(object):
class _SocketAcceptOnlyNTimes(object):
# pylint: disable=too-few-public-methods
"""
Callable that will raise `CallableExhausted`
@ -41,6 +38,7 @@ class SocketAcceptOnlyNTimes(object):
# Modified here for a single use as socket.accept()
return (mock.MagicMock(), "ignored")
class CallableExhausted(Exception):
# pylint: disable=too-few-public-methods
"""Exception raised when a method is called more than the
@ -67,7 +65,7 @@ class SNICallbackTest(unittest.TestCase):
self.authenticator = StandaloneAuthenticator()
name, r_b64 = "example.com", jose.b64encode("x" * 32)
test_key = pkg_resources.resource_string(
__name__, 'testdata/rsa256_key.pem')
__name__, "testdata/rsa256_key.pem")
nonce, key = "abcdef", le_util.Key("foo", test_key)
self.cert = challenge_util.dvsni_gen_cert(name, r_b64, nonce, key)[0]
private_key = OpenSSL.crypto.load_privatekey(
@ -191,7 +189,7 @@ class PerformTest(unittest.TestCase):
def test_can_perform(self):
"""What happens if start_listener() returns True."""
test_key = pkg_resources.resource_string(
__name__, 'testdata/rsa256_key.pem')
__name__, "testdata/rsa256_key.pem")
key = le_util.Key("something", test_key)
chall1 = challenge_util.DvsniChall(
"foo.example.com", "whee", "foononce", key)
@ -218,7 +216,7 @@ class PerformTest(unittest.TestCase):
def test_cannot_perform(self):
"""What happens if start_listener() returns False."""
test_key = pkg_resources.resource_string(
__name__, 'testdata/rsa256_key.pem')
__name__, "testdata/rsa256_key.pem")
key = le_util.Key("something", test_key)
chall1 = challenge_util.DvsniChall(
"foo.example.com", "whee", "foononce", key)
@ -349,7 +347,7 @@ class DoChildProcessTest(unittest.TestCase):
self.authenticator = StandaloneAuthenticator()
name, r_b64 = "example.com", jose.b64encode("x" * 32)
test_key = pkg_resources.resource_string(
__name__, 'testdata/rsa256_key.pem')
__name__, "testdata/rsa256_key.pem")
nonce, key = "abcdef", le_util.Key("foo", test_key)
self.key = key
self.cert = challenge_util.dvsni_gen_cert(name, r_b64, nonce, key)[0]
@ -414,10 +412,10 @@ class DoChildProcessTest(unittest.TestCase):
"OpenSSL.SSL.Connection")
@mock.patch("letsencrypt.client.standalone_authenticator.socket.socket")
@mock.patch("letsencrypt.client.standalone_authenticator.os.kill")
def test_do_child_process_success(self, mock_kill, mock_socket,
mock_connection):
def test_do_child_process_success(
self, mock_kill, mock_socket, mock_connection):
sample_socket = mock.MagicMock()
sample_socket.accept.side_effect = SocketAcceptOnlyNTimes(2)
sample_socket.accept.side_effect = _SocketAcceptOnlyNTimes(2)
mock_socket.return_value = sample_socket
mock_connection.return_value = mock.MagicMock()
self.assertRaises(
@ -459,5 +457,5 @@ class CleanupTest(unittest.TestCase):
self.assertRaises(ValueError, self.authenticator.cleanup, [chall])
if __name__ == '__main__':
if __name__ == "__main__":
unittest.main()