Update letsencrypt to DVSNI v03 (fixes #597).

This commit is contained in:
Jakub Warmuz 2015-07-17 17:44:20 +00:00
parent 1bb6763595
commit 735bd924bf
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
11 changed files with 133 additions and 146 deletions

View file

@ -18,7 +18,7 @@ Note, that all annotated challenges act as a proxy objects::
"""
from acme import challenges
from acme.jose import util as jose_util
from acme import jose
from letsencrypt import crypto_util
@ -26,7 +26,7 @@ from letsencrypt import crypto_util
# pylint: disable=too-few-public-methods
class AnnotatedChallenge(jose_util.ImmutableMap):
class AnnotatedChallenge(jose.ImmutableMap):
"""Client annotated challenge.
Wraps around server provided challenge and annotates with data
@ -43,23 +43,41 @@ class AnnotatedChallenge(jose_util.ImmutableMap):
class DVSNI(AnnotatedChallenge):
"""Client annotated "dvsni" ACME challenge."""
__slots__ = ('challb', 'domain', 'key')
"""Client annotated "dvsni" ACME challenge.
:ivar .Account account:
"""
__slots__ = ('challb', 'domain', 'account')
acme_type = challenges.DVSNI
def gen_cert_and_response(self, s=None): # pylint: disable=invalid-name
def gen_cert_and_response(self, key_pem=None, bits=2048, alg=jose.RS256):
"""Generate a DVSNI cert and save it to filepath.
:returns: ``(cert_pem, response)`` tuple, where ``cert_pem`` is the PEM
encoded certificate and ``response`` is an instance
:class:`acme.challenges.DVSNIResponse`.
:param bytes key_pem: Private PEM-encoded key used for
certificate generation. If none provided, a fresh key will
be generated.
:param int bits: Number of bits for fresh key generation.
:param .JWAAlgorithm alg:
:returns: ``(response, cert_pem, key_pem)`` tuple, where
``response`` is an instance of
`acme.challenges.DVSNIResponse`, ``cert_pem`` is the
PEM-encoded certificate and ``key_pem`` is PEM-encoded
private key.
:rtype: tuple
"""
response = challenges.DVSNIResponse(s=s)
cert_pem = crypto_util.make_ss_cert(self.key, [
self.domain, self.nonce_domain, response.z_domain(self.challb)])
return cert_pem, response
key_pem = crypto_util.make_key(bits) if key_pem is None else key_pem
response = challenges.DVSNIResponse(validation=jose.JWS.sign(
payload=self.challb.chall.json_dumps().encode('utf-8'),
alg=alg,
key=self.account.key,
include_jwk=False,
))
cert_pem = crypto_util.make_ss_cert(
key_pem, ["some CN", response.z_domain], force_san=True)
return response, cert_pem, key_pem
class SimpleHTTP(AnnotatedChallenge):

View file

@ -317,7 +317,7 @@ class AuthHandler(object):
challb = self.authzr[domain].body.challenges[index]
chall = challb.chall
achall = challb_to_achall(challb, self.account.key, domain)
achall = challb_to_achall(challb, self.account, domain)
if isinstance(chall, challenges.ContinuityChallenge):
cont_chall.append(achall)
@ -327,15 +327,11 @@ class AuthHandler(object):
return cont_chall, dv_chall
def challb_to_achall(challb, key, domain):
def challb_to_achall(challb, account, domain):
"""Converts a ChallengeBody object to an AnnotatedChallenge.
:param challb: ChallengeBody
:type challb: :class:`acme.messages.ChallengeBody`
:param key: Key
:type key: :class:`letsencrypt.le_util.Key`
:param .ChallengeBody challb: ChallengeBody
:param .Account account:
:param str domain: Domain of the challb
:returns: Appropriate AnnotatedChallenge
@ -347,10 +343,10 @@ def challb_to_achall(challb, key, domain):
if isinstance(chall, challenges.DVSNI):
return achallenges.DVSNI(
challb=challb, domain=domain, key=key)
challb=challb, domain=domain, account=account)
elif isinstance(chall, challenges.SimpleHTTP):
return achallenges.SimpleHTTP(
challb=challb, domain=domain, key=key)
challb=challb, domain=domain, key=account.key)
elif isinstance(chall, challenges.DNS):
return achallenges.DNS(challb=challb, domain=domain)
elif isinstance(chall, challenges.RecoveryToken):

View file

@ -216,10 +216,13 @@ def pyopenssl_load_certificate(data):
def make_ss_cert(key, domains, not_before=None,
validity=(7 * 24 * 60 * 60)):
validity=(7 * 24 * 60 * 60), force_san=False):
"""Returns new self-signed cert in PEM form.
Uses key and contains all domains.
If more than one domain is provided, all of the domains are put into
``subjectAltName`` X.509 extension and first domain is set as the
subject CN. If only one domain is provided no ``subjectAltName``
extension is used, unless `force_san` is ``True``.
"""
if isinstance(key, jose.JWK):
@ -243,7 +246,7 @@ def make_ss_cert(key, domains, not_before=None,
# TODO: what to put into cert.get_subject()?
cert.set_issuer(cert.get_subject())
if len(domains) > 1:
if force_san or len(domains) > 1:
extensions.append(OpenSSL.crypto.X509Extension(
"subjectAltName",
critical=False,
@ -312,7 +315,7 @@ def _get_sans_from_cert_or_req(
def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM):
"""Get a list of Subject Alternative Names from a certificate.
:param str csr: Certificate (encoded).
:param str cert: Certificate (encoded).
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
:returns: A list of Subject Alternative Names.

View file

@ -4,7 +4,6 @@ import pkg_resources
import shutil
import tempfile
from cryptography.hazmat.primitives import serialization
import zope.interface
from acme.jose import util as jose_util
@ -173,17 +172,11 @@ class Dvsni(object):
self.configurator.reverter.register_file_creation(True, key_path)
self.configurator.reverter.register_file_creation(True, cert_path)
cert_pem, response = achall.gen_cert_and_response(s)
response, cert_pem, key_pem = achall.gen_cert_and_response(s)
# Write out challenge cert
# Write out challenge cert and key
with open(cert_path, "wb") as cert_chall_fd:
cert_chall_fd.write(cert_pem)
# Write out challenge key
key_pem = achall.key.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
with le_util.safe_open(key_path, 'wb', chmod=0o400) as key_file:
key_file.write(key_pem)

View file

@ -120,21 +120,12 @@ class DvsniTest(unittest.TestCase):
achalls = [
achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9"
"\xf1\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4",
nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18",
), "pending"),
domain="encryption-example.demo", key=auth_key),
challenges.DVSNI(token=b'dvsni1'), "pending"),
domain="encryption-example.demo", account=mock.Mock(key=auth_key)),
achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(
r="\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y\x80"
"\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945",
nonce="Y\xed\x01L\xac\x95\xf7pW\xb1\xd7\xa1\xb2\xc5"
"\x96\xba",
), "pending"),
domain="letsencrypt.demo", key=auth_key),
challenges.DVSNI(token=b'dvsni2'), "pending"),
domain="letsencrypt.demo", account=mock.Mock(key=auth_key)),
]
def setUp(self):
@ -153,10 +144,9 @@ class DvsniTest(unittest.TestCase):
# http://www.voidspace.org.uk/python/mock/helpers.html#mock.mock_open
mock_open, mock_safe_open = mock.mock_open(), mock.mock_open()
response = challenges.DVSNIResponse(s="randomS1")
achall = mock.MagicMock(nonce=self.achalls[0].nonce,
nonce_domain=self.achalls[0].nonce_domain)
achall.gen_cert_and_response.return_value = ("pem", response)
response = challenges.DVSNIResponse(validation=mock.Mock())
achall = mock.MagicMock()
achall.gen_cert_and_response.return_value = (response, "cert", "key")
with mock.patch("letsencrypt.plugins.common.open",
mock_open, create=True):
@ -168,11 +158,10 @@ class DvsniTest(unittest.TestCase):
# pylint: disable=no-member
mock_open.assert_called_once_with(self.sni.get_cert_path(achall), "wb")
mock_open.return_value.write.assert_called_once_with("pem")
mock_open.return_value.write.assert_called_once_with("cert")
mock_safe_open.assert_called_once_with(
self.sni.get_key_path(achall), "wb", chmod=0o400)
mock_safe_open.return_value.write.assert_called_once_with(
achall.key.key.private_bytes())
mock_safe_open.return_value.write.assert_called_once_with("key")
if __name__ == "__main__":

View file

@ -6,7 +6,6 @@ import socket
import sys
import time
from cryptography.hazmat.primitives import serialization
import OpenSSL
import zope.component
import zope.interface
@ -14,6 +13,7 @@ import zope.interface
from acme import challenges
from letsencrypt import achallenges
from letsencrypt import crypto_util
from letsencrypt import interfaces
from letsencrypt.plugins import common
@ -28,6 +28,11 @@ class StandaloneAuthenticator(common.Plugin):
the certificate authority. Therefore, it does not rely on any
existing server program.
:param OpenSSL.crypto.PKey private_key: DVSNI challenge certificate
key.
:param sni_names: Mapping from z_domain (`bytes`) to PEM-encoded
certificate (`bytes`).
"""
zope.interface.implements(interfaces.IAuthenticator)
zope.interface.classProvides(interfaces.IPluginFactory)
@ -40,9 +45,12 @@ class StandaloneAuthenticator(common.Plugin):
self.parent_pid = os.getpid()
self.subproc_state = None
self.tasks = {}
self.sni_names = {}
self.sock = None
self.connection = None
self.private_key = None
self.key_pem = crypto_util.make_key(bits=2048)
self.private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, self.key_pem)
self.ssl_conn = None
def prepare(self):
@ -121,8 +129,8 @@ class StandaloneAuthenticator(common.Plugin):
"""
sni_name = connection.get_servername()
if sni_name in self.tasks:
pem_cert = self.tasks[sni_name]
if sni_name in self.sni_names:
pem_cert = self.sni_names[sni_name]
else:
# TODO: Should we really present a certificate if we get an
# unexpected SNI name? Or should we just disconnect?
@ -179,7 +187,7 @@ class StandaloneAuthenticator(common.Plugin):
return False
def do_child_process(self, port, key):
def do_child_process(self, port):
"""Perform the child process side of the TCP listener task.
This should only be called by :meth:`start_listener`.
@ -189,9 +197,6 @@ class StandaloneAuthenticator(common.Plugin):
handler.
:param int port: Which TCP port to bind.
:param key: The private key to use to respond to DVSNI challenge
requests.
:type key: `letsencrypt.le_util.Key`
"""
signal.signal(signal.SIGINT, self.subproc_signal_handler)
@ -214,11 +219,6 @@ class StandaloneAuthenticator(common.Plugin):
self.sock.listen(1)
# Signal that we've successfully bound TCP port
os.kill(self.parent_pid, signal.SIGIO)
self.private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()))
while True:
self.connection, _ = self.sock.accept()
@ -241,16 +241,13 @@ class StandaloneAuthenticator(common.Plugin):
self.ssl_conn.shutdown()
self.ssl_conn.close()
def start_listener(self, port, key):
def start_listener(self, port):
"""Start listener.
Create a child process which will start a TCP listener on the
specified port to perform the specified DVSNI challenges.
:param int port: The TCP port to bind.
:param key: The private key to use to respond to DVSNI challenge
requests.
:type key: :class:`letsencrypt.le_util.Key`
:returns: ``True`` or ``False`` to indicate success or failure creating
the subprocess.
@ -286,7 +283,7 @@ class StandaloneAuthenticator(common.Plugin):
self.child_pid = os.getpid()
# do_child_process() is normally not expected to return but
# should terminate via sys.exit().
return self.do_child_process(port, key)
return self.do_child_process(port)
def already_listening(self, port): # pylint: disable=no-self-use
"""Check if a process is already listening on the port.
@ -364,12 +361,14 @@ class StandaloneAuthenticator(common.Plugin):
results_if_failure = []
if not achalls or not isinstance(achalls, list):
raise ValueError(".perform() was called without challenge list")
# TODO: "bits" should be user-configurable
for achall in achalls:
if isinstance(achall, achallenges.DVSNI):
# We will attempt to do it
key = achall.key # TODO: bug; one key per start_listener
cert_pem, response = achall.gen_cert_and_response()
self.tasks[achall.nonce_domain] = cert_pem
response, cert_pem, _ = achall.gen_cert_and_response(
key_pem=self.key_pem)
self.sni_names[response.z_domain] = cert_pem
self.tasks[achall.token] = cert_pem
results_if_success.append(response)
results_if_failure.append(None)
else:
@ -388,7 +387,7 @@ class StandaloneAuthenticator(common.Plugin):
return results_if_failure
# Try to do the authentication; note that this creates
# the listener subprocess via os.fork()
if self.start_listener(self.config.dvsni_port, key):
if self.start_listener(self.config.dvsni_port):
return results_if_success
else:
# TODO: This should probably raise a DVAuthError exception
@ -407,8 +406,8 @@ class StandaloneAuthenticator(common.Plugin):
# Remove this from pending tasks list
for achall in achalls:
assert isinstance(achall, achallenges.DVSNI)
if achall.nonce_domain in self.tasks:
del self.tasks[achall.nonce_domain]
if achall.token in self.tasks:
del self.tasks[achall.token]
else:
# Could not find the challenge to remove!
raise ValueError("could not find the challenge to remove")

View file

@ -1,13 +1,10 @@
"""Tests for letsencrypt.plugins.standalone.authenticator."""
import os
import pkg_resources
import psutil
import signal
import socket
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import mock
import OpenSSL
@ -17,16 +14,14 @@ from acme import jose
from letsencrypt import achallenges
from letsencrypt.tests import acme_util
from letsencrypt.tests import test_util
KEY_PATH = pkg_resources.resource_filename(
"letsencrypt.tests", os.path.join("testdata", "rsa512_key.pem"))
KEY_DATA = pkg_resources.resource_string(
"letsencrypt.tests", os.path.join("testdata", "rsa512_key.pem"))
KEY = jose.JWKRSA(key=jose.ComparableRSAKey(serialization.load_pem_private_key(
KEY_DATA, password=None, backend=default_backend())))
PRIVATE_KEY = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, KEY_DATA)
ACCOUNT = mock.Mock(key=jose.JWKRSA.load(
test_util.load_vector("rsa512_key.pem")))
CHALL_KEY_PEM = test_util.load_vector("rsa512_key_2.pem")
CHALL_KEY = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, CHALL_KEY_PEM)
CONFIG = mock.Mock(dvsni_port=5001)
@ -80,8 +75,9 @@ class SNICallbackTest(unittest.TestCase):
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.cert = achallenges.DVSNI(
challb=acme_util.DVSNI_P,
domain="example.com", key=KEY).gen_cert_and_response()[0]
self.authenticator.private_key = PRIVATE_KEY
domain="example.com",
account=ACCOUNT).gen_cert_and_response(key_pem=CHALL_KEY_PEM)[1]
self.authenticator.private_key = CHALL_KEY
self.authenticator.tasks = {"abcdef.acme.invalid": self.cert}
self.authenticator.child_pid = 12345
@ -116,7 +112,7 @@ class ClientSignalHandlerTest(unittest.TestCase):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.authenticator.tasks = {"foononce.acme.invalid": "stuff"}
self.authenticator.tasks = {"footoken.acme.invalid": "stuff"}
self.authenticator.child_pid = 12345
def test_client_signal_handler(self):
@ -145,7 +141,7 @@ class SubprocSignalHandlerTest(unittest.TestCase):
from letsencrypt.plugins.standalone.authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.authenticator.tasks = {"foononce.acme.invalid": "stuff"}
self.authenticator.tasks = {"footoken.acme.invalid": "stuff"}
self.authenticator.child_pid = 12345
self.authenticator.parent_pid = 23456
@ -303,12 +299,12 @@ class PerformTest(unittest.TestCase):
self.achall1 = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="foo"), "pending"),
domain="foo.example.com", key=KEY)
challenges.DVSNI(token=b"foo"), "pending"),
domain="foo.example.com", account=ACCOUNT)
self.achall2 = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="bar"), "pending"),
domain="bar.example.com", key=KEY)
challenges.DVSNI(token=b"bar"), "pending"),
domain="bar.example.com", account=ACCOUNT)
bad_achall = ("This", "Represents", "A Non-DVSNI", "Challenge")
self.achalls = [self.achall1, self.achall2, bad_achall]
@ -326,16 +322,16 @@ class PerformTest(unittest.TestCase):
result = self.authenticator.perform(self.achalls)
self.assertEqual(len(self.authenticator.tasks), 2)
self.assertTrue(
self.authenticator.tasks.has_key(self.achall1.nonce_domain))
self.authenticator.tasks.has_key(self.achall1.token))
self.assertTrue(
self.authenticator.tasks.has_key(self.achall2.nonce_domain))
self.authenticator.tasks.has_key(self.achall2.token))
self.assertTrue(isinstance(result, list))
self.assertEqual(len(result), 3)
self.assertTrue(isinstance(result[0], challenges.ChallengeResponse))
self.assertTrue(isinstance(result[1], challenges.ChallengeResponse))
self.assertFalse(result[2])
self.authenticator.start_listener.assert_called_once_with(
CONFIG.dvsni_port, KEY)
CONFIG.dvsni_port)
def test_cannot_perform(self):
"""What happens if start_listener() returns False."""
@ -345,17 +341,17 @@ class PerformTest(unittest.TestCase):
result = self.authenticator.perform(self.achalls)
self.assertEqual(len(self.authenticator.tasks), 2)
self.assertTrue(
self.authenticator.tasks.has_key(self.achall1.nonce_domain))
self.authenticator.tasks.has_key(self.achall1.token))
self.assertTrue(
self.authenticator.tasks.has_key(self.achall2.nonce_domain))
self.authenticator.tasks.has_key(self.achall2.token))
self.assertTrue(isinstance(result, list))
self.assertEqual(len(result), 3)
self.assertEqual(result, [None, None, False])
self.authenticator.start_listener.assert_called_once_with(
CONFIG.dvsni_port, KEY)
CONFIG.dvsni_port)
def test_perform_with_pending_tasks(self):
self.authenticator.tasks = {"foononce.acme.invalid": "cert_data"}
self.authenticator.tasks = {"footoken.acme.invalid": "cert_data"}
extra_achall = acme_util.DVSNI_P
self.assertRaises(
ValueError, self.authenticator.perform, [extra_achall])
@ -384,7 +380,7 @@ class StartListenerTest(unittest.TestCase):
self.authenticator.do_parent_process = mock.Mock()
self.authenticator.do_parent_process.return_value = True
mock_fork.return_value = 22222
result = self.authenticator.start_listener(1717, "key")
result = self.authenticator.start_listener(1717)
# start_listener is expected to return the True or False return
# value from do_parent_process.
self.assertTrue(result)
@ -396,10 +392,9 @@ class StartListenerTest(unittest.TestCase):
self.authenticator.do_parent_process = mock.Mock()
self.authenticator.do_child_process = mock.Mock()
mock_fork.return_value = 0
self.authenticator.start_listener(1717, "key")
self.authenticator.start_listener(1717)
self.assertEqual(self.authenticator.child_pid, os.getpid())
self.authenticator.do_child_process.assert_called_once_with(
1717, "key")
self.authenticator.do_child_process.assert_called_once_with(1717)
class DoParentProcessTest(unittest.TestCase):
@ -452,9 +447,10 @@ class DoChildProcessTest(unittest.TestCase):
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.cert = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(r=("x" * 32), nonce="abcdef"), "pending"),
domain="example.com", key=KEY).gen_cert_and_response()[0]
self.authenticator.private_key = PRIVATE_KEY
challenges.DVSNI(token=b"abcdef"), "pending"),
domain="example.com", account=ACCOUNT).gen_cert_and_response(
key_pem=CHALL_KEY_PEM)[1]
self.authenticator.private_key = CHALL_KEY
self.authenticator.tasks = {"abcdef.acme.invalid": self.cert}
self.authenticator.parent_pid = 12345
@ -475,7 +471,7 @@ class DoChildProcessTest(unittest.TestCase):
# do_child_process code assumes that calling sys.exit() will
# cause subsequent code not to be executed.)
self.assertRaises(
IndentationError, self.authenticator.do_child_process, 1717, KEY)
IndentationError, self.authenticator.do_child_process, 1717)
mock_exit.assert_called_once_with(1)
mock_kill.assert_called_once_with(12345, signal.SIGUSR2)
@ -490,7 +486,7 @@ class DoChildProcessTest(unittest.TestCase):
sample_socket.bind.side_effect = eaccess
mock_socket.return_value = sample_socket
self.assertRaises(
IndentationError, self.authenticator.do_child_process, 1717, KEY)
IndentationError, self.authenticator.do_child_process, 1717)
mock_exit.assert_called_once_with(1)
mock_kill.assert_called_once_with(12345, signal.SIGUSR1)
@ -506,7 +502,7 @@ class DoChildProcessTest(unittest.TestCase):
sample_socket.bind.side_effect = eio
mock_socket.return_value = sample_socket
self.assertRaises(
socket.error, self.authenticator.do_child_process, 1717, KEY)
socket.error, self.authenticator.do_child_process, 1717)
@mock.patch("letsencrypt.plugins.standalone.authenticator."
"OpenSSL.SSL.Connection")
@ -519,7 +515,7 @@ class DoChildProcessTest(unittest.TestCase):
mock_socket.return_value = sample_socket
mock_connection.return_value = mock.MagicMock()
self.assertRaises(
CallableExhausted, self.authenticator.do_child_process, 1717, KEY)
CallableExhausted, self.authenticator.do_child_process, 1717)
mock_socket.assert_called_once_with()
sample_socket.bind.assert_called_once_with(("0.0.0.0", 1717))
sample_socket.listen.assert_called_once_with(1)
@ -538,9 +534,9 @@ class CleanupTest(unittest.TestCase):
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
self.achall = achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="foononce"), "pending"),
domain="foo.example.com", key="key")
self.authenticator.tasks = {self.achall.nonce_domain: "stuff"}
challenges.DVSNI(token=b"footoken"), "pending"),
domain="foo.example.com", account=mock.Mock(key="key"))
self.authenticator.tasks = {self.achall.token: "stuff"}
self.authenticator.child_pid = 12345
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.kill")
@ -558,8 +554,8 @@ class CleanupTest(unittest.TestCase):
self.assertRaises(
ValueError, self.authenticator.cleanup, [achallenges.DVSNI(
challb=acme_util.chall_to_challb(
challenges.DVSNI(r="whee", nonce="badnonce"), "pending"),
domain="bad.example.com", key="key")])
challenges.DVSNI(token=b"badtoken"), "pending"),
domain="bad.example.com", account=mock.Mock(key="key"))])
class MoreInfoTest(unittest.TestCase):

View file

@ -1,9 +1,9 @@
"""Tests for letsencrypt.achallenges."""
import unittest
import mock
import OpenSSL
from acme import challenges
from acme import jose
from letsencrypt import crypto_util
@ -16,28 +16,20 @@ class DVSNITest(unittest.TestCase):
"""Tests for letsencrypt.achallenges.DVSNI."""
def setUp(self):
self.chall = acme_util.chall_to_challb(
challenges.DVSNI(r="r_value", nonce="12345ABCDE"), "pending")
self.response = challenges.DVSNIResponse()
key = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
self.challb = acme_util.chall_to_challb(acme_util.DVSNI, "pending")
account = mock.Mock(key=jose.JWKRSA.load(
test_util.load_vector("rsa512_key.pem")))
from letsencrypt.achallenges import DVSNI
self.achall = DVSNI(challb=self.chall, domain="example.com", key=key)
self.achall = DVSNI(
challb=self.challb, domain="example.com", account=account)
def test_proxy(self):
self.assertEqual(self.chall.r, self.achall.r)
self.assertEqual(self.chall.nonce, self.achall.nonce)
self.assertEqual(self.challb.token, self.achall.token)
def test_gen_cert_and_response(self):
cert_pem, _ = self.achall.gen_cert_and_response(s=self.response.s)
cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert_pem)
self.assertEqual(cert.get_subject().CN, "example.com")
# pylint: disable=protected-access
self.assertEqual(crypto_util._pyopenssl_cert_or_req_san(cert), [
"example.com", self.chall.nonce_domain,
self.response.z_domain(self.chall)])
response, cert_pem, _ = self.achall.gen_cert_and_response()
self.assertTrue(response.z_domain in crypto_util.get_sans_from_cert(
cert_pem, typ=OpenSSL.crypto.FILETYPE_PEM))
if __name__ == "__main__":

View file

@ -15,8 +15,7 @@ KEY = test_util.load_rsa_private_key('rsa512_key.pem')
SIMPLE_HTTP = challenges.SimpleHTTP(
token="evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA")
DVSNI = challenges.DVSNI(
r=jose.b64decode("Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI"),
nonce=jose.b64decode("a82d5ff8ef740d12881f6d3c2277ab2e"))
token=jose.b64decode(b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJyPCt92wrDoA"))
DNS = challenges.DNS(token="17817c66b60ce2e4012dfad92657527a")
RECOVERY_CONTACT = challenges.RecoveryContact(
activation_url="https://example.ca/sendrecovery/a5bd99383fb0",

View file

@ -196,7 +196,7 @@ class PollChallengesTest(unittest.TestCase):
self.chall_update = {}
for dom in self.doms:
self.chall_update[dom] = [
challb_to_achall(challb, "dummy_key", dom)
challb_to_achall(challb, mock.Mock(key="dummy_key"), dom)
for challb in self.handler.authzr[dom].body.challenges]
@mock.patch("letsencrypt.auth_handler.time")
@ -444,13 +444,13 @@ class ReportFailedChallsTest(unittest.TestCase):
self.dvsni_same = achallenges.DVSNI(
challb=messages.ChallengeBody(**kwargs),# pylint: disable=star-args
domain="example.com",
key=acme_util.KEY)
account=mock.Mock(key=acme_util.KEY))
kwargs["error"] = messages.Error(typ="dnssec", detail="detail")
self.dvsni_diff = achallenges.DVSNI(
challb=messages.ChallengeBody(**kwargs),# pylint: disable=star-args
domain="foo.bar",
key=acme_util.KEY)
account=mock.Mock(key=acme_util.KEY))
@mock.patch("letsencrypt.auth_handler.zope.component.getUtility")
def test_same_error_and_domain(self, mock_zope):

View file

@ -59,7 +59,8 @@ class PerformTest(unittest.TestCase):
def test_unexpected(self):
self.assertRaises(
errors.ContAuthError, self.auth.perform, [
achallenges.DVSNI(challb=None, domain="0", key="invalid_key")])
achallenges.DVSNI(challb=None, domain="0",
account=mock.Mock(key="invalid_key"))])
def test_chall_pref(self):
self.assertEqual(
@ -89,7 +90,8 @@ class CleanupTest(unittest.TestCase):
def test_unexpected(self):
token = achallenges.RecoveryToken(challb=None, domain="0")
unexpected = achallenges.DVSNI(challb=None, domain="0", key="dummy_key")
unexpected = achallenges.DVSNI(
challb=None, domain="0", account=mock.Mock("dummy_key"))
self.assertRaises(
errors.ContAuthError, self.auth.cleanup, [token, unexpected])