(Typed)ACMEObject class hierarchy, from_json vs from_valid_json.

This commit is contained in:
Jakub Warmuz 2015-02-12 08:43:42 +00:00
parent 05cdb821dc
commit 3d883fd77f
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
10 changed files with 227 additions and 272 deletions

View file

@ -4,10 +4,10 @@ class Error(Exception):
"""Generic ACME error."""
class ValidationError(Error):
"""ACME message validation error."""
"""ACME object validation error."""
class UnrecognizedMessageTypeError(ValidationError):
"""Unrecognized ACME message type error."""
class UnrecognizedTypeError(ValidationError):
"""Unrecognized ACME object type error."""
class SchemaValidationError(ValidationError):
"""JSON schema ACME message validation error."""
"""JSON schema ACME object validation error."""

View file

@ -2,6 +2,7 @@
import zope.interface
# pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class
# pylint: disable=too-few-public-methods
class IJSONSerializable(zope.interface.Interface):
@ -20,3 +21,13 @@ class IJSONSerializable(zope.interface.Interface):
:rtype: dict
"""
class IJSONDeserializable(zope.interface.Interface):
"""JSON deserializable class."""
def from_valid_json(jobj):
"""Deserialize valid JSON object.
:param jobj: Validated JSON object.
"""

View file

@ -13,7 +13,7 @@ def _leading_zeros(arg):
return arg
class JWK(util.JSONDeSerializable, util.ImmutableMap):
class JWK(util.ACMEObject):
# pylint: disable=too-few-public-methods
"""JSON Web Key.
@ -21,7 +21,6 @@ class JWK(util.JSONDeSerializable, util.ImmutableMap):
"""
__slots__ = ('key',)
schema = util.load_schema('jwk')
@classmethod
def _encode_param(cls, param):
@ -35,7 +34,6 @@ class JWK(util.JSONDeSerializable, util.ImmutableMap):
return long(binascii.hexlify(b64decode(param)), 16)
def to_json(self):
"""Serialize to JSON."""
return {
'kty': 'RSA', # TODO
'n': self._encode_param(self.key.n),
@ -43,7 +41,7 @@ class JWK(util.JSONDeSerializable, util.ImmutableMap):
}
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
assert 'RSA' == jobj['kty'] # TODO
return cls(key=Crypto.PublicKey.RSA.construct(
(cls._decode_param(jobj['n']), cls._decode_param(jobj['e']))))

View file

@ -45,7 +45,7 @@ class JWKTest(unittest.TestCase):
def test_from_json(self):
from letsencrypt.acme.jose import JWK
self.assertEqual(self.jwk256, JWK.from_json(self.jwk256json))
self.assertEqual(self.jwk256, JWK.from_valid_json(self.jwk256json))
# TODO: fix schemata to allow RSA512
#self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json))

View file

@ -1,58 +1,21 @@
"""ACME protocol messages."""
import json
import jsonschema
import M2Crypto
import zope.interface
from letsencrypt.acme import errors
from letsencrypt.acme import interfaces
from letsencrypt.acme import jose
from letsencrypt.acme import other
from letsencrypt.acme import util
class Message(util.JSONDeSerializable, util.ImmutableMap):
"""ACME message.
Messages are considered immutable.
"""
zope.interface.implements(interfaces.IJSONSerializable)
acme_type = NotImplemented
"""ACME message "type" field. Subclasses must override."""
class Message(util.TypedACMEObject):
# _fields_to_json | pylint: disable=abstract-method
"""ACME message."""
TYPES = {}
"""Message types registered for JSON deserialization"""
@classmethod
def register(cls, msg_cls):
"""Register class for JSON deserialization."""
cls.TYPES[msg_cls.acme_type] = msg_cls
return msg_cls
def to_json(self):
"""Get JSON serializable object.
:returns: Serializable JSON object representing ACME message.
:meth:`validate` will almost certainly not work, due to reasons
explained in :class:`letsencrypt.acme.interfaces.IJSONSerializable`.
:rtype: dict
"""
jobj = self._fields_to_json()
jobj["type"] = self.acme_type
return jobj
def _fields_to_json(self):
"""Prepare ACME message fields for JSON serialiazation.
Subclasses must override this method.
:returns: Serializable JSON object containg all ACME message fields
apart from "type".
:rtype: dict
"""
raise NotImplementedError()
schema = NotImplemented
@classmethod
def get_msg_cls(cls, jobj):
@ -74,30 +37,47 @@ class Message(util.JSONDeSerializable, util.ImmutableMap):
try:
msg_cls = cls.TYPES[msg_type]
except KeyError:
raise errors.UnrecognizedMessageTypeError(msg_type)
raise errors.UnrecognizedTypeError(msg_type)
return msg_cls
@classmethod
def from_json(cls, jobj, validate=True):
"""Deserialize validated ACME message from JSON string.
def from_json(cls, jobj):
"""Deserialize from (possibly invalid) JSON object.
:param str jobj: JSON object.
:param bool validate: Validate against schema before deserializing.
Useful if :class:`JWK` is part of already validated json object.
Note that the input ``jobj`` has not been sanitized in any way.
:raises letsencrypt.acme.errors.ValidationError: if validation
was unsuccessful
:param jobj: JSON object.
:returns: Valid ACME message.
:rtype: subclass of :class:`Message`
:raises letsencrypt.acme.errors.SchemaValidationError: if ``validate``
was ``True`` and object couldn't be validated.
:returns: instance of the class
"""
msg_cls = cls.get_msg_cls(jobj)
if validate:
msg_cls.validate_json(jobj)
# pylint: disable=protected-access
return msg_cls._from_valid_json(jobj)
try:
jsonschema.validate(jobj, msg_cls.schema)
except jsonschema.ValidationError as error:
raise errors.SchemaValidationError(error)
return cls.from_valid_json(jobj)
@classmethod
def json_loads(cls, json_string):
"""Load JSON string."""
return cls.from_json(json.loads(json_string))
def json_dumps(self, *args, **kwargs):
"""Dump to JSON string using proper serializer.
:returns: JSON serialized string.
:rtype: str
"""
return json.dumps(
self, *args, default=util.dump_ijsonserializable, **kwargs)
@Message.register # pylint: disable=too-few-public-methods
@ -118,7 +98,7 @@ class Challenge(Message):
return fields
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
return cls(session_id=jobj["sessionID"],
nonce=jose.b64decode(jobj["nonce"]),
challenges=jobj["challenges"],
@ -142,7 +122,7 @@ class ChallengeRequest(Message):
}
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
return cls(identifier=jobj["identifier"])
@ -164,10 +144,10 @@ class Authorization(Message):
return fields
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
jwk = jobj.get("jwk")
if jwk is not None:
jwk = jose.JWK.from_json(jwk, validate=False)
jwk = jose.JWK.from_valid_json(jwk)
return cls(recovery_token=jobj.get("recoveryToken"),
identifier=jobj.get("identifier"), jwk=jwk)
@ -236,12 +216,11 @@ class AuthorizationRequest(Message):
return fields
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
return cls(session_id=jobj["sessionID"],
nonce=jose.b64decode(jobj["nonce"]),
responses=jobj["responses"],
signature=other.Signature.from_json(
jobj["signature"], validate=False),
signature=other.Signature.from_valid_json(jobj["signature"]),
contact=jobj.get("contact", []))
@ -278,7 +257,7 @@ class Certificate(Message):
return jose.b64encode(cert.as_der())
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
return cls(certificate=cls._decode_cert(jobj["certificate"]),
chain=[cls._decode_cert(cert) for cert in
jobj.get("chain", [])],
@ -344,10 +323,9 @@ class CertificateRequest(Message):
}
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
return cls(csr=cls._decode_csr(jobj["csr"]),
signature=other.Signature.from_json(
jobj["signature"], validate=False))
signature=other.Signature.from_valid_json(jobj["signature"]))
@Message.register # pylint: disable=too-few-public-methods
@ -366,7 +344,7 @@ class Defer(Message):
return fields
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
return cls(token=jobj["token"], interval=jobj.get("interval"),
message=jobj.get("message"))
@ -396,7 +374,7 @@ class Error(Message):
return fields
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
return cls(error=jobj["error"], message=jobj.get("message"),
more_info=jobj.get("moreInfo"))
@ -412,7 +390,7 @@ class Revocation(Message):
return {}
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
return cls()
@ -475,10 +453,9 @@ class RevocationRequest(Message):
}
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
return cls(certificate=cls._decode_cert(jobj["certificate"]),
signature=other.Signature.from_json(
jobj["signature"], validate=False))
signature=other.Signature.from_valid_json(jobj["signature"]))
@Message.register # pylint: disable=too-few-public-methods
@ -496,5 +473,5 @@ class StatusRequest(Message):
return {"token": self.token}
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
return cls(token=jobj["token"])

View file

@ -4,7 +4,6 @@ import unittest
import Crypto.PublicKey.RSA
import M2Crypto.X509
import mock
from letsencrypt.acme import errors
from letsencrypt.acme import jose
@ -28,7 +27,13 @@ class MessageTest(unittest.TestCase):
def setUp(self):
# pylint: disable=missing-docstring,too-few-public-methods
from letsencrypt.acme.messages import Message
class TestMessage(Message):
class MockParentMessage(Message):
# pylint: disable=abstract-method
TYPES = {}
@MockParentMessage.register
class MockMessage(MockParentMessage):
acme_type = 'test'
schema = {
'type': 'object',
@ -37,53 +42,45 @@ class MessageTest(unittest.TestCase):
'name': {'type': 'string'},
},
}
__slots__ = ('price', 'name')
@classmethod
def _from_valid_json(cls, jobj):
return jobj
def from_valid_json(cls, jobj):
return cls(price=jobj.get('price'), name=jobj.get('name'))
def _fields_to_json(self):
return {'foo': 'bar'}
# pylint: disable=no-member
return {'price': self.price, 'name': self.name}
self.msg_cls = TestMessage
def test_to_json(self):
self.assertEqual(self.msg_cls().to_json(), {
'type': 'test',
'foo': 'bar',
})
def test_fields_to_json_not_implemented(self):
from letsencrypt.acme.messages import Message
# pylint: disable=protected-access
self.assertRaises(NotImplementedError, Message()._fields_to_json)
@classmethod
def _from_json(cls, jobj, validate=True):
from letsencrypt.acme.messages import Message
return Message.from_json(jobj, validate)
self.parent_cls = MockParentMessage
self.msg = MockMessage(price=123, name='foo')
def test_from_json_non_dict_fails(self):
self.assertRaises(errors.ValidationError, self._from_json, [])
self.assertRaises(errors.ValidationError, self.parent_cls.from_json, [])
def test_from_json_dict_no_type_fails(self):
self.assertRaises(errors.ValidationError, self._from_json, {})
self.assertRaises(errors.ValidationError, self.parent_cls.from_json, {})
def test_from_json_unknown_type_fails(self):
self.assertRaises(errors.UnrecognizedMessageTypeError,
self._from_json, {'type': 'bar'})
def test_from_json_unrecognized_type(self):
self.assertRaises(errors.UnrecognizedTypeError,
self.parent_cls.from_json, {'type': 'foo'})
@mock.patch('letsencrypt.acme.messages.Message.TYPES')
def test_from_json_validate_errors(self, types):
types.__getitem__.side_effect = lambda x: {'foo': self.msg_cls}[x]
def test_from_json_validates(self):
self.assertRaises(errors.SchemaValidationError,
self._from_json, {'type': 'foo', 'price': 'asd'})
self.parent_cls.from_json,
{'type': 'test', 'price': 'asd'})
@mock.patch('letsencrypt.acme.messages.Message.TYPES')
def test_from_json_valid_returns_cls(self, types):
types.__getitem__.side_effect = lambda x: {'foo': self.msg_cls}[x]
self.assertEqual(self._from_json({'type': 'foo'}, validate=False),
{'type': 'foo'})
def test_from_json(self):
self.assertEqual(self.msg, self.parent_cls.from_json(
{'type': 'test', 'name': 'foo', 'price': 123}))
def test_json_loads(self):
self.assertEqual(self.msg, self.parent_cls.json_loads(
'{"type": "test", "name": "foo", "price": 123}'))
def test_json_dumps(self):
self.assertEqual(self.msg.json_dumps(sort_keys=True),
'{"name": "foo", "price": 123, "type": "test"}')
class ChallengeTest(unittest.TestCase):
@ -408,10 +405,7 @@ class RevocationTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.messages import Revocation
self.msg = Revocation()
self.jmsg = {
'type': 'revocation',
}
self.jmsg = {'type': 'revocation'}
def test_to_json(self):
self.assertEqual(self.msg.to_json(), self.jmsg)

View file

@ -1,4 +1,4 @@
"""JSON objects in ACME protocol other than messages."""
"""Other ACME objects."""
import logging
from Crypto import Random
@ -9,7 +9,7 @@ from letsencrypt.acme import jose
from letsencrypt.acme import util
class Signature(util.JSONDeSerializable, util.ImmutableMap):
class Signature(util.ACMEObject):
"""ACME signature.
:ivar str alg: Signature algorithm.
@ -23,7 +23,6 @@ class Signature(util.JSONDeSerializable, util.ImmutableMap):
"""
__slots__ = ('alg', 'sig', 'nonce', 'jwk')
schema = util.load_schema('signature')
NONCE_LEN = 16
"""Size of nonce in bytes, as specified in the ACME protocol."""
@ -68,7 +67,6 @@ class Signature(util.JSONDeSerializable, util.ImmutableMap):
hashed, self.sig)
def to_json(self):
"""Prepare JSON serializable object."""
return {
'alg': self.alg,
'sig': jose.b64encode(self.sig),
@ -77,7 +75,7 @@ class Signature(util.JSONDeSerializable, util.ImmutableMap):
}
@classmethod
def _from_valid_json(cls, jobj):
def from_valid_json(cls, jobj):
return cls(alg=jobj['alg'], sig=jose.b64decode(jobj['sig']),
nonce=jose.b64decode(jobj['nonce']),
jwk=jose.JWK.from_json(jobj['jwk'], validate=False))
jwk=jose.JWK.from_valid_json(jobj['jwk']))

View file

@ -78,9 +78,8 @@ class SigatureTest(unittest.TestCase):
def test_from_json(self):
from letsencrypt.acme.other import Signature
# pylint: disable=protected-access
self.assertEqual(
self.signature, Signature._from_valid_json(self.jsig_from))
self.signature, Signature.from_valid_json(self.jsig_from))
if __name__ == '__main__':

View file

@ -2,7 +2,6 @@
import json
import pkg_resources
import jsonschema
import zope.interface
from letsencrypt.acme import errors
@ -34,73 +33,6 @@ def load_schema(name):
__name__, "schemata/%s.json" % name)))
class JSONDeSerializable(object):
"""JSON (de)serializable object."""
zope.interface.implements(interfaces.IJSONSerializable)
schema = NotImplemented
@classmethod
def validate_json(cls, jobj):
"""Validate JSON object against schema.
:raises letsencrypt.acme.errors.SchemaValidationError: if object
couldn't be validated.
"""
try:
jsonschema.validate(jobj, cls.schema)
except jsonschema.ValidationError as error:
raise errors.SchemaValidationError(error)
@classmethod
def from_json(cls, jobj, validate=True):
"""Deserialize from JSON.
Note that the input ``jobj`` has not been sanitized in any way.
:param jobj: JSON object.
:param bool validate: Validate against schema before deserializing.
Useful if :class:`JWK` is part of already validated json object.
:raises letsencrypt.acme.errors.SchemaValidationError: if ``validate``
was ``True`` and object couldn't be validated.
:returns: instance of the class
"""
if validate:
cls.validate_json(jobj)
return cls._from_valid_json(jobj)
@classmethod
def _from_valid_json(cls, jobj):
"""Deserializa from valid JSON object.
:param jobj: JSON object that has been validated against schema.
"""
raise NotImplementedError()
@classmethod
def json_loads(cls, json_string, validate=True):
"""Load JSON string."""
return cls.from_json(json.loads(json_string), validate)
def to_json(self):
"""Prepare JSON serializable object."""
raise NotImplementedError()
def json_dumps(self):
"""Dump to JSON string using proper serializer.
:returns: JSON serialized string.
:rtype: str
"""
return json.dumps(self, default=dump_ijsonserializable)
def dump_ijsonserializable(python_object):
"""Serialize IJSONSerializable to JSON.
@ -145,3 +77,73 @@ class ImmutableMap(object): # pylint: disable=too-few-public-methods
return '{0}({1})'.format(self.__class__.__name__, ', '.join(
'{0}={1!r}'.format(slot, getattr(self, slot))
for slot in self.__slots__))
class ACMEObject(ImmutableMap): # pylint: disable=too-few-public-methods
"""ACME object."""
zope.interface.implements(interfaces.IJSONSerializable)
zope.interface.classImplements(interfaces.IJSONDeserializable)
def to_json(self): # pragma: no cover
"""Serialize to JSON."""
raise NotImplementedError()
@classmethod
def from_valid_json(cls, jobj): # pragma: no cover
"""Deserialize from valid JSON object."""
raise NotImplementedError()
class TypedACMEObject(ACMEObject):
"""ACME object with type (immutable)."""
acme_type = NotImplemented
"""ACME "type" field. Subclasses must override."""
TYPES = NotImplemented
"""Types registered for JSON deserialization"""
@classmethod
def register(cls, msg_cls):
"""Register class for JSON deserialization."""
cls.TYPES[msg_cls.acme_type] = msg_cls
return msg_cls
def to_json(self):
"""Get JSON serializable object.
:returns: Serializable JSON object representing ACME typed object.
:meth:`validate` will almost certianly not work, due to reasons
explained in :class:`letsencrypt.acme.interfaces.IJSONSerializable`.
:rtype: dict
"""
jobj = self._fields_to_json()
jobj["type"] = self.acme_type
return jobj
def _fields_to_json(self): # pragma: no cover
"""Prepare ACME object fields for JSON serialiazation.
Subclasses must override this method.
:returns: Serializable JSON object containg all ACME object fields
apart from "type".
:rtype: dict
"""
raise NotImplementedError()
@classmethod
def from_valid_json(cls, jobj):
"""Deserialize ACME object from valid JSON object.
:raises letsencrypt.acme.errors.UnrecognizedTypeError: if type
of the ACME object has not been registered.
"""
try:
msg_cls = cls.TYPES[jobj["type"]]
except KeyError:
raise errors.UnrecognizedTypeError(jobj["type"])
return msg_cls.from_valid_json(jobj)

View file

@ -9,79 +9,16 @@ from letsencrypt.acme import errors
from letsencrypt.acme import interfaces
class MockJSONSerialiazable(object):
# pylint: disable=missing-docstring,too-few-public-methods,no-self-use
zope.interface.implements(interfaces.IJSONSerializable)
def to_json(self):
return [3, 2, 1]
class JSONDeSerializableTest(unittest.TestCase):
"""Tests for letsencrypt.acme.util.JSONDeSerializable."""
def setUp(self):
from letsencrypt.acme.util import JSONDeSerializable
class Tester(JSONDeSerializable):
# pylint: disable=missing-docstring,no-self-use,
# pylint: disable=too-few-public-methods
zope.interface.implements(interfaces.IJSONSerializable)
schema = {'type': 'integer'}
def __init__(self, jobj):
self.jobj = jobj
@classmethod
def _from_valid_json(cls, jobj):
return cls(jobj)
def to_json(self):
return {'foo': MockJSONSerialiazable()}
self.tester_cls = Tester
def test_validate_invalid_json(self):
self.assertRaises(errors.SchemaValidationError,
self.tester_cls.validate_json, 'bang!')
def test_validate_valid_json(self):
self.tester_cls.validate_json(5)
def test_from_json(self):
self.assertEqual(5, self.tester_cls.from_json(5, validate=True).jobj)
def test_from_json_no_validation(self):
self.assertEqual(['1', 2], self.tester_cls.from_json(
['1', 2], validate=False).jobj)
def test_from_valid_json_raises_error(self):
from letsencrypt.acme.util import JSONDeSerializable
# pylint: disable=protected-access
self.assertRaises(
NotImplementedError, JSONDeSerializable._from_valid_json, 'foo')
def test_json_loads(self):
tester = self.tester_cls.json_loads('5', validate=True)
self.assertEqual(tester.jobj, 5)
def test_json_loads_no_validation(self):
self.assertEqual(
'foo', self.tester_cls.json_loads('"foo"', validate=False).jobj)
def test_to_json_raises_error(self):
from letsencrypt.acme.util import JSONDeSerializable
self.assertRaises(NotImplementedError, JSONDeSerializable().to_json)
def test_json_dumps(self):
self.assertEqual(
self.tester_cls('foo').json_dumps(), '{"foo": [3, 2, 1]}')
class DumpIJSONSerializableTest(unittest.TestCase):
"""Tests for letsencrypt.acme.util.dump_ijsonserializable."""
class MockJSONSerialiazable(object):
# pylint: disable=missing-docstring,too-few-public-methods,no-self-use
zope.interface.implements(interfaces.IJSONSerializable)
def to_json(self):
return [3, 2, 1]
@classmethod
def _call(cls, obj):
from letsencrypt.acme.util import dump_ijsonserializable
@ -91,7 +28,7 @@ class DumpIJSONSerializableTest(unittest.TestCase):
self.assertEqual('5', self._call(5))
def test_ijsonserializable(self):
self.assertEqual('[3, 2, 1]', self._call(MockJSONSerialiazable()))
self.assertEqual('[3, 2, 1]', self._call(self.MockJSONSerialiazable()))
def test_raises_type_error(self):
self.assertRaises(TypeError, self._call, object())
@ -163,5 +100,44 @@ class ImmutableMapTest(unittest.TestCase):
self.assertEqual("B(x='foo', y='bar')", repr(self.B(x='foo', y='bar')))
class TypedACMEObjectTest(unittest.TestCase):
def setUp(self):
from letsencrypt.acme.util import TypedACMEObject
# pylint: disable=missing-docstring,abstract-method
# pylint: disable=too-few-public-methods
class MockParentTypedACMEObject(TypedACMEObject):
TYPES = {}
@MockParentTypedACMEObject.register
class MockTypedACMEObject(MockParentTypedACMEObject):
acme_type = 'test'
@classmethod
def from_valid_json(cls, unused_obj):
return '!'
def _fields_to_json(self):
return {'foo': 'bar'}
self.parent_cls = MockParentTypedACMEObject
self.msg = MockTypedACMEObject()
def test_to_json(self):
self.assertEqual(self.msg.to_json(), {
'type': 'test',
'foo': 'bar',
})
def test_from_json_unknown_type_fails(self):
self.assertRaises(errors.UnrecognizedTypeError,
self.parent_cls.from_valid_json, {'type': 'bar'})
def test_from_json_returns_obj(self):
self.assertEqual(self.parent_cls.from_valid_json({'type': 'test'}), '!')
if __name__ == '__main__':
unittest.main()