From 76085f0bb07ac6bbb5d5b228a48895cd086a209b Mon Sep 17 00:00:00 2001 From: Jakub Warmuz Date: Fri, 13 Feb 2015 12:37:00 +0000 Subject: [PATCH] Non-schema errors for acme.messages --- letsencrypt/acme/interfaces.py | 8 ++- letsencrypt/acme/jose.py | 46 --------------- letsencrypt/acme/jose_test.py | 48 ---------------- letsencrypt/acme/messages.py | 34 +---------- letsencrypt/acme/messages_test.py | 8 +-- letsencrypt/acme/other.py | 78 ++++++++++++++++++++----- letsencrypt/acme/other_test.py | 63 +++++++++++++++++++- letsencrypt/acme/util.py | 51 ++++++++++++++++ letsencrypt/acme/util_test.py | 96 +++++++++++++++++++++++++++++++ letsencrypt/client/network.py | 12 +++- 10 files changed, 294 insertions(+), 150 deletions(-) diff --git a/letsencrypt/acme/interfaces.py b/letsencrypt/acme/interfaces.py index c80b6c2fa..a106f2aca 100644 --- a/letsencrypt/acme/interfaces.py +++ b/letsencrypt/acme/interfaces.py @@ -28,6 +28,12 @@ class IJSONDeserializable(zope.interface.Interface): def from_valid_json(jobj): """Deserialize valid JSON object. - :param jobj: Validated JSON object. + :param jobj: JSON object validated against JSON schema (found in + schemata/ directory). + + :raises letsencrypt.acme.errors.ValidationError: It might be the + case that ``jobj`` validates against schema, but still is not + valid (e.g. unparseable X509 certificate, or wrong padding in + JOSE base64 encoded string). """ diff --git a/letsencrypt/acme/jose.py b/letsencrypt/acme/jose.py index b08e1e036..81c1abbf7 100644 --- a/letsencrypt/acme/jose.py +++ b/letsencrypt/acme/jose.py @@ -1,51 +1,5 @@ """JOSE.""" import base64 -import binascii - -import Crypto.PublicKey.RSA - -from letsencrypt.acme import util - - -def _leading_zeros(arg): - if len(arg) % 2: - return '0' + arg - return arg - - -class JWK(util.ACMEObject): - # pylint: disable=too-few-public-methods - """JSON Web Key. - - .. todo:: Currently works for RSA public keys only. - - """ - __slots__ = ('key',) - - @classmethod - def _encode_param(cls, param): - """Encode numeric key parameter.""" - return b64encode(binascii.unhexlify( - _leading_zeros(hex(param)[2:].rstrip('L')))) - - @classmethod - def _decode_param(cls, param): - """Decode numeric key parameter.""" - return long(binascii.hexlify(b64decode(param)), 16) - - def to_json(self): - return { - 'kty': 'RSA', # TODO - 'n': self._encode_param(self.key.n), - 'e': self._encode_param(self.key.e), - } - - @classmethod - def from_valid_json(cls, jobj): - assert 'RSA' == jobj['kty'] # TODO - return cls(key=Crypto.PublicKey.RSA.construct( - (cls._decode_param(jobj['n']), cls._decode_param(jobj['e'])))) - # https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C # diff --git a/letsencrypt/acme/jose_test.py b/letsencrypt/acme/jose_test.py index 4f2828ec5..42cf8051c 100644 --- a/letsencrypt/acme/jose_test.py +++ b/letsencrypt/acme/jose_test.py @@ -1,54 +1,6 @@ """Tests for letsencrypt.acme.jose.""" -import pkg_resources import unittest -import Crypto.PublicKey.RSA - - -RSA256_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string( - 'letsencrypt.client.tests', 'testdata/rsa256_key.pem')) -RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string( - 'letsencrypt.client.tests', 'testdata/rsa512_key.pem')) - - -class JWKTest(unittest.TestCase): - """Tests fro letsencrypt.acme.jose.JWK.""" - - def setUp(self): - from letsencrypt.acme.jose import JWK - self.jwk256 = JWK(key=RSA256_KEY.publickey()) - self.jwk256json = { - 'kty': 'RSA', - 'e': 'AQAB', - 'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5' - '80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q', - } - self.jwk512 = JWK(key=RSA512_KEY.publickey()) - self.jwk512json = { - 'kty': 'RSA', - 'e': 'AQAB', - 'n': '9LYRcVE3Nr-qleecEcX8JwVDnjeG1X7ucsCasuuZM0e09c' - 'mYuUzxIkMjO_9x4AVcvXXRXPEV-LzWWkfkTlzRMw', - } - - def test_equals(self): - self.assertEqual(self.jwk256, self.jwk256) - self.assertEqual(self.jwk512, self.jwk512) - - def test_not_equals(self): - self.assertNotEqual(self.jwk256, self.jwk512) - self.assertNotEqual(self.jwk512, self.jwk256) - - def test_to_json(self): - self.assertEqual(self.jwk256.to_json(), self.jwk256json) - self.assertEqual(self.jwk512.to_json(), self.jwk512json) - - def test_from_json(self): - from letsencrypt.acme.jose import JWK - self.assertEqual(self.jwk256, JWK.from_valid_json(self.jwk256json)) - # TODO: fix schemata to allow RSA512 - #self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json)) - # https://en.wikipedia.org/wiki/Base64#Examples B64_PADDING_EXAMPLES = { diff --git a/letsencrypt/acme/messages.py b/letsencrypt/acme/messages.py index 3e3efd3b7..63978e9fd 100644 --- a/letsencrypt/acme/messages.py +++ b/letsencrypt/acme/messages.py @@ -2,7 +2,6 @@ import json import jsonschema -import M2Crypto from letsencrypt.acme import errors from letsencrypt.acme import jose @@ -100,7 +99,7 @@ class Challenge(Message): @classmethod def from_valid_json(cls, jobj): return cls(session_id=jobj["sessionID"], - nonce=jose.b64decode(jobj["nonce"]), + nonce=cls._decode_b64jose(jobj["nonce"]), challenges=jobj["challenges"], combinations=jobj.get("combinations", [])) @@ -147,7 +146,7 @@ class Authorization(Message): def from_valid_json(cls, jobj): jwk = jobj.get("jwk") if jwk is not None: - jwk = jose.JWK.from_valid_json(jwk) + jwk = other.JWK.from_valid_json(jwk) return cls(recovery_token=jobj.get("recoveryToken"), identifier=jobj.get("identifier"), jwk=jwk) @@ -218,7 +217,7 @@ class AuthorizationRequest(Message): @classmethod def from_valid_json(cls, jobj): return cls(session_id=jobj["sessionID"], - nonce=jose.b64decode(jobj["nonce"]), + nonce=cls._decode_b64jose(jobj["nonce"]), responses=jobj["responses"], signature=other.Signature.from_valid_json(jobj["signature"]), contact=jobj.get("contact", [])) @@ -247,15 +246,6 @@ class Certificate(Message): fields["refresh"] = self.refresh return fields - @classmethod - def _decode_cert(cls, b64der): - return util.ComparableX509(M2Crypto.X509.load_cert_der_string( - jose.b64decode(b64der))) - - @classmethod - def _encode_cert(cls, cert): - return jose.b64encode(cert.as_der()) - @classmethod def from_valid_json(cls, jobj): return cls(certificate=cls._decode_cert(jobj["certificate"]), @@ -307,15 +297,6 @@ class CertificateRequest(Message): """ return self.signature.verify(self.csr.as_der()) - @classmethod - def _decode_csr(cls, b64der): - return util.ComparableX509(M2Crypto.X509.load_request_der_string( - jose.b64decode(b64der))) - - @classmethod - def _encode_csr(cls, csr): - return jose.b64encode(csr.as_der()) - def _fields_to_json(self): return { "csr": self._encode_csr(self.csr), @@ -437,15 +418,6 @@ class RevocationRequest(Message): """ return self.signature.verify(self.certificate.as_der()) - @classmethod - def _decode_cert(cls, b64der): - return util.ComparableX509(M2Crypto.X509.load_cert_der_string( - jose.b64decode(b64der))) - - @classmethod - def _encode_cert(cls, cert): - return jose.b64encode(cert.as_der()) - def _fields_to_json(self): return { "certificate": self._encode_cert(self.certificate), diff --git a/letsencrypt/acme/messages_test.py b/letsencrypt/acme/messages_test.py index 9ec4c5ba1..0bc793eeb 100644 --- a/letsencrypt/acme/messages_test.py +++ b/letsencrypt/acme/messages_test.py @@ -146,7 +146,7 @@ class ChallengeRequestTest(unittest.TestCase): class AuthorizationTest(unittest.TestCase): def setUp(self): - jwk = jose.JWK(key=KEY.publickey()) + jwk = other.JWK(key=KEY.publickey()) from letsencrypt.acme.messages import Authorization self.msg = Authorization(recovery_token='tok', jwk=jwk, @@ -192,7 +192,7 @@ class AuthorizationRequestTest(unittest.TestCase): ] self.contact = ["mailto:cert-admin@example.com", "tel:+12025551212"] signature = other.Signature( - alg='RS256', jwk=jose.JWK(key=KEY.publickey()), + alg='RS256', jwk=other.JWK(key=KEY.publickey()), sig='-v\xd8\xc2\xa3\xba0\xd6\x92\x16\xb5.\xbe\xa1[\x04\xbe' '\x1b\xa1X\xd2)\x18\x94\x8f\xd7\xd0\xc0\xbbcI`W\xdf v' '\xe4\xed\xe8\x03J\xe8\xc8l#\x10<\x96\xd2\xcdr\xa3' '\x1b\xa1\xf5!f\xef\xc64\xb6\x13') self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9' - self.jwk = jose.JWK(key=RSA256_KEY.publickey()) + + from letsencrypt.acme.other import JWK + self.jwk = JWK(key=RSA256_KEY.publickey()) b64sig = ('SUPYKucUnhlTt8_sMxLiigOYdf_wlOLXPI-o7aRLTsOquVjDd6r' 'AX9AFJHk-bCMQPJbSzXKjG6H1IWbvxjS2Ew') @@ -81,6 +132,14 @@ class SigatureTest(unittest.TestCase): self.assertEqual( self.signature, Signature.from_valid_json(self.jsig_from)) + def test_from_json_non_schema_errors(self): + from letsencrypt.acme.other import Signature + jwk = self.jwk.to_json() + self.assertRaises(errors.ValidationError, Signature.from_valid_json, { + 'alg': 'RS256', 'sig': 'x', 'nonce': '', 'jwk': jwk}) + self.assertRaises(errors.ValidationError, Signature.from_valid_json, { + 'alg': 'RS256', 'sig': '', 'nonce': 'x', 'jwk': jwk}) + if __name__ == '__main__': unittest.main() diff --git a/letsencrypt/acme/util.py b/letsencrypt/acme/util.py index d7ea7fac6..ac7bf3874 100644 --- a/letsencrypt/acme/util.py +++ b/letsencrypt/acme/util.py @@ -1,11 +1,14 @@ """ACME utilities.""" +import binascii import json import pkg_resources +import M2Crypto.X509 import zope.interface from letsencrypt.acme import errors from letsencrypt.acme import interfaces +from letsencrypt.acme import jose class ComparableX509(object): # pylint: disable=too-few-public-methods @@ -93,6 +96,54 @@ class ACMEObject(ImmutableMap): # pylint: disable=too-few-public-methods """Deserialize from valid JSON object.""" raise NotImplementedError() + @classmethod + def _decode_b64jose(cls, data, size=None, minimum=False): + try: + decoded = jose.b64decode(data) + except TypeError: + raise errors.ValidationError() + + if size is not None and ((not minimum and len(decoded) != size) + or (minimum and len(decoded) < size)): + raise errors.ValidationError() + + return decoded + + @classmethod + def _encode_hex16(cls, data): + return binascii.hexlify(data) + + @classmethod + def _decode_hex16(cls, data, size=None, minimum=False): + if size is not None and ((not minimum and len(data) != size * 2) + or (minimum and len(data) < size * 2)): + raise errors.ValidationError() + return binascii.unhexlify(data) + + @classmethod + def _encode_cert(cls, cert): + return jose.b64encode(cert.as_der()) + + @classmethod + def _decode_cert(cls, b64der): + try: + return ComparableX509(M2Crypto.X509.load_cert_der_string( + cls._decode_b64jose(b64der))) + except M2Crypto.X509.X509Error: + raise errors.ValidationError() + + @classmethod + def _encode_csr(cls, csr): + return cls._encode_cert(csr) + + @classmethod + def _decode_csr(cls, b64der): + try: + return ComparableX509(M2Crypto.X509.load_request_der_string( + cls._decode_b64jose(b64der))) + except M2Crypto.X509.X509Error: + raise errors.ValidationError() + class TypedACMEObject(ACMEObject): """ACME object with type (immutable).""" diff --git a/letsencrypt/acme/util_test.py b/letsencrypt/acme/util_test.py index 65c52f7ff..c64c5dfca 100644 --- a/letsencrypt/acme/util_test.py +++ b/letsencrypt/acme/util_test.py @@ -1,14 +1,23 @@ """Tests for letsencrypt.acme.util.""" import functools import json +import os +import pkg_resources import unittest +import M2Crypto.X509 import zope.interface from letsencrypt.acme import errors from letsencrypt.acme import interfaces +CERT = M2Crypto.X509.load_cert(pkg_resources.resource_filename( + 'letsencrypt.client.tests', os.path.join('testdata', 'cert.pem'))) +CSR = M2Crypto.X509.load_request(pkg_resources.resource_filename( + 'letsencrypt.client.tests', os.path.join('testdata', 'csr.pem'))) + + class DumpIJSONSerializableTest(unittest.TestCase): """Tests for letsencrypt.acme.util.dump_ijsonserializable.""" @@ -100,6 +109,93 @@ class ImmutableMapTest(unittest.TestCase): self.assertEqual("B(x='foo', y='bar')", repr(self.B(x='foo', y='bar'))) +class ACMEObjectTest(unittest.TestCase): + """Tests for letsencrypt.acme.util.ACMEObject.""" + # pylint: disable=protected-access + + def setUp(self): + self.b64_cert = ( + 'MIIB3jCCAYigAwIBAgICBTkwDQYJKoZIhvcNAQELBQAwdzELMAkGA1UEBhM' + 'CVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3IxKz' + 'ApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxF' + 'DASBgNVBAMMC2V4YW1wbGUuY29tMB4XDTE0MTIxMTIyMzQ0NVoXDTE0MTIx' + 'ODIyMzQ0NVowdzELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRI' + 'wEAYDVQQHDAlBbm4gQXJib3IxKzApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTW' + 'ljaGlnYW4gYW5kIHRoZSBFRkYxFDASBgNVBAMMC2V4YW1wbGUuY29tMFwwD' + 'QYJKoZIhvcNAQEBBQADSwAwSAJBAKx1c7RR7R_drnBSQ_zfx1vQLHUbFLh1' + 'AQQQ5R8DZUXd36efNK79vukFhN9HFoHZiUvOjm0c-pVE6K-EdE_twuUCAwE' + 'AATANBgkqhkiG9w0BAQsFAANBAC24z0IdwIVKSlntksllvr6zJepBH5fMnd' + 'fk3XJp10jT6VE-14KNtjh02a56GoraAvJAT5_H67E8GvJ_ocNnB_o' + ) + self.b64_csr = ( + 'MIIBXTCCAQcCAQAweTELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2F' + 'uMRIwEAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECw' + 'wWVW5pdmVyc2l0eSBvZiBNaWNoaWdhbjEUMBIGA1UEAwwLZXhhbXBsZS5jb' + '20wXDANBgkqhkiG9w0BAQEFAANLADBIAkEArHVztFHtH92ucFJD_N_HW9As' + 'dRsUuHUBBBDlHwNlRd3fp580rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3' + 'C5QIDAQABoCkwJwYJKoZIhvcNAQkOMRowGDAWBgNVHREEDzANggtleGFtcG' + 'xlLmNvbTANBgkqhkiG9w0BAQsFAANBAHJH_O6BtC9aGzEVCMGOZ7z9iIRHW' + 'Szr9x_bOzn7hLwsbXPAgO1QxEwL-X-4g20Gn9XBE1N9W6HCIEut2d8wACg' + ) + + def test_decode_b64_jose_padding_error(self): + from letsencrypt.acme.util import ACMEObject + self.assertRaises( + errors.ValidationError, ACMEObject._decode_b64jose, 'x') + + def test_decode_b64_jose_size(self): + from letsencrypt.acme.util import ACMEObject + self.assertEqual('foo', ACMEObject._decode_b64jose('Zm9v', size=3)) + self.assertRaises( + errors.ValidationError, ACMEObject._decode_b64jose, 'Zm9v', size=2) + self.assertRaises( + errors.ValidationError, ACMEObject._decode_b64jose, 'Zm9v', size=4) + + def test_decode_b64_jose_minimum_size(self): + from letsencrypt.acme.util import ACMEObject + self.assertEqual( + 'foo', ACMEObject._decode_b64jose('Zm9v', size=3, minimum=True)) + self.assertEqual( + 'foo', ACMEObject._decode_b64jose('Zm9v', size=2, minimum=True)) + self.assertRaises(errors.ValidationError, ACMEObject._decode_b64jose, + 'Zm9v', size=4, minimum=True) + + def test_encode_hex16(self): + from letsencrypt.acme.util import ACMEObject + self.assertEqual('666f6f', ACMEObject._encode_hex16('foo')) + + def test_decode_hex16(self): + from letsencrypt.acme.util import ACMEObject + self.assertEqual('foo', ACMEObject._decode_hex16('666f6f')) + + def test_decode_hex16_minimum_size(self): + from letsencrypt.acme.util import ACMEObject + self.assertEqual( + 'foo', ACMEObject._decode_hex16('666f6f', size=3, minimum=True)) + self.assertEqual( + 'foo', ACMEObject._decode_hex16('666f6f', size=2, minimum=True)) + self.assertRaises(errors.ValidationError, ACMEObject._decode_hex16, + '666f6f', size=4, minimum=True) + + def test_encode_cert(self): + from letsencrypt.acme.util import ACMEObject + self.assertEqual(self.b64_cert, ACMEObject._encode_cert(CERT)) + + def test_decode_cert(self): + from letsencrypt.acme.util import ACMEObject + self.assertEqual(CERT, ACMEObject._decode_cert(self.b64_cert)) + self.assertRaises(errors.ValidationError, ACMEObject._decode_cert, '') + + def test_encode_csr(self): + from letsencrypt.acme.util import ACMEObject + self.assertEqual(self.b64_csr, ACMEObject._encode_csr(CSR)) + + def test_decode_csr(self): + from letsencrypt.acme.util import ACMEObject + self.assertEqual(CSR, ACMEObject._decode_csr(self.b64_csr)) + self.assertRaises(errors.ValidationError, ACMEObject._decode_csr, '') + + class TypedACMEObjectTest(unittest.TestCase): def setUp(self): diff --git a/letsencrypt/client/network.py b/letsencrypt/client/network.py index bdba746b0..b61a8a2f8 100644 --- a/letsencrypt/client/network.py +++ b/letsencrypt/client/network.py @@ -5,6 +5,7 @@ import time import requests +from letsencrypt.acme import errors as acme_errors from letsencrypt.acme import messages from letsencrypt.client import errors @@ -36,8 +37,8 @@ class Network(object): :returns: Server response message. :rtype: :class:`letsencrypt.acme.messages.Message` - :raises TypeError: if `msg` is not JSON serializable - :raises jsonschema.ValidationError: if not valid ACME message + :raises letsencrypt.acme.errors.ValidationError: if `msg` is not + valid serializable ACME JSON message. :raises errors.LetsEncryptClientError: in case of connection error or if response from server is not a valid ACME message. @@ -53,7 +54,12 @@ class Network(object): raise errors.LetsEncryptClientError( 'Sending ACME message to server has failed: %s' % error) - return messages.Message.from_json(response.json(), validate=True) + json_string = response.json() + try: + return messages.Message.from_json(json_string) + except acme_errors.ValidationError as error: + logging.error(json_string) + raise # TODO def send_and_receive_expected(self, msg, expected): """Send ACME message to server and return expected message.