diff --git a/letsencrypt/acme/errors.py b/letsencrypt/acme/errors.py index a70271894..c88881412 100644 --- a/letsencrypt/acme/errors.py +++ b/letsencrypt/acme/errors.py @@ -4,10 +4,10 @@ class Error(Exception): """Generic ACME error.""" class ValidationError(Error): - """ACME message validation error.""" + """ACME object validation error.""" -class UnrecognizedMessageTypeError(ValidationError): - """Unrecognized ACME message type error.""" +class UnrecognizedTypeError(ValidationError): + """Unrecognized ACME object type error.""" class SchemaValidationError(ValidationError): - """JSON schema ACME message validation error.""" + """JSON schema ACME object validation error.""" diff --git a/letsencrypt/acme/interfaces.py b/letsencrypt/acme/interfaces.py index 0d9e56495..c80b6c2fa 100644 --- a/letsencrypt/acme/interfaces.py +++ b/letsencrypt/acme/interfaces.py @@ -2,6 +2,7 @@ import zope.interface # pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class +# pylint: disable=too-few-public-methods class IJSONSerializable(zope.interface.Interface): @@ -20,3 +21,13 @@ class IJSONSerializable(zope.interface.Interface): :rtype: dict """ + +class IJSONDeserializable(zope.interface.Interface): + """JSON deserializable class.""" + + def from_valid_json(jobj): + """Deserialize valid JSON object. + + :param jobj: Validated JSON object. + + """ diff --git a/letsencrypt/acme/jose.py b/letsencrypt/acme/jose.py index 6d2097ba5..b08e1e036 100644 --- a/letsencrypt/acme/jose.py +++ b/letsencrypt/acme/jose.py @@ -13,7 +13,7 @@ def _leading_zeros(arg): return arg -class JWK(util.JSONDeSerializable, util.ImmutableMap): +class JWK(util.ACMEObject): # pylint: disable=too-few-public-methods """JSON Web Key. @@ -21,7 +21,6 @@ class JWK(util.JSONDeSerializable, util.ImmutableMap): """ __slots__ = ('key',) - schema = util.load_schema('jwk') @classmethod def _encode_param(cls, param): @@ -35,7 +34,6 @@ class JWK(util.JSONDeSerializable, util.ImmutableMap): return long(binascii.hexlify(b64decode(param)), 16) def to_json(self): - """Serialize to JSON.""" return { 'kty': 'RSA', # TODO 'n': self._encode_param(self.key.n), @@ -43,7 +41,7 @@ class JWK(util.JSONDeSerializable, util.ImmutableMap): } @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): assert 'RSA' == jobj['kty'] # TODO return cls(key=Crypto.PublicKey.RSA.construct( (cls._decode_param(jobj['n']), cls._decode_param(jobj['e'])))) diff --git a/letsencrypt/acme/jose_test.py b/letsencrypt/acme/jose_test.py index a1a872704..4f2828ec5 100644 --- a/letsencrypt/acme/jose_test.py +++ b/letsencrypt/acme/jose_test.py @@ -45,7 +45,7 @@ class JWKTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.jose import JWK - self.assertEqual(self.jwk256, JWK.from_json(self.jwk256json)) + self.assertEqual(self.jwk256, JWK.from_valid_json(self.jwk256json)) # TODO: fix schemata to allow RSA512 #self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json)) diff --git a/letsencrypt/acme/messages.py b/letsencrypt/acme/messages.py index a345be9f9..3e3efd3b7 100644 --- a/letsencrypt/acme/messages.py +++ b/letsencrypt/acme/messages.py @@ -1,58 +1,21 @@ """ACME protocol messages.""" +import json + +import jsonschema import M2Crypto -import zope.interface from letsencrypt.acme import errors -from letsencrypt.acme import interfaces from letsencrypt.acme import jose from letsencrypt.acme import other from letsencrypt.acme import util -class Message(util.JSONDeSerializable, util.ImmutableMap): - """ACME message. - - Messages are considered immutable. - - """ - zope.interface.implements(interfaces.IJSONSerializable) - - acme_type = NotImplemented - """ACME message "type" field. Subclasses must override.""" - +class Message(util.TypedACMEObject): + # _fields_to_json | pylint: disable=abstract-method + """ACME message.""" TYPES = {} - """Message types registered for JSON deserialization""" - @classmethod - def register(cls, msg_cls): - """Register class for JSON deserialization.""" - cls.TYPES[msg_cls.acme_type] = msg_cls - return msg_cls - - def to_json(self): - """Get JSON serializable object. - - :returns: Serializable JSON object representing ACME message. - :meth:`validate` will almost certainly not work, due to reasons - explained in :class:`letsencrypt.acme.interfaces.IJSONSerializable`. - :rtype: dict - - """ - jobj = self._fields_to_json() - jobj["type"] = self.acme_type - return jobj - - def _fields_to_json(self): - """Prepare ACME message fields for JSON serialiazation. - - Subclasses must override this method. - - :returns: Serializable JSON object containg all ACME message fields - apart from "type". - :rtype: dict - - """ - raise NotImplementedError() + schema = NotImplemented @classmethod def get_msg_cls(cls, jobj): @@ -74,30 +37,47 @@ class Message(util.JSONDeSerializable, util.ImmutableMap): try: msg_cls = cls.TYPES[msg_type] except KeyError: - raise errors.UnrecognizedMessageTypeError(msg_type) + raise errors.UnrecognizedTypeError(msg_type) return msg_cls @classmethod - def from_json(cls, jobj, validate=True): - """Deserialize validated ACME message from JSON string. + def from_json(cls, jobj): + """Deserialize from (possibly invalid) JSON object. - :param str jobj: JSON object. - :param bool validate: Validate against schema before deserializing. - Useful if :class:`JWK` is part of already validated json object. + Note that the input ``jobj`` has not been sanitized in any way. - :raises letsencrypt.acme.errors.ValidationError: if validation - was unsuccessful + :param jobj: JSON object. - :returns: Valid ACME message. - :rtype: subclass of :class:`Message` + :raises letsencrypt.acme.errors.SchemaValidationError: if ``validate`` + was ``True`` and object couldn't be validated. + + :returns: instance of the class """ msg_cls = cls.get_msg_cls(jobj) - if validate: - msg_cls.validate_json(jobj) - # pylint: disable=protected-access - return msg_cls._from_valid_json(jobj) + + try: + jsonschema.validate(jobj, msg_cls.schema) + except jsonschema.ValidationError as error: + raise errors.SchemaValidationError(error) + + return cls.from_valid_json(jobj) + + @classmethod + def json_loads(cls, json_string): + """Load JSON string.""" + return cls.from_json(json.loads(json_string)) + + def json_dumps(self, *args, **kwargs): + """Dump to JSON string using proper serializer. + + :returns: JSON serialized string. + :rtype: str + + """ + return json.dumps( + self, *args, default=util.dump_ijsonserializable, **kwargs) @Message.register # pylint: disable=too-few-public-methods @@ -118,7 +98,7 @@ class Challenge(Message): return fields @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): return cls(session_id=jobj["sessionID"], nonce=jose.b64decode(jobj["nonce"]), challenges=jobj["challenges"], @@ -142,7 +122,7 @@ class ChallengeRequest(Message): } @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): return cls(identifier=jobj["identifier"]) @@ -164,10 +144,10 @@ class Authorization(Message): return fields @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): jwk = jobj.get("jwk") if jwk is not None: - jwk = jose.JWK.from_json(jwk, validate=False) + jwk = jose.JWK.from_valid_json(jwk) return cls(recovery_token=jobj.get("recoveryToken"), identifier=jobj.get("identifier"), jwk=jwk) @@ -236,12 +216,11 @@ class AuthorizationRequest(Message): return fields @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): return cls(session_id=jobj["sessionID"], nonce=jose.b64decode(jobj["nonce"]), responses=jobj["responses"], - signature=other.Signature.from_json( - jobj["signature"], validate=False), + signature=other.Signature.from_valid_json(jobj["signature"]), contact=jobj.get("contact", [])) @@ -278,7 +257,7 @@ class Certificate(Message): return jose.b64encode(cert.as_der()) @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): return cls(certificate=cls._decode_cert(jobj["certificate"]), chain=[cls._decode_cert(cert) for cert in jobj.get("chain", [])], @@ -344,10 +323,9 @@ class CertificateRequest(Message): } @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): return cls(csr=cls._decode_csr(jobj["csr"]), - signature=other.Signature.from_json( - jobj["signature"], validate=False)) + signature=other.Signature.from_valid_json(jobj["signature"])) @Message.register # pylint: disable=too-few-public-methods @@ -366,7 +344,7 @@ class Defer(Message): return fields @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): return cls(token=jobj["token"], interval=jobj.get("interval"), message=jobj.get("message")) @@ -396,7 +374,7 @@ class Error(Message): return fields @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): return cls(error=jobj["error"], message=jobj.get("message"), more_info=jobj.get("moreInfo")) @@ -412,7 +390,7 @@ class Revocation(Message): return {} @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): return cls() @@ -475,10 +453,9 @@ class RevocationRequest(Message): } @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): return cls(certificate=cls._decode_cert(jobj["certificate"]), - signature=other.Signature.from_json( - jobj["signature"], validate=False)) + signature=other.Signature.from_valid_json(jobj["signature"])) @Message.register # pylint: disable=too-few-public-methods @@ -496,5 +473,5 @@ class StatusRequest(Message): return {"token": self.token} @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): return cls(token=jobj["token"]) diff --git a/letsencrypt/acme/messages_test.py b/letsencrypt/acme/messages_test.py index 018854225..9ec4c5ba1 100644 --- a/letsencrypt/acme/messages_test.py +++ b/letsencrypt/acme/messages_test.py @@ -4,7 +4,6 @@ import unittest import Crypto.PublicKey.RSA import M2Crypto.X509 -import mock from letsencrypt.acme import errors from letsencrypt.acme import jose @@ -28,7 +27,13 @@ class MessageTest(unittest.TestCase): def setUp(self): # pylint: disable=missing-docstring,too-few-public-methods from letsencrypt.acme.messages import Message - class TestMessage(Message): + + class MockParentMessage(Message): + # pylint: disable=abstract-method + TYPES = {} + + @MockParentMessage.register + class MockMessage(MockParentMessage): acme_type = 'test' schema = { 'type': 'object', @@ -37,53 +42,45 @@ class MessageTest(unittest.TestCase): 'name': {'type': 'string'}, }, } + __slots__ = ('price', 'name') @classmethod - def _from_valid_json(cls, jobj): - return jobj + def from_valid_json(cls, jobj): + return cls(price=jobj.get('price'), name=jobj.get('name')) def _fields_to_json(self): - return {'foo': 'bar'} + # pylint: disable=no-member + return {'price': self.price, 'name': self.name} - self.msg_cls = TestMessage - - def test_to_json(self): - self.assertEqual(self.msg_cls().to_json(), { - 'type': 'test', - 'foo': 'bar', - }) - - def test_fields_to_json_not_implemented(self): - from letsencrypt.acme.messages import Message - # pylint: disable=protected-access - self.assertRaises(NotImplementedError, Message()._fields_to_json) - - @classmethod - def _from_json(cls, jobj, validate=True): - from letsencrypt.acme.messages import Message - return Message.from_json(jobj, validate) + self.parent_cls = MockParentMessage + self.msg = MockMessage(price=123, name='foo') def test_from_json_non_dict_fails(self): - self.assertRaises(errors.ValidationError, self._from_json, []) + self.assertRaises(errors.ValidationError, self.parent_cls.from_json, []) def test_from_json_dict_no_type_fails(self): - self.assertRaises(errors.ValidationError, self._from_json, {}) + self.assertRaises(errors.ValidationError, self.parent_cls.from_json, {}) - def test_from_json_unknown_type_fails(self): - self.assertRaises(errors.UnrecognizedMessageTypeError, - self._from_json, {'type': 'bar'}) + def test_from_json_unrecognized_type(self): + self.assertRaises(errors.UnrecognizedTypeError, + self.parent_cls.from_json, {'type': 'foo'}) - @mock.patch('letsencrypt.acme.messages.Message.TYPES') - def test_from_json_validate_errors(self, types): - types.__getitem__.side_effect = lambda x: {'foo': self.msg_cls}[x] + def test_from_json_validates(self): self.assertRaises(errors.SchemaValidationError, - self._from_json, {'type': 'foo', 'price': 'asd'}) + self.parent_cls.from_json, + {'type': 'test', 'price': 'asd'}) - @mock.patch('letsencrypt.acme.messages.Message.TYPES') - def test_from_json_valid_returns_cls(self, types): - types.__getitem__.side_effect = lambda x: {'foo': self.msg_cls}[x] - self.assertEqual(self._from_json({'type': 'foo'}, validate=False), - {'type': 'foo'}) + def test_from_json(self): + self.assertEqual(self.msg, self.parent_cls.from_json( + {'type': 'test', 'name': 'foo', 'price': 123})) + + def test_json_loads(self): + self.assertEqual(self.msg, self.parent_cls.json_loads( + '{"type": "test", "name": "foo", "price": 123}')) + + def test_json_dumps(self): + self.assertEqual(self.msg.json_dumps(sort_keys=True), + '{"name": "foo", "price": 123, "type": "test"}') class ChallengeTest(unittest.TestCase): @@ -408,10 +405,7 @@ class RevocationTest(unittest.TestCase): def setUp(self): from letsencrypt.acme.messages import Revocation self.msg = Revocation() - - self.jmsg = { - 'type': 'revocation', - } + self.jmsg = {'type': 'revocation'} def test_to_json(self): self.assertEqual(self.msg.to_json(), self.jmsg) diff --git a/letsencrypt/acme/other.py b/letsencrypt/acme/other.py index 1fe0d9463..a874a8cab 100644 --- a/letsencrypt/acme/other.py +++ b/letsencrypt/acme/other.py @@ -1,4 +1,4 @@ -"""JSON objects in ACME protocol other than messages.""" +"""Other ACME objects.""" import logging from Crypto import Random @@ -9,7 +9,7 @@ from letsencrypt.acme import jose from letsencrypt.acme import util -class Signature(util.JSONDeSerializable, util.ImmutableMap): +class Signature(util.ACMEObject): """ACME signature. :ivar str alg: Signature algorithm. @@ -23,7 +23,6 @@ class Signature(util.JSONDeSerializable, util.ImmutableMap): """ __slots__ = ('alg', 'sig', 'nonce', 'jwk') - schema = util.load_schema('signature') NONCE_LEN = 16 """Size of nonce in bytes, as specified in the ACME protocol.""" @@ -68,7 +67,6 @@ class Signature(util.JSONDeSerializable, util.ImmutableMap): hashed, self.sig) def to_json(self): - """Prepare JSON serializable object.""" return { 'alg': self.alg, 'sig': jose.b64encode(self.sig), @@ -77,7 +75,7 @@ class Signature(util.JSONDeSerializable, util.ImmutableMap): } @classmethod - def _from_valid_json(cls, jobj): + def from_valid_json(cls, jobj): return cls(alg=jobj['alg'], sig=jose.b64decode(jobj['sig']), nonce=jose.b64decode(jobj['nonce']), - jwk=jose.JWK.from_json(jobj['jwk'], validate=False)) + jwk=jose.JWK.from_valid_json(jobj['jwk'])) diff --git a/letsencrypt/acme/other_test.py b/letsencrypt/acme/other_test.py index 292fbd886..5279cd66b 100644 --- a/letsencrypt/acme/other_test.py +++ b/letsencrypt/acme/other_test.py @@ -78,9 +78,8 @@ class SigatureTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.other import Signature - # pylint: disable=protected-access self.assertEqual( - self.signature, Signature._from_valid_json(self.jsig_from)) + self.signature, Signature.from_valid_json(self.jsig_from)) if __name__ == '__main__': diff --git a/letsencrypt/acme/util.py b/letsencrypt/acme/util.py index 8906e584a..d7ea7fac6 100644 --- a/letsencrypt/acme/util.py +++ b/letsencrypt/acme/util.py @@ -2,7 +2,6 @@ import json import pkg_resources -import jsonschema import zope.interface from letsencrypt.acme import errors @@ -34,73 +33,6 @@ def load_schema(name): __name__, "schemata/%s.json" % name))) -class JSONDeSerializable(object): - """JSON (de)serializable object.""" - zope.interface.implements(interfaces.IJSONSerializable) - - schema = NotImplemented - - @classmethod - def validate_json(cls, jobj): - """Validate JSON object against schema. - - :raises letsencrypt.acme.errors.SchemaValidationError: if object - couldn't be validated. - - """ - try: - jsonschema.validate(jobj, cls.schema) - except jsonschema.ValidationError as error: - raise errors.SchemaValidationError(error) - - @classmethod - def from_json(cls, jobj, validate=True): - """Deserialize from JSON. - - Note that the input ``jobj`` has not been sanitized in any way. - - :param jobj: JSON object. - :param bool validate: Validate against schema before deserializing. - Useful if :class:`JWK` is part of already validated json object. - - :raises letsencrypt.acme.errors.SchemaValidationError: if ``validate`` - was ``True`` and object couldn't be validated. - - :returns: instance of the class - - """ - if validate: - cls.validate_json(jobj) - return cls._from_valid_json(jobj) - - @classmethod - def _from_valid_json(cls, jobj): - """Deserializa from valid JSON object. - - :param jobj: JSON object that has been validated against schema. - - """ - raise NotImplementedError() - - @classmethod - def json_loads(cls, json_string, validate=True): - """Load JSON string.""" - return cls.from_json(json.loads(json_string), validate) - - def to_json(self): - """Prepare JSON serializable object.""" - raise NotImplementedError() - - def json_dumps(self): - """Dump to JSON string using proper serializer. - - :returns: JSON serialized string. - :rtype: str - - """ - return json.dumps(self, default=dump_ijsonserializable) - - def dump_ijsonserializable(python_object): """Serialize IJSONSerializable to JSON. @@ -145,3 +77,73 @@ class ImmutableMap(object): # pylint: disable=too-few-public-methods return '{0}({1})'.format(self.__class__.__name__, ', '.join( '{0}={1!r}'.format(slot, getattr(self, slot)) for slot in self.__slots__)) + + +class ACMEObject(ImmutableMap): # pylint: disable=too-few-public-methods + """ACME object.""" + zope.interface.implements(interfaces.IJSONSerializable) + zope.interface.classImplements(interfaces.IJSONDeserializable) + + def to_json(self): # pragma: no cover + """Serialize to JSON.""" + raise NotImplementedError() + + @classmethod + def from_valid_json(cls, jobj): # pragma: no cover + """Deserialize from valid JSON object.""" + raise NotImplementedError() + + +class TypedACMEObject(ACMEObject): + """ACME object with type (immutable).""" + + acme_type = NotImplemented + """ACME "type" field. Subclasses must override.""" + + TYPES = NotImplemented + """Types registered for JSON deserialization""" + + @classmethod + def register(cls, msg_cls): + """Register class for JSON deserialization.""" + cls.TYPES[msg_cls.acme_type] = msg_cls + return msg_cls + + def to_json(self): + """Get JSON serializable object. + + :returns: Serializable JSON object representing ACME typed object. + :meth:`validate` will almost certianly not work, due to reasons + explained in :class:`letsencrypt.acme.interfaces.IJSONSerializable`. + :rtype: dict + + """ + jobj = self._fields_to_json() + jobj["type"] = self.acme_type + return jobj + + def _fields_to_json(self): # pragma: no cover + """Prepare ACME object fields for JSON serialiazation. + + Subclasses must override this method. + + :returns: Serializable JSON object containg all ACME object fields + apart from "type". + :rtype: dict + + """ + raise NotImplementedError() + + @classmethod + def from_valid_json(cls, jobj): + """Deserialize ACME object from valid JSON object. + + :raises letsencrypt.acme.errors.UnrecognizedTypeError: if type + of the ACME object has not been registered. + + """ + try: + msg_cls = cls.TYPES[jobj["type"]] + except KeyError: + raise errors.UnrecognizedTypeError(jobj["type"]) + return msg_cls.from_valid_json(jobj) diff --git a/letsencrypt/acme/util_test.py b/letsencrypt/acme/util_test.py index cf71963e8..65c52f7ff 100644 --- a/letsencrypt/acme/util_test.py +++ b/letsencrypt/acme/util_test.py @@ -9,79 +9,16 @@ from letsencrypt.acme import errors from letsencrypt.acme import interfaces -class MockJSONSerialiazable(object): - # pylint: disable=missing-docstring,too-few-public-methods,no-self-use - zope.interface.implements(interfaces.IJSONSerializable) - - def to_json(self): - return [3, 2, 1] - - -class JSONDeSerializableTest(unittest.TestCase): - """Tests for letsencrypt.acme.util.JSONDeSerializable.""" - - def setUp(self): - from letsencrypt.acme.util import JSONDeSerializable - - class Tester(JSONDeSerializable): - # pylint: disable=missing-docstring,no-self-use, - # pylint: disable=too-few-public-methods - zope.interface.implements(interfaces.IJSONSerializable) - - schema = {'type': 'integer'} - - def __init__(self, jobj): - self.jobj = jobj - - @classmethod - def _from_valid_json(cls, jobj): - return cls(jobj) - - def to_json(self): - return {'foo': MockJSONSerialiazable()} - - self.tester_cls = Tester - - def test_validate_invalid_json(self): - self.assertRaises(errors.SchemaValidationError, - self.tester_cls.validate_json, 'bang!') - - def test_validate_valid_json(self): - self.tester_cls.validate_json(5) - - def test_from_json(self): - self.assertEqual(5, self.tester_cls.from_json(5, validate=True).jobj) - - def test_from_json_no_validation(self): - self.assertEqual(['1', 2], self.tester_cls.from_json( - ['1', 2], validate=False).jobj) - - def test_from_valid_json_raises_error(self): - from letsencrypt.acme.util import JSONDeSerializable - # pylint: disable=protected-access - self.assertRaises( - NotImplementedError, JSONDeSerializable._from_valid_json, 'foo') - - def test_json_loads(self): - tester = self.tester_cls.json_loads('5', validate=True) - self.assertEqual(tester.jobj, 5) - - def test_json_loads_no_validation(self): - self.assertEqual( - 'foo', self.tester_cls.json_loads('"foo"', validate=False).jobj) - - def test_to_json_raises_error(self): - from letsencrypt.acme.util import JSONDeSerializable - self.assertRaises(NotImplementedError, JSONDeSerializable().to_json) - - def test_json_dumps(self): - self.assertEqual( - self.tester_cls('foo').json_dumps(), '{"foo": [3, 2, 1]}') - - class DumpIJSONSerializableTest(unittest.TestCase): """Tests for letsencrypt.acme.util.dump_ijsonserializable.""" + class MockJSONSerialiazable(object): + # pylint: disable=missing-docstring,too-few-public-methods,no-self-use + zope.interface.implements(interfaces.IJSONSerializable) + + def to_json(self): + return [3, 2, 1] + @classmethod def _call(cls, obj): from letsencrypt.acme.util import dump_ijsonserializable @@ -91,7 +28,7 @@ class DumpIJSONSerializableTest(unittest.TestCase): self.assertEqual('5', self._call(5)) def test_ijsonserializable(self): - self.assertEqual('[3, 2, 1]', self._call(MockJSONSerialiazable())) + self.assertEqual('[3, 2, 1]', self._call(self.MockJSONSerialiazable())) def test_raises_type_error(self): self.assertRaises(TypeError, self._call, object()) @@ -163,5 +100,44 @@ class ImmutableMapTest(unittest.TestCase): self.assertEqual("B(x='foo', y='bar')", repr(self.B(x='foo', y='bar'))) +class TypedACMEObjectTest(unittest.TestCase): + + def setUp(self): + from letsencrypt.acme.util import TypedACMEObject + + # pylint: disable=missing-docstring,abstract-method + # pylint: disable=too-few-public-methods + + class MockParentTypedACMEObject(TypedACMEObject): + TYPES = {} + + @MockParentTypedACMEObject.register + class MockTypedACMEObject(MockParentTypedACMEObject): + acme_type = 'test' + + @classmethod + def from_valid_json(cls, unused_obj): + return '!' + + def _fields_to_json(self): + return {'foo': 'bar'} + + self.parent_cls = MockParentTypedACMEObject + self.msg = MockTypedACMEObject() + + def test_to_json(self): + self.assertEqual(self.msg.to_json(), { + 'type': 'test', + 'foo': 'bar', + }) + + def test_from_json_unknown_type_fails(self): + self.assertRaises(errors.UnrecognizedTypeError, + self.parent_cls.from_valid_json, {'type': 'bar'}) + + def test_from_json_returns_obj(self): + self.assertEqual(self.parent_cls.from_valid_json({'type': 'test'}), '!') + + if __name__ == '__main__': unittest.main()