mirror of
https://github.com/certbot/certbot.git
synced 2026-06-03 22:08:07 -04:00
Merge remote-tracking branch 'letsencrypt/master'
This commit is contained in:
commit
19ee506e3d
17 changed files with 147 additions and 675 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -8,6 +8,7 @@ dist*/
|
|||
/.tox/
|
||||
/releases/
|
||||
letsencrypt.log
|
||||
letsencrypt-auto-source/letsencrypt-auto.sig.lzma.base64
|
||||
|
||||
# coverage
|
||||
.coverage
|
||||
|
|
|
|||
|
|
@ -71,11 +71,12 @@ Plugin Auth Inst Notes
|
|||
=========== ==== ==== ===============================================================
|
||||
apache_ Y Y Automates obtaining and installing a cert with Apache 2.4 on
|
||||
Debian-based distributions with ``libaugeas0`` 1.0+.
|
||||
standalone_ Y N Uses a "standalone" webserver to obtain a cert. This is useful
|
||||
on systems with no webserver, or when direct integration with
|
||||
the local webserver is not supported or not desired.
|
||||
webroot_ Y N Obtains a cert by writing to the webroot directory of an
|
||||
already running webserver.
|
||||
standalone_ Y N Uses a "standalone" webserver to obtain a cert. Requires
|
||||
port 80 or 443 to be available. This is useful on systems
|
||||
with no webserver, or when direct integration with the local
|
||||
webserver is not supported or not desired.
|
||||
manual_ Y N Helps you obtain a cert by giving you instructions to perform
|
||||
domain validation yourself.
|
||||
nginx_ Y Y Very experimental and not included in letsencrypt-auto_.
|
||||
|
|
@ -87,15 +88,16 @@ There are also a number of third-party plugins for the client, provided by other
|
|||
Plugin Auth Inst Notes
|
||||
=========== ==== ==== ===============================================================
|
||||
plesk_ Y Y Integration with the Plesk web hosting tool
|
||||
https://github.com/plesk/letsencrypt-plesk
|
||||
haproxy_ Y Y Integration with the HAProxy load balancer
|
||||
https://code.greenhost.net/open/letsencrypt-haproxy
|
||||
s3front_ Y Y Integration with Amazon CloudFront distribution of S3 buckets
|
||||
https://github.com/dlapiduz/letsencrypt-s3front
|
||||
gandi_ Y Y Integration with Gandi's hosting products and API
|
||||
https://github.com/Gandi/letsencrypt-gandi
|
||||
=========== ==== ==== ===============================================================
|
||||
|
||||
.. _plesk: https://github.com/plesk/letsencrypt-plesk
|
||||
.. _haproxy: https://code.greenhost.net/open/letsencrypt-haproxy
|
||||
.. _s3front: https://github.com/dlapiduz/letsencrypt-s3front
|
||||
.. _gandi: https://github.com/Gandi/letsencrypt-gandi
|
||||
|
||||
Future plugins for IMAP servers, SMTP servers, IRC servers, etc, are likely to
|
||||
be installers but not authenticators.
|
||||
|
||||
|
|
@ -126,7 +128,9 @@ potentially be a separate directory for each domain. When requested a
|
|||
certificate for multiple domains, each domain will use the most recently
|
||||
specified ``--webroot-path``. So, for instance,
|
||||
|
||||
``letsencrypt certonly --webroot -w /var/www/example/ -d www.example.com -d example.com -w /var/www/other -d other.example.net -d another.other.example.net``
|
||||
::
|
||||
|
||||
letsencrypt certonly --webroot -w /var/www/example/ -d www.example.com -d example.com -w /var/www/other -d other.example.net -d another.other.example.net
|
||||
|
||||
would obtain a single certificate for all of those names, using the
|
||||
``/var/www/example`` webroot directory for the first two, and
|
||||
|
|
|
|||
|
|
@ -1,6 +0,0 @@
|
|||
XQAAAAT//////////wApLArrUzOk5bRHUk0UvMS4xjyZkm3U3qhnKvMbEan7rVeK6yBlbwGeeWFn
|
||||
Sw4XT1raGAMNq7cwyJvT7ql93Df7TpuRnxNSbPx7q52GojYyb5Oj1IQ2Y22Mvq41Q4K3kCZcVv+1
|
||||
YVKW3OazUn+wCnaoGhDdMFmH0EKbEPSGibba6HJqUoFosaDE2hRZmjqYR/VwwPCtW820L0Qz9PZ7
|
||||
DEAZ5VdMmj1+u+bYjDEcZD5+DyWKoLWci8tBXcPGiSvPDdZax/IWmR0GGUOd13gC7uX/HM2dHgbM
|
||||
Izh7Y3PPNEzM8Fu2wdXLoMCaYrQcrPAdKhsnyMCDbjxCVbD9LkS17xCq4LUMkcz/fMu3/CRSMMZ7
|
||||
gnn//jNQAA==
|
||||
|
|
@ -59,15 +59,3 @@ class DNS(AnnotatedChallenge):
|
|||
"""Client annotated "dns" ACME challenge."""
|
||||
__slots__ = ('challb', 'domain')
|
||||
acme_type = challenges.DNS
|
||||
|
||||
|
||||
class RecoveryContact(AnnotatedChallenge):
|
||||
"""Client annotated "recoveryContact" ACME challenge."""
|
||||
__slots__ = ('challb', 'domain')
|
||||
acme_type = challenges.RecoveryContact
|
||||
|
||||
|
||||
class ProofOfPossession(AnnotatedChallenge):
|
||||
"""Client annotated "proofOfPossession" ACME challenge."""
|
||||
__slots__ = ('challb', 'domain')
|
||||
acme_type = challenges.ProofOfPossession
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ from acme import challenges
|
|||
from acme import messages
|
||||
|
||||
from letsencrypt import achallenges
|
||||
from letsencrypt import constants
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import error_handler
|
||||
from letsencrypt import interfaces
|
||||
|
|
@ -21,13 +20,9 @@ logger = logging.getLogger(__name__)
|
|||
class AuthHandler(object):
|
||||
"""ACME Authorization Handler for a client.
|
||||
|
||||
:ivar dv_auth: Authenticator capable of solving
|
||||
:class:`~acme.challenges.DVChallenge` types
|
||||
:type dv_auth: :class:`letsencrypt.interfaces.IAuthenticator`
|
||||
|
||||
:ivar cont_auth: Authenticator capable of solving
|
||||
:class:`~acme.challenges.ContinuityChallenge` types
|
||||
:type cont_auth: :class:`letsencrypt.interfaces.IAuthenticator`
|
||||
:ivar auth: Authenticator capable of solving
|
||||
:class:`~acme.challenges.Challenge` types
|
||||
:type auth: :class:`letsencrypt.interfaces.IAuthenticator`
|
||||
|
||||
:ivar acme.client.Client acme: ACME client API.
|
||||
|
||||
|
|
@ -36,23 +31,19 @@ class AuthHandler(object):
|
|||
|
||||
:ivar dict authzr: ACME Authorization Resource dict where keys are domains
|
||||
and values are :class:`acme.messages.AuthorizationResource`
|
||||
:ivar list dv_c: DV challenges in the form of
|
||||
:ivar list achalls: DV challenges in the form of
|
||||
:class:`letsencrypt.achallenges.AnnotatedChallenge`
|
||||
:ivar list cont_c: Continuity challenges in the
|
||||
form of :class:`letsencrypt.achallenges.AnnotatedChallenge`
|
||||
|
||||
"""
|
||||
def __init__(self, dv_auth, cont_auth, acme, account):
|
||||
self.dv_auth = dv_auth
|
||||
self.cont_auth = cont_auth
|
||||
def __init__(self, auth, acme, account):
|
||||
self.auth = auth
|
||||
self.acme = acme
|
||||
|
||||
self.account = account
|
||||
self.authzr = dict()
|
||||
|
||||
# List must be used to keep responses straight.
|
||||
self.dv_c = []
|
||||
self.cont_c = []
|
||||
self.achalls = []
|
||||
|
||||
def get_authorizations(self, domains, best_effort=False):
|
||||
"""Retrieve all authorizations for challenges.
|
||||
|
|
@ -76,12 +67,12 @@ class AuthHandler(object):
|
|||
self._choose_challenges(domains)
|
||||
|
||||
# While there are still challenges remaining...
|
||||
while self.dv_c or self.cont_c:
|
||||
cont_resp, dv_resp = self._solve_challenges()
|
||||
while self.achalls:
|
||||
resp = self._solve_challenges()
|
||||
logger.info("Waiting for verification...")
|
||||
|
||||
# Send all Responses - this modifies dv_c and cont_c
|
||||
self._respond(cont_resp, dv_resp, best_effort)
|
||||
# Send all Responses - this modifies achalls
|
||||
self._respond(resp, best_effort)
|
||||
|
||||
# Just make sure all decisions are complete.
|
||||
self.verify_authzr_complete()
|
||||
|
|
@ -98,32 +89,27 @@ class AuthHandler(object):
|
|||
self._get_chall_pref(dom),
|
||||
self.authzr[dom].body.combinations)
|
||||
|
||||
dom_cont_c, dom_dv_c = self._challenge_factory(
|
||||
dom_achalls = self._challenge_factory(
|
||||
dom, path)
|
||||
self.dv_c.extend(dom_dv_c)
|
||||
self.cont_c.extend(dom_cont_c)
|
||||
self.achalls.extend(dom_achalls)
|
||||
|
||||
def _solve_challenges(self):
|
||||
"""Get Responses for challenges from authenticators."""
|
||||
cont_resp = []
|
||||
dv_resp = []
|
||||
resp = []
|
||||
with error_handler.ErrorHandler(self._cleanup_challenges):
|
||||
try:
|
||||
if self.cont_c:
|
||||
cont_resp = self.cont_auth.perform(self.cont_c)
|
||||
if self.dv_c:
|
||||
dv_resp = self.dv_auth.perform(self.dv_c)
|
||||
if self.achalls:
|
||||
resp = self.auth.perform(self.achalls)
|
||||
except errors.AuthorizationError:
|
||||
logger.critical("Failure in setting up challenges.")
|
||||
logger.info("Attempting to clean up outstanding challenges...")
|
||||
raise
|
||||
|
||||
assert len(cont_resp) == len(self.cont_c)
|
||||
assert len(dv_resp) == len(self.dv_c)
|
||||
assert len(resp) == len(self.achalls)
|
||||
|
||||
return cont_resp, dv_resp
|
||||
return resp
|
||||
|
||||
def _respond(self, cont_resp, dv_resp, best_effort):
|
||||
def _respond(self, resp, best_effort):
|
||||
"""Send/Receive confirmation of all challenges.
|
||||
|
||||
.. note:: This method also cleans up the auth_handler state.
|
||||
|
|
@ -131,17 +117,14 @@ class AuthHandler(object):
|
|||
"""
|
||||
# TODO: chall_update is a dirty hack to get around acme-spec #105
|
||||
chall_update = dict()
|
||||
active_achalls = []
|
||||
active_achalls.extend(
|
||||
self._send_responses(self.dv_c, dv_resp, chall_update))
|
||||
active_achalls.extend(
|
||||
self._send_responses(self.cont_c, cont_resp, chall_update))
|
||||
active_achalls = self._send_responses(self.achalls,
|
||||
resp, chall_update)
|
||||
|
||||
# Check for updated status...
|
||||
try:
|
||||
self._poll_challenges(chall_update, best_effort)
|
||||
finally:
|
||||
# This removes challenges from self.dv_c and self.cont_c
|
||||
# This removes challenges from self.achalls
|
||||
self._cleanup_challenges(active_achalls)
|
||||
|
||||
def _send_responses(self, achalls, resps, chall_update):
|
||||
|
|
@ -255,8 +238,7 @@ class AuthHandler(object):
|
|||
"""
|
||||
# Make sure to make a copy...
|
||||
chall_prefs = []
|
||||
chall_prefs.extend(self.cont_auth.get_chall_pref(domain))
|
||||
chall_prefs.extend(self.dv_auth.get_chall_pref(domain))
|
||||
chall_prefs.extend(self.auth.get_chall_pref(domain))
|
||||
return chall_prefs
|
||||
|
||||
def _cleanup_challenges(self, achall_list=None):
|
||||
|
|
@ -268,22 +250,14 @@ class AuthHandler(object):
|
|||
logger.info("Cleaning up challenges")
|
||||
|
||||
if achall_list is None:
|
||||
dv_c = self.dv_c
|
||||
cont_c = self.cont_c
|
||||
achalls = self.achalls
|
||||
else:
|
||||
dv_c = [achall for achall in achall_list
|
||||
if isinstance(achall.chall, challenges.DVChallenge)]
|
||||
cont_c = [achall for achall in achall_list if isinstance(
|
||||
achall.chall, challenges.ContinuityChallenge)]
|
||||
achalls = achall_list
|
||||
|
||||
if dv_c:
|
||||
self.dv_auth.cleanup(dv_c)
|
||||
for achall in dv_c:
|
||||
self.dv_c.remove(achall)
|
||||
if cont_c:
|
||||
self.cont_auth.cleanup(cont_c)
|
||||
for achall in cont_c:
|
||||
self.cont_c.remove(achall)
|
||||
if achalls:
|
||||
self.auth.cleanup(achalls)
|
||||
for achall in achalls:
|
||||
self.achalls.remove(achall)
|
||||
|
||||
def verify_authzr_complete(self):
|
||||
"""Verifies that all authorizations have been decided.
|
||||
|
|
@ -304,30 +278,20 @@ class AuthHandler(object):
|
|||
|
||||
:param list path: List of indices from `challenges`.
|
||||
|
||||
:returns: dv_chall, list of DVChallenge type
|
||||
:returns: achalls, list of challenge type
|
||||
:class:`letsencrypt.achallenges.Indexed`
|
||||
cont_chall, list of ContinuityChallenge type
|
||||
:class:`letsencrypt.achallenges.Indexed`
|
||||
:rtype: tuple
|
||||
:rtype: list
|
||||
|
||||
:raises .errors.Error: if challenge type is not recognized
|
||||
|
||||
"""
|
||||
dv_chall = []
|
||||
cont_chall = []
|
||||
achalls = []
|
||||
|
||||
for index in path:
|
||||
challb = self.authzr[domain].body.challenges[index]
|
||||
chall = challb.chall
|
||||
achalls.append(challb_to_achall(challb, self.account.key, domain))
|
||||
|
||||
achall = challb_to_achall(challb, self.account.key, domain)
|
||||
|
||||
if isinstance(chall, challenges.ContinuityChallenge):
|
||||
cont_chall.append(achall)
|
||||
elif isinstance(chall, challenges.DVChallenge):
|
||||
dv_chall.append(achall)
|
||||
|
||||
return cont_chall, dv_chall
|
||||
return achalls
|
||||
|
||||
|
||||
def challb_to_achall(challb, account_key, domain):
|
||||
|
|
@ -349,12 +313,6 @@ def challb_to_achall(challb, account_key, domain):
|
|||
challb=challb, domain=domain, account_key=account_key)
|
||||
elif isinstance(chall, challenges.DNS):
|
||||
return achallenges.DNS(challb=challb, domain=domain)
|
||||
elif isinstance(chall, challenges.RecoveryContact):
|
||||
return achallenges.RecoveryContact(
|
||||
challb=challb, domain=domain)
|
||||
elif isinstance(chall, challenges.ProofOfPossession):
|
||||
return achallenges.ProofOfPossession(
|
||||
challb=challb, domain=domain)
|
||||
else:
|
||||
raise errors.Error(
|
||||
"Received unsupported challenge of type: %s", chall.typ)
|
||||
|
|
@ -424,10 +382,7 @@ def _find_smart_path(challbs, preferences, combinations):
|
|||
combo_total = 0
|
||||
|
||||
if not best_combo:
|
||||
msg = ("Client does not support any combination of challenges that "
|
||||
"will satisfy the CA.")
|
||||
logger.fatal(msg)
|
||||
raise errors.AuthorizationError(msg)
|
||||
_report_no_chall_path()
|
||||
|
||||
return best_combo
|
||||
|
||||
|
|
@ -436,48 +391,29 @@ def _find_dumb_path(challbs, preferences):
|
|||
"""Find challenge path without server hints.
|
||||
|
||||
Should be called if the combinations hint is not included by the
|
||||
server. This function returns the best path that does not contain
|
||||
multiple mutually exclusive challenges.
|
||||
server. This function either returns a path containing all
|
||||
challenges provided by the CA or raises an exception.
|
||||
|
||||
"""
|
||||
assert len(preferences) == len(set(preferences))
|
||||
|
||||
path = []
|
||||
satisfied = set()
|
||||
for pref_c in preferences:
|
||||
for i, offered_challb in enumerate(challbs):
|
||||
if (isinstance(offered_challb.chall, pref_c) and
|
||||
is_preferred(offered_challb, satisfied)):
|
||||
path.append(i)
|
||||
satisfied.add(offered_challb)
|
||||
for i, challb in enumerate(challbs):
|
||||
# supported is set to True if the challenge type is supported
|
||||
supported = next((True for pref_c in preferences
|
||||
if isinstance(challb.chall, pref_c)), False)
|
||||
if supported:
|
||||
path.append(i)
|
||||
else:
|
||||
_report_no_chall_path()
|
||||
|
||||
return path
|
||||
|
||||
|
||||
def mutually_exclusive(obj1, obj2, groups, different=False):
|
||||
"""Are two objects mutually exclusive?"""
|
||||
for group in groups:
|
||||
obj1_present = False
|
||||
obj2_present = False
|
||||
|
||||
for obj_cls in group:
|
||||
obj1_present |= isinstance(obj1, obj_cls)
|
||||
obj2_present |= isinstance(obj2, obj_cls)
|
||||
|
||||
if obj1_present and obj2_present and (
|
||||
not different or not isinstance(obj1, obj2.__class__)):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def is_preferred(offered_challb, satisfied,
|
||||
exclusive_groups=constants.EXCLUSIVE_CHALLENGES):
|
||||
"""Return whether or not the challenge is preferred in path."""
|
||||
for challb in satisfied:
|
||||
if not mutually_exclusive(
|
||||
offered_challb.chall, challb.chall, exclusive_groups,
|
||||
different=True):
|
||||
return False
|
||||
return True
|
||||
def _report_no_chall_path():
|
||||
"""Logs and raises an error that no satisfiable chall path exists."""
|
||||
msg = ("Client with the currently selected authenticator does not support "
|
||||
"any combination of challenges that will satisfy the CA.")
|
||||
logger.fatal(msg)
|
||||
raise errors.AuthorizationError(msg)
|
||||
|
||||
|
||||
_ACME_PREFIX = "urn:acme:error:"
|
||||
|
|
|
|||
|
|
@ -1071,7 +1071,7 @@ def config_changes(config, unused_plugins):
|
|||
View checkpoints and associated configuration changes.
|
||||
|
||||
"""
|
||||
client.view_config_changes(config)
|
||||
client.view_config_changes(config, num=config.num)
|
||||
|
||||
|
||||
def plugins_cmd(config, plugins): # TODO: Use IDisplay rather than print
|
||||
|
|
@ -1651,6 +1651,10 @@ def _create_subparsers(helpful):
|
|||
helpful.add_group("revoke", description="Options for revocation of certs")
|
||||
helpful.add_group("rollback", description="Options for reverting config changes")
|
||||
helpful.add_group("plugins", description="Plugin options")
|
||||
helpful.add_group("config_changes",
|
||||
description="Options for showing a history of config changes")
|
||||
helpful.add("config_changes", "--num", type=int,
|
||||
help="How many past revisions you want to be displayed")
|
||||
helpful.add(
|
||||
None, "--user-agent", default=None,
|
||||
help="Set a custom user agent string for the client. User agent strings allow "
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ from letsencrypt import account
|
|||
from letsencrypt import auth_handler
|
||||
from letsencrypt import configuration
|
||||
from letsencrypt import constants
|
||||
from letsencrypt import continuity_auth
|
||||
from letsencrypt import crypto_util
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import error_handler
|
||||
|
|
@ -161,21 +160,21 @@ class Client(object):
|
|||
:ivar .IConfig config: Client configuration.
|
||||
:ivar .Account account: Account registered with `register`.
|
||||
:ivar .AuthHandler auth_handler: Authorizations handler that will
|
||||
dispatch DV and Continuity challenges to appropriate
|
||||
authenticators (providing `.IAuthenticator` interface).
|
||||
:ivar .IAuthenticator dv_auth: Prepared (`.IAuthenticator.prepare`)
|
||||
authenticator that can solve the `.constants.DV_CHALLENGES`.
|
||||
dispatch DV challenges to appropriate authenticators
|
||||
(providing `.IAuthenticator` interface).
|
||||
:ivar .IAuthenticator auth: Prepared (`.IAuthenticator.prepare`)
|
||||
authenticator that can solve ACME challenges.
|
||||
:ivar .IInstaller installer: Installer.
|
||||
:ivar acme.client.Client acme: Optional ACME client API handle.
|
||||
You might already have one from `register`.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, config, account_, dv_auth, installer, acme=None):
|
||||
def __init__(self, config, account_, auth, installer, acme=None):
|
||||
"""Initialize a client."""
|
||||
self.config = config
|
||||
self.account = account_
|
||||
self.dv_auth = dv_auth
|
||||
self.auth = auth
|
||||
self.installer = installer
|
||||
|
||||
# Initialize ACME if account is provided
|
||||
|
|
@ -183,15 +182,9 @@ class Client(object):
|
|||
acme = acme_from_config_key(config, self.account.key)
|
||||
self.acme = acme
|
||||
|
||||
# TODO: Check if self.config.enroll_autorenew is None. If
|
||||
# so, set it based to the default: figure out if dv_auth is
|
||||
# standalone (then default is False, otherwise default is True)
|
||||
|
||||
if dv_auth is not None:
|
||||
cont_auth = continuity_auth.ContinuityAuthenticator(config,
|
||||
installer)
|
||||
if auth is not None:
|
||||
self.auth_handler = auth_handler.AuthHandler(
|
||||
dv_auth, cont_auth, self.acme, self.account)
|
||||
auth, self.acme, self.account)
|
||||
else:
|
||||
self.auth_handler = None
|
||||
|
||||
|
|
@ -543,7 +536,7 @@ def rollback(default_installer, checkpoints, config, plugins):
|
|||
installer.restart()
|
||||
|
||||
|
||||
def view_config_changes(config):
|
||||
def view_config_changes(config, num=None):
|
||||
"""View checkpoints and associated configuration changes.
|
||||
|
||||
.. note:: This assumes that the installation is using a Reverter object.
|
||||
|
|
@ -554,7 +547,7 @@ def view_config_changes(config):
|
|||
"""
|
||||
rev = reverter.Reverter(config)
|
||||
rev.recovery_routine()
|
||||
rev.view_config_changes()
|
||||
rev.view_config_changes(num)
|
||||
|
||||
|
||||
def _save_chain(chain_pem, chain_path):
|
||||
|
|
|
|||
|
|
@ -44,11 +44,6 @@ RENEWER_DEFAULTS = dict(
|
|||
"""Defaults for renewer script."""
|
||||
|
||||
|
||||
EXCLUSIVE_CHALLENGES = frozenset([frozenset([
|
||||
challenges.TLSSNI01, challenges.HTTP01])])
|
||||
"""Mutually exclusive challenges."""
|
||||
|
||||
|
||||
ENHANCEMENTS = ["redirect", "http-header", "ocsp-stapling", "spdy"]
|
||||
"""List of possible :class:`letsencrypt.interfaces.IInstaller`
|
||||
enhancements.
|
||||
|
|
|
|||
|
|
@ -1,54 +0,0 @@
|
|||
"""Continuity Authenticator"""
|
||||
import zope.interface
|
||||
|
||||
from acme import challenges
|
||||
|
||||
from letsencrypt import achallenges
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import interfaces
|
||||
from letsencrypt import proof_of_possession
|
||||
|
||||
|
||||
@zope.interface.implementer(interfaces.IAuthenticator)
|
||||
class ContinuityAuthenticator(object):
|
||||
"""IAuthenticator for
|
||||
:const:`~acme.challenges.ContinuityChallenge` class challenges.
|
||||
|
||||
:ivar proof_of_pos: Performs "proofOfPossession" challenges.
|
||||
:type proof_of_pos:
|
||||
:class:`letsencrypt.proof_of_possession.Proof_of_Possession`
|
||||
|
||||
"""
|
||||
|
||||
# This will have an installer soon for get_key/cert purposes
|
||||
def __init__(self, config, installer): # pylint: disable=unused-argument
|
||||
"""Initialize Client Authenticator.
|
||||
|
||||
:param config: Configuration.
|
||||
:type config: :class:`letsencrypt.interfaces.IConfig`
|
||||
|
||||
:param installer: Let's Encrypt Installer.
|
||||
:type installer: :class:`letsencrypt.interfaces.IInstaller`
|
||||
|
||||
"""
|
||||
self.proof_of_pos = proof_of_possession.ProofOfPossession(installer)
|
||||
|
||||
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
|
||||
"""Return list of challenge preferences."""
|
||||
return [challenges.ProofOfPossession]
|
||||
|
||||
def perform(self, achalls):
|
||||
"""Perform client specific challenges for IAuthenticator"""
|
||||
responses = []
|
||||
for achall in achalls:
|
||||
if isinstance(achall, achallenges.ProofOfPossession):
|
||||
responses.append(self.proof_of_pos.perform(achall))
|
||||
else:
|
||||
raise errors.ContAuthError("Unexpected Challenge")
|
||||
return responses
|
||||
|
||||
def cleanup(self, achalls): # pylint: disable=no-self-use
|
||||
"""Cleanup call for IAuthenticator."""
|
||||
for achall in achalls:
|
||||
if not isinstance(achall, achallenges.ProofOfPossession):
|
||||
raise errors.ContAuthError("Unexpected Challenge")
|
||||
|
|
@ -48,19 +48,6 @@ class FailedChallenges(AuthorizationError):
|
|||
for achall in self.failed_achalls if achall.error is not None))
|
||||
|
||||
|
||||
class ContAuthError(AuthorizationError):
|
||||
"""Let's Encrypt Continuity Authenticator error."""
|
||||
|
||||
|
||||
class DvAuthError(AuthorizationError):
|
||||
"""Let's Encrypt DV Authenticator error."""
|
||||
|
||||
|
||||
# Authenticator - Challenge specific errors
|
||||
class TLSSNI01Error(DvAuthError):
|
||||
"""Let's Encrypt TLSSNI01 error."""
|
||||
|
||||
|
||||
# Plugin Errors
|
||||
class PluginError(Error):
|
||||
"""Let's Encrypt Plugin error."""
|
||||
|
|
@ -86,10 +73,6 @@ class NotSupportedError(PluginError):
|
|||
"""Let's Encrypt Plugin function not supported error."""
|
||||
|
||||
|
||||
class RevokerError(Error):
|
||||
"""Let's Encrypt Revoker error."""
|
||||
|
||||
|
||||
class StandaloneBindError(Error):
|
||||
"""Standalone plugin bind error."""
|
||||
|
||||
|
|
|
|||
|
|
@ -1,99 +0,0 @@
|
|||
"""Proof of Possession Identifier Validation Challenge."""
|
||||
import logging
|
||||
import os
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import zope.component
|
||||
|
||||
from acme import challenges
|
||||
from acme import jose
|
||||
from acme import other
|
||||
|
||||
from letsencrypt import interfaces
|
||||
from letsencrypt.display import util as display_util
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ProofOfPossession(object): # pylint: disable=too-few-public-methods
|
||||
"""Proof of Possession Identifier Validation Challenge.
|
||||
|
||||
Based on draft-barnes-acme, section 6.5.
|
||||
|
||||
:ivar installer: Installer object
|
||||
:type installer: :class:`~letsencrypt.interfaces.IInstaller`
|
||||
|
||||
"""
|
||||
def __init__(self, installer):
|
||||
self.installer = installer
|
||||
|
||||
def perform(self, achall):
|
||||
"""Perform the Proof of Possession Challenge.
|
||||
|
||||
:param achall: Proof of Possession Challenge
|
||||
:type achall: :class:`letsencrypt.achallenges.ProofOfPossession`
|
||||
|
||||
:returns: Response or None/False if the challenge cannot be completed
|
||||
:rtype: :class:`acme.challenges.ProofOfPossessionResponse`
|
||||
or False
|
||||
|
||||
"""
|
||||
if (achall.alg in [jose.HS256, jose.HS384, jose.HS512] or
|
||||
not isinstance(achall.hints.jwk, achall.alg.kty)):
|
||||
return None
|
||||
|
||||
for cert, key, _ in self.installer.get_all_certs_keys():
|
||||
with open(cert) as cert_file:
|
||||
cert_data = cert_file.read()
|
||||
try:
|
||||
cert_obj = x509.load_pem_x509_certificate(
|
||||
cert_data, default_backend())
|
||||
except ValueError:
|
||||
try:
|
||||
cert_obj = x509.load_der_x509_certificate(
|
||||
cert_data, default_backend())
|
||||
except ValueError:
|
||||
logger.warn("Certificate is neither PER nor DER: %s", cert)
|
||||
|
||||
cert_key = achall.alg.kty(key=cert_obj.public_key())
|
||||
if cert_key == achall.hints.jwk:
|
||||
return self._gen_response(achall, key)
|
||||
|
||||
# Is there are different prompt we should give the user?
|
||||
code, key = zope.component.getUtility(
|
||||
interfaces.IDisplay).input(
|
||||
"Path to private key for identifier: %s " % achall.domain)
|
||||
if code != display_util.CANCEL:
|
||||
return self._gen_response(achall, key)
|
||||
|
||||
# If we get here, the key wasn't found
|
||||
return False
|
||||
|
||||
def _gen_response(self, achall, key_path): # pylint: disable=no-self-use
|
||||
"""Create the response to the Proof of Possession Challenge.
|
||||
|
||||
:param achall: Proof of Possession Challenge
|
||||
:type achall: :class:`letsencrypt.achallenges.ProofOfPossession`
|
||||
|
||||
:param str key_path: Path to the key corresponding to the hinted to
|
||||
public key.
|
||||
|
||||
:returns: Response or False if the challenge cannot be completed
|
||||
:rtype: :class:`acme.challenges.ProofOfPossessionResponse`
|
||||
or False
|
||||
|
||||
"""
|
||||
if os.path.isfile(key_path):
|
||||
with open(key_path, 'rb') as key:
|
||||
try:
|
||||
# Needs to be changed if JWKES doesn't have a key attribute
|
||||
jwk = achall.alg.kty.load(key.read())
|
||||
sig = other.Signature.from_msg(achall.nonce, jwk.key,
|
||||
alg=achall.alg)
|
||||
except (IndexError, ValueError, TypeError, jose.errors.Error):
|
||||
return False
|
||||
return challenges.ProofOfPossessionResponse(nonce=achall.nonce,
|
||||
signature=sig)
|
||||
return False
|
||||
|
|
@ -94,7 +94,7 @@ class Reverter(object):
|
|||
"Unable to load checkpoint during rollback")
|
||||
rollback -= 1
|
||||
|
||||
def view_config_changes(self, for_logging=False):
|
||||
def view_config_changes(self, for_logging=False, num=None):
|
||||
"""Displays all saved checkpoints.
|
||||
|
||||
All checkpoints are printed by
|
||||
|
|
@ -107,7 +107,8 @@ class Reverter(object):
|
|||
"""
|
||||
backups = os.listdir(self.config.backup_dir)
|
||||
backups.sort(reverse=True)
|
||||
|
||||
if num:
|
||||
backups = backups[:num]
|
||||
if not backups:
|
||||
logger.info("The Let's Encrypt client has not saved any backups "
|
||||
"of your configuration")
|
||||
|
|
|
|||
|
|
@ -17,51 +17,14 @@ HTTP01 = challenges.HTTP01(
|
|||
TLSSNI01 = challenges.TLSSNI01(
|
||||
token=jose.b64decode(b"evaGxfADs6pSRb2LAv9IZf17Dt3juxGJyPCt92wrDoA"))
|
||||
DNS = challenges.DNS(token="17817c66b60ce2e4012dfad92657527a")
|
||||
RECOVERY_CONTACT = challenges.RecoveryContact(
|
||||
activation_url="https://example.ca/sendrecovery/a5bd99383fb0",
|
||||
success_url="https://example.ca/confirmrecovery/bb1b9928932",
|
||||
contact="c********n@example.com")
|
||||
POP = challenges.ProofOfPossession(
|
||||
alg="RS256", nonce=jose.b64decode("eET5udtV7aoX8Xl8gYiZIA"),
|
||||
hints=challenges.ProofOfPossession.Hints(
|
||||
jwk=jose.JWKRSA(key=KEY.public_key()),
|
||||
cert_fingerprints=(
|
||||
"93416768eb85e33adc4277f4c9acd63e7418fcfe",
|
||||
"16d95b7b63f1972b980b14c20291f3c0d1855d95",
|
||||
"48b46570d9fc6358108af43ad1649484def0debf"
|
||||
),
|
||||
certs=(), # TODO
|
||||
subject_key_identifiers=("d0083162dcc4c8a23ecb8aecbd86120e56fd24e5"),
|
||||
serial_numbers=(34234239832, 23993939911, 17),
|
||||
issuers=(
|
||||
"C=US, O=SuperT LLC, CN=SuperTrustworthy Public CA",
|
||||
"O=LessTrustworthy CA Inc, CN=LessTrustworthy But StillSecure",
|
||||
),
|
||||
authorized_for=("www.example.com", "example.net"),
|
||||
)
|
||||
)
|
||||
|
||||
CHALLENGES = [HTTP01, TLSSNI01, DNS, RECOVERY_CONTACT, POP]
|
||||
DV_CHALLENGES = [chall for chall in CHALLENGES
|
||||
if isinstance(chall, challenges.DVChallenge)]
|
||||
CONT_CHALLENGES = [chall for chall in CHALLENGES
|
||||
if isinstance(chall, challenges.ContinuityChallenge)]
|
||||
CHALLENGES = [HTTP01, TLSSNI01, DNS]
|
||||
|
||||
|
||||
def gen_combos(challbs):
|
||||
"""Generate natural combinations for challbs."""
|
||||
dv_chall = []
|
||||
cont_chall = []
|
||||
|
||||
for i, challb in enumerate(challbs): # pylint: disable=redefined-outer-name
|
||||
if isinstance(challb.chall, challenges.DVChallenge):
|
||||
dv_chall.append(i)
|
||||
else:
|
||||
cont_chall.append(i)
|
||||
|
||||
# Gen combos for 1 of each type, lowest index first (makes testing easier)
|
||||
return tuple((i, j) if i < j else (j, i)
|
||||
for i in dv_chall for j in cont_chall)
|
||||
# completing a single DV challenge satisfies the CA
|
||||
return tuple((i,) for i, _ in enumerate(challbs))
|
||||
|
||||
|
||||
def chall_to_challb(chall, status): # pylint: disable=redefined-outer-name
|
||||
|
|
@ -82,16 +45,8 @@ def chall_to_challb(chall, status): # pylint: disable=redefined-outer-name
|
|||
TLSSNI01_P = chall_to_challb(TLSSNI01, messages.STATUS_PENDING)
|
||||
HTTP01_P = chall_to_challb(HTTP01, messages.STATUS_PENDING)
|
||||
DNS_P = chall_to_challb(DNS, messages.STATUS_PENDING)
|
||||
RECOVERY_CONTACT_P = chall_to_challb(RECOVERY_CONTACT, messages.STATUS_PENDING)
|
||||
POP_P = chall_to_challb(POP, messages.STATUS_PENDING)
|
||||
|
||||
CHALLENGES_P = [HTTP01_P, TLSSNI01_P, DNS_P, RECOVERY_CONTACT_P, POP_P]
|
||||
DV_CHALLENGES_P = [challb for challb in CHALLENGES_P
|
||||
if isinstance(challb.chall, challenges.DVChallenge)]
|
||||
CONT_CHALLENGES_P = [
|
||||
challb for challb in CHALLENGES_P
|
||||
if isinstance(challb.chall, challenges.ContinuityChallenge)
|
||||
]
|
||||
CHALLENGES_P = [HTTP01_P, TLSSNI01_P, DNS_P]
|
||||
|
||||
|
||||
def gen_authzr(authz_status, domain, challs, statuses, combos=True):
|
||||
|
|
|
|||
|
|
@ -23,8 +23,7 @@ class ChallengeFactoryTest(unittest.TestCase):
|
|||
from letsencrypt.auth_handler import AuthHandler
|
||||
|
||||
# Account is mocked...
|
||||
self.handler = AuthHandler(
|
||||
None, None, None, mock.Mock(key="mock_key"))
|
||||
self.handler = AuthHandler(None, None, mock.Mock(key="mock_key"))
|
||||
|
||||
self.dom = "test"
|
||||
self.handler.authzr[self.dom] = acme_util.gen_authzr(
|
||||
|
|
@ -32,20 +31,17 @@ class ChallengeFactoryTest(unittest.TestCase):
|
|||
[messages.STATUS_PENDING] * 6, False)
|
||||
|
||||
def test_all(self):
|
||||
cont_c, dv_c = self.handler._challenge_factory(
|
||||
achalls = self.handler._challenge_factory(
|
||||
self.dom, range(0, len(acme_util.CHALLENGES)))
|
||||
|
||||
self.assertEqual(
|
||||
[achall.chall for achall in cont_c], acme_util.CONT_CHALLENGES)
|
||||
self.assertEqual(
|
||||
[achall.chall for achall in dv_c], acme_util.DV_CHALLENGES)
|
||||
[achall.chall for achall in achalls], acme_util.CHALLENGES)
|
||||
|
||||
def test_one_dv_one_cont(self):
|
||||
cont_c, dv_c = self.handler._challenge_factory(self.dom, [1, 3])
|
||||
def test_one_tls_sni(self):
|
||||
achalls = self.handler._challenge_factory(self.dom, [1])
|
||||
|
||||
self.assertEqual(
|
||||
[achall.chall for achall in cont_c], [acme_util.RECOVERY_CONTACT])
|
||||
self.assertEqual([achall.chall for achall in dv_c], [acme_util.TLSSNI01])
|
||||
[achall.chall for achall in achalls], [acme_util.TLSSNI01])
|
||||
|
||||
def test_unrecognized(self):
|
||||
self.handler.authzr["failure.com"] = acme_util.gen_authzr(
|
||||
|
|
@ -67,22 +63,17 @@ class GetAuthorizationsTest(unittest.TestCase):
|
|||
def setUp(self):
|
||||
from letsencrypt.auth_handler import AuthHandler
|
||||
|
||||
self.mock_dv_auth = mock.MagicMock(name="ApacheConfigurator")
|
||||
self.mock_cont_auth = mock.MagicMock(name="ContinuityAuthenticator")
|
||||
self.mock_auth = mock.MagicMock(name="ApacheConfigurator")
|
||||
|
||||
self.mock_dv_auth.get_chall_pref.return_value = [challenges.TLSSNI01]
|
||||
self.mock_cont_auth.get_chall_pref.return_value = [
|
||||
challenges.RecoveryContact]
|
||||
self.mock_auth.get_chall_pref.return_value = [challenges.TLSSNI01]
|
||||
|
||||
self.mock_cont_auth.perform.side_effect = gen_auth_resp
|
||||
self.mock_dv_auth.perform.side_effect = gen_auth_resp
|
||||
self.mock_auth.perform.side_effect = gen_auth_resp
|
||||
|
||||
self.mock_account = mock.Mock(key=le_util.Key("file_path", "PEM"))
|
||||
self.mock_net = mock.MagicMock(spec=acme_client.Client)
|
||||
|
||||
self.handler = AuthHandler(
|
||||
self.mock_dv_auth, self.mock_cont_auth,
|
||||
self.mock_net, self.mock_account)
|
||||
self.mock_auth, self.mock_net, self.mock_account)
|
||||
|
||||
logging.disable(logging.CRITICAL)
|
||||
|
||||
|
|
@ -92,7 +83,7 @@ class GetAuthorizationsTest(unittest.TestCase):
|
|||
@mock.patch("letsencrypt.auth_handler.AuthHandler._poll_challenges")
|
||||
def test_name1_tls_sni_01_1(self, mock_poll):
|
||||
self.mock_net.request_domain_challenges.side_effect = functools.partial(
|
||||
gen_dom_authzr, challs=acme_util.DV_CHALLENGES)
|
||||
gen_dom_authzr, challs=acme_util.CHALLENGES)
|
||||
|
||||
mock_poll.side_effect = self._validate_all
|
||||
|
||||
|
|
@ -105,16 +96,40 @@ class GetAuthorizationsTest(unittest.TestCase):
|
|||
self.assertEqual(chall_update.keys(), ["0"])
|
||||
self.assertEqual(len(chall_update.values()), 1)
|
||||
|
||||
self.assertEqual(self.mock_dv_auth.cleanup.call_count, 1)
|
||||
self.assertEqual(self.mock_cont_auth.cleanup.call_count, 0)
|
||||
self.assertEqual(self.mock_auth.cleanup.call_count, 1)
|
||||
# Test if list first element is TLSSNI01, use typ because it is an achall
|
||||
self.assertEqual(
|
||||
self.mock_dv_auth.cleanup.call_args[0][0][0].typ, "tls-sni-01")
|
||||
self.mock_auth.cleanup.call_args[0][0][0].typ, "tls-sni-01")
|
||||
|
||||
self.assertEqual(len(authzr), 1)
|
||||
|
||||
@mock.patch("letsencrypt.auth_handler.AuthHandler._poll_challenges")
|
||||
def test_name3_tls_sni_01_3_rectok_3(self, mock_poll):
|
||||
def test_name1_tls_sni_01_1_http_01_1_dns_1(self, mock_poll):
|
||||
self.mock_net.request_domain_challenges.side_effect = functools.partial(
|
||||
gen_dom_authzr, challs=acme_util.CHALLENGES, combos=False)
|
||||
|
||||
mock_poll.side_effect = self._validate_all
|
||||
self.mock_auth.get_chall_pref.return_value.append(challenges.HTTP01)
|
||||
self.mock_auth.get_chall_pref.return_value.append(challenges.DNS)
|
||||
|
||||
authzr = self.handler.get_authorizations(["0"])
|
||||
|
||||
self.assertEqual(self.mock_net.answer_challenge.call_count, 3)
|
||||
|
||||
self.assertEqual(mock_poll.call_count, 1)
|
||||
chall_update = mock_poll.call_args[0][0]
|
||||
self.assertEqual(chall_update.keys(), ["0"])
|
||||
self.assertEqual(len(chall_update.values()), 1)
|
||||
|
||||
self.assertEqual(self.mock_auth.cleanup.call_count, 1)
|
||||
# Test if list first element is TLSSNI01, use typ because it is an achall
|
||||
for achall in self.mock_auth.cleanup.call_args[0][0]:
|
||||
self.assertTrue(achall.typ in ["tls-sni-01", "http-01", "dns"])
|
||||
|
||||
self.assertEqual(len(authzr), 1)
|
||||
|
||||
@mock.patch("letsencrypt.auth_handler.AuthHandler._poll_challenges")
|
||||
def test_name3_tls_sni_01_3(self, mock_poll):
|
||||
self.mock_net.request_domain_challenges.side_effect = functools.partial(
|
||||
gen_dom_authzr, challs=acme_util.CHALLENGES)
|
||||
|
||||
|
|
@ -122,28 +137,27 @@ class GetAuthorizationsTest(unittest.TestCase):
|
|||
|
||||
authzr = self.handler.get_authorizations(["0", "1", "2"])
|
||||
|
||||
self.assertEqual(self.mock_net.answer_challenge.call_count, 6)
|
||||
self.assertEqual(self.mock_net.answer_challenge.call_count, 3)
|
||||
|
||||
# Check poll call
|
||||
self.assertEqual(mock_poll.call_count, 1)
|
||||
chall_update = mock_poll.call_args[0][0]
|
||||
self.assertEqual(len(chall_update.keys()), 3)
|
||||
self.assertTrue("0" in chall_update.keys())
|
||||
self.assertEqual(len(chall_update["0"]), 2)
|
||||
self.assertEqual(len(chall_update["0"]), 1)
|
||||
self.assertTrue("1" in chall_update.keys())
|
||||
self.assertEqual(len(chall_update["1"]), 2)
|
||||
self.assertEqual(len(chall_update["1"]), 1)
|
||||
self.assertTrue("2" in chall_update.keys())
|
||||
self.assertEqual(len(chall_update["2"]), 2)
|
||||
self.assertEqual(len(chall_update["2"]), 1)
|
||||
|
||||
self.assertEqual(self.mock_dv_auth.cleanup.call_count, 1)
|
||||
self.assertEqual(self.mock_cont_auth.cleanup.call_count, 1)
|
||||
self.assertEqual(self.mock_auth.cleanup.call_count, 1)
|
||||
|
||||
self.assertEqual(len(authzr), 3)
|
||||
|
||||
def test_perform_failure(self):
|
||||
self.mock_net.request_domain_challenges.side_effect = functools.partial(
|
||||
gen_dom_authzr, challs=acme_util.CHALLENGES)
|
||||
self.mock_dv_auth.perform.side_effect = errors.AuthorizationError
|
||||
self.mock_auth.perform.side_effect = errors.AuthorizationError
|
||||
|
||||
self.assertRaises(
|
||||
errors.AuthorizationError, self.handler.get_authorizations, ["0"])
|
||||
|
|
@ -170,20 +184,21 @@ class PollChallengesTest(unittest.TestCase):
|
|||
# Account and network are mocked...
|
||||
self.mock_net = mock.MagicMock()
|
||||
self.handler = AuthHandler(
|
||||
None, None, self.mock_net, mock.Mock(key="mock_key"))
|
||||
None, self.mock_net, mock.Mock(key="mock_key"))
|
||||
|
||||
self.doms = ["0", "1", "2"]
|
||||
self.handler.authzr[self.doms[0]] = acme_util.gen_authzr(
|
||||
messages.STATUS_PENDING, self.doms[0],
|
||||
acme_util.DV_CHALLENGES, [messages.STATUS_PENDING] * 3, False)
|
||||
[acme_util.HTTP01, acme_util.TLSSNI01],
|
||||
[messages.STATUS_PENDING] * 2, False)
|
||||
|
||||
self.handler.authzr[self.doms[1]] = acme_util.gen_authzr(
|
||||
messages.STATUS_PENDING, self.doms[1],
|
||||
acme_util.DV_CHALLENGES, [messages.STATUS_PENDING] * 3, False)
|
||||
acme_util.CHALLENGES, [messages.STATUS_PENDING] * 3, False)
|
||||
|
||||
self.handler.authzr[self.doms[2]] = acme_util.gen_authzr(
|
||||
messages.STATUS_PENDING, self.doms[2],
|
||||
acme_util.DV_CHALLENGES, [messages.STATUS_PENDING] * 3, False)
|
||||
acme_util.CHALLENGES, [messages.STATUS_PENDING] * 3, False)
|
||||
|
||||
self.chall_update = {}
|
||||
for dom in self.doms:
|
||||
|
|
@ -220,7 +235,7 @@ class PollChallengesTest(unittest.TestCase):
|
|||
from letsencrypt.auth_handler import challb_to_achall
|
||||
self.mock_net.poll.side_effect = self._mock_poll_solve_one_valid
|
||||
self.chall_update[self.doms[0]].append(
|
||||
challb_to_achall(acme_util.RECOVERY_CONTACT_P, "key", self.doms[0]))
|
||||
challb_to_achall(acme_util.DNS_P, "key", self.doms[0]))
|
||||
self.assertRaises(
|
||||
errors.AuthorizationError, self.handler._poll_challenges,
|
||||
self.chall_update, False)
|
||||
|
|
@ -311,7 +326,7 @@ class GenChallengePathTest(unittest.TestCase):
|
|||
def test_common_case(self):
|
||||
"""Given TLSSNI01 and HTTP01 with appropriate combos."""
|
||||
challbs = (acme_util.TLSSNI01_P, acme_util.HTTP01_P)
|
||||
prefs = [challenges.TLSSNI01]
|
||||
prefs = [challenges.TLSSNI01, challenges.HTTP01]
|
||||
combos = ((0,), (1,))
|
||||
|
||||
# Smart then trivial dumb path test
|
||||
|
|
@ -321,111 +336,17 @@ class GenChallengePathTest(unittest.TestCase):
|
|||
self.assertEqual(self._call(challbs[::-1], prefs, combos), (1,))
|
||||
self.assertTrue(self._call(challbs[::-1], prefs, None))
|
||||
|
||||
def test_common_case_with_continuity(self):
|
||||
challbs = (acme_util.POP_P,
|
||||
acme_util.RECOVERY_CONTACT_P,
|
||||
acme_util.TLSSNI01_P,
|
||||
acme_util.HTTP01_P)
|
||||
prefs = [challenges.ProofOfPossession, challenges.TLSSNI01]
|
||||
combos = acme_util.gen_combos(challbs)
|
||||
self.assertEqual(self._call(challbs, prefs, combos), (0, 2))
|
||||
|
||||
# dumb_path() trivial test
|
||||
self.assertTrue(self._call(challbs, prefs, None))
|
||||
|
||||
def test_full_cont_server(self):
|
||||
challbs = (acme_util.RECOVERY_CONTACT_P,
|
||||
acme_util.POP_P,
|
||||
acme_util.TLSSNI01_P,
|
||||
acme_util.HTTP01_P,
|
||||
acme_util.DNS_P)
|
||||
# Typical webserver client that can do everything except DNS
|
||||
# Attempted to make the order realistic
|
||||
prefs = [challenges.ProofOfPossession,
|
||||
challenges.HTTP01,
|
||||
challenges.TLSSNI01,
|
||||
challenges.RecoveryContact]
|
||||
combos = acme_util.gen_combos(challbs)
|
||||
self.assertEqual(self._call(challbs, prefs, combos), (1, 3))
|
||||
|
||||
# Dumb path trivial test
|
||||
self.assertTrue(self._call(challbs, prefs, None))
|
||||
|
||||
def test_not_supported(self):
|
||||
challbs = (acme_util.POP_P, acme_util.TLSSNI01_P)
|
||||
challbs = (acme_util.DNS_P, acme_util.TLSSNI01_P)
|
||||
prefs = [challenges.TLSSNI01]
|
||||
combos = ((0, 1),)
|
||||
|
||||
# smart path fails because no challs in perfs satisfies combos
|
||||
self.assertRaises(
|
||||
errors.AuthorizationError, self._call, challbs, prefs, combos)
|
||||
|
||||
|
||||
class MutuallyExclusiveTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.auth_handler.mutually_exclusive."""
|
||||
|
||||
# pylint: disable=missing-docstring,too-few-public-methods
|
||||
class A(object):
|
||||
pass
|
||||
|
||||
class B(object):
|
||||
pass
|
||||
|
||||
class C(object):
|
||||
pass
|
||||
|
||||
class D(C):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def _call(cls, chall1, chall2, different=False):
|
||||
from letsencrypt.auth_handler import mutually_exclusive
|
||||
return mutually_exclusive(chall1, chall2, groups=frozenset([
|
||||
frozenset([cls.A, cls.B]), frozenset([cls.A, cls.C]),
|
||||
]), different=different)
|
||||
|
||||
def test_group_members(self):
|
||||
self.assertFalse(self._call(self.A(), self.B()))
|
||||
self.assertFalse(self._call(self.A(), self.C()))
|
||||
|
||||
def test_cross_group(self):
|
||||
self.assertTrue(self._call(self.B(), self.C()))
|
||||
|
||||
def test_same_type(self):
|
||||
self.assertFalse(self._call(self.A(), self.A(), different=False))
|
||||
self.assertTrue(self._call(self.A(), self.A(), different=True))
|
||||
|
||||
# in particular...
|
||||
obj = self.A()
|
||||
self.assertFalse(self._call(obj, obj, different=False))
|
||||
self.assertTrue(self._call(obj, obj, different=True))
|
||||
|
||||
def test_subclass(self):
|
||||
self.assertFalse(self._call(self.A(), self.D()))
|
||||
self.assertFalse(self._call(self.D(), self.A()))
|
||||
|
||||
|
||||
class IsPreferredTest(unittest.TestCase):
|
||||
"""Tests for letsencrypt.auth_handler.is_preferred."""
|
||||
|
||||
@classmethod
|
||||
def _call(cls, chall, satisfied):
|
||||
from letsencrypt.auth_handler import is_preferred
|
||||
return is_preferred(chall, satisfied, exclusive_groups=frozenset([
|
||||
frozenset([challenges.TLSSNI01, challenges.HTTP01]),
|
||||
frozenset([challenges.DNS, challenges.HTTP01]),
|
||||
]))
|
||||
|
||||
def test_empty_satisfied(self):
|
||||
self.assertTrue(self._call(acme_util.DNS_P, frozenset()))
|
||||
|
||||
def test_mutually_exclusvie(self):
|
||||
self.assertFalse(
|
||||
self._call(
|
||||
acme_util.TLSSNI01_P, frozenset([acme_util.HTTP01_P])))
|
||||
|
||||
def test_mutually_exclusive_same_type(self):
|
||||
self.assertTrue(
|
||||
self._call(acme_util.TLSSNI01_P, frozenset([acme_util.TLSSNI01_P])))
|
||||
# dumb path fails because all challbs are not supported
|
||||
self.assertRaises(
|
||||
errors.AuthorizationError, self._call, challbs, prefs, None)
|
||||
|
||||
|
||||
class ReportFailedChallsTest(unittest.TestCase):
|
||||
|
|
@ -486,11 +407,11 @@ def gen_auth_resp(chall_list):
|
|||
for chall in chall_list]
|
||||
|
||||
|
||||
def gen_dom_authzr(domain, unused_new_authzr_uri, challs):
|
||||
def gen_dom_authzr(domain, unused_new_authzr_uri, challs, combos=True):
|
||||
"""Generates new authzr for domains."""
|
||||
return acme_util.gen_authzr(
|
||||
messages.STATUS_PENDING, domain, challs,
|
||||
[messages.STATUS_PENDING] * len(challs))
|
||||
[messages.STATUS_PENDING] * len(challs), combos)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ class ClientTest(unittest.TestCase):
|
|||
self.acme = acme.return_value = mock.MagicMock()
|
||||
self.client = Client(
|
||||
config=self.config, account_=self.account,
|
||||
dv_auth=None, installer=None)
|
||||
auth=None, installer=None)
|
||||
|
||||
def test_init_acme_verify_ssl(self):
|
||||
net = self.acme_client.call_args[1]["net"]
|
||||
|
|
|
|||
|
|
@ -1,67 +0,0 @@
|
|||
"""Test for letsencrypt.continuity_auth."""
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from acme import challenges
|
||||
|
||||
from letsencrypt import achallenges
|
||||
from letsencrypt import errors
|
||||
|
||||
|
||||
class PerformTest(unittest.TestCase):
|
||||
"""Test client perform function."""
|
||||
|
||||
def setUp(self):
|
||||
from letsencrypt.continuity_auth import ContinuityAuthenticator
|
||||
|
||||
self.auth = ContinuityAuthenticator(
|
||||
mock.MagicMock(server="demo_server.org"), None)
|
||||
self.auth.proof_of_pos.perform = mock.MagicMock(
|
||||
name="proof_of_pos_perform", side_effect=gen_client_resp)
|
||||
|
||||
def test_pop(self):
|
||||
achalls = []
|
||||
for i in xrange(4):
|
||||
achalls.append(achallenges.ProofOfPossession(
|
||||
challb=None, domain=str(i)))
|
||||
responses = self.auth.perform(achalls)
|
||||
|
||||
self.assertEqual(len(responses), 4)
|
||||
for i in xrange(4):
|
||||
self.assertEqual(responses[i], "ProofOfPossession%d" % i)
|
||||
|
||||
def test_unexpected(self):
|
||||
self.assertRaises(
|
||||
errors.ContAuthError, self.auth.perform, [
|
||||
achallenges.KeyAuthorizationAnnotatedChallenge(
|
||||
challb=None, domain="0", account_key="invalid_key")])
|
||||
|
||||
def test_chall_pref(self):
|
||||
self.assertEqual(
|
||||
self.auth.get_chall_pref("example.com"),
|
||||
[challenges.ProofOfPossession])
|
||||
|
||||
|
||||
class CleanupTest(unittest.TestCase):
|
||||
"""Test the Authenticator cleanup function."""
|
||||
|
||||
def setUp(self):
|
||||
from letsencrypt.continuity_auth import ContinuityAuthenticator
|
||||
|
||||
self.auth = ContinuityAuthenticator(
|
||||
mock.MagicMock(server="demo_server.org"), None)
|
||||
|
||||
def test_unexpected(self):
|
||||
unexpected = achallenges.KeyAuthorizationAnnotatedChallenge(
|
||||
challb=None, domain="0", account_key="dummy_key")
|
||||
self.assertRaises(errors.ContAuthError, self.auth.cleanup, [unexpected])
|
||||
|
||||
|
||||
def gen_client_resp(chall):
|
||||
"""Generate a dummy response."""
|
||||
return "%s%s" % (chall.__class__.__name__, chall.domain)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main() # pragma: no cover
|
||||
|
|
@ -1,83 +0,0 @@
|
|||
"""Tests for letsencrypt.proof_of_possession."""
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
|
||||
from acme import challenges
|
||||
from acme import jose
|
||||
from acme import messages
|
||||
|
||||
from letsencrypt import achallenges
|
||||
from letsencrypt import proof_of_possession
|
||||
from letsencrypt.display import util as display_util
|
||||
|
||||
from letsencrypt.tests import test_util
|
||||
|
||||
|
||||
CERT0_PATH = test_util.vector_path("cert.der")
|
||||
CERT2_PATH = test_util.vector_path("dsa_cert.pem")
|
||||
CERT2_KEY_PATH = test_util.vector_path("dsa512_key.pem")
|
||||
CERT3_PATH = test_util.vector_path("matching_cert.pem")
|
||||
CERT3_KEY_PATH = test_util.vector_path("rsa512_key_2.pem")
|
||||
CERT3_KEY = test_util.load_rsa_private_key("rsa512_key_2.pem").public_key()
|
||||
|
||||
|
||||
class ProofOfPossessionTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.installer = mock.MagicMock()
|
||||
self.cert1_path = tempfile.mkstemp()[1]
|
||||
certs = [CERT0_PATH, self.cert1_path, CERT2_PATH, CERT3_PATH]
|
||||
keys = [None, None, CERT2_KEY_PATH, CERT3_KEY_PATH]
|
||||
self.installer.get_all_certs_keys.return_value = zip(
|
||||
certs, keys, 4 * [None])
|
||||
self.proof_of_pos = proof_of_possession.ProofOfPossession(
|
||||
self.installer)
|
||||
|
||||
hints = challenges.ProofOfPossession.Hints(
|
||||
jwk=jose.JWKRSA(key=CERT3_KEY), cert_fingerprints=(),
|
||||
certs=(), serial_numbers=(), subject_key_identifiers=(),
|
||||
issuers=(), authorized_for=())
|
||||
chall = challenges.ProofOfPossession(
|
||||
alg=jose.RS256, nonce='zczv4HMLVe_0kimJ25Juig', hints=hints)
|
||||
challb = messages.ChallengeBody(
|
||||
chall=chall, uri="http://example", status=messages.STATUS_PENDING)
|
||||
self.achall = achallenges.ProofOfPossession(
|
||||
challb=challb, domain="example.com")
|
||||
|
||||
def tearDown(self):
|
||||
os.remove(self.cert1_path)
|
||||
|
||||
def test_perform_bad_challenge(self):
|
||||
hints = challenges.ProofOfPossession.Hints(
|
||||
jwk=jose.jwk.JWKOct(key="foo"), cert_fingerprints=(),
|
||||
certs=(), serial_numbers=(), subject_key_identifiers=(),
|
||||
issuers=(), authorized_for=())
|
||||
chall = challenges.ProofOfPossession(
|
||||
alg=jose.HS512, nonce='zczv4HMLVe_0kimJ25Juig', hints=hints)
|
||||
challb = messages.ChallengeBody(
|
||||
chall=chall, uri="http://example", status=messages.STATUS_PENDING)
|
||||
self.achall = achallenges.ProofOfPossession(
|
||||
challb=challb, domain="example.com")
|
||||
self.assertEqual(self.proof_of_pos.perform(self.achall), None)
|
||||
|
||||
def test_perform_no_input(self):
|
||||
self.assertTrue(self.proof_of_pos.perform(self.achall).verify())
|
||||
|
||||
@mock.patch("letsencrypt.proof_of_possession.zope.component.getUtility")
|
||||
def test_perform_with_input(self, mock_input):
|
||||
# Remove the matching certificate
|
||||
self.installer.get_all_certs_keys.return_value.pop()
|
||||
mock_input().input.side_effect = [(display_util.CANCEL, ""),
|
||||
(display_util.OK, CERT0_PATH),
|
||||
(display_util.OK, "imaginary_file"),
|
||||
(display_util.OK, CERT3_KEY_PATH)]
|
||||
self.assertFalse(self.proof_of_pos.perform(self.achall))
|
||||
self.assertFalse(self.proof_of_pos.perform(self.achall))
|
||||
self.assertFalse(self.proof_of_pos.perform(self.achall))
|
||||
self.assertTrue(self.proof_of_pos.perform(self.achall).verify())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main() # pragma: no cover
|
||||
Loading…
Reference in a new issue