Merge pull request #233 from kuba/acme

letsencrypt.acme module
This commit is contained in:
James Kasten 2015-02-15 23:14:50 -08:00
commit 9d090017b5
53 changed files with 1916 additions and 669 deletions

View file

@ -19,7 +19,7 @@ persistent=yes
# List of plugins (as comma separated values of python modules names) to load,
# usually to register additional checkers.
load-plugins=
load-plugins=linter_plugin
[MESSAGES CONTROL]

View file

@ -1,4 +1,4 @@
include README.rst CHANGES.rst
include README.rst CHANGES.rst linter_plugin.py
recursive-include letsencrypt *.json
recursive-include letsencrypt *.sh
recursive-include letsencrypt *.conf

5
docs/api/acme/errors.rst Normal file
View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.errors`
------------------------------
.. automodule:: letsencrypt.acme.errors
:members:

View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.interfaces`
----------------------------------
.. automodule:: letsencrypt.acme.interfaces
:members:

5
docs/api/acme/jose.rst Normal file
View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.jose`
----------------------------
.. automodule:: letsencrypt.acme.jose
:members:

View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.messages`
--------------------------------
.. automodule:: letsencrypt.acme.messages
:members:

5
docs/api/acme/other.rst Normal file
View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.other`
-----------------------------
.. automodule:: letsencrypt.acme.other
:members:

5
docs/api/acme/util.rst Normal file
View file

@ -0,0 +1,5 @@
:mod:`letsencrypt.acme.util`
----------------------------
.. automodule:: letsencrypt.acme.util
:members:

View file

@ -1,5 +0,0 @@
:mod:`letsencrypt.client.acme`
------------------------------
.. automodule:: letsencrypt.client.acme
:members:

View file

@ -0,0 +1 @@
"""ACME protocol implementation."""

View file

@ -0,0 +1,13 @@
"""ACME errors."""
class Error(Exception):
"""Generic ACME error."""
class ValidationError(Error):
"""ACME message validation error."""
class UnrecognizedMessageTypeError(ValidationError):
"""Unrecognized ACME message type error."""
class SchemaValidationError(ValidationError):
"""JSON schema ACME message validation error."""

View file

@ -0,0 +1,22 @@
"""ACME interfaces."""
import zope.interface
# pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class
class IJSONSerializable(zope.interface.Interface):
# pylint: disable=too-few-public-methods
"""JSON serializable object."""
def to_json():
"""Prepare JSON serializable object.
:returns: JSON object ready to be serialized. Note, however, that
this might return other
:class:`letsencrypt.acme.interfaces.IJSONSerializable`
objects, that haven't been serialized yet, which is fine as
long as :func:`letsencrypt.acme.util.dump_ijsonserializable`
is used.
:rtype: dict
"""

100
letsencrypt/acme/jose.py Normal file
View file

@ -0,0 +1,100 @@
"""JOSE."""
import base64
import binascii
import Crypto.PublicKey.RSA
from letsencrypt.acme import util
def _leading_zeros(arg):
if len(arg) % 2:
return '0' + arg
return arg
class JWK(util.JSONDeSerializable, util.ImmutableMap):
# pylint: disable=too-few-public-methods
"""JSON Web Key.
.. todo:: Currently works for RSA public keys only.
"""
__slots__ = ('key',)
schema = util.load_schema('jwk')
@classmethod
def _encode_param(cls, param):
"""Encode numeric key parameter."""
return b64encode(binascii.unhexlify(
_leading_zeros(hex(param)[2:].rstrip('L'))))
@classmethod
def _decode_param(cls, param):
"""Decode numeric key parameter."""
return long(binascii.hexlify(b64decode(param)), 16)
def to_json(self):
"""Serialize to JSON."""
return {
'kty': 'RSA', # TODO
'n': self._encode_param(self.key.n),
'e': self._encode_param(self.key.e),
}
@classmethod
def _from_valid_json(cls, jobj):
assert 'RSA' == jobj['kty'] # TODO
return cls(key=Crypto.PublicKey.RSA.construct(
(cls._decode_param(jobj['n']), cls._decode_param(jobj['e']))))
# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C
#
# Jose Base64:
#
# - URL-safe Base64
#
# - padding stripped
def b64encode(data):
"""JOSE Base64 encode.
:param data: Data to be encoded.
:type data: str or bytearray
:returns: JOSE Base64 string.
:rtype: str
:raises TypeError: if `data` is of incorrect type
"""
if not isinstance(data, str):
raise TypeError('argument should be str or bytearray')
return base64.urlsafe_b64encode(data).rstrip('=')
def b64decode(data):
"""JOSE Base64 decode.
:param data: Base64 string to be decoded. If it's unicode, then
only ASCII characters are allowed.
:type data: str or unicode
:returns: Decoded data.
:raises TypeError: if input is of incorrect type
:raises ValueError: if input is unicode with non-ASCII characters
"""
if isinstance(data, unicode):
try:
data = data.encode('ascii')
except UnicodeEncodeError:
raise ValueError(
'unicode argument should contain only ASCII characters')
elif not isinstance(data, str):
raise TypeError('argument should be a str or unicode')
return base64.urlsafe_b64decode(data + '=' * (4 - (len(data) % 4)))

View file

@ -0,0 +1,120 @@
"""Tests for letsencrypt.acme.jose."""
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
RSA256_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa256_key.pem'))
RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa512_key.pem'))
class JWKTest(unittest.TestCase):
"""Tests fro letsencrypt.acme.jose.JWK."""
def setUp(self):
from letsencrypt.acme.jose import JWK
self.jwk256 = JWK(key=RSA256_KEY.publickey())
self.jwk256json = {
'kty': 'RSA',
'e': 'AQAB',
'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5'
'80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q',
}
self.jwk512 = JWK(key=RSA512_KEY.publickey())
self.jwk512json = {
'kty': 'RSA',
'e': 'AQAB',
'n': '9LYRcVE3Nr-qleecEcX8JwVDnjeG1X7ucsCasuuZM0e09c'
'mYuUzxIkMjO_9x4AVcvXXRXPEV-LzWWkfkTlzRMw',
}
def test_equals(self):
self.assertEqual(self.jwk256, self.jwk256)
self.assertEqual(self.jwk512, self.jwk512)
def test_not_equals(self):
self.assertNotEqual(self.jwk256, self.jwk512)
self.assertNotEqual(self.jwk512, self.jwk256)
def test_to_json(self):
self.assertEqual(self.jwk256.to_json(), self.jwk256json)
self.assertEqual(self.jwk512.to_json(), self.jwk512json)
def test_from_json(self):
from letsencrypt.acme.jose import JWK
self.assertEqual(self.jwk256, JWK.from_json(self.jwk256json))
# TODO: fix schemata to allow RSA512
#self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json))
# https://en.wikipedia.org/wiki/Base64#Examples
B64_PADDING_EXAMPLES = {
'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='),
'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='),
'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''),
'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='),
'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='),
}
B64_URL_UNSAFE_EXAMPLES = {
chr(251) + chr(239): '--8',
chr(255) * 2: '__8',
}
class B64EncodeTest(unittest.TestCase):
"""Tests for letsencrypt.acme.jose.b64encode."""
@classmethod
def _call(cls, data):
from letsencrypt.acme.jose import b64encode
return b64encode(data)
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
self.assertEqual(self._call(text), b64)
def test_different_paddings(self):
for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(text), b64)
def test_unicode_fails_with_type_error(self):
self.assertRaises(TypeError, self._call, u'some unicode')
class B64DecodeTest(unittest.TestCase):
"""Tests for letsencrypt.acme.jose.b64decode."""
@classmethod
def _call(cls, data):
from letsencrypt.acme.jose import b64decode
return b64decode(data)
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
self.assertEqual(self._call(b64), text)
def test_input_without_padding(self):
for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(b64), text)
def test_input_with_padding(self):
for text, (b64, pad) in B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(b64 + pad), text)
def test_unicode_with_ascii(self):
self.assertEqual(self._call(u'YQ'), 'a')
def test_non_ascii_unicode_fails(self):
self.assertRaises(ValueError, self._call, u'\u0105')
def test_type_error_no_unicode_or_str(self):
self.assertRaises(TypeError, self._call, object())
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,500 @@
"""ACME protocol messages."""
import M2Crypto
import zope.interface
from letsencrypt.acme import errors
from letsencrypt.acme import interfaces
from letsencrypt.acme import jose
from letsencrypt.acme import other
from letsencrypt.acme import util
class Message(util.JSONDeSerializable, util.ImmutableMap):
"""ACME message.
Messages are considered immutable.
"""
zope.interface.implements(interfaces.IJSONSerializable)
acme_type = NotImplemented
"""ACME message "type" field. Subclasses must override."""
TYPES = {}
"""Message types registered for JSON deserialization"""
@classmethod
def register(cls, msg_cls):
"""Register class for JSON deserialization."""
cls.TYPES[msg_cls.acme_type] = msg_cls
return msg_cls
def to_json(self):
"""Get JSON serializable object.
:returns: Serializable JSON object representing ACME message.
:meth:`validate` will almost certainly not work, due to reasons
explained in :class:`letsencrypt.acme.interfaces.IJSONSerializable`.
:rtype: dict
"""
jobj = self._fields_to_json()
jobj["type"] = self.acme_type
return jobj
def _fields_to_json(self):
"""Prepare ACME message fields for JSON serialiazation.
Subclasses must override this method.
:returns: Serializable JSON object containg all ACME message fields
apart from "type".
:rtype: dict
"""
raise NotImplementedError()
@classmethod
def get_msg_cls(cls, jobj):
"""Get the registered class for ``jobj``."""
if cls in cls.TYPES.itervalues():
# cls is already registered Message type, force to use it
# so that, e.g Revocation.from_json(jobj) fails if
# jobj["type"] != "revocation".
return cls
if not isinstance(jobj, dict):
raise errors.ValidationError(
"{0} is not a dictionary object".format(jobj))
try:
msg_type = jobj["type"]
except KeyError:
raise errors.ValidationError("missing type field")
try:
msg_cls = cls.TYPES[msg_type]
except KeyError:
raise errors.UnrecognizedMessageTypeError(msg_type)
return msg_cls
@classmethod
def from_json(cls, jobj, validate=True):
"""Deserialize validated ACME message from JSON string.
:param str jobj: JSON object.
:param bool validate: Validate against schema before deserializing.
Useful if :class:`JWK` is part of already validated json object.
:raises letsencrypt.acme.errors.ValidationError: if validation
was unsuccessful
:returns: Valid ACME message.
:rtype: subclass of :class:`Message`
"""
msg_cls = cls.get_msg_cls(jobj)
if validate:
msg_cls.validate_json(jobj)
# pylint: disable=protected-access
return msg_cls._from_valid_json(jobj)
@Message.register # pylint: disable=too-few-public-methods
class Challenge(Message):
"""ACME "challenge" message."""
acme_type = "challenge"
schema = util.load_schema(acme_type)
__slots__ = ("session_id", "nonce", "challenges", "combinations")
def _fields_to_json(self):
fields = {
"sessionID": self.session_id,
"nonce": jose.b64encode(self.nonce),
"challenges": self.challenges,
}
if self.combinations:
fields["combinations"] = self.combinations
return fields
@classmethod
def _from_valid_json(cls, jobj):
return cls(session_id=jobj["sessionID"],
nonce=jose.b64decode(jobj["nonce"]),
challenges=jobj["challenges"],
combinations=jobj.get("combinations", []))
@Message.register # pylint: disable=too-few-public-methods
class ChallengeRequest(Message):
"""ACME "challengeRequest" message.
:ivar str identifier: Domain name.
"""
acme_type = "challengeRequest"
schema = util.load_schema(acme_type)
__slots__ = ("identifier",)
def _fields_to_json(self):
return {
"identifier": self.identifier,
}
@classmethod
def _from_valid_json(cls, jobj):
return cls(identifier=jobj["identifier"])
@Message.register # pylint: disable=too-few-public-methods
class Authorization(Message):
"""ACME "authorization" message."""
acme_type = "authorization"
schema = util.load_schema(acme_type)
__slots__ = ("recovery_token", "identifier", "jwk")
def _fields_to_json(self):
fields = {}
if self.recovery_token is not None:
fields["recoveryToken"] = self.recovery_token
if self.identifier is not None:
fields["identifier"] = self.identifier
if self.jwk is not None:
fields["jwk"] = self.jwk
return fields
@classmethod
def _from_valid_json(cls, jobj):
jwk = jobj.get("jwk")
if jwk is not None:
jwk = jose.JWK.from_json(jwk, validate=False)
return cls(recovery_token=jobj.get("recoveryToken"),
identifier=jobj.get("identifier"), jwk=jwk)
@Message.register
class AuthorizationRequest(Message):
"""ACME "authorizationRequest" message.
:ivar str session_id: "sessionID" from the server challenge
:ivar str nonce: Nonce from the server challenge
:ivar list responses: List of completed challenges
:ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`).
:ivar contact: TODO
"""
acme_type = "authorizationRequest"
schema = util.load_schema(acme_type)
__slots__ = ("session_id", "nonce", "responses", "signature", "contact")
@classmethod
def create(cls, name, key, sig_nonce=None, **kwargs):
"""Create signed "authorizationRequest".
:param str name: Hostname
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param str sig_nonce: Nonce used for signature. Useful for testing.
:kwargs: Any other arguments accepted by the class constructor.
:returns: Signed "authorizationRequest" ACME message.
:rtype: :class:`AuthorizationRequest`
"""
# pylint: disable=too-many-arguments
signature = other.Signature.from_msg(
name + kwargs["nonce"], key, sig_nonce)
return cls(
signature=signature, contact=kwargs.pop("contact", []), **kwargs)
def verify(self, name):
"""Verify signature.
.. warning:: Caller must check that the public key encoded in the
:attr:`signature`'s :class:`letsencrypt.acme.jose.JWK` object
is the correct key for a given context.
:param str name: Hostname
:returns: True iff ``signature`` can be verified, False otherwise.
:rtype: bool
"""
return self.signature.verify(name + self.nonce)
def _fields_to_json(self):
fields = {
"sessionID": self.session_id,
"nonce": jose.b64encode(self.nonce),
"responses": self.responses,
"signature": self.signature,
}
if self.contact:
fields["contact"] = self.contact
return fields
@classmethod
def _from_valid_json(cls, jobj):
return cls(session_id=jobj["sessionID"],
nonce=jose.b64decode(jobj["nonce"]),
responses=jobj["responses"],
signature=other.Signature.from_json(
jobj["signature"], validate=False),
contact=jobj.get("contact", []))
@Message.register # pylint: disable=too-few-public-methods
class Certificate(Message):
"""ACME "certificate" message.
:ivar certificate: The certificate (:class:`M2Crypto.X509.X509`
wrapped in :class:`letsencrypt.acme.util.ComparableX509`).
:ivar list chain: Chain of certificates (:class:`M2Crypto.X509.X509`
wrapped in :class:`letsencrypt.acme.util.ComparableX509` ).
"""
acme_type = "certificate"
schema = util.load_schema(acme_type)
__slots__ = ("certificate", "chain", "refresh")
def _fields_to_json(self):
fields = {"certificate": self._encode_cert(self.certificate)}
if self.chain:
fields["chain"] = [self._encode_cert(cert) for cert in self.chain]
if self.refresh is not None:
fields["refresh"] = self.refresh
return fields
@classmethod
def _decode_cert(cls, b64der):
return util.ComparableX509(M2Crypto.X509.load_cert_der_string(
jose.b64decode(b64der)))
@classmethod
def _encode_cert(cls, cert):
return jose.b64encode(cert.as_der())
@classmethod
def _from_valid_json(cls, jobj):
return cls(certificate=cls._decode_cert(jobj["certificate"]),
chain=[cls._decode_cert(cert) for cert in
jobj.get("chain", [])],
refresh=jobj.get("refresh"))
@Message.register
class CertificateRequest(Message):
"""ACME "certificateRequest" message.
:ivar csr: Certificate Signing Request (:class:`M2Crypto.X509.Request`
wrapped in :class:`letsencrypt.acme.util.ComparableX509`.
:ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`).
"""
acme_type = "certificateRequest"
schema = util.load_schema(acme_type)
__slots__ = ("csr", "signature")
@classmethod
def create(cls, key, sig_nonce=None, **kwargs):
"""Create signed "certificateRequest".
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param str sig_nonce: Nonce used for signature. Useful for testing.
:kwargs: Any other arguments accepted by the class constructor.
:returns: Signed "certificateRequest" ACME message.
:rtype: :class:`CertificateRequest`
"""
return cls(signature=other.Signature.from_msg(
kwargs["csr"].as_der(), key, sig_nonce), **kwargs)
def verify(self):
"""Verify signature.
.. warning:: Caller must check that the public key encoded in the
:attr:`signature`'s :class:`letsencrypt.acme.jose.JWK` object
is the correct key for a given context.
:returns: True iff ``signature`` can be verified, False otherwise.
:rtype: bool
"""
return self.signature.verify(self.csr.as_der())
@classmethod
def _decode_csr(cls, b64der):
return util.ComparableX509(M2Crypto.X509.load_request_der_string(
jose.b64decode(b64der)))
@classmethod
def _encode_csr(cls, csr):
return jose.b64encode(csr.as_der())
def _fields_to_json(self):
return {
"csr": self._encode_csr(self.csr),
"signature": self.signature,
}
@classmethod
def _from_valid_json(cls, jobj):
return cls(csr=cls._decode_csr(jobj["csr"]),
signature=other.Signature.from_json(
jobj["signature"], validate=False))
@Message.register # pylint: disable=too-few-public-methods
class Defer(Message):
"""ACME "defer" message."""
acme_type = "defer"
schema = util.load_schema(acme_type)
__slots__ = ("token", "interval", "message")
def _fields_to_json(self):
fields = {"token": self.token}
if self.interval is not None:
fields["interval"] = self.interval
if self.message is not None:
fields["message"] = self.message
return fields
@classmethod
def _from_valid_json(cls, jobj):
return cls(token=jobj["token"], interval=jobj.get("interval"),
message=jobj.get("message"))
@Message.register # pylint: disable=too-few-public-methods
class Error(Message):
"""ACME "error" message."""
acme_type = "error"
schema = util.load_schema(acme_type)
__slots__ = ("error", "message", "more_info")
CODES = {
"malformed": "The request message was malformed",
"unauthorized": "The client lacks sufficient authorization",
"serverInternal": "The server experienced an internal error",
"notSupported": "The request type is not supported",
"unknown": "The server does not recognize an ID/token in the request",
"badCSR": "The CSR is unacceptable (e.g., due to a short key)",
}
def _fields_to_json(self):
fields = {"error": self.error}
if self.message is not None:
fields["message"] = self.message
if self.more_info is not None:
fields["moreInfo"] = self.more_info
return fields
@classmethod
def _from_valid_json(cls, jobj):
return cls(error=jobj["error"], message=jobj.get("message"),
more_info=jobj.get("moreInfo"))
@Message.register # pylint: disable=too-few-public-methods
class Revocation(Message):
"""ACME "revocation" message."""
acme_type = "revocation"
schema = util.load_schema(acme_type)
__slots__ = ()
def _fields_to_json(self):
return {}
@classmethod
def _from_valid_json(cls, jobj):
return cls()
@Message.register
class RevocationRequest(Message):
"""ACME "revocationRequest" message.
:ivar certificate: Certificate (:class:`M2Crypto.X509.X509`
wrapped in :class:`letsencrypt.acme.util.ComparableX509`).
:ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`).
"""
acme_type = "revocationRequest"
schema = util.load_schema(acme_type)
__slots__ = ("certificate", "signature")
@classmethod
def create(cls, key, sig_nonce=None, **kwargs):
"""Create signed "revocationRequest".
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param str sig_nonce: Nonce used for signature. Useful for testing.
:kwargs: Any other arguments accepted by the class constructor.
:returns: Signed "revocationRequest" ACME message.
:rtype: :class:`RevocationRequest`
"""
return cls(signature=other.Signature.from_msg(
kwargs["certificate"].as_der(), key, sig_nonce), **kwargs)
def verify(self):
"""Verify signature.
.. warning:: Caller must check that the public key encoded in the
:attr:`signature`'s :class:`letsencrypt.acme.jose.JWK` object
is the correct key for a given context.
:returns: True iff ``signature`` can be verified, False otherwise.
:rtype: bool
"""
return self.signature.verify(self.certificate.as_der())
@classmethod
def _decode_cert(cls, b64der):
return util.ComparableX509(M2Crypto.X509.load_cert_der_string(
jose.b64decode(b64der)))
@classmethod
def _encode_cert(cls, cert):
return jose.b64encode(cert.as_der())
def _fields_to_json(self):
return {
"certificate": self._encode_cert(self.certificate),
"signature": self.signature,
}
@classmethod
def _from_valid_json(cls, jobj):
return cls(certificate=cls._decode_cert(jobj["certificate"]),
signature=other.Signature.from_json(
jobj["signature"], validate=False))
@Message.register # pylint: disable=too-few-public-methods
class StatusRequest(Message):
"""ACME "statusRequest" message.
:ivar unicode token: Token provided in ACME "defer" message.
"""
acme_type = "statusRequest"
schema = util.load_schema(acme_type)
__slots__ = ("token",)
def _fields_to_json(self):
return {"token": self.token}
@classmethod
def _from_valid_json(cls, jobj):
return cls(token=jobj["token"])

View file

@ -0,0 +1,484 @@
"""Tests for letsencrypt.acme.messages."""
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
import M2Crypto.X509
import mock
from letsencrypt.acme import errors
from letsencrypt.acme import jose
from letsencrypt.acme import other
from letsencrypt.acme import util
KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa256_key.pem'))
CERT = util.ComparableX509(M2Crypto.X509.load_cert(
pkg_resources.resource_filename(
'letsencrypt.client.tests', 'testdata/cert.pem')))
CSR = util.ComparableX509(M2Crypto.X509.load_request(
pkg_resources.resource_filename(
'letsencrypt.client.tests', 'testdata/csr.pem')))
class MessageTest(unittest.TestCase):
"""Tests for letsencrypt.acme.messages.Message."""
def setUp(self):
# pylint: disable=missing-docstring,too-few-public-methods
from letsencrypt.acme.messages import Message
class TestMessage(Message):
acme_type = 'test'
schema = {
'type': 'object',
'properties': {
'price': {'type': 'number'},
'name': {'type': 'string'},
},
}
@classmethod
def _from_valid_json(cls, jobj):
return jobj
def _fields_to_json(self):
return {'foo': 'bar'}
self.msg_cls = TestMessage
def test_to_json(self):
self.assertEqual(self.msg_cls().to_json(), {
'type': 'test',
'foo': 'bar',
})
def test_fields_to_json_not_implemented(self):
from letsencrypt.acme.messages import Message
# pylint: disable=protected-access
self.assertRaises(NotImplementedError, Message()._fields_to_json)
@classmethod
def _from_json(cls, jobj, validate=True):
from letsencrypt.acme.messages import Message
return Message.from_json(jobj, validate)
def test_from_json_non_dict_fails(self):
self.assertRaises(errors.ValidationError, self._from_json, [])
def test_from_json_dict_no_type_fails(self):
self.assertRaises(errors.ValidationError, self._from_json, {})
def test_from_json_unknown_type_fails(self):
self.assertRaises(errors.UnrecognizedMessageTypeError,
self._from_json, {'type': 'bar'})
@mock.patch('letsencrypt.acme.messages.Message.TYPES')
def test_from_json_validate_errors(self, types):
types.__getitem__.side_effect = lambda x: {'foo': self.msg_cls}[x]
self.assertRaises(errors.SchemaValidationError,
self._from_json, {'type': 'foo', 'price': 'asd'})
@mock.patch('letsencrypt.acme.messages.Message.TYPES')
def test_from_json_valid_returns_cls(self, types):
types.__getitem__.side_effect = lambda x: {'foo': self.msg_cls}[x]
self.assertEqual(self._from_json({'type': 'foo'}, validate=False),
{'type': 'foo'})
class ChallengeTest(unittest.TestCase):
def setUp(self):
challenges = [
{'type': 'simpleHttps', 'token': 'IlirfxKKXAsHtmzK29Pj8A'},
{'type': 'dns', 'token': 'DGyRejmCefe7v4NfDGDKfA'},
{'type': 'recoveryToken'},
]
combinations = [[0, 2], [1, 2]]
from letsencrypt.acme.messages import Challenge
self.msg = Challenge(
session_id='aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9',
challenges=challenges, combinations=combinations)
self.jmsg = {
'type': 'challenge',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ',
'challenges': challenges,
'combinations': combinations,
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import Challenge
self.assertEqual(Challenge.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['combinations']
from letsencrypt.acme.messages import Challenge
msg = Challenge.from_json(self.jmsg)
self.assertEqual(msg.combinations, [])
self.assertEqual(msg.to_json(), self.jmsg)
class ChallengeRequestTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.messages import ChallengeRequest
self.msg = ChallengeRequest(identifier='example.com')
self.jmsg = {
'type': 'challengeRequest',
'identifier': 'example.com',
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import ChallengeRequest
self.assertEqual(ChallengeRequest.from_json(self.jmsg), self.msg)
class AuthorizationTest(unittest.TestCase):
def setUp(self):
jwk = jose.JWK(key=KEY.publickey())
from letsencrypt.acme.messages import Authorization
self.msg = Authorization(recovery_token='tok', jwk=jwk,
identifier='example.com')
self.jmsg = {
'type': 'authorization',
'recoveryToken': 'tok',
'identifier': 'example.com',
'jwk': jwk,
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
self.jmsg['jwk'] = self.jmsg['jwk'].to_json()
from letsencrypt.acme.messages import Authorization
self.assertEqual(Authorization.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['recoveryToken']
del self.jmsg['identifier']
del self.jmsg['jwk']
from letsencrypt.acme.messages import Authorization
msg = Authorization.from_json(self.jmsg)
self.assertTrue(msg.recovery_token is None)
self.assertTrue(msg.identifier is None)
self.assertTrue(msg.jwk is None)
self.assertEqual(self.jmsg, msg.to_json())
class AuthorizationRequestTest(unittest.TestCase):
def setUp(self):
self.responses = [
{'type': 'simpleHttps', 'path': 'Hf5GrX4Q7EBax9hc2jJnfw'},
None, # null
{'type': 'recoveryToken', 'token': '23029d88d9e123e'},
]
self.contact = ["mailto:cert-admin@example.com", "tel:+12025551212"]
signature = other.Signature(
alg='RS256', jwk=jose.JWK(key=KEY.publickey()),
sig='-v\xd8\xc2\xa3\xba0\xd6\x92\x16\xb5.\xbe\xa1[\x04\xbe'
'\x1b\xa1X\xd2)\x18\x94\x8f\xd7\xd0\xc0\xbbcI`W\xdf v'
'\xe4\xed\xe8\x03J\xe8\xc8<?\xc8W\x94\x94cj(\xe7\xaa$'
'\x92\xe9\x96\x11\xc2\xefx\x0bR',
nonce='\xab?\x08o\xe6\x81$\x9f\xa1\xc9\x025\x1c\x1b\xa5+')
from letsencrypt.acme.messages import AuthorizationRequest
self.msg = AuthorizationRequest(
session_id='aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9',
responses=self.responses,
signature=signature,
contact=self.contact,
)
self.jmsg_to = {
'type': 'authorizationRequest',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ',
'responses': self.responses,
'signature': signature,
'contact': self.contact,
}
self.jmsg_from = {
'type': 'authorizationRequest',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ',
'responses': self.responses,
'signature': signature.to_json(),
'contact': self.contact,
}
self.jmsg_from['signature']['jwk'] = self.jmsg_from[
'signature']['jwk'].to_json()
def test_create(self):
from letsencrypt.acme.messages import AuthorizationRequest
self.assertEqual(self.msg, AuthorizationRequest.create(
name='example.com', key=KEY, responses=self.responses,
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9',
session_id='aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
sig_nonce='\xab?\x08o\xe6\x81$\x9f\xa1\xc9\x025\x1c\x1b\xa5+',
contact=self.contact))
def test_verify(self):
self.assertTrue(self.msg.verify('example.com'))
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg_to)
def test_from_json(self):
from letsencrypt.acme.messages import AuthorizationRequest
self.assertEqual(
self.msg, AuthorizationRequest.from_json(self.jmsg_from))
def test_json_without_optionals(self):
del self.jmsg_from['contact']
del self.jmsg_to['contact']
from letsencrypt.acme.messages import AuthorizationRequest
msg = AuthorizationRequest.from_json(self.jmsg_from)
self.assertEqual(msg.contact, [])
self.assertEqual(self.jmsg_to, msg.to_json())
class CertificateTest(unittest.TestCase):
def setUp(self):
refresh = 'https://example.com/refresh/Dr8eAwTVQfSS/'
from letsencrypt.acme.messages import Certificate
self.msg = Certificate(
certificate=CERT, chain=[CERT], refresh=refresh)
self.jmsg = {
'type': 'certificate',
'certificate': jose.b64encode(CERT.as_der()),
'chain': [jose.b64encode(CERT.as_der())],
'refresh': refresh,
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import Certificate
self.assertEqual(Certificate.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['chain']
del self.jmsg['refresh']
from letsencrypt.acme.messages import Certificate
msg = Certificate.from_json(self.jmsg)
self.assertEqual(msg.chain, [])
self.assertTrue(msg.refresh is None)
self.assertEqual(self.jmsg, msg.to_json())
class CertificateRequestTest(unittest.TestCase):
def setUp(self):
signature = other.Signature(
alg='RS256', jwk=jose.JWK(key=KEY.publickey()),
sig='\x15\xed\x84\xaa:\xf2DO\x0e9 \xbcg\xf8\xc0\xcf\x87\x9a'
'\x95\xeb\xffT[\x84[\xec\x85\x7f\x8eK\xe9\xc2\x12\xc8Q'
'\xafo\xc6h\x07\xba\xa6\xdf\xd1\xa7"$\xba=Z\x13n\x14\x0b'
'k\xfe\xee\xb4\xe4\xc8\x05\x9a\x08\xa7',
nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9')
from letsencrypt.acme.messages import CertificateRequest
self.msg = CertificateRequest(csr=CSR, signature=signature)
self.jmsg = {
'type': 'certificateRequest',
'csr': jose.b64encode(CSR.as_der()),
'signature': signature,
}
def test_create(self):
from letsencrypt.acme.messages import CertificateRequest
self.assertEqual(self.msg, CertificateRequest.create(
csr=CSR, key=KEY,
sig_nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'))
def test_verify(self):
self.assertTrue(self.msg.verify())
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import CertificateRequest
self.jmsg['signature'] = self.jmsg['signature'].to_json()
self.jmsg['signature']['jwk'] = self.jmsg['signature']['jwk'].to_json()
self.assertEqual(self.msg, CertificateRequest.from_json(self.jmsg))
class DeferTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.messages import Defer
self.msg = Defer(
token='O7-s9MNq1siZHlgrMzi9_A', interval=60,
message='Warming up the HSM')
self.jmsg = {
'type': 'defer',
'token': 'O7-s9MNq1siZHlgrMzi9_A',
'interval': 60,
'message': 'Warming up the HSM',
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import Defer
self.assertEqual(Defer.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['interval']
del self.jmsg['message']
from letsencrypt.acme.messages import Defer
msg = Defer.from_json(self.jmsg)
self.assertTrue(msg.interval is None)
self.assertTrue(msg.message is None)
self.assertEqual(self.jmsg, msg.to_json())
class ErrorTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.messages import Error
self.msg = Error(
error='badCSR', message='RSA keys must be at least 2048 bits long',
more_info='https://ca.example.com/documentation/csr-requirements')
self.jmsg = {
'type': 'error',
'error': 'badCSR',
'message':'RSA keys must be at least 2048 bits long',
'moreInfo': 'https://ca.example.com/documentation/csr-requirements',
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import Error
self.assertEqual(Error.from_json(self.jmsg), self.msg)
def test_json_without_optionals(self):
del self.jmsg['message']
del self.jmsg['moreInfo']
from letsencrypt.acme.messages import Error
msg = Error.from_json(self.jmsg)
self.assertTrue(msg.message is None)
self.assertTrue(msg.more_info is None)
self.assertEqual(self.jmsg, msg.to_json())
class RevocationTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.messages import Revocation
self.msg = Revocation()
self.jmsg = {
'type': 'revocation',
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import Revocation
self.assertEqual(Revocation.from_json(self.jmsg), self.msg)
class RevocationRequestTest(unittest.TestCase):
def setUp(self):
self.sig_nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
signature = other.Signature(
alg='RS256', jwk=jose.JWK(key=KEY.publickey()),
sig='eJ\xfe\x12"U\x87\x8b\xbf/ ,\xdeP\xb2\xdc1\xb00\xe5\x1dB'
'\xfch<\xc6\x9eH@!\x1c\x16\xb2\x0b_\xc4\xddP\x89\xc8\xce?'
'\x16g\x069I\xb9\xb3\x91\xb9\x0e$3\x9f\x87\x8e\x82\xca\xc5'
's\xd9\xd0\xe7',
nonce=self.sig_nonce)
from letsencrypt.acme.messages import RevocationRequest
self.msg = RevocationRequest(certificate=CERT, signature=signature)
self.jmsg = {
'type': 'revocationRequest',
'certificate': jose.b64encode(CERT.as_der()),
'signature': signature,
}
def test_create(self):
from letsencrypt.acme.messages import RevocationRequest
self.assertEqual(self.msg, RevocationRequest.create(
certificate=CERT, key=KEY, sig_nonce=self.sig_nonce))
def test_verify(self):
self.assertTrue(self.msg.verify())
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
self.jmsg['signature'] = self.jmsg['signature'].to_json()
self.jmsg['signature']['jwk'] = self.jmsg['signature']['jwk'].to_json()
from letsencrypt.acme.messages import RevocationRequest
self.assertEqual(self.msg, RevocationRequest.from_json(self.jmsg))
class StatusRequestTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.messages import StatusRequest
self.msg = StatusRequest(token=u'O7-s9MNq1siZHlgrMzi9_A')
self.jmsg = {
'type': 'statusRequest',
'token': u'O7-s9MNq1siZHlgrMzi9_A',
}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)
def test_from_json(self):
from letsencrypt.acme.messages import StatusRequest
self.assertEqual(StatusRequest.from_json(self.jmsg), self.msg)
if __name__ == '__main__':
unittest.main()

83
letsencrypt/acme/other.py Normal file
View file

@ -0,0 +1,83 @@
"""JSON objects in ACME protocol other than messages."""
import logging
from Crypto import Random
import Crypto.Hash.SHA256
import Crypto.Signature.PKCS1_v1_5
from letsencrypt.acme import jose
from letsencrypt.acme import util
class Signature(util.JSONDeSerializable, util.ImmutableMap):
"""ACME signature.
:ivar str alg: Signature algorithm.
:ivar str sig: Signature.
:ivar str nonce: Nonce.
:ivar jwk: JWK.
:type jwk: :class:`letsencrypt.acme.jose.JWK`
.. todo:: Currently works for RSA keys only.
"""
__slots__ = ('alg', 'sig', 'nonce', 'jwk')
schema = util.load_schema('signature')
NONCE_LEN = 16
"""Size of nonce in bytes, as specified in the ACME protocol."""
@classmethod
def from_msg(cls, msg, key, nonce=None):
"""Create signature with nonce prepended to the message.
.. todo:: Protect against crypto unicode errors... is this sufficient?
Do I need to escape?
:param str msg: Message to be signed.
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param nonce: Nonce to be used. If None, nonce of
:const:`NONCE_LEN` size will be randomly generated.
:type nonce: str or None
"""
if nonce is None:
nonce = Random.get_random_bytes(cls.NONCE_LEN)
msg_with_nonce = nonce + msg
hashed = Crypto.Hash.SHA256.new(msg_with_nonce)
sig = Crypto.Signature.PKCS1_v1_5.new(key).sign(hashed)
logging.debug('%s signed as %s', msg_with_nonce, sig)
return cls(alg='RS256', sig=sig, nonce=nonce,
jwk=jose.JWK(key=key.publickey()))
def verify(self, msg):
"""Verify the signature.
:param str msg: Message that was used in signing.
"""
hashed = Crypto.Hash.SHA256.new(self.nonce + msg)
return Crypto.Signature.PKCS1_v1_5.new(self.jwk.key).verify(
hashed, self.sig)
def to_json(self):
"""Prepare JSON serializable object."""
return {
'alg': self.alg,
'sig': jose.b64encode(self.sig),
'nonce': jose.b64encode(self.nonce),
'jwk': self.jwk,
}
@classmethod
def _from_valid_json(cls, jobj):
return cls(alg=jobj['alg'], sig=jose.b64decode(jobj['sig']),
nonce=jose.b64decode(jobj['nonce']),
jwk=jose.JWK.from_json(jobj['jwk'], validate=False))

View file

@ -0,0 +1,87 @@
"""Tests for letsencrypt.acme.sig."""
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
from letsencrypt.acme import jose
RSA256_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa256_key.pem'))
class SigatureTest(unittest.TestCase):
# pylint: disable=too-many-instance-attributes
"""Tests for letsencrypt.acme.sig.Signature."""
def setUp(self):
self.msg = 'message'
self.alg = 'RS256'
self.sig = ('IC\xd8*\xe7\x14\x9e\x19S\xb7\xcf\xec3\x12\xe2\x8a\x03'
'\x98u\xff\xf0\x94\xe2\xd7<\x8f\xa8\xed\xa4KN\xc3\xaa'
'\xb9X\xc3w\xaa\xc0_\xd0\x05$y>l#\x10<\x96\xd2\xcdr\xa3'
'\x1b\xa1\xf5!f\xef\xc64\xb6\x13')
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.jwk = jose.JWK(key=RSA256_KEY.publickey())
b64sig = ('SUPYKucUnhlTt8_sMxLiigOYdf_wlOLXPI-o7aRLTsOquVjDd6r'
'AX9AFJHk-bCMQPJbSzXKjG6H1IWbvxjS2Ew')
b64nonce = '7Nbyb1lI6xPVI3Hg3aKSqQ'
self.jsig_to = {
'nonce': b64nonce,
'alg': self.alg,
'jwk': self.jwk,
'sig': b64sig,
}
self.jsig_from = {
'nonce': b64nonce,
'alg': self.alg,
'jwk': self.jwk.to_json(),
'sig': b64sig,
}
from letsencrypt.acme.other import Signature
self.signature = Signature(
alg=self.alg, sig=self.sig, nonce=self.nonce, jwk=self.jwk)
def test_attributes(self):
self.assertEqual(self.signature.nonce, self.nonce)
self.assertEqual(self.signature.alg, self.alg)
self.assertEqual(self.signature.sig, self.sig)
self.assertEqual(self.signature.jwk, self.jwk)
def test_verify_good_succeeds(self):
self.assertTrue(self.signature.verify(self.msg))
def test_verify_bad_fails(self):
self.assertFalse(self.signature.verify(self.msg + 'x'))
@classmethod
def _from_msg(cls, *args, **kwargs):
from letsencrypt.acme.other import Signature
return Signature.from_msg(*args, **kwargs)
def test_create_from_msg(self):
signature = self._from_msg(self.msg, RSA256_KEY, self.nonce)
self.assertEqual(self.signature, signature)
def test_create_from_msg_random_nonce(self):
signature = self._from_msg(self.msg, RSA256_KEY)
self.assertEqual(signature.alg, self.alg)
self.assertEqual(signature.jwk, self.jwk)
self.assertTrue(signature.verify(self.msg))
def test_to_json(self):
self.assertEqual(self.signature.to_json(), self.jsig_to)
def test_from_json(self):
from letsencrypt.acme.other import Signature
# pylint: disable=protected-access
self.assertEqual(
self.signature, Signature._from_valid_json(self.jsig_from))
if __name__ == '__main__':
unittest.main()

View file

@ -15,7 +15,7 @@
"type": "string"
},
"jwk": {
"$ref": "file:letsencrypt/client/schemata/jwk.json"
"$ref": "file:letsencrypt/acme/schemata/jwk.json"
}
}
}

View file

@ -15,14 +15,14 @@
"type": "string"
},
"signature" : {
"$ref": "file:letsencrypt/client/schemata/signature.json"
"$ref": "file:letsencrypt/acme/schemata/signature.json"
},
"responses": {
"type": "array",
"minItems": 1,
"items": {
"anyOf": [
{ "$ref": "file:letsencrypt/client/schemata/responseobject.json" },
{ "$ref": "file:letsencrypt/acme/schemata/responseobject.json" },
{ "type": "null" }
]
}

View file

@ -13,7 +13,7 @@
"pattern": "^[-_=0-9A-Za-z]+$"
},
"signature" : {
"$ref": "file:letsencrypt/client/schemata/signature.json"
"$ref": "file:letsencrypt/acme/schemata/signature.json"
}
}
}

View file

@ -18,7 +18,7 @@
"type": "array",
"minItems": 1,
"items": {
"$ref": "file:letsencrypt/client/schemata/challengeobject.json"
"$ref": "file:letsencrypt/acme/schemata/challengeobject.json"
}
},
"combinations": {

View file

@ -59,7 +59,7 @@
"pattern": "^[-_=0-9A-Za-z]+$"
},
"signature": {
"$ref": "file:letsencrypt/client/schemata/signature.json"
"$ref": "file:letsencrypt/acme/schemata/signature.json"
}
}
},

View file

@ -12,7 +12,7 @@
"type" : "string"
},
"signature" : {
"$ref": "file:letsencrypt/client/schemata/signature.json"
"$ref": "file:letsencrypt/acme/schemata/signature.json"
}
}
}

147
letsencrypt/acme/util.py Normal file
View file

@ -0,0 +1,147 @@
"""ACME utilities."""
import json
import pkg_resources
import jsonschema
import zope.interface
from letsencrypt.acme import errors
from letsencrypt.acme import interfaces
class ComparableX509(object): # pylint: disable=too-few-public-methods
"""Wrapper for M2Crypto.X509.* objects that supports __eq__.
Wraps around:
- :class:`M2Crypto.X509.X509`
- :class:`M2Crypto.X509.Request`
"""
def __init__(self, wrapped):
self._wrapped = wrapped
def __getattr__(self, name):
return getattr(self._wrapped, name)
def __eq__(self, other):
return self.as_der() == other.as_der()
def load_schema(name):
"""Load JSON schema from distribution."""
return json.load(open(pkg_resources.resource_filename(
__name__, "schemata/%s.json" % name)))
class JSONDeSerializable(object):
"""JSON (de)serializable object."""
zope.interface.implements(interfaces.IJSONSerializable)
schema = NotImplemented
@classmethod
def validate_json(cls, jobj):
"""Validate JSON object against schema.
:raises letsencrypt.acme.errors.SchemaValidationError: if object
couldn't be validated.
"""
try:
jsonschema.validate(jobj, cls.schema)
except jsonschema.ValidationError as error:
raise errors.SchemaValidationError(error)
@classmethod
def from_json(cls, jobj, validate=True):
"""Deserialize from JSON.
Note that the input ``jobj`` has not been sanitized in any way.
:param jobj: JSON object.
:param bool validate: Validate against schema before deserializing.
Useful if :class:`JWK` is part of already validated json object.
:raises letsencrypt.acme.errors.SchemaValidationError: if ``validate``
was ``True`` and object couldn't be validated.
:returns: instance of the class
"""
if validate:
cls.validate_json(jobj)
return cls._from_valid_json(jobj)
@classmethod
def _from_valid_json(cls, jobj):
"""Deserializa from valid JSON object.
:param jobj: JSON object that has been validated against schema.
"""
raise NotImplementedError()
@classmethod
def json_loads(cls, json_string, validate=True):
"""Load JSON string."""
return cls.from_json(json.loads(json_string), validate)
def to_json(self):
"""Prepare JSON serializable object."""
raise NotImplementedError()
def json_dumps(self):
"""Dump to JSON string using proper serializer.
:returns: JSON serialized string.
:rtype: str
"""
return json.dumps(self, default=dump_ijsonserializable)
def dump_ijsonserializable(python_object):
"""Serialize IJSONSerializable to JSON.
This is meant to be passed to :func:`json.dumps` as ``default``
argument.
"""
# providedBy | pylint: disable=no-member
if interfaces.IJSONSerializable.providedBy(python_object):
return python_object.to_json()
else:
raise TypeError(repr(python_object) + ' is not JSON serializable')
class ImmutableMap(object): # pylint: disable=too-few-public-methods
"""Immutable key to value mapping with attribute access."""
__slots__ = ()
"""Must be overriden in subclasses."""
def __init__(self, **kwargs):
if set(kwargs) != set(self.__slots__):
raise TypeError(
'__init__() takes exactly the following arguments: {0} '
'({1} given)'.format(', '.join(self.__slots__),
', '.join(kwargs) if kwargs else 'none'))
for slot in self.__slots__:
object.__setattr__(self, slot, kwargs.pop(slot))
def __setattr__(self, name, value):
raise AttributeError("can't set attribute")
def __eq__(self, other):
return isinstance(other, self.__class__) and all(
getattr(self, slot) == getattr(other, slot)
for slot in self.__slots__)
def __hash__(self):
return hash(tuple(getattr(self, slot) for slot in self.__slots__))
def __repr__(self):
return '{0}({1})'.format(self.__class__.__name__, ', '.join(
'{0}={1!r}'.format(slot, getattr(self, slot))
for slot in self.__slots__))

View file

@ -0,0 +1,167 @@
"""Tests for letsencrypt.acme.util."""
import functools
import json
import unittest
import zope.interface
from letsencrypt.acme import errors
from letsencrypt.acme import interfaces
class MockJSONSerialiazable(object):
# pylint: disable=missing-docstring,too-few-public-methods,no-self-use
zope.interface.implements(interfaces.IJSONSerializable)
def to_json(self):
return [3, 2, 1]
class JSONDeSerializableTest(unittest.TestCase):
"""Tests for letsencrypt.acme.util.JSONDeSerializable."""
def setUp(self):
from letsencrypt.acme.util import JSONDeSerializable
class Tester(JSONDeSerializable):
# pylint: disable=missing-docstring,no-self-use,
# pylint: disable=too-few-public-methods
zope.interface.implements(interfaces.IJSONSerializable)
schema = {'type': 'integer'}
def __init__(self, jobj):
self.jobj = jobj
@classmethod
def _from_valid_json(cls, jobj):
return cls(jobj)
def to_json(self):
return {'foo': MockJSONSerialiazable()}
self.tester_cls = Tester
def test_validate_invalid_json(self):
self.assertRaises(errors.SchemaValidationError,
self.tester_cls.validate_json, 'bang!')
def test_validate_valid_json(self):
self.tester_cls.validate_json(5)
def test_from_json(self):
self.assertEqual(5, self.tester_cls.from_json(5, validate=True).jobj)
def test_from_json_no_validation(self):
self.assertEqual(['1', 2], self.tester_cls.from_json(
['1', 2], validate=False).jobj)
def test_from_valid_json_raises_error(self):
from letsencrypt.acme.util import JSONDeSerializable
# pylint: disable=protected-access
self.assertRaises(
NotImplementedError, JSONDeSerializable._from_valid_json, 'foo')
def test_json_loads(self):
tester = self.tester_cls.json_loads('5', validate=True)
self.assertEqual(tester.jobj, 5)
def test_json_loads_no_validation(self):
self.assertEqual(
'foo', self.tester_cls.json_loads('"foo"', validate=False).jobj)
def test_to_json_raises_error(self):
from letsencrypt.acme.util import JSONDeSerializable
self.assertRaises(NotImplementedError, JSONDeSerializable().to_json)
def test_json_dumps(self):
self.assertEqual(
self.tester_cls('foo').json_dumps(), '{"foo": [3, 2, 1]}')
class DumpIJSONSerializableTest(unittest.TestCase):
"""Tests for letsencrypt.acme.util.dump_ijsonserializable."""
@classmethod
def _call(cls, obj):
from letsencrypt.acme.util import dump_ijsonserializable
return json.dumps(obj, default=dump_ijsonserializable)
def test_json_type(self):
self.assertEqual('5', self._call(5))
def test_ijsonserializable(self):
self.assertEqual('[3, 2, 1]', self._call(MockJSONSerialiazable()))
def test_raises_type_error(self):
self.assertRaises(TypeError, self._call, object())
class ImmutableMapTest(unittest.TestCase):
"""Tests for letsencrypt.acme.util.ImmutableMap."""
def setUp(self):
# pylint: disable=invalid-name,too-few-public-methods
# pylint: disable=missing-docstring
from letsencrypt.acme.util import ImmutableMap
class A(ImmutableMap):
__slots__ = ('x', 'y')
class B(ImmutableMap):
__slots__ = ('x', 'y')
self.A = A
self.B = B
self.a1 = self.A(x=1, y=2)
self.a1_swap = self.A(y=2, x=1)
self.a2 = self.A(x=3, y=4)
self.b = self.B(x=1, y=2)
def test_order_of_args_does_not_matter(self):
self.assertEqual(self.a1, self.a1_swap)
def test_type_error_on_missing(self):
self.assertRaises(TypeError, self.A, x=1)
self.assertRaises(TypeError, self.A, y=2)
def test_type_error_on_unrecognized(self):
self.assertRaises(TypeError, self.A, x=1, z=2)
self.assertRaises(TypeError, self.A, x=1, y=2, z=3)
def test_get_attr(self):
self.assertEqual(1, self.a1.x)
self.assertEqual(2, self.a1.y)
self.assertEqual(1, self.a1_swap.x)
self.assertEqual(2, self.a1_swap.y)
def test_set_attr_raises_attribute_error(self):
self.assertRaises(
AttributeError, functools.partial(self.a1.__setattr__, 'x'), 10)
def test_equal(self):
self.assertEqual(self.a1, self.a1)
self.assertEqual(self.a2, self.a2)
self.assertNotEqual(self.a1, self.a2)
def test_same_slots_diff_cls_not_equal(self):
self.assertEqual(self.a1.x, self.b.x)
self.assertEqual(self.a1.y, self.b.y)
self.assertNotEqual(self.a1, self.b)
def test_hash(self):
self.assertEqual(hash((1, 2)), hash(self.a1))
def test_unhashable(self):
self.assertRaises(TypeError, self.A(x=1, y={}).__hash__)
def test_repr(self):
self.assertEqual('A(x=1, y=2)', repr(self.a1))
self.assertEqual('A(x=1, y=2)', repr(self.a1_swap))
self.assertEqual('B(x=1, y=2)', repr(self.b))
self.assertEqual("B(x='foo', y='bar')", repr(self.B(x='foo', y='bar')))
if __name__ == '__main__':
unittest.main()

View file

@ -1,155 +0,0 @@
"""ACME protocol messages."""
import json
import pkg_resources
import jsonschema
from letsencrypt.client import crypto_util
from letsencrypt.client import le_util
SCHEMATA = dict([
(schema, json.load(open(pkg_resources.resource_filename(
__name__, "schemata/%s.json" % schema)))) for schema in [
"authorization",
"authorizationRequest",
"certificate",
"certificateRequest",
"challenge",
"challengeRequest",
"defer",
"error",
"revocation",
"revocationRequest",
"statusRequest"
]
])
def acme_object_validate(json_string, schemata=None):
"""Validate a JSON string against the ACME protocol using JSON Schema.
:param str json_string: Well-formed input JSON string.
:param dict schemata: Mapping from type name to JSON Schema
definition. Useful for testing.
:returns: None if validation was successful.
:raises jsonschema.ValidationError: if validation was unsuccessful
:raises ValueError: if the object cannot even be parsed as valid JSON
"""
schemata = SCHEMATA if schemata is None else schemata
json_object = json.loads(json_string)
if not isinstance(json_object, dict):
raise jsonschema.ValidationError("this is not a dictionary object")
if "type" not in json_object:
raise jsonschema.ValidationError("missing type field")
if json_object["type"] not in schemata:
raise jsonschema.ValidationError(
"unknown type %s" % json_object["type"])
jsonschema.validate(json_object, schemata[json_object["type"]])
def pretty(json_string):
"""Return a pretty-printed version of any JSON string.
Useful when printing out protocol messages for debugging purposes.
"""
return json.dumps(json.loads(json_string), indent=4)
def challenge_request(name):
"""Create ACME "challengeRequest message.
:param str name: Domain name
:returns: ACME "challengeRequest" message.
:rtype: dict
"""
return {
"type": "challengeRequest",
"identifier": name,
}
def authorization_request(req_id, name, server_nonce, responses, key,
nonce=None):
"""Create ACME "authorizationRequest" message.
:param str req_id: SessionID from the server challenge
:param str name: Hostname
:param str server_nonce: Nonce from the server challenge
:param list responses: List of completed challenges
:param str key: Key in string form. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str nonce: Nonce used for signature. Useful for testing.
:returns: ACME "authorizationRequest" message.
:rtype: dict
"""
return {
"type": "authorizationRequest",
"sessionID": req_id,
"nonce": server_nonce,
"responses": responses,
"signature": crypto_util.create_sig(
name + le_util.jose_b64decode(server_nonce), key, nonce),
}
def certificate_request(csr_der, key, nonce=None):
"""Create ACME "certificateRequest" message.
:param str csr_der: DER encoded CSR.
:param str key: Key in string form. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str nonce: Nonce used for signature. Useful for testing.
:returns: ACME "certificateRequest" message.
:rtype: dict
"""
return {
"type": "certificateRequest",
"csr": le_util.jose_b64encode(csr_der),
"signature": crypto_util.create_sig(csr_der, key, nonce),
}
def revocation_request(cert_der, key, nonce=None):
"""Create ACME "revocationRequest" message.
:param str cert_der: DER encoded certificate.
:param str key: Key in string form. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str nonce: Nonce used for signature. Useful for testing.
:returns: ACME "revocationRequest" message.
:rtype: dict
"""
return {
"type": "revocationRequest",
"certificate": le_util.jose_b64encode(cert_der),
"signature": crypto_util.create_sig(cert_der, key, nonce),
}
def status_request(token):
"""Create ACME "statusRequest" message.
:param unicode token: Token provided in ACME "defer" message.
:returns: ACME "statusRequest" message.
:rtype: dict
"""
return {
"type": "statusRequest",
"token": token,
}

View file

@ -2,7 +2,10 @@
import logging
import sys
from letsencrypt.client import acme
import Crypto.PublicKey.RSA
from letsencrypt.acme import messages
from letsencrypt.client import challenge_util
from letsencrypt.client import constants
from letsencrypt.client import errors
@ -53,7 +56,9 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
"""Add a challenge message to the AuthHandler.
:param str domain: domain for authorization
:param dict msg: ACME challenge message
:param msg: ACME "challenge" message
:type msg: :class:`letsencrypt.acme.message.Challenge`
:param authkey: authorized key for the challenge
:type authkey: :class:`letsencrypt.client.le_util.Key`
@ -64,7 +69,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
"Multiple ACMEChallengeMessages for the same domain "
"is not supported.")
self.domains.append(domain)
self.responses[domain] = ["null"] * len(msg["challenges"])
self.responses[domain] = ["null"] * len(msg.challenges)
self.msgs[domain] = msg
self.authkey[domain] = authkey
@ -102,18 +107,19 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
:param str domain: domain that is requesting authorization
:returns: ACME "authorization" message.
:rtype: dict
:rtype: :class:`letsencrypt.acme.messages.Authorization`
"""
try:
auth = self.network.send_and_receive_expected(
acme.authorization_request(
self.msgs[domain]["sessionID"],
domain,
self.msgs[domain]["nonce"],
self.responses[domain],
self.authkey[domain].pem),
"authorization")
messages.AuthorizationRequest.create(
session_id=self.msgs[domain].session_id,
nonce=self.msgs[domain].nonce,
responses=self.responses[domain],
name=domain,
key=Crypto.PublicKey.RSA.importKey(
self.authkey[domain].pem)),
messages.Authorization)
logging.info("Received Authorization for %s", domain)
return auth
except errors.LetsEncryptClientError as err:
@ -135,9 +141,9 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
logging.info("Performing the following challenges:")
for dom in self.domains:
self.paths[dom] = gen_challenge_path(
self.msgs[dom]["challenges"],
self.msgs[dom].challenges,
self._get_chall_pref(dom),
self.msgs[dom].get("combinations", None))
self.msgs[dom].combinations)
self.dv_c[dom], self.client_c[dom] = self._challenge_factory(
dom, self.paths[dom])
@ -263,7 +269,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes
recognized
"""
challenges = self.msgs[domain]["challenges"]
challenges = self.msgs[domain].challenges
dv_chall = []
client_chall = []

View file

@ -4,9 +4,10 @@ import hashlib
from Crypto import Random
from letsencrypt.acme import jose
from letsencrypt.client import constants
from letsencrypt.client import crypto_util
from letsencrypt.client import le_util
# Authenticator Challenges
@ -45,7 +46,7 @@ def dvsni_gen_cert(name, r_b64, nonce, key):
"""
# Generate S
dvsni_s = Random.get_random_bytes(constants.S_SIZE)
dvsni_r = le_util.jose_b64decode(r_b64)
dvsni_r = jose.b64decode(r_b64)
# Generate extension
ext = _dvsni_gen_ext(dvsni_r, dvsni_s)
@ -53,7 +54,7 @@ def dvsni_gen_cert(name, r_b64, nonce, key):
cert_pem = crypto_util.make_ss_cert(
key.pem, [nonce + constants.DVSNI_DOMAIN_SUFFIX, name, ext])
return cert_pem, le_util.jose_b64encode(dvsni_s)
return cert_pem, jose.b64encode(dvsni_s)
def _dvsni_gen_ext(dvsni_r, dvsni_s):

View file

@ -5,10 +5,13 @@ import os
import shutil
import sys
import Crypto.PublicKey.RSA
import M2Crypto
import zope.component
from letsencrypt.client import acme
from letsencrypt.acme import messages
from letsencrypt.acme import util as acme_util
from letsencrypt.client import auth_handler
from letsencrypt.client import client_authenticator
from letsencrypt.client import crypto_util
@ -95,11 +98,11 @@ class Client(object):
csr = init_csr(self.authkey, domains, self.config.cert_dir)
# Retrieve certificate
certificate_dict = self.acme_certificate(csr.data)
certificate_msg = self.acme_certificate(csr.data)
# Save Certificate
cert_file, chain_file = self.save_certificate(
certificate_dict, self.config.cert_path, self.config.chain_path)
certificate_msg, self.config.cert_path, self.config.chain_path)
self.store_cert_key(cert_file, False)
@ -109,11 +112,12 @@ class Client(object):
"""Handle ACME "challenge" phase.
:returns: ACME "challenge" message.
:rtype: dict
:rtype: :class:`letsencrypt.acme.messages.Challenge`
"""
return self.network.send_and_receive_expected(
acme.challenge_request(domain), "challenge")
messages.ChallengeRequest(identifier=domain),
messages.Challenge)
def acme_certificate(self, csr_der):
"""Handle ACME "certificate" phase.
@ -121,18 +125,24 @@ class Client(object):
:param str csr_der: CSR in DER format.
:returns: ACME "certificate" message.
:rtype: dict
:rtype: :class:`letsencrypt.acme.message.Certificate`
"""
logging.info("Preparing and sending CSR...")
return self.network.send_and_receive_expected(
acme.certificate_request(csr_der, self.authkey.pem), "certificate")
messages.CertificateRequest.create(
csr=acme_util.ComparableX509(
M2Crypto.X509.load_request_der_string(csr_der)),
key=Crypto.PublicKey.RSA.importKey(self.authkey.pem)),
messages.Certificate)
def save_certificate(self, certificate_dict, cert_path, chain_path):
def save_certificate(self, certificate_msg, cert_path, chain_path):
# pylint: disable=no-self-use
"""Saves the certificate received from the ACME server.
:param dict certificate_dict: certificate message from server
:param certificate_msg: ACME "certificate" message from server.
:type certificate_msg: :class:`letsencrypt.acme.messages.Certificate`
:param str cert_path: Path to attempt to save the cert file
:param str chain_path: Path to attempt to save the chain file
@ -144,16 +154,15 @@ class Client(object):
"""
cert_chain_abspath = None
cert_fd, cert_file = le_util.unique_file(cert_path, 0o644)
cert_fd.write(
crypto_util.b64_cert_to_pem(certificate_dict["certificate"]))
cert_fd.write(certificate_msg.certificate.as_pem())
cert_fd.close()
logging.info(
"Server issued certificate; certificate written to %s", cert_file)
if certificate_dict.get("chain", None):
if certificate_msg.chain:
chain_fd, chain_fn = le_util.unique_file(chain_path, 0o644)
for cert in certificate_dict.get("chain", []):
chain_fd.write(crypto_util.b64_cert_to_pem(cert))
for cert in certificate_msg.chain:
chain_fd.write(cert.to_pem())
chain_fd.close()
logging.info("Cert chain written to %s", chain_fn)

View file

@ -1,67 +1,12 @@
"""Let's Encrypt client crypto utility functions"""
import binascii
import logging
import time
from Crypto import Random
import Crypto.Hash.SHA256
import Crypto.PublicKey.RSA
import Crypto.Signature.PKCS1_v1_5
import M2Crypto
from letsencrypt.client import constants
from letsencrypt.client import le_util
def create_sig(msg, key_str, nonce=None):
"""Create signature with nonce prepended to the message.
.. todo:: Change this over to M2Crypto... PKey
.. todo:: Protect against crypto unicode errors... is this sufficient?
Do I need to escape?
:param str key_str: Key in string form. Accepted formats
are the same as for `Crypto.PublicKey.RSA.importKey`.
:param str msg: Message to be signed
:param str nonce: Nonce to be used (required size
:returns: Signature.
:rtype: dict
"""
key = Crypto.PublicKey.RSA.importKey(key_str)
if nonce is None:
nonce = Random.get_random_bytes(constants.NONCE_SIZE)
assert len(nonce) == constants.NONCE_SIZE
msg_with_nonce = nonce + msg
hashed = Crypto.Hash.SHA256.new(msg_with_nonce)
signature = Crypto.Signature.PKCS1_v1_5.new(key).sign(hashed)
logging.debug("%s signed as %s", msg_with_nonce, signature)
n_bytes = binascii.unhexlify(_leading_zeros(hex(key.n)[2:].rstrip("L")))
e_bytes = binascii.unhexlify(_leading_zeros(hex(key.e)[2:].rstrip("L")))
return {
"nonce": le_util.jose_b64encode(nonce),
"alg": "RS256",
"jwk": {
"kty": "RSA",
"n": le_util.jose_b64encode(n_bytes),
"e": le_util.jose_b64encode(e_bytes),
},
"sig": le_util.jose_b64encode(signature),
}
def _leading_zeros(arg):
if len(arg) % 2:
return "0" + arg
return arg
def make_csr(key_str, domains):
"""Generate a CSR.
@ -244,9 +189,3 @@ def get_cert_info(filename):
"serial": cert.get_serial_number(),
"pub_key": "RSA " + str(cert.get_pubkey().size() * 8),
}
def b64_cert_to_pem(b64_der_cert):
"""Convert JOSE Base-64 encoded DER cert to PEM."""
return M2Crypto.X509.load_cert_der_string(
le_util.jose_b64decode(b64_der_cert)).as_pem()

View file

@ -1,5 +1,4 @@
"""Utilities for all Let's Encrypt."""
import base64
import collections
import errno
import os
@ -73,54 +72,3 @@ def unique_file(path, mode=0o777):
except OSError:
pass
count += 1
# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C
#
# Jose Base64:
#
# - URL-safe Base64
#
# - padding stripped
def jose_b64encode(data):
"""JOSE Base64 encode.
:param data: Data to be encoded.
:type data: str or bytearray
:returns: JOSE Base64 string.
:rtype: str
:raises TypeError: if `data` is of incorrect type
"""
if not isinstance(data, str):
raise TypeError("argument should be str or bytearray")
return base64.urlsafe_b64encode(data).rstrip("=")
def jose_b64decode(data):
"""JOSE Base64 decode.
:param data: Base64 string to be decoded. If it's unicode, then
only ASCII characters are allowed.
:type data: str or unicode
:returns: Decoded data.
:raises TypeError: if input is of incorrect type
:raises ValueError: if input is unicode with non-ASCII characters
"""
if isinstance(data, unicode):
try:
data = data.encode("ascii")
except UnicodeEncodeError:
raise ValueError(
"unicode argument should contain only ASCII characters")
elif not isinstance(data, str):
raise TypeError("argument should be a str or unicode")
return base64.urlsafe_b64decode(data + "=" * (4 - (len(data) % 4)))

View file

@ -1,13 +1,12 @@
"""Network Module."""
import json
import logging
import sys
import time
import jsonschema
import requests
from letsencrypt.client import acme
from letsencrypt.acme import messages
from letsencrypt.client import errors
@ -31,10 +30,11 @@ class Network(object):
def send(self, msg):
"""Send ACME message to server.
:param dict msg: ACME message (JSON serializable).
:param msg: ACME message.
:type msg: :class:`letsencrypt.acme.messages.Message`
:returns: Server response message.
:rtype: dict
:rtype: :class:`letsencrypt.acme.messages.Message`
:raises TypeError: if `msg` is not JSON serializable
:raises jsonschema.ValidationError: if not valid ACME message
@ -42,13 +42,10 @@ class Network(object):
or if response from server is not a valid ACME message.
"""
json_encoded = json.dumps(msg)
acme.acme_object_validate(json_encoded)
try:
response = requests.post(
self.server_url,
data=json_encoded,
data=msg.json_dumps(),
headers={"Content-Type": "application/json"},
verify=True
)
@ -56,66 +53,55 @@ class Network(object):
raise errors.LetsEncryptClientError(
'Sending ACME message to server has failed: %s' % error)
try:
acme.acme_object_validate(response.content)
except ValueError:
raise errors.LetsEncryptClientError(
'Server did not send JSON serializable message')
except jsonschema.ValidationError as error:
raise errors.LetsEncryptClientError(
'Response from server is not a valid ACME message')
return response.json()
return messages.Message.from_json(response.json(), validate=True)
def send_and_receive_expected(self, msg, expected):
"""Send ACME message to server and return expected message.
:param dict msg: ACME message (JSON serializable).
:param str expected: Name of the expected response ACME message type.
:param msg: ACME message.
:type msg: :class:`letsencrypt.acme.Message`
:returns: ACME response message of expected type.
:rtype: dict
:rtype: :class:`letsencrypt.acme.messages.Message`
:raises errors.LetsEncryptClientError: An exception is thrown
"""
response = self.send(msg)
try:
return self.is_expected_msg(response, expected)
except: # TODO: too generic exception
raise errors.LetsEncryptClientError(
'Expected message (%s) not received' % expected)
return self.is_expected_msg(response, expected)
def is_expected_msg(self, response, expected, delay=3, rounds=20):
"""Is response expected ACME message?
:param dict response: ACME response message from server.
:param str expected: Name of the expected response ACME message type.
:param response: ACME response message from server.
:type response: :class:`letsencrypt.acme.messages.Message`
:param expected: Expected response type.
:type expected: subclass of :class:`letsencrypt.acme.messages.Message`
:param int delay: Number of seconds to delay before next round
in case of ACME "defer" response message.
:param int rounds: Number of resend attempts in case of ACME "defer"
response message.
:returns: ACME response message from server.
:rtype: dict
:rtype: :class:`letsencrypt.acme.messages.Message`
:raises LetsEncryptClientError: if server sent ACME "error" message
"""
for _ in xrange(rounds):
if response["type"] == expected:
if isinstance(response, expected):
return response
elif response["type"] == "error":
logging.error(
"%s: %s - More Info: %s", response["error"],
response.get("message", ""), response.get("moreInfo", ""))
raise errors.LetsEncryptClientError(response["error"])
elif response["type"] == "defer":
elif isinstance(response, messages.Error):
logging.error("%s", response)
raise errors.LetsEncryptClientError(response.error)
elif isinstance(response, messages.Defer):
logging.info("Waiting for %d seconds...", delay)
time.sleep(delay)
response = self.send(acme.status_request(response["token"]))
response = self.send(
messages.StatusRequest(token=response.token))
else:
logging.fatal("Received unexpected message")
logging.fatal("Expected: %s", expected)

View file

@ -4,10 +4,13 @@ import logging
import os
import shutil
import Crypto.PublicKey.RSA
import M2Crypto
import zope.component
from letsencrypt.client import acme
from letsencrypt.acme import messages
from letsencrypt.acme import util as acme_util
from letsencrypt.client import crypto_util
from letsencrypt.client import display
from letsencrypt.client import interfaces
@ -33,15 +36,18 @@ class Revoker(object):
:param dict cert: TODO
:returns: ACME "revocation" message.
:rtype: dict
:rtype: :class:`letsencrypt.acme.message.Revocation`
"""
cert_der = M2Crypto.X509.load_cert(cert["backup_cert_file"]).as_der()
certificate = acme_util.ComparableX509(
M2Crypto.X509.load_cert(cert["backup_cert_file"]))
with open(cert["backup_key_file"], 'rU') as backup_key_file:
key = backup_key_file.read()
key = Crypto.PublicKey.RSA.importKey(backup_key_file.read())
revocation = self.network.send_and_receive_expected(
acme.revocation_request(cert_der, key), "revocation")
messages.RevocationRequest.create(
certificate=certificate, key=key),
messages.Revocation)
zope.component.getUtility(interfaces.IDisplay).generic_notification(
"You have successfully revoked the certificate for "
@ -65,8 +71,8 @@ class Revoker(object):
c_sha1_vh = {}
for (cert, _, path) in self.installer.get_all_certs_keys():
try:
c_sha1_vh[M2Crypto.X509.load_cert(
cert).get_fingerprint(md='sha1')] = path
c_sha1_vh[acme_util.ComparableX509(M2Crypto.X509.load_cert(
cert).get_fingerprint(md='sha1'))] = path
except M2Crypto.X509.X509Error:
continue

View file

@ -1,158 +0,0 @@
"""Tests for letsencrypt.client.acme."""
import pkg_resources
import unittest
import jsonschema
class ACMEObjectValidateTest(unittest.TestCase):
"""Tests for letsencrypt.client.acme.acme_object_validate."""
def setUp(self):
self.schemata = {
'foo': {
'type': 'object',
'properties': {
'price': {'type': 'number'},
'name': {'type': 'string'},
},
},
}
def _call(self, json_string):
from letsencrypt.client.acme import acme_object_validate
return acme_object_validate(json_string, self.schemata)
def _test_fails(self, json_string):
self.assertRaises(jsonschema.ValidationError, self._call, json_string)
def test_non_dictionary_fails(self):
self._test_fails('[]')
def test_dict_without_type_fails(self):
self._test_fails('{}')
def test_unknown_type_fails(self):
self._test_fails('{"type": "bar"}')
def test_valid_returns_none(self):
self.assertTrue(self._call('{"type": "foo"}') is None)
def test_invalid_fails(self):
self._test_fails('{"type": "foo", "price": "asd"}')
class PrettyTest(unittest.TestCase): # pylint: disable=too-few-public-methods
"""Tests for letsencrypt.client.acme.pretty."""
@classmethod
def _call(cls, json_string):
from letsencrypt.client.acme import pretty
return pretty(json_string)
def test_it(self):
self.assertEqual(
self._call('{"foo": {"bar": "baz"}}'),
'{\n "foo": {\n "bar": "baz"\n }\n}')
class MessageFactoriesTest(unittest.TestCase):
"""Tests for ACME message factories from letsencrypt.client.acme."""
def setUp(self):
self.privkey = pkg_resources.resource_string(
__name__, 'testdata/rsa256_key.pem')
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.b64nonce = '7Nbyb1lI6xPVI3Hg3aKSqQ'
@classmethod
def _validate(cls, msg):
from letsencrypt.client.acme import SCHEMATA
jsonschema.validate(msg, SCHEMATA[msg['type']])
def test_challenge_request(self):
from letsencrypt.client.acme import challenge_request
msg = challenge_request('example.com')
self._validate(msg)
self.assertEqual(msg, {
'type': 'challengeRequest',
'identifier': 'example.com',
})
def test_authorization_request(self):
from letsencrypt.client.acme import authorization_request
responses = [
{
'type': 'simpleHttps',
'path': 'Hf5GrX4Q7EBax9hc2jJnfw',
},
None, # null
{
'type': 'recoveryToken',
'token': '23029d88d9e123e',
}
]
msg = authorization_request(
'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'example.com',
'czpsrF0KMH6dgajig3TGHw',
responses,
self.privkey,
self.nonce,
)
self._validate(msg)
self.assertEqual(
msg.pop('signature')['sig'],
'VkpReso87ogwGul2MGck96TkYs4QoblIgNthgrm9O7EBGlzCRCnTHnx'
'bj6loqaC4f5bn1rgS927Gp1Kvbqnmqg'
)
self.assertEqual(msg, {
'type': 'authorizationRequest',
'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4',
'nonce': 'czpsrF0KMH6dgajig3TGHw',
'responses': responses,
})
def test_certificate_request(self):
from letsencrypt.client.acme import certificate_request
msg = certificate_request(
'TODO: real DER CSR?', self.privkey, self.nonce)
self._validate(msg)
self.assertEqual(
msg.pop('signature')['sig'],
'HEQVN4MU1yDrArP2T7WZQ12XlHCn5DgTPgb5eWT5_vjRPppLSNe6uWE'
'x9SFwG9d9umqn49nZCSW7uskA2lcW6Q'
)
self.assertEqual(msg, {
'type': 'certificateRequest',
'csr': 'VE9ETzogcmVhbCBERVIgQ1NSPw',
})
def test_revocation_request(self):
from letsencrypt.client.acme import revocation_request
msg = revocation_request(
'TODO: real DER cert?', self.privkey, self.nonce)
self._validate(msg)
self.assertEqual(
msg.pop('signature')['sig'],
'ABXA1IsyTalTXIojxmGnIUGyZASmvqEvTQ98jJ5KFs2FTswLEmsoqFX'
'fU6l5_fous-tsbXOfLN-7PjfZ5XWPvg'
)
self.assertEqual(msg, {
'type': 'revocationRequest',
'certificate': 'VE9ETzogcmVhbCBERVIgY2VydD8',
})
def test_status_request(self):
from letsencrypt.client.acme import status_request
msg = status_request(u'O7-s9MNq1siZHlgrMzi9_A')
self._validate(msg)
self.assertEqual(msg, {
'type': 'statusRequest',
'token': u'O7-s9MNq1siZHlgrMzi9_A',
})
if __name__ == '__main__':
unittest.main()

View file

@ -90,18 +90,3 @@ def gen_combos(challs):
# Gen combos for 1 of each type
return [[i, j] for i in xrange(len(dv_chall))
for j in xrange(len(renewal_chall))]
def get_chall_msg(iden, nonce, challenges, combos=None):
"""Produce an ACME challenge message."""
chall_msg = {
"type": "challenge",
"sessionID": iden,
"nonce": nonce,
"challenges": challenges
}
if combos is None:
return chall_msg
chall_msg["combinations"] = combos
return chall_msg

View file

@ -4,8 +4,11 @@ import unittest
import mock
from letsencrypt.acme import messages
from letsencrypt.client import challenge_util
from letsencrypt.client import errors
from letsencrypt.client.tests import acme_util
@ -45,7 +48,8 @@ class SatisfyChallengesTest(unittest.TestCase):
def test_name1_dvsni1(self):
dom = "0"
challenge = [acme_util.CHALLENGES["dvsni"]]
msg = acme_util.get_chall_msg(dom, "nonce0", challenge)
msg = messages.Challenge(session_id=dom, nonce="nonce0",
challenges=challenge, combinations=[])
self.handler.add_chall_msg(dom, msg, "dummy_key")
self.handler._satisfy_challenges() # pylint: disable=protected-access
@ -62,7 +66,8 @@ class SatisfyChallengesTest(unittest.TestCase):
def test_name1_rectok1(self):
dom = "0"
challenge = [acme_util.CHALLENGES["recoveryToken"]]
msg = acme_util.get_chall_msg(dom, "nonce0", challenge)
msg = messages.Challenge(session_id=dom, nonce="nonce0",
challenges=challenge, combinations=[])
self.handler.add_chall_msg(dom, msg, "dummy_key")
self.handler._satisfy_challenges() # pylint: disable=protected-access
@ -87,7 +92,8 @@ class SatisfyChallengesTest(unittest.TestCase):
for i in xrange(5):
self.handler.add_chall_msg(
str(i),
acme_util.get_chall_msg(str(i), "nonce%d" % i, challenge),
messages.Challenge(session_id=str(i), nonce="nonce%d" % i,
challenges=challenge, combinations=[]),
"dummy_key")
self.handler._satisfy_challenges() # pylint: disable=protected-access
@ -118,7 +124,8 @@ class SatisfyChallengesTest(unittest.TestCase):
combos = acme_util.gen_combos(challenges)
self.handler.add_chall_msg(
dom,
acme_util.get_chall_msg("0", "nonce0", challenges, combos),
messages.Challenge(session_id="0", nonce="nonce0",
challenges=challenges, combinations=combos),
"dummy_key")
path = gen_path(["simpleHttps"], challenges)
@ -151,7 +158,8 @@ class SatisfyChallengesTest(unittest.TestCase):
combos = acme_util.gen_combos(challenges)
self.handler.add_chall_msg(
dom,
acme_util.get_chall_msg(dom, "nonce0", challenges, combos),
messages.Challenge(session_id=dom, nonce="nonce0",
challenges=challenges, combinations=combos),
"dummy_key")
path = gen_path(["simpleHttps", "recoveryToken"], challenges)
@ -181,8 +189,9 @@ class SatisfyChallengesTest(unittest.TestCase):
for i in xrange(5):
self.handler.add_chall_msg(
str(i),
acme_util.get_chall_msg(
str(i), "nonce%d" % i, challenges, combos),
messages.Challenge(
session_id=str(i), nonce="nonce%d" % i,
challenges=challenges, combinations=combos),
"dummy_key")
path = gen_path(["dvsni", "recoveryContact"], challenges)
@ -230,8 +239,9 @@ class SatisfyChallengesTest(unittest.TestCase):
paths.append(gen_path(chosen_chall[i], challenge_list[i]))
self.handler.add_chall_msg(
dom,
acme_util.get_chall_msg(
dom, "nonce%d" % i, challenge_list[i]),
messages.Challenge(
session_id=dom, nonce="nonce%d" % i,
challenges=challenge_list[i], combinations=[]),
"dummy_key")
mock_chall_path.side_effect = paths
@ -278,8 +288,9 @@ class SatisfyChallengesTest(unittest.TestCase):
for i in xrange(3):
self.handler.add_chall_msg(
str(i),
acme_util.get_chall_msg(
str(i), "nonce%d" % i, challenges, combos),
messages.Challenge(
session_id=str(i), nonce="nonce%d" % i,
challenges=challenges, combinations=combos),
"dummy_key")
mock_chall_path.side_effect = [
@ -316,7 +327,8 @@ class SatisfyChallengesTest(unittest.TestCase):
isinstance(client_chall_list[0], challenge_util.PopChall))
def _get_exp_response(self, domain, path, challenges): # pylint: disable=no-self-use
def _get_exp_response(self, domain, path, challenges):
# pylint: disable=no-self-use
exp_resp = ["null"] * len(challenges)
for i in path:
exp_resp[i] = TRANSLATE[challenges[i]["type"]] + str(domain)
@ -349,7 +361,8 @@ class GetAuthorizationsTest(unittest.TestCase):
for i in xrange(3):
self.handler.add_chall_msg(
str(i),
acme_util.get_chall_msg(str(i), "nonce%d" % i, challenge),
messages.Challenge(session_id=str(i), nonce="nonce%d" % i,
challenges=challenge, combinations=[]),
"dummy_key")
self.mock_sat_chall.side_effect = self._sat_solved_at_once
@ -377,7 +390,8 @@ class GetAuthorizationsTest(unittest.TestCase):
challenges = acme_util.get_challenges()
self.handler.add_chall_msg(
"0",
acme_util.get_chall_msg("0", "nonce0", challenges),
messages.Challenge(session_id="0", nonce="nonce0",
challenges=challenges, combinations=[]),
"dummy_key")
# Don't do anything to satisfy challenges
@ -392,7 +406,7 @@ class GetAuthorizationsTest(unittest.TestCase):
def _sat_failure(self):
dom = "0"
self.handler.paths[dom] = gen_path(
["dns", "recoveryToken"], self.handler.msgs[dom]["challenges"])
["dns", "recoveryToken"], self.handler.msgs[dom].challenges)
dv_c, c_c = self.handler._challenge_factory(
dom, self.handler.paths[dom])
self.handler.dv_c[dom], self.handler.client_c[dom] = dv_c, c_c
@ -405,7 +419,8 @@ class GetAuthorizationsTest(unittest.TestCase):
dom = str(i)
self.handler.add_chall_msg(
dom,
acme_util.get_chall_msg(dom, "nonce%d" % i, challs[i]),
messages.Challenge(session_id=dom, nonce="nonce%d" % i,
challenges=challs[i], combinations=[]),
"dummy_key")
self.mock_sat_chall.side_effect = self._sat_incremental

View file

@ -6,6 +6,8 @@ import unittest
import M2Crypto
from letsencrypt.acme import jose
from letsencrypt.client import challenge_util
from letsencrypt.client import constants
from letsencrypt.client import le_util
@ -19,7 +21,7 @@ class DvsniGenCertTest(unittest.TestCase):
"""Basic test for straightline code."""
domain = "example.com"
dvsni_r = "r_value"
r_b64 = le_util.jose_b64encode(dvsni_r)
r_b64 = jose.b64encode(dvsni_r)
pem = pkg_resources.resource_string(
__name__, os.path.join("testdata", "rsa256_key.pem"))
key = le_util.Key("path", pem)
@ -28,7 +30,7 @@ class DvsniGenCertTest(unittest.TestCase):
# pylint: disable=protected-access
ext = challenge_util._dvsni_gen_ext(
dvsni_r, le_util.jose_b64decode(s_b64))
dvsni_r, jose.b64decode(s_b64))
self._standard_check_cert(cert_pem, domain, nonce, ext)
def _standard_check_cert(self, pem, domain, nonce, ext):

View file

@ -11,43 +11,6 @@ RSA256_KEY = pkg_resources.resource_string(__name__, 'testdata/rsa256_key.pem')
RSA512_KEY = pkg_resources.resource_string(__name__, 'testdata/rsa512_key.pem')
class CreateSigTest(unittest.TestCase):
"""Tests for letsencrypt.client.crypto_util.create_sig."""
def setUp(self):
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.b64nonce = '7Nbyb1lI6xPVI3Hg3aKSqQ'
self.signature = {
'nonce': self.b64nonce,
'alg': 'RS256',
'jwk': {
'kty': 'RSA',
'e': 'AQAB',
'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5'
'80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q',
},
'sig': 'SUPYKucUnhlTt8_sMxLiigOYdf_wlOLXPI-o7aRLTsOquVjDd6r'
'AX9AFJHk-bCMQPJbSzXKjG6H1IWbvxjS2Ew',
}
@classmethod
def _call(cls, *args, **kwargs):
from letsencrypt.client.crypto_util import create_sig
return create_sig(*args, **kwargs)
def test_it(self):
self.assertEqual(
self._call('message', RSA256_KEY, self.nonce), self.signature)
def test_random_nonce(self):
signature = self._call('message', RSA256_KEY)
signature.pop('sig')
signature.pop('nonce')
del self.signature['sig']
del self.signature['nonce']
self.assertEqual(signature, self.signature)
class ValidCSRTest(unittest.TestCase):
"""Tests for letsencrypt.client.crypto_util.valid_csr."""
@ -170,17 +133,5 @@ class GetCertInfoTest(unittest.TestCase):
self._call('cert-san.pem')
class B64CertToPEMTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
"""Tests for letsencrypt.client.crypto_util.b64_cert_to_pem."""
def test_it(self):
from letsencrypt.client.crypto_util import b64_cert_to_pem
self.assertEqual(
b64_cert_to_pem(pkg_resources.resource_string(
__name__, 'testdata/cert.b64jose')),
pkg_resources.resource_string(__name__, 'testdata/cert.pem'))
if __name__ == '__main__':
unittest.main()

View file

@ -122,71 +122,5 @@ class UniqueFileTest(unittest.TestCase):
self.assertTrue(basename3.endswith('foo.txt'))
# https://en.wikipedia.org/wiki/Base64#Examples
JOSE_B64_PADDING_EXAMPLES = {
'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='),
'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='),
'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''),
'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='),
'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='),
}
B64_URL_UNSAFE_EXAMPLES = {
chr(251) + chr(239): '--8',
chr(255) * 2: '__8',
}
class JOSEB64EncodeTest(unittest.TestCase):
"""Tests for letsencrypt.client.le_util.jose_b64encode."""
@classmethod
def _call(cls, data):
from letsencrypt.client.le_util import jose_b64encode
return jose_b64encode(data)
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
self.assertEqual(self._call(text), b64)
def test_different_paddings(self):
for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(text), b64)
def test_unicode_fails_with_type_error(self):
self.assertRaises(TypeError, self._call, u'some unicode')
class JOSEB64DecodeTest(unittest.TestCase):
"""Tests for letsencrypt.client.le_util.jose_b64decode."""
@classmethod
def _call(cls, data):
from letsencrypt.client.le_util import jose_b64decode
return jose_b64decode(data)
def test_unsafe_url(self):
for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems():
self.assertEqual(self._call(b64), text)
def test_input_without_padding(self):
for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(b64), text)
def test_input_with_padding(self):
for text, (b64, pad) in JOSE_B64_PADDING_EXAMPLES.iteritems():
self.assertEqual(self._call(b64 + pad), text)
def test_unicode_with_ascii(self):
self.assertEqual(self._call(u'YQ'), 'a')
def test_non_ascii_unicode_fails(self):
self.assertRaises(ValueError, self._call, u'\u0105')
def test_type_error_no_unicode_or_str(self):
self.assertRaises(TypeError, self._call, object())
if __name__ == '__main__':
unittest.main()

View file

@ -9,6 +9,8 @@ import mock
import OpenSSL.crypto
import OpenSSL.SSL
from letsencrypt.acme import jose
from letsencrypt.client import challenge_util
from letsencrypt.client import le_util
@ -61,7 +63,7 @@ class SNICallbackTest(unittest.TestCase):
from letsencrypt.client.standalone_authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
name, r_b64 = "example.com", le_util.jose_b64encode("x" * 32)
name, r_b64 = "example.com", jose.b64encode("x" * 32)
test_key = pkg_resources.resource_string(
__name__, "testdata/rsa256_key.pem")
nonce, key = "abcdef", le_util.Key("foo", test_key)
@ -428,7 +430,7 @@ class DoChildProcessTest(unittest.TestCase):
from letsencrypt.client.standalone_authenticator import \
StandaloneAuthenticator
self.authenticator = StandaloneAuthenticator()
name, r_b64 = "example.com", le_util.jose_b64encode("x" * 32)
name, r_b64 = "example.com", jose.b64encode("x" * 32)
test_key = pkg_resources.resource_string(
__name__, "testdata/rsa256_key.pem")
nonce, key = "abcdef", le_util.Key("foo", test_key)

23
linter_plugin.py Normal file
View file

@ -0,0 +1,23 @@
"""Let's Encrypt ACME PyLint plugin.
http://docs.pylint.org/plugins.html
"""
from astroid import MANAGER
from astroid import nodes
def register(unused_linter):
"""Register this module as PyLint plugin."""
def _transform(cls):
# fix the "no-member" error on instances of
# letsencrypt.acme.util.ImmutableMap subclasses (instance
# attributes are initialized dynamically based on __slots__)
if (('Message' in cls.basenames or 'ImmutableMap' in cls.basenames or
'util.ImmutableMap' in cls.basenames) and (cls.slots() is not None)):
for slot in cls.slots():
cls.locals[slot.value] = [nodes.EmptyNode()]
MANAGER.register_transform(nodes.Class, _transform)

View file

@ -61,6 +61,7 @@ setup(
url="https://letsencrypt.org",
packages=[
'letsencrypt',
'letsencrypt.acme',
'letsencrypt.client',
'letsencrypt.client.apache',
'letsencrypt.client.tests',

View file

@ -10,11 +10,14 @@ commands =
pip install -e .[testing]
python setup.py test -q # -q does not suppress errors
setenv =
PYTHONPATH = {toxinidir}
[testenv:cover]
basepython = python2.7
commands =
pip install -e .[testing]
python setup.py nosetests --with-coverage --cover-min-percentage=66
python setup.py nosetests --with-coverage --cover-min-percentage=73
[testenv:lint]
# recent versions of pylint do not support Python 2.6 (#97, #187)