Created auth_handler and client_authenticator. Use dicts for all messages and keep client clean.

This commit is contained in:
James Kasten 2015-01-10 05:19:22 -08:00
parent 8f062ddc54
commit be5ae7ae9a
9 changed files with 506 additions and 334 deletions

View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.client.auth_handler`
--------------------------------
.. automodule:: letsencrypt.client.auth_handler
:members:

View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.client.client_authenticator`
--------------------------------
.. automodule:: letsencrypt.client.client_authenticator
:members:

View file

@ -60,7 +60,7 @@ INVALID_EXT = ".acme.invalid"
EXCLUSIVE_CHALLENGES = [frozenset(["dvsni", "simpleHttps"])]
"""Mutually Exclusive Challenges - only solve 1"""
AUTH_CHALLENGES = frozenset(["dvsni", "simpleHttps", "dns"])
DV_CHALLENGES = frozenset(["dvsni", "simpleHttps", "dns"])
"""These are challenges that must be solved by an Authenticator object"""
CLIENT_CHALLENGES = frozenset(

View file

@ -0,0 +1,418 @@
"""ACME AuthHandler."""
import logging
import sys
import zope.component
from letsencrypt.client import acme
from letsencrypt.client import CONFIG
from letsencrypt.client import challenge_util
from letsencrypt.client import errors
class AuthHandler(object):
"""ACME Authorization Handler for a client.
:ivar dv_auth: Authenticator capable of solving CONFIG.DV_CHALLENGES
:type dv_auth: :class:`letsencrypt.client.interfaces.IAuthenticator`
:ivar client_auth: Authenticator capable of solving CONFIG.CLIENT_CHALLENGES
:type client_auth: :class:`letsencrypt.client.interfaces.IAuthenticator`
:ivar network: Network object for sending and receiving authorization
messages
:type network: :class:`letsencrypt.client.network.Network`
:ivar list domains: list of str domains to get authorization
:ivar dict authkey: Authorized Keys for each domain.
values are of type :class:`letsencrypt.client.client.Client.Key`
:ivar dict responses: keys: domain, values: list of dict responses
:ivar dict msgs: ACME Challenge messages with domain as a key
:ivar dict paths: optimal path for authorization. eg. paths[domain]
:ivar dict dv_c: Keys - domain, Values are DV challenges in the form of
:class:`letsencrypt.client.challenge_util.IndexedChall`
:ivar dict client_c: Keys - domain, Values are Client challenges in the form
of :class:`letsencrypt.client.challenge_util.IndexedChall`
"""
def __init__(self, dv_auth, client_auth, network):
self.dv_auth = dv_auth
self.client_auth = client_auth
self.network = network
self.domains = []
self.authkey = dict()
self.responses = dict()
self.msgs = dict()
self.paths = dict()
self.dv_c = dict()
self.client_c = dict()
def add_chall_msg(self, domain, msg, authkey):
"""Add a challenge message to the AuthHandler.
:param str domain: domain for authorization
:param dict msg: ACME challenge message
:param authkey: authorized key for the challenge
:type authkey: :class:`letsencrypt.client.client.Client.Key`
"""
if domain in self.domains:
raise errors.LetsEncryptAuthHandlerError(
"Multiple Challenges for the same domain is not supported.")
self.domains.append(domain)
self.responses[domain] = ["null"] * len(msg["challenges"])
self.msgs[domain] = msg
self.authkey[domain] = authkey
def get_authorizations(self):
"""Retreive all authorizations for challenges.
:raises LetsEncryptAuthHandlerError: If unable to retrieve all
authorizations
"""
progress = True
while self.msgs and progress:
progress = False
self._satisfy_challenges()
delete_list = []
for dom in self.domains:
if self._path_satisfied(dom):
self.acme_authorization(dom)
delete_list.append(dom)
# This avoids modifying while iterating over the list
if delete_list:
self._cleanup_state(delete_list)
progress = True
if not progress:
raise errors.LetsEncryptAuthHandlerError(
"Unable to solve challenges for requested names.")
def acme_authorization(self, domain):
"""Handle ACME "authorization" phase.
:param str domain: domain that is requesting authorization
:returns: ACME "authorization" message.
:rtype: dict
"""
try:
return self.network.send_and_receive_expected(
acme.authorization_request(
self.msgs[domain]["sessionID"],
domain,
self.msgs[domain]["nonce"],
self.responses[domain],
self.authkey[domain].pem),
"authorization")
except errors.LetsEncryptClientError as err:
logging.fatal(str(err))
logging.fatal(
"Failed Authorization procedure - cleaning up challenges")
sys.exit(1)
finally:
self._cleanup_challenges(domain)
def _path_satisfied(self, dom):
"""Returns whether a path has been completely satisfied."""
return all(
None != self.responses[dom][i] and "null" != self.responses[dom][i]
for i in self.paths[dom])
def _satisfy_challenges(self):
"""Attempt to satisfy all saved challenge messages."""
logging.info("Performing the following challenges:")
for dom in self.domains:
self.paths[dom] = gen_challenge_path(
self.msgs[dom]["challenges"],
self._get_chall_pref(dom),
self.msgs[dom].get("combinations", None))
self.dv_c[dom], self.client_c[dom] = self._challenge_factory(
dom, self.paths[dom])
# Flatten challs for authenticator functions and remove index
# Order is important here as we will not expose the outside
# Authenticator to our own indices.
flat_client = []
flat_auth = []
for dom in self.domains:
flat_client.extend(ichall.chall for ichall in self.client_c[dom])
flat_auth.extend(ichall.chall for ichall in self.dv_c[dom])
client_resp = self.client_auth.perform(flat_client)
dv_resp = self.dv_auth.perform(flat_auth)
# Assemble Responses
self._assign_responses(client_resp, self.client_c)
self._assign_responses(dv_resp, self.dv_c)
def _assign_responses(self, flat_list, ichall_dict):
"""Assign responses from flat_list back to the IndexedChall dicts."""
flat_index = 0
for dom in self.domains:
for ichall in ichall_dict[dom]:
self.responses[dom][ichall.index] = flat_list[flat_index]
flat_index += 1
def _get_chall_pref(self, domain):
"""Return list of challenge preferences."""
chall_prefs = self.client_auth.get_chall_pref(domain)
chall_prefs.extend(self.dv_auth.get_chall_pref(domain))
return chall_prefs
def _cleanup_challenges(self, domain):
"""Cleanup configuration challenges
:param str domain: domain for which to clean up challenges
"""
logging.info("Cleaning up challenges...")
self.dv_auth.cleanup(self.dv_c[domain])
self.client_auth.cleanup(self.client_c[domain])
def _cleanup_state(self, delete_list):
"""Cleanup state after an authorization is received.
:param list delete_list: list of domains in str form
"""
for domain in delete_list:
del self.msgs[domain]
del self.responses[domain]
del self.paths[domain]
del self.authkey[domain]
del self.client_c[domain]
del self.dv_c[domain]
self.domains.remove(domain)
def _challenge_factory(self, domain, path):
"""Construct Namedtuple Challenges
:param str domain: domain of the enrollee
:param list path: List of indices from `challenges`.
:returns: dv_chall, list of
:class:`letsencrypt.client.challenge_util.IndexedChall`
client_chall, list of
:class:`letsencrypt.client.challenge_util.IndexedChall`
:rtype: tuple
:raises errors.LetsEncryptClientError: If Challenge type is not
recognized
"""
challenges = self.msgs[domain]["challenges"]
dv_chall = []
client_chall = []
for index in self.paths[domain]:
chall = challenges[index]
# Authenticator Challenges
if chall["type"] in CONFIG.DV_CHALLENGES:
dv_chall.append(challenge_util.IndexedChall(
self._construct_dv_chall(chall, domain), index))
# Client Challenges
elif chall["type"] in CONFIG.CLIENT_CHALLENGES:
client_chall.append(challenge_util.IndexedChall(
self._construct_client_chall(chall, domain), index))
else:
raise errors.LetsEncryptClientError(
"Received unrecognized challenge of type: "
"%s" % chall["type"])
return dv_chall, client_chall
def _construct_dv_chall(self, chall, domain):
"""Construct Auth Type Challenges.
:param dict chall: Single challenge
:param str domain: challenge's domain
:returns: challenge_util named tuple Chall object
:rtype: `collections.namedtuple`
:raises errors.LetsEncryptClientError: If unimplemented challenge exists
"""
if chall["type"] == "dvsni":
logging.info(" DVSNI challenge for name %s.", domain)
return challenge_util.DvsniChall(
domain, str(chall["r"]), str(chall["nonce"]),
self.authkey[domain])
elif chall["type"] == "simpleHttps":
logging.info(" SimpleHTTPS challenge for name %s.", domain)
return challenge_util.SimpleHttpsChall(
domain, str(chall["token"]), self.authkey[domain])
elif chall["type"] == "dns":
logging.info(" DNS challenge for name %s.", domain)
return challenge_util.DnsChall(
domain, str(chall["token"]), self.authkey[domain])
else:
raise errors.LetsEncryptClientError(
"Unimplemented Auth Challenge: %s" % chall["type"])
def _construct_client_chall(self, chall, domain):
"""Construct Client Type Challenges.
:param dict chall: Single challenge
:param str domain: challenge's domain
:returns: challenge_util named tuple Chall object
:rtype: `collections.namedtuple`
:raises errors.LetsEncryptClientError: If unimplemented challenge exists
"""
if chall["type"] == "recoveryToken":
logging.info(" Recovery Token Challenge for name: %s.", domain)
return challenge_util.RecTokenChall(domain)
elif chall["type"] == "recoveryContact":
logging.info(" Recovery Contact Challenge for name: %s.", domain)
return challenge_util.RecContactChall(
domain,
chall.get("activationURL", None),
chall.get("successURL", None),
chall.get("contact", None))
elif chall["type"] == "proofOfPossession":
logging.info(" Proof-of-Possession Challenge for name: "
"%s", domain)
return challenge_util.PopChall(
domain, chall["alg"], chall["nonce"], chall["hints"])
else:
raise errors.LetsEncryptClientError(
"Unimplemented Client Challenge: %s" % chall["type"])
def gen_challenge_path(challenges, preferences, combos=None):
"""Generate a plan to get authority over the identity.
.. todo:: Make sure that the challenges are feasible...
Example: Do you have the recovery key?
:param list challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client in order to prove
possession of the identifier.
:param list preferences: List of challenge preferences for domain
:param combos: A collection of sets of challenges from ACME
"challenge" server message ("combinations"), each of which would
be sufficient to prove possession of the identifier.
:type combos: list or None
:returns: List of indices from `challenges`.
:rtype: list
"""
if combos:
return _find_smart_path(challenges, preferences, combos)
else:
return _find_dumb_path(challenges, preferences)
def _find_smart_path(challenges, preferences, combos):
"""Find challenge path with server hints.
Can be called if combinations is included. Function uses a simple
ranking system to choose the combo with the lowest cost.
:param list challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client in order to prove
possession of the identifier.
:param combos: A collection of sets of challenges from ACME
"challenge" server message ("combinations"), each of which would
be sufficient to prove possession of the identifier.
:type combos: list or None
:returns: List of indices from `challenges`.
:rtype: list
"""
chall_cost = {}
max_cost = 0
for i, chall in enumerate(preferences):
chall_cost[chall] = i
max_cost += i
best_combo = []
# Set above completing all of the available challenges
best_combo_cost = max_cost + 1
combo_total = 0
for combo in combos:
for challenge_index in combo:
combo_total += chall_cost.get(challenges[
challenge_index]["type"], max_cost)
if combo_total < best_combo_cost:
best_combo = combo
best_combo_cost = combo_total
combo_total = 0
if not best_combo:
logging.fatal("Client does not support any combination of "
"challenges to satisfy ACME server")
sys.exit(22)
return best_combo
def _find_dumb_path(challenges, preferences):
"""Find challenge path without server hints.
Should be called if the combinations hint is not included by the
server. This function returns the best path that does not contain
multiple mutually exclusive challenges.
:param list challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client in order to prove
possession of the identifier.
:returns: List of indices from `challenges`.
:rtype: list
"""
# Add logic for a crappy server
# Choose a DV
path = []
for pref_c in preferences:
for i, offered_challenge in enumerate(challenges):
if (pref_c == offered_challenge["type"] and
is_preferred(offered_challenge["type"], path)):
path.append((i, offered_challenge["type"]))
return [i for (i, _) in path]
def is_preferred(offered_challenge_type, path):
"""Return whether or not the challenge is preferred in path."""
for _, challenge_type in path:
for mutually_exclusive in CONFIG.EXCLUSIVE_CHALLENGES:
# Second part is in case we eventually allow multiple names
# to be challenges at the same time
if (challenge_type in mutually_exclusive and
offered_challenge_type in mutually_exclusive and
challenge_type != offered_challenge_type):
return False
return True

View file

@ -1,118 +0,0 @@
"""ACME challenge."""
import logging
import sys
from letsencrypt.client import CONFIG
def gen_challenge_path(challenges, preferences, combos=None):
"""Generate a plan to get authority over the identity.
.. todo:: Make sure that the challenges are feasible...
Example: Do you have the recovery key?
:param list challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client in order to prove
possession of the identifier.
:param combos: A collection of sets of challenges from ACME
"challenge" server message ("combinations"), each of which would
be sufficient to prove possession of the identifier.
:type combos: list or None
:returns: List of indices from `challenges`.
:rtype: list
"""
if combos:
return _find_smart_path(challenges, preferences, combos)
else:
return _find_dumb_path(challenges, preferences)
def _find_smart_path(challenges, preferences, combos):
"""Find challenge path with server hints.
Can be called if combinations is included. Function uses a simple
ranking system to choose the combo with the lowest cost.
:param list challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client in order to prove
possession of the identifier.
:param combos: A collection of sets of challenges from ACME
"challenge" server message ("combinations"), each of which would
be sufficient to prove possession of the identifier.
:type combos: list or None
:returns: List of indices from `challenges`.
:rtype: list
"""
chall_cost = {}
max_cost = 0
for i, chall in enumerate(preferences):
chall_cost[chall] = i
max_cost += i
best_combo = []
# Set above completing all of the available challenges
best_combo_cost = max_cost + 1
combo_total = 0
for combo in combos:
for challenge_index in combo:
combo_total += chall_cost.get(challenges[
challenge_index]["type"], max_cost)
if combo_total < best_combo_cost:
best_combo = combo
best_combo_cost = combo_total
combo_total = 0
if not best_combo:
logging.fatal("Client does not support any combination of "
"challenges to satisfy ACME server")
sys.exit(22)
return best_combo
def _find_dumb_path(challenges, preferences):
"""Find challenge path without server hints.
Should be called if the combinations hint is not included by the
server. This function returns the best path that does not contain
multiple mutually exclusive challenges.
:param list challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client in order to prove
possession of the identifier.
:returns: List of indices from `challenges`.
:rtype: list
"""
# Add logic for a crappy server
# Choose a DV
path = []
for pref_c in preferences:
for i, offered_challenge in enumerate(challenges):
if (pref_c == offered_challenge["type"] and
is_preferred(offered_challenge["type"], path)):
path.append((i, offered_challenge["type"]))
return [i for (i, _) in path]
def is_preferred(offered_challenge_type, path):
"""Return whether or not the challenge is preferred in path."""
for _, challenge_type in path:
for mutually_exclusive in CONFIG.EXCLUSIVE_CHALLENGES:
# Second part is in case we eventually allow multiple names
# to be challenges at the same time
if (challenge_type in mutually_exclusive and
offered_challenge_type in mutually_exclusive and
challenge_type != offered_challenge_type):
return False
return True

View file

@ -20,6 +20,9 @@ RecContactChall = collections.namedtuple(
RecTokenChall = collections.namedtuple("RecTokenChall", "domain")
PopChall = collections.namedtuple("PopChall", "domain, alg, nonce, hints")
# Helper Challenge Wrapper - Can be used to maintain the proper position of
# the response within a larger challenge list
IndexedChall = collections.namedtuple("IndexedChall", "chall, index")
# DVSNI Challenge functions
def dvsni_gen_cert(filepath, name, r_b64, nonce, key):

View file

@ -12,8 +12,9 @@ import M2Crypto
import zope.component
from letsencrypt.client import acme
from letsencrypt.client import challenge
from letsencrypt.client import auth_handler
from letsencrypt.client import challenge_util
from letsencrypt.client import client_authenticator
from letsencrypt.client import CONFIG
from letsencrypt.client import crypto_util
from letsencrypt.client import errors
@ -40,9 +41,9 @@ class Client(object):
:ivar authkey: Authorization Key
:type authkey: :class:`letsencrypt.client.client.Client.Key`
:ivar auth: Object that supports the IAuthenticator interface.
`auth` is used specifically for CONFIG.AUTH_CHALLENGES
:type auth: :class:`letsencrypt.client.interfaces.IAuthenticator`
:ivar auth_handler: Object that supports the IAuthenticator interface.
auth_handler contains both a dv_authenticator and a client_authenticator
:type auth_handler: :class:`letsencrypt.client.auth_handler.AuthHandler`
:ivar installer: Object supporting the IInstaller interface.
:type installer: :class:`letsencrypt.client.interfaces.IInstraller`
@ -53,18 +54,26 @@ class Client(object):
Key = collections.namedtuple("Key", "file pem")
CSR = collections.namedtuple("CSR", "file data form")
def __init__(self, server, names, authkey, auth, installer):
"""Initialize a client."""
def __init__(self, server, names, authkey, dv_auth, installer):
"""Initialize a client.
:param str server: CA server to contact
:param dv_auth: IAuthenticator Interface that can solve the
CONFIG.DV_CHALLENGES
:type dv_auth: :class:`letsencrypt.client.interfaces.IAuthenticator`
"""
self.network = network.Network(server)
self.names = names
self.authkey = authkey
sanity_check_names([server] + names)
self.auth = auth
self.installer = installer
self.rec_token = recovery_token.RecoveryToken(server)
client_auth = client_authenticator.ClientAuthenticator(server)
self.auth_handler = auth_handler.AuthHandler(
dv_auth, client_auth, self.network)
def obtain_certificate(self, csr,
cert_path=CONFIG.CERT_PATH,
@ -82,42 +91,13 @@ class Client(object):
:rtype: `tuple` of `str`
"""
challenge_msgs = []
# Request Challenges
for name in self.names:
# Maintaining order of challenge_msgs to names is important
challenge_msgs.append(self.acme_challenge(name))
self.auth_handler.add_chall_msg(
name, self.acme_challenge(name), self.authkey)
# Perform Challenges
# Make sure at least one challenge is solved every round
progress = True
# This outer loop handles cases where the Authenticator cannot solve
# all challenge_msgs at once
while challenge_msgs and progress:
responses, auth_c, client_c = self.verify_identities(challenge_msgs)
progress = False
i = 0
while i < len(responses):
# Get Authorization
if responses[i] is not None:
self.acme_authorization(
challenge_msgs[i], self.names[i],
auth_c[i], client_c[i], responses[i])
# Received authorization, remove challenge from list
# We have also cleaned up challenges... keep index
# in sync
del challenge_msgs[i]
del auth_c[i]
del client_c[i]
del responses[i]
progress = True
else:
i += 1
if not progress:
raise errors.LetsEncryptClientError(
"Unable to solve challenges for requested names.")
# Perform Challenges/Get Authorizations
self.auth_handler.get_authorizations()
# Retrieve certificate
certificate_dict = self.acme_certificate(csr.data)
@ -140,34 +120,6 @@ class Client(object):
return self.network.send_and_receive_expected(
acme.challenge_request(domain), "challenge")
def acme_authorization(
self, challenge_msg, domain, auth_c, client_c, responses):
"""Handle ACME "authorization" phase.
:param dict challenge_msg: ACME "challenge" message.
:param str domain: domain that is requesting authorization
:param list auth_c: auth challenges
:param list client_c: client challenges
:param list responses: Responses to all challenges in challenge_msg
:returns: ACME "authorization" message.
:rtype: dict
"""
try:
return self.network.send_and_receive_expected(
acme.authorization_request(
challenge_msg["sessionID"], domain,
challenge_msg["nonce"], responses, self.authkey.pem),
"authorization")
except errors.LetsEncryptClientError as err:
logging.fatal(str(err))
logging.fatal(
"Failed Authorization procedure - cleaning up challenges")
sys.exit(1)
finally:
self.cleanup_challenges(auth_c, client_c)
def acme_certificate(self, csr_der):
"""Handle ACME "certificate" phase.
@ -277,17 +229,6 @@ class Client(object):
# # TODO enable OCSP Stapling
# continue
def cleanup_challenges(self, auth_c, client_c):
"""Cleanup configuration challenges
:param dict challenges: challenges from a challenge message
"""
logging.info("Cleaning up challenges...")
self.auth.cleanup(auth_c)
# should cleanup client_c
assert not client_c
def verify_identities(self, challenge_msgs):
"""Verify identities.
@ -386,10 +327,6 @@ class Client(object):
responses[msg_num][idx] = flat_resp[flat_index]
flat_index += 1
def _path_satisfied(self, responses, path):
"""Returns whether a path has been completely satisfied."""
return all("null" != responses[i] for i in path)
def store_cert_key(self, cert_file, encrypt=False):
"""Store certificate key.
@ -467,136 +404,6 @@ class Client(object):
vhost.add(host)
return vhost
def challenge_factory(self, domain, challenges, path):
"""
:param str domain: domain of the enrollee
:param list challenges: A list of challenges from ACME "challenge"
server message to be fulfilled by the client in order to prove
possession of the identifier.
:param list path: List of indices from `challenges`.
:returns: auth_chall, list of `collections.namedtuples`
auth_satisfies, list of indices, each associated auth_chall
satisfieswithin the challenge_msg
client_chall, list of `collections.namedtuples`
client_satisfies, list of indices each associated client_chall
satisfies within the challenge_msg
:rtype: tuple
:raises errors.LetsEncryptClientError: If Challenge type is not
recognized
"""
auth_chall = []
# Since a single invocation of SNI challenge can satisfy multiple
# challenges. We must keep track of all the challenges it satisfies
auth_satisfies = []
client_chall = []
client_satisfies = []
domain = str(domain)
for index in path:
chall = challenges[index]
# Authenticator Challenges
if chall["type"] in CONFIG.AUTH_CHALLENGES:
auth_chall.append(self._construct_auth_chall(chall, domain))
auth_satisfies.append(index)
# Client Challenges
elif chall["type"] in CONFIG.CLIENT_CHALLENGES:
client_chall.append(self._construct_client_chall(chall, domain))
client_satisfies.append(index)
else:
raise errors.LetsEncryptClientError(
"Received unrecognized challenge of type: "
"%s" % chall["type"])
return auth_chall, auth_satisfies, client_chall, client_satisfies
def _construct_auth_chall(self, chall, domain):
"""Construct Auth Type Challenges.
:param dict chall: Single challenge
:returns: challenge_util named tuple Chall object
:rtype: `collections.namedtuple`
:raises errors.LetsEncryptClientError: If unimplemented challenge exists
"""
if chall["type"] == "dvsni":
logging.info(" DVSNI challenge for name %s.", domain)
return challenge_util.DvsniChall(
domain, str(chall["r"]), str(chall["nonce"]), self.authkey)
elif chall["type"] == "simpleHttps":
logging.info(" SimpleHTTPS challenge for name %s.", domain)
return challenge_util.SimpleHttpsChall(
domain, str(chall["token"]), self.authkey)
elif chall["type"] == "dns":
logging.info(" DNS challenge for name %s.", domain)
return challenge_util.DnsChall(
domain, str(chall["token"]), self.authkey)
else:
raise errors.LetsEncryptClientError(
"Unimplemented Auth Challenge: %s" % chall["type"])
def _construct_client_chall(self, chall, domain):
"""Construct Client Type Challenges.
:param dict chall: Single challenge
:returns: challenge_util named tuple Chall object
:rtype: `collections.namedtuple`
:raises errors.LetsEncryptClientError: If unimplemented challenge exists
"""
if chall["type"] == "recoveryToken":
logging.info(" Recovery Token Challenge for name: %s.", domain)
return challenge_util.RecTokenChall(domain)
elif chall["type"] == "recoveryContact":
logging.info(" Recovery Contact Challenge for name: %s.", domain)
return challenge_util.RecContactChall(
domain,
chall.get("activationURL", None),
chall.get("successURL", None),
chall.get("contact", None))
elif chall["type"] == "proofOfPossession":
logging.info(" Proof-of-Possession Challenge for name: "
"%s", domain)
return challenge_util.PopChall(
domain, chall["alg"], chall["nonce"], chall["hints"])
else:
raise errors.LetsEncryptClientError(
"Unimplemented Client Challenge: %s" % chall["type"])
# pylint: disable=unused-argument
def get_chall_pref(self, domain):
"""Return list of challenge preferences."""
return ["recoveryToken"]
def perform(self, chall_list):
"""Perform client specific challenges."""
responses = []
for chall in chall_list:
if isinstance(chall, challenge_util.RecTokenChall):
responses.append(self.rec_token.perform(chall))
else:
raise errors.LetsEncryptClientError("Unexpected Challenge")
return responses
def validate_key_csr(privkey, csr):
"""Validate CSR and key files.

View file

@ -0,0 +1,44 @@
import zope.interface
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import recovery_token
class ClientAuthenticator(object):
"""Authenticator for CONFIG.CLIENT_CHALLENGES.
:ivar rec_token: Performs "recoveryToken" challenges
:type rec_token: :class:`letsencrypt.client.recovery_token.RecoveryToken`
"""
zope.interface.implements(interfaces.IAuthenticator)
# This will have an installer soon for get_key/cert purposes
def __init__(self, server):
"""Initialize Client Authenticator.
:param str server: ACME CA Server
"""
self.rec_token = recovery_token.RecoveryToken(server)
def get_chall_pref(self, domain): # pylint: disable=no-member-use
"""Return list of challenge preferences."""
return ["recoveryToken"]
def perform(self, chall_list):
"""Perform client specific challenges."""
responses = []
for chall in chall_list:
if isinstance(chall, challenge_util.RecTokenChall):
responses.append(self.rec_token.perform(chall))
else:
raise errors.LetsEncryptClientAuthError("Unexpected Challenge")
return responses
def cleanup(self, chall_list):
for chall in chall_list:
if isinstance(chall, challenge_util.RecTokenChall):
self.rec_token.cleanup(chall)
else:
raise errors.LetsEncryptClientAuthError("Unexpected Challenge")

View file

@ -5,8 +5,16 @@ class LetsEncryptClientError(Exception):
"""Generic Let's Encrypt client error."""
class LetsEncryptAuthHandlerError(LetsEncryptClientError):
"""Let's Encrypt Auth Handler error."""
class LetsEncryptClientAuthError(LetsEncryptAuthHandlerError):
"""Let's Encrypt Client Authenticator Error."""
class LetsEncryptConfiguratorError(LetsEncryptClientError):
"""Let's Encrypt configurator error."""
"""Let's Encrypt Configurator error."""
class LetsEncryptDvsniError(LetsEncryptConfiguratorError):