Add http-01 to acme

This commit is contained in:
Jakub Warmuz 2015-10-28 20:41:15 +00:00
parent d2c5b87b95
commit 602f0b2dbe
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
5 changed files with 367 additions and 9 deletions

View file

@ -1,9 +1,11 @@
"""ACME Identifier Validation Challenges."""
import abc
import functools
import hashlib
import logging
import socket
from cryptography.hazmat.primitives import hashes
import OpenSSL
import requests
@ -79,7 +81,7 @@ class UnrecognizedChallenge(Challenge):
class _TokenDVChallenge(DVChallenge):
"""DV Challenge with token.
:ivar unicode token:
:ivar bytes token:
"""
TOKEN_SIZE = 128 / 8 # Based on the entropy value from the spec
@ -105,6 +107,217 @@ class _TokenDVChallenge(DVChallenge):
return b'..' not in self.token and b'/' not in self.token
class KeyAuthorizationChallengeResponse(ChallengeResponse):
"""Response to Challenges based on Key Authorization.
:param unicode key_authorization:
"""
key_authorization = jose.Field("keyAuthorization")
thumbprint_hash_function = hashes.SHA256
def verify(self, chall, account_public_key):
"""Verify the key authorization.
:param KeyAuthorization chall: Challenge that corresponds to
this response.
:param JWK account_public_key:
:return: ``True`` iff verification of the key authorization was
successful.
:rtype: bool
"""
parts = self.key_authorization.split('.') # pylint: disable=no-member
if len(parts) != 2:
logger.debug("Key authorization (%r) is not well formed",
self.key_authorization)
return False
if parts[0] != chall.encode("token"):
logger.debug("Mismatching token in key authorization: "
"%r instead of %r", parts[0], chall.encode("token"))
return False
thumbprint = jose.b64encode(account_public_key.thumbprint(
hash_function=self.thumbprint_hash_function)).decode()
if parts[1] != thumbprint:
logger.debug("Mismatching thumbprint in key authorization: "
"%r instead of %r", parts[0], thumbprint)
return False
return True
class KeyAuthorizationChallenge(_TokenDVChallenge):
# pylint: disable=abstract-class-little-used,too-many-ancestors
"""Challenge based on Key Authorization.
:param response_cls: Subclass of `KeyAuthorizationChallengeResponse`
that will be used to generate `response`.
"""
__metaclass__ = abc.ABCMeta
response_cls = NotImplemented
thumbprint_hash_function = (
KeyAuthorizationChallengeResponse.thumbprint_hash_function)
def key_authorization(self, account_key):
"""Generate Key Authorization.
:param JWK account_key:
:rtype unicode:
"""
return self.encode("token") + "." + jose.b64encode(
account_key.thumbprint(
hash_function=self.thumbprint_hash_function)).decode()
def response(self, account_key):
"""Generate response to the challenge.
:param JWK account_key:
:returns: Response (initialized `response_cls`) to the challenge.
:rtype: KeyAuthorizationChallengeResponse
"""
return self.response_cls(
key_authorization=self.key_authorization(account_key))
@abc.abstractmethod
def validation(self, account_key):
"""Generate validation for the challenge.
Subclasses must implement this method, but they are likely to
return completely different data structures, depending on what's
necessary to complete the challenge. Interepretation of that
return value must be known to the caller.
:param JWK account_key:
:returns: Challenge-specific validation.
"""
raise NotImplementedError() # pragma: no cover
def response_and_validation(self, account_key):
"""Generate response and validation.
Convenience function that return results of `response` and
`validation`.
:param JWK account_key:
:rtype: tuple
"""
return (self.response(account_key), self.validation(account_key))
@ChallengeResponse.register
class HTTP01Response(KeyAuthorizationChallengeResponse):
"""ACME http-01 challenge response."""
typ = "http-01"
PORT = 80
def simple_verify(self, chall, domain, account_public_key, port=None):
"""Simple verify.
:param challenges.SimpleHTTP chall: Corresponding challenge.
:param unicode domain: Domain name being verified.
:param account_public_key: Public key for the key pair
being authorized. If ``None`` key verification is not
performed!
:param JWK account_public_key:
:param int port: Port used in the validation.
:returns: ``True`` iff validation is successful, ``False``
otherwise.
:rtype: bool
"""
if not self.verify(chall, account_public_key):
logger.debug("Verification of key authorization in response failed")
return False
# TODO: ACME specification defines URI template that doesn't
# allow to use a custom port... Make sure port is not in the
# request URI, if it's standard.
if port is not None and port != self.PORT:
logger.warning(
"Using non-standard port for SimpleHTTP verification: %s", port)
domain += ":{0}".format(port)
uri = self.uri(domain, chall)
logger.debug("Verifying %s at %s...", chall.typ, uri)
try:
http_response = requests.get(uri)
except requests.exceptions.RequestException as error:
logger.error("Unable to reach %s: %s", uri, error)
return False
logger.debug("Received %s: %s. Headers: %s", http_response,
http_response.text, http_response.headers)
found_ct = http_response.headers.get(
"Content-Type", chall.CONTENT_TYPE)
if found_ct != chall.CONTENT_TYPE:
logger.debug("Wrong Content-Type: found %r, expected %r",
found_ct, chall.CONTENT_TYPE)
return False
if self.key_authorization != http_response.text:
logger.debug("Key authorization from response (%r) doesn't match "
"HTTP response (%r)", self.key_authorization,
http_response.text)
return False
return True
@Challenge.register # pylint: disable=too-many-ancestors
class HTTP01(KeyAuthorizationChallenge):
"""ACME http-01 challenge."""
response_cls = HTTP01Response
typ = response_cls.typ
CONTENT_TYPE = "text/plain"
"""Content-Type header that must be used for provisioned resource."""
URI_ROOT_PATH = ".well-known/acme-challenge"
"""URI root path for the server provisioned resource."""
@property
def path(self):
"""Path (starting with '/') for provisioned resource.
:rtype: string
"""
return '/' + self.URI_ROOT_PATH + '/' + self.encode('token')
def uri(self, domain):
"""Create an URI to the provisioned resource.
Forms an URI to the HTTPS server provisioned resource
(containing :attr:`~SimpleHTTP.token`).
:param unicode domain: Domain name being verified.
:rtype: string
"""
return "http://" + domain + self.path
def validation(self, account_key):
"""Generate validation.
:param JWK account_key:
:rtype: unicode
"""
return self.key_authorization(account_key)
@Challenge.register # pylint: disable=too-many-ancestors
class SimpleHTTP(_TokenDVChallenge):
"""ACME "simpleHttp" challenge."""
@ -229,7 +442,7 @@ class SimpleHTTPResponse(ChallengeResponse):
# allow to use a custom port... Make sure port is not in the
# request URI, if it's standard.
if port is not None and port != self.port:
logger.warn(
logger.warning(
"Using non-standard port for SimpleHTTP verification: %s", port)
domain += ":{0}".format(port)
@ -529,11 +742,7 @@ class ProofOfPossessionResponse(ChallengeResponse):
@Challenge.register # pylint: disable=too-many-ancestors
class DNS(_TokenDVChallenge):
"""ACME "dns" challenge.
:ivar unicode token:
"""
"""ACME "dns" challenge."""
typ = "dns"
LABEL = "_acme-challenge"

View file

@ -43,6 +43,149 @@ class UnrecognizedChallengeTest(unittest.TestCase):
self.chall, UnrecognizedChallenge.from_json(self.jobj))
class KeyAuthorizationChallengeResponseTest(unittest.TestCase):
def setUp(self):
def _encode(name):
assert name == "token"
return "foo"
self.chall = mock.Mock()
self.chall.encode.side_effect = _encode
def test_verify_ok(self):
from acme.challenges import KeyAuthorizationChallengeResponse
response = KeyAuthorizationChallengeResponse(
key_authorization='foo.oKGqedy-b-acd5eoybm2f-NVFxvyOoET5CNy3xnv8WY')
self.assertTrue(response.verify(self.chall, KEY.public_key()))
def test_verify_wrong_token(self):
from acme.challenges import KeyAuthorizationChallengeResponse
response = KeyAuthorizationChallengeResponse(
key_authorization='bar.oKGqedy-b-acd5eoybm2f-NVFxvyOoET5CNy3xnv8WY')
self.assertFalse(response.verify(self.chall, KEY.public_key()))
def test_verify_wrong_thumbprint(self):
from acme.challenges import KeyAuthorizationChallengeResponse
response = KeyAuthorizationChallengeResponse(
key_authorization='foo.oKGqedy-b-acd5eoybm2f-NVFxv')
self.assertFalse(response.verify(self.chall, KEY.public_key()))
def test_verify_wrong_form(self):
from acme.challenges import KeyAuthorizationChallengeResponse
response = KeyAuthorizationChallengeResponse(
key_authorization='.foo.oKGqedy-b-acd5eoybm2f-NVFxvyOoET5CNy3xnv8WY')
self.assertFalse(response.verify(self.chall, KEY.public_key()))
class HTTP01ResponseTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes
def setUp(self):
from acme.challenges import HTTP01Response
self.msg = HTTP01Response(key_authorization=u'foo')
self.jmsg = {
'resource': 'challenge',
'type': 'http-01',
'keyAuthorization': u'foo',
}
from acme.challenges import HTTP01
self.chall = HTTP01(token=(b'x' * 16))
self.response = self.chall.response(KEY)
self.good_headers = {'Content-Type': HTTP01.CONTENT_TYPE}
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import HTTP01Response
self.assertEqual(
self.msg, HTTP01Response.from_json(self.jmsg))
def test_from_json_hashable(self):
from acme.challenges import HTTP01Response
hash(HTTP01Response.from_json(self.jmsg))
def test_simple_verify_bad_key_authorization(self):
key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
self.response.simple_verify(self.chall, "local", key2.public_key())
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_good_validation(self, mock_get):
validation = self.chall.validation(KEY)
mock_get.return_value = mock.MagicMock(
text=validation, headers=self.good_headers)
self.assertTrue(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
mock_get.assert_called_once_with(self.chall.uri("local"))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_bad_validation(self, mock_get):
mock_get.return_value = mock.MagicMock(
text="!", headers=self.good_headers)
self.assertFalse(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_bad_content_type(self, mock_get):
mock_get().text = self.chall.token
self.assertFalse(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_connection_error(self, mock_get):
mock_get.side_effect = requests.exceptions.RequestException
self.assertFalse(self.response.simple_verify(
self.chall, "local", KEY.public_key()))
@mock.patch("acme.challenges.requests.get")
def test_simple_verify_port(self, mock_get):
self.response.simple_verify(
self.chall, domain="local",
account_public_key=KEY.public_key(), port=8080)
self.assertEqual("local:8080", urllib_parse.urlparse(
mock_get.mock_calls[0][1][0]).netloc)
class HTTP01Test(unittest.TestCase):
def setUp(self):
from acme.challenges import HTTP01
self.msg = HTTP01(
token=jose.decode_b64jose(
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA'))
self.jmsg = {
'type': 'http-01',
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
}
def test_path(self):
self.assertEqual(self.msg.path, '/.well-known/acme-challenge/'
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA')
def test_uri(self):
self.assertEqual(
'http://example.com/.well-known/acme-challenge/'
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
self.msg.uri('example.com'))
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
def test_from_json(self):
from acme.challenges import HTTP01
self.assertEqual(self.msg, HTTP01.from_json(self.jmsg))
def test_from_json_hashable(self):
from acme.challenges import HTTP01
hash(HTTP01.from_json(self.jmsg))
def test_good_token(self):
self.assertTrue(self.msg.good_token)
self.assertFalse(
self.msg.update(token=b'..').good_token)
class SimpleHTTPTest(unittest.TestCase):
def setUp(self):
@ -55,6 +198,10 @@ class SimpleHTTPTest(unittest.TestCase):
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
}
def test_path(self):
self.assertEqual(self.msg.path, '/.well-known/acme-challenge/'
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA')
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())

View file

@ -47,6 +47,8 @@ class JWK(json_util.TypedJSONObjectWithFields):
https://tools.ietf.org/html/rfc7638
:returns bytes:
"""
digest = hashes.Hash(hash_function(), backend=default_backend())
digest.update(json.dumps(

View file

@ -278,7 +278,7 @@ class AuthorizationTest(unittest.TestCase):
self.challbs = (
ChallengeBody(
uri='http://challb1', status=STATUS_VALID,
chall=challenges.SimpleHTTP(token=b'IlirfxKKXAsHtmzK29Pj8A')),
chall=challenges.HTTP01(token=b'IlirfxKKXAsHtmzK29Pj8A')),
ChallengeBody(uri='http://challb2', status=STATUS_VALID,
chall=challenges.DNS(
token=b'DGyRejmCefe7v4NfDGDKfA')),

View file

@ -152,7 +152,7 @@ the ACME server. From the protocol, there are essentially two
different types of challenges. Challenges that must be solved by
individual plugins in order to satisfy domain validation (subclasses
of `~.DVChallenge`, i.e. `~.challenges.DVSNI`,
`~.challenges.SimpleHTTPS`, `~.challenges.DNS`) and continuity specific
`~.challenges.HTTP01`, `~.challenges.DNS`) and continuity specific
challenges (subclasses of `~.ContinuityChallenge`,
i.e. `~.challenges.RecoveryToken`, `~.challenges.RecoveryContact`,
`~.challenges.ProofOfPossession`). Continuity challenges are