Use acme.challenges in client

This commit is contained in:
Jakub Warmuz 2015-02-13 22:37:45 +00:00
parent 9ba69f8878
commit 97bf10120c
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
21 changed files with 742 additions and 762 deletions

View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.client.achallenges`
-------------------------------------
.. automodule:: letsencrypt.client.achallenges
:members:

View file

@ -1,5 +0,0 @@
:mod:`letsencrypt.client.challenge_util`
----------------------------------------
.. automodule:: letsencrypt.client.challenge_util
:members:

View file

@ -0,0 +1,102 @@
"""Client annotated ACME challenges.
Please use names such as ``achall`` and ``ichall`` (respectively ``achalls``
and ``ichalls`` for collections) to distiguish from variables "of type"
:class:`letsencrypt.acme.challenges.Challenge` (denoted by ``chall``)::
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
chall = challenges.DNS(token='foo')
achall = achallenges.DNS(chall=chall, domain='example.com')
ichall = achallenges.Indexed(achall=achall, index=0)
Note, that all annotated challenges act as a proxy objects::
ichall.token == achall.token == chall.token
"""
from letsencrypt.acme import challenges
from letsencrypt.acme import util as acme_util
from letsencrypt.client import crypto_util
# pylint: disable=too-few-public-methods
class AnnotatedChallenge(acme_util.ImmutableMap):
"""Client annotated challenge.
Wraps around :class:`~letsencrypt.acme.challenges.Challenge` and
annotates with data usfeul for the client.
"""
acme_type = NotImplemented
def __getattr__(self, name):
return getattr(self.chall, name)
class DVSNI(AnnotatedChallenge):
"""Client annotated "dvsni" ACME challenge."""
__slots__ = ('chall', 'domain', 'key')
acme_type = challenges.DVSNI
def gen_cert_and_response(self, s=None): # pylint: disable=invalid-name
"""Generate a DVSNI cert and save it to filepath.
:returns: ``(cert_pem, response)`` tuple, where ``cert_pem`` is the PEM
encoded certificate and ``response`` is an instance
:class:`letsencrypt.acme.challenges.DVSNIResponse`.
:rtype: tuple
"""
response = challenges.DVSNIResponse(s=s)
cert_pem = crypto_util.make_ss_cert(self.key.pem, [
self.nonce_domain, self.domain, response.z_domain(self.chall)])
return cert_pem, response
class SimpleHTTPS(AnnotatedChallenge):
"""Client annotated "simpleHttps" ACME challenge."""
__slots__ = ('chall', 'domain', 'key')
acme_type = challenges.SimpleHTTPS
class DNS(AnnotatedChallenge):
"""Client annotated "dns" ACME challenge."""
__slots__ = ('chall', 'domain')
acme_type = challenges.DNS
class RecoveryContact(AnnotatedChallenge):
"""Client annotated "recoveryContact" ACME challenge."""
__slots__ = ('chall', 'domain')
acme_type = challenges.RecoveryContact
class RecoveryToken(AnnotatedChallenge):
"""Client annotated "recoveryToken" ACME challenge."""
__slots__ = ('chall', 'domain')
acme_type = challenges.RecoveryToken
class ProofOfPossession(AnnotatedChallenge):
"""Client annotated "proofOfPossession" ACME challenge."""
__slots__ = ('chall', 'domain')
acme_type = challenges.ProofOfPossession
class Indexed(acme_util.ImmutableMap):
"""Indexed and annotated ACME challenge.
Wraps around :class:`AnnotatedChallenge` and annotates with an
``index`` in order to maintain the proper position of the response
within a larger challenge list.
"""
__slots__ = ('achall', 'index')
def __getattr__(self, name):
return getattr(self.achall, name)

View file

@ -9,8 +9,10 @@ import sys
import zope.interface
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import augeas_configurator
from letsencrypt.client import challenge_util
from letsencrypt.client import constants
from letsencrypt.client import errors
from letsencrypt.client import interfaces
@ -971,34 +973,26 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
###########################################################################
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
"""Return list of challenge preferences."""
return ["dvsni"]
return [challenges.DVSNI]
def perform(self, chall_list):
def perform(self, achalls):
"""Perform the configuration related challenge.
This function currently assumes all challenges will be fulfilled.
If this turns out not to be the case in the future. Cleanup and
outstanding challenges will have to be designed better.
:param list chall_list: List of challenges to be
fulfilled by configurator.
:returns: list of responses. All responses are returned in the same
order as received by the perform function. A None response
indicates the challenge was not perfromed.
:rtype: list
"""
self._chall_out += len(chall_list)
responses = [None] * len(chall_list)
self._chall_out += len(achalls)
responses = [None] * len(achalls)
apache_dvsni = dvsni.ApacheDvsni(self)
for i, chall in enumerate(chall_list):
if isinstance(chall, challenge_util.DvsniChall):
for i, achall in enumerate(achalls):
if isinstance(achall, achallenges.DVSNI):
# Currently also have dvsni hold associated index
# of the challenge. This helps to put all of the responses back
# together when they are all complete.
apache_dvsni.add_chall(chall, i)
apache_dvsni.add_chall(achall, i)
sni_response = apache_dvsni.perform()
# Must restart in order to activate the challenges.
@ -1013,9 +1007,9 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
return responses
def cleanup(self, chall_list):
def cleanup(self, achalls):
"""Revert all challenges."""
self._chall_out -= len(chall_list)
self._chall_out -= len(achalls)
# If all of the challenges have been finished, clean up everything
if self._chall_out <= 0:

View file

@ -2,9 +2,6 @@
import logging
import os
from letsencrypt.client import challenge_util
from letsencrypt.client import constants
from letsencrypt.client.apache import parser
@ -15,18 +12,14 @@ class ApacheDvsni(object):
:type configurator:
:class:`letsencrypt.client.apache.configurator.ApacheConfigurator`
:ivar dvsni_chall: Data required for challenges.
where DvsniChall tuples have the following fields
`domain` (`str`), `r_b64` (base64 `str`), `nonce` (hex `str`)
`key` (:class:`letsencrypt.client.le_util.Key`)
:type dvsni_chall: `list` of
:class:`letsencrypt.client.challenge_util.DvsniChall`
:ivar list achalls: Annotated :class:`~letsencrypt.client.achallenges.DVSNI`
challenges.
:param list indicies: Meant to hold indices of challenges in a
larger array. ApacheDvsni is capable of solving many challenges
at once which causes an indexing issue within ApacheConfigurator
who must return all responses in order. Imagine ApacheConfigurator
maintaining state about where all of the SimpleHttps Challenges,
maintaining state about where all of the SimpleHTTPS Challenges,
Dvsni Challenges belong in the response array. This is an optional
utility.
@ -35,28 +28,28 @@ class ApacheDvsni(object):
"""
def __init__(self, configurator):
self.configurator = configurator
self.dvsni_chall = []
self.achalls = []
self.indices = []
self.challenge_conf = os.path.join(
configurator.config.config_dir, "le_dvsni_cert_challenge.conf")
# self.completed = 0
def add_chall(self, chall, idx=None):
def add_chall(self, achall, idx=None):
"""Add challenge to DVSNI object to perform at once.
:param chall: DVSNI challenge info
:type chall: :class:`letsencrypt.client.challenge_util.DvsniChall`
:param achall: Annotated DVSNI challenge.
:type achall: :class:`letsencrypt.client.achallenges.DVSNI`
:param int idx: index to challenge in a larger array
"""
self.dvsni_chall.append(chall)
self.achalls.append(achall)
if idx is not None:
self.indices.append(idx)
def perform(self):
"""Peform a DVSNI challenge."""
if not self.dvsni_chall:
if not self.achalls:
return None
# Save any changes to the configuration as a precaution
# About to make temporary changes to the config
@ -64,12 +57,12 @@ class ApacheDvsni(object):
addresses = []
default_addr = "*:443"
for chall in self.dvsni_chall:
vhost = self.configurator.choose_vhost(chall.domain)
for achall in self.achalls:
vhost = self.configurator.choose_vhost(achall.domain)
if vhost is None:
logging.error(
"No vhost exists with servername or alias of: %s",
chall.domain)
achall.domain)
logging.error("No _default_:443 vhost exists")
logging.error("Please specify servernames in the Apache config")
return None
@ -87,9 +80,8 @@ class ApacheDvsni(object):
responses = []
# Create all of the challenge certs
for chall in self.dvsni_chall:
s_b64 = self._setup_challenge_cert(chall)
responses.append({"type": "dvsni", "s": s_b64})
for achall in self.achalls:
responses.append(self._setup_challenge_cert(achall))
# Setup the configuration
self._mod_config(addresses)
@ -99,20 +91,20 @@ class ApacheDvsni(object):
return responses
def _setup_challenge_cert(self, chall):
def _setup_challenge_cert(self, achall, s=None):
# pylint: disable=invalid-name
"""Generate and write out challenge certificate."""
cert_path = self.get_cert_file(chall.nonce)
cert_path = self.get_cert_file(achall)
# Register the path before you write out the file
self.configurator.reverter.register_file_creation(True, cert_path)
cert_pem, s_b64 = challenge_util.dvsni_gen_cert(
chall.domain, chall.r_b64, chall.nonce, chall.key)
cert_pem, response = achall.gen_cert_and_response(s)
# Write out challenge cert
with open(cert_path, 'w') as cert_chall_fd:
cert_chall_fd.write(cert_pem)
return s_b64
return response
def _mod_config(self, ll_addrs):
"""Modifies Apache config files to include challenge vhosts.
@ -126,9 +118,7 @@ class ApacheDvsni(object):
# TODO: Use ip address of existing vhost instead of relying on FQDN
config_text = "<IfModule mod_ssl.c>\n"
for idx, lis in enumerate(ll_addrs):
config_text += self._get_config_text(
self.dvsni_chall[idx].nonce, lis,
self.dvsni_chall[idx].key.file)
config_text += self._get_config_text(self.achalls[idx], lis)
config_text += "</IfModule>\n"
self._conf_include_check(self.configurator.parser.loc["default"])
@ -154,13 +144,14 @@ class ApacheDvsni(object):
parser.get_aug_path(main_config),
"Include", self.challenge_conf)
def _get_config_text(self, nonce, ip_addrs, dvsni_key_file):
def _get_config_text(self, achall, ip_addrs):
"""Chocolate virtual server configuration text
:param str nonce: hex form of nonce
:param achall: Annotated DVSNI challenge.
:type achall: :class:`letsencrypt.client.achallenges.DVSNI`
:param list ip_addrs: addresses of challenged domain
:class:`list` of type :class:`letsencrypt.client.apache.obj.Addr`
:param str dvsni_key_file: Path to key file
:returns: virtual host configuration text
:rtype: str
@ -170,26 +161,28 @@ class ApacheDvsni(object):
document_root = os.path.join(
self.configurator.config.config_dir, "dvsni_page/")
return ("<VirtualHost " + ips + ">\n"
"ServerName " + nonce + constants.DVSNI_DOMAIN_SUFFIX + "\n"
"ServerName " + achall.nonce_domain + "\n"
"UseCanonicalName on\n"
"SSLStrictSNIVHostCheck on\n"
"\n"
"LimitRequestBody 1048576\n"
"\n"
"Include " + self.configurator.parser.loc["ssl_options"] + "\n"
"SSLCertificateFile " + self.get_cert_file(nonce) + "\n"
"SSLCertificateKeyFile " + dvsni_key_file + "\n"
"SSLCertificateFile " + self.get_cert_file(achall) + "\n"
"SSLCertificateKeyFile " + achall.key.file + "\n"
"\n"
"DocumentRoot " + document_root + "\n"
"</VirtualHost>\n\n")
def get_cert_file(self, nonce):
def get_cert_file(self, achall):
"""Returns standardized name for challenge certificate.
:param str nonce: hex form of nonce
:param achall: Annotated DVSNI challenge.
:type achall: :class:`letsencrypt.client.achallenges.DVSNI`
:returns: certificate file name
:rtype: str
"""
return os.path.join(self.configurator.config.work_dir, nonce + ".crt")
return os.path.join(
self.configurator.config.work_dir, achall.nonce_domain + ".crt")

View file

@ -4,9 +4,10 @@ import sys
import Crypto.PublicKey.RSA
from letsencrypt.acme import challenges
from letsencrypt.acme import messages
from letsencrypt.client import challenge_util
from letsencrypt.client import achallenges
from letsencrypt.client import constants
from letsencrypt.client import errors
@ -29,13 +30,14 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
:ivar list domains: list of str domains to get authorization
:ivar dict authkey: Authorized Keys for each domain.
values are of type :class:`letsencrypt.client.le_util.Key`
:ivar dict responses: keys: domain, values: list of dict responses
:ivar dict msgs: ACME Challenge messages with domain as a key
:ivar dict responses: keys: domain, values: list of responses
(:class:`letsencrypt.acme.challenges.ChallengeResponse`.
:ivar dict msgs: ACME Challenge messages with domain as a key.
:ivar dict paths: optimal path for authorization. eg. paths[domain]
:ivar dict dv_c: Keys - domain, Values are DV challenges in the form of
:class:`letsencrypt.client.challenge_util.IndexedChall`
:class:`letsencrypt.client.achallenges.Indexed`
:ivar dict client_c: Keys - domain, Values are Client challenges in the form
of :class:`letsencrypt.client.challenge_util.IndexedChall`
of :class:`letsencrypt.client.achallenges.Indexed`
"""
def __init__(self, dv_auth, client_auth, network):
@ -69,7 +71,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
"Multiple ACMEChallengeMessages for the same domain "
"is not supported.")
self.domains.append(domain)
self.responses[domain] = ["null"] * len(msg.challenges)
self.responses[domain] = [None] * len(msg.challenges)
self.msgs[domain] = msg
self.authkey[domain] = authkey
@ -155,8 +157,8 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
flat_dv = []
for dom in self.domains:
flat_client.extend(ichall.chall for ichall in self.client_c[dom])
flat_dv.extend(ichall.chall for ichall in self.dv_c[dom])
flat_client.extend(ichall.achall for ichall in self.client_c[dom])
flat_dv.extend(ichall.achall for ichall in self.dv_c[dom])
client_resp = []
dv_resp = []
@ -185,12 +187,12 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
self._assign_responses(dv_resp, self.dv_c)
def _assign_responses(self, flat_list, ichall_dict):
"""Assign responses from flat_list back to the IndexedChall dicts.
"""Assign responses from flat_list back to the Indexed dicts.
:param list flat_list: flat_list of responses from an IAuthenticator
:param dict ichall_dict: Master dict mapping all domains to a list of
their associated 'client' and 'dv' IndexedChallenges, or their
:class:`letsencrypt.client.challenge_util.IndexedChall` list
their associated 'client' and 'dv' Indexed challengesenges, or their
:class:`letsencrypt.client.achallenges.Indexed` list
"""
flat_index = 0
@ -201,9 +203,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
def _path_satisfied(self, dom):
"""Returns whether a path has been completely satisfied."""
return all(
None != self.responses[dom][i] and "null" != self.responses[dom][i]
for i in self.paths[dom])
return all(self.responses[dom][i] is not None for i in self.paths[dom])
def _get_chall_pref(self, domain):
"""Return list of challenge preferences.
@ -226,8 +226,8 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
# These are indexed challenges... give just the challenges to the auth
# Chose to make these lists instead of a generator to make it easier to
# work with...
dv_list = [ichall.chall for ichall in self.dv_c[domain]]
client_list = [ichall.chall for ichall in self.client_c[domain]]
dv_list = [ichall.achall for ichall in self.dv_c[domain]]
client_list = [ichall.achall for ichall in self.client_c[domain]]
if dv_list:
self.dv_auth.cleanup(dv_list)
if client_list:
@ -259,156 +259,99 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
:param list path: List of indices from `challenges`.
:returns: dv_chall, list of
:class:`letsencrypt.client.challenge_util.IndexedChall`
:class:`letsencrypt.client.achallenges.Indexed`
client_chall, list of
:class:`letsencrypt.client.challenge_util.IndexedChall`
:class:`letsencrypt.client.achallenges.Indexed`
:rtype: tuple
:raises errors.LetsEncryptClientError: If Challenge type is not
recognized
"""
challenges = self.msgs[domain].challenges
dv_chall = []
client_chall = []
for index in path:
chall = challenges[index]
chall = self.msgs[domain].challenges[index]
# Authenticator Challenges
if chall["type"] in constants.DV_CHALLENGES:
dv_chall.append(challenge_util.IndexedChall(
self._construct_dv_chall(chall, domain), index))
if isinstance(chall, challenges.DVSNI):
logging.info("DVSNI challenge for %s.", domain)
achall = achallenges.DVSNI(
chall=chall, domain=domain, key=self.authkey[domain])
elif isinstance(chall, challenges.SimpleHTTPS):
logging.info("SimpleHTTPS challenge for %s.", domain)
achall = achallenges.SimpleHTTPS(
chall=chall, domain=domain, key=self.authkey[domain])
elif isinstance(chall, challenges.DNS):
logging.info("DNS challenge for %s.", domain)
achall = achallenges.DNS(chall=chall, domain=domain)
# Client Challenges
elif chall["type"] in constants.CLIENT_CHALLENGES:
client_chall.append(challenge_util.IndexedChall(
self._construct_client_chall(chall, domain), index))
elif isinstance(chall, challenges.RecoveryToken):
logging.info("Recovery Token Challenge for %s.", domain)
achall = achallenges.RecoveryToken(chall=chall, domain=domain)
elif isinstance(chall, challenges.RecoveryContact):
logging.info("Recovery Contact Challenge for %s.", domain)
achall = achallenges.RecoveryContact(chall=chall, domain=domain)
elif isinstance(chall, challenges.ProofOfPossession):
logging.info("Proof-of-Possession Challenge for %s", domain)
achall = achallenges.ProofOfPossession(
chall=chall, domain=domain)
else:
raise errors.LetsEncryptClientError(
"Received unrecognized challenge of type: "
"%s" % chall["type"])
"Received unsupported challenge of type: "
"%s" % chall.acme_type)
ichall = achallenges.Indexed(achall=achall, index=index)
if isinstance(chall, challenges.ClientChallenge):
client_chall.append(ichall)
elif isinstance(chall, challenges.DVChallenge):
dv_chall.append(ichall)
return dv_chall, client_chall
def _construct_dv_chall(self, chall, domain):
"""Construct Auth Type Challenges.
:param dict chall: Single challenge
:param str domain: challenge's domain
:returns: challenge_util named tuple Chall object
:rtype: `collections.namedtuple`
:raises errors.LetsEncryptClientError: If unimplemented challenge exists
"""
if chall["type"] == "dvsni":
logging.info(" DVSNI challenge for name %s.", domain)
return challenge_util.DvsniChall(
domain, str(chall["r"]), str(chall["nonce"]),
self.authkey[domain])
elif chall["type"] == "simpleHttps":
logging.info(" SimpleHTTPS challenge for name %s.", domain)
return challenge_util.SimpleHttpsChall(
domain, str(chall["token"]), self.authkey[domain])
elif chall["type"] == "dns":
logging.info(" DNS challenge for name %s.", domain)
return challenge_util.DnsChall(domain, str(chall["token"]))
else:
raise errors.LetsEncryptClientError(
"Unimplemented Auth Challenge: %s" % chall["type"])
def _construct_client_chall(self, chall, domain): # pylint: disable=no-self-use
"""Construct Client Type Challenges.
:param dict chall: Single challenge
:param str domain: challenge's domain
:returns: challenge_util named tuple Chall object
:rtype: `collections.namedtuple`
:raises errors.LetsEncryptClientError: If unimplemented challenge exists
"""
if chall["type"] == "recoveryToken":
logging.info(" Recovery Token Challenge for name: %s.", domain)
return challenge_util.RecTokenChall(domain)
elif chall["type"] == "recoveryContact":
logging.info(" Recovery Contact Challenge for name: %s.", domain)
return challenge_util.RecContactChall(
domain,
chall.get("activationURL", None),
chall.get("successURL", None),
chall.get("contact", None))
elif chall["type"] == "proofOfPossession":
logging.info(" Proof-of-Possession Challenge for name: "
"%s", domain)
return challenge_util.PopChall(
domain, chall["alg"], chall["nonce"], chall["hints"])
else:
raise errors.LetsEncryptClientError(
"Unimplemented Client Challenge: %s" % chall["type"])
def gen_challenge_path(challenges, preferences, combos=None):
def gen_challenge_path(challs, preferences, combinations):
"""Generate a plan to get authority over the identity.
.. todo:: Make sure that the challenges are feasible...
Example: Do you have the recovery key?
:param list challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client in order to prove
possession of the identifier.
:param list challs: A list of challenges
(:class:`letsencrypt.acme.challenges.Challenge`) from
:class:`letsencrypt.acme.messages.Challenge` server message to
be fulfilled by the client in order to prove possession of the
identifier.
:param list preferences: List of challenge preferences for domain
(:class:`letsencrypt.acme.challenges.Challege` subclasses)
:param combos: A collection of sets of challenges from ACME
"challenge" server message ("combinations"), each of which would
:param list combinations: A collection of sets of challenges from
:class:`letsencrypt.acme.messages.Challenge`, each of which would
be sufficient to prove possession of the identifier.
:type combos: list or None
:returns: List of indices from `challenges`.
:returns: List of indices from ``challenges``.
:rtype: list
"""
if combos:
return _find_smart_path(challenges, preferences, combos)
if combinations:
return _find_smart_path(challs, preferences, combinations)
else:
return _find_dumb_path(challenges, preferences)
return _find_dumb_path(challs, preferences)
def _find_smart_path(challenges, preferences, combos):
def _find_smart_path(challs, preferences, combinations):
"""Find challenge path with server hints.
Can be called if combinations is included. Function uses a simple
ranking system to choose the combo with the lowest cost.
:param list challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client in order to prove
possession of the identifier.
:param combos: A collection of sets of challenges from ACME
"challenge" server message ("combinations"), each of which would
be sufficient to prove possession of the identifier.
:type combos: list or None
:returns: List of indices from `challenges`.
:rtype: list
"""
chall_cost = {}
max_cost = 0
for i, chall in enumerate(preferences):
chall_cost[chall] = i
for i, chall_cls in enumerate(preferences):
chall_cost[chall_cls] = i
max_cost += i
best_combo = []
@ -416,10 +359,10 @@ def _find_smart_path(challenges, preferences, combos):
best_combo_cost = max_cost + 1
combo_total = 0
for combo in combos:
for combo in combinations:
for challenge_index in combo:
combo_total += chall_cost.get(challenges[
challenge_index]["type"], max_cost)
combo_total += chall_cost.get(challs[
challenge_index].__class__, max_cost)
if combo_total < best_combo_cost:
best_combo = combo
best_combo_cost = combo_total
@ -433,47 +376,48 @@ def _find_smart_path(challenges, preferences, combos):
return best_combo
def _find_dumb_path(challenges, preferences):
def _find_dumb_path(challs, preferences):
"""Find challenge path without server hints.
Should be called if the combinations hint is not included by the
server. This function returns the best path that does not contain
multiple mutually exclusive challenges.
:param list challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client in order to prove
possession of the identifier.
:param list preferences: A list of preferences representing the
challenge type found within the ACME spec. Each challenge type
can only be listed once.
:returns: List of indices from `challenges`.
:rtype: list
"""
# Add logic for a crappy server
# Choose a DV
path = []
assert len(preferences) == len(set(preferences))
path = []
satisfied = set()
for pref_c in preferences:
for i, offered_challenge in enumerate(challenges):
if (pref_c == offered_challenge["type"] and
is_preferred(offered_challenge["type"], path)):
path.append((i, offered_challenge["type"]))
return [i for (i, _) in path]
for i, offered_chall in enumerate(challs):
if (isinstance(offered_chall, pref_c) and
is_preferred(offered_chall, satisfied)):
path.append(i)
satisfied.add(offered_chall)
return path
def is_preferred(offered_challenge_type, path):
"""Return whether or not the challenge is preferred in path."""
for _, challenge_type in path:
for mutually_exclusive in constants.EXCLUSIVE_CHALLENGES:
# Second part is in case we eventually allow multiple names
# to be challenges at the same time
if (challenge_type in mutually_exclusive and
offered_challenge_type in mutually_exclusive and
challenge_type != offered_challenge_type):
def mutually_exclusive(obj1, obj2, groups, different=False):
"""Are two objects mutually exclusive?"""
for group in groups:
obj1_present = False
obj2_present = False
for obj_cls in group:
obj1_present |= isinstance(obj1, obj_cls)
obj2_present |= isinstance(obj2, obj_cls)
if obj1_present and obj2_present and (
not different or not isinstance(obj1, obj2.__class__)):
return False
return True
def is_preferred(offered_chall, satisfied,
exclusive_groups=constants.EXCLUSIVE_CHALLENGES):
"""Return whether or not the challenge is preferred in path."""
for chall in satisfied:
if not mutually_exclusive(
offered_chall, chall, exclusive_groups, different=True):
return False
return True

View file

@ -1,74 +0,0 @@
"""Challenge specific utility functions."""
import collections
import hashlib
from Crypto import Random
from letsencrypt.acme import jose
from letsencrypt.client import constants
from letsencrypt.client import crypto_util
# Authenticator Challenges
DvsniChall = collections.namedtuple("DvsniChall", "domain, r_b64, nonce, key")
SimpleHttpsChall = collections.namedtuple(
"SimpleHttpsChall", "domain, token, key")
DnsChall = collections.namedtuple("DnsChall", "domain, token")
# Client Challenges
RecContactChall = collections.namedtuple(
"RecContactChall", "domain, a_url, s_url, contact")
RecTokenChall = collections.namedtuple("RecTokenChall", "domain")
PopChall = collections.namedtuple("PopChall", "domain, alg, nonce, hints")
# Helper Challenge Wrapper - Can be used to maintain the proper position of
# the response within a larger challenge list
IndexedChall = collections.namedtuple("IndexedChall", "chall, index")
# DVSNI Challenge functions
def dvsni_gen_cert(name, r_b64, nonce, key):
"""Generate a DVSNI cert and save it to filepath.
:param str name: domain to validate
:param str r_b64: jose base64 encoded dvsni r value
:param str nonce: hex value of nonce
:param key: Key to perform challenge
:type key: :class:`letsencrypt.client.le_util.Key`
:returns: tuple of (cert_pem, s) where
cert_pem is the certificate in pem form
s is the dvsni s value, jose base64 encoded
:rtype: tuple
"""
# Generate S
dvsni_s = Random.get_random_bytes(constants.S_SIZE)
dvsni_r = jose.b64decode(r_b64)
# Generate extension
ext = _dvsni_gen_ext(dvsni_r, dvsni_s)
cert_pem = crypto_util.make_ss_cert(
key.pem, [nonce + constants.DVSNI_DOMAIN_SUFFIX, name, ext])
return cert_pem, jose.b64encode(dvsni_s)
def _dvsni_gen_ext(dvsni_r, dvsni_s):
"""Generates z extension to be placed in certificate extension.
:param bytearray dvsni_r: DVSNI r value
:param bytearray dvsni_s: DVSNI s value
:returns: z + :const:`~letsencrypt.client.constants.DVSNI_DOMAIN_SUFFIX`
:rtype: str
"""
z_base = hashlib.new("sha256")
z_base.update(dvsni_r)
z_base.update(dvsni_s)
return z_base.hexdigest() + constants.DVSNI_DOMAIN_SUFFIX

View file

@ -1,7 +1,9 @@
"""Client Authenticator"""
import zope.interface
from letsencrypt.client import challenge_util
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import recovery_token
@ -30,22 +32,22 @@ class ClientAuthenticator(object):
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
"""Return list of challenge preferences."""
return ["recoveryToken"]
return [challenges.RecoveryToken]
def perform(self, chall_list):
def perform(self, achalls):
"""Perform client specific challenges for IAuthenticator"""
responses = []
for chall in chall_list:
if isinstance(chall, challenge_util.RecTokenChall):
responses.append(self.rec_token.perform(chall))
for achall in achalls:
if isinstance(achall, achallenges.RecoveryToken):
responses.append(self.rec_token.perform(achall))
else:
raise errors.LetsEncryptClientAuthError("Unexpected Challenge")
return responses
def cleanup(self, chall_list):
def cleanup(self, achalls):
"""Cleanup call for IAuthenticator."""
for chall in chall_list:
if isinstance(chall, challenge_util.RecTokenChall):
self.rec_token.cleanup(chall)
for achall in achalls:
if isinstance(achall, achallenges.RecoveryToken):
self.rec_token.cleanup(achall)
else:
raise errors.LetsEncryptClientAuthError("Unexpected Challenge")

View file

@ -1,26 +1,21 @@
"""Let's Encrypt constants."""
import pkg_resources
from letsencrypt.acme import challenges
S_SIZE = 32
"""Size (in bytes) of secret base64-encoded octet string "s" used in
challanges."""
challenges."""
NONCE_SIZE = 16
"""Size of nonce used in JWS objects (in bytes)."""
EXCLUSIVE_CHALLENGES = [frozenset(["dvsni", "simpleHttps"])]
EXCLUSIVE_CHALLENGES = frozenset([frozenset([
challenges.DVSNI, challenges.SimpleHTTPS])])
"""Mutually exclusive challenges."""
DV_CHALLENGES = frozenset(["dvsni", "simpleHttps", "dns"])
"""Challenges that must be solved by a
:class:`letsencrypt.client.interfaces.IAuthenticator` object."""
CLIENT_CHALLENGES = frozenset(
["recoveryToken", "recoveryContact", "proofOfPossession"])
"""Challenges that are handled by the Let's Encrypt client."""
ENHANCEMENTS = ["redirect", "http-header", "ocsp-stapling", "spdy"]
"""List of possible :class:`letsencrypt.client.interfaces.IInstaller`
@ -48,9 +43,6 @@ APACHE_REWRITE_HTTPS_ARGS = [
DVSNI_CHALLENGE_PORT = 443
"""Port to perform DVSNI challenge."""
DVSNI_DOMAIN_SUFFIX = ".acme.invalid"
"""Suffix appended to domains in DVSNI validation."""
TEMP_CHECKPOINT_DIR = "temp_checkpoint"
"""Temporary checkpoint directory (relative to IConfig.work_dir)."""

View file

@ -30,43 +30,43 @@ class IAuthenticator(zope.interface.Interface):
:param str domain: Domain for which challenge preferences are sought.
:returns: list of strings with the most preferred challenges first.
If a type is not specified, it means the Authenticator cannot
perform the challenge.
:returns: List of challege types (subclasses of
:class:`letsencrypt.acme.challenges.Challenge`) with the most
preferred challenges first. If a type is not specified, it means the
Authenticator cannot perform the challenge.
:rtype: list
"""
def perform(chall_list):
def perform(achalls):
"""Perform the given challenge.
:param list chall_list: List of namedtuple types defined in
:mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.).
:param list achalls: Non-empty (guaranteed) list of
:class:`~letsencrypt.client.achallenges.AnnotatedChallenge`
instances, such that it contains types found within
:func:`get_chall_pref` only.
- chall_list will never be empty
- chall_list will only contain types found within
:func:`get_chall_pref`
:returns: ACME Challenge responses or if it cannot be completed then:
:returns: List of ACME
:class:`~letsencrypt.acme.challenges.ChallengeResponse` instances
or if the :class:`~letsencrypt.acme.challenges.Challenge` cannot
be fulfilled then:
``None``
Authenticator can perform challenge, but can't at this time
Authenticator can perform challenge, but not at this time.
``False``
Authenticator will never be able to perform (error)
Authenticator will never be able to perform (error).
:rtype: :class:`list` of :class:`dict`
:rtype: :class:`list` of
:class:`letsencrypt.acme.challenges.ChallengeResponse`
"""
def cleanup(chall_list):
def cleanup(achalls):
"""Revert changes and shutdown after challenges complete.
:param list chall_list: List of namedtuple types defined in
:mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.)
- Only challenges given previously in the perform function will be
found in chall_list.
- chall_list will never be empty
:param list achalls: Non-empty (guaranteed) list of
:class:`~letsencrypt.client.achallenges.AnnotatedChallenge`
instances, a subset of those previously passed to :func:`perform`.
"""

View file

@ -4,6 +4,8 @@ import os
import zope.component
from letsencrypt.acme import challenges
from letsencrypt.client import le_util
from letsencrypt.client import interfaces
@ -21,7 +23,7 @@ class RecoveryToken(object):
"""Perform the Recovery Token Challenge.
:param chall: Recovery Token Challenge
:type chall: :class:`letsencrypt.client.challenge_util.RecTokenChall`
:type chall: :class:`letsencrypt.client.achallenges.RecoveryToken`
:returns: response
:rtype: dict
@ -30,13 +32,13 @@ class RecoveryToken(object):
token_fp = os.path.join(self.token_dir, chall.domain)
if os.path.isfile(token_fp):
with open(token_fp) as token_fd:
return self.generate_response(token_fd.read())
return challenges.RecoveryTokenResponse(token=token_fd.read())
cancel, token = zope.component.getUtility(
interfaces.IDisplay).input(
"%s - Input Recovery Token: " % chall.domain)
if cancel != 1:
return self.generate_response(token)
return challenges.RecoveryTokenResponse(token=token)
return None
@ -44,7 +46,7 @@ class RecoveryToken(object):
"""Cleanup the saved recovery token if it exists.
:param chall: Recovery Token Challenge
:type chall: :class:`letsencrypt.client.challenge_util.RecTokenChall`
:type chall: :class:`letsencrypt.client.achallenges.RecoveryToken`
"""
try:
@ -53,13 +55,6 @@ class RecoveryToken(object):
if err.errno != errno.ENOENT:
raise
def generate_response(self, token): # pylint: disable=no-self-use
"""Generate json response."""
return {
"type": "recoveryToken",
"token": token,
}
def requires_human(self, domain):
"""Indicates whether or not domain can be auto solved."""
return not os.path.isfile(os.path.join(self.token_dir, domain))

View file

@ -12,7 +12,9 @@ import OpenSSL.SSL
import zope.component
import zope.interface
from letsencrypt.client import challenge_util
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import constants
from letsencrypt.client import interfaces
@ -328,9 +330,9 @@ class StandaloneAuthenticator(object):
:returns: A list containing only 'dvsni'.
"""
return ["dvsni"]
return [challenges.DVSNI]
def perform(self, chall_list):
def perform(self, achalls):
"""Perform the challenge.
.. warning::
@ -340,13 +342,6 @@ class StandaloneAuthenticator(object):
validations for multiple independent sets of domains, a separate
StandaloneAuthenticator should be instantiated.
:param list chall_list: List of namedtuple types defined in
:mod:`letsencrypt.client.challenge_util` (``DvsniChall``, etc.)
:returns: ACME Challenge DVSNI responses following IAuthenticator
interface.
:rtype: :class:`list` of :class`dict`
"""
if self.child_pid or self.tasks:
# We should not be willing to continue with perform
@ -354,17 +349,15 @@ class StandaloneAuthenticator(object):
raise ValueError(".perform() was called with pending tasks!")
results_if_success = []
results_if_failure = []
if not chall_list or not isinstance(chall_list, list):
if not achalls or not isinstance(achalls, list):
raise ValueError(".perform() was called without challenge list")
for chall in chall_list:
if isinstance(chall, challenge_util.DvsniChall):
for achall in achalls:
if isinstance(achall, achallenges.DVSNI):
# We will attempt to do it
name, r_b64 = chall.domain, chall.r_b64
nonce, key = chall.nonce, chall.key
cert, s_b64 = challenge_util.dvsni_gen_cert(
name, r_b64, nonce, key)
self.tasks[nonce + constants.DVSNI_DOMAIN_SUFFIX] = cert
results_if_success.append({"type": "dvsni", "s": s_b64})
key = achall.key # TODO: bug; one key per start_listener
cert_pem, response = achall.gen_cert_and_response()
self.tasks[achall.nonce_domain] = cert_pem
results_if_success.append(response)
results_if_failure.append(None)
else:
# We will not attempt to do this challenge because it
@ -388,7 +381,7 @@ class StandaloneAuthenticator(object):
# rather than returning a list of None objects.
return results_if_failure
def cleanup(self, chall_list):
def cleanup(self, achalls):
"""Clean up.
If some challenges are removed from the list, the authenticator
@ -398,11 +391,10 @@ class StandaloneAuthenticator(object):
"""
# Remove this from pending tasks list
for chall in chall_list:
assert isinstance(chall, challenge_util.DvsniChall)
nonce = chall.nonce
if nonce + constants.DVSNI_DOMAIN_SUFFIX in self.tasks:
del self.tasks[nonce + constants.DVSNI_DOMAIN_SUFFIX]
for achall in achalls:
assert isinstance(achall, achallenges.DVSNI)
if achall.nonce_domain in self.tasks:
del self.tasks[achall.nonce_domain]
else:
# Could not find the challenge to remove!
raise ValueError("could not find the challenge to remove")

View file

@ -0,0 +1,62 @@
"""Tests for letsencrypt.client.achallenges."""
import os
import pkg_resources
import re
import unittest
import M2Crypto
import mock
from letsencrypt.acme import challenges
from letsencrypt.client import le_util
class DVSNITest(unittest.TestCase):
"""Tests for letsencrypt.client.achallenges.DVSNI."""
def setUp(self):
self.chall = challenges.DVSNI(r="r_value", nonce="12345ABCDE")
self.response = challenges.DVSNIResponse()
key = le_util.Key("path", pkg_resources.resource_string(
__name__, os.path.join("testdata", "rsa256_key.pem")))
from letsencrypt.client.achallenges import DVSNI
self.achall = DVSNI(chall=self.chall, domain="example.com", key=key)
def test_proxy(self):
self.assertEqual(self.chall.r, self.achall.r)
self.assertEqual(self.chall.nonce, self.achall.nonce)
def test_gen_cert_and_response(self):
cert_pem, _ = self.achall.gen_cert_and_response(s=self.response.s)
cert = M2Crypto.X509.load_cert_string(cert_pem)
self.assertEqual(cert.get_subject().CN, self.chall.nonce_domain)
sans = cert.get_ext("subjectAltName").get_value()
self.assertEqual(
set([self.chall.nonce_domain, "example.com",
self.response.z_domain(self.chall)]),
set(re.findall(r"DNS:([^, $]*)", sans)),
)
class IndexedTest(unittest.TestCase):
"""Tests for letsencrypt.client.achallenges.Indexed."""
def setUp(self):
from letsencrypt.client.achallenges import Indexed
self.achall = mock.MagicMock()
self.ichall = Indexed(achall=self.achall, index=0)
def test_attributes(self):
self.assertEqual(self.achall, self.ichall.achall)
self.assertEqual(0, self.ichall.index)
def test_proxy(self):
self.assertEqual(self.achall.foo, self.ichall.foo)
if __name__ == "__main__":
unittest.main()

View file

@ -1,79 +1,53 @@
"""Class helps construct valid ACME messages for testing."""
from letsencrypt.client import constants
import os
import pkg_resources
import Crypto.PublicKey.RSA
from letsencrypt.acme import challenges
from letsencrypt.acme import other
CHALLENGES = {
"simpleHttps":
{
"type": "simpleHttps",
"token": "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA"
},
"dvsni":
{
"type": "dvsni",
"r": "Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI",
"nonce": "a82d5ff8ef740d12881f6d3c2277ab2e"
},
"dns":
{
"type": "dns",
"token": "17817c66b60ce2e4012dfad92657527a"
},
"recoveryContact":
{
"type": "recoveryContact",
"activationURL": "https://example.ca/sendrecovery/a5bd99383fb0",
"successURL": "https://example.ca/confirmrecovery/bb1b9928932",
"contact": "c********n@example.com"
},
"recoveryToken":
{
"type": "recoveryToken"
},
"proofOfPossession":
{
"type": "proofOfPossession",
"alg": "RS256",
"nonce": "eET5udtV7aoX8Xl8gYiZIA",
"hints": {
"jwk": {
"kty": "RSA",
"e": "AQAB",
"n": "KxITJ0rNlfDMAtfDr8eAw...fSSoehDFNZKQKzTZPtQ"
},
"certFingerprints": [
"93416768eb85e33adc4277f4c9acd63e7418fcfe",
"16d95b7b63f1972b980b14c20291f3c0d1855d95",
"48b46570d9fc6358108af43ad1649484def0debf"
],
"subjectKeyIdentifiers":
["d0083162dcc4c8a23ecb8aecbd86120e56fd24e5"],
"serialNumbers": [34234239832, 23993939911, 17],
"issuers": [
"C=US, O=SuperT LLC, CN=SuperTrustworthy Public CA",
"O=LessTrustworthy CA Inc, CN=LessTrustworthy But StillSecure"
],
"authorizedFor": ["www.example.com", "example.net"]
}
}
}
KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
"letsencrypt.client.tests", os.path.join("testdata", "rsa256_key.pem")))
# Challenges
SIMPLE_HTTPS = challenges.SimpleHTTPS(
token="evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA")
DVSNI = challenges.DVSNI(
r="O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6\xbf'\xb3"
"\xed\x9a9nX\x0f'\\m\xe7\x12", nonce="a82d5ff8ef740d12881f6d3c2277ab2e")
DNS = challenges.DNS(token="17817c66b60ce2e4012dfad92657527a")
RECOVERY_CONTACT = challenges.RecoveryContact(
activation_url="https://example.ca/sendrecovery/a5bd99383fb0",
success_url="https://example.ca/confirmrecovery/bb1b9928932",
contact="c********n@example.com")
RECOVERY_TOKEN = challenges.RecoveryToken()
POP = challenges.ProofOfPossession(
alg="RS256", nonce="xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ",
hints=challenges.ProofOfPossession.Hints(
jwk=other.JWK(key=KEY.publickey()),
cert_fingerprints=[
"93416768eb85e33adc4277f4c9acd63e7418fcfe",
"16d95b7b63f1972b980b14c20291f3c0d1855d95",
"48b46570d9fc6358108af43ad1649484def0debf"
],
certs=[], # TODO
subject_key_identifiers=["d0083162dcc4c8a23ecb8aecbd86120e56fd24e5"],
serial_numbers=[34234239832, 23993939911, 17],
issuers=[
"C=US, O=SuperT LLC, CN=SuperTrustworthy Public CA",
"O=LessTrustworthy CA Inc, CN=LessTrustworthy But StillSecure",
],
authorized_for=["www.example.com", "example.net"],
)
)
def get_dv_challenges():
"""Returns all auth challenges."""
return [chall for typ, chall in CHALLENGES.iteritems()
if typ in constants.DV_CHALLENGES]
def get_client_challenges():
"""Returns all client challenges."""
return [chall for typ, chall in CHALLENGES.iteritems()
if typ in constants.CLIENT_CHALLENGES]
def get_challenges():
"""Returns all challenges."""
return [chall for chall in CHALLENGES.itervalues()]
CHALLENGES = [SIMPLE_HTTPS, DVSNI, DNS, RECOVERY_CONTACT, RECOVERY_TOKEN, POP]
DV_CHALLENGES = [chall for chall in CHALLENGES
if isinstance(chall, challenges.DVChallenge)]
CLIENT_CHALLENGES = [chall for chall in CHALLENGES
if isinstance(chall, challenges.ClientChallenge)]
def gen_combos(challs):
@ -81,8 +55,8 @@ def gen_combos(challs):
dv_chall = []
renewal_chall = []
for i, chall in enumerate(challs):
if chall["type"] in constants.DV_CHALLENGES:
for i, chall in enumerate(challs): # pylint: disable=redefined-outer-name
if isinstance(chall, challenges.DVChallenge):
dv_chall.append(i)
else:
renewal_chall.append(i)

View file

@ -6,7 +6,9 @@ import unittest
import mock
from letsencrypt.client import challenge_util
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import errors
from letsencrypt.client import le_util
@ -140,16 +142,16 @@ class TwoVhost80Test(util.ApacheTest):
# Only tests functionality specific to configurator.perform
# Note: As more challenges are offered this will have to be expanded
auth_key = le_util.Key(self.rsa256_file, self.rsa256_pem)
chall1 = challenge_util.DvsniChall(
"encryption-example.demo",
"jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q",
"37bc5eb75d3e00a19b4f6355845e5a18",
auth_key)
chall2 = challenge_util.DvsniChall(
"letsencrypt.demo",
"uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU",
"59ed014cac95f77057b1d7a1b2c596ba",
auth_key)
achall1 = achallenges.DVSNI(
chall=challenges.DVSNI(
r="jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q",
nonce="37bc5eb75d3e00a19b4f6355845e5a18"),
domain="encryption-example.demo", key=auth_key)
achall2 = achallenges.DVSNI(
chall=challenges.DVSNI(
r="uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU",
nonce="59ed014cac95f77057b1d7a1b2c596ba"),
domain="letsencrypt.demo", key=auth_key)
dvsni_ret_val = [
{"type": "dvsni", "s": "randomS1"},
@ -157,7 +159,7 @@ class TwoVhost80Test(util.ApacheTest):
]
mock_dvsni_perform.return_value = dvsni_ret_val
responses = self.config.perform([chall1, chall2])
responses = self.config.perform([achall1, achall2])
self.assertEqual(mock_dvsni_perform.call_count, 1)
self.assertEqual(responses, dvsni_ret_val)

View file

@ -5,8 +5,9 @@ import shutil
import mock
from letsencrypt.client import challenge_util
from letsencrypt.client import constants
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import le_util
from letsencrypt.client.apache.obj import Addr
@ -36,17 +37,21 @@ class DvsniPerformTest(util.ApacheTest):
"letsencrypt.client.tests", 'testdata/rsa256_key.pem')
auth_key = le_util.Key(rsa256_file, rsa256_pem)
self.challs = []
self.challs.append(challenge_util.DvsniChall(
"encryption-example.demo",
"jIq_Xy1mXGN37tb4L6Xj_es58fW571ZNyXekdZzhh7Q",
"37bc5eb75d3e00a19b4f6355845e5a18",
auth_key))
self.challs.append(challenge_util.DvsniChall(
"letsencrypt.demo",
"uqnaPzxtrndteOqtrXb0Asl5gOJfWAnnx6QJyvcmlDU",
"59ed014cac95f77057b1d7a1b2c596ba",
auth_key))
self.achalls = [
achallenges.DVSNI(
chall=challenges.DVSNI(
r="\x8c\x8a\xbf_-f\\cw\xee\xd6\xf8/\xa5\xe3\xfd\xeb9\xf1"
"\xf5\xb9\xefVM\xc9w\xa4u\x9c\xe1\x87\xb4",
nonce="7\xbc^\xb7]>\x00\xa1\x9bOcU\x84^Z\x18",
), domain="encryption-example.demo", key=auth_key),
achallenges.DVSNI(
chall=challenges.DVSNI(
r="\xba\xa9\xda?<m\xaewmx\xea\xad\xadv\xf4\x02\xc9y\x80"
"\xe2_X\t\xe7\xc7\xa4\t\xca\xf7&\x945",
nonce="Y\xed\x01L\xac\x95\xf7pW\xb1\xd7"
"\xa1\xb2\xc5\x96\xba",
), domain="letsencrypt.demo", key=auth_key),
]
def tearDown(self):
shutil.rmtree(self.temp_dir)
@ -57,48 +62,40 @@ class DvsniPerformTest(util.ApacheTest):
resp = self.sni.perform()
self.assertTrue(resp is None)
@mock.patch("letsencrypt.client.challenge_util.dvsni_gen_cert")
def test_setup_challenge_cert(self, mock_dvsni_gen_cert):
def test_setup_challenge_cert(self):
# This is a helper function that can be used for handling
# open context managers more elegantly. It avoids dealing with
# __enter__ and __exit__ calls.
# http://www.voidspace.org.uk/python/mock/helpers.html#mock.mock_open
chall = self.challs[0]
m_open = mock.mock_open()
mock_dvsni_gen_cert.return_value = ("pem", "randomS1")
response = challenges.DVSNIResponse(s="randomS1")
achall = mock.MagicMock(nonce=self.achalls[0].nonce,
nonce_domain=self.achalls[0].nonce_domain)
achall.gen_cert_and_response.return_value = ("pem", response)
with mock.patch("letsencrypt.client.apache.dvsni.open",
m_open, create=True):
# pylint: disable=protected-access
s_b64 = self.sni._setup_challenge_cert(chall)
self.assertEqual(s_b64, "randomS1")
self.assertEqual(response, self.sni._setup_challenge_cert(
achall, "randomS1"))
self.assertTrue(m_open.called)
self.assertEqual(
m_open.call_args[0], (self.sni.get_cert_file(chall.nonce), 'w'))
m_open.call_args[0], (self.sni.get_cert_file(achall), 'w'))
self.assertEqual(m_open().write.call_args[0][0], "pem")
self.assertEqual(mock_dvsni_gen_cert.call_count, 1)
calls = mock_dvsni_gen_cert.call_args_list
expected_call_list = [
(chall.domain, chall.r_b64, chall.nonce, chall.key)
]
for i in xrange(len(expected_call_list)):
for j in xrange(len(expected_call_list[0])):
self.assertEqual(calls[i][0][j], expected_call_list[i][j])
def test_perform1(self):
chall = self.challs[0]
self.sni.add_chall(chall)
mock_setup_cert = mock.MagicMock(return_value="randomS1")
achall = self.achalls[0]
self.sni.add_chall(achall)
mock_setup_cert = mock.MagicMock(
return_value=challenges.DVSNIResponse(s="randomS1"))
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
responses = self.sni.perform()
mock_setup_cert.assert_called_once_with(chall)
mock_setup_cert.assert_called_once_with(achall)
# Check to make sure challenge config path is included in apache config.
self.assertEqual(
@ -106,13 +103,15 @@ class DvsniPerformTest(util.ApacheTest):
"Include", self.sni.challenge_conf)),
1)
self.assertEqual(len(responses), 1)
self.assertEqual(responses[0]["s"], "randomS1")
self.assertEqual(responses[0].s, "randomS1")
def test_perform2(self):
for chall in self.challs:
self.sni.add_chall(chall)
for achall in self.achalls:
self.sni.add_chall(achall)
mock_setup_cert = mock.MagicMock(side_effect=["randomS0", "randomS1"])
mock_setup_cert = mock.MagicMock(side_effect=[
challenges.DVSNIResponse(s="randomS0"),
challenges.DVSNIResponse(s="randomS1")])
# pylint: disable=protected-access
self.sni._setup_challenge_cert = mock_setup_cert
@ -122,9 +121,9 @@ class DvsniPerformTest(util.ApacheTest):
# Make sure calls made to mocked function were correct
self.assertEqual(
mock_setup_cert.call_args_list[0], mock.call(self.challs[0]))
mock_setup_cert.call_args_list[0], mock.call(self.achalls[0]))
self.assertEqual(
mock_setup_cert.call_args_list[1], mock.call(self.challs[1]))
mock_setup_cert.call_args_list[1], mock.call(self.achalls[1]))
self.assertEqual(
len(self.sni.configurator.parser.find_dir(
@ -132,11 +131,11 @@ class DvsniPerformTest(util.ApacheTest):
1)
self.assertEqual(len(responses), 2)
for i in xrange(2):
self.assertEqual(responses[i]["s"], "randomS%d" % i)
self.assertEqual(responses[i].s, "randomS%d" % i)
def test_mod_config(self):
for chall in self.challs:
self.sni.add_chall(chall)
for achall in self.achalls:
self.sni.add_chall(achall)
v_addr1 = [Addr(("1.2.3.4", "443")), Addr(("5.6.7.8", "443"))]
v_addr2 = [Addr(("127.0.0.1", "443"))]
ll_addr = []
@ -159,14 +158,12 @@ class DvsniPerformTest(util.ApacheTest):
if vhost.addrs == set(v_addr1):
self.assertEqual(
vhost.names,
set([str(self.challs[0].nonce +
constants.DVSNI_DOMAIN_SUFFIX)]))
set([self.achalls[0].nonce_domain]))
else:
self.assertEqual(vhost.addrs, set(v_addr2))
self.assertEqual(
vhost.names,
set([str(self.challs[1].nonce +
constants.DVSNI_DOMAIN_SUFFIX)]))
set([self.achalls[1].nonce_domain]))
if __name__ == '__main__':

View file

@ -4,21 +4,22 @@ import unittest
import mock
from letsencrypt.acme import challenges
from letsencrypt.acme import messages
from letsencrypt.client import challenge_util
from letsencrypt.client import achallenges
from letsencrypt.client import errors
from letsencrypt.client.tests import acme_util
TRANSLATE = {
"dvsni": "DvsniChall",
"simpleHttps": "SimpleHttpsChall",
"dns": "DnsChall",
"recoveryToken": "RecTokenChall",
"recoveryContact": "RecContactChall",
"proofOfPossession": "PopChall",
"dvsni": "DVSNI",
"simpleHttps": "SimpleHTTPS",
"dns": "DNS",
"recoveryToken": "RecoveryToken",
"recoveryContact": "RecoveryContact",
"proofOfPossession": "ProofOfPossession",
}
@ -31,8 +32,9 @@ class SatisfyChallengesTest(unittest.TestCase):
self.mock_dv_auth = mock.MagicMock(name="ApacheConfigurator")
self.mock_client_auth = mock.MagicMock(name="ClientAuthenticator")
self.mock_dv_auth.get_chall_pref.return_value = ["dvsni"]
self.mock_client_auth.get_chall_pref.return_value = ["recoveryToken"]
self.mock_dv_auth.get_chall_pref.return_value = [challenges.DVSNI]
self.mock_client_auth.get_chall_pref.return_value = [
challenges.RecoveryToken]
self.mock_client_auth.perform.side_effect = gen_auth_resp
self.mock_dv_auth.perform.side_effect = gen_auth_resp
@ -47,9 +49,9 @@ class SatisfyChallengesTest(unittest.TestCase):
def test_name1_dvsni1(self):
dom = "0"
challenge = [acme_util.CHALLENGES["dvsni"]]
msg = messages.Challenge(session_id=dom, nonce="nonce0",
challenges=challenge, combinations=[])
msg = messages.Challenge(
session_id=dom, nonce="nonce0", combinations=[],
challenges=[acme_util.DVSNI])
self.handler.add_chall_msg(dom, msg, "dummy_key")
self.handler._satisfy_challenges() # pylint: disable=protected-access
@ -57,7 +59,7 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(len(self.handler.responses), 1)
self.assertEqual(len(self.handler.responses[dom]), 1)
self.assertEqual("DvsniChall0", self.handler.responses[dom][0])
self.assertEqual("DVSNI0", self.handler.responses[dom][0])
self.assertEqual(len(self.handler.dv_c), 1)
self.assertEqual(len(self.handler.client_c), 1)
self.assertEqual(len(self.handler.dv_c[dom]), 1)
@ -65,9 +67,9 @@ class SatisfyChallengesTest(unittest.TestCase):
def test_name1_rectok1(self):
dom = "0"
challenge = [acme_util.CHALLENGES["recoveryToken"]]
msg = messages.Challenge(session_id=dom, nonce="nonce0",
challenges=challenge, combinations=[])
msg = messages.Challenge(
session_id=dom, nonce="nonce0", combinations=[],
challenges=[acme_util.RECOVERY_TOKEN])
self.handler.add_chall_msg(dom, msg, "dummy_key")
self.handler._satisfy_challenges() # pylint: disable=protected-access
@ -79,7 +81,7 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(self.mock_client_auth.perform.call_count, 1)
self.assertEqual(self.mock_dv_auth.perform.call_count, 0)
self.assertEqual("RecTokenChall0", self.handler.responses[dom][0])
self.assertEqual("RecoveryToken0", self.handler.responses[dom][0])
# Assert 1 domain
self.assertEqual(len(self.handler.dv_c), 1)
self.assertEqual(len(self.handler.client_c), 1)
@ -88,12 +90,12 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(len(self.handler.client_c[dom]), 1)
def test_name5_dvsni5(self):
challenge = [acme_util.CHALLENGES["dvsni"]]
for i in xrange(5):
self.handler.add_chall_msg(
str(i),
messages.Challenge(session_id=str(i), nonce="nonce%d" % i,
challenges=challenge, combinations=[]),
challenges=[acme_util.DVSNI],
combinations=[]),
"dummy_key")
self.handler._satisfy_challenges() # pylint: disable=protected-access
@ -110,30 +112,31 @@ class SatisfyChallengesTest(unittest.TestCase):
for i in xrange(5):
dom = str(i)
self.assertEqual(len(self.handler.responses[dom]), 1)
self.assertEqual(self.handler.responses[dom][0], "DvsniChall%d" % i)
self.assertEqual(self.handler.responses[dom][0], "DVSNI%d" % i)
self.assertEqual(len(self.handler.dv_c[dom]), 1)
self.assertEqual(len(self.handler.client_c[dom]), 0)
self.assertTrue(isinstance(self.handler.dv_c[dom][0].chall,
challenge_util.DvsniChall))
self.assertTrue(isinstance(self.handler.dv_c[dom][0].achall,
achallenges.DVSNI))
@mock.patch("letsencrypt.client.auth_handler.gen_challenge_path")
def test_name1_auth(self, mock_chall_path):
dom = "0"
challenges = acme_util.get_dv_challenges()
combos = acme_util.gen_combos(challenges)
self.handler.add_chall_msg(
dom,
messages.Challenge(session_id="0", nonce="nonce0",
challenges=challenges, combinations=combos),
messages.Challenge(
session_id="0", nonce="nonce0",
challenges=acme_util.DV_CHALLENGES,
combinations=acme_util.gen_combos(acme_util.DV_CHALLENGES)),
"dummy_key")
path = gen_path(["simpleHttps"], challenges)
path = gen_path([acme_util.SIMPLE_HTTPS], acme_util.DV_CHALLENGES)
mock_chall_path.return_value = path
self.handler._satisfy_challenges() # pylint: disable=protected-access
self.assertEqual(len(self.handler.responses), 1)
self.assertEqual(len(self.handler.responses[dom]), len(challenges))
self.assertEqual(len(self.handler.responses[dom]),
len(acme_util.DV_CHALLENGES))
self.assertEqual(len(self.handler.dv_c), 1)
self.assertEqual(len(self.handler.client_c), 1)
@ -143,32 +146,34 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(
self.handler.responses[dom],
self._get_exp_response(dom, path, challenges))
self._get_exp_response(dom, path, acme_util.DV_CHALLENGES))
self.assertEqual(len(self.handler.dv_c[dom]), 1)
self.assertEqual(len(self.handler.client_c[dom]), 0)
self.assertTrue(isinstance(self.handler.dv_c[dom][0].chall,
challenge_util.SimpleHttpsChall))
self.assertTrue(isinstance(self.handler.dv_c[dom][0].achall,
achallenges.SimpleHTTPS))
@mock.patch("letsencrypt.client.auth_handler.gen_challenge_path")
def test_name1_all(self, mock_chall_path):
dom = "0"
challenges = acme_util.get_challenges()
combos = acme_util.gen_combos(challenges)
combos = acme_util.gen_combos(acme_util.CHALLENGES)
self.handler.add_chall_msg(
dom,
messages.Challenge(session_id=dom, nonce="nonce0",
challenges=challenges, combinations=combos),
messages.Challenge(
session_id=dom, nonce="nonce0", challenges=acme_util.CHALLENGES,
combinations=combos),
"dummy_key")
path = gen_path(["simpleHttps", "recoveryToken"], challenges)
path = gen_path([acme_util.SIMPLE_HTTPS, acme_util.RECOVERY_TOKEN],
acme_util.CHALLENGES)
mock_chall_path.return_value = path
self.handler._satisfy_challenges() # pylint: disable=protected-access
self.assertEqual(len(self.handler.responses), 1)
self.assertEqual(len(self.handler.responses[dom]), len(challenges))
self.assertEqual(
len(self.handler.responses[dom]), len(acme_util.CHALLENGES))
self.assertEqual(len(self.handler.dv_c), 1)
self.assertEqual(len(self.handler.client_c), 1)
self.assertEqual(len(self.handler.dv_c[dom]), 1)
@ -176,25 +181,25 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(
self.handler.responses[dom],
self._get_exp_response(dom, path, challenges))
self.assertTrue(isinstance(self.handler.dv_c[dom][0].chall,
challenge_util.SimpleHttpsChall))
self.assertTrue(isinstance(self.handler.client_c[dom][0].chall,
challenge_util.RecTokenChall))
self._get_exp_response(dom, path, acme_util.CHALLENGES))
self.assertTrue(isinstance(self.handler.dv_c[dom][0].achall,
achallenges.SimpleHTTPS))
self.assertTrue(isinstance(self.handler.client_c[dom][0].achall,
achallenges.RecoveryToken))
@mock.patch("letsencrypt.client.auth_handler.gen_challenge_path")
def test_name5_all(self, mock_chall_path):
challenges = acme_util.get_challenges()
combos = acme_util.gen_combos(challenges)
combos = acme_util.gen_combos(acme_util.CHALLENGES)
for i in xrange(5):
self.handler.add_chall_msg(
str(i),
messages.Challenge(
session_id=str(i), nonce="nonce%d" % i,
challenges=challenges, combinations=combos),
challenges=acme_util.CHALLENGES, combinations=combos),
"dummy_key")
path = gen_path(["dvsni", "recoveryContact"], challenges)
path = gen_path([acme_util.DVSNI, acme_util.RECOVERY_CONTACT],
acme_util.CHALLENGES)
mock_chall_path.return_value = path
self.handler._satisfy_challenges() # pylint: disable=protected-access
@ -202,7 +207,7 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(len(self.handler.responses), 5)
for i in xrange(5):
self.assertEqual(
len(self.handler.responses[str(i)]), len(challenges))
len(self.handler.responses[str(i)]), len(acme_util.CHALLENGES))
self.assertEqual(len(self.handler.dv_c), 5)
self.assertEqual(len(self.handler.client_c), 5)
@ -210,28 +215,28 @@ class SatisfyChallengesTest(unittest.TestCase):
dom = str(i)
self.assertEqual(
self.handler.responses[dom],
self._get_exp_response(dom, path, challenges))
self._get_exp_response(dom, path, acme_util.CHALLENGES))
self.assertEqual(len(self.handler.dv_c[dom]), 1)
self.assertEqual(len(self.handler.client_c[dom]), 1)
self.assertTrue(isinstance(self.handler.dv_c[dom][0].chall,
challenge_util.DvsniChall))
self.assertTrue(isinstance(self.handler.client_c[dom][0].chall,
challenge_util.RecContactChall))
self.assertTrue(isinstance(self.handler.dv_c[dom][0].achall,
achallenges.DVSNI))
self.assertTrue(isinstance(self.handler.client_c[dom][0].achall,
achallenges.RecoveryContact))
@mock.patch("letsencrypt.client.auth_handler.gen_challenge_path")
def test_name5_mix(self, mock_chall_path):
paths = []
chosen_chall = [["dns"],
["dvsni"],
["simpleHttps", "proofOfPossession"],
["simpleHttps"],
["dns", "recoveryToken"]]
challenge_list = [acme_util.get_dv_challenges(),
[acme_util.CHALLENGES["dvsni"]],
acme_util.get_challenges(),
acme_util.get_dv_challenges(),
acme_util.get_challenges()]
chosen_chall = [[acme_util.DNS],
[acme_util.DVSNI],
[acme_util.SIMPLE_HTTPS, acme_util.POP],
[acme_util.SIMPLE_HTTPS],
[acme_util.DNS, acme_util.RECOVERY_TOKEN]]
challenge_list = [acme_util.DV_CHALLENGES,
[acme_util.DVSNI],
acme_util.CHALLENGES,
acme_util.DV_CHALLENGES,
acme_util.CHALLENGES]
# Combos doesn't matter since I am overriding the gen_path function
for i in xrange(5):
@ -260,21 +265,21 @@ class SatisfyChallengesTest(unittest.TestCase):
self.assertEqual(
len(self.handler.client_c[dom]), len(chosen_chall[i]) - 1)
self.assertTrue(isinstance(self.handler.dv_c["0"][0].chall,
challenge_util.DnsChall))
self.assertTrue(isinstance(self.handler.dv_c["1"][0].chall,
challenge_util.DvsniChall))
self.assertTrue(isinstance(self.handler.dv_c["2"][0].chall,
challenge_util.SimpleHttpsChall))
self.assertTrue(isinstance(self.handler.dv_c["3"][0].chall,
challenge_util.SimpleHttpsChall))
self.assertTrue(isinstance(self.handler.dv_c["4"][0].chall,
challenge_util.DnsChall))
self.assertTrue(isinstance(
self.handler.dv_c["0"][0].achall, achallenges.DNS))
self.assertTrue(isinstance(
self.handler.dv_c["1"][0].achall, achallenges.DVSNI))
self.assertTrue(isinstance(
self.handler.dv_c["2"][0].achall, achallenges.SimpleHTTPS))
self.assertTrue(isinstance(
self.handler.dv_c["3"][0].achall, achallenges.SimpleHTTPS))
self.assertTrue(isinstance(
self.handler.dv_c["4"][0].achall, achallenges.DNS))
self.assertTrue(isinstance(self.handler.client_c["2"][0].chall,
challenge_util.PopChall))
self.assertTrue(isinstance(self.handler.client_c["4"][0].chall,
challenge_util.RecTokenChall))
self.assertTrue(isinstance(self.handler.client_c["2"][0].achall,
achallenges.ProofOfPossession))
self.assertTrue(isinstance(
self.handler.client_c["4"][0].achall, achallenges.RecoveryToken))
@mock.patch("letsencrypt.client.auth_handler.gen_challenge_path")
def test_perform_exception_cleanup(self, mock_chall_path):
@ -282,21 +287,20 @@ class SatisfyChallengesTest(unittest.TestCase):
# pylint: disable=protected-access
self.mock_dv_auth.perform.side_effect = errors.LetsEncryptDvsniError
challenges = acme_util.get_challenges()
combos = acme_util.gen_combos(challenges)
combos = acme_util.gen_combos(acme_util.CHALLENGES)
for i in xrange(3):
self.handler.add_chall_msg(
str(i),
messages.Challenge(
session_id=str(i), nonce="nonce%d" % i,
challenges=challenges, combinations=combos),
challenges=acme_util.CHALLENGES, combinations=combos),
"dummy_key")
mock_chall_path.side_effect = [
gen_path(["dvsni", "proofOfPossession"], challenges),
gen_path(["proofOfPossession"], challenges),
gen_path(["dvsni"], challenges),
gen_path([acme_util.DVSNI, acme_util.POP], acme_util.CHALLENGES),
gen_path([acme_util.POP], acme_util.CHALLENGES),
gen_path([acme_util.DVSNI], acme_util.CHALLENGES),
]
# This may change in the future... but for now catch the error
@ -316,7 +320,7 @@ class SatisfyChallengesTest(unittest.TestCase):
dv_chall_list = dv_cleanup_args[i][0][0]
self.assertEqual(len(dv_chall_list), 1)
self.assertTrue(
isinstance(dv_chall_list[0], challenge_util.DvsniChall))
isinstance(dv_chall_list[0], achallenges.DVSNI))
# Check Auth cleanup
@ -324,14 +328,14 @@ class SatisfyChallengesTest(unittest.TestCase):
client_chall_list = client_cleanup_args[i][0][0]
self.assertEqual(len(client_chall_list), 1)
self.assertTrue(
isinstance(client_chall_list[0], challenge_util.PopChall))
isinstance(client_chall_list[0], achallenges.ProofOfPossession))
def _get_exp_response(self, domain, path, challenges):
def _get_exp_response(self, domain, path, challs):
# pylint: disable=no-self-use
exp_resp = ["null"] * len(challenges)
exp_resp = [None] * len(challs)
for i in path:
exp_resp[i] = TRANSLATE[challenges[i]["type"]] + str(domain)
exp_resp[i] = TRANSLATE[challs[i].acme_type] + str(domain)
return exp_resp
@ -357,12 +361,12 @@ class GetAuthorizationsTest(unittest.TestCase):
def test_solved3_at_once(self):
# Set 3 DVSNI challenges
challenge = [acme_util.CHALLENGES["dvsni"]]
for i in xrange(3):
self.handler.add_chall_msg(
str(i),
messages.Challenge(session_id=str(i), nonce="nonce%d" % i,
challenges=challenge, combinations=[]),
messages.Challenge(
session_id=str(i), nonce="nonce%d" % i,
challenges=[acme_util.DVSNI], combinations=[]),
"dummy_key")
self.mock_sat_chall.side_effect = self._sat_solved_at_once
@ -379,7 +383,7 @@ class GetAuthorizationsTest(unittest.TestCase):
def _sat_solved_at_once(self):
for i in xrange(3):
dom = str(i)
self.handler.responses[dom] = ["DvsniChall%d" % i]
self.handler.responses[dom] = ["DVSNI%d" % i]
self.handler.paths[dom] = [0]
# Assignment was > 80 char...
dv_c, c_c = self.handler._challenge_factory(dom, [0])
@ -387,11 +391,11 @@ class GetAuthorizationsTest(unittest.TestCase):
self.handler.dv_c[dom], self.handler.client_c[dom] = dv_c, c_c
def test_progress_failure(self):
challenges = acme_util.get_challenges()
self.handler.add_chall_msg(
"0",
messages.Challenge(session_id="0", nonce="nonce0",
challenges=challenges, combinations=[]),
messages.Challenge(
session_id="0", nonce="nonce0", challenges=acme_util.CHALLENGES,
combinations=[]),
"dummy_key")
# Don't do anything to satisfy challenges
@ -406,21 +410,19 @@ class GetAuthorizationsTest(unittest.TestCase):
def _sat_failure(self):
dom = "0"
self.handler.paths[dom] = gen_path(
["dns", "recoveryToken"], self.handler.msgs[dom].challenges)
[acme_util.DNS, acme_util.RECOVERY_TOKEN],
self.handler.msgs[dom].challenges)
dv_c, c_c = self.handler._challenge_factory(
dom, self.handler.paths[dom])
self.handler.dv_c[dom], self.handler.client_c[dom] = dv_c, c_c
def test_incremental_progress(self):
challs = []
challs.append(acme_util.get_challenges())
challs.append(acme_util.get_dv_challenges())
for i in xrange(2):
dom = str(i)
for dom, challs in [("0", acme_util.CHALLENGES),
("1", acme_util.DV_CHALLENGES)]:
self.handler.add_chall_msg(
dom,
messages.Challenge(session_id=dom, nonce="nonce%d" % i,
challenges=challs[i], combinations=[]),
messages.Challenge(session_id=dom, nonce="nonce",
combinations=[], challenges=challs),
"dummy_key")
self.mock_sat_chall.side_effect = self._sat_incremental
@ -437,7 +439,7 @@ class GetAuthorizationsTest(unittest.TestCase):
# Only solve one of "0" required challs
self.handler.responses["0"][1] = "onecomplete"
self.handler.responses["0"][3] = None
self.handler.responses["1"] = ["null", "null", "goodresp"]
self.handler.responses["1"] = [None, None, "goodresp"]
self.handler.paths["0"] = [1, 3]
self.handler.paths["1"] = [2]
# This is probably overkill... but set it anyway
@ -476,10 +478,10 @@ class PathSatisfiedTest(unittest.TestCase):
def test_satisfied_true(self):
dom = ["0", "1", "2", "3", "4"]
self.handler.paths[dom[0]] = [1, 2]
self.handler.responses[dom[0]] = ["null", "sat", "sat2", "null"]
self.handler.responses[dom[0]] = [None, "sat", "sat2", None]
self.handler.paths[dom[1]] = [0]
self.handler.responses[dom[1]] = ["sat", None, None, "null"]
self.handler.responses[dom[1]] = ["sat", None, None, None]
self.handler.paths[dom[2]] = [0]
self.handler.responses[dom[2]] = ["sat"]
@ -494,46 +496,104 @@ class PathSatisfiedTest(unittest.TestCase):
self.assertTrue(self.handler._path_satisfied(dom[i]))
def test_not_satisfied(self):
dom = ["0", "1", "2", "3", "4"]
dom = ["0", "1", "2"]
self.handler.paths[dom[0]] = [1, 2]
self.handler.responses[dom[0]] = ["sat1", "null", "sat2", "null"]
self.handler.responses[dom[0]] = ["sat1", None, "sat2", None]
self.handler.paths[dom[1]] = [0]
self.handler.responses[dom[1]] = [None, "null", "null", "null"]
self.handler.responses[dom[1]] = [None, None, None, None]
self.handler.paths[dom[2]] = [0]
self.handler.responses[dom[2]] = [None]
self.handler.paths[dom[3]] = [0]
self.handler.responses[dom[3]] = ["null"]
for i in xrange(4):
for i in xrange(3):
self.assertFalse(self.handler._path_satisfied(dom[i]))
class MutuallyExclusiveTest(unittest.TestCase):
"""Tests for letsencrypt.client.auth_handler.mutually_exclusive."""
# pylint: disable=invalid-name,missing-docstring,too-few-public-methods
class A(object):
pass
class B(object):
pass
class C(object):
pass
class D(C):
pass
@classmethod
def _call(cls, chall1, chall2, different=False):
from letsencrypt.client.auth_handler import mutually_exclusive
return mutually_exclusive(chall1, chall2, groups=frozenset([
frozenset([cls.A, cls.B]), frozenset([cls.A, cls.C]),
]), different=different)
def test_group_members(self):
self.assertFalse(self._call(self.A(), self.B()))
self.assertFalse(self._call(self.A(), self.C()))
def test_cross_group(self):
self.assertTrue(self._call(self.B(), self.C()))
def test_same_type(self):
self.assertFalse(self._call(self.A(), self.A(), different=False))
self.assertTrue(self._call(self.A(), self.A(), different=True))
# in particular...
obj = self.A()
self.assertFalse(self._call(obj, obj, different=False))
self.assertTrue(self._call(obj, obj, different=True))
def test_subclass(self):
self.assertFalse(self._call(self.A(), self.D()))
self.assertFalse(self._call(self.D(), self.A()))
class IsPreferredTest(unittest.TestCase):
"""Tests for letsencrypt.client.auth_handler.is_preferred."""
@classmethod
def _call(cls, chall, satisfied):
from letsencrypt.client.auth_handler import is_preferred
return is_preferred(chall, satisfied, exclusive_groups=frozenset([
frozenset([challenges.DVSNI, challenges.SimpleHTTPS]),
frozenset([challenges.DNS, challenges.SimpleHTTPS]),
]))
def test_empty_satisfied(self):
self.assertTrue(self._call(acme_util.DNS, frozenset()))
def test_mutually_exclusvie(self):
self.assertFalse(
self._call(acme_util.DVSNI, frozenset([acme_util.SIMPLE_HTTPS])))
def test_mutually_exclusive_same_type(self):
self.assertTrue(
self._call(acme_util.DVSNI, frozenset([acme_util.DVSNI])))
def gen_auth_resp(chall_list):
"""Generate a dummy authorization response."""
return ["%s%s" % (chall.__class__.__name__, chall.domain)
for chall in chall_list]
def gen_path(str_list, challenges):
def gen_path(required, challs):
"""Generate a path for challenge messages
:param required:
:param list str_list: challenge message types (:class:`str`)
:param dict challenges: ACME challenge messages
:param challs: ACME challenge messages
:return: :class:`list` of :class:`int`
"""
path = []
for i, chall in enumerate(challenges):
for str_chall in str_list:
if chall["type"] == str_chall:
path.append(i)
continue
return path
return [challs.index(chall) for chall in required]
if __name__ == "__main__":
unittest.main()

View file

@ -1,57 +0,0 @@
"""Tests for challenge_util."""
import os
import pkg_resources
import re
import unittest
import M2Crypto
from letsencrypt.acme import jose
from letsencrypt.client import challenge_util
from letsencrypt.client import constants
from letsencrypt.client import le_util
class DvsniGenCertTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
"""Tests for letsencrypt.client.challenge_util.dvsni_gen_cert."""
def test_standard(self):
"""Basic test for straightline code."""
domain = "example.com"
dvsni_r = "r_value"
r_b64 = jose.b64encode(dvsni_r)
pem = pkg_resources.resource_string(
__name__, os.path.join("testdata", "rsa256_key.pem"))
key = le_util.Key("path", pem)
nonce = "12345ABCDE"
cert_pem, s_b64 = self._call(domain, r_b64, nonce, key)
# pylint: disable=protected-access
ext = challenge_util._dvsni_gen_ext(
dvsni_r, jose.b64decode(s_b64))
self._standard_check_cert(cert_pem, domain, nonce, ext)
def _standard_check_cert(self, pem, domain, nonce, ext):
"""Check the certificate fields."""
dns_regex = r"DNS:([^, $]*)"
cert = M2Crypto.X509.load_cert_string(pem)
self.assertEqual(
cert.get_subject().CN, nonce + constants.DVSNI_DOMAIN_SUFFIX)
sans = cert.get_ext("subjectAltName").get_value()
exp_sans = set([nonce + constants.DVSNI_DOMAIN_SUFFIX, domain, ext])
act_sans = set(re.findall(dns_regex, sans))
self.assertEqual(exp_sans, act_sans)
@classmethod
def _call(cls, name, r_b64, nonce, key):
from letsencrypt.client.challenge_util import dvsni_gen_cert
return dvsni_gen_cert(name, r_b64, nonce, key)
if __name__ == "__main__":
unittest.main()

View file

@ -3,7 +3,9 @@ import unittest
import mock
from letsencrypt.client import challenge_util
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import errors
@ -19,31 +21,29 @@ class PerformTest(unittest.TestCase):
name="rec_token_perform", side_effect=gen_client_resp)
def test_rec_token1(self):
token = challenge_util.RecTokenChall("0")
token = achallenges.RecoveryToken(chall=None, domain="0")
responses = self.auth.perform([token])
self.assertEqual(responses, ["RecTokenChall0"])
self.assertEqual(responses, ["RecoveryToken0"])
def test_rec_token5(self):
tokens = []
for i in xrange(5):
tokens.append(challenge_util.RecTokenChall(str(i)))
tokens.append(achallenges.RecoveryToken(chall=None, domain=str(i)))
responses = self.auth.perform(tokens)
self.assertEqual(len(responses), 5)
for i in xrange(5):
self.assertEqual(responses[i], "RecTokenChall%d" % i)
self.assertEqual(responses[i], "RecoveryToken%d" % i)
def test_unexpected(self):
unexpected = challenge_util.DvsniChall(
"0", "rb64", "123", "invalid_key")
self.assertRaises(
errors.LetsEncryptClientAuthError, self.auth.perform, [unexpected])
errors.LetsEncryptClientAuthError, self.auth.perform, [
achallenges.DVSNI(chall=None, domain="0", key="invalid_key")])
def test_chall_pref(self):
self.assertEqual(
self.auth.get_chall_pref("example.com"), ["recoveryToken"])
self.auth.get_chall_pref("example.com"), [challenges.RecoveryToken])
class CleanupTest(unittest.TestCase):
@ -58,8 +58,8 @@ class CleanupTest(unittest.TestCase):
self.auth.rec_token.cleanup = self.mock_cleanup
def test_rec_token2(self):
token1 = challenge_util.RecTokenChall("0")
token2 = challenge_util.RecTokenChall("1")
token1 = achallenges.RecoveryToken(chall=None, domain="0")
token2 = achallenges.RecoveryToken(chall=None, domain="1")
self.auth.cleanup([token1, token2])
@ -67,8 +67,8 @@ class CleanupTest(unittest.TestCase):
[mock.call(token1), mock.call(token2)])
def test_unexpected(self):
token = challenge_util.RecTokenChall("0")
unexpected = challenge_util.DvsniChall("0", "rb64", "123", "dummy_key")
token = achallenges.RecoveryToken(chall=None, domain="0")
unexpected = achallenges.DVSNI(chall=None, domain="0", key="dummy_key")
self.assertRaises(errors.LetsEncryptClientAuthError,
self.auth.cleanup, [token, unexpected])

View file

@ -6,7 +6,9 @@ import tempfile
import mock
from letsencrypt.client import challenge_util
from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
class RecoveryTokenTest(unittest.TestCase):
@ -36,34 +38,37 @@ class RecoveryTokenTest(unittest.TestCase):
self.rec_token.store_token("example3.com", 333)
self.assertFalse(self.rec_token.requires_human("example3.com"))
self.rec_token.cleanup(challenge_util.RecTokenChall("example3.com"))
self.rec_token.cleanup(achallenges.RecoveryToken(
chall=None, domain="example3.com"))
self.assertTrue(self.rec_token.requires_human("example3.com"))
# Shouldn't throw an error
self.rec_token.cleanup(challenge_util.RecTokenChall("example4.com"))
self.rec_token.cleanup(achallenges.RecoveryToken(
chall=None, domain="example4.com"))
# SHOULD throw an error (OSError other than nonexistent file)
self.assertRaises(
OSError, self.rec_token.cleanup,
challenge_util.RecTokenChall("a"+"r"*10000+".com"))
achallenges.RecoveryToken(chall=None, domain="a"+"r"*10000+".com"))
def test_perform_stored(self):
self.rec_token.store_token("example4.com", 444)
response = self.rec_token.perform(
challenge_util.RecTokenChall("example4.com"))
achallenges.RecoveryToken(chall=None, domain="example4.com"))
self.assertEqual(response, {"type": "recoveryToken", "token": "444"})
self.assertEqual(
response, challenges.RecoveryTokenResponse(token="444"))
@mock.patch("letsencrypt.client.recovery_token.zope.component.getUtility")
def test_perform_not_stored(self, mock_input):
mock_input().input.side_effect = [(0, "555"), (1, "000")]
response = self.rec_token.perform(
challenge_util.RecTokenChall("example5.com"))
self.assertEqual(response, {"type": "recoveryToken", "token": "555"})
achallenges.RecoveryToken(chall=None, domain="example5.com"))
self.assertEqual(
response, challenges.RecoveryTokenResponse(token="555"))
response = self.rec_token.perform(
challenge_util.RecTokenChall("example6.com"))
achallenges.RecoveryToken(chall=None, domain="example6.com"))
self.assertTrue(response is None)

View file

@ -9,9 +9,9 @@ import mock
import OpenSSL.crypto
import OpenSSL.SSL
from letsencrypt.acme import jose
from letsencrypt.acme import challenges
from letsencrypt.client import challenge_util
from letsencrypt.client import achallenges
from letsencrypt.client import le_util
@ -53,8 +53,8 @@ class ChallPrefTest(unittest.TestCase):
self.authenticator = StandaloneAuthenticator()
def test_chall_pref(self):
self.assertEqual(
self.authenticator.get_chall_pref("example.com"), ["dvsni"])
self.assertEqual(self.authenticator.get_chall_pref("example.com"),
[challenges.DVSNI])
class SNICallbackTest(unittest.TestCase):
@ -63,11 +63,12 @@ class SNICallbackTest(unittest.TestCase):
from letsencrypt.client.standalone_authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
name, r_b64 = "example.com", jose.b64encode("x" * 32)
test_key = pkg_resources.resource_string(
__name__, "testdata/rsa256_key.pem")
nonce, key = "abcdef", le_util.Key("foo", test_key)
self.cert = challenge_util.dvsni_gen_cert(name, r_b64, nonce, key)[0]
key = le_util.Key("foo", test_key)
self.cert = achallenges.DVSNI(
chall=challenges.DVSNI(r="x"*32, nonce="abcdef"),
domain="example.com", key=key).gen_cert_and_response()[0]
private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key.pem)
self.authenticator.private_key = private_key
@ -260,80 +261,71 @@ class PerformTest(unittest.TestCase):
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
def test_perform_when_already_listening(self):
test_key = pkg_resources.resource_string(
__name__, "testdata/rsa256_key.pem")
key = le_util.Key("something", test_key)
chall1 = challenge_util.DvsniChall(
"foo.example.com", "whee", "foononce", key)
self.key = le_util.Key("something", test_key)
self.achall1 = achallenges.DVSNI(
chall=challenges.DVSNI(r="whee", nonce="foo"),
domain="foo.example.com", key=self.key)
self.achall2 = achallenges.DVSNI(
chall=challenges.DVSNI(r="whee", nonce="bar"),
domain="bar.example.com", key=self.key)
bad_achall = ("This", "Represents", "A Non-DVSNI", "Challenge")
self.achalls = [self.achall1, self.achall2, bad_achall]
def test_perform_when_already_listening(self):
self.authenticator.already_listening = mock.Mock()
self.authenticator.already_listening.return_value = True
result = self.authenticator.perform([chall1])
result = self.authenticator.perform([self.achall1])
self.assertEqual(result, [None])
def test_can_perform(self):
"""What happens if start_listener() returns True."""
test_key = pkg_resources.resource_string(
__name__, "testdata/rsa256_key.pem")
key = le_util.Key("something", test_key)
chall1 = challenge_util.DvsniChall(
"foo.example.com", "whee", "foononce", key)
chall2 = challenge_util.DvsniChall(
"bar.example.com", "whee", "barnonce", key)
bad_chall = ("This", "Represents", "A Non-DVSNI", "Challenge")
self.authenticator.start_listener = mock.Mock()
self.authenticator.start_listener.return_value = True
result = self.authenticator.perform([chall1, chall2, bad_chall])
result = self.authenticator.perform(self.achalls)
self.assertEqual(len(self.authenticator.tasks), 2)
self.assertTrue(
self.authenticator.tasks.has_key("foononce.acme.invalid"))
self.authenticator.tasks.has_key(self.achall1.nonce_domain))
self.assertTrue(
self.authenticator.tasks.has_key("barnonce.acme.invalid"))
self.authenticator.tasks.has_key(self.achall2.nonce_domain))
self.assertTrue(isinstance(result, list))
self.assertEqual(len(result), 3)
self.assertTrue(isinstance(result[0], dict))
self.assertTrue(isinstance(result[1], dict))
self.assertTrue(isinstance(result[0], challenges.ChallengeResponse))
self.assertTrue(isinstance(result[1], challenges.ChallengeResponse))
self.assertFalse(result[2])
self.assertTrue(result[0].has_key("s"))
self.assertTrue(result[1].has_key("s"))
self.authenticator.start_listener.assert_called_once_with(443, key)
self.authenticator.start_listener.assert_called_once_with(443, self.key)
def test_cannot_perform(self):
"""What happens if start_listener() returns False."""
test_key = pkg_resources.resource_string(
__name__, "testdata/rsa256_key.pem")
key = le_util.Key("something", test_key)
chall1 = challenge_util.DvsniChall(
"foo.example.com", "whee", "foononce", key)
chall2 = challenge_util.DvsniChall(
"bar.example.com", "whee", "barnonce", key)
bad_chall = ("This", "Represents", "A Non-DVSNI", "Challenge")
self.authenticator.start_listener = mock.Mock()
self.authenticator.start_listener.return_value = False
result = self.authenticator.perform([chall1, chall2, bad_chall])
result = self.authenticator.perform(self.achalls)
self.assertEqual(len(self.authenticator.tasks), 2)
self.assertTrue(
self.authenticator.tasks.has_key("foononce.acme.invalid"))
self.authenticator.tasks.has_key(self.achall1.nonce_domain))
self.assertTrue(
self.authenticator.tasks.has_key("barnonce.acme.invalid"))
self.authenticator.tasks.has_key(self.achall2.nonce_domain))
self.assertTrue(isinstance(result, list))
self.assertEqual(len(result), 3)
self.assertEqual(result, [None, None, False])
self.authenticator.start_listener.assert_called_once_with(443, key)
self.authenticator.start_listener.assert_called_once_with(
443, self. key)
def test_perform_with_pending_tasks(self):
self.authenticator.tasks = {"foononce.acme.invalid": "cert_data"}
extra_challenge = challenge_util.DvsniChall("a", "b", "c", "d")
extra_achall = achallenges.DVSNI(chall="a", domain="b", key="c")
self.assertRaises(
ValueError, self.authenticator.perform, [extra_challenge])
ValueError, self.authenticator.perform, [extra_achall])
def test_perform_without_challenge_list(self):
extra_challenge = challenge_util.DvsniChall("a", "b", "c", "d")
extra_achall = achallenges.DVSNI(chall="a", domain="b", key="c")
# This is wrong because a challenge must be specified.
self.assertRaises(ValueError, self.authenticator.perform, [])
# This is wrong because it must be a list, not a bare challenge.
self.assertRaises(
ValueError, self.authenticator.perform, extra_challenge)
ValueError, self.authenticator.perform, extra_achall)
# This is wrong because the list must contain at least one challenge.
self.assertRaises(
ValueError, self.authenticator.perform, range(20))
@ -430,12 +422,13 @@ class DoChildProcessTest(unittest.TestCase):
from letsencrypt.client.standalone_authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
name, r_b64 = "example.com", jose.b64encode("x" * 32)
test_key = pkg_resources.resource_string(
__name__, "testdata/rsa256_key.pem")
nonce, key = "abcdef", le_util.Key("foo", test_key)
key = le_util.Key("foo", test_key)
self.key = key
self.cert = challenge_util.dvsni_gen_cert(name, r_b64, nonce, key)[0]
self.cert = achallenges.DVSNI(
chall=challenges.DVSNI(r="x"*32, nonce="abcdef"),
domain="example.com", key=key).gen_cert_and_response()[0]
private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, key.pem)
self.authenticator.private_key = private_key
@ -522,7 +515,10 @@ class CleanupTest(unittest.TestCase):
from letsencrypt.client.standalone_authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
self.authenticator.tasks = {"foononce.acme.invalid": "stuff"}
self.achall = achallenges.DVSNI(
chall=challenges.DVSNI(r="whee", nonce="foononce"),
domain="foo.example.com", key="key")
self.authenticator.tasks = {self.achall.nonce_domain: "stuff"}
self.authenticator.child_pid = 12345
@mock.patch("letsencrypt.client.standalone_authenticator.os.kill")
@ -530,16 +526,17 @@ class CleanupTest(unittest.TestCase):
def test_cleanup(self, mock_sleep, mock_kill):
mock_sleep.return_value = None
mock_kill.return_value = None
chall = challenge_util.DvsniChall(
"foo.example.com", "whee", "foononce", "key")
self.authenticator.cleanup([chall])
self.authenticator.cleanup([self.achall])
mock_kill.assert_called_once_with(12345, signal.SIGINT)
mock_sleep.assert_called_once_with(1)
def test_bad_cleanup(self):
chall = challenge_util.DvsniChall(
"bad.example.com", "whee", "badnonce", "key")
self.assertRaises(ValueError, self.authenticator.cleanup, [chall])
self.assertRaises(
ValueError, self.authenticator.cleanup, [achallenges.DVSNI(
chall=challenges.DVSNI(r="whee", nonce="badnonce"),
domain="bad.example.com", key="key")])
class MoreInfoTest(unittest.TestCase):