Factor out common challengeperformer logic (#5413)

This commit is contained in:
Brad Warren 2018-01-10 18:34:45 -08:00 committed by GitHub
parent 39472f88de
commit 9e95208101
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 67 additions and 29 deletions

View file

@ -16,8 +16,8 @@ from six.moves import xrange # pylint: disable=redefined-builtin, import-error
class TlsSniPerformTest(util.ApacheTest):
"""Test the ApacheTlsSni01 challenge."""
auth_key = common_test.TLSSNI01Test.auth_key
achalls = common_test.TLSSNI01Test.achalls
auth_key = common_test.AUTH_KEY
achalls = common_test.ACHALLS
def setUp(self): # pylint: disable=arguments-differ
super(TlsSniPerformTest, self).setUp()

View file

@ -20,7 +20,7 @@ from certbot_nginx.tests import util
class TlsSniPerformTest(util.NginxTest):
"""Test the NginxTlsSni01 challenge."""
account_key = common_test.TLSSNI01Test.auth_key
account_key = common_test.AUTH_KEY
achalls = [
achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.chall_to_challb(

View file

@ -315,23 +315,28 @@ class Addr(object):
return result
class TLSSNI01(object):
"""Abstract base for TLS-SNI-01 challenge performers"""
class ChallengePerformer(object):
"""Abstract base for challenge performers.
:ivar configurator: Authenticator and installer plugin
:ivar achalls: Annotated challenges
:vartype achalls: `list` of `.KeyAuthorizationAnnotatedChallenge`
:ivar indices: Holds the indices of challenges from a larger array
so the user of the class doesn't have to.
:vartype indices: `list` of `int`
"""
def __init__(self, configurator):
self.configurator = configurator
self.achalls = []
self.indices = []
self.challenge_conf = os.path.join(
configurator.config.config_dir, "le_tls_sni_01_cert_challenge.conf")
# self.completed = 0
def add_chall(self, achall, idx=None):
"""Add challenge to TLSSNI01 object to perform at once.
"""Store challenge to be performed when perform() is called.
:param .KeyAuthorizationAnnotatedChallenge achall: Annotated
TLSSNI01 challenge.
challenge.
:param int idx: index to challenge in a larger array
"""
@ -339,6 +344,27 @@ class TLSSNI01(object):
if idx is not None:
self.indices.append(idx)
def perform(self):
"""Perform all added challenges.
:returns: challenge respones
:rtype: `list` of `acme.challenges.KeyAuthorizationChallengeResponse`
"""
raise NotImplementedError()
class TLSSNI01(ChallengePerformer):
# pylint: disable=abstract-method
"""Abstract base for TLS-SNI-01 challenge performers"""
def __init__(self, configurator):
super(TLSSNI01, self).__init__(configurator)
self.challenge_conf = os.path.join(
configurator.config.config_dir, "le_tls_sni_01_cert_challenge.conf")
# self.completed = 0
def get_cert_path(self, achall):
"""Returns standardized name for challenge certificate.

View file

@ -18,6 +18,17 @@ from certbot import errors
from certbot.tests import acme_util
from certbot.tests import util as test_util
AUTH_KEY = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
ACHALLS = [
achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.chall_to_challb(
challenges.TLSSNI01(token=b'token1'), "pending"),
domain="encryption-example.demo", account_key=AUTH_KEY),
achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.chall_to_challb(
challenges.TLSSNI01(token=b'token2'), "pending"),
domain="certbot.demo", account_key=AUTH_KEY),
]
class NamespaceFunctionsTest(unittest.TestCase):
"""Tests for certbot.plugins.common.*_namespace functions."""
@ -261,21 +272,27 @@ class AddrTest(unittest.TestCase):
self.assertEqual(set_c, set_d)
class ChallengePerformerTest(unittest.TestCase):
"""Tests for certbot.plugins.common.ChallengePerformer."""
def setUp(self):
configurator = mock.MagicMock()
from certbot.plugins.common import ChallengePerformer
self.performer = ChallengePerformer(configurator)
def test_add_chall(self):
self.performer.add_chall(ACHALLS[0], 0)
self.assertEqual(1, len(self.performer.achalls))
self.assertEqual([0], self.performer.indices)
def test_perform(self):
self.assertRaises(NotImplementedError, self.performer.perform)
class TLSSNI01Test(unittest.TestCase):
"""Tests for certbot.plugins.common.TLSSNI01."""
auth_key = jose.JWKRSA.load(test_util.load_vector("rsa512_key.pem"))
achalls = [
achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.chall_to_challb(
challenges.TLSSNI01(token=b'token1'), "pending"),
domain="encryption-example.demo", account_key=auth_key),
achallenges.KeyAuthorizationAnnotatedChallenge(
challb=acme_util.chall_to_challb(
challenges.TLSSNI01(token=b'token2'), "pending"),
domain="certbot.demo", account_key=auth_key),
]
def setUp(self):
self.tempdir = tempfile.mkdtemp()
configurator = mock.MagicMock()
@ -288,11 +305,6 @@ class TLSSNI01Test(unittest.TestCase):
def tearDown(self):
shutil.rmtree(self.tempdir)
def test_add_chall(self):
self.sni.add_chall(self.achalls[0], 0)
self.assertEqual(1, len(self.sni.achalls))
self.assertEqual([0], self.sni.indices)
def test_setup_challenge_cert(self):
# This is a helper function that can be used for handling
# open context managers more elegantly. It avoids dealing with
@ -325,7 +337,7 @@ class TLSSNI01Test(unittest.TestCase):
OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key))
def test_get_z_domain(self):
achall = self.achalls[0]
achall = ACHALLS[0]
self.assertEqual(self.sni.get_z_domain(achall),
achall.response(achall.account_key).z_domain.decode("utf-8"))