Tests, lint, and docs for messages2

This commit is contained in:
Jakub Warmuz 2015-03-27 09:47:03 +00:00
parent c985a8987b
commit 3762622ee9
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
2 changed files with 225 additions and 34 deletions

View file

@ -13,31 +13,37 @@ class Error(jose.JSONObjectWithFields, Exception):
ERROR_TYPE_NAMESPACE = 'urn:acme:error:'
ERROR_TYPE_DESCRIPTIONS = {
"malformed": "The request message was malformed",
"unauthorized": "The client lacks sufficient authorization",
"serverInternal": "The server experienced an internal error",
"badCSR": "The CSR is unacceptable (e.g., due to a short key)",
'malformed': 'The request message was malformed',
'unauthorized': 'The client lacks sufficient authorization',
'serverInternal': 'The server experienced an internal error',
'badCSR': 'The CSR is unacceptable (e.g., due to a short key)',
}
typ = jose.Field('type', omitempty=True) # Boulder omits, spec requires
# TODO: Boulder omits 'type' and 'instance', spec requires
typ = jose.Field('type', omitempty=True)
title = jose.Field('title', omitempty=True)
detail = jose.Field('detail')
# Boulder omits, spec requires
instance = jose.Field('instance', omitempty=True)
@typ.encoder
def typ(value):
return ERROR_TYPE_NAMESPACE + value
def typ(value): # pylint: disable=missing-docstring,no-self-argument
return Error.ERROR_TYPE_NAMESPACE + value
@typ.decoder
def typ(value):
if not value.startswith(ERROR_TYPE_NAMESPACE):
raise jose.DeserializationError('Unrecognized error type')
def typ(value): # pylint: disable=missing-docstring,no-self-argument
# pylint thinks isinstance(value, Error), so startswith is not found
# pylint: disable=no-member
if not value.startswith(Error.ERROR_TYPE_NAMESPACE):
raise jose.DeserializationError('Missing error type prefix')
return value[len(ERROR_TYPE_NAMESPACE):]
without_prefix = value[len(Error.ERROR_TYPE_NAMESPACE):]
if without_prefix not in Error.ERROR_TYPE_DESCRIPTIONS:
raise jose.DeserializationError('Error type not recognized')
return without_prefix
@property
def description(self):
def description(self): # pylint: disable=missing-docstring,no-self-argument
return self.ERROR_TYPE_DESCRIPTIONS[self.typ]
@ -61,7 +67,7 @@ class _Constant(jose.JSONDeSerializable):
return cls.POSSIBLE_NAMES[value]
def __repr__(self):
return '{0}({0})'.format(self.__class__.__name__, self.name)
return '{0}({1})'.format(self.__class__.__name__, self.name)
def __eq__(self, other):
return isinstance(other, type(self)) and other.name == self.name
@ -131,26 +137,32 @@ class Registration(ResourceBody):
class ChallengeResource(Resource, jose.JSONObjectWithFields):
"""Challenge resource.
:ivar body: `.challenges.Challenge`
:ivar body: `.challenges.ChallengeBody`
:ivar authz_uri: URI found in the 'up' Link header.
"""
__slots__ = ('body', 'authz_uri')
@property
def uri(self):
return body.uri
def uri(self): # pylint: disable=missing-docstring,no-self-argument
# bug? 'method already defined line None'
# pylint: disable=function-redefined
return self.body.uri
class Challenge(ResourceBody):
class ChallengeBody(ResourceBody):
"""Challenge resource body.
Confusingly, this has a similar name to `.challenges.Challenge`, as
well as `.achallanges.AnnotatedChallenge` or
`.achallanges.IndexedChallenge`. Use names such as ``challb`` to
distinguish instances of this class from ``achall`` or ``ichall``.
.. todo::
Confusingly, this has the same name as
`challenges.Challenge`. Indeed, this class could be integrated
with challenges.Challenge, but this way it would be confusing
when compared to acme-spec, where all challenges are presented
without 'uri', 'status', or 'validated' fields.
This class could be integrated with challenges.Challenge, but
this way it would be confusing when compared to acme-spec, where
all challenges are presented without 'uri', 'status', or
'validated' fields.
"""
@ -160,15 +172,15 @@ class Challenge(ResourceBody):
validated = fields.RFC3339Field('validated', omitempty=True)
def to_json(self):
jobj = super(Challenge, self).to_json()
jobj = super(ChallengeBody, self).to_json()
jobj.update(self.chall.to_json())
return jobj
@classmethod
def fields_from_json(cls, jobj):
fields = super(Challenge, cls).fields_from_json(jobj)
fields['chall'] = challenges.Challenge.from_json(jobj)
return fields
jobj_fields = super(ChallengeBody, cls).fields_from_json(jobj)
jobj_fields['chall'] = challenges.Challenge.from_json(jobj)
return jobj_fields
class AuthorizationResource(Resource):
@ -206,7 +218,8 @@ class Authorization(ResourceBody):
@challenges.decoder
def challenges(value): # pylint: disable=missing-docstring,no-self-argument
return tuple(
ChallengeResource(body=Challenge.from_json(chall), authz_uri=None)
ChallengeResource(
body=ChallengeBody.from_json(chall), authz_uri=None)
for chall in value)
@property
@ -238,23 +251,29 @@ class CertificateResource(Resource):
class Revocation(jose.JSONObjectWithFields):
"""Revocation message."""
"""Revocation message.
:ivar revoke: Either a `datetime.datetime` or `NOW`.
"""
NOW = 'now'
"""A possible value for `revoke`, denoting that certificate should
be revoked now."""
revoke = jose.Field('revoke')
authorizations = CertificateRequest._fields['authorizations']
@revoke.decoder
def revoke(value):
if jobj == NOW:
return jobj
def revoke(value): # pylint: disable=missing-docstring,no-self-argument
if value == Revocation.NOW:
return value
else:
return fields.RFC3339Field.default_decoder(value)
@revoke.encoder
def revoke(value):
if jobj == NOW:
def revoke(value): # pylint: disable=missing-docstring,no-self-argument
if value == Revocation.NOW:
return value
else:
return fields.RFC3339Field.default_encoder(value)

View file

@ -0,0 +1,172 @@
"""Tests for letsencrypt.acme.messages2."""
import datetime
import unittest
import mock
import pytz
from letsencrypt.acme import challenges
from letsencrypt.acme import jose
class ErrorTest(unittest.TestCase):
"""Tests for letsencrypt.acme.messages2.Error."""
def setUp(self):
from letsencrypt.acme.messages2 import Error
self.error = Error(detail='foo', typ='malformed')
def test_typ_prefix(self):
self.assertEqual('malformed', self.error.typ)
self.assertEqual(
'urn:acme:error:malformed', self.error.to_json()['type'])
self.assertEqual(
'malformed', self.error.from_json(self.error.to_json()).typ)
def test_typ_decoder_missing_prefix(self):
from letsencrypt.acme.messages2 import Error
self.assertRaises(jose.DeserializationError, Error.from_json,
{'detail': 'foo', 'type': 'malformed'})
self.assertRaises(jose.DeserializationError, Error.from_json,
{'detail': 'foo', 'type': 'not valid bare type'})
def test_typ_decoder_not_recognized(self):
from letsencrypt.acme.messages2 import Error
self.assertRaises(jose.DeserializationError, Error.from_json,
{'detail': 'foo', 'type': 'urn:acme:error:baz'})
def test_description(self):
self.assertEqual(
'The request message was malformed', self.error.description)
class ConstantTest(unittest.TestCase):
"""Tests for letsencrypt.acme.messages2._Constant."""
def setUp(self):
from letsencrypt.acme.messages2 import _Constant
class MockConstant(_Constant): # pylint: disable=missing-docstring
POSSIBLE_NAMES = {}
self.MockConstant = MockConstant # pylint: disable=invalid-name
self.const_a = MockConstant('a')
self.const_b = MockConstant('b')
def test_to_json(self):
self.assertEqual('a', self.const_a.to_json())
self.assertEqual('b', self.const_b.to_json())
def test_from_json(self):
self.assertEqual(self.const_a, self.MockConstant.from_json('a'))
self.assertRaises(
jose.DeserializationError, self.MockConstant.from_json, 'c')
def test_repr(self):
self.assertEqual('MockConstant(a)', repr(self.const_a))
self.assertEqual('MockConstant(b)', repr(self.const_b))
class ChallengeResourceTest(unittest.TestCase):
"""Tests for letsencrypt.acme.messages2.ChallengeResource."""
def test_uri(self):
from letsencrypt.acme.messages2 import ChallengeResource
self.assertEqual('http://challb', ChallengeResource(body=mock.MagicMock(
uri='http://challb'), authz_uri='http://authz').uri)
class ChallengeBodyTest(unittest.TestCase):
"""Tests for letsencrypt.acme.messages2.ChallengeBody."""
def setUp(self):
self.chall = challenges.DNS(token='foo')
from letsencrypt.acme.messages2 import ChallengeBody
from letsencrypt.acme.messages2 import STATUS_VALID
self.status = STATUS_VALID
self.challb = ChallengeBody(
uri='http://challb', chall=self.chall, status=self.status)
self.jobj_to = {
'uri': 'http://challb',
'status': self.status,
'type': 'dns',
'token': 'foo',
}
self.jobj_from = self.jobj_to.copy()
self.jobj_from['status'] = 'valid'
def test_to_json(self):
self.assertEqual(self.jobj_to, self.challb.to_json())
def test_fields_from_json(self):
from letsencrypt.acme.messages2 import ChallengeBody
self.assertEqual(self.challb, ChallengeBody.from_json(self.jobj_from))
class AuthorizationTest(unittest.TestCase):
"""Tests for letsencrypt.acme.messages2.Authorization."""
def setUp(self):
from letsencrypt.acme.messages2 import ChallengeBody
from letsencrypt.acme.messages2 import STATUS_VALID
self.challbs = (
ChallengeBody(
uri='http://challb1', status=STATUS_VALID,
chall=challenges.SimpleHTTPS(token='IlirfxKKXAsHtmzK29Pj8A')),
ChallengeBody(uri='http://challb2', status=STATUS_VALID,
chall=challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA')),
ChallengeBody(uri='http://challb3', status=STATUS_VALID,
chall=challenges.RecoveryToken()),
)
combinations = ((0, 2), (1, 2))
from letsencrypt.acme.messages2 import Authorization
from letsencrypt.acme.messages2 import Identifier
from letsencrypt.acme.messages2 import IDENTIFIER_FQDN
identifier = Identifier(typ=IDENTIFIER_FQDN, value='example.com')
self.authz = Authorization(
identifier=identifier, combinations=combinations,
challenges=self.challbs)
self.jobj_from = {
'identifier': identifier.fully_serialize(),
'challenges': [challb.fully_serialize() for challb in self.challbs],
'combinations': combinations,
}
def test_from_json(self):
from letsencrypt.acme.messages2 import Authorization
Authorization.from_json(self.jobj_from)
def test_resolved_combinations(self):
self.assertEqual(self.authz.resolved_combinations, (
(self.challbs[0], self.challbs[2]),
(self.challbs[1], self.challbs[2]),
))
class RevocationTest(unittest.TestCase):
"""Tests for letsencrypt.acme.messages2.RevocationTest."""
def setUp(self):
from letsencrypt.acme.messages2 import Revocation
self.rev_now = Revocation(authorizations=(), revoke=Revocation.NOW)
self.rev_date = Revocation(authorizations=(), revoke=datetime.datetime(
2015, 3, 27, tzinfo=pytz.utc))
self.jobj_now = {'authorizations': (), 'revoke': Revocation.NOW}
self.jobj_date = {'authorizations': (),
'revoke': '2015-03-27T00:00:00Z'}
def test_revoke_decoder(self):
from letsencrypt.acme.messages2 import Revocation
self.assertEqual(self.rev_now, Revocation.from_json(self.jobj_now))
self.assertEqual(self.rev_date, Revocation.from_json(self.jobj_date))
def test_revoke_encoder(self):
self.assertEqual(self.jobj_now, self.rev_now.to_json())
self.assertEqual(self.jobj_date, self.rev_date.to_json())
if __name__ == '__main__':
unittest.main()