Non-schema errors for acme.messages

This commit is contained in:
Jakub Warmuz 2015-02-13 12:37:00 +00:00
parent 3d883fd77f
commit 76085f0bb0
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
10 changed files with 294 additions and 150 deletions

View file

@ -28,6 +28,12 @@ class IJSONDeserializable(zope.interface.Interface):
def from_valid_json(jobj):
"""Deserialize valid JSON object.
:param jobj: Validated JSON object.
:param jobj: JSON object validated against JSON schema (found in
schemata/ directory).
:raises letsencrypt.acme.errors.ValidationError: It might be the
case that ``jobj`` validates against schema, but still is not
valid (e.g. unparseable X509 certificate, or wrong padding in
JOSE base64 encoded string).
"""

View file

@ -1,51 +1,5 @@
"""JOSE."""
import base64
import binascii
import Crypto.PublicKey.RSA
from letsencrypt.acme import util
def _leading_zeros(arg):
if len(arg) % 2:
return '0' + arg
return arg
class JWK(util.ACMEObject):
# pylint: disable=too-few-public-methods
"""JSON Web Key.
.. todo:: Currently works for RSA public keys only.
"""
__slots__ = ('key',)
@classmethod
def _encode_param(cls, param):
"""Encode numeric key parameter."""
return b64encode(binascii.unhexlify(
_leading_zeros(hex(param)[2:].rstrip('L'))))
@classmethod
def _decode_param(cls, param):
"""Decode numeric key parameter."""
return long(binascii.hexlify(b64decode(param)), 16)
def to_json(self):
return {
'kty': 'RSA', # TODO
'n': self._encode_param(self.key.n),
'e': self._encode_param(self.key.e),
}
@classmethod
def from_valid_json(cls, jobj):
assert 'RSA' == jobj['kty'] # TODO
return cls(key=Crypto.PublicKey.RSA.construct(
(cls._decode_param(jobj['n']), cls._decode_param(jobj['e']))))
# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C
#

View file

@ -1,54 +1,6 @@
"""Tests for letsencrypt.acme.jose."""
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
RSA256_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa256_key.pem'))
RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa512_key.pem'))
class JWKTest(unittest.TestCase):
"""Tests fro letsencrypt.acme.jose.JWK."""
def setUp(self):
from letsencrypt.acme.jose import JWK
self.jwk256 = JWK(key=RSA256_KEY.publickey())
self.jwk256json = {
'kty': 'RSA',
'e': 'AQAB',
'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5'
'80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q',
}
self.jwk512 = JWK(key=RSA512_KEY.publickey())
self.jwk512json = {
'kty': 'RSA',
'e': 'AQAB',
'n': '9LYRcVE3Nr-qleecEcX8JwVDnjeG1X7ucsCasuuZM0e09c'
'mYuUzxIkMjO_9x4AVcvXXRXPEV-LzWWkfkTlzRMw',
}
def test_equals(self):
self.assertEqual(self.jwk256, self.jwk256)
self.assertEqual(self.jwk512, self.jwk512)
def test_not_equals(self):
self.assertNotEqual(self.jwk256, self.jwk512)
self.assertNotEqual(self.jwk512, self.jwk256)
def test_to_json(self):
self.assertEqual(self.jwk256.to_json(), self.jwk256json)
self.assertEqual(self.jwk512.to_json(), self.jwk512json)
def test_from_json(self):
from letsencrypt.acme.jose import JWK
self.assertEqual(self.jwk256, JWK.from_valid_json(self.jwk256json))
# TODO: fix schemata to allow RSA512
#self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json))
# https://en.wikipedia.org/wiki/Base64#Examples
B64_PADDING_EXAMPLES = {

View file

@ -2,7 +2,6 @@
import json
import jsonschema
import M2Crypto
from letsencrypt.acme import errors
from letsencrypt.acme import jose
@ -100,7 +99,7 @@ class Challenge(Message):
@classmethod
def from_valid_json(cls, jobj):
return cls(session_id=jobj["sessionID"],
nonce=jose.b64decode(jobj["nonce"]),
nonce=cls._decode_b64jose(jobj["nonce"]),
challenges=jobj["challenges"],
combinations=jobj.get("combinations", []))
@ -147,7 +146,7 @@ class Authorization(Message):
def from_valid_json(cls, jobj):
jwk = jobj.get("jwk")
if jwk is not None:
jwk = jose.JWK.from_valid_json(jwk)
jwk = other.JWK.from_valid_json(jwk)
return cls(recovery_token=jobj.get("recoveryToken"),
identifier=jobj.get("identifier"), jwk=jwk)
@ -218,7 +217,7 @@ class AuthorizationRequest(Message):
@classmethod
def from_valid_json(cls, jobj):
return cls(session_id=jobj["sessionID"],
nonce=jose.b64decode(jobj["nonce"]),
nonce=cls._decode_b64jose(jobj["nonce"]),
responses=jobj["responses"],
signature=other.Signature.from_valid_json(jobj["signature"]),
contact=jobj.get("contact", []))
@ -247,15 +246,6 @@ class Certificate(Message):
fields["refresh"] = self.refresh
return fields
@classmethod
def _decode_cert(cls, b64der):
return util.ComparableX509(M2Crypto.X509.load_cert_der_string(
jose.b64decode(b64der)))
@classmethod
def _encode_cert(cls, cert):
return jose.b64encode(cert.as_der())
@classmethod
def from_valid_json(cls, jobj):
return cls(certificate=cls._decode_cert(jobj["certificate"]),
@ -307,15 +297,6 @@ class CertificateRequest(Message):
"""
return self.signature.verify(self.csr.as_der())
@classmethod
def _decode_csr(cls, b64der):
return util.ComparableX509(M2Crypto.X509.load_request_der_string(
jose.b64decode(b64der)))
@classmethod
def _encode_csr(cls, csr):
return jose.b64encode(csr.as_der())
def _fields_to_json(self):
return {
"csr": self._encode_csr(self.csr),
@ -437,15 +418,6 @@ class RevocationRequest(Message):
"""
return self.signature.verify(self.certificate.as_der())
@classmethod
def _decode_cert(cls, b64der):
return util.ComparableX509(M2Crypto.X509.load_cert_der_string(
jose.b64decode(b64der)))
@classmethod
def _encode_cert(cls, cert):
return jose.b64encode(cert.as_der())
def _fields_to_json(self):
return {
"certificate": self._encode_cert(self.certificate),

View file

@ -146,7 +146,7 @@ class ChallengeRequestTest(unittest.TestCase):
class AuthorizationTest(unittest.TestCase):
def setUp(self):
jwk = jose.JWK(key=KEY.publickey())
jwk = other.JWK(key=KEY.publickey())
from letsencrypt.acme.messages import Authorization
self.msg = Authorization(recovery_token='tok', jwk=jwk,
@ -192,7 +192,7 @@ class AuthorizationRequestTest(unittest.TestCase):
]
self.contact = ["mailto:cert-admin@example.com", "tel:+12025551212"]
signature = other.Signature(
alg='RS256', jwk=jose.JWK(key=KEY.publickey()),
alg='RS256', jwk=other.JWK(key=KEY.publickey()),
sig='-v\xd8\xc2\xa3\xba0\xd6\x92\x16\xb5.\xbe\xa1[\x04\xbe'
'\x1b\xa1X\xd2)\x18\x94\x8f\xd7\xd0\xc0\xbbcI`W\xdf v'
'\xe4\xed\xe8\x03J\xe8\xc8<?\xc8W\x94\x94cj(\xe7\xaa$'
@ -297,7 +297,7 @@ class CertificateRequestTest(unittest.TestCase):
def setUp(self):
signature = other.Signature(
alg='RS256', jwk=jose.JWK(key=KEY.publickey()),
alg='RS256', jwk=other.JWK(key=KEY.publickey()),
sig='\x15\xed\x84\xaa:\xf2DO\x0e9 \xbcg\xf8\xc0\xcf\x87\x9a'
'\x95\xeb\xffT[\x84[\xec\x85\x7f\x8eK\xe9\xc2\x12\xc8Q'
'\xafo\xc6h\x07\xba\xa6\xdf\xd1\xa7"$\xba=Z\x13n\x14\x0b'
@ -421,7 +421,7 @@ class RevocationRequestTest(unittest.TestCase):
self.sig_nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
signature = other.Signature(
alg='RS256', jwk=jose.JWK(key=KEY.publickey()),
alg='RS256', jwk=other.JWK(key=KEY.publickey()),
sig='eJ\xfe\x12"U\x87\x8b\xbf/ ,\xdeP\xb2\xdc1\xb00\xe5\x1dB'
'\xfch<\xc6\x9eH@!\x1c\x16\xb2\x0b_\xc4\xddP\x89\xc8\xce?'
'\x16g\x069I\xb9\xb3\x91\xb9\x0e$3\x9f\x87\x8e\x82\xca\xc5'

View file

@ -1,14 +1,58 @@
"""Other ACME objects."""
import binascii
import logging
from Crypto import Random
import Crypto.Random
import Crypto.Hash.SHA256
import Crypto.PublicKey.RSA
import Crypto.Signature.PKCS1_v1_5
from letsencrypt.acme import errors
from letsencrypt.acme import jose
from letsencrypt.acme import util
class JWK(util.ACMEObject):
# pylint: disable=too-few-public-methods
"""JSON Web Key.
.. todo:: Currently works for RSA public keys only.
"""
__slots__ = ('key',)
@classmethod
def _encode_param(cls, data):
def _leading_zeros(arg):
if len(arg) % 2:
return '0' + arg
return arg
return jose.b64encode(binascii.unhexlify(
_leading_zeros(hex(data)[2:].rstrip('L'))))
@classmethod
def _decode_param(cls, data):
try:
return long(binascii.hexlify(cls._decode_b64jose(data)), 16)
except ValueError: # invalid literal for long() with base 16
raise errors.ValidationError(data)
def to_json(self):
return {
'kty': 'RSA', # TODO
'n': self._encode_param(self.key.n),
'e': self._encode_param(self.key.e),
}
@classmethod
def from_valid_json(cls, jobj):
assert 'RSA' == jobj['kty'] # TODO
return cls(key=Crypto.PublicKey.RSA.construct(
(cls._decode_param(jobj['n']),
cls._decode_param(jobj['e']))))
class Signature(util.ACMEObject):
"""ACME signature.
@ -17,18 +61,18 @@ class Signature(util.ACMEObject):
:ivar str nonce: Nonce.
:ivar jwk: JWK.
:type jwk: :class:`letsencrypt.acme.jose.JWK`
:type jwk: :class:`JWK`
.. todo:: Currently works for RSA keys only.
"""
__slots__ = ('alg', 'sig', 'nonce', 'jwk')
NONCE_LEN = 16
"""Size of nonce in bytes, as specified in the ACME protocol."""
NONCE_SIZE = 16
"""Minimum size of nonce in bytes."""
@classmethod
def from_msg(cls, msg, key, nonce=None):
def from_msg(cls, msg, key, nonce=None, nonce_size=None):
"""Create signature with nonce prepended to the message.
.. todo:: Protect against crypto unicode errors... is this sufficient?
@ -39,13 +83,15 @@ class Signature(util.ACMEObject):
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:param nonce: Nonce to be used. If None, nonce of
:const:`NONCE_LEN` size will be randomly generated.
:type nonce: str or None
:param str nonce: Nonce to be used. If None, nonce of
``nonce_size`` will be randomly generated.
:param int nonce_size: Size of the automatically generated nonce.
Defaults to :const:`NONCE_SIZE`.
"""
nonce_size = cls.NONCE_SIZE if nonce_size is None else nonce_size
if nonce is None:
nonce = Random.get_random_bytes(cls.NONCE_LEN)
nonce = Crypto.Random.get_random_bytes(nonce_size)
msg_with_nonce = nonce + msg
hashed = Crypto.Hash.SHA256.new(msg_with_nonce)
@ -54,7 +100,7 @@ class Signature(util.ACMEObject):
logging.debug('%s signed as %s', msg_with_nonce, sig)
return cls(alg='RS256', sig=sig, nonce=nonce,
jwk=jose.JWK(key=key.publickey()))
jwk=JWK(key=key.publickey()))
def verify(self, msg):
"""Verify the signature.
@ -63,8 +109,8 @@ class Signature(util.ACMEObject):
"""
hashed = Crypto.Hash.SHA256.new(self.nonce + msg)
return Crypto.Signature.PKCS1_v1_5.new(self.jwk.key).verify(
hashed, self.sig)
return bool(Crypto.Signature.PKCS1_v1_5.new(self.jwk.key).verify(
hashed, self.sig))
def to_json(self):
return {
@ -76,6 +122,8 @@ class Signature(util.ACMEObject):
@classmethod
def from_valid_json(cls, jobj):
return cls(alg=jobj['alg'], sig=jose.b64decode(jobj['sig']),
nonce=jose.b64decode(jobj['nonce']),
jwk=jose.JWK.from_valid_json(jobj['jwk']))
assert jobj['alg'] == 'RS256' # TODO: support other algorithms
return cls(alg=jobj['alg'], sig=cls._decode_b64jose(jobj['sig']),
nonce=cls._decode_b64jose(
jobj['nonce'], cls.NONCE_SIZE, minimum=True),
jwk=JWK.from_valid_json(jobj['jwk']))

View file

@ -4,11 +4,60 @@ import unittest
import Crypto.PublicKey.RSA
from letsencrypt.acme import jose
from letsencrypt.acme import errors
RSA256_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa256_key.pem'))
RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'letsencrypt.client.tests', 'testdata/rsa512_key.pem'))
class JWKTest(unittest.TestCase):
"""Tests fro letsencrypt.acme.other.JWK."""
def setUp(self):
from letsencrypt.acme.other import JWK
self.jwk256 = JWK(key=RSA256_KEY.publickey())
self.jwk256json = {
'kty': 'RSA',
'e': 'AQAB',
'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5'
'80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q',
}
self.jwk512 = JWK(key=RSA512_KEY.publickey())
self.jwk512json = {
'kty': 'RSA',
'e': 'AQAB',
'n': '9LYRcVE3Nr-qleecEcX8JwVDnjeG1X7ucsCasuuZM0e09c'
'mYuUzxIkMjO_9x4AVcvXXRXPEV-LzWWkfkTlzRMw',
}
def test_equals(self):
self.assertEqual(self.jwk256, self.jwk256)
self.assertEqual(self.jwk512, self.jwk512)
def test_not_equals(self):
self.assertNotEqual(self.jwk256, self.jwk512)
self.assertNotEqual(self.jwk512, self.jwk256)
def test_to_json(self):
self.assertEqual(self.jwk256.to_json(), self.jwk256json)
self.assertEqual(self.jwk512.to_json(), self.jwk512json)
def test_from_json(self):
from letsencrypt.acme.other import JWK
self.assertEqual(self.jwk256, JWK.from_valid_json(self.jwk256json))
# TODO: fix schemata to allow RSA512
#self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json))
def test_from_json_non_schema_errors(self):
# valid against schema, but still failing
from letsencrypt.acme.other import JWK
self.assertRaises(errors.ValidationError, JWK.from_valid_json,
{'kty': 'RSA', 'e': 'AQAB', 'n': ''})
self.assertRaises(errors.ValidationError, JWK.from_valid_json,
{'kty': 'RSA', 'e': 'AQAB', 'n': '1'})
class SigatureTest(unittest.TestCase):
@ -23,7 +72,9 @@ class SigatureTest(unittest.TestCase):
'\xb9X\xc3w\xaa\xc0_\xd0\x05$y>l#\x10<\x96\xd2\xcdr\xa3'
'\x1b\xa1\xf5!f\xef\xc64\xb6\x13')
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.jwk = jose.JWK(key=RSA256_KEY.publickey())
from letsencrypt.acme.other import JWK
self.jwk = JWK(key=RSA256_KEY.publickey())
b64sig = ('SUPYKucUnhlTt8_sMxLiigOYdf_wlOLXPI-o7aRLTsOquVjDd6r'
'AX9AFJHk-bCMQPJbSzXKjG6H1IWbvxjS2Ew')
@ -81,6 +132,14 @@ class SigatureTest(unittest.TestCase):
self.assertEqual(
self.signature, Signature.from_valid_json(self.jsig_from))
def test_from_json_non_schema_errors(self):
from letsencrypt.acme.other import Signature
jwk = self.jwk.to_json()
self.assertRaises(errors.ValidationError, Signature.from_valid_json, {
'alg': 'RS256', 'sig': 'x', 'nonce': '', 'jwk': jwk})
self.assertRaises(errors.ValidationError, Signature.from_valid_json, {
'alg': 'RS256', 'sig': '', 'nonce': 'x', 'jwk': jwk})
if __name__ == '__main__':
unittest.main()

View file

@ -1,11 +1,14 @@
"""ACME utilities."""
import binascii
import json
import pkg_resources
import M2Crypto.X509
import zope.interface
from letsencrypt.acme import errors
from letsencrypt.acme import interfaces
from letsencrypt.acme import jose
class ComparableX509(object): # pylint: disable=too-few-public-methods
@ -93,6 +96,54 @@ class ACMEObject(ImmutableMap): # pylint: disable=too-few-public-methods
"""Deserialize from valid JSON object."""
raise NotImplementedError()
@classmethod
def _decode_b64jose(cls, data, size=None, minimum=False):
try:
decoded = jose.b64decode(data)
except TypeError:
raise errors.ValidationError()
if size is not None and ((not minimum and len(decoded) != size)
or (minimum and len(decoded) < size)):
raise errors.ValidationError()
return decoded
@classmethod
def _encode_hex16(cls, data):
return binascii.hexlify(data)
@classmethod
def _decode_hex16(cls, data, size=None, minimum=False):
if size is not None and ((not minimum and len(data) != size * 2)
or (minimum and len(data) < size * 2)):
raise errors.ValidationError()
return binascii.unhexlify(data)
@classmethod
def _encode_cert(cls, cert):
return jose.b64encode(cert.as_der())
@classmethod
def _decode_cert(cls, b64der):
try:
return ComparableX509(M2Crypto.X509.load_cert_der_string(
cls._decode_b64jose(b64der)))
except M2Crypto.X509.X509Error:
raise errors.ValidationError()
@classmethod
def _encode_csr(cls, csr):
return cls._encode_cert(csr)
@classmethod
def _decode_csr(cls, b64der):
try:
return ComparableX509(M2Crypto.X509.load_request_der_string(
cls._decode_b64jose(b64der)))
except M2Crypto.X509.X509Error:
raise errors.ValidationError()
class TypedACMEObject(ACMEObject):
"""ACME object with type (immutable)."""

View file

@ -1,14 +1,23 @@
"""Tests for letsencrypt.acme.util."""
import functools
import json
import os
import pkg_resources
import unittest
import M2Crypto.X509
import zope.interface
from letsencrypt.acme import errors
from letsencrypt.acme import interfaces
CERT = M2Crypto.X509.load_cert(pkg_resources.resource_filename(
'letsencrypt.client.tests', os.path.join('testdata', 'cert.pem')))
CSR = M2Crypto.X509.load_request(pkg_resources.resource_filename(
'letsencrypt.client.tests', os.path.join('testdata', 'csr.pem')))
class DumpIJSONSerializableTest(unittest.TestCase):
"""Tests for letsencrypt.acme.util.dump_ijsonserializable."""
@ -100,6 +109,93 @@ class ImmutableMapTest(unittest.TestCase):
self.assertEqual("B(x='foo', y='bar')", repr(self.B(x='foo', y='bar')))
class ACMEObjectTest(unittest.TestCase):
"""Tests for letsencrypt.acme.util.ACMEObject."""
# pylint: disable=protected-access
def setUp(self):
self.b64_cert = (
'MIIB3jCCAYigAwIBAgICBTkwDQYJKoZIhvcNAQELBQAwdzELMAkGA1UEBhM'
'CVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3IxKz'
'ApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxF'
'DASBgNVBAMMC2V4YW1wbGUuY29tMB4XDTE0MTIxMTIyMzQ0NVoXDTE0MTIx'
'ODIyMzQ0NVowdzELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRI'
'wEAYDVQQHDAlBbm4gQXJib3IxKzApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTW'
'ljaGlnYW4gYW5kIHRoZSBFRkYxFDASBgNVBAMMC2V4YW1wbGUuY29tMFwwD'
'QYJKoZIhvcNAQEBBQADSwAwSAJBAKx1c7RR7R_drnBSQ_zfx1vQLHUbFLh1'
'AQQQ5R8DZUXd36efNK79vukFhN9HFoHZiUvOjm0c-pVE6K-EdE_twuUCAwE'
'AATANBgkqhkiG9w0BAQsFAANBAC24z0IdwIVKSlntksllvr6zJepBH5fMnd'
'fk3XJp10jT6VE-14KNtjh02a56GoraAvJAT5_H67E8GvJ_ocNnB_o'
)
self.b64_csr = (
'MIIBXTCCAQcCAQAweTELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2F'
'uMRIwEAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECw'
'wWVW5pdmVyc2l0eSBvZiBNaWNoaWdhbjEUMBIGA1UEAwwLZXhhbXBsZS5jb'
'20wXDANBgkqhkiG9w0BAQEFAANLADBIAkEArHVztFHtH92ucFJD_N_HW9As'
'dRsUuHUBBBDlHwNlRd3fp580rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3'
'C5QIDAQABoCkwJwYJKoZIhvcNAQkOMRowGDAWBgNVHREEDzANggtleGFtcG'
'xlLmNvbTANBgkqhkiG9w0BAQsFAANBAHJH_O6BtC9aGzEVCMGOZ7z9iIRHW'
'Szr9x_bOzn7hLwsbXPAgO1QxEwL-X-4g20Gn9XBE1N9W6HCIEut2d8wACg'
)
def test_decode_b64_jose_padding_error(self):
from letsencrypt.acme.util import ACMEObject
self.assertRaises(
errors.ValidationError, ACMEObject._decode_b64jose, 'x')
def test_decode_b64_jose_size(self):
from letsencrypt.acme.util import ACMEObject
self.assertEqual('foo', ACMEObject._decode_b64jose('Zm9v', size=3))
self.assertRaises(
errors.ValidationError, ACMEObject._decode_b64jose, 'Zm9v', size=2)
self.assertRaises(
errors.ValidationError, ACMEObject._decode_b64jose, 'Zm9v', size=4)
def test_decode_b64_jose_minimum_size(self):
from letsencrypt.acme.util import ACMEObject
self.assertEqual(
'foo', ACMEObject._decode_b64jose('Zm9v', size=3, minimum=True))
self.assertEqual(
'foo', ACMEObject._decode_b64jose('Zm9v', size=2, minimum=True))
self.assertRaises(errors.ValidationError, ACMEObject._decode_b64jose,
'Zm9v', size=4, minimum=True)
def test_encode_hex16(self):
from letsencrypt.acme.util import ACMEObject
self.assertEqual('666f6f', ACMEObject._encode_hex16('foo'))
def test_decode_hex16(self):
from letsencrypt.acme.util import ACMEObject
self.assertEqual('foo', ACMEObject._decode_hex16('666f6f'))
def test_decode_hex16_minimum_size(self):
from letsencrypt.acme.util import ACMEObject
self.assertEqual(
'foo', ACMEObject._decode_hex16('666f6f', size=3, minimum=True))
self.assertEqual(
'foo', ACMEObject._decode_hex16('666f6f', size=2, minimum=True))
self.assertRaises(errors.ValidationError, ACMEObject._decode_hex16,
'666f6f', size=4, minimum=True)
def test_encode_cert(self):
from letsencrypt.acme.util import ACMEObject
self.assertEqual(self.b64_cert, ACMEObject._encode_cert(CERT))
def test_decode_cert(self):
from letsencrypt.acme.util import ACMEObject
self.assertEqual(CERT, ACMEObject._decode_cert(self.b64_cert))
self.assertRaises(errors.ValidationError, ACMEObject._decode_cert, '')
def test_encode_csr(self):
from letsencrypt.acme.util import ACMEObject
self.assertEqual(self.b64_csr, ACMEObject._encode_csr(CSR))
def test_decode_csr(self):
from letsencrypt.acme.util import ACMEObject
self.assertEqual(CSR, ACMEObject._decode_csr(self.b64_csr))
self.assertRaises(errors.ValidationError, ACMEObject._decode_csr, '')
class TypedACMEObjectTest(unittest.TestCase):
def setUp(self):

View file

@ -5,6 +5,7 @@ import time
import requests
from letsencrypt.acme import errors as acme_errors
from letsencrypt.acme import messages
from letsencrypt.client import errors
@ -36,8 +37,8 @@ class Network(object):
:returns: Server response message.
:rtype: :class:`letsencrypt.acme.messages.Message`
:raises TypeError: if `msg` is not JSON serializable
:raises jsonschema.ValidationError: if not valid ACME message
:raises letsencrypt.acme.errors.ValidationError: if `msg` is not
valid serializable ACME JSON message.
:raises errors.LetsEncryptClientError: in case of connection error
or if response from server is not a valid ACME message.
@ -53,7 +54,12 @@ class Network(object):
raise errors.LetsEncryptClientError(
'Sending ACME message to server has failed: %s' % error)
return messages.Message.from_json(response.json(), validate=True)
json_string = response.json()
try:
return messages.Message.from_json(json_string)
except acme_errors.ValidationError as error:
logging.error(json_string)
raise # TODO
def send_and_receive_expected(self, msg, expected):
"""Send ACME message to server and return expected message.