prep testing infrastructure

This commit is contained in:
James Kasten 2015-04-19 20:10:40 -07:00
parent 932edbaf75
commit 849415f71b
5 changed files with 83 additions and 76 deletions

View file

@ -58,7 +58,12 @@ class Account(object):
@property
def new_authzr_uri(self): # pylint: disable=missing-docstring
if self.regr is not None:
return self.regr.new_authzr_uri
if self.regr.new_authzr_uri:
return self.regr.new_authzr_uri
else:
# Default: spec says they "may" provide the header
# ugh.. acme-spec #93
return "https://%s/acme/new-authz" % self.config.server
@property
def terms_of_service(self): # pylint: disable=missing-docstring

View file

@ -29,7 +29,8 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
:ivar account: Client's Account
:type account: :class:`letsencrypt.client.account.Account`
:ivar dict authzr: ACME Authorization Resource dict where keys are domains.
:ivar dict authzr: ACME Authorization Resource dict where keys are domains
and values are :class:`letsencrypt.acme.messages2.AuthorizationResource`
:ivar list dv_c: DV challenges in the form of
:class:`letsencrypt.client.achallenges.AnnotatedChallenge`
:ivar list cont_c: Continuity challenges in the
@ -48,11 +49,10 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
self.dv_c = []
self.cont_c = []
def get_authorizations(self, domains, new_authz_uri, best_effort=False):
def get_authorizations(self, domains, best_effort=False):
"""Retrieve all authorizations for challenges.
:param set domains: Domains for authorization
:param str new_authz_uri: Location to get new authorization resources
:param bool best_effort: Whether or not all authorizations are required
(this is useful in renewal)
@ -66,7 +66,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
"""
for domain in domains:
self.authzr[domain] = self.network.request_domain_challenges(
domain, new_authz_uri)
domain, self.account.new_authzr_uri)
self._choose_challenges(domains)
# While there are still challenges remaining...
@ -80,6 +80,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
return self.authzr.values()
def _choose_challenges(self, domains):
"""Retrieve necessary challenges to satisfy server."""
logging.info("Performing the following challenges:")
for dom in domains:
path = gen_challenge_path(

View file

@ -115,15 +115,7 @@ class Client(object):
"Please register with the ACME server first.")
# Perform Challenges/Get Authorizations
if self.account.new_authzr_uri:
authzr = self.auth_handler.get_authorizations(
domains, self.account.new_authzr_uri)
# This isn't required to be in the registration resource...
# and it isn't standardized... ugh - acme-spec #93
else:
authzr = self.auth_handler.get_authorizations(
domains,
"https://%s/acme/new-authz" % self.config.server)
authzr = self.auth_handler.get_authorizations(domains)
# Create CSR from names
if csr is None:

View file

@ -1,4 +1,6 @@
"""Class helps construct valid ACME messages for testing."""
import datetime
import itertools
import os
import pkg_resources
@ -6,6 +8,7 @@ import Crypto.PublicKey.RSA
from letsencrypt.acme import challenges
from letsencrypt.acme import jose
from letsencrypt.acme import messages2
KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey(
@ -52,13 +55,13 @@ CONT_CHALLENGES = [chall for chall in CHALLENGES
if isinstance(chall, challenges.ContinuityChallenge)]
def gen_combos(challs):
"""Generate natural combinations for challs."""
def gen_combos(challbs):
"""Generate natural combinations for challbs."""
dv_chall = []
cont_chall = []
for i, chall in enumerate(challs): # pylint: disable=redefined-outer-name
if isinstance(chall, challenges.DVChallenge):
for i, challb in enumerate(challbs): # pylint: disable=redefined-outer-name
if isinstance(challb.chall, challenges.DVChallenge):
dv_chall.append(i)
else:
cont_chall.append(i)
@ -66,3 +69,54 @@ def gen_combos(challs):
# Gen combos for 1 of each type, lowest index first (makes testing easier)
return tuple((i, j) if i < j else (j, i)
for i in dv_chall for j in cont_chall)
def chall_to_challb(chall, status):
"""Return ChallengeBody from Challenge.
:param str status: "valid", "invalid", "pending"...
"""
kwargs = {
"uri": chall.typ+"_uri",
"status": messages2.Status(status),
}
if status == "valid":
kwargs.update({"validated": datetime.datetime.now()})
return messages2.ChallengeBody(**kwargs)
def gen_authzr(authz_status, domain, challs, statuses, combos=True):
"""Generate an authorization resource.
:param str authz_status: "valid", "invalid", "pending"...
:param list challs: Challenge objects
:param list statuses: status of each challenge object e.g. "valid"...
:param bool combos: Whether or not to add combinations
"""
challbs = [
chall_to_challb(chall, status)
for chall, status in itertools.izip(challs, statuses)
]
authz_kwargs = {
"identifier": messages2.Identifier(
type=messages2.IDENTIFIER_FQDN, value=domain),
"challenges": challbs,
}
if combos:
authz_kwargs.update({"combinations": gen_combos(challbs)})
if authz_status == "valid":
now = datetime.datetime.now()
authz_kwargs.update({
"status": messages2.Status(authz_status),
"expires": datetime.datetime(now.year, now.month+1, now.day),
})
return messages2.AuthorizationResource(
uri="https://trusted.ca/new-authz-resource",
new_cert_uri="https://trusted.ca/new-cert",
body=messages2.Authorization(**authz_kwargs)
)

View file

@ -5,10 +5,12 @@ import unittest
import mock
from letsencrypt.acme import challenges
from letsencrypt.acme import messages
from letsencrypt.acme import messages2
from letsencrypt.client import account
from letsencrypt.client import achallenges
from letsencrypt.client import errors
from letsencrypt.client import le_util
from letsencrypt.client.tests import acme_util
@ -23,7 +25,7 @@ TRANSLATE = {
}
class SatisfyChallengesTest(unittest.TestCase):
class SolveChallengesTest(unittest.TestCase):
"""verify_identities test."""
def setUp(self):
@ -39,8 +41,10 @@ class SatisfyChallengesTest(unittest.TestCase):
self.mock_cont_auth.perform.side_effect = gen_auth_resp
self.mock_dv_auth.perform.side_effect = gen_auth_resp
self.account = account.Account(None, le_util.Key("filepath", "pem"))
self.handler = AuthHandler(
self.mock_dv_auth, self.mock_cont_auth, None)
self.mock_dv_auth, self.mock_cont_auth, None, account)
logging.disable(logging.CRITICAL)
@ -48,22 +52,17 @@ class SatisfyChallengesTest(unittest.TestCase):
logging.disable(logging.NOTSET)
def test_name1_dvsni1(self):
# pylint: disable=protected-access
dom = "0"
msg = messages.Challenge(
session_id=dom, nonce="nonce0", combinations=[],
challenges=[acme_util.DVSNI])
self.handler.add_chall_msg(dom, msg, "dummy_key")
# Note:
self.handler.dv_c = []
cont_resp, dv_resp = self.handler._solve_challenges()
self.handler._satisfy_challenges() # pylint: disable=protected-access
self.assertEqual(len(self.handler.responses), 1)
self.assertEqual(len(self.handler.responses[dom]), 1)
self.assertEqual("DVSNI0", self.handler.responses[dom][0])
self.assertEqual(len(self.handler.dv_c), 1)
self.assertEqual(len(self.handler.cont_c), 1)
self.assertEqual(len(self.handler.dv_c[dom]), 1)
self.assertEqual(len(self.handler.cont_c[dom]), 0)
self.assertEqual(len(self.handler.cont_c), 0)
def test_name1_rectok1(self):
dom = "0"
@ -292,7 +291,7 @@ class SatisfyChallengesTest(unittest.TestCase):
for i in xrange(3):
self.handler.add_chall_msg(
str(i),
messages.Challenge(
messages2.Challenge(
session_id=str(i), nonce="nonce%d" % i,
challenges=acme_util.CHALLENGES, combinations=combos),
"dummy_key")
@ -469,50 +468,6 @@ class GetAuthorizationsTest(unittest.TestCase):
self.assertFalse(self.handler.domains)
# pylint: disable=protected-access
class PathSatisfiedTest(unittest.TestCase):
def setUp(self):
from letsencrypt.client.auth_handler import AuthHandler
self.handler = AuthHandler(None, None, None)
def test_satisfied_true(self):
dom = ["0", "1", "2", "3", "4"]
self.handler.paths[dom[0]] = [1, 2]
self.handler.responses[dom[0]] = [None, "sat", "sat2", None]
self.handler.paths[dom[1]] = [0]
self.handler.responses[dom[1]] = ["sat", None, None, False]
self.handler.paths[dom[2]] = [0]
self.handler.responses[dom[2]] = ["sat"]
self.handler.paths[dom[3]] = []
self.handler.responses[dom[3]] = []
self.handler.paths[dom[4]] = []
self.handler.responses[dom[4]] = ["respond... sure"]
for i in xrange(5):
self.assertTrue(self.handler._path_satisfied(dom[i]))
def test_not_satisfied(self):
dom = ["0", "1", "2", "3"]
self.handler.paths[dom[0]] = [1, 2]
self.handler.responses[dom[0]] = ["sat1", None, "sat2", None]
self.handler.paths[dom[1]] = [0]
self.handler.responses[dom[1]] = [None, None, None, None]
self.handler.paths[dom[2]] = [0]
self.handler.responses[dom[2]] = [None]
self.handler.paths[dom[3]] = [0]
self.handler.responses[dom[3]] = [False]
for i in xrange(3):
self.assertFalse(self.handler._path_satisfied(dom[i]))
class GenChallengePathTest(unittest.TestCase):
"""Tests for letsencrypt.client.auth_handler.gen_challenge_path.