diff --git a/acme/challenges.py b/acme/challenges.py index 0a2a461f0..45db23e72 100644 --- a/acme/challenges.py +++ b/acme/challenges.py @@ -3,8 +3,8 @@ import binascii import functools import hashlib import logging +import os -import Crypto.Random import requests from acme import jose @@ -204,7 +204,7 @@ class DVSNIResponse(ChallengeResponse): decoder=functools.partial(jose.decode_b64jose, size=S_SIZE)) def __init__(self, s=None, *args, **kwargs): - s = Crypto.Random.get_random_bytes(self.S_SIZE) if s is None else s + s = os.urandom(self.S_SIZE) if s is None else s super(DVSNIResponse, self).__init__(s=s, *args, **kwargs) def z(self, chall): # pylint: disable=invalid-name diff --git a/acme/challenges_test.py b/acme/challenges_test.py index bd332d1d9..b39e40280 100644 --- a/acme/challenges_test.py +++ b/acme/challenges_test.py @@ -3,7 +3,8 @@ import os import pkg_resources import unittest -import Crypto.PublicKey.RSA +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization import M2Crypto import mock import requests @@ -16,9 +17,10 @@ from acme import other CERT = jose.ComparableX509(M2Crypto.X509.load_cert( pkg_resources.resource_filename( 'letsencrypt.tests', os.path.join('testdata', 'cert.pem')))) -KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey( +KEY = jose.ComparableRSAKey(serialization.load_pem_private_key( pkg_resources.resource_string( - 'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))) + 'acme.jose', os.path.join('testdata', 'rsa512_key.pem')), + password=None, backend=default_backend())) class ChallengeResponseTest(unittest.TestCase): @@ -345,7 +347,7 @@ class RecoveryTokenResponseTest(unittest.TestCase): class ProofOfPossessionHintsTest(unittest.TestCase): def setUp(self): - jwk = jose.JWKRSA(key=KEY.publickey()) + jwk = jose.JWKRSA(key=KEY.public_key()) issuers = ( 'C=US, O=SuperT LLC, CN=SuperTrustworthy Public CA', 'O=LessTrustworthy CA Inc, CN=LessTrustworthy But StillSecure', @@ -413,7 +415,7 @@ class ProofOfPossessionTest(unittest.TestCase): def setUp(self): from acme.challenges import ProofOfPossession hints = ProofOfPossession.Hints( - jwk=jose.JWKRSA(key=KEY.publickey()), cert_fingerprints=(), + jwk=jose.JWKRSA(key=KEY.public_key()), cert_fingerprints=(), certs=(), serial_numbers=(), subject_key_identifiers=(), issuers=(), authorized_for=()) self.msg = ProofOfPossession( @@ -453,7 +455,7 @@ class ProofOfPossessionResponseTest(unittest.TestCase): # nonce and challenge nonce are the same, don't make the same # mistake here... signature = other.Signature( - alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.publickey()), + alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.public_key()), sig='\xa7\xc1\xe7\xe82o\xbc\xcd\xd0\x1e\x010#Z|\xaf\x15\x83' '\x94\x8f#\x9b\nQo(\x80\x15,\x08\xfcz\x1d\xfd\xfd.\xaap' '\xfa\x06\xd1\xa2f\x8d8X2>%d\xbd%\xe1T\xdd\xaa0\x18\xde' diff --git a/acme/client.py b/acme/client.py index 4a4192528..fbdb4543d 100644 --- a/acme/client.py +++ b/acme/client.py @@ -83,7 +83,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes assert response.status_code == httplib.CREATED # TODO: handle errors regr = self._regr_from_response(response) - if regr.body.key != self.key.public() or regr.body.contact != contact: + if (regr.body.key != self.key.public_key() + or regr.body.contact != contact): raise errors.UnexpectedUpdate(regr) return regr diff --git a/acme/client_test.py b/acme/client_test.py index b934e1efd..7ae14d0be 100644 --- a/acme/client_test.py +++ b/acme/client_test.py @@ -44,7 +44,7 @@ class ClientTest(unittest.TestCase): # Registration self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212') reg = messages.Registration( - contact=self.contact, key=KEY.public(), recovery_token='t') + contact=self.contact, key=KEY.public_key(), recovery_token='t') self.regr = messages.RegistrationResource( body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1', new_authzr_uri='https://www.letsencrypt-demo.org/acme/new-reg', @@ -84,7 +84,7 @@ class ClientTest(unittest.TestCase): # TODO: test POST call arguments # TODO: split here and separate test - reg_wrong_key = self.regr.body.update(key=KEY2.public()) + reg_wrong_key = self.regr.body.update(key=KEY2.public_key()) self.response.json.return_value = reg_wrong_key.to_json() self.assertRaises( errors.UnexpectedUpdate, self.client.register, self.contact) diff --git a/acme/jose/__init__.py b/acme/jose/__init__.py index a4fe7008b..793b342b0 100644 --- a/acme/jose/__init__.py +++ b/acme/jose/__init__.py @@ -74,6 +74,6 @@ from acme.jose.jws import ( from acme.jose.util import ( ComparableX509, - HashableRSAKey, + ComparableRSAKey, ImmutableMap, ) diff --git a/acme/jose/jwa.py b/acme/jose/jwa.py index 97c770b78..c3d79ff20 100644 --- a/acme/jose/jwa.py +++ b/acme/jose/jwa.py @@ -4,20 +4,22 @@ https://tools.ietf.org/html/draft-ietf-jose-json-web-algorithms-40 """ import abc +import logging -from Crypto.Hash import HMAC -from Crypto.Hash import SHA256 -from Crypto.Hash import SHA384 -from Crypto.Hash import SHA512 - -from Crypto.Signature import PKCS1_PSS -from Crypto.Signature import PKCS1_v1_5 +import cryptography.exceptions +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import hmac +from cryptography.hazmat.primitives.asymmetric import padding from acme.jose import errors from acme.jose import interfaces from acme.jose import jwk +logger = logging.getLogger(__name__) + + class JWA(interfaces.JSONDeSerializable): # pylint: disable=abstract-method # pylint: disable=too-few-public-methods # for some reason disable=abstract-method has to be on the line @@ -66,43 +68,79 @@ class _JWAHS(JWASignature): kty = jwk.JWKOct - def __init__(self, name, digestmod): + def __init__(self, name, hash_): super(_JWAHS, self).__init__(name) - self.digestmod = digestmod + self.hash = hash_() def sign(self, key, msg): - return HMAC.new(key, msg, self.digestmod).digest() + signer = hmac.HMAC(key, self.hash, backend=default_backend()) + signer.update(msg) + return signer.finalize() def verify(self, key, msg, sig): - """Verify the signature. - - .. warning:: - Does not protect against timing attack (no constant compare). - - """ - return self.sign(key, msg) == sig + verifier = hmac.HMAC(key, self.hash, backend=default_backend()) + verifier.update(msg) + try: + verifier.verify(sig) + except cryptography.exceptions.InvalidSignature as error: + logger.debug(error, exc_info=True) + return False + else: + return True -class _JWARS(JWASignature): +class _JWARSA(object): kty = jwk.JWKRSA - - def __init__(self, name, padding, digestmod): - super(_JWARS, self).__init__(name) - self.padding = padding - self.digestmod = digestmod + padding = NotImplemented + hash = NotImplemented def sign(self, key, msg): + """Sign the ``msg`` using ``key``.""" try: - return self.padding.new(key).sign(self.digestmod.new(msg)) - except TypeError: - raise errors.Error('Key has no private part necessary for signing') - except (AttributeError, ValueError): - # ValueError for PS, AttributeError for RS - raise errors.Error('Key too small ({0})'.format(key.size())) + signer = key.signer(self.padding, self.hash) + except AttributeError as error: + logger.debug(error, exc_info=True) + raise errors.Error("Public key cannot be used for signing") + except ValueError as error: # digest too large + logger.debug(error, exc_info=True) + raise errors.Error(str(error)) + signer.update(msg) + try: + return signer.finalize() + except ValueError as error: + logger.debug(error, exc_info=True) + raise errors.Error(str(error)) def verify(self, key, msg, sig): - return self.padding.new(key).verify(self.digestmod.new(msg), sig) + """Verify the ``msg` and ``sig`` using ``key``.""" + verifier = key.verifier(sig, self.padding, self.hash) + verifier.update(msg) + try: + verifier.verify() + except cryptography.exceptions.InvalidSignature as error: + logger.debug(error, exc_info=True) + return False + else: + return True + + +class _JWARS(_JWARSA, JWASignature): + + def __init__(self, name, hash_): + super(_JWARS, self).__init__(name) + self.padding = padding.PKCS1v15() + self.hash = hash_() + + +class _JWAPS(_JWARSA, JWASignature): + + def __init__(self, name, hash_): + super(_JWAPS, self).__init__(name) + self.padding = padding.PSS( + mgf=padding.MGF1(hash_()), + salt_length=padding.PSS.MAX_LENGTH) + self.hash = hash_() class _JWAES(JWASignature): # pylint: disable=abstract-class-not-used @@ -116,17 +154,17 @@ class _JWAES(JWASignature): # pylint: disable=abstract-class-not-used raise NotImplementedError() -HS256 = JWASignature.register(_JWAHS('HS256', SHA256)) -HS384 = JWASignature.register(_JWAHS('HS384', SHA384)) -HS512 = JWASignature.register(_JWAHS('HS512', SHA512)) +HS256 = JWASignature.register(_JWAHS('HS256', hashes.SHA256)) +HS384 = JWASignature.register(_JWAHS('HS384', hashes.SHA384)) +HS512 = JWASignature.register(_JWAHS('HS512', hashes.SHA512)) -RS256 = JWASignature.register(_JWARS('RS256', PKCS1_v1_5, SHA256)) -RS384 = JWASignature.register(_JWARS('RS384', PKCS1_v1_5, SHA384)) -RS512 = JWASignature.register(_JWARS('RS512', PKCS1_v1_5, SHA512)) +RS256 = JWASignature.register(_JWARS('RS256', hashes.SHA256)) +RS384 = JWASignature.register(_JWARS('RS384', hashes.SHA384)) +RS512 = JWASignature.register(_JWARS('RS512', hashes.SHA512)) -PS256 = JWASignature.register(_JWARS('PS256', PKCS1_PSS, SHA256)) -PS384 = JWASignature.register(_JWARS('PS384', PKCS1_PSS, SHA384)) -PS512 = JWASignature.register(_JWARS('PS512', PKCS1_PSS, SHA512)) +PS256 = JWASignature.register(_JWAPS('PS256', hashes.SHA256)) +PS384 = JWASignature.register(_JWAPS('PS384', hashes.SHA384)) +PS512 = JWASignature.register(_JWAPS('PS512', hashes.SHA512)) ES256 = JWASignature.register(_JWAES('ES256')) ES256 = JWASignature.register(_JWAES('ES384')) diff --git a/acme/jose/jwa_test.py b/acme/jose/jwa_test.py index f66b2a250..c8347ff69 100644 --- a/acme/jose/jwa_test.py +++ b/acme/jose/jwa_test.py @@ -3,17 +3,24 @@ import os import pkg_resources import unittest -from Crypto.PublicKey import RSA +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization from acme.jose import errors -RSA256_KEY = RSA.importKey(pkg_resources.resource_string( - __name__, os.path.join('testdata', 'rsa256_key.pem'))) -RSA512_KEY = RSA.importKey(pkg_resources.resource_string( - __name__, os.path.join('testdata', 'rsa512_key.pem'))) -RSA1024_KEY = RSA.importKey(pkg_resources.resource_string( - __name__, os.path.join('testdata', 'rsa1024_key.pem'))) +RSA256_KEY = serialization.load_pem_private_key( + pkg_resources.resource_string( + __name__, os.path.join('testdata', 'rsa256_key.pem')), + password=None, backend=default_backend()) +RSA512_KEY = serialization.load_pem_private_key( + pkg_resources.resource_string( + __name__, os.path.join('testdata', 'rsa512_key.pem')), + password=None, backend=default_backend()) +RSA1024_KEY = serialization.load_pem_private_key( + pkg_resources.resource_string( + __name__, os.path.join('testdata', 'rsa1024_key.pem')), + password=None, backend=default_backend()) class JWASignatureTest(unittest.TestCase): @@ -71,14 +78,13 @@ class JWARSTest(unittest.TestCase): def test_sign_no_private_part(self): from acme.jose.jwa import RS256 self.assertRaises( - errors.Error, RS256.sign, RSA512_KEY.publickey(), 'foo') + errors.Error, RS256.sign, RSA512_KEY.public_key(), 'foo') def test_sign_key_too_small(self): from acme.jose.jwa import RS256 from acme.jose.jwa import PS256 self.assertRaises(errors.Error, RS256.sign, RSA256_KEY, 'foo') self.assertRaises(errors.Error, PS256.sign, RSA256_KEY, 'foo') - self.assertRaises(errors.Error, PS256.sign, RSA512_KEY, 'foo') def test_rs(self): from acme.jose.jwa import RS256 @@ -89,16 +95,14 @@ class JWARSTest(unittest.TestCase): '\xd2\xb9.>}\xfd' ) self.assertEqual(RS256.sign(RSA512_KEY, 'foo'), sig) - # next tests guard that only True/False are return as oppossed - # to e.g. 1/0 - self.assertTrue(RS256.verify(RSA512_KEY, 'foo', sig) is True) - self.assertFalse(RS256.verify(RSA512_KEY, 'foo', sig + '!') is False) + self.assertTrue(RS256.verify(RSA512_KEY.public_key(), 'foo', sig)) + self.assertFalse(RS256.verify(RSA512_KEY.public_key(), 'foo', sig + '!')) def test_ps(self): from acme.jose.jwa import PS256 sig = PS256.sign(RSA1024_KEY, 'foo') - self.assertTrue(PS256.verify(RSA1024_KEY, 'foo', sig) is True) - self.assertTrue(PS256.verify(RSA1024_KEY, 'foo', sig + '!') is False) + self.assertTrue(PS256.verify(RSA1024_KEY.public_key(), 'foo', sig)) + self.assertFalse(PS256.verify(RSA1024_KEY.public_key(), 'foo', sig + '!')) if __name__ == '__main__': diff --git a/acme/jose/jwk.py b/acme/jose/jwk.py index 7c55e99a8..376988ae2 100644 --- a/acme/jose/jwk.py +++ b/acme/jose/jwk.py @@ -2,7 +2,9 @@ import abc import binascii -import Crypto.PublicKey.RSA +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa from acme.jose import b64 from acme.jose import errors @@ -22,14 +24,12 @@ class JWK(json_util.TypedJSONObjectWithFields): raise NotImplementedError() @abc.abstractmethod - def public(self): # pragma: no cover + def public_key(self): # pragma: no cover """Generate JWK with public key. For symmetric cryptosystems, this would return ``self``. """ - # TODO: rename publickey to stay consistent with - # HashableRSAKey.publickey raise NotImplementedError() @@ -54,7 +54,7 @@ class JWKES(JWK): # pragma: no cover def load(cls, string): raise NotImplementedError() - def public(self): + def public_key(self): raise NotImplementedError() @@ -79,7 +79,7 @@ class JWKOct(JWK): def load(cls, string): return cls(key=string) - def public(self): + def public_key(self): return self @@ -87,7 +87,9 @@ class JWKOct(JWK): class JWKRSA(JWK): """RSA JWK. - :ivar key: `Crypto.PublicKey.RSA` wrapped in `.HashableRSAKey` + :ivar key: `cryptography.hazmat.primitives.rsa.RSAPrivateKey` + or `cryptography.hazmat.primitives.rsa.RSAPublicKey` wrapped + in `.ComparableRSAKey` """ typ = 'RSA' @@ -120,21 +122,73 @@ class JWKRSA(JWK): :rtype: :class:`JWKRSA` """ - return cls(key=util.HashableRSAKey( - Crypto.PublicKey.RSA.importKey(string))) + try: + key = serialization.load_pem_public_key( + string, backend=default_backend()) + except ValueError: # ValueError: Could not unserialize key data. + key = serialization.load_pem_private_key( + string, password=None, backend=default_backend()) + return cls(key=util.ComparableRSAKey(key)) - def public(self): - return type(self)(key=self.key.publickey()) + def public_key(self): + return type(self)(key=self.key.public_key()) @classmethod def fields_from_json(cls, jobj): - return cls(key=util.HashableRSAKey( - Crypto.PublicKey.RSA.construct( - (cls._decode_param(jobj['n']), - cls._decode_param(jobj['e']))))) + # pylint: disable=invalid-name + n, e = (cls._decode_param(jobj[x]) for x in ('n', 'e')) + public_numbers = rsa.RSAPublicNumbers(e=e, n=n) + if 'd' not in jobj: # public key + key = public_numbers.public_key(default_backend()) + else: # private key + d = cls._decode_param(jobj['d']) + if ('p' in jobj or 'q' in jobj or 'dp' in jobj or + 'dq' in jobj or 'qi' in jobj or 'oth' in jobj): + # "If the producer includes any of the other private + # key parameters, then all of the others MUST be + # present, with the exception of "oth", which MUST + # only be present when more than two prime factors + # were used." + p, q, dp, dq, qi, = all_params = tuple( + jobj.get(x) for x in ('p', 'q', 'dp', 'dq', 'qi')) + if tuple(param for param in all_params if param is None): + raise errors.Error( + "Some private parameters are missing: {0}".format( + all_params)) + p, q, dp, dq, qi = tuple(cls._decode_param(x) for x in all_params) + + # TODO: check for oth + else: + p, q = rsa.rsa_recover_prime_factors(n, e, d) # cryptography>=0.8 + dp = rsa.rsa_crt_dmp1(d, p) + dq = rsa.rsa_crt_dmq1(d, q) + qi = rsa.rsa_crt_iqmp(p, q) + + key = rsa.RSAPrivateNumbers( + p, q, d, dp, dq, qi, public_numbers).private_key(default_backend()) + + return cls(key=util.ComparableRSAKey(key)) def fields_to_partial_json(self): - return { - 'n': self._encode_param(self.key.n), - 'e': self._encode_param(self.key.e), - } + # pylint: disable=protected-access + if isinstance(self.key._wrapped, rsa.RSAPublicKey): + numbers = self.key.public_numbers() + params = { + 'n': numbers.n, + 'e': numbers.e, + } + else: # rsa.RSAPrivateKey + private = self.key.private_numbers() + public = self.key.public_key().public_numbers() + params = { + 'n': public.n, + 'e': public.e, + 'd': private.d, + 'p': private.p, + 'q': private.q, + 'dp': private.dmp1, + 'dq': private.dmq1, + 'qi': private.iqmp, + } + return dict((key, self._encode_param(value)) + for key, value in params.iteritems()) diff --git a/acme/jose/jwk_test.py b/acme/jose/jwk_test.py index 39d595f94..f3745b251 100644 --- a/acme/jose/jwk_test.py +++ b/acme/jose/jwk_test.py @@ -3,16 +3,21 @@ import os import pkg_resources import unittest -from Crypto.PublicKey import RSA +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization from acme.jose import errors from acme.jose import util -RSA256_KEY = util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string( - __name__, os.path.join('testdata', 'rsa256_key.pem')))) -RSA512_KEY = util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string( - __name__, os.path.join('testdata', 'rsa512_key.pem')))) +RSA256_KEY = util.ComparableRSAKey(serialization.load_pem_private_key( + pkg_resources.resource_string( + __name__, os.path.join('testdata', 'rsa256_key.pem')), + password=None, backend=default_backend())) +RSA512_KEY = util.ComparableRSAKey(serialization.load_pem_private_key( + pkg_resources.resource_string( + __name__, os.path.join('testdata', 'rsa512_key.pem')), + password=None, backend=default_backend())) class JWKOctTest(unittest.TestCase): @@ -38,8 +43,8 @@ class JWKOctTest(unittest.TestCase): from acme.jose.jwk import JWKOct self.assertEqual(self.jwk, JWKOct.load('foo')) - def test_public(self): - self.assertTrue(self.jwk.public() is self.jwk) + def test_public_key(self): + self.assertTrue(self.jwk.public_key() is self.jwk) class JWKRSATest(unittest.TestCase): @@ -47,20 +52,32 @@ class JWKRSATest(unittest.TestCase): def setUp(self): from acme.jose.jwk import JWKRSA - self.jwk256 = JWKRSA(key=RSA256_KEY.publickey()) - self.jwk256_private = JWKRSA(key=RSA256_KEY) + self.jwk256 = JWKRSA(key=RSA256_KEY.public_key()) self.jwk256json = { 'kty': 'RSA', 'e': 'AQAB', 'n': 'm2Fylv-Uz7trgTW8EBHP3FQSMeZs2GNQ6VRo1sIVJEk', } - self.jwk512 = JWKRSA(key=RSA512_KEY.publickey()) + self.jwk512 = JWKRSA(key=RSA512_KEY.public_key()) self.jwk512json = { 'kty': 'RSA', 'e': 'AQAB', 'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5' '80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q', } + self.private = JWKRSA(key=RSA256_KEY) + self.private_json_small = self.jwk256json.copy() + self.private_json_small['d'] = ( + 'lPQED_EPTV0UIBfNI3KP2d9Jlrc2mrMllmf946bu-CE') + self.private_json = self.jwk256json.copy() + self.private_json.update({ + 'd': 'lPQED_EPTV0UIBfNI3KP2d9Jlrc2mrMllmf946bu-CE', + 'p': 'zUVNZn4lLLBD1R6NE8TKNQ', + 'q': 'wcfKfc7kl5jfqXArCRSURQ', + 'dp': 'CWJFq43QvT5Bm5iN8n1okQ', + 'dq': 'bHh2u7etM8LKKCF2pY2UdQ', + 'qi': 'oi45cEkbVoJjAbnQpFY87Q', + }) def test_equals(self): self.assertEqual(self.jwk256, self.jwk256) @@ -73,22 +90,34 @@ class JWKRSATest(unittest.TestCase): def test_load(self): from acme.jose.jwk import JWKRSA self.assertEqual( - JWKRSA(key=util.HashableRSAKey(RSA256_KEY)), JWKRSA.load( + JWKRSA(key=RSA256_KEY), JWKRSA.load( pkg_resources.resource_string( __name__, os.path.join('testdata', 'rsa256_key.pem')))) - def test_public(self): - self.assertEqual(self.jwk256, self.jwk256_private.public()) + def test_public_key(self): + self.assertEqual(self.jwk256, self.private.public_key()) def test_to_partial_json(self): self.assertEqual(self.jwk256.to_partial_json(), self.jwk256json) self.assertEqual(self.jwk512.to_partial_json(), self.jwk512json) + self.assertEqual(self.private.to_partial_json(), self.private_json) def test_from_json(self): from acme.jose.jwk import JWK - self.assertEqual(self.jwk256, JWK.from_json(self.jwk256json)) - # TODO: fix schemata to allow RSA512 - #self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json)) + self.assertEqual( + self.jwk256, JWK.from_json(self.jwk256json)) + self.assertEqual( + self.jwk512, JWK.from_json(self.jwk512json)) + self.assertEqual(self.private, JWK.from_json(self.private_json)) + + def test_from_json_private_small(self): + from acme.jose.jwk import JWK + self.assertEqual(self.private, JWK.from_json(self.private_json_small)) + + def test_from_json_missing_one_additional(self): + from acme.jose.jwk import JWK + del self.private_json['q'] + self.assertRaises(errors.Error, JWK.from_json, self.private_json) def test_from_json_hashable(self): from acme.jose.jwk import JWK diff --git a/acme/jose/jws.py b/acme/jose/jws.py index 3ba60d40c..a9e38ead7 100644 --- a/acme/jose/jws.py +++ b/acme/jose/jws.py @@ -203,7 +203,7 @@ class Signature(json_util.JSONObjectWithFields): header_params = kwargs header_params['alg'] = alg if include_jwk: - header_params['jwk'] = key.public() + header_params['jwk'] = key.public_key() assert set(header_params).issubset(cls.header_cls._fields) assert protect.issubset(cls.header_cls._fields) @@ -354,12 +354,12 @@ class CLI(object): if args.key is not None: assert args.kty is not None - key = args.kty.load(args.key.read()) + key = args.kty.load(args.key.read()).public_key() else: key = None sys.stdout.write(sig.payload) - return int(not sig.verify(key=key)) + return not sig.verify(key=key) @classmethod def _alg_type(cls, arg): diff --git a/acme/jose/jws_test.py b/acme/jose/jws_test.py index 6ecce63d2..19f45ae94 100644 --- a/acme/jose/jws_test.py +++ b/acme/jose/jws_test.py @@ -4,7 +4,8 @@ import os import pkg_resources import unittest -import Crypto.PublicKey.RSA +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization import M2Crypto import mock @@ -18,8 +19,10 @@ from acme.jose import util CERT = util.ComparableX509(M2Crypto.X509.load_cert( pkg_resources.resource_filename( 'letsencrypt.tests', 'testdata/cert.pem'))) -RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string( - __name__, os.path.join('testdata', 'rsa512_key.pem'))) +RSA512_KEY = util.ComparableRSAKey(serialization.load_pem_private_key( + pkg_resources.resource_string( + __name__, os.path.join('testdata', 'rsa512_key.pem')), + password=None, backend=default_backend())) class MediaTypeTest(unittest.TestCase): @@ -107,7 +110,7 @@ class JWSTest(unittest.TestCase): def setUp(self): self.privkey = jwk.JWKRSA(key=RSA512_KEY) - self.pubkey = self.privkey.public() + self.pubkey = self.privkey.public_key() from acme.jose.jws import JWS self.unprotected = JWS.sign( diff --git a/acme/jose/util.py b/acme/jose/util.py index 2312055f7..8fa341fa2 100644 --- a/acme/jose/util.py +++ b/acme/jose/util.py @@ -1,6 +1,8 @@ """JOSE utilities.""" import collections +from cryptography.hazmat.primitives.asymmetric import rsa + class abstractclassmethod(classmethod): # pylint: disable=invalid-name,too-few-public-methods @@ -41,9 +43,14 @@ class ComparableX509(object): # pylint: disable=too-few-public-methods return self.as_der() == other.as_der() -class HashableRSAKey(object): # pylint: disable=too-few-public-methods - """Wrapper for `Crypto.PublicKey.RSA` objects that supports hashing.""" +class ComparableRSAKey(object): # pylint: disable=too-few-public-methods + """Wrapper for `cryptography` RSA keys. + Wraps around: + - `cryptography.hazmat.primitives.assymetric.RSAPrivateKey` + - `cryptography.hazmat.primitives.assymetric.RSAPublicKey` + + """ def __init__(self, wrapped): self._wrapped = wrapped @@ -51,14 +58,36 @@ class HashableRSAKey(object): # pylint: disable=too-few-public-methods return getattr(self._wrapped, name) def __eq__(self, other): - return self._wrapped == other + # pylint: disable=protected-access + if (not isinstance(other, self.__class__) or + self._wrapped.__class__ is not other._wrapped.__class__): + return False + # RSA*KeyWithSerialization requires cryptography>=0.8 + if isinstance(self._wrapped, rsa.RSAPrivateKeyWithSerialization): + return self.private_numbers() == other.private_numbers() + elif isinstance(self._wrapped, rsa.RSAPublicKeyWithSerialization): + return self.public_numbers() == other.public_numbers() + else: + return False # we shouldn't reach here... + def __hash__(self): - return hash((type(self), self.exportKey(format='DER'))) + # public_numbers() hasn't got stable hash! + if isinstance(self._wrapped, rsa.RSAPrivateKeyWithSerialization): + priv = self.private_numbers() + pub = priv.public_numbers + return hash((type(self), priv.p, priv.q, priv.dmp1, + priv.dmq1, priv.iqmp, pub.n, pub.e)) + elif isinstance(self._wrapped, rsa.RSAPublicKeyWithSerialization): + pub = self.public_numbers() + return hash((type(self), pub.n, pub.e)) - def publickey(self): + def __repr__(self): + return '<{0}({1!r})>'.format(self.__class__.__name__, self._wrapped) + + def public_key(self): """Get wrapped public key.""" - return type(self)(self._wrapped.publickey()) + return type(self)(self._wrapped.public_key()) class ImmutableMap(collections.Mapping, collections.Hashable): diff --git a/acme/jose/util_test.py b/acme/jose/util_test.py index b5592c57e..80d8b8999 100644 --- a/acme/jose/util_test.py +++ b/acme/jose/util_test.py @@ -4,32 +4,52 @@ import os import pkg_resources import unittest -import Crypto.PublicKey.RSA +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization -class HashableRSAKeyTest(unittest.TestCase): - """Tests for acme.jose.util.HashableRSAKey.""" +class ComparableRSAKeyTest(unittest.TestCase): + """Tests for acme.jose.util.ComparableRSAKey.""" def setUp(self): - from acme.jose.util import HashableRSAKey - self.key = HashableRSAKey(Crypto.PublicKey.RSA.importKey( - pkg_resources.resource_string( - __name__, os.path.join('testdata', 'rsa256_key.pem')))) - self.key_same = HashableRSAKey(Crypto.PublicKey.RSA.importKey( - pkg_resources.resource_string( - __name__, os.path.join('testdata', 'rsa256_key.pem')))) + from acme.jose.util import ComparableRSAKey + backend = default_backend() + def load_key(): # pylint: disable=missing-docstring + return ComparableRSAKey(serialization.load_pem_private_key( + pkg_resources.resource_string( + __name__, os.path.join('testdata', 'rsa256_key.pem')), + password=None, backend=backend)) + self.key = load_key() + self.key_same = load_key() + + def test_getattr_proxy(self): + self.assertEqual(256, self.key.key_size) def test_eq(self): - # if __eq__ is not defined, then two HashableRSAKeys with same - # _wrapped do not equate self.assertEqual(self.key, self.key_same) + def test_not_eq_different_types(self): + self.assertFalse(self.key.__eq__(5)) + + def test_not_eq_not_wrapped(self): + # pylint: disable=protected-access + self.assertFalse(self.key.__eq__(self.key_same._wrapped)) + + def test_not_eq_no_serialization(self): + from acme.jose.util import ComparableRSAKey + self.assertFalse(ComparableRSAKey(5).__eq__(ComparableRSAKey(5))) + def test_hash(self): self.assertTrue(isinstance(hash(self.key), int)) + self.assertEqual(hash(self.key), hash(self.key_same)) - def test_publickey(self): - from acme.jose.util import HashableRSAKey - self.assertTrue(isinstance(self.key.publickey(), HashableRSAKey)) + def test_repr(self): + self.assertTrue(repr(self.key).startswith( + '=0.5 +key = jose.JWKRSA(key=jose.ComparableRSAKey(rsa.generate_private_key( + public_exponent=65537, + key_size=2048, + backend=default_backend()))) acme = client.Client(NEW_REG_URL, key) regr = acme.register(contact=()) diff --git a/letsencrypt/proof_of_possession.py b/letsencrypt/proof_of_possession.py index 9cec341de..a70fff697 100644 --- a/letsencrypt/proof_of_possession.py +++ b/letsencrypt/proof_of_possession.py @@ -1,5 +1,6 @@ """Proof of Possession Identifier Validation Challenge.""" import M2Crypto +import logging import os import zope.component @@ -11,6 +12,9 @@ from letsencrypt import interfaces from letsencrypt.display import util as display_util +logger = logging.getLogger(__name__) + + class ProofOfPossession(object): # pylint: disable=too-few-public-methods """Proof of Possession Identifier Validation Challenge. @@ -39,19 +43,22 @@ class ProofOfPossession(object): # pylint: disable=too-few-public-methods return None for cert, key, _ in self.installer.get_all_certs_keys(): - der_cert_key = M2Crypto.X509.load_cert(cert).get_pubkey().as_der() + pkey = M2Crypto.X509.load_cert(cert).get_pubkey() try: - cert_key = achall.alg.kty.load(der_cert_key) - # If JWKES.load raises other exceptions, they should be caught here - except (IndexError, ValueError, TypeError): + rsa_pkey = pkey.get_rsa() + except ValueError: + logger.warn("Only RSA supported at this time") continue + pem_cert_key = rsa_pkey.as_pem() + cert_key = achall.alg.kty.load(pem_cert_key) + # TODO: If JWKES.load raises other exceptions, they should be caught here if cert_key == achall.hints.jwk: return self._gen_response(achall, key) # Is there are different prompt we should give the user? code, key = zope.component.getUtility( interfaces.IDisplay).input( - "Path to private key for identifier: %s " % achall.domain) + "QPath to private key for identifier: %s " % achall.domain) if code != display_util.CANCEL: return self._gen_response(achall, key) diff --git a/letsencrypt/tests/acme_util.py b/letsencrypt/tests/acme_util.py index 7ac05c1fa..b0939dc68 100644 --- a/letsencrypt/tests/acme_util.py +++ b/letsencrypt/tests/acme_util.py @@ -4,16 +4,18 @@ import itertools import os import pkg_resources -import Crypto.PublicKey.RSA +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization from acme import challenges from acme import jose from acme import messages -KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey( +KEY = jose.ComparableRSAKey(serialization.load_pem_private_key( pkg_resources.resource_string( - "acme.jose", os.path.join("testdata", "rsa512_key.pem")))) + __name__, os.path.join('testdata', 'rsa512_key.pem')), + password=None, backend=default_backend())) # Challenges SIMPLE_HTTP = challenges.SimpleHTTP( @@ -30,7 +32,7 @@ RECOVERY_TOKEN = challenges.RecoveryToken() POP = challenges.ProofOfPossession( alg="RS256", nonce="xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ", hints=challenges.ProofOfPossession.Hints( - jwk=jose.JWKRSA(key=KEY.publickey()), + jwk=jose.JWKRSA(key=KEY.public_key()), cert_fingerprints=( "93416768eb85e33adc4277f4c9acd63e7418fcfe", "16d95b7b63f1972b980b14c20291f3c0d1855d95", diff --git a/letsencrypt/tests/proof_of_possession_test.py b/letsencrypt/tests/proof_of_possession_test.py index 415e4caed..68f740ebd 100644 --- a/letsencrypt/tests/proof_of_possession_test.py +++ b/letsencrypt/tests/proof_of_possession_test.py @@ -1,9 +1,10 @@ """Tests for letsencrypt.proof_of_possession.""" -import Crypto.PublicKey.RSA import os import pkg_resources import unittest +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization import mock from acme import challenges @@ -28,8 +29,10 @@ CERT3_PATH = pkg_resources.resource_filename( BASE_PACKAGE, os.path.join("testdata", "matching_cert.pem")) CERT3_KEY_PATH = pkg_resources.resource_filename( BASE_PACKAGE, os.path.join("testdata", "rsa512_key.pem")) -CERT3_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string( - BASE_PACKAGE, os.path.join('testdata', 'rsa512_key.pem'))).publickey() +with open(CERT3_KEY_PATH) as cert3_file: + CERT3_KEY = jose.ComparableRSAKey(serialization.load_pem_private_key( + cert3_file.read(), password=None, + backend=default_backend())).public_key() class ProofOfPossessionTest(unittest.TestCase): @@ -55,7 +58,7 @@ class ProofOfPossessionTest(unittest.TestCase): def test_perform_bad_challenge(self): hints = challenges.ProofOfPossession.Hints( - jwk=jose.jwk.JWKOct(key=CERT3_KEY), cert_fingerprints=(), + jwk=jose.jwk.JWKOct(key="foo"), cert_fingerprints=(), certs=(), serial_numbers=(), subject_key_identifiers=(), issuers=(), authorized_for=()) chall = challenges.ProofOfPossession( diff --git a/setup.py b/setup.py index ca2746113..8b58a3e80 100644 --- a/setup.py +++ b/setup.py @@ -35,9 +35,11 @@ changes = read_file(os.path.join(here, 'CHANGES.rst')) # maintainers. and will make the future migration a lot easier. acme_install_requires = [ 'argparse', + # load_pem_private/public_key (>=0.6) + # rsa_recover_prime_factors (>=0.8) + 'cryptography>=0.8', #'letsencrypt' # TODO: uses testdata vectors 'mock', - 'pycrypto', 'pyrfc3339', 'ndg-httpsclient', # urllib3 InsecurePlatformWarning (#304) 'pyasn1', # urllib3 InsecurePlatformWarning (#304) @@ -83,6 +85,7 @@ letsencrypt_nginx_install_requires = [ install_requires = [ 'argparse', + 'cryptography>=0.8', 'ConfigArgParse', 'configobj', 'mock',