diff --git a/.pylintrc b/.pylintrc index 9cd5a8781..2970f2bc9 100644 --- a/.pylintrc +++ b/.pylintrc @@ -19,7 +19,7 @@ persistent=yes # List of plugins (as comma separated values of python modules names) to load, # usually to register additional checkers. -load-plugins= +load-plugins=linter_plugin [MESSAGES CONTROL] diff --git a/MANIFEST.in b/MANIFEST.in index 0c082ea32..bb7efba38 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,4 @@ -include README.rst CHANGES.rst +include README.rst CHANGES.rst linter_plugin.py recursive-include letsencrypt *.json recursive-include letsencrypt *.sh recursive-include letsencrypt *.conf diff --git a/docs/api/acme/errors.rst b/docs/api/acme/errors.rst new file mode 100644 index 000000000..53132bd15 --- /dev/null +++ b/docs/api/acme/errors.rst @@ -0,0 +1,5 @@ +:mod:`letsencrypt.acme.errors` +------------------------------ + +.. automodule:: letsencrypt.acme.errors + :members: diff --git a/docs/api/acme/interfaces.rst b/docs/api/acme/interfaces.rst new file mode 100644 index 000000000..5ed652834 --- /dev/null +++ b/docs/api/acme/interfaces.rst @@ -0,0 +1,5 @@ +:mod:`letsencrypt.acme.interfaces` +---------------------------------- + +.. automodule:: letsencrypt.acme.interfaces + :members: diff --git a/docs/api/acme/jose.rst b/docs/api/acme/jose.rst new file mode 100644 index 000000000..d82dc1f15 --- /dev/null +++ b/docs/api/acme/jose.rst @@ -0,0 +1,5 @@ +:mod:`letsencrypt.acme.jose` +---------------------------- + +.. automodule:: letsencrypt.acme.jose + :members: diff --git a/docs/api/acme/messages.rst b/docs/api/acme/messages.rst new file mode 100644 index 000000000..d231f9c52 --- /dev/null +++ b/docs/api/acme/messages.rst @@ -0,0 +1,5 @@ +:mod:`letsencrypt.acme.messages` +-------------------------------- + +.. automodule:: letsencrypt.acme.messages + :members: diff --git a/docs/api/acme/other.rst b/docs/api/acme/other.rst new file mode 100644 index 000000000..8372e3028 --- /dev/null +++ b/docs/api/acme/other.rst @@ -0,0 +1,5 @@ +:mod:`letsencrypt.acme.other` +----------------------------- + +.. automodule:: letsencrypt.acme.other + :members: diff --git a/docs/api/acme/util.rst b/docs/api/acme/util.rst new file mode 100644 index 000000000..960cf8882 --- /dev/null +++ b/docs/api/acme/util.rst @@ -0,0 +1,5 @@ +:mod:`letsencrypt.acme.util` +---------------------------- + +.. automodule:: letsencrypt.acme.util + :members: diff --git a/docs/api/client/acme.rst b/docs/api/client/acme.rst deleted file mode 100644 index 7773fae04..000000000 --- a/docs/api/client/acme.rst +++ /dev/null @@ -1,5 +0,0 @@ -:mod:`letsencrypt.client.acme` ------------------------------- - -.. automodule:: letsencrypt.client.acme - :members: diff --git a/letsencrypt/acme/__init__.py b/letsencrypt/acme/__init__.py new file mode 100644 index 000000000..69418608b --- /dev/null +++ b/letsencrypt/acme/__init__.py @@ -0,0 +1 @@ +"""ACME protocol implementation.""" diff --git a/letsencrypt/acme/errors.py b/letsencrypt/acme/errors.py new file mode 100644 index 000000000..a70271894 --- /dev/null +++ b/letsencrypt/acme/errors.py @@ -0,0 +1,13 @@ +"""ACME errors.""" + +class Error(Exception): + """Generic ACME error.""" + +class ValidationError(Error): + """ACME message validation error.""" + +class UnrecognizedMessageTypeError(ValidationError): + """Unrecognized ACME message type error.""" + +class SchemaValidationError(ValidationError): + """JSON schema ACME message validation error.""" diff --git a/letsencrypt/acme/interfaces.py b/letsencrypt/acme/interfaces.py new file mode 100644 index 000000000..0d9e56495 --- /dev/null +++ b/letsencrypt/acme/interfaces.py @@ -0,0 +1,22 @@ +"""ACME interfaces.""" +import zope.interface + +# pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class + + +class IJSONSerializable(zope.interface.Interface): + # pylint: disable=too-few-public-methods + """JSON serializable object.""" + + def to_json(): + """Prepare JSON serializable object. + + :returns: JSON object ready to be serialized. Note, however, that + this might return other + :class:`letsencrypt.acme.interfaces.IJSONSerializable` + objects, that haven't been serialized yet, which is fine as + long as :func:`letsencrypt.acme.util.dump_ijsonserializable` + is used. + :rtype: dict + + """ diff --git a/letsencrypt/acme/jose.py b/letsencrypt/acme/jose.py new file mode 100644 index 000000000..6d2097ba5 --- /dev/null +++ b/letsencrypt/acme/jose.py @@ -0,0 +1,100 @@ +"""JOSE.""" +import base64 +import binascii + +import Crypto.PublicKey.RSA + +from letsencrypt.acme import util + + +def _leading_zeros(arg): + if len(arg) % 2: + return '0' + arg + return arg + + +class JWK(util.JSONDeSerializable, util.ImmutableMap): + # pylint: disable=too-few-public-methods + """JSON Web Key. + + .. todo:: Currently works for RSA public keys only. + + """ + __slots__ = ('key',) + schema = util.load_schema('jwk') + + @classmethod + def _encode_param(cls, param): + """Encode numeric key parameter.""" + return b64encode(binascii.unhexlify( + _leading_zeros(hex(param)[2:].rstrip('L')))) + + @classmethod + def _decode_param(cls, param): + """Decode numeric key parameter.""" + return long(binascii.hexlify(b64decode(param)), 16) + + def to_json(self): + """Serialize to JSON.""" + return { + 'kty': 'RSA', # TODO + 'n': self._encode_param(self.key.n), + 'e': self._encode_param(self.key.e), + } + + @classmethod + def _from_valid_json(cls, jobj): + assert 'RSA' == jobj['kty'] # TODO + return cls(key=Crypto.PublicKey.RSA.construct( + (cls._decode_param(jobj['n']), cls._decode_param(jobj['e'])))) + + +# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C +# +# Jose Base64: +# +# - URL-safe Base64 +# +# - padding stripped + + +def b64encode(data): + """JOSE Base64 encode. + + :param data: Data to be encoded. + :type data: str or bytearray + + :returns: JOSE Base64 string. + :rtype: str + + :raises TypeError: if `data` is of incorrect type + + """ + if not isinstance(data, str): + raise TypeError('argument should be str or bytearray') + return base64.urlsafe_b64encode(data).rstrip('=') + + +def b64decode(data): + """JOSE Base64 decode. + + :param data: Base64 string to be decoded. If it's unicode, then + only ASCII characters are allowed. + :type data: str or unicode + + :returns: Decoded data. + + :raises TypeError: if input is of incorrect type + :raises ValueError: if input is unicode with non-ASCII characters + + """ + if isinstance(data, unicode): + try: + data = data.encode('ascii') + except UnicodeEncodeError: + raise ValueError( + 'unicode argument should contain only ASCII characters') + elif not isinstance(data, str): + raise TypeError('argument should be a str or unicode') + + return base64.urlsafe_b64decode(data + '=' * (4 - (len(data) % 4))) diff --git a/letsencrypt/acme/jose_test.py b/letsencrypt/acme/jose_test.py new file mode 100644 index 000000000..a1a872704 --- /dev/null +++ b/letsencrypt/acme/jose_test.py @@ -0,0 +1,120 @@ +"""Tests for letsencrypt.acme.jose.""" +import pkg_resources +import unittest + +import Crypto.PublicKey.RSA + + +RSA256_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string( + 'letsencrypt.client.tests', 'testdata/rsa256_key.pem')) +RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string( + 'letsencrypt.client.tests', 'testdata/rsa512_key.pem')) + + +class JWKTest(unittest.TestCase): + """Tests fro letsencrypt.acme.jose.JWK.""" + + def setUp(self): + from letsencrypt.acme.jose import JWK + self.jwk256 = JWK(key=RSA256_KEY.publickey()) + self.jwk256json = { + 'kty': 'RSA', + 'e': 'AQAB', + 'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5' + '80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q', + } + self.jwk512 = JWK(key=RSA512_KEY.publickey()) + self.jwk512json = { + 'kty': 'RSA', + 'e': 'AQAB', + 'n': '9LYRcVE3Nr-qleecEcX8JwVDnjeG1X7ucsCasuuZM0e09c' + 'mYuUzxIkMjO_9x4AVcvXXRXPEV-LzWWkfkTlzRMw', + } + + def test_equals(self): + self.assertEqual(self.jwk256, self.jwk256) + self.assertEqual(self.jwk512, self.jwk512) + + def test_not_equals(self): + self.assertNotEqual(self.jwk256, self.jwk512) + self.assertNotEqual(self.jwk512, self.jwk256) + + def test_to_json(self): + self.assertEqual(self.jwk256.to_json(), self.jwk256json) + self.assertEqual(self.jwk512.to_json(), self.jwk512json) + + def test_from_json(self): + from letsencrypt.acme.jose import JWK + self.assertEqual(self.jwk256, JWK.from_json(self.jwk256json)) + # TODO: fix schemata to allow RSA512 + #self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json)) + + +# https://en.wikipedia.org/wiki/Base64#Examples +B64_PADDING_EXAMPLES = { + 'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='), + 'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='), + 'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''), + 'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='), + 'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='), +} + + +B64_URL_UNSAFE_EXAMPLES = { + chr(251) + chr(239): '--8', + chr(255) * 2: '__8', +} + + +class B64EncodeTest(unittest.TestCase): + """Tests for letsencrypt.acme.jose.b64encode.""" + + @classmethod + def _call(cls, data): + from letsencrypt.acme.jose import b64encode + return b64encode(data) + + def test_unsafe_url(self): + for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems(): + self.assertEqual(self._call(text), b64) + + def test_different_paddings(self): + for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems(): + self.assertEqual(self._call(text), b64) + + def test_unicode_fails_with_type_error(self): + self.assertRaises(TypeError, self._call, u'some unicode') + + +class B64DecodeTest(unittest.TestCase): + """Tests for letsencrypt.acme.jose.b64decode.""" + + @classmethod + def _call(cls, data): + from letsencrypt.acme.jose import b64decode + return b64decode(data) + + def test_unsafe_url(self): + for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems(): + self.assertEqual(self._call(b64), text) + + def test_input_without_padding(self): + for text, (b64, _) in B64_PADDING_EXAMPLES.iteritems(): + self.assertEqual(self._call(b64), text) + + def test_input_with_padding(self): + for text, (b64, pad) in B64_PADDING_EXAMPLES.iteritems(): + self.assertEqual(self._call(b64 + pad), text) + + def test_unicode_with_ascii(self): + self.assertEqual(self._call(u'YQ'), 'a') + + def test_non_ascii_unicode_fails(self): + self.assertRaises(ValueError, self._call, u'\u0105') + + def test_type_error_no_unicode_or_str(self): + self.assertRaises(TypeError, self._call, object()) + + +if __name__ == '__main__': + unittest.main() diff --git a/letsencrypt/acme/messages.py b/letsencrypt/acme/messages.py new file mode 100644 index 000000000..a345be9f9 --- /dev/null +++ b/letsencrypt/acme/messages.py @@ -0,0 +1,500 @@ +"""ACME protocol messages.""" +import M2Crypto +import zope.interface + +from letsencrypt.acme import errors +from letsencrypt.acme import interfaces +from letsencrypt.acme import jose +from letsencrypt.acme import other +from letsencrypt.acme import util + + +class Message(util.JSONDeSerializable, util.ImmutableMap): + """ACME message. + + Messages are considered immutable. + + """ + zope.interface.implements(interfaces.IJSONSerializable) + + acme_type = NotImplemented + """ACME message "type" field. Subclasses must override.""" + + TYPES = {} + """Message types registered for JSON deserialization""" + + @classmethod + def register(cls, msg_cls): + """Register class for JSON deserialization.""" + cls.TYPES[msg_cls.acme_type] = msg_cls + return msg_cls + + def to_json(self): + """Get JSON serializable object. + + :returns: Serializable JSON object representing ACME message. + :meth:`validate` will almost certainly not work, due to reasons + explained in :class:`letsencrypt.acme.interfaces.IJSONSerializable`. + :rtype: dict + + """ + jobj = self._fields_to_json() + jobj["type"] = self.acme_type + return jobj + + def _fields_to_json(self): + """Prepare ACME message fields for JSON serialiazation. + + Subclasses must override this method. + + :returns: Serializable JSON object containg all ACME message fields + apart from "type". + :rtype: dict + + """ + raise NotImplementedError() + + @classmethod + def get_msg_cls(cls, jobj): + """Get the registered class for ``jobj``.""" + if cls in cls.TYPES.itervalues(): + # cls is already registered Message type, force to use it + # so that, e.g Revocation.from_json(jobj) fails if + # jobj["type"] != "revocation". + return cls + + if not isinstance(jobj, dict): + raise errors.ValidationError( + "{0} is not a dictionary object".format(jobj)) + try: + msg_type = jobj["type"] + except KeyError: + raise errors.ValidationError("missing type field") + + try: + msg_cls = cls.TYPES[msg_type] + except KeyError: + raise errors.UnrecognizedMessageTypeError(msg_type) + + return msg_cls + + @classmethod + def from_json(cls, jobj, validate=True): + """Deserialize validated ACME message from JSON string. + + :param str jobj: JSON object. + :param bool validate: Validate against schema before deserializing. + Useful if :class:`JWK` is part of already validated json object. + + :raises letsencrypt.acme.errors.ValidationError: if validation + was unsuccessful + + :returns: Valid ACME message. + :rtype: subclass of :class:`Message` + + """ + msg_cls = cls.get_msg_cls(jobj) + if validate: + msg_cls.validate_json(jobj) + # pylint: disable=protected-access + return msg_cls._from_valid_json(jobj) + + +@Message.register # pylint: disable=too-few-public-methods +class Challenge(Message): + """ACME "challenge" message.""" + acme_type = "challenge" + schema = util.load_schema(acme_type) + __slots__ = ("session_id", "nonce", "challenges", "combinations") + + def _fields_to_json(self): + fields = { + "sessionID": self.session_id, + "nonce": jose.b64encode(self.nonce), + "challenges": self.challenges, + } + if self.combinations: + fields["combinations"] = self.combinations + return fields + + @classmethod + def _from_valid_json(cls, jobj): + return cls(session_id=jobj["sessionID"], + nonce=jose.b64decode(jobj["nonce"]), + challenges=jobj["challenges"], + combinations=jobj.get("combinations", [])) + + +@Message.register # pylint: disable=too-few-public-methods +class ChallengeRequest(Message): + """ACME "challengeRequest" message. + + :ivar str identifier: Domain name. + + """ + acme_type = "challengeRequest" + schema = util.load_schema(acme_type) + __slots__ = ("identifier",) + + def _fields_to_json(self): + return { + "identifier": self.identifier, + } + + @classmethod + def _from_valid_json(cls, jobj): + return cls(identifier=jobj["identifier"]) + + +@Message.register # pylint: disable=too-few-public-methods +class Authorization(Message): + """ACME "authorization" message.""" + acme_type = "authorization" + schema = util.load_schema(acme_type) + __slots__ = ("recovery_token", "identifier", "jwk") + + def _fields_to_json(self): + fields = {} + if self.recovery_token is not None: + fields["recoveryToken"] = self.recovery_token + if self.identifier is not None: + fields["identifier"] = self.identifier + if self.jwk is not None: + fields["jwk"] = self.jwk + return fields + + @classmethod + def _from_valid_json(cls, jobj): + jwk = jobj.get("jwk") + if jwk is not None: + jwk = jose.JWK.from_json(jwk, validate=False) + return cls(recovery_token=jobj.get("recoveryToken"), + identifier=jobj.get("identifier"), jwk=jwk) + + +@Message.register +class AuthorizationRequest(Message): + """ACME "authorizationRequest" message. + + :ivar str session_id: "sessionID" from the server challenge + :ivar str nonce: Nonce from the server challenge + :ivar list responses: List of completed challenges + :ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`). + :ivar contact: TODO + + """ + acme_type = "authorizationRequest" + schema = util.load_schema(acme_type) + __slots__ = ("session_id", "nonce", "responses", "signature", "contact") + + @classmethod + def create(cls, name, key, sig_nonce=None, **kwargs): + """Create signed "authorizationRequest". + + :param str name: Hostname + + :param key: Key used for signing. + :type key: :class:`Crypto.PublicKey.RSA` + + :param str sig_nonce: Nonce used for signature. Useful for testing. + :kwargs: Any other arguments accepted by the class constructor. + + :returns: Signed "authorizationRequest" ACME message. + :rtype: :class:`AuthorizationRequest` + + """ + # pylint: disable=too-many-arguments + signature = other.Signature.from_msg( + name + kwargs["nonce"], key, sig_nonce) + return cls( + signature=signature, contact=kwargs.pop("contact", []), **kwargs) + + def verify(self, name): + """Verify signature. + + .. warning:: Caller must check that the public key encoded in the + :attr:`signature`'s :class:`letsencrypt.acme.jose.JWK` object + is the correct key for a given context. + + :param str name: Hostname + + :returns: True iff ``signature`` can be verified, False otherwise. + :rtype: bool + + """ + return self.signature.verify(name + self.nonce) + + def _fields_to_json(self): + fields = { + "sessionID": self.session_id, + "nonce": jose.b64encode(self.nonce), + "responses": self.responses, + "signature": self.signature, + } + if self.contact: + fields["contact"] = self.contact + return fields + + @classmethod + def _from_valid_json(cls, jobj): + return cls(session_id=jobj["sessionID"], + nonce=jose.b64decode(jobj["nonce"]), + responses=jobj["responses"], + signature=other.Signature.from_json( + jobj["signature"], validate=False), + contact=jobj.get("contact", [])) + + +@Message.register # pylint: disable=too-few-public-methods +class Certificate(Message): + """ACME "certificate" message. + + :ivar certificate: The certificate (:class:`M2Crypto.X509.X509` + wrapped in :class:`letsencrypt.acme.util.ComparableX509`). + + :ivar list chain: Chain of certificates (:class:`M2Crypto.X509.X509` + wrapped in :class:`letsencrypt.acme.util.ComparableX509` ). + + """ + acme_type = "certificate" + schema = util.load_schema(acme_type) + __slots__ = ("certificate", "chain", "refresh") + + def _fields_to_json(self): + fields = {"certificate": self._encode_cert(self.certificate)} + if self.chain: + fields["chain"] = [self._encode_cert(cert) for cert in self.chain] + if self.refresh is not None: + fields["refresh"] = self.refresh + return fields + + @classmethod + def _decode_cert(cls, b64der): + return util.ComparableX509(M2Crypto.X509.load_cert_der_string( + jose.b64decode(b64der))) + + @classmethod + def _encode_cert(cls, cert): + return jose.b64encode(cert.as_der()) + + @classmethod + def _from_valid_json(cls, jobj): + return cls(certificate=cls._decode_cert(jobj["certificate"]), + chain=[cls._decode_cert(cert) for cert in + jobj.get("chain", [])], + refresh=jobj.get("refresh")) + + +@Message.register +class CertificateRequest(Message): + """ACME "certificateRequest" message. + + :ivar csr: Certificate Signing Request (:class:`M2Crypto.X509.Request` + wrapped in :class:`letsencrypt.acme.util.ComparableX509`. + :ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`). + + """ + acme_type = "certificateRequest" + schema = util.load_schema(acme_type) + __slots__ = ("csr", "signature") + + @classmethod + def create(cls, key, sig_nonce=None, **kwargs): + """Create signed "certificateRequest". + + :param key: Key used for signing. + :type key: :class:`Crypto.PublicKey.RSA` + + :param str sig_nonce: Nonce used for signature. Useful for testing. + :kwargs: Any other arguments accepted by the class constructor. + + :returns: Signed "certificateRequest" ACME message. + :rtype: :class:`CertificateRequest` + + """ + return cls(signature=other.Signature.from_msg( + kwargs["csr"].as_der(), key, sig_nonce), **kwargs) + + def verify(self): + """Verify signature. + + .. warning:: Caller must check that the public key encoded in the + :attr:`signature`'s :class:`letsencrypt.acme.jose.JWK` object + is the correct key for a given context. + + :returns: True iff ``signature`` can be verified, False otherwise. + :rtype: bool + + """ + return self.signature.verify(self.csr.as_der()) + + @classmethod + def _decode_csr(cls, b64der): + return util.ComparableX509(M2Crypto.X509.load_request_der_string( + jose.b64decode(b64der))) + + @classmethod + def _encode_csr(cls, csr): + return jose.b64encode(csr.as_der()) + + def _fields_to_json(self): + return { + "csr": self._encode_csr(self.csr), + "signature": self.signature, + } + + @classmethod + def _from_valid_json(cls, jobj): + return cls(csr=cls._decode_csr(jobj["csr"]), + signature=other.Signature.from_json( + jobj["signature"], validate=False)) + + +@Message.register # pylint: disable=too-few-public-methods +class Defer(Message): + """ACME "defer" message.""" + acme_type = "defer" + schema = util.load_schema(acme_type) + __slots__ = ("token", "interval", "message") + + def _fields_to_json(self): + fields = {"token": self.token} + if self.interval is not None: + fields["interval"] = self.interval + if self.message is not None: + fields["message"] = self.message + return fields + + @classmethod + def _from_valid_json(cls, jobj): + return cls(token=jobj["token"], interval=jobj.get("interval"), + message=jobj.get("message")) + + +@Message.register # pylint: disable=too-few-public-methods +class Error(Message): + """ACME "error" message.""" + acme_type = "error" + schema = util.load_schema(acme_type) + __slots__ = ("error", "message", "more_info") + + CODES = { + "malformed": "The request message was malformed", + "unauthorized": "The client lacks sufficient authorization", + "serverInternal": "The server experienced an internal error", + "notSupported": "The request type is not supported", + "unknown": "The server does not recognize an ID/token in the request", + "badCSR": "The CSR is unacceptable (e.g., due to a short key)", + } + + def _fields_to_json(self): + fields = {"error": self.error} + if self.message is not None: + fields["message"] = self.message + if self.more_info is not None: + fields["moreInfo"] = self.more_info + return fields + + @classmethod + def _from_valid_json(cls, jobj): + return cls(error=jobj["error"], message=jobj.get("message"), + more_info=jobj.get("moreInfo")) + + +@Message.register # pylint: disable=too-few-public-methods +class Revocation(Message): + """ACME "revocation" message.""" + acme_type = "revocation" + schema = util.load_schema(acme_type) + __slots__ = () + + def _fields_to_json(self): + return {} + + @classmethod + def _from_valid_json(cls, jobj): + return cls() + + +@Message.register +class RevocationRequest(Message): + """ACME "revocationRequest" message. + + :ivar certificate: Certificate (:class:`M2Crypto.X509.X509` + wrapped in :class:`letsencrypt.acme.util.ComparableX509`). + :ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`). + + """ + acme_type = "revocationRequest" + schema = util.load_schema(acme_type) + __slots__ = ("certificate", "signature") + + @classmethod + def create(cls, key, sig_nonce=None, **kwargs): + """Create signed "revocationRequest". + + :param key: Key used for signing. + :type key: :class:`Crypto.PublicKey.RSA` + + :param str sig_nonce: Nonce used for signature. Useful for testing. + :kwargs: Any other arguments accepted by the class constructor. + + :returns: Signed "revocationRequest" ACME message. + :rtype: :class:`RevocationRequest` + + """ + return cls(signature=other.Signature.from_msg( + kwargs["certificate"].as_der(), key, sig_nonce), **kwargs) + + def verify(self): + """Verify signature. + + .. warning:: Caller must check that the public key encoded in the + :attr:`signature`'s :class:`letsencrypt.acme.jose.JWK` object + is the correct key for a given context. + + :returns: True iff ``signature`` can be verified, False otherwise. + :rtype: bool + + """ + return self.signature.verify(self.certificate.as_der()) + + @classmethod + def _decode_cert(cls, b64der): + return util.ComparableX509(M2Crypto.X509.load_cert_der_string( + jose.b64decode(b64der))) + + @classmethod + def _encode_cert(cls, cert): + return jose.b64encode(cert.as_der()) + + def _fields_to_json(self): + return { + "certificate": self._encode_cert(self.certificate), + "signature": self.signature, + } + + @classmethod + def _from_valid_json(cls, jobj): + return cls(certificate=cls._decode_cert(jobj["certificate"]), + signature=other.Signature.from_json( + jobj["signature"], validate=False)) + + +@Message.register # pylint: disable=too-few-public-methods +class StatusRequest(Message): + """ACME "statusRequest" message. + + :ivar unicode token: Token provided in ACME "defer" message. + + """ + acme_type = "statusRequest" + schema = util.load_schema(acme_type) + __slots__ = ("token",) + + def _fields_to_json(self): + return {"token": self.token} + + @classmethod + def _from_valid_json(cls, jobj): + return cls(token=jobj["token"]) diff --git a/letsencrypt/acme/messages_test.py b/letsencrypt/acme/messages_test.py new file mode 100644 index 000000000..018854225 --- /dev/null +++ b/letsencrypt/acme/messages_test.py @@ -0,0 +1,484 @@ +"""Tests for letsencrypt.acme.messages.""" +import pkg_resources +import unittest + +import Crypto.PublicKey.RSA +import M2Crypto.X509 +import mock + +from letsencrypt.acme import errors +from letsencrypt.acme import jose +from letsencrypt.acme import other +from letsencrypt.acme import util + + +KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string( + 'letsencrypt.client.tests', 'testdata/rsa256_key.pem')) +CERT = util.ComparableX509(M2Crypto.X509.load_cert( + pkg_resources.resource_filename( + 'letsencrypt.client.tests', 'testdata/cert.pem'))) +CSR = util.ComparableX509(M2Crypto.X509.load_request( + pkg_resources.resource_filename( + 'letsencrypt.client.tests', 'testdata/csr.pem'))) + + +class MessageTest(unittest.TestCase): + """Tests for letsencrypt.acme.messages.Message.""" + + def setUp(self): + # pylint: disable=missing-docstring,too-few-public-methods + from letsencrypt.acme.messages import Message + class TestMessage(Message): + acme_type = 'test' + schema = { + 'type': 'object', + 'properties': { + 'price': {'type': 'number'}, + 'name': {'type': 'string'}, + }, + } + + @classmethod + def _from_valid_json(cls, jobj): + return jobj + + def _fields_to_json(self): + return {'foo': 'bar'} + + self.msg_cls = TestMessage + + def test_to_json(self): + self.assertEqual(self.msg_cls().to_json(), { + 'type': 'test', + 'foo': 'bar', + }) + + def test_fields_to_json_not_implemented(self): + from letsencrypt.acme.messages import Message + # pylint: disable=protected-access + self.assertRaises(NotImplementedError, Message()._fields_to_json) + + @classmethod + def _from_json(cls, jobj, validate=True): + from letsencrypt.acme.messages import Message + return Message.from_json(jobj, validate) + + def test_from_json_non_dict_fails(self): + self.assertRaises(errors.ValidationError, self._from_json, []) + + def test_from_json_dict_no_type_fails(self): + self.assertRaises(errors.ValidationError, self._from_json, {}) + + def test_from_json_unknown_type_fails(self): + self.assertRaises(errors.UnrecognizedMessageTypeError, + self._from_json, {'type': 'bar'}) + + @mock.patch('letsencrypt.acme.messages.Message.TYPES') + def test_from_json_validate_errors(self, types): + types.__getitem__.side_effect = lambda x: {'foo': self.msg_cls}[x] + self.assertRaises(errors.SchemaValidationError, + self._from_json, {'type': 'foo', 'price': 'asd'}) + + @mock.patch('letsencrypt.acme.messages.Message.TYPES') + def test_from_json_valid_returns_cls(self, types): + types.__getitem__.side_effect = lambda x: {'foo': self.msg_cls}[x] + self.assertEqual(self._from_json({'type': 'foo'}, validate=False), + {'type': 'foo'}) + + +class ChallengeTest(unittest.TestCase): + + def setUp(self): + challenges = [ + {'type': 'simpleHttps', 'token': 'IlirfxKKXAsHtmzK29Pj8A'}, + {'type': 'dns', 'token': 'DGyRejmCefe7v4NfDGDKfA'}, + {'type': 'recoveryToken'}, + ] + combinations = [[0, 2], [1, 2]] + + from letsencrypt.acme.messages import Challenge + self.msg = Challenge( + session_id='aefoGaavieG9Wihuk2aufai3aeZ5EeW4', + nonce='\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9', + challenges=challenges, combinations=combinations) + + self.jmsg = { + 'type': 'challenge', + 'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4', + 'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ', + 'challenges': challenges, + 'combinations': combinations, + } + + def test_to_json(self): + self.assertEqual(self.msg.to_json(), self.jmsg) + + def test_from_json(self): + from letsencrypt.acme.messages import Challenge + self.assertEqual(Challenge.from_json(self.jmsg), self.msg) + + def test_json_without_optionals(self): + del self.jmsg['combinations'] + + from letsencrypt.acme.messages import Challenge + msg = Challenge.from_json(self.jmsg) + + self.assertEqual(msg.combinations, []) + self.assertEqual(msg.to_json(), self.jmsg) + + +class ChallengeRequestTest(unittest.TestCase): + + def setUp(self): + from letsencrypt.acme.messages import ChallengeRequest + self.msg = ChallengeRequest(identifier='example.com') + + self.jmsg = { + 'type': 'challengeRequest', + 'identifier': 'example.com', + } + + def test_to_json(self): + self.assertEqual(self.msg.to_json(), self.jmsg) + + def test_from_json(self): + from letsencrypt.acme.messages import ChallengeRequest + self.assertEqual(ChallengeRequest.from_json(self.jmsg), self.msg) + + +class AuthorizationTest(unittest.TestCase): + + def setUp(self): + jwk = jose.JWK(key=KEY.publickey()) + + from letsencrypt.acme.messages import Authorization + self.msg = Authorization(recovery_token='tok', jwk=jwk, + identifier='example.com') + + self.jmsg = { + 'type': 'authorization', + 'recoveryToken': 'tok', + 'identifier': 'example.com', + 'jwk': jwk, + } + + def test_to_json(self): + self.assertEqual(self.msg.to_json(), self.jmsg) + + def test_from_json(self): + self.jmsg['jwk'] = self.jmsg['jwk'].to_json() + + from letsencrypt.acme.messages import Authorization + self.assertEqual(Authorization.from_json(self.jmsg), self.msg) + + def test_json_without_optionals(self): + del self.jmsg['recoveryToken'] + del self.jmsg['identifier'] + del self.jmsg['jwk'] + + from letsencrypt.acme.messages import Authorization + msg = Authorization.from_json(self.jmsg) + + self.assertTrue(msg.recovery_token is None) + self.assertTrue(msg.identifier is None) + self.assertTrue(msg.jwk is None) + self.assertEqual(self.jmsg, msg.to_json()) + + +class AuthorizationRequestTest(unittest.TestCase): + + def setUp(self): + self.responses = [ + {'type': 'simpleHttps', 'path': 'Hf5GrX4Q7EBax9hc2jJnfw'}, + None, # null + {'type': 'recoveryToken', 'token': '23029d88d9e123e'}, + ] + self.contact = ["mailto:cert-admin@example.com", "tel:+12025551212"] + signature = other.Signature( + alg='RS256', jwk=jose.JWK(key=KEY.publickey()), + sig='-v\xd8\xc2\xa3\xba0\xd6\x92\x16\xb5.\xbe\xa1[\x04\xbe' + '\x1b\xa1X\xd2)\x18\x94\x8f\xd7\xd0\xc0\xbbcI`W\xdf v' + '\xe4\xed\xe8\x03J\xe8\xc8l#\x10<\x96\xd2\xcdr\xa3' + '\x1b\xa1\xf5!f\xef\xc64\xb6\x13') + self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9' + self.jwk = jose.JWK(key=RSA256_KEY.publickey()) + + b64sig = ('SUPYKucUnhlTt8_sMxLiigOYdf_wlOLXPI-o7aRLTsOquVjDd6r' + 'AX9AFJHk-bCMQPJbSzXKjG6H1IWbvxjS2Ew') + b64nonce = '7Nbyb1lI6xPVI3Hg3aKSqQ' + self.jsig_to = { + 'nonce': b64nonce, + 'alg': self.alg, + 'jwk': self.jwk, + 'sig': b64sig, + } + + self.jsig_from = { + 'nonce': b64nonce, + 'alg': self.alg, + 'jwk': self.jwk.to_json(), + 'sig': b64sig, + } + + from letsencrypt.acme.other import Signature + self.signature = Signature( + alg=self.alg, sig=self.sig, nonce=self.nonce, jwk=self.jwk) + + def test_attributes(self): + self.assertEqual(self.signature.nonce, self.nonce) + self.assertEqual(self.signature.alg, self.alg) + self.assertEqual(self.signature.sig, self.sig) + self.assertEqual(self.signature.jwk, self.jwk) + + def test_verify_good_succeeds(self): + self.assertTrue(self.signature.verify(self.msg)) + + def test_verify_bad_fails(self): + self.assertFalse(self.signature.verify(self.msg + 'x')) + + @classmethod + def _from_msg(cls, *args, **kwargs): + from letsencrypt.acme.other import Signature + return Signature.from_msg(*args, **kwargs) + + def test_create_from_msg(self): + signature = self._from_msg(self.msg, RSA256_KEY, self.nonce) + self.assertEqual(self.signature, signature) + + def test_create_from_msg_random_nonce(self): + signature = self._from_msg(self.msg, RSA256_KEY) + self.assertEqual(signature.alg, self.alg) + self.assertEqual(signature.jwk, self.jwk) + self.assertTrue(signature.verify(self.msg)) + + def test_to_json(self): + self.assertEqual(self.signature.to_json(), self.jsig_to) + + def test_from_json(self): + from letsencrypt.acme.other import Signature + # pylint: disable=protected-access + self.assertEqual( + self.signature, Signature._from_valid_json(self.jsig_from)) + + +if __name__ == '__main__': + unittest.main() diff --git a/letsencrypt/client/schemata/authorization.json b/letsencrypt/acme/schemata/authorization.json similarity index 88% rename from letsencrypt/client/schemata/authorization.json rename to letsencrypt/acme/schemata/authorization.json index 59877b648..742a9c0d5 100644 --- a/letsencrypt/client/schemata/authorization.json +++ b/letsencrypt/acme/schemata/authorization.json @@ -15,7 +15,7 @@ "type": "string" }, "jwk": { - "$ref": "file:letsencrypt/client/schemata/jwk.json" + "$ref": "file:letsencrypt/acme/schemata/jwk.json" } } } diff --git a/letsencrypt/client/schemata/authorizationRequest.json b/letsencrypt/acme/schemata/authorizationRequest.json similarity index 85% rename from letsencrypt/client/schemata/authorizationRequest.json rename to letsencrypt/acme/schemata/authorizationRequest.json index a0d198333..ee22808bc 100644 --- a/letsencrypt/client/schemata/authorizationRequest.json +++ b/letsencrypt/acme/schemata/authorizationRequest.json @@ -15,14 +15,14 @@ "type": "string" }, "signature" : { - "$ref": "file:letsencrypt/client/schemata/signature.json" + "$ref": "file:letsencrypt/acme/schemata/signature.json" }, "responses": { "type": "array", "minItems": 1, "items": { "anyOf": [ - { "$ref": "file:letsencrypt/client/schemata/responseobject.json" }, + { "$ref": "file:letsencrypt/acme/schemata/responseobject.json" }, { "type": "null" } ] } diff --git a/letsencrypt/client/schemata/certificate.json b/letsencrypt/acme/schemata/certificate.json similarity index 100% rename from letsencrypt/client/schemata/certificate.json rename to letsencrypt/acme/schemata/certificate.json diff --git a/letsencrypt/client/schemata/certificateRequest.json b/letsencrypt/acme/schemata/certificateRequest.json similarity index 87% rename from letsencrypt/client/schemata/certificateRequest.json rename to letsencrypt/acme/schemata/certificateRequest.json index 0ea5b83d7..c75e93bd9 100644 --- a/letsencrypt/client/schemata/certificateRequest.json +++ b/letsencrypt/acme/schemata/certificateRequest.json @@ -13,7 +13,7 @@ "pattern": "^[-_=0-9A-Za-z]+$" }, "signature" : { - "$ref": "file:letsencrypt/client/schemata/signature.json" + "$ref": "file:letsencrypt/acme/schemata/signature.json" } } } diff --git a/letsencrypt/client/schemata/challenge.json b/letsencrypt/acme/schemata/challenge.json similarity index 91% rename from letsencrypt/client/schemata/challenge.json rename to letsencrypt/acme/schemata/challenge.json index 92e22424b..b4b2a5205 100644 --- a/letsencrypt/client/schemata/challenge.json +++ b/letsencrypt/acme/schemata/challenge.json @@ -18,7 +18,7 @@ "type": "array", "minItems": 1, "items": { - "$ref": "file:letsencrypt/client/schemata/challengeobject.json" + "$ref": "file:letsencrypt/acme/schemata/challengeobject.json" } }, "combinations": { diff --git a/letsencrypt/client/schemata/challengeRequest.json b/letsencrypt/acme/schemata/challengeRequest.json similarity index 100% rename from letsencrypt/client/schemata/challengeRequest.json rename to letsencrypt/acme/schemata/challengeRequest.json diff --git a/letsencrypt/client/schemata/challengeobject.json b/letsencrypt/acme/schemata/challengeobject.json similarity index 100% rename from letsencrypt/client/schemata/challengeobject.json rename to letsencrypt/acme/schemata/challengeobject.json diff --git a/letsencrypt/client/schemata/defer.json b/letsencrypt/acme/schemata/defer.json similarity index 100% rename from letsencrypt/client/schemata/defer.json rename to letsencrypt/acme/schemata/defer.json diff --git a/letsencrypt/client/schemata/error.json b/letsencrypt/acme/schemata/error.json similarity index 100% rename from letsencrypt/client/schemata/error.json rename to letsencrypt/acme/schemata/error.json diff --git a/letsencrypt/client/schemata/jwk.json b/letsencrypt/acme/schemata/jwk.json similarity index 100% rename from letsencrypt/client/schemata/jwk.json rename to letsencrypt/acme/schemata/jwk.json diff --git a/letsencrypt/client/schemata/responseobject.json b/letsencrypt/acme/schemata/responseobject.json similarity index 96% rename from letsencrypt/client/schemata/responseobject.json rename to letsencrypt/acme/schemata/responseobject.json index dfb1fac28..c6d6c9c1b 100644 --- a/letsencrypt/client/schemata/responseobject.json +++ b/letsencrypt/acme/schemata/responseobject.json @@ -59,7 +59,7 @@ "pattern": "^[-_=0-9A-Za-z]+$" }, "signature": { - "$ref": "file:letsencrypt/client/schemata/signature.json" + "$ref": "file:letsencrypt/acme/schemata/signature.json" } } }, diff --git a/letsencrypt/client/schemata/revocation.json b/letsencrypt/acme/schemata/revocation.json similarity index 100% rename from letsencrypt/client/schemata/revocation.json rename to letsencrypt/acme/schemata/revocation.json diff --git a/letsencrypt/client/schemata/revocationRequest.json b/letsencrypt/acme/schemata/revocationRequest.json similarity index 86% rename from letsencrypt/client/schemata/revocationRequest.json rename to letsencrypt/acme/schemata/revocationRequest.json index 38cbe85b8..5eb604fd9 100644 --- a/letsencrypt/client/schemata/revocationRequest.json +++ b/letsencrypt/acme/schemata/revocationRequest.json @@ -12,7 +12,7 @@ "type" : "string" }, "signature" : { - "$ref": "file:letsencrypt/client/schemata/signature.json" + "$ref": "file:letsencrypt/acme/schemata/signature.json" } } } diff --git a/letsencrypt/client/schemata/signature.json b/letsencrypt/acme/schemata/signature.json similarity index 100% rename from letsencrypt/client/schemata/signature.json rename to letsencrypt/acme/schemata/signature.json diff --git a/letsencrypt/client/schemata/statusRequest.json b/letsencrypt/acme/schemata/statusRequest.json similarity index 100% rename from letsencrypt/client/schemata/statusRequest.json rename to letsencrypt/acme/schemata/statusRequest.json diff --git a/letsencrypt/acme/util.py b/letsencrypt/acme/util.py new file mode 100644 index 000000000..8906e584a --- /dev/null +++ b/letsencrypt/acme/util.py @@ -0,0 +1,147 @@ +"""ACME utilities.""" +import json +import pkg_resources + +import jsonschema +import zope.interface + +from letsencrypt.acme import errors +from letsencrypt.acme import interfaces + + +class ComparableX509(object): # pylint: disable=too-few-public-methods + """Wrapper for M2Crypto.X509.* objects that supports __eq__. + + Wraps around: + + - :class:`M2Crypto.X509.X509` + - :class:`M2Crypto.X509.Request` + + """ + def __init__(self, wrapped): + self._wrapped = wrapped + + def __getattr__(self, name): + return getattr(self._wrapped, name) + + def __eq__(self, other): + return self.as_der() == other.as_der() + + +def load_schema(name): + """Load JSON schema from distribution.""" + return json.load(open(pkg_resources.resource_filename( + __name__, "schemata/%s.json" % name))) + + +class JSONDeSerializable(object): + """JSON (de)serializable object.""" + zope.interface.implements(interfaces.IJSONSerializable) + + schema = NotImplemented + + @classmethod + def validate_json(cls, jobj): + """Validate JSON object against schema. + + :raises letsencrypt.acme.errors.SchemaValidationError: if object + couldn't be validated. + + """ + try: + jsonschema.validate(jobj, cls.schema) + except jsonschema.ValidationError as error: + raise errors.SchemaValidationError(error) + + @classmethod + def from_json(cls, jobj, validate=True): + """Deserialize from JSON. + + Note that the input ``jobj`` has not been sanitized in any way. + + :param jobj: JSON object. + :param bool validate: Validate against schema before deserializing. + Useful if :class:`JWK` is part of already validated json object. + + :raises letsencrypt.acme.errors.SchemaValidationError: if ``validate`` + was ``True`` and object couldn't be validated. + + :returns: instance of the class + + """ + if validate: + cls.validate_json(jobj) + return cls._from_valid_json(jobj) + + @classmethod + def _from_valid_json(cls, jobj): + """Deserializa from valid JSON object. + + :param jobj: JSON object that has been validated against schema. + + """ + raise NotImplementedError() + + @classmethod + def json_loads(cls, json_string, validate=True): + """Load JSON string.""" + return cls.from_json(json.loads(json_string), validate) + + def to_json(self): + """Prepare JSON serializable object.""" + raise NotImplementedError() + + def json_dumps(self): + """Dump to JSON string using proper serializer. + + :returns: JSON serialized string. + :rtype: str + + """ + return json.dumps(self, default=dump_ijsonserializable) + + +def dump_ijsonserializable(python_object): + """Serialize IJSONSerializable to JSON. + + This is meant to be passed to :func:`json.dumps` as ``default`` + argument. + + """ + # providedBy | pylint: disable=no-member + if interfaces.IJSONSerializable.providedBy(python_object): + return python_object.to_json() + else: + raise TypeError(repr(python_object) + ' is not JSON serializable') + + +class ImmutableMap(object): # pylint: disable=too-few-public-methods + """Immutable key to value mapping with attribute access.""" + + __slots__ = () + """Must be overriden in subclasses.""" + + def __init__(self, **kwargs): + if set(kwargs) != set(self.__slots__): + raise TypeError( + '__init__() takes exactly the following arguments: {0} ' + '({1} given)'.format(', '.join(self.__slots__), + ', '.join(kwargs) if kwargs else 'none')) + for slot in self.__slots__: + object.__setattr__(self, slot, kwargs.pop(slot)) + + def __setattr__(self, name, value): + raise AttributeError("can't set attribute") + + def __eq__(self, other): + return isinstance(other, self.__class__) and all( + getattr(self, slot) == getattr(other, slot) + for slot in self.__slots__) + + def __hash__(self): + return hash(tuple(getattr(self, slot) for slot in self.__slots__)) + + def __repr__(self): + return '{0}({1})'.format(self.__class__.__name__, ', '.join( + '{0}={1!r}'.format(slot, getattr(self, slot)) + for slot in self.__slots__)) diff --git a/letsencrypt/acme/util_test.py b/letsencrypt/acme/util_test.py new file mode 100644 index 000000000..cf71963e8 --- /dev/null +++ b/letsencrypt/acme/util_test.py @@ -0,0 +1,167 @@ +"""Tests for letsencrypt.acme.util.""" +import functools +import json +import unittest + +import zope.interface + +from letsencrypt.acme import errors +from letsencrypt.acme import interfaces + + +class MockJSONSerialiazable(object): + # pylint: disable=missing-docstring,too-few-public-methods,no-self-use + zope.interface.implements(interfaces.IJSONSerializable) + + def to_json(self): + return [3, 2, 1] + + +class JSONDeSerializableTest(unittest.TestCase): + """Tests for letsencrypt.acme.util.JSONDeSerializable.""" + + def setUp(self): + from letsencrypt.acme.util import JSONDeSerializable + + class Tester(JSONDeSerializable): + # pylint: disable=missing-docstring,no-self-use, + # pylint: disable=too-few-public-methods + zope.interface.implements(interfaces.IJSONSerializable) + + schema = {'type': 'integer'} + + def __init__(self, jobj): + self.jobj = jobj + + @classmethod + def _from_valid_json(cls, jobj): + return cls(jobj) + + def to_json(self): + return {'foo': MockJSONSerialiazable()} + + self.tester_cls = Tester + + def test_validate_invalid_json(self): + self.assertRaises(errors.SchemaValidationError, + self.tester_cls.validate_json, 'bang!') + + def test_validate_valid_json(self): + self.tester_cls.validate_json(5) + + def test_from_json(self): + self.assertEqual(5, self.tester_cls.from_json(5, validate=True).jobj) + + def test_from_json_no_validation(self): + self.assertEqual(['1', 2], self.tester_cls.from_json( + ['1', 2], validate=False).jobj) + + def test_from_valid_json_raises_error(self): + from letsencrypt.acme.util import JSONDeSerializable + # pylint: disable=protected-access + self.assertRaises( + NotImplementedError, JSONDeSerializable._from_valid_json, 'foo') + + def test_json_loads(self): + tester = self.tester_cls.json_loads('5', validate=True) + self.assertEqual(tester.jobj, 5) + + def test_json_loads_no_validation(self): + self.assertEqual( + 'foo', self.tester_cls.json_loads('"foo"', validate=False).jobj) + + def test_to_json_raises_error(self): + from letsencrypt.acme.util import JSONDeSerializable + self.assertRaises(NotImplementedError, JSONDeSerializable().to_json) + + def test_json_dumps(self): + self.assertEqual( + self.tester_cls('foo').json_dumps(), '{"foo": [3, 2, 1]}') + + +class DumpIJSONSerializableTest(unittest.TestCase): + """Tests for letsencrypt.acme.util.dump_ijsonserializable.""" + + @classmethod + def _call(cls, obj): + from letsencrypt.acme.util import dump_ijsonserializable + return json.dumps(obj, default=dump_ijsonserializable) + + def test_json_type(self): + self.assertEqual('5', self._call(5)) + + def test_ijsonserializable(self): + self.assertEqual('[3, 2, 1]', self._call(MockJSONSerialiazable())) + + def test_raises_type_error(self): + self.assertRaises(TypeError, self._call, object()) + + +class ImmutableMapTest(unittest.TestCase): + """Tests for letsencrypt.acme.util.ImmutableMap.""" + + def setUp(self): + # pylint: disable=invalid-name,too-few-public-methods + # pylint: disable=missing-docstring + from letsencrypt.acme.util import ImmutableMap + + class A(ImmutableMap): + __slots__ = ('x', 'y') + + class B(ImmutableMap): + __slots__ = ('x', 'y') + + self.A = A + self.B = B + + self.a1 = self.A(x=1, y=2) + self.a1_swap = self.A(y=2, x=1) + self.a2 = self.A(x=3, y=4) + self.b = self.B(x=1, y=2) + + def test_order_of_args_does_not_matter(self): + self.assertEqual(self.a1, self.a1_swap) + + def test_type_error_on_missing(self): + self.assertRaises(TypeError, self.A, x=1) + self.assertRaises(TypeError, self.A, y=2) + + def test_type_error_on_unrecognized(self): + self.assertRaises(TypeError, self.A, x=1, z=2) + self.assertRaises(TypeError, self.A, x=1, y=2, z=3) + + def test_get_attr(self): + self.assertEqual(1, self.a1.x) + self.assertEqual(2, self.a1.y) + self.assertEqual(1, self.a1_swap.x) + self.assertEqual(2, self.a1_swap.y) + + def test_set_attr_raises_attribute_error(self): + self.assertRaises( + AttributeError, functools.partial(self.a1.__setattr__, 'x'), 10) + + def test_equal(self): + self.assertEqual(self.a1, self.a1) + self.assertEqual(self.a2, self.a2) + self.assertNotEqual(self.a1, self.a2) + + def test_same_slots_diff_cls_not_equal(self): + self.assertEqual(self.a1.x, self.b.x) + self.assertEqual(self.a1.y, self.b.y) + self.assertNotEqual(self.a1, self.b) + + def test_hash(self): + self.assertEqual(hash((1, 2)), hash(self.a1)) + + def test_unhashable(self): + self.assertRaises(TypeError, self.A(x=1, y={}).__hash__) + + def test_repr(self): + self.assertEqual('A(x=1, y=2)', repr(self.a1)) + self.assertEqual('A(x=1, y=2)', repr(self.a1_swap)) + self.assertEqual('B(x=1, y=2)', repr(self.b)) + self.assertEqual("B(x='foo', y='bar')", repr(self.B(x='foo', y='bar'))) + + +if __name__ == '__main__': + unittest.main() diff --git a/letsencrypt/client/acme.py b/letsencrypt/client/acme.py deleted file mode 100644 index bbb39ef83..000000000 --- a/letsencrypt/client/acme.py +++ /dev/null @@ -1,155 +0,0 @@ -"""ACME protocol messages.""" -import json -import pkg_resources - -import jsonschema - -from letsencrypt.client import crypto_util -from letsencrypt.client import le_util - - -SCHEMATA = dict([ - (schema, json.load(open(pkg_resources.resource_filename( - __name__, "schemata/%s.json" % schema)))) for schema in [ - "authorization", - "authorizationRequest", - "certificate", - "certificateRequest", - "challenge", - "challengeRequest", - "defer", - "error", - "revocation", - "revocationRequest", - "statusRequest" - ] -]) - - -def acme_object_validate(json_string, schemata=None): - """Validate a JSON string against the ACME protocol using JSON Schema. - - :param str json_string: Well-formed input JSON string. - - :param dict schemata: Mapping from type name to JSON Schema - definition. Useful for testing. - - :returns: None if validation was successful. - - :raises jsonschema.ValidationError: if validation was unsuccessful - :raises ValueError: if the object cannot even be parsed as valid JSON - - """ - schemata = SCHEMATA if schemata is None else schemata - json_object = json.loads(json_string) - if not isinstance(json_object, dict): - raise jsonschema.ValidationError("this is not a dictionary object") - if "type" not in json_object: - raise jsonschema.ValidationError("missing type field") - if json_object["type"] not in schemata: - raise jsonschema.ValidationError( - "unknown type %s" % json_object["type"]) - jsonschema.validate(json_object, schemata[json_object["type"]]) - - -def pretty(json_string): - """Return a pretty-printed version of any JSON string. - - Useful when printing out protocol messages for debugging purposes. - - """ - return json.dumps(json.loads(json_string), indent=4) - - -def challenge_request(name): - """Create ACME "challengeRequest message. - - :param str name: Domain name - - :returns: ACME "challengeRequest" message. - :rtype: dict - - """ - return { - "type": "challengeRequest", - "identifier": name, - } - - -def authorization_request(req_id, name, server_nonce, responses, key, - nonce=None): - """Create ACME "authorizationRequest" message. - - :param str req_id: SessionID from the server challenge - :param str name: Hostname - :param str server_nonce: Nonce from the server challenge - :param list responses: List of completed challenges - :param str key: Key in string form. Accepted formats - are the same as for `Crypto.PublicKey.RSA.importKey`. - :param str nonce: Nonce used for signature. Useful for testing. - - :returns: ACME "authorizationRequest" message. - :rtype: dict - - """ - return { - "type": "authorizationRequest", - "sessionID": req_id, - "nonce": server_nonce, - "responses": responses, - "signature": crypto_util.create_sig( - name + le_util.jose_b64decode(server_nonce), key, nonce), - } - - -def certificate_request(csr_der, key, nonce=None): - """Create ACME "certificateRequest" message. - - :param str csr_der: DER encoded CSR. - :param str key: Key in string form. Accepted formats - are the same as for `Crypto.PublicKey.RSA.importKey`. - :param str nonce: Nonce used for signature. Useful for testing. - - :returns: ACME "certificateRequest" message. - :rtype: dict - - """ - return { - "type": "certificateRequest", - "csr": le_util.jose_b64encode(csr_der), - "signature": crypto_util.create_sig(csr_der, key, nonce), - } - - -def revocation_request(cert_der, key, nonce=None): - """Create ACME "revocationRequest" message. - - :param str cert_der: DER encoded certificate. - :param str key: Key in string form. Accepted formats - are the same as for `Crypto.PublicKey.RSA.importKey`. - :param str nonce: Nonce used for signature. Useful for testing. - - :returns: ACME "revocationRequest" message. - :rtype: dict - - """ - return { - "type": "revocationRequest", - "certificate": le_util.jose_b64encode(cert_der), - "signature": crypto_util.create_sig(cert_der, key, nonce), - } - - -def status_request(token): - """Create ACME "statusRequest" message. - - :param unicode token: Token provided in ACME "defer" message. - - :returns: ACME "statusRequest" message. - :rtype: dict - - """ - return { - "type": "statusRequest", - "token": token, - } diff --git a/letsencrypt/client/auth_handler.py b/letsencrypt/client/auth_handler.py index 984862f46..c0eeeb0cd 100644 --- a/letsencrypt/client/auth_handler.py +++ b/letsencrypt/client/auth_handler.py @@ -2,7 +2,10 @@ import logging import sys -from letsencrypt.client import acme +import Crypto.PublicKey.RSA + +from letsencrypt.acme import messages + from letsencrypt.client import challenge_util from letsencrypt.client import constants from letsencrypt.client import errors @@ -53,7 +56,9 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes """Add a challenge message to the AuthHandler. :param str domain: domain for authorization - :param dict msg: ACME challenge message + + :param msg: ACME "challenge" message + :type msg: :class:`letsencrypt.acme.message.Challenge` :param authkey: authorized key for the challenge :type authkey: :class:`letsencrypt.client.le_util.Key` @@ -64,7 +69,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes "Multiple ACMEChallengeMessages for the same domain " "is not supported.") self.domains.append(domain) - self.responses[domain] = ["null"] * len(msg["challenges"]) + self.responses[domain] = ["null"] * len(msg.challenges) self.msgs[domain] = msg self.authkey[domain] = authkey @@ -102,18 +107,19 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes :param str domain: domain that is requesting authorization :returns: ACME "authorization" message. - :rtype: dict + :rtype: :class:`letsencrypt.acme.messages.Authorization` """ try: auth = self.network.send_and_receive_expected( - acme.authorization_request( - self.msgs[domain]["sessionID"], - domain, - self.msgs[domain]["nonce"], - self.responses[domain], - self.authkey[domain].pem), - "authorization") + messages.AuthorizationRequest.create( + session_id=self.msgs[domain].session_id, + nonce=self.msgs[domain].nonce, + responses=self.responses[domain], + name=domain, + key=Crypto.PublicKey.RSA.importKey( + self.authkey[domain].pem)), + messages.Authorization) logging.info("Received Authorization for %s", domain) return auth except errors.LetsEncryptClientError as err: @@ -135,9 +141,9 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes logging.info("Performing the following challenges:") for dom in self.domains: self.paths[dom] = gen_challenge_path( - self.msgs[dom]["challenges"], + self.msgs[dom].challenges, self._get_chall_pref(dom), - self.msgs[dom].get("combinations", None)) + self.msgs[dom].combinations) self.dv_c[dom], self.client_c[dom] = self._challenge_factory( dom, self.paths[dom]) @@ -263,7 +269,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes recognized """ - challenges = self.msgs[domain]["challenges"] + challenges = self.msgs[domain].challenges dv_chall = [] client_chall = [] diff --git a/letsencrypt/client/challenge_util.py b/letsencrypt/client/challenge_util.py index b836fd142..118f6d6aa 100644 --- a/letsencrypt/client/challenge_util.py +++ b/letsencrypt/client/challenge_util.py @@ -4,9 +4,10 @@ import hashlib from Crypto import Random +from letsencrypt.acme import jose + from letsencrypt.client import constants from letsencrypt.client import crypto_util -from letsencrypt.client import le_util # Authenticator Challenges @@ -45,7 +46,7 @@ def dvsni_gen_cert(name, r_b64, nonce, key): """ # Generate S dvsni_s = Random.get_random_bytes(constants.S_SIZE) - dvsni_r = le_util.jose_b64decode(r_b64) + dvsni_r = jose.b64decode(r_b64) # Generate extension ext = _dvsni_gen_ext(dvsni_r, dvsni_s) @@ -53,7 +54,7 @@ def dvsni_gen_cert(name, r_b64, nonce, key): cert_pem = crypto_util.make_ss_cert( key.pem, [nonce + constants.DVSNI_DOMAIN_SUFFIX, name, ext]) - return cert_pem, le_util.jose_b64encode(dvsni_s) + return cert_pem, jose.b64encode(dvsni_s) def _dvsni_gen_ext(dvsni_r, dvsni_s): diff --git a/letsencrypt/client/client.py b/letsencrypt/client/client.py index 7fe4bedf2..b7abbcc5c 100644 --- a/letsencrypt/client/client.py +++ b/letsencrypt/client/client.py @@ -5,10 +5,13 @@ import os import shutil import sys +import Crypto.PublicKey.RSA import M2Crypto import zope.component -from letsencrypt.client import acme +from letsencrypt.acme import messages +from letsencrypt.acme import util as acme_util + from letsencrypt.client import auth_handler from letsencrypt.client import client_authenticator from letsencrypt.client import crypto_util @@ -95,11 +98,11 @@ class Client(object): csr = init_csr(self.authkey, domains, self.config.cert_dir) # Retrieve certificate - certificate_dict = self.acme_certificate(csr.data) + certificate_msg = self.acme_certificate(csr.data) # Save Certificate cert_file, chain_file = self.save_certificate( - certificate_dict, self.config.cert_path, self.config.chain_path) + certificate_msg, self.config.cert_path, self.config.chain_path) self.store_cert_key(cert_file, False) @@ -109,11 +112,12 @@ class Client(object): """Handle ACME "challenge" phase. :returns: ACME "challenge" message. - :rtype: dict + :rtype: :class:`letsencrypt.acme.messages.Challenge` """ return self.network.send_and_receive_expected( - acme.challenge_request(domain), "challenge") + messages.ChallengeRequest(identifier=domain), + messages.Challenge) def acme_certificate(self, csr_der): """Handle ACME "certificate" phase. @@ -121,18 +125,24 @@ class Client(object): :param str csr_der: CSR in DER format. :returns: ACME "certificate" message. - :rtype: dict + :rtype: :class:`letsencrypt.acme.message.Certificate` """ logging.info("Preparing and sending CSR...") return self.network.send_and_receive_expected( - acme.certificate_request(csr_der, self.authkey.pem), "certificate") + messages.CertificateRequest.create( + csr=acme_util.ComparableX509( + M2Crypto.X509.load_request_der_string(csr_der)), + key=Crypto.PublicKey.RSA.importKey(self.authkey.pem)), + messages.Certificate) - def save_certificate(self, certificate_dict, cert_path, chain_path): + def save_certificate(self, certificate_msg, cert_path, chain_path): # pylint: disable=no-self-use """Saves the certificate received from the ACME server. - :param dict certificate_dict: certificate message from server + :param certificate_msg: ACME "certificate" message from server. + :type certificate_msg: :class:`letsencrypt.acme.messages.Certificate` + :param str cert_path: Path to attempt to save the cert file :param str chain_path: Path to attempt to save the chain file @@ -144,16 +154,15 @@ class Client(object): """ cert_chain_abspath = None cert_fd, cert_file = le_util.unique_file(cert_path, 0o644) - cert_fd.write( - crypto_util.b64_cert_to_pem(certificate_dict["certificate"])) + cert_fd.write(certificate_msg.certificate.as_pem()) cert_fd.close() logging.info( "Server issued certificate; certificate written to %s", cert_file) - if certificate_dict.get("chain", None): + if certificate_msg.chain: chain_fd, chain_fn = le_util.unique_file(chain_path, 0o644) - for cert in certificate_dict.get("chain", []): - chain_fd.write(crypto_util.b64_cert_to_pem(cert)) + for cert in certificate_msg.chain: + chain_fd.write(cert.to_pem()) chain_fd.close() logging.info("Cert chain written to %s", chain_fn) diff --git a/letsencrypt/client/crypto_util.py b/letsencrypt/client/crypto_util.py index 9b038f0de..e2c4965fe 100644 --- a/letsencrypt/client/crypto_util.py +++ b/letsencrypt/client/crypto_util.py @@ -1,67 +1,12 @@ """Let's Encrypt client crypto utility functions""" -import binascii -import logging import time -from Crypto import Random import Crypto.Hash.SHA256 import Crypto.PublicKey.RSA import Crypto.Signature.PKCS1_v1_5 import M2Crypto -from letsencrypt.client import constants -from letsencrypt.client import le_util - - -def create_sig(msg, key_str, nonce=None): - """Create signature with nonce prepended to the message. - - .. todo:: Change this over to M2Crypto... PKey - - .. todo:: Protect against crypto unicode errors... is this sufficient? - Do I need to escape? - - :param str key_str: Key in string form. Accepted formats - are the same as for `Crypto.PublicKey.RSA.importKey`. - :param str msg: Message to be signed - :param str nonce: Nonce to be used (required size - - :returns: Signature. - :rtype: dict - - """ - key = Crypto.PublicKey.RSA.importKey(key_str) - if nonce is None: - nonce = Random.get_random_bytes(constants.NONCE_SIZE) - assert len(nonce) == constants.NONCE_SIZE - - msg_with_nonce = nonce + msg - hashed = Crypto.Hash.SHA256.new(msg_with_nonce) - signature = Crypto.Signature.PKCS1_v1_5.new(key).sign(hashed) - - logging.debug("%s signed as %s", msg_with_nonce, signature) - - n_bytes = binascii.unhexlify(_leading_zeros(hex(key.n)[2:].rstrip("L"))) - e_bytes = binascii.unhexlify(_leading_zeros(hex(key.e)[2:].rstrip("L"))) - - return { - "nonce": le_util.jose_b64encode(nonce), - "alg": "RS256", - "jwk": { - "kty": "RSA", - "n": le_util.jose_b64encode(n_bytes), - "e": le_util.jose_b64encode(e_bytes), - }, - "sig": le_util.jose_b64encode(signature), - } - - -def _leading_zeros(arg): - if len(arg) % 2: - return "0" + arg - return arg - def make_csr(key_str, domains): """Generate a CSR. @@ -244,9 +189,3 @@ def get_cert_info(filename): "serial": cert.get_serial_number(), "pub_key": "RSA " + str(cert.get_pubkey().size() * 8), } - - -def b64_cert_to_pem(b64_der_cert): - """Convert JOSE Base-64 encoded DER cert to PEM.""" - return M2Crypto.X509.load_cert_der_string( - le_util.jose_b64decode(b64_der_cert)).as_pem() diff --git a/letsencrypt/client/le_util.py b/letsencrypt/client/le_util.py index 9266f0ca9..8b4b51536 100644 --- a/letsencrypt/client/le_util.py +++ b/letsencrypt/client/le_util.py @@ -1,5 +1,4 @@ """Utilities for all Let's Encrypt.""" -import base64 import collections import errno import os @@ -73,54 +72,3 @@ def unique_file(path, mode=0o777): except OSError: pass count += 1 - - -# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C -# -# Jose Base64: -# -# - URL-safe Base64 -# -# - padding stripped - - -def jose_b64encode(data): - """JOSE Base64 encode. - - :param data: Data to be encoded. - :type data: str or bytearray - - :returns: JOSE Base64 string. - :rtype: str - - :raises TypeError: if `data` is of incorrect type - - """ - if not isinstance(data, str): - raise TypeError("argument should be str or bytearray") - return base64.urlsafe_b64encode(data).rstrip("=") - - -def jose_b64decode(data): - """JOSE Base64 decode. - - :param data: Base64 string to be decoded. If it's unicode, then - only ASCII characters are allowed. - :type data: str or unicode - - :returns: Decoded data. - - :raises TypeError: if input is of incorrect type - :raises ValueError: if input is unicode with non-ASCII characters - - """ - if isinstance(data, unicode): - try: - data = data.encode("ascii") - except UnicodeEncodeError: - raise ValueError( - "unicode argument should contain only ASCII characters") - elif not isinstance(data, str): - raise TypeError("argument should be a str or unicode") - - return base64.urlsafe_b64decode(data + "=" * (4 - (len(data) % 4))) diff --git a/letsencrypt/client/network.py b/letsencrypt/client/network.py index 2ec93136d..bdba746b0 100644 --- a/letsencrypt/client/network.py +++ b/letsencrypt/client/network.py @@ -1,13 +1,12 @@ """Network Module.""" -import json import logging import sys import time -import jsonschema import requests -from letsencrypt.client import acme +from letsencrypt.acme import messages + from letsencrypt.client import errors @@ -31,10 +30,11 @@ class Network(object): def send(self, msg): """Send ACME message to server. - :param dict msg: ACME message (JSON serializable). + :param msg: ACME message. + :type msg: :class:`letsencrypt.acme.messages.Message` :returns: Server response message. - :rtype: dict + :rtype: :class:`letsencrypt.acme.messages.Message` :raises TypeError: if `msg` is not JSON serializable :raises jsonschema.ValidationError: if not valid ACME message @@ -42,13 +42,10 @@ class Network(object): or if response from server is not a valid ACME message. """ - json_encoded = json.dumps(msg) - acme.acme_object_validate(json_encoded) - try: response = requests.post( self.server_url, - data=json_encoded, + data=msg.json_dumps(), headers={"Content-Type": "application/json"}, verify=True ) @@ -56,66 +53,55 @@ class Network(object): raise errors.LetsEncryptClientError( 'Sending ACME message to server has failed: %s' % error) - try: - acme.acme_object_validate(response.content) - except ValueError: - raise errors.LetsEncryptClientError( - 'Server did not send JSON serializable message') - except jsonschema.ValidationError as error: - raise errors.LetsEncryptClientError( - 'Response from server is not a valid ACME message') - - return response.json() + return messages.Message.from_json(response.json(), validate=True) def send_and_receive_expected(self, msg, expected): """Send ACME message to server and return expected message. - :param dict msg: ACME message (JSON serializable). - :param str expected: Name of the expected response ACME message type. + :param msg: ACME message. + :type msg: :class:`letsencrypt.acme.Message` :returns: ACME response message of expected type. - :rtype: dict + :rtype: :class:`letsencrypt.acme.messages.Message` :raises errors.LetsEncryptClientError: An exception is thrown """ response = self.send(msg) - try: - return self.is_expected_msg(response, expected) - except: # TODO: too generic exception - raise errors.LetsEncryptClientError( - 'Expected message (%s) not received' % expected) + return self.is_expected_msg(response, expected) + def is_expected_msg(self, response, expected, delay=3, rounds=20): """Is response expected ACME message? - :param dict response: ACME response message from server. - :param str expected: Name of the expected response ACME message type. + :param response: ACME response message from server. + :type response: :class:`letsencrypt.acme.messages.Message` + + :param expected: Expected response type. + :type expected: subclass of :class:`letsencrypt.acme.messages.Message` + :param int delay: Number of seconds to delay before next round in case of ACME "defer" response message. :param int rounds: Number of resend attempts in case of ACME "defer" response message. :returns: ACME response message from server. - :rtype: dict + :rtype: :class:`letsencrypt.acme.messages.Message` :raises LetsEncryptClientError: if server sent ACME "error" message """ for _ in xrange(rounds): - if response["type"] == expected: + if isinstance(response, expected): return response - - elif response["type"] == "error": - logging.error( - "%s: %s - More Info: %s", response["error"], - response.get("message", ""), response.get("moreInfo", "")) - raise errors.LetsEncryptClientError(response["error"]) - - elif response["type"] == "defer": + elif isinstance(response, messages.Error): + logging.error("%s", response) + raise errors.LetsEncryptClientError(response.error) + elif isinstance(response, messages.Defer): logging.info("Waiting for %d seconds...", delay) time.sleep(delay) - response = self.send(acme.status_request(response["token"])) + response = self.send( + messages.StatusRequest(token=response.token)) else: logging.fatal("Received unexpected message") logging.fatal("Expected: %s", expected) diff --git a/letsencrypt/client/revoker.py b/letsencrypt/client/revoker.py index 297866e54..f3a4c0127 100644 --- a/letsencrypt/client/revoker.py +++ b/letsencrypt/client/revoker.py @@ -4,10 +4,13 @@ import logging import os import shutil +import Crypto.PublicKey.RSA import M2Crypto import zope.component -from letsencrypt.client import acme +from letsencrypt.acme import messages +from letsencrypt.acme import util as acme_util + from letsencrypt.client import crypto_util from letsencrypt.client import display from letsencrypt.client import interfaces @@ -33,15 +36,18 @@ class Revoker(object): :param dict cert: TODO :returns: ACME "revocation" message. - :rtype: dict + :rtype: :class:`letsencrypt.acme.message.Revocation` """ - cert_der = M2Crypto.X509.load_cert(cert["backup_cert_file"]).as_der() + certificate = acme_util.ComparableX509( + M2Crypto.X509.load_cert(cert["backup_cert_file"])) with open(cert["backup_key_file"], 'rU') as backup_key_file: - key = backup_key_file.read() + key = Crypto.PublicKey.RSA.importKey(backup_key_file.read()) revocation = self.network.send_and_receive_expected( - acme.revocation_request(cert_der, key), "revocation") + messages.RevocationRequest.create( + certificate=certificate, key=key), + messages.Revocation) zope.component.getUtility(interfaces.IDisplay).generic_notification( "You have successfully revoked the certificate for " @@ -65,8 +71,8 @@ class Revoker(object): c_sha1_vh = {} for (cert, _, path) in self.installer.get_all_certs_keys(): try: - c_sha1_vh[M2Crypto.X509.load_cert( - cert).get_fingerprint(md='sha1')] = path + c_sha1_vh[acme_util.ComparableX509(M2Crypto.X509.load_cert( + cert).get_fingerprint(md='sha1'))] = path except M2Crypto.X509.X509Error: continue diff --git a/letsencrypt/client/tests/acme_test.py b/letsencrypt/client/tests/acme_test.py deleted file mode 100644 index 514c6b14e..000000000 --- a/letsencrypt/client/tests/acme_test.py +++ /dev/null @@ -1,158 +0,0 @@ -"""Tests for letsencrypt.client.acme.""" -import pkg_resources -import unittest - -import jsonschema - - -class ACMEObjectValidateTest(unittest.TestCase): - """Tests for letsencrypt.client.acme.acme_object_validate.""" - - def setUp(self): - self.schemata = { - 'foo': { - 'type': 'object', - 'properties': { - 'price': {'type': 'number'}, - 'name': {'type': 'string'}, - }, - }, - } - - def _call(self, json_string): - from letsencrypt.client.acme import acme_object_validate - return acme_object_validate(json_string, self.schemata) - - def _test_fails(self, json_string): - self.assertRaises(jsonschema.ValidationError, self._call, json_string) - - def test_non_dictionary_fails(self): - self._test_fails('[]') - - def test_dict_without_type_fails(self): - self._test_fails('{}') - - def test_unknown_type_fails(self): - self._test_fails('{"type": "bar"}') - - def test_valid_returns_none(self): - self.assertTrue(self._call('{"type": "foo"}') is None) - - def test_invalid_fails(self): - self._test_fails('{"type": "foo", "price": "asd"}') - - -class PrettyTest(unittest.TestCase): # pylint: disable=too-few-public-methods - """Tests for letsencrypt.client.acme.pretty.""" - - @classmethod - def _call(cls, json_string): - from letsencrypt.client.acme import pretty - return pretty(json_string) - - def test_it(self): - self.assertEqual( - self._call('{"foo": {"bar": "baz"}}'), - '{\n "foo": {\n "bar": "baz"\n }\n}') - - -class MessageFactoriesTest(unittest.TestCase): - """Tests for ACME message factories from letsencrypt.client.acme.""" - - def setUp(self): - self.privkey = pkg_resources.resource_string( - __name__, 'testdata/rsa256_key.pem') - self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9' - self.b64nonce = '7Nbyb1lI6xPVI3Hg3aKSqQ' - - @classmethod - def _validate(cls, msg): - from letsencrypt.client.acme import SCHEMATA - jsonschema.validate(msg, SCHEMATA[msg['type']]) - - def test_challenge_request(self): - from letsencrypt.client.acme import challenge_request - msg = challenge_request('example.com') - self._validate(msg) - self.assertEqual(msg, { - 'type': 'challengeRequest', - 'identifier': 'example.com', - }) - - def test_authorization_request(self): - from letsencrypt.client.acme import authorization_request - responses = [ - { - 'type': 'simpleHttps', - 'path': 'Hf5GrX4Q7EBax9hc2jJnfw', - }, - None, # null - { - 'type': 'recoveryToken', - 'token': '23029d88d9e123e', - } - ] - msg = authorization_request( - 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4', - 'example.com', - 'czpsrF0KMH6dgajig3TGHw', - responses, - self.privkey, - self.nonce, - ) - - self._validate(msg) - self.assertEqual( - msg.pop('signature')['sig'], - 'VkpReso87ogwGul2MGck96TkYs4QoblIgNthgrm9O7EBGlzCRCnTHnx' - 'bj6loqaC4f5bn1rgS927Gp1Kvbqnmqg' - ) - self.assertEqual(msg, { - 'type': 'authorizationRequest', - 'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4', - 'nonce': 'czpsrF0KMH6dgajig3TGHw', - 'responses': responses, - }) - - def test_certificate_request(self): - from letsencrypt.client.acme import certificate_request - msg = certificate_request( - 'TODO: real DER CSR?', self.privkey, self.nonce) - self._validate(msg) - self.assertEqual( - msg.pop('signature')['sig'], - 'HEQVN4MU1yDrArP2T7WZQ12XlHCn5DgTPgb5eWT5_vjRPppLSNe6uWE' - 'x9SFwG9d9umqn49nZCSW7uskA2lcW6Q' - ) - self.assertEqual(msg, { - 'type': 'certificateRequest', - 'csr': 'VE9ETzogcmVhbCBERVIgQ1NSPw', - }) - - def test_revocation_request(self): - from letsencrypt.client.acme import revocation_request - msg = revocation_request( - 'TODO: real DER cert?', self.privkey, self.nonce) - self._validate(msg) - self.assertEqual( - msg.pop('signature')['sig'], - 'ABXA1IsyTalTXIojxmGnIUGyZASmvqEvTQ98jJ5KFs2FTswLEmsoqFX' - 'fU6l5_fous-tsbXOfLN-7PjfZ5XWPvg' - ) - self.assertEqual(msg, { - 'type': 'revocationRequest', - 'certificate': 'VE9ETzogcmVhbCBERVIgY2VydD8', - }) - - def test_status_request(self): - from letsencrypt.client.acme import status_request - msg = status_request(u'O7-s9MNq1siZHlgrMzi9_A') - self._validate(msg) - self.assertEqual(msg, { - 'type': 'statusRequest', - 'token': u'O7-s9MNq1siZHlgrMzi9_A', - }) - - -if __name__ == '__main__': - unittest.main() diff --git a/letsencrypt/client/tests/acme_util.py b/letsencrypt/client/tests/acme_util.py index 08a7e44bd..86bdbb282 100644 --- a/letsencrypt/client/tests/acme_util.py +++ b/letsencrypt/client/tests/acme_util.py @@ -90,18 +90,3 @@ def gen_combos(challs): # Gen combos for 1 of each type return [[i, j] for i in xrange(len(dv_chall)) for j in xrange(len(renewal_chall))] - -def get_chall_msg(iden, nonce, challenges, combos=None): - """Produce an ACME challenge message.""" - chall_msg = { - "type": "challenge", - "sessionID": iden, - "nonce": nonce, - "challenges": challenges - } - - if combos is None: - return chall_msg - - chall_msg["combinations"] = combos - return chall_msg diff --git a/letsencrypt/client/tests/auth_handler_test.py b/letsencrypt/client/tests/auth_handler_test.py index 945141f4e..c3ef196ba 100644 --- a/letsencrypt/client/tests/auth_handler_test.py +++ b/letsencrypt/client/tests/auth_handler_test.py @@ -4,8 +4,11 @@ import unittest import mock +from letsencrypt.acme import messages + from letsencrypt.client import challenge_util from letsencrypt.client import errors + from letsencrypt.client.tests import acme_util @@ -45,7 +48,8 @@ class SatisfyChallengesTest(unittest.TestCase): def test_name1_dvsni1(self): dom = "0" challenge = [acme_util.CHALLENGES["dvsni"]] - msg = acme_util.get_chall_msg(dom, "nonce0", challenge) + msg = messages.Challenge(session_id=dom, nonce="nonce0", + challenges=challenge, combinations=[]) self.handler.add_chall_msg(dom, msg, "dummy_key") self.handler._satisfy_challenges() # pylint: disable=protected-access @@ -62,7 +66,8 @@ class SatisfyChallengesTest(unittest.TestCase): def test_name1_rectok1(self): dom = "0" challenge = [acme_util.CHALLENGES["recoveryToken"]] - msg = acme_util.get_chall_msg(dom, "nonce0", challenge) + msg = messages.Challenge(session_id=dom, nonce="nonce0", + challenges=challenge, combinations=[]) self.handler.add_chall_msg(dom, msg, "dummy_key") self.handler._satisfy_challenges() # pylint: disable=protected-access @@ -87,7 +92,8 @@ class SatisfyChallengesTest(unittest.TestCase): for i in xrange(5): self.handler.add_chall_msg( str(i), - acme_util.get_chall_msg(str(i), "nonce%d" % i, challenge), + messages.Challenge(session_id=str(i), nonce="nonce%d" % i, + challenges=challenge, combinations=[]), "dummy_key") self.handler._satisfy_challenges() # pylint: disable=protected-access @@ -118,7 +124,8 @@ class SatisfyChallengesTest(unittest.TestCase): combos = acme_util.gen_combos(challenges) self.handler.add_chall_msg( dom, - acme_util.get_chall_msg("0", "nonce0", challenges, combos), + messages.Challenge(session_id="0", nonce="nonce0", + challenges=challenges, combinations=combos), "dummy_key") path = gen_path(["simpleHttps"], challenges) @@ -151,7 +158,8 @@ class SatisfyChallengesTest(unittest.TestCase): combos = acme_util.gen_combos(challenges) self.handler.add_chall_msg( dom, - acme_util.get_chall_msg(dom, "nonce0", challenges, combos), + messages.Challenge(session_id=dom, nonce="nonce0", + challenges=challenges, combinations=combos), "dummy_key") path = gen_path(["simpleHttps", "recoveryToken"], challenges) @@ -181,8 +189,9 @@ class SatisfyChallengesTest(unittest.TestCase): for i in xrange(5): self.handler.add_chall_msg( str(i), - acme_util.get_chall_msg( - str(i), "nonce%d" % i, challenges, combos), + messages.Challenge( + session_id=str(i), nonce="nonce%d" % i, + challenges=challenges, combinations=combos), "dummy_key") path = gen_path(["dvsni", "recoveryContact"], challenges) @@ -230,8 +239,9 @@ class SatisfyChallengesTest(unittest.TestCase): paths.append(gen_path(chosen_chall[i], challenge_list[i])) self.handler.add_chall_msg( dom, - acme_util.get_chall_msg( - dom, "nonce%d" % i, challenge_list[i]), + messages.Challenge( + session_id=dom, nonce="nonce%d" % i, + challenges=challenge_list[i], combinations=[]), "dummy_key") mock_chall_path.side_effect = paths @@ -278,8 +288,9 @@ class SatisfyChallengesTest(unittest.TestCase): for i in xrange(3): self.handler.add_chall_msg( str(i), - acme_util.get_chall_msg( - str(i), "nonce%d" % i, challenges, combos), + messages.Challenge( + session_id=str(i), nonce="nonce%d" % i, + challenges=challenges, combinations=combos), "dummy_key") mock_chall_path.side_effect = [ @@ -316,7 +327,8 @@ class SatisfyChallengesTest(unittest.TestCase): isinstance(client_chall_list[0], challenge_util.PopChall)) - def _get_exp_response(self, domain, path, challenges): # pylint: disable=no-self-use + def _get_exp_response(self, domain, path, challenges): + # pylint: disable=no-self-use exp_resp = ["null"] * len(challenges) for i in path: exp_resp[i] = TRANSLATE[challenges[i]["type"]] + str(domain) @@ -349,7 +361,8 @@ class GetAuthorizationsTest(unittest.TestCase): for i in xrange(3): self.handler.add_chall_msg( str(i), - acme_util.get_chall_msg(str(i), "nonce%d" % i, challenge), + messages.Challenge(session_id=str(i), nonce="nonce%d" % i, + challenges=challenge, combinations=[]), "dummy_key") self.mock_sat_chall.side_effect = self._sat_solved_at_once @@ -377,7 +390,8 @@ class GetAuthorizationsTest(unittest.TestCase): challenges = acme_util.get_challenges() self.handler.add_chall_msg( "0", - acme_util.get_chall_msg("0", "nonce0", challenges), + messages.Challenge(session_id="0", nonce="nonce0", + challenges=challenges, combinations=[]), "dummy_key") # Don't do anything to satisfy challenges @@ -392,7 +406,7 @@ class GetAuthorizationsTest(unittest.TestCase): def _sat_failure(self): dom = "0" self.handler.paths[dom] = gen_path( - ["dns", "recoveryToken"], self.handler.msgs[dom]["challenges"]) + ["dns", "recoveryToken"], self.handler.msgs[dom].challenges) dv_c, c_c = self.handler._challenge_factory( dom, self.handler.paths[dom]) self.handler.dv_c[dom], self.handler.client_c[dom] = dv_c, c_c @@ -405,7 +419,8 @@ class GetAuthorizationsTest(unittest.TestCase): dom = str(i) self.handler.add_chall_msg( dom, - acme_util.get_chall_msg(dom, "nonce%d" % i, challs[i]), + messages.Challenge(session_id=dom, nonce="nonce%d" % i, + challenges=challs[i], combinations=[]), "dummy_key") self.mock_sat_chall.side_effect = self._sat_incremental diff --git a/letsencrypt/client/tests/challenge_util_test.py b/letsencrypt/client/tests/challenge_util_test.py index 97f341b0d..c7848a213 100644 --- a/letsencrypt/client/tests/challenge_util_test.py +++ b/letsencrypt/client/tests/challenge_util_test.py @@ -6,6 +6,8 @@ import unittest import M2Crypto +from letsencrypt.acme import jose + from letsencrypt.client import challenge_util from letsencrypt.client import constants from letsencrypt.client import le_util @@ -19,7 +21,7 @@ class DvsniGenCertTest(unittest.TestCase): """Basic test for straightline code.""" domain = "example.com" dvsni_r = "r_value" - r_b64 = le_util.jose_b64encode(dvsni_r) + r_b64 = jose.b64encode(dvsni_r) pem = pkg_resources.resource_string( __name__, os.path.join("testdata", "rsa256_key.pem")) key = le_util.Key("path", pem) @@ -28,7 +30,7 @@ class DvsniGenCertTest(unittest.TestCase): # pylint: disable=protected-access ext = challenge_util._dvsni_gen_ext( - dvsni_r, le_util.jose_b64decode(s_b64)) + dvsni_r, jose.b64decode(s_b64)) self._standard_check_cert(cert_pem, domain, nonce, ext) def _standard_check_cert(self, pem, domain, nonce, ext): diff --git a/letsencrypt/client/tests/crypto_util_test.py b/letsencrypt/client/tests/crypto_util_test.py index 8b1a8ecd7..cb047281f 100644 --- a/letsencrypt/client/tests/crypto_util_test.py +++ b/letsencrypt/client/tests/crypto_util_test.py @@ -11,43 +11,6 @@ RSA256_KEY = pkg_resources.resource_string(__name__, 'testdata/rsa256_key.pem') RSA512_KEY = pkg_resources.resource_string(__name__, 'testdata/rsa512_key.pem') -class CreateSigTest(unittest.TestCase): - """Tests for letsencrypt.client.crypto_util.create_sig.""" - - def setUp(self): - self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9' - self.b64nonce = '7Nbyb1lI6xPVI3Hg3aKSqQ' - self.signature = { - 'nonce': self.b64nonce, - 'alg': 'RS256', - 'jwk': { - 'kty': 'RSA', - 'e': 'AQAB', - 'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5' - '80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q', - }, - 'sig': 'SUPYKucUnhlTt8_sMxLiigOYdf_wlOLXPI-o7aRLTsOquVjDd6r' - 'AX9AFJHk-bCMQPJbSzXKjG6H1IWbvxjS2Ew', - } - - @classmethod - def _call(cls, *args, **kwargs): - from letsencrypt.client.crypto_util import create_sig - return create_sig(*args, **kwargs) - - def test_it(self): - self.assertEqual( - self._call('message', RSA256_KEY, self.nonce), self.signature) - - def test_random_nonce(self): - signature = self._call('message', RSA256_KEY) - signature.pop('sig') - signature.pop('nonce') - del self.signature['sig'] - del self.signature['nonce'] - self.assertEqual(signature, self.signature) - - class ValidCSRTest(unittest.TestCase): """Tests for letsencrypt.client.crypto_util.valid_csr.""" @@ -170,17 +133,5 @@ class GetCertInfoTest(unittest.TestCase): self._call('cert-san.pem') -class B64CertToPEMTest(unittest.TestCase): - # pylint: disable=too-few-public-methods - """Tests for letsencrypt.client.crypto_util.b64_cert_to_pem.""" - - def test_it(self): - from letsencrypt.client.crypto_util import b64_cert_to_pem - self.assertEqual( - b64_cert_to_pem(pkg_resources.resource_string( - __name__, 'testdata/cert.b64jose')), - pkg_resources.resource_string(__name__, 'testdata/cert.pem')) - - if __name__ == '__main__': unittest.main() diff --git a/letsencrypt/client/tests/le_util_test.py b/letsencrypt/client/tests/le_util_test.py index e7331e769..39926a9b5 100644 --- a/letsencrypt/client/tests/le_util_test.py +++ b/letsencrypt/client/tests/le_util_test.py @@ -122,71 +122,5 @@ class UniqueFileTest(unittest.TestCase): self.assertTrue(basename3.endswith('foo.txt')) -# https://en.wikipedia.org/wiki/Base64#Examples -JOSE_B64_PADDING_EXAMPLES = { - 'any carnal pleasure.': ('YW55IGNhcm5hbCBwbGVhc3VyZS4', '='), - 'any carnal pleasure': ('YW55IGNhcm5hbCBwbGVhc3VyZQ', '=='), - 'any carnal pleasur': ('YW55IGNhcm5hbCBwbGVhc3Vy', ''), - 'any carnal pleasu': ('YW55IGNhcm5hbCBwbGVhc3U', '='), - 'any carnal pleas': ('YW55IGNhcm5hbCBwbGVhcw', '=='), -} - - -B64_URL_UNSAFE_EXAMPLES = { - chr(251) + chr(239): '--8', - chr(255) * 2: '__8', -} - - -class JOSEB64EncodeTest(unittest.TestCase): - """Tests for letsencrypt.client.le_util.jose_b64encode.""" - - @classmethod - def _call(cls, data): - from letsencrypt.client.le_util import jose_b64encode - return jose_b64encode(data) - - def test_unsafe_url(self): - for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems(): - self.assertEqual(self._call(text), b64) - - def test_different_paddings(self): - for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems(): - self.assertEqual(self._call(text), b64) - - def test_unicode_fails_with_type_error(self): - self.assertRaises(TypeError, self._call, u'some unicode') - - -class JOSEB64DecodeTest(unittest.TestCase): - """Tests for letsencrypt.client.le_util.jose_b64decode.""" - - @classmethod - def _call(cls, data): - from letsencrypt.client.le_util import jose_b64decode - return jose_b64decode(data) - - def test_unsafe_url(self): - for text, b64 in B64_URL_UNSAFE_EXAMPLES.iteritems(): - self.assertEqual(self._call(b64), text) - - def test_input_without_padding(self): - for text, (b64, _) in JOSE_B64_PADDING_EXAMPLES.iteritems(): - self.assertEqual(self._call(b64), text) - - def test_input_with_padding(self): - for text, (b64, pad) in JOSE_B64_PADDING_EXAMPLES.iteritems(): - self.assertEqual(self._call(b64 + pad), text) - - def test_unicode_with_ascii(self): - self.assertEqual(self._call(u'YQ'), 'a') - - def test_non_ascii_unicode_fails(self): - self.assertRaises(ValueError, self._call, u'\u0105') - - def test_type_error_no_unicode_or_str(self): - self.assertRaises(TypeError, self._call, object()) - - if __name__ == '__main__': unittest.main() diff --git a/letsencrypt/client/tests/standalone_authenticator_test.py b/letsencrypt/client/tests/standalone_authenticator_test.py index 9787073b1..60a1ba600 100644 --- a/letsencrypt/client/tests/standalone_authenticator_test.py +++ b/letsencrypt/client/tests/standalone_authenticator_test.py @@ -9,6 +9,8 @@ import mock import OpenSSL.crypto import OpenSSL.SSL +from letsencrypt.acme import jose + from letsencrypt.client import challenge_util from letsencrypt.client import le_util @@ -61,7 +63,7 @@ class SNICallbackTest(unittest.TestCase): from letsencrypt.client.standalone_authenticator import \ StandaloneAuthenticator self.authenticator = StandaloneAuthenticator() - name, r_b64 = "example.com", le_util.jose_b64encode("x" * 32) + name, r_b64 = "example.com", jose.b64encode("x" * 32) test_key = pkg_resources.resource_string( __name__, "testdata/rsa256_key.pem") nonce, key = "abcdef", le_util.Key("foo", test_key) @@ -428,7 +430,7 @@ class DoChildProcessTest(unittest.TestCase): from letsencrypt.client.standalone_authenticator import \ StandaloneAuthenticator self.authenticator = StandaloneAuthenticator() - name, r_b64 = "example.com", le_util.jose_b64encode("x" * 32) + name, r_b64 = "example.com", jose.b64encode("x" * 32) test_key = pkg_resources.resource_string( __name__, "testdata/rsa256_key.pem") nonce, key = "abcdef", le_util.Key("foo", test_key) diff --git a/linter_plugin.py b/linter_plugin.py new file mode 100644 index 000000000..d5faf33ac --- /dev/null +++ b/linter_plugin.py @@ -0,0 +1,23 @@ +"""Let's Encrypt ACME PyLint plugin. + +http://docs.pylint.org/plugins.html + +""" +from astroid import MANAGER +from astroid import nodes + + +def register(unused_linter): + """Register this module as PyLint plugin.""" + +def _transform(cls): + # fix the "no-member" error on instances of + # letsencrypt.acme.util.ImmutableMap subclasses (instance + # attributes are initialized dynamically based on __slots__) + if (('Message' in cls.basenames or 'ImmutableMap' in cls.basenames or + 'util.ImmutableMap' in cls.basenames) and (cls.slots() is not None)): + for slot in cls.slots(): + cls.locals[slot.value] = [nodes.EmptyNode()] + + +MANAGER.register_transform(nodes.Class, _transform) diff --git a/setup.py b/setup.py index 9626fbb39..f2550446f 100755 --- a/setup.py +++ b/setup.py @@ -61,6 +61,7 @@ setup( url="https://letsencrypt.org", packages=[ 'letsencrypt', + 'letsencrypt.acme', 'letsencrypt.client', 'letsencrypt.client.apache', 'letsencrypt.client.tests', diff --git a/tox.ini b/tox.ini index d3b67e629..7a5f3810d 100644 --- a/tox.ini +++ b/tox.ini @@ -10,11 +10,14 @@ commands = pip install -e .[testing] python setup.py test -q # -q does not suppress errors +setenv = + PYTHONPATH = {toxinidir} + [testenv:cover] basepython = python2.7 commands = pip install -e .[testing] - python setup.py nosetests --with-coverage --cover-min-percentage=66 + python setup.py nosetests --with-coverage --cover-min-percentage=73 [testenv:lint] # recent versions of pylint do not support Python 2.6 (#97, #187)