Merge remote-tracking branch 'origin' into renewer

This commit is contained in:
Seth Schoen 2015-05-12 11:30:01 -07:00
commit 587c81ebb4
9 changed files with 263 additions and 9 deletions

View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.client.proof_of_possession`
--------------------------------------------------
.. automodule:: letsencrypt.client.proof_of_possession
:members:

View file

@ -72,7 +72,8 @@ class Client(object):
# standalone (then default is False, otherwise default is True)
if dv_auth is not None:
cont_auth = continuity_auth.ContinuityAuthenticator(config)
cont_auth = continuity_auth.ContinuityAuthenticator(config,
installer)
self.auth_handler = auth_handler.AuthHandler(
dv_auth, cont_auth, self.network, self.account)
else:

View file

@ -6,6 +6,7 @@ from letsencrypt.acme import challenges
from letsencrypt.client import achallenges
from letsencrypt.client import errors
from letsencrypt.client import interfaces
from letsencrypt.client import proof_of_possession
from letsencrypt.client import recovery_token
@ -13,32 +14,42 @@ class ContinuityAuthenticator(object):
"""IAuthenticator for
:const:`~letsencrypt.acme.challenges.ContinuityChallenge` class challenges.
:ivar rec_token: Performs "recoveryToken" challenges
:ivar rec_token: Performs "recoveryToken" challenges.
:type rec_token: :class:`letsencrypt.client.recovery_token.RecoveryToken`
:ivar proof_of_pos: Performs "proofOfPossession" challenges.
:type proof_of_pos:
:class:`letsencrypt.client.proof_of_possession.Proof_of_Possession`
"""
zope.interface.implements(interfaces.IAuthenticator)
# This will have an installer soon for get_key/cert purposes
def __init__(self, config):
def __init__(self, config, installer):
"""Initialize Client Authenticator.
:param config: Configuration.
:type config: :class:`letsencrypt.client.interfaces.IConfig`
:param installer: Let's Encrypt Installer.
:type installer: :class:`letsencrypt.client.interfaces.IInstaller`
"""
self.rec_token = recovery_token.RecoveryToken(
config.server, config.rec_token_dir)
self.proof_of_pos = proof_of_possession.ProofOfPossession(installer)
def get_chall_pref(self, unused_domain): # pylint: disable=no-self-use
"""Return list of challenge preferences."""
return [challenges.RecoveryToken]
return [challenges.ProofOfPossession, challenges.RecoveryToken]
def perform(self, achalls):
"""Perform client specific challenges for IAuthenticator"""
responses = []
for achall in achalls:
if isinstance(achall, achallenges.RecoveryToken):
if isinstance(achall, achallenges.ProofOfPossession):
responses.append(self.proof_of_pos.perform(achall))
elif isinstance(achall, achallenges.RecoveryToken):
responses.append(self.rec_token.perform(achall))
else:
raise errors.LetsEncryptContAuthError("Unexpected Challenge")
@ -49,5 +60,5 @@ class ContinuityAuthenticator(object):
for achall in achalls:
if isinstance(achall, achallenges.RecoveryToken):
self.rec_token.cleanup(achall)
else:
elif not isinstance(achall, achallenges.ProofOfPossession):
raise errors.LetsEncryptContAuthError("Unexpected Challenge")

View file

@ -0,0 +1,85 @@
"""Proof of Possession Identifier Validation Challenge."""
import M2Crypto
import os
import zope.component
from letsencrypt.acme import challenges
from letsencrypt.acme import jose
from letsencrypt.acme import other
from letsencrypt.client import interfaces
from letsencrypt.client.display import util as display_util
class ProofOfPossession(object): # pylint: disable=too-few-public-methods
"""Proof of Possession Identifier Validation Challenge.
Based on draft-barnes-acme, section 6.5.
:ivar installer: Installer object
:type installer: :class:`~letsencrypt.client.interfaces.IInstaller`
"""
def __init__(self, installer):
self.installer = installer
def perform(self, achall):
"""Perform the Proof of Possession Challenge.
:param achall: Proof of Possession Challenge
:type achall: :class:`letsencrypt.client.achallenges.ProofOfPossession`
:returns: Response or None/False if the challenge cannot be completed
:rtype: :class:`letsencrypt.acme.challenges.ProofOfPossessionResponse`
or False
"""
if (achall.alg in [jose.HS256, jose.HS384, jose.HS512] or
not isinstance(achall.hints.jwk, achall.alg.kty)):
return None
for cert, key, _ in self.installer.get_all_certs_keys():
der_cert_key = M2Crypto.X509.load_cert(cert).get_pubkey().as_der()
try:
cert_key = achall.alg.kty.load(der_cert_key)
# If JWKES.load raises other exceptions, they should be caught here
except (IndexError, ValueError, TypeError):
continue
if cert_key == achall.hints.jwk:
return self._gen_response(achall, key)
# Is there are different prompt we should give the user?
code, key = zope.component.getUtility(
interfaces.IDisplay).input(
"Path to private key for identifier: %s " % achall.domain)
if code != display_util.CANCEL:
return self._gen_response(achall, key)
# If we get here, the key wasn't found
return False
def _gen_response(self, achall, key_path): # pylint: disable=no-self-use
"""Create the response to the Proof of Possession Challenge.
:param achall: Proof of Possession Challenge
:type achall: :class:`letsencrypt.client.achallenges.ProofOfPossession`
:param str key_path: Path to the key corresponding to the hinted to
public key.
:returns: Response or False if the challenge cannot be completed
:rtype: :class:`letsencrypt.acme.challenges.ProofOfPossessionResponse`
or False
"""
if os.path.isfile(key_path):
with open(key_path, 'rb') as key:
try:
# Needs to be changed if JWKES doesn't have a key attribute
jwk = achall.alg.kty.load(key.read())
sig = other.Signature.from_msg(achall.nonce, jwk.key,
alg=achall.alg)
except (IndexError, ValueError, TypeError, jose.errors.Error):
return False
return challenges.ProofOfPossessionResponse(nonce=achall.nonce,
signature=sig)
return False

View file

@ -16,9 +16,11 @@ class PerformTest(unittest.TestCase):
from letsencrypt.client.continuity_auth import ContinuityAuthenticator
self.auth = ContinuityAuthenticator(
mock.MagicMock(server="demo_server.org"))
mock.MagicMock(server="demo_server.org"), None)
self.auth.rec_token.perform = mock.MagicMock(
name="rec_token_perform", side_effect=gen_client_resp)
self.auth.proof_of_pos.perform = mock.MagicMock(
name="proof_of_pos_perform", side_effect=gen_client_resp)
def test_rec_token1(self):
token = achallenges.RecoveryToken(challb=None, domain="0")
@ -36,6 +38,24 @@ class PerformTest(unittest.TestCase):
for i in xrange(5):
self.assertEqual(responses[i], "RecoveryToken%d" % i)
def test_pop_and_rec_token(self):
achalls = []
for i in xrange(4):
if i % 2 == 0:
achalls.append(achallenges.RecoveryToken(challb=None,
domain=str(i)))
else:
achalls.append(achallenges.ProofOfPossession(challb=None,
domain=str(i)))
responses = self.auth.perform(achalls)
self.assertEqual(len(responses), 4)
for i in xrange(4):
if i % 2 == 0:
self.assertEqual(responses[i], "RecoveryToken%d" % i)
else:
self.assertEqual(responses[i], "ProofOfPossession%d" % i)
def test_unexpected(self):
self.assertRaises(
errors.LetsEncryptContAuthError, self.auth.perform, [
@ -43,7 +63,8 @@ class PerformTest(unittest.TestCase):
def test_chall_pref(self):
self.assertEqual(
self.auth.get_chall_pref("example.com"), [challenges.RecoveryToken])
self.auth.get_chall_pref("example.com"),
[challenges.ProofOfPossession, challenges.RecoveryToken])
class CleanupTest(unittest.TestCase):
@ -53,7 +74,7 @@ class CleanupTest(unittest.TestCase):
from letsencrypt.client.continuity_auth import ContinuityAuthenticator
self.auth = ContinuityAuthenticator(
mock.MagicMock(server="demo_server.org"))
mock.MagicMock(server="demo_server.org"), None)
self.mock_cleanup = mock.MagicMock(name="rec_token_cleanup")
self.auth.rec_token.cleanup = self.mock_cleanup

View file

@ -0,0 +1,86 @@
"""Tests for proof_of_possession.py"""
import Crypto.PublicKey.RSA
import os
import pkg_resources
import unittest
import mock
from letsencrypt.acme import challenges
from letsencrypt.acme import jose
from letsencrypt.acme import messages2
from letsencrypt.client import achallenges
from letsencrypt.client import proof_of_possession
from letsencrypt.client.display import util as display_util
BASE_PACKAGE = "letsencrypt.client.tests"
CERT0_PATH = pkg_resources.resource_filename(
BASE_PACKAGE, os.path.join("testdata", "cert.pem"))
CERT1_PATH = pkg_resources.resource_filename(
BASE_PACKAGE, os.path.join("testdata", "cert-san.pem"))
CERT2_PATH = pkg_resources.resource_filename(
BASE_PACKAGE, os.path.join("testdata", "dsa_cert.pem"))
CERT2_KEY_PATH = pkg_resources.resource_filename(
BASE_PACKAGE, os.path.join("testdata", "dsa512_key.pem"))
CERT3_PATH = pkg_resources.resource_filename(
BASE_PACKAGE, os.path.join("testdata", "matching_cert.pem"))
CERT3_KEY_PATH = pkg_resources.resource_filename(
BASE_PACKAGE, os.path.join("testdata", "rsa512_key.pem"))
CERT3_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
BASE_PACKAGE, os.path.join('testdata', 'rsa512_key.pem'))).publickey()
class ProofOfPossessionTest(unittest.TestCase):
def setUp(self):
self.installer = mock.MagicMock()
certs = [CERT0_PATH, CERT1_PATH, CERT2_PATH, CERT3_PATH]
keys = [None, None, CERT2_KEY_PATH, CERT3_KEY_PATH]
self.installer.get_all_certs_keys.return_value = zip(
certs, keys, 4 * [None])
self.proof_of_pos = proof_of_possession.ProofOfPossession(
self.installer)
hints = challenges.ProofOfPossession.Hints(
jwk=jose.JWKRSA(key=CERT3_KEY), cert_fingerprints=(),
certs=(), serial_numbers=(), subject_key_identifiers=(),
issuers=(), authorized_for=())
chall = challenges.ProofOfPossession(
alg=jose.RS256, nonce='zczv4HMLVe_0kimJ25Juig', hints=hints)
challb = messages2.ChallengeBody(
chall=chall, uri="http://example", status=messages2.STATUS_PENDING)
self.achall = achallenges.ProofOfPossession(
challb=challb, domain="example.com")
def test_perform_bad_challenge(self):
hints = challenges.ProofOfPossession.Hints(
jwk=jose.jwk.JWKOct(key=CERT3_KEY), cert_fingerprints=(),
certs=(), serial_numbers=(), subject_key_identifiers=(),
issuers=(), authorized_for=())
chall = challenges.ProofOfPossession(
alg=jose.HS512, nonce='zczv4HMLVe_0kimJ25Juig', hints=hints)
challb = messages2.ChallengeBody(
chall=chall, uri="http://example", status=messages2.STATUS_PENDING)
self.achall = achallenges.ProofOfPossession(
challb=challb, domain="example.com")
self.assertEqual(self.proof_of_pos.perform(self.achall), None)
def test_perform_no_input(self):
self.assertTrue(self.proof_of_pos.perform(self.achall).verify())
@mock.patch("letsencrypt.client.recovery_token.zope.component.getUtility")
def test_perform_with_input(self, mock_input):
# Remove the matching certificate
self.installer.get_all_certs_keys.return_value.pop()
mock_input().input.side_effect = [(display_util.CANCEL, ""),
(display_util.OK, CERT0_PATH),
(display_util.OK, "imaginary_file"),
(display_util.OK, CERT3_KEY_PATH)]
self.assertFalse(self.proof_of_pos.perform(self.achall))
self.assertFalse(self.proof_of_pos.perform(self.achall))
self.assertFalse(self.proof_of_pos.perform(self.achall))
self.assertTrue(self.proof_of_pos.perform(self.achall).verify())
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -0,0 +1,14 @@
-----BEGIN DSA PARAMETERS-----
MIGdAkEAwebEoGBfokKQeALHHnAZMQwYU35ILEBdV8oUmzv7qpSVUoHihyqfn6GC
OixAKSP8EJYcTilIqPbFbfFyOPlbLwIVANoFHEDiQgknAvKrG78pHzAJdQSPAkEA
qfka5Bnl+CeEMpzVZGrOVqZE/LFdZK9eT6YtWjzqtIkf3hwXUVxJsTnBG4xmrfvl
41pgNJpgu99YOYqPpS0g7A==
-----END DSA PARAMETERS-----
-----BEGIN DSA PRIVATE KEY-----
MIH5AgEAAkEAwebEoGBfokKQeALHHnAZMQwYU35ILEBdV8oUmzv7qpSVUoHihyqf
n6GCOixAKSP8EJYcTilIqPbFbfFyOPlbLwIVANoFHEDiQgknAvKrG78pHzAJdQSP
AkEAqfka5Bnl+CeEMpzVZGrOVqZE/LFdZK9eT6YtWjzqtIkf3hwXUVxJsTnBG4xm
rfvl41pgNJpgu99YOYqPpS0g7AJATQ2LUzjGQSM6UljcPY5I2OD9THkUR9kH2tth
zZd70UoI9btrVaTizgqYShuok94glSQNK0H92JgUk3scJPaAkAIVAMDn61h6vrCE
mNv063So6E+eYaIN
-----END DSA PRIVATE KEY-----

View file

@ -0,0 +1,17 @@
-----BEGIN CERTIFICATE-----
MIICuDCCAnWgAwIBAgIJAPjmErVMzwVLMAsGCWCGSAFlAwQDAjB3MQswCQYDVQQG
EwJVUzERMA8GA1UECAwITWljaGlnYW4xEjAQBgNVBAcMCUFubiBBcmJvcjErMCkG
A1UECgwiVW5pdmVyc2l0eSBvZiBNaWNoaWdhbiBhbmQgdGhlIEVGRjEUMBIGA1UE
AwwLZXhhbXBsZS5jb20wHhcNMTUwNTEyMTUzOTQzWhcNMTUwNjExMTUzOTQzWjB3
MQswCQYDVQQGEwJVUzERMA8GA1UECAwITWljaGlnYW4xEjAQBgNVBAcMCUFubiBB
cmJvcjErMCkGA1UECgwiVW5pdmVyc2l0eSBvZiBNaWNoaWdhbiBhbmQgdGhlIEVG
RjEUMBIGA1UEAwwLZXhhbXBsZS5jb20wgfEwgakGByqGSM44BAEwgZ0CQQDB5sSg
YF+iQpB4AscecBkxDBhTfkgsQF1XyhSbO/uqlJVSgeKHKp+foYI6LEApI/wQlhxO
KUio9sVt8XI4+VsvAhUA2gUcQOJCCScC8qsbvykfMAl1BI8CQQCp+RrkGeX4J4Qy
nNVkas5WpkT8sV1kr15Ppi1aPOq0iR/eHBdRXEmxOcEbjGat++XjWmA0mmC731g5
io+lLSDsA0MAAkBNDYtTOMZBIzpSWNw9jkjY4P1MeRRH2Qfa22HNl3vRSgj1u2tV
pOLOCphKG6iT3iCVJA0rQf3YmBSTexwk9oCQo1AwTjAdBgNVHQ4EFgQUZ2DlTDGU
PMwTUt0KztM6IyX61BcwHwYDVR0jBBgwFoAUZ2DlTDGUPMwTUt0KztM6IyX61Bcw
DAYDVR0TBAUwAwEB/zALBglghkgBZQMEAwIDMAAwLQIVAIbMgGx+KwBr4rgqZ2Lh
AAO8TegHAhQsuxpIIIphiReoWEtEJk4TqEIz/A==
-----END CERTIFICATE-----

View file

@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICNzCCAeGgAwIBAgIJALizm9Y3q620MA0GCSqGSIb3DQEBCwUAMHcxCzAJBgNV
BAYTAlVTMREwDwYDVQQIDAhNaWNoaWdhbjESMBAGA1UEBwwJQW5uIEFyYm9yMSsw
KQYDVQQKDCJVbml2ZXJzaXR5IG9mIE1pY2hpZ2FuIGFuZCB0aGUgRUZGMRQwEgYD
VQQDDAtleGFtcGxlLmNvbTAeFw0xNTA1MDkwMDI0NTJaFw0xNjA1MDgwMDI0NTJa
MHcxCzAJBgNVBAYTAlVTMREwDwYDVQQIDAhNaWNoaWdhbjESMBAGA1UEBwwJQW5u
IEFyYm9yMSswKQYDVQQKDCJVbml2ZXJzaXR5IG9mIE1pY2hpZ2FuIGFuZCB0aGUg
RUZGMRQwEgYDVQQDDAtleGFtcGxlLmNvbTBcMA0GCSqGSIb3DQEBAQUAA0sAMEgC
QQD0thFxUTc2v6qV55wRxfwnBUOeN4bVfu5ywJqy65kzR7T1yZi5TPEiQyM7/3Hg
BVy9ddFc8RX4vNZaR+ROXNEzAgMBAAGjUDBOMB0GA1UdDgQWBBRJieHEVSHKmBk0
mTExx1erzlylCjAfBgNVHSMEGDAWgBRJieHEVSHKmBk0mTExx1erzlylCjAMBgNV
HRMEBTADAQH/MA0GCSqGSIb3DQEBCwUAA0EABT/nlpqOaanFSLZmWIrKv0zt63k4
bmWNMA8fYT45KYpLomsW8qXdpC82IlVKfNk7fW0UYT3HOeDSJRcycxNCTQ==
-----END CERTIFICATE-----