acme: drop PyCrypto and use cryptography instead.

- Use cryptography in acme.jose.jwa/jwk.
- Change Crypto.Random to os.urandom,
  c.f. https://cryptography.io/en/latest/random-numbers/?highlight=urandom
This commit is contained in:
Jakub Warmuz 2015-07-05 13:11:33 +00:00
parent 4407210e01
commit e0293d81f3
No known key found for this signature in database
GPG key ID: 2A7BAD3A489B52EA
22 changed files with 370 additions and 165 deletions

View file

@ -3,8 +3,8 @@ import binascii
import functools
import hashlib
import logging
import os
import Crypto.Random
import requests
from acme import jose
@ -204,7 +204,7 @@ class DVSNIResponse(ChallengeResponse):
decoder=functools.partial(jose.decode_b64jose, size=S_SIZE))
def __init__(self, s=None, *args, **kwargs):
s = Crypto.Random.get_random_bytes(self.S_SIZE) if s is None else s
s = os.urandom(self.S_SIZE) if s is None else s
super(DVSNIResponse, self).__init__(s=s, *args, **kwargs)
def z(self, chall): # pylint: disable=invalid-name

View file

@ -3,7 +3,8 @@ import os
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import M2Crypto
import mock
import requests
@ -16,9 +17,10 @@ from acme import other
CERT = jose.ComparableX509(M2Crypto.X509.load_cert(
pkg_resources.resource_filename(
'letsencrypt.tests', os.path.join('testdata', 'cert.pem'))))
KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey(
KEY = jose.ComparableRSAKey(serialization.load_pem_private_key(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))))
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend()))
class ChallengeResponseTest(unittest.TestCase):
@ -345,7 +347,7 @@ class RecoveryTokenResponseTest(unittest.TestCase):
class ProofOfPossessionHintsTest(unittest.TestCase):
def setUp(self):
jwk = jose.JWKRSA(key=KEY.publickey())
jwk = jose.JWKRSA(key=KEY.public_key())
issuers = (
'C=US, O=SuperT LLC, CN=SuperTrustworthy Public CA',
'O=LessTrustworthy CA Inc, CN=LessTrustworthy But StillSecure',
@ -413,7 +415,7 @@ class ProofOfPossessionTest(unittest.TestCase):
def setUp(self):
from acme.challenges import ProofOfPossession
hints = ProofOfPossession.Hints(
jwk=jose.JWKRSA(key=KEY.publickey()), cert_fingerprints=(),
jwk=jose.JWKRSA(key=KEY.public_key()), cert_fingerprints=(),
certs=(), serial_numbers=(), subject_key_identifiers=(),
issuers=(), authorized_for=())
self.msg = ProofOfPossession(
@ -453,7 +455,7 @@ class ProofOfPossessionResponseTest(unittest.TestCase):
# nonce and challenge nonce are the same, don't make the same
# mistake here...
signature = other.Signature(
alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.publickey()),
alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.public_key()),
sig='\xa7\xc1\xe7\xe82o\xbc\xcd\xd0\x1e\x010#Z|\xaf\x15\x83'
'\x94\x8f#\x9b\nQo(\x80\x15,\x08\xfcz\x1d\xfd\xfd.\xaap'
'\xfa\x06\xd1\xa2f\x8d8X2>%d\xbd%\xe1T\xdd\xaa0\x18\xde'

View file

@ -83,7 +83,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes
assert response.status_code == httplib.CREATED # TODO: handle errors
regr = self._regr_from_response(response)
if regr.body.key != self.key.public() or regr.body.contact != contact:
if (regr.body.key != self.key.public_key()
or regr.body.contact != contact):
raise errors.UnexpectedUpdate(regr)
return regr

View file

@ -44,7 +44,7 @@ class ClientTest(unittest.TestCase):
# Registration
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
reg = messages.Registration(
contact=self.contact, key=KEY.public(), recovery_token='t')
contact=self.contact, key=KEY.public_key(), recovery_token='t')
self.regr = messages.RegistrationResource(
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1',
new_authzr_uri='https://www.letsencrypt-demo.org/acme/new-reg',
@ -84,7 +84,7 @@ class ClientTest(unittest.TestCase):
# TODO: test POST call arguments
# TODO: split here and separate test
reg_wrong_key = self.regr.body.update(key=KEY2.public())
reg_wrong_key = self.regr.body.update(key=KEY2.public_key())
self.response.json.return_value = reg_wrong_key.to_json()
self.assertRaises(
errors.UnexpectedUpdate, self.client.register, self.contact)

View file

@ -74,6 +74,6 @@ from acme.jose.jws import (
from acme.jose.util import (
ComparableX509,
HashableRSAKey,
ComparableRSAKey,
ImmutableMap,
)

View file

@ -4,20 +4,22 @@ https://tools.ietf.org/html/draft-ietf-jose-json-web-algorithms-40
"""
import abc
import logging
from Crypto.Hash import HMAC
from Crypto.Hash import SHA256
from Crypto.Hash import SHA384
from Crypto.Hash import SHA512
from Crypto.Signature import PKCS1_PSS
from Crypto.Signature import PKCS1_v1_5
import cryptography.exceptions
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import hmac
from cryptography.hazmat.primitives.asymmetric import padding
from acme.jose import errors
from acme.jose import interfaces
from acme.jose import jwk
logger = logging.getLogger(__name__)
class JWA(interfaces.JSONDeSerializable): # pylint: disable=abstract-method
# pylint: disable=too-few-public-methods
# for some reason disable=abstract-method has to be on the line
@ -66,43 +68,79 @@ class _JWAHS(JWASignature):
kty = jwk.JWKOct
def __init__(self, name, digestmod):
def __init__(self, name, hash_):
super(_JWAHS, self).__init__(name)
self.digestmod = digestmod
self.hash = hash_()
def sign(self, key, msg):
return HMAC.new(key, msg, self.digestmod).digest()
signer = hmac.HMAC(key, self.hash, backend=default_backend())
signer.update(msg)
return signer.finalize()
def verify(self, key, msg, sig):
"""Verify the signature.
.. warning::
Does not protect against timing attack (no constant compare).
"""
return self.sign(key, msg) == sig
verifier = hmac.HMAC(key, self.hash, backend=default_backend())
verifier.update(msg)
try:
verifier.verify(sig)
except cryptography.exceptions.InvalidSignature as error:
logger.debug(error, exc_info=True)
return False
else:
return True
class _JWARS(JWASignature):
class _JWARSA(object):
kty = jwk.JWKRSA
def __init__(self, name, padding, digestmod):
super(_JWARS, self).__init__(name)
self.padding = padding
self.digestmod = digestmod
padding = NotImplemented
hash = NotImplemented
def sign(self, key, msg):
"""Sign the ``msg`` using ``key``."""
try:
return self.padding.new(key).sign(self.digestmod.new(msg))
except TypeError:
raise errors.Error('Key has no private part necessary for signing')
except (AttributeError, ValueError):
# ValueError for PS, AttributeError for RS
raise errors.Error('Key too small ({0})'.format(key.size()))
signer = key.signer(self.padding, self.hash)
except AttributeError as error:
logger.debug(error, exc_info=True)
raise errors.Error("Public key cannot be used for signing")
except ValueError as error: # digest too large
logger.debug(error, exc_info=True)
raise errors.Error(str(error))
signer.update(msg)
try:
return signer.finalize()
except ValueError as error:
logger.debug(error, exc_info=True)
raise errors.Error(str(error))
def verify(self, key, msg, sig):
return self.padding.new(key).verify(self.digestmod.new(msg), sig)
"""Verify the ``msg` and ``sig`` using ``key``."""
verifier = key.verifier(sig, self.padding, self.hash)
verifier.update(msg)
try:
verifier.verify()
except cryptography.exceptions.InvalidSignature as error:
logger.debug(error, exc_info=True)
return False
else:
return True
class _JWARS(_JWARSA, JWASignature):
def __init__(self, name, hash_):
super(_JWARS, self).__init__(name)
self.padding = padding.PKCS1v15()
self.hash = hash_()
class _JWAPS(_JWARSA, JWASignature):
def __init__(self, name, hash_):
super(_JWAPS, self).__init__(name)
self.padding = padding.PSS(
mgf=padding.MGF1(hash_()),
salt_length=padding.PSS.MAX_LENGTH)
self.hash = hash_()
class _JWAES(JWASignature): # pylint: disable=abstract-class-not-used
@ -116,17 +154,17 @@ class _JWAES(JWASignature): # pylint: disable=abstract-class-not-used
raise NotImplementedError()
HS256 = JWASignature.register(_JWAHS('HS256', SHA256))
HS384 = JWASignature.register(_JWAHS('HS384', SHA384))
HS512 = JWASignature.register(_JWAHS('HS512', SHA512))
HS256 = JWASignature.register(_JWAHS('HS256', hashes.SHA256))
HS384 = JWASignature.register(_JWAHS('HS384', hashes.SHA384))
HS512 = JWASignature.register(_JWAHS('HS512', hashes.SHA512))
RS256 = JWASignature.register(_JWARS('RS256', PKCS1_v1_5, SHA256))
RS384 = JWASignature.register(_JWARS('RS384', PKCS1_v1_5, SHA384))
RS512 = JWASignature.register(_JWARS('RS512', PKCS1_v1_5, SHA512))
RS256 = JWASignature.register(_JWARS('RS256', hashes.SHA256))
RS384 = JWASignature.register(_JWARS('RS384', hashes.SHA384))
RS512 = JWASignature.register(_JWARS('RS512', hashes.SHA512))
PS256 = JWASignature.register(_JWARS('PS256', PKCS1_PSS, SHA256))
PS384 = JWASignature.register(_JWARS('PS384', PKCS1_PSS, SHA384))
PS512 = JWASignature.register(_JWARS('PS512', PKCS1_PSS, SHA512))
PS256 = JWASignature.register(_JWAPS('PS256', hashes.SHA256))
PS384 = JWASignature.register(_JWAPS('PS384', hashes.SHA384))
PS512 = JWASignature.register(_JWAPS('PS512', hashes.SHA512))
ES256 = JWASignature.register(_JWAES('ES256'))
ES256 = JWASignature.register(_JWAES('ES384'))

View file

@ -3,17 +3,24 @@ import os
import pkg_resources
import unittest
from Crypto.PublicKey import RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from acme.jose import errors
RSA256_KEY = RSA.importKey(pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa256_key.pem')))
RSA512_KEY = RSA.importKey(pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa512_key.pem')))
RSA1024_KEY = RSA.importKey(pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa1024_key.pem')))
RSA256_KEY = serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa256_key.pem')),
password=None, backend=default_backend())
RSA512_KEY = serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend())
RSA1024_KEY = serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa1024_key.pem')),
password=None, backend=default_backend())
class JWASignatureTest(unittest.TestCase):
@ -71,14 +78,13 @@ class JWARSTest(unittest.TestCase):
def test_sign_no_private_part(self):
from acme.jose.jwa import RS256
self.assertRaises(
errors.Error, RS256.sign, RSA512_KEY.publickey(), 'foo')
errors.Error, RS256.sign, RSA512_KEY.public_key(), 'foo')
def test_sign_key_too_small(self):
from acme.jose.jwa import RS256
from acme.jose.jwa import PS256
self.assertRaises(errors.Error, RS256.sign, RSA256_KEY, 'foo')
self.assertRaises(errors.Error, PS256.sign, RSA256_KEY, 'foo')
self.assertRaises(errors.Error, PS256.sign, RSA512_KEY, 'foo')
def test_rs(self):
from acme.jose.jwa import RS256
@ -89,16 +95,14 @@ class JWARSTest(unittest.TestCase):
'\xd2\xb9.>}\xfd'
)
self.assertEqual(RS256.sign(RSA512_KEY, 'foo'), sig)
# next tests guard that only True/False are return as oppossed
# to e.g. 1/0
self.assertTrue(RS256.verify(RSA512_KEY, 'foo', sig) is True)
self.assertFalse(RS256.verify(RSA512_KEY, 'foo', sig + '!') is False)
self.assertTrue(RS256.verify(RSA512_KEY.public_key(), 'foo', sig))
self.assertFalse(RS256.verify(RSA512_KEY.public_key(), 'foo', sig + '!'))
def test_ps(self):
from acme.jose.jwa import PS256
sig = PS256.sign(RSA1024_KEY, 'foo')
self.assertTrue(PS256.verify(RSA1024_KEY, 'foo', sig) is True)
self.assertTrue(PS256.verify(RSA1024_KEY, 'foo', sig + '!') is False)
self.assertTrue(PS256.verify(RSA1024_KEY.public_key(), 'foo', sig))
self.assertFalse(PS256.verify(RSA1024_KEY.public_key(), 'foo', sig + '!'))
if __name__ == '__main__':

View file

@ -2,7 +2,9 @@
import abc
import binascii
import Crypto.PublicKey.RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from acme.jose import b64
from acme.jose import errors
@ -22,14 +24,12 @@ class JWK(json_util.TypedJSONObjectWithFields):
raise NotImplementedError()
@abc.abstractmethod
def public(self): # pragma: no cover
def public_key(self): # pragma: no cover
"""Generate JWK with public key.
For symmetric cryptosystems, this would return ``self``.
"""
# TODO: rename publickey to stay consistent with
# HashableRSAKey.publickey
raise NotImplementedError()
@ -54,7 +54,7 @@ class JWKES(JWK): # pragma: no cover
def load(cls, string):
raise NotImplementedError()
def public(self):
def public_key(self):
raise NotImplementedError()
@ -79,7 +79,7 @@ class JWKOct(JWK):
def load(cls, string):
return cls(key=string)
def public(self):
def public_key(self):
return self
@ -87,7 +87,9 @@ class JWKOct(JWK):
class JWKRSA(JWK):
"""RSA JWK.
:ivar key: `Crypto.PublicKey.RSA` wrapped in `.HashableRSAKey`
:ivar key: `cryptography.hazmat.primitives.rsa.RSAPrivateKey`
or `cryptography.hazmat.primitives.rsa.RSAPublicKey` wrapped
in `.ComparableRSAKey`
"""
typ = 'RSA'
@ -120,21 +122,73 @@ class JWKRSA(JWK):
:rtype: :class:`JWKRSA`
"""
return cls(key=util.HashableRSAKey(
Crypto.PublicKey.RSA.importKey(string)))
try:
key = serialization.load_pem_public_key(
string, backend=default_backend())
except ValueError: # ValueError: Could not unserialize key data.
key = serialization.load_pem_private_key(
string, password=None, backend=default_backend())
return cls(key=util.ComparableRSAKey(key))
def public(self):
return type(self)(key=self.key.publickey())
def public_key(self):
return type(self)(key=self.key.public_key())
@classmethod
def fields_from_json(cls, jobj):
return cls(key=util.HashableRSAKey(
Crypto.PublicKey.RSA.construct(
(cls._decode_param(jobj['n']),
cls._decode_param(jobj['e'])))))
# pylint: disable=invalid-name
n, e = (cls._decode_param(jobj[x]) for x in ('n', 'e'))
public_numbers = rsa.RSAPublicNumbers(e=e, n=n)
if 'd' not in jobj: # public key
key = public_numbers.public_key(default_backend())
else: # private key
d = cls._decode_param(jobj['d'])
if ('p' in jobj or 'q' in jobj or 'dp' in jobj or
'dq' in jobj or 'qi' in jobj or 'oth' in jobj):
# "If the producer includes any of the other private
# key parameters, then all of the others MUST be
# present, with the exception of "oth", which MUST
# only be present when more than two prime factors
# were used."
p, q, dp, dq, qi, = all_params = tuple(
jobj.get(x) for x in ('p', 'q', 'dp', 'dq', 'qi'))
if tuple(param for param in all_params if param is None):
raise errors.Error(
"Some private parameters are missing: {0}".format(
all_params))
p, q, dp, dq, qi = tuple(cls._decode_param(x) for x in all_params)
# TODO: check for oth
else:
p, q = rsa.rsa_recover_prime_factors(n, e, d) # cryptography>=0.8
dp = rsa.rsa_crt_dmp1(d, p)
dq = rsa.rsa_crt_dmq1(d, q)
qi = rsa.rsa_crt_iqmp(p, q)
key = rsa.RSAPrivateNumbers(
p, q, d, dp, dq, qi, public_numbers).private_key(default_backend())
return cls(key=util.ComparableRSAKey(key))
def fields_to_partial_json(self):
return {
'n': self._encode_param(self.key.n),
'e': self._encode_param(self.key.e),
}
# pylint: disable=protected-access
if isinstance(self.key._wrapped, rsa.RSAPublicKey):
numbers = self.key.public_numbers()
params = {
'n': numbers.n,
'e': numbers.e,
}
else: # rsa.RSAPrivateKey
private = self.key.private_numbers()
public = self.key.public_key().public_numbers()
params = {
'n': public.n,
'e': public.e,
'd': private.d,
'p': private.p,
'q': private.q,
'dp': private.dmp1,
'dq': private.dmq1,
'qi': private.iqmp,
}
return dict((key, self._encode_param(value))
for key, value in params.iteritems())

View file

@ -3,16 +3,21 @@ import os
import pkg_resources
import unittest
from Crypto.PublicKey import RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from acme.jose import errors
from acme.jose import util
RSA256_KEY = util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa256_key.pem'))))
RSA512_KEY = util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa512_key.pem'))))
RSA256_KEY = util.ComparableRSAKey(serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa256_key.pem')),
password=None, backend=default_backend()))
RSA512_KEY = util.ComparableRSAKey(serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend()))
class JWKOctTest(unittest.TestCase):
@ -38,8 +43,8 @@ class JWKOctTest(unittest.TestCase):
from acme.jose.jwk import JWKOct
self.assertEqual(self.jwk, JWKOct.load('foo'))
def test_public(self):
self.assertTrue(self.jwk.public() is self.jwk)
def test_public_key(self):
self.assertTrue(self.jwk.public_key() is self.jwk)
class JWKRSATest(unittest.TestCase):
@ -47,20 +52,32 @@ class JWKRSATest(unittest.TestCase):
def setUp(self):
from acme.jose.jwk import JWKRSA
self.jwk256 = JWKRSA(key=RSA256_KEY.publickey())
self.jwk256_private = JWKRSA(key=RSA256_KEY)
self.jwk256 = JWKRSA(key=RSA256_KEY.public_key())
self.jwk256json = {
'kty': 'RSA',
'e': 'AQAB',
'n': 'm2Fylv-Uz7trgTW8EBHP3FQSMeZs2GNQ6VRo1sIVJEk',
}
self.jwk512 = JWKRSA(key=RSA512_KEY.publickey())
self.jwk512 = JWKRSA(key=RSA512_KEY.public_key())
self.jwk512json = {
'kty': 'RSA',
'e': 'AQAB',
'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5'
'80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q',
}
self.private = JWKRSA(key=RSA256_KEY)
self.private_json_small = self.jwk256json.copy()
self.private_json_small['d'] = (
'lPQED_EPTV0UIBfNI3KP2d9Jlrc2mrMllmf946bu-CE')
self.private_json = self.jwk256json.copy()
self.private_json.update({
'd': 'lPQED_EPTV0UIBfNI3KP2d9Jlrc2mrMllmf946bu-CE',
'p': 'zUVNZn4lLLBD1R6NE8TKNQ',
'q': 'wcfKfc7kl5jfqXArCRSURQ',
'dp': 'CWJFq43QvT5Bm5iN8n1okQ',
'dq': 'bHh2u7etM8LKKCF2pY2UdQ',
'qi': 'oi45cEkbVoJjAbnQpFY87Q',
})
def test_equals(self):
self.assertEqual(self.jwk256, self.jwk256)
@ -73,22 +90,34 @@ class JWKRSATest(unittest.TestCase):
def test_load(self):
from acme.jose.jwk import JWKRSA
self.assertEqual(
JWKRSA(key=util.HashableRSAKey(RSA256_KEY)), JWKRSA.load(
JWKRSA(key=RSA256_KEY), JWKRSA.load(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa256_key.pem'))))
def test_public(self):
self.assertEqual(self.jwk256, self.jwk256_private.public())
def test_public_key(self):
self.assertEqual(self.jwk256, self.private.public_key())
def test_to_partial_json(self):
self.assertEqual(self.jwk256.to_partial_json(), self.jwk256json)
self.assertEqual(self.jwk512.to_partial_json(), self.jwk512json)
self.assertEqual(self.private.to_partial_json(), self.private_json)
def test_from_json(self):
from acme.jose.jwk import JWK
self.assertEqual(self.jwk256, JWK.from_json(self.jwk256json))
# TODO: fix schemata to allow RSA512
#self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json))
self.assertEqual(
self.jwk256, JWK.from_json(self.jwk256json))
self.assertEqual(
self.jwk512, JWK.from_json(self.jwk512json))
self.assertEqual(self.private, JWK.from_json(self.private_json))
def test_from_json_private_small(self):
from acme.jose.jwk import JWK
self.assertEqual(self.private, JWK.from_json(self.private_json_small))
def test_from_json_missing_one_additional(self):
from acme.jose.jwk import JWK
del self.private_json['q']
self.assertRaises(errors.Error, JWK.from_json, self.private_json)
def test_from_json_hashable(self):
from acme.jose.jwk import JWK

View file

@ -203,7 +203,7 @@ class Signature(json_util.JSONObjectWithFields):
header_params = kwargs
header_params['alg'] = alg
if include_jwk:
header_params['jwk'] = key.public()
header_params['jwk'] = key.public_key()
assert set(header_params).issubset(cls.header_cls._fields)
assert protect.issubset(cls.header_cls._fields)
@ -354,12 +354,12 @@ class CLI(object):
if args.key is not None:
assert args.kty is not None
key = args.kty.load(args.key.read())
key = args.kty.load(args.key.read()).public_key()
else:
key = None
sys.stdout.write(sig.payload)
return int(not sig.verify(key=key))
return not sig.verify(key=key)
@classmethod
def _alg_type(cls, arg):

View file

@ -4,7 +4,8 @@ import os
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import M2Crypto
import mock
@ -18,8 +19,10 @@ from acme.jose import util
CERT = util.ComparableX509(M2Crypto.X509.load_cert(
pkg_resources.resource_filename(
'letsencrypt.tests', 'testdata/cert.pem')))
RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa512_key.pem')))
RSA512_KEY = util.ComparableRSAKey(serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend()))
class MediaTypeTest(unittest.TestCase):
@ -107,7 +110,7 @@ class JWSTest(unittest.TestCase):
def setUp(self):
self.privkey = jwk.JWKRSA(key=RSA512_KEY)
self.pubkey = self.privkey.public()
self.pubkey = self.privkey.public_key()
from acme.jose.jws import JWS
self.unprotected = JWS.sign(

View file

@ -1,6 +1,8 @@
"""JOSE utilities."""
import collections
from cryptography.hazmat.primitives.asymmetric import rsa
class abstractclassmethod(classmethod):
# pylint: disable=invalid-name,too-few-public-methods
@ -41,9 +43,14 @@ class ComparableX509(object): # pylint: disable=too-few-public-methods
return self.as_der() == other.as_der()
class HashableRSAKey(object): # pylint: disable=too-few-public-methods
"""Wrapper for `Crypto.PublicKey.RSA` objects that supports hashing."""
class ComparableRSAKey(object): # pylint: disable=too-few-public-methods
"""Wrapper for `cryptography` RSA keys.
Wraps around:
- `cryptography.hazmat.primitives.assymetric.RSAPrivateKey`
- `cryptography.hazmat.primitives.assymetric.RSAPublicKey`
"""
def __init__(self, wrapped):
self._wrapped = wrapped
@ -51,14 +58,36 @@ class HashableRSAKey(object): # pylint: disable=too-few-public-methods
return getattr(self._wrapped, name)
def __eq__(self, other):
return self._wrapped == other
# pylint: disable=protected-access
if (not isinstance(other, self.__class__) or
self._wrapped.__class__ is not other._wrapped.__class__):
return False
# RSA*KeyWithSerialization requires cryptography>=0.8
if isinstance(self._wrapped, rsa.RSAPrivateKeyWithSerialization):
return self.private_numbers() == other.private_numbers()
elif isinstance(self._wrapped, rsa.RSAPublicKeyWithSerialization):
return self.public_numbers() == other.public_numbers()
else:
return False # we shouldn't reach here...
def __hash__(self):
return hash((type(self), self.exportKey(format='DER')))
# public_numbers() hasn't got stable hash!
if isinstance(self._wrapped, rsa.RSAPrivateKeyWithSerialization):
priv = self.private_numbers()
pub = priv.public_numbers
return hash((type(self), priv.p, priv.q, priv.dmp1,
priv.dmq1, priv.iqmp, pub.n, pub.e))
elif isinstance(self._wrapped, rsa.RSAPublicKeyWithSerialization):
pub = self.public_numbers()
return hash((type(self), pub.n, pub.e))
def publickey(self):
def __repr__(self):
return '<{0}({1!r})>'.format(self.__class__.__name__, self._wrapped)
def public_key(self):
"""Get wrapped public key."""
return type(self)(self._wrapped.publickey())
return type(self)(self._wrapped.public_key())
class ImmutableMap(collections.Mapping, collections.Hashable):

View file

@ -4,32 +4,52 @@ import os
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
class HashableRSAKeyTest(unittest.TestCase):
"""Tests for acme.jose.util.HashableRSAKey."""
class ComparableRSAKeyTest(unittest.TestCase):
"""Tests for acme.jose.util.ComparableRSAKey."""
def setUp(self):
from acme.jose.util import HashableRSAKey
self.key = HashableRSAKey(Crypto.PublicKey.RSA.importKey(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa256_key.pem'))))
self.key_same = HashableRSAKey(Crypto.PublicKey.RSA.importKey(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa256_key.pem'))))
from acme.jose.util import ComparableRSAKey
backend = default_backend()
def load_key(): # pylint: disable=missing-docstring
return ComparableRSAKey(serialization.load_pem_private_key(
pkg_resources.resource_string(
__name__, os.path.join('testdata', 'rsa256_key.pem')),
password=None, backend=backend))
self.key = load_key()
self.key_same = load_key()
def test_getattr_proxy(self):
self.assertEqual(256, self.key.key_size)
def test_eq(self):
# if __eq__ is not defined, then two HashableRSAKeys with same
# _wrapped do not equate
self.assertEqual(self.key, self.key_same)
def test_not_eq_different_types(self):
self.assertFalse(self.key.__eq__(5))
def test_not_eq_not_wrapped(self):
# pylint: disable=protected-access
self.assertFalse(self.key.__eq__(self.key_same._wrapped))
def test_not_eq_no_serialization(self):
from acme.jose.util import ComparableRSAKey
self.assertFalse(ComparableRSAKey(5).__eq__(ComparableRSAKey(5)))
def test_hash(self):
self.assertTrue(isinstance(hash(self.key), int))
self.assertEqual(hash(self.key), hash(self.key_same))
def test_publickey(self):
from acme.jose.util import HashableRSAKey
self.assertTrue(isinstance(self.key.publickey(), HashableRSAKey))
def test_repr(self):
self.assertTrue(repr(self.key).startswith(
'<ComparableRSAKey(<cryptography.hazmat.'))
def test_public_key(self):
from acme.jose.util import ComparableRSAKey
self.assertTrue(isinstance(self.key.public_key(), ComparableRSAKey))
class ImmutableMapTest(unittest.TestCase):

View file

@ -3,14 +3,17 @@ import os
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from acme import errors
from acme import jose
RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))
RSA512_KEY = jose.ComparableRSAKey(serialization.load_pem_private_key(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend()))
class HeaderTest(unittest.TestCase):
@ -44,7 +47,7 @@ class JWSTest(unittest.TestCase):
def setUp(self):
self.privkey = jose.JWKRSA(key=RSA512_KEY)
self.pubkey = self.privkey.public()
self.pubkey = self.privkey.public_key()
self.nonce = jose.b64encode('Nonce')
def test_it(self):

View file

@ -3,7 +3,8 @@ import os
import pkg_resources
import unittest
from Crypto.PublicKey import RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import M2Crypto
import mock
@ -19,8 +20,10 @@ CSR = jose.ComparableX509(M2Crypto.X509.load_request_string(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'csr.der')),
M2Crypto.X509.FORMAT_DER))
KEY = jose.util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))))
KEY = jose.util.ComparableRSAKey(serialization.load_pem_private_key(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend()))
CERT = jose.ComparableX509(M2Crypto.X509.load_cert(
format=M2Crypto.X509.FORMAT_DER, file=pkg_resources.resource_filename(
'acme.jose', os.path.join('testdata', 'cert.der'))))
@ -109,7 +112,7 @@ class RegistrationTest(unittest.TestCase):
"""Tests for acme.messages.Registration."""
def setUp(self):
key = jose.jwk.JWKRSA(key=KEY.publickey())
key = jose.jwk.JWKRSA(key=KEY.public_key())
contact = (
'mailto:admin@foo.com',
'tel:1234',

View file

@ -1,9 +1,7 @@
"""Other ACME objects."""
import functools
import logging
import Crypto.Random
import Crypto.PublicKey.RSA
import os
from acme import jose
@ -43,7 +41,8 @@ class Signature(jose.JSONObjectWithFields):
:param str msg: Message to be signed.
:param key: Key used for signing.
:type key: :class:`Crypto.PublicKey.RSA`
:type key: `cryptography.hazmat.primitives.assymetric.rsa.RSAPrivateKey`
wrapped in `.ComparableRSAKey`.
:param str nonce: Nonce to be used. If None, nonce of
``nonce_size`` will be randomly generated.
@ -52,15 +51,14 @@ class Signature(jose.JSONObjectWithFields):
"""
nonce_size = cls.NONCE_SIZE if nonce_size is None else nonce_size
if nonce is None:
nonce = Crypto.Random.get_random_bytes(nonce_size)
nonce = os.urandom(nonce_size) if nonce is None else nonce
msg_with_nonce = nonce + msg
sig = alg.sign(key, nonce + msg)
logger.debug('%s signed as %s', msg_with_nonce, sig)
return cls(alg=alg, sig=sig, nonce=nonce,
jwk=alg.kty(key=key.publickey()))
jwk=alg.kty(key=key.public_key()))
def verify(self, msg):
"""Verify the signature.

View file

@ -3,14 +3,16 @@ import os
import pkg_resources
import unittest
import Crypto.PublicKey.RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from acme import jose
KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey(
KEY = jose.ComparableRSAKey(serialization.load_pem_private_key(
pkg_resources.resource_string(
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))))
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend()))
class SignatureTest(unittest.TestCase):
@ -26,7 +28,7 @@ class SignatureTest(unittest.TestCase):
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
self.alg = jose.RS256
self.jwk = jose.JWKRSA(key=KEY.publickey())
self.jwk = jose.JWKRSA(key=KEY.public_key())
b64sig = ('SUPYKucUnhlTt8_sMxLiigOYdf_wlOLXPI-o7aRLTsOquVjDd6r'
'AX9AFJHk-bCMQPJbSzXKjG6H1IWbvxjS2Ew')

View file

@ -3,7 +3,8 @@ import logging
import os
import pkg_resources
import Crypto.PublicKey.RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
import M2Crypto
from acme import client
@ -18,8 +19,11 @@ NEW_REG_URL = 'https://www.letsencrypt-demo.org/acme/new-reg'
BITS = 2048 # minimum for Boulder
DOMAIN = 'example1.com' # example.com is ignored by Boulder
key = jose.JWKRSA.load(
Crypto.PublicKey.RSA.generate(BITS).exportKey(format="PEM"))
# generate_private_key requires cryptography>=0.5
key = jose.JWKRSA(key=jose.ComparableRSAKey(rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend())))
acme = client.Client(NEW_REG_URL, key)
regr = acme.register(contact=())

View file

@ -1,5 +1,6 @@
"""Proof of Possession Identifier Validation Challenge."""
import M2Crypto
import logging
import os
import zope.component
@ -11,6 +12,9 @@ from letsencrypt import interfaces
from letsencrypt.display import util as display_util
logger = logging.getLogger(__name__)
class ProofOfPossession(object): # pylint: disable=too-few-public-methods
"""Proof of Possession Identifier Validation Challenge.
@ -39,19 +43,22 @@ class ProofOfPossession(object): # pylint: disable=too-few-public-methods
return None
for cert, key, _ in self.installer.get_all_certs_keys():
der_cert_key = M2Crypto.X509.load_cert(cert).get_pubkey().as_der()
pkey = M2Crypto.X509.load_cert(cert).get_pubkey()
try:
cert_key = achall.alg.kty.load(der_cert_key)
# If JWKES.load raises other exceptions, they should be caught here
except (IndexError, ValueError, TypeError):
rsa_pkey = pkey.get_rsa()
except ValueError:
logger.warn("Only RSA supported at this time")
continue
pem_cert_key = rsa_pkey.as_pem()
cert_key = achall.alg.kty.load(pem_cert_key)
# TODO: If JWKES.load raises other exceptions, they should be caught here
if cert_key == achall.hints.jwk:
return self._gen_response(achall, key)
# Is there are different prompt we should give the user?
code, key = zope.component.getUtility(
interfaces.IDisplay).input(
"Path to private key for identifier: %s " % achall.domain)
"QPath to private key for identifier: %s " % achall.domain)
if code != display_util.CANCEL:
return self._gen_response(achall, key)

View file

@ -4,16 +4,18 @@ import itertools
import os
import pkg_resources
import Crypto.PublicKey.RSA
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from acme import challenges
from acme import jose
from acme import messages
KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey(
KEY = jose.ComparableRSAKey(serialization.load_pem_private_key(
pkg_resources.resource_string(
"acme.jose", os.path.join("testdata", "rsa512_key.pem"))))
__name__, os.path.join('testdata', 'rsa512_key.pem')),
password=None, backend=default_backend()))
# Challenges
SIMPLE_HTTP = challenges.SimpleHTTP(
@ -30,7 +32,7 @@ RECOVERY_TOKEN = challenges.RecoveryToken()
POP = challenges.ProofOfPossession(
alg="RS256", nonce="xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ",
hints=challenges.ProofOfPossession.Hints(
jwk=jose.JWKRSA(key=KEY.publickey()),
jwk=jose.JWKRSA(key=KEY.public_key()),
cert_fingerprints=(
"93416768eb85e33adc4277f4c9acd63e7418fcfe",
"16d95b7b63f1972b980b14c20291f3c0d1855d95",

View file

@ -1,9 +1,10 @@
"""Tests for letsencrypt.proof_of_possession."""
import Crypto.PublicKey.RSA
import os
import pkg_resources
import unittest
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import mock
from acme import challenges
@ -28,8 +29,10 @@ CERT3_PATH = pkg_resources.resource_filename(
BASE_PACKAGE, os.path.join("testdata", "matching_cert.pem"))
CERT3_KEY_PATH = pkg_resources.resource_filename(
BASE_PACKAGE, os.path.join("testdata", "rsa512_key.pem"))
CERT3_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
BASE_PACKAGE, os.path.join('testdata', 'rsa512_key.pem'))).publickey()
with open(CERT3_KEY_PATH) as cert3_file:
CERT3_KEY = jose.ComparableRSAKey(serialization.load_pem_private_key(
cert3_file.read(), password=None,
backend=default_backend())).public_key()
class ProofOfPossessionTest(unittest.TestCase):
@ -55,7 +58,7 @@ class ProofOfPossessionTest(unittest.TestCase):
def test_perform_bad_challenge(self):
hints = challenges.ProofOfPossession.Hints(
jwk=jose.jwk.JWKOct(key=CERT3_KEY), cert_fingerprints=(),
jwk=jose.jwk.JWKOct(key="foo"), cert_fingerprints=(),
certs=(), serial_numbers=(), subject_key_identifiers=(),
issuers=(), authorized_for=())
chall = challenges.ProofOfPossession(

View file

@ -35,9 +35,11 @@ changes = read_file(os.path.join(here, 'CHANGES.rst'))
# maintainers. and will make the future migration a lot easier.
acme_install_requires = [
'argparse',
# load_pem_private/public_key (>=0.6)
# rsa_recover_prime_factors (>=0.8)
'cryptography>=0.8',
#'letsencrypt' # TODO: uses testdata vectors
'mock',
'pycrypto',
'pyrfc3339',
'ndg-httpsclient', # urllib3 InsecurePlatformWarning (#304)
'pyasn1', # urllib3 InsecurePlatformWarning (#304)
@ -83,6 +85,7 @@ letsencrypt_nginx_install_requires = [
install_requires = [
'argparse',
'cryptography>=0.8',
'ConfigArgParse',
'configobj',
'mock',