Working mixin

This commit is contained in:
Adrien Ferrand 2019-12-28 15:01:29 +01:00
parent ba74e32beb
commit a7770845f8
6 changed files with 59 additions and 48 deletions

View file

@ -10,6 +10,7 @@ import requests
import six
from acme import fields
from acme.mixins import ResourceMixin
logger = logging.getLogger(__name__)
@ -28,13 +29,19 @@ class Challenge(jose.TypedJSONObjectWithFields):
return UnrecognizedChallenge.from_json(jobj)
class ChallengeResponse(jose.TypedJSONObjectWithFields):
class ChallengeResponse(ResourceMixin, jose.TypedJSONObjectWithFields):
# _fields_to_partial_json
"""ACME challenge response."""
TYPES = {} # type: dict
resource_type = 'challenge'
resource = fields.Resource(resource_type)
def to_partial_json(self):
jobj = super(ChallengeResponse, self).to_partial_json()
if self.le_auto_version == 2:
jobj.pop('type', None)
return jobj
class UnrecognizedChallenge(Challenge):
"""Unrecognized challenge.

View file

@ -25,6 +25,7 @@ from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-
from acme.magic_typing import List # pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Set # pylint: disable=unused-import, no-name-in-module
from acme.magic_typing import Text # pylint: disable=unused-import, no-name-in-module
from acme.mixins import VersionedLEACMEMixin
logger = logging.getLogger(__name__)
@ -1003,7 +1004,9 @@ class ClientNetwork(object):
:rtype: `josepy.JWS`
"""
jobj = jws.compliant_rfc8555_payload(obj, acme_version)
if isinstance(obj, VersionedLEACMEMixin):
obj.le_auto_version = acme_version
jobj = obj.json_dumps(indent=2).encode() if obj else b''
logger.debug('JWS payload:\n%s', jobj)
kwargs = {
"alg": self.alg,

View file

@ -56,31 +56,3 @@ class JWS(jose.JWS):
protect=frozenset(['nonce', 'url', 'kid', 'jwk', 'alg']),
nonce=nonce, url=url, kid=kid,
include_jwk=include_jwk)
def compliant_rfc8555_payload(obj, acme_version):
"""
This method extracts and fixes the JWS payload in respect with RFC 8555.
:param jose.JSONDeSerializable obj:
:param int acme_version: Version of ACME protocol to use
:rtype: str
"""
# POST-as-GET requests should contain an empty JWS payload.
# See: https://tools.ietf.org/html/rfc8555#section-6.3
if not obj:
return b''
jobj = obj.json_dumps(indent=2).encode()
if acme_version == 2:
# Challenge POST JWS bodies should be exactly `{}`.
# See: https://tools.ietf.org/html/rfc8555#section-7.5.1
if isinstance(obj, ChallengeResponse):
return b'{}'
# Field `resource` is not part of RFC 8555.
jobj = json.loads(jobj.decode('utf-8'))
jobj.pop('resource', None)
jobj = json.dumps(jobj).encode('utf-8')
return jobj

View file

@ -9,6 +9,7 @@ from acme import errors
from acme import fields
from acme import jws
from acme import util
from acme.mixins import ResourceMixin
try:
from collections.abc import Hashable # pylint: disable=no-name-in-module
@ -356,13 +357,13 @@ class Registration(ResourceBody):
@Directory.register
class NewRegistration(Registration):
class NewRegistration(ResourceMixin, Registration):
"""New registration."""
resource_type = 'new-reg'
resource = fields.Resource(resource_type)
class UpdateRegistration(Registration):
class UpdateRegistration(ResourceMixin, Registration):
"""Update registration."""
resource_type = 'reg'
resource = fields.Resource(resource_type)
@ -499,13 +500,13 @@ class Authorization(ResourceBody):
@Directory.register
class NewAuthorization(Authorization):
class NewAuthorization(ResourceMixin, Authorization):
"""New authorization."""
resource_type = 'new-authz'
resource = fields.Resource(resource_type)
class UpdateAuthorization(Authorization):
class UpdateAuthorization(ResourceMixin, Authorization):
"""Update authorization."""
resource_type = 'authz'
resource = fields.Resource(resource_type)
@ -523,7 +524,7 @@ class AuthorizationResource(ResourceWithURI):
@Directory.register
class CertificateRequest(jose.JSONObjectWithFields):
class CertificateRequest(ResourceMixin, jose.JSONObjectWithFields):
"""ACME new-cert request.
:ivar josepy.util.ComparableX509 csr:
@ -549,7 +550,7 @@ class CertificateResource(ResourceWithURI):
@Directory.register
class Revocation(jose.JSONObjectWithFields):
class Revocation(ResourceMixin, jose.JSONObjectWithFields):
"""Revocation message.
:ivar .ComparableX509 certificate: `OpenSSL.crypto.X509` wrapped in

29
acme/acme/mixins.py Normal file
View file

@ -0,0 +1,29 @@
class VersionedLEACMEMixin(object):
@property
def le_auto_version(self):
return getattr(self, '_le_auto_version', 1)
@le_auto_version.setter
def le_auto_version(self, version):
# We need to use object.__setattr__ to not depend on the specific implementation of
# __setattr__ in current class (eg. jose.TypedJSONObjectWithFields raises AttributeError
# for any attempt to set an attribute to make objects immutable).
object.__setattr__(self, '_le_auto_version', version)
def __setattr__(self, key, value):
if key == 'le_auto_version':
# Needed to allow @property to operate properly. See comment above.
object.__setattr__(self, key, value)
else:
super(VersionedLEACMEMixin, self).__setattr__(key, value)
class ResourceMixin(VersionedLEACMEMixin):
def fields_to_partial_json(self):
if hasattr(super(ResourceMixin, self), 'fields_to_partial_json'):
jobj = super(ResourceMixin, self).fields_to_partial_json()
if self.le_auto_version == 2:
jobj.pop('resource', None)
return jobj
raise AttributeError('This class does not implement method fields_to_partial_json().')

View file

@ -3,6 +3,8 @@ import unittest
import josepy as jose
from acme.mixins import ResourceMixin
import test_util
KEY = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem'))
@ -62,34 +64,31 @@ class JWSTest(unittest.TestCase):
self.assertEqual(jws.signature.combined.jwk, self.pubkey)
class JWSPayloadCompliant(unittest.TestCase):
"""Test for compliant_rfc8555_payload"""
def test_post_as_get_payload(self):
from acme.jws import compliant_rfc8555_payload
jobj = compliant_rfc8555_payload(None, 2)
self.assertEqual(jobj, b'')
class JWSPayloadRFC8555Compliant(unittest.TestCase):
"""Test for RFC8555 compliance of JWS generated from resources/challenges"""
def test_challenge_payload(self):
from acme.jws import compliant_rfc8555_payload
from acme.challenges import HTTP01Response
challenge_body = HTTP01Response()
challenge_body.le_auto_version = 2
jobj = compliant_rfc8555_payload(challenge_body, 2)
jobj = challenge_body.json_dumps(indent=2).encode()
# RFC8555 states that challenge requests must have an empty payload.
self.assertEqual(jobj, b'{}')
def test_resource_payload(self):
from acme.jws import compliant_rfc8555_payload
from acme.messages import ResourceBody
from acme import fields
class _MockResourceResponse(ResourceBody):
class _MockResourceResponse(ResourceMixin, ResourceBody):
resource_type = 'one-resource'
resource = fields.Resource(resource_type)
resource_body = _MockResourceResponse()
resource_body.le_auto_version = 2
jobj = compliant_rfc8555_payload(resource_body, 2)
jobj = resource_body.json_dumps(indent=2).encode()
# Having a resource field in JWS payloads for resources is not compliant with RFC8555.
self.assertTrue(b'resource' not in jobj)