mirror of
https://github.com/certbot/certbot.git
synced 2026-05-28 04:34:11 -04:00
Merge pull request #591 from kuba/cryptography
Drop M2Crypto and PyCrypto.
This commit is contained in:
commit
bb831206b5
44 changed files with 890 additions and 537 deletions
|
|
@ -3,8 +3,8 @@ import binascii
|
|||
import functools
|
||||
import hashlib
|
||||
import logging
|
||||
import os
|
||||
|
||||
import Crypto.Random
|
||||
import requests
|
||||
|
||||
from acme import jose
|
||||
|
|
@ -204,7 +204,7 @@ class DVSNIResponse(ChallengeResponse):
|
|||
decoder=functools.partial(jose.decode_b64jose, size=S_SIZE))
|
||||
|
||||
def __init__(self, s=None, *args, **kwargs):
|
||||
s = Crypto.Random.get_random_bytes(self.S_SIZE) if s is None else s
|
||||
s = os.urandom(self.S_SIZE) if s is None else s
|
||||
super(DVSNIResponse, self).__init__(s=s, *args, **kwargs)
|
||||
|
||||
def z(self, chall): # pylint: disable=invalid-name
|
||||
|
|
|
|||
|
|
@ -3,9 +3,10 @@ import os
|
|||
import pkg_resources
|
||||
import unittest
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
import M2Crypto
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import mock
|
||||
import OpenSSL
|
||||
import requests
|
||||
import urlparse
|
||||
|
||||
|
|
@ -13,12 +14,13 @@ from acme import jose
|
|||
from acme import other
|
||||
|
||||
|
||||
CERT = jose.ComparableX509(M2Crypto.X509.load_cert(
|
||||
pkg_resources.resource_filename(
|
||||
CERT = jose.ComparableX509(OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
|
||||
'letsencrypt.tests', os.path.join('testdata', 'cert.pem'))))
|
||||
KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey(
|
||||
KEY = serialization.load_pem_private_key(
|
||||
pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))))
|
||||
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
|
||||
password=None, backend=default_backend())
|
||||
|
||||
|
||||
class ChallengeResponseTest(unittest.TestCase):
|
||||
|
|
@ -345,7 +347,7 @@ class RecoveryTokenResponseTest(unittest.TestCase):
|
|||
class ProofOfPossessionHintsTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
jwk = jose.JWKRSA(key=KEY.publickey())
|
||||
jwk = jose.JWKRSA(key=KEY.public_key())
|
||||
issuers = (
|
||||
'C=US, O=SuperT LLC, CN=SuperTrustworthy Public CA',
|
||||
'O=LessTrustworthy CA Inc, CN=LessTrustworthy But StillSecure',
|
||||
|
|
@ -368,7 +370,8 @@ class ProofOfPossessionHintsTest(unittest.TestCase):
|
|||
self.jmsg_to = {
|
||||
'jwk': jwk,
|
||||
'certFingerprints': cert_fingerprints,
|
||||
'certs': (jose.b64encode(CERT.as_der()),),
|
||||
'certs': (jose.b64encode(OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, CERT)),),
|
||||
'subjectKeyIdentifiers': subject_key_identifiers,
|
||||
'serialNumbers': serial_numbers,
|
||||
'issuers': issuers,
|
||||
|
|
@ -413,7 +416,7 @@ class ProofOfPossessionTest(unittest.TestCase):
|
|||
def setUp(self):
|
||||
from acme.challenges import ProofOfPossession
|
||||
hints = ProofOfPossession.Hints(
|
||||
jwk=jose.JWKRSA(key=KEY.publickey()), cert_fingerprints=(),
|
||||
jwk=jose.JWKRSA(key=KEY.public_key()), cert_fingerprints=(),
|
||||
certs=(), serial_numbers=(), subject_key_identifiers=(),
|
||||
issuers=(), authorized_for=())
|
||||
self.msg = ProofOfPossession(
|
||||
|
|
@ -453,7 +456,7 @@ class ProofOfPossessionResponseTest(unittest.TestCase):
|
|||
# nonce and challenge nonce are the same, don't make the same
|
||||
# mistake here...
|
||||
signature = other.Signature(
|
||||
alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.publickey()),
|
||||
alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.public_key()),
|
||||
sig='\xa7\xc1\xe7\xe82o\xbc\xcd\xd0\x1e\x010#Z|\xaf\x15\x83'
|
||||
'\x94\x8f#\x9b\nQo(\x80\x15,\x08\xfcz\x1d\xfd\xfd.\xaap'
|
||||
'\xfa\x06\xd1\xa2f\x8d8X2>%d\xbd%\xe1T\xdd\xaa0\x18\xde'
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ import httplib
|
|||
import logging
|
||||
import time
|
||||
|
||||
import M2Crypto
|
||||
import OpenSSL
|
||||
import requests
|
||||
import werkzeug
|
||||
|
||||
|
|
@ -83,7 +83,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
assert response.status_code == httplib.CREATED # TODO: handle errors
|
||||
|
||||
regr = self._regr_from_response(response)
|
||||
if regr.body.key != self.key.public() or regr.body.contact != contact:
|
||||
if (regr.body.key != self.key.public_key()
|
||||
or regr.body.contact != contact):
|
||||
raise errors.UnexpectedUpdate(regr)
|
||||
|
||||
return regr
|
||||
|
|
@ -255,7 +256,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
"""Request issuance.
|
||||
|
||||
:param csr: CSR
|
||||
:type csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509`
|
||||
:type csr: `OpenSSL.crypto.X509Req` wrapped in `.ComparableX509`
|
||||
|
||||
:param authzrs: `list` of `.AuthorizationResource`
|
||||
|
||||
|
|
@ -286,8 +287,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
|
||||
return messages.CertificateResource(
|
||||
uri=uri, authzrs=authzrs, cert_chain_uri=cert_chain_uri,
|
||||
body=jose.ComparableX509(
|
||||
M2Crypto.X509.load_cert_der_string(response.content)))
|
||||
body=jose.ComparableX509(OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, response.content)))
|
||||
|
||||
def poll_and_request_issuance(self, csr, authzrs, mintime=5):
|
||||
"""Poll and request issuance.
|
||||
|
|
@ -299,7 +300,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
.. todo:: add `max_attempts` or `timeout`
|
||||
|
||||
:param csr: CSR.
|
||||
:type csr: `M2Crypto.X509.Request` wrapped in `.ComparableX509`
|
||||
:type csr: `OpenSSL.crypto.X509Req` wrapped in `.ComparableX509`
|
||||
|
||||
:param authzrs: `list` of `.AuthorizationResource`
|
||||
|
||||
|
|
@ -358,8 +359,8 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
content_type = self.DER_CONTENT_TYPE # TODO: make it a param
|
||||
response = self.net.get(uri, headers={'Accept': content_type},
|
||||
content_type=content_type)
|
||||
return response, jose.ComparableX509(
|
||||
M2Crypto.X509.load_cert_der_string(response.content))
|
||||
return response, jose.ComparableX509(OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, response.content))
|
||||
|
||||
def check_cert(self, certr):
|
||||
"""Check for new cert.
|
||||
|
|
@ -402,7 +403,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
:type certr: `.CertificateResource`
|
||||
|
||||
:returns: Certificate chain, or `None` if no "up" Link was provided.
|
||||
:rtype: `M2Crypto.X509.X509` wrapped in `.ComparableX509`
|
||||
:rtype: `OpenSSL.crypto.X509` wrapped in `.ComparableX509`
|
||||
|
||||
"""
|
||||
if certr.cert_chain_uri is not None:
|
||||
|
|
@ -413,7 +414,7 @@ class Client(object): # pylint: disable=too-many-instance-attributes
|
|||
def revoke(self, cert):
|
||||
"""Revoke certificate.
|
||||
|
||||
:param .ComparableX509 cert: `M2Crypto.X509.X509` wrapped in
|
||||
:param .ComparableX509 cert: `OpenSSL.crypto.X509` wrapped in
|
||||
`.ComparableX509`
|
||||
|
||||
:raises .ClientError: If revocation is unsuccessful.
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@ from acme import messages
|
|||
from acme import messages_test
|
||||
|
||||
|
||||
CERT_DER = pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'cert.der'))
|
||||
KEY = jose.JWKRSA.load(pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))
|
||||
KEY2 = jose.JWKRSA.load(pkg_resources.resource_string(
|
||||
|
|
@ -44,7 +46,7 @@ class ClientTest(unittest.TestCase):
|
|||
# Registration
|
||||
self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212')
|
||||
reg = messages.Registration(
|
||||
contact=self.contact, key=KEY.public(), recovery_token='t')
|
||||
contact=self.contact, key=KEY.public_key(), recovery_token='t')
|
||||
self.regr = messages.RegistrationResource(
|
||||
body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1',
|
||||
new_authzr_uri='https://www.letsencrypt-demo.org/acme/new-reg',
|
||||
|
|
@ -84,7 +86,7 @@ class ClientTest(unittest.TestCase):
|
|||
# TODO: test POST call arguments
|
||||
|
||||
# TODO: split here and separate test
|
||||
reg_wrong_key = self.regr.body.update(key=KEY2.public())
|
||||
reg_wrong_key = self.regr.body.update(key=KEY2.public_key())
|
||||
self.response.json.return_value = reg_wrong_key.to_json()
|
||||
self.assertRaises(
|
||||
errors.UnexpectedUpdate, self.client.register, self.contact)
|
||||
|
|
@ -204,7 +206,7 @@ class ClientTest(unittest.TestCase):
|
|||
errors.UnexpectedUpdate, self.client.poll, self.authzr)
|
||||
|
||||
def test_request_issuance(self):
|
||||
self.response.content = messages_test.CERT.as_der()
|
||||
self.response.content = CERT_DER
|
||||
self.response.headers['Location'] = self.certr.uri
|
||||
self.response.links['up'] = {'url': self.certr.cert_chain_uri}
|
||||
self.assertEqual(self.certr, self.client.request_issuance(
|
||||
|
|
@ -212,7 +214,7 @@ class ClientTest(unittest.TestCase):
|
|||
# TODO: check POST args
|
||||
|
||||
def test_request_issuance_missing_up(self):
|
||||
self.response.content = messages_test.CERT.as_der()
|
||||
self.response.content = CERT_DER
|
||||
self.response.headers['Location'] = self.certr.uri
|
||||
self.assertEqual(
|
||||
self.certr.update(cert_chain_uri=None),
|
||||
|
|
@ -306,7 +308,7 @@ class ClientTest(unittest.TestCase):
|
|||
|
||||
def test_check_cert(self):
|
||||
self.response.headers['Location'] = self.certr.uri
|
||||
self.response.content = messages_test.CERT.as_der()
|
||||
self.response.content = CERT_DER
|
||||
self.assertEqual(self.certr.update(body=messages_test.CERT),
|
||||
self.client.check_cert(self.certr))
|
||||
|
||||
|
|
@ -316,7 +318,7 @@ class ClientTest(unittest.TestCase):
|
|||
errors.UnexpectedUpdate, self.client.check_cert, self.certr)
|
||||
|
||||
def test_check_cert_missing_location(self):
|
||||
self.response.content = messages_test.CERT.as_der()
|
||||
self.response.content = CERT_DER
|
||||
self.assertRaises(
|
||||
errors.ClientError, self.client.check_cert, self.certr)
|
||||
|
||||
|
|
|
|||
|
|
@ -74,6 +74,6 @@ from acme.jose.jws import (
|
|||
|
||||
from acme.jose.util import (
|
||||
ComparableX509,
|
||||
HashableRSAKey,
|
||||
ComparableRSAKey,
|
||||
ImmutableMap,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import abc
|
|||
import binascii
|
||||
import logging
|
||||
|
||||
import M2Crypto
|
||||
import OpenSSL
|
||||
|
||||
from acme.jose import b64
|
||||
from acme.jose import errors
|
||||
|
|
@ -321,26 +321,28 @@ def encode_cert(cert):
|
|||
:type cert: :class:`acme.jose.util.ComparableX509`
|
||||
|
||||
"""
|
||||
return b64.b64encode(cert.as_der())
|
||||
return b64.b64encode(OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, cert))
|
||||
|
||||
def decode_cert(b64der):
|
||||
"""Decode JOSE Base-64 DER-encoded certificate."""
|
||||
try:
|
||||
return util.ComparableX509(M2Crypto.X509.load_cert_der_string(
|
||||
decode_b64jose(b64der)))
|
||||
except M2Crypto.X509.X509Error as error:
|
||||
return util.ComparableX509(OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, decode_b64jose(b64der)))
|
||||
except OpenSSL.crypto.Error as error:
|
||||
raise errors.DeserializationError(error)
|
||||
|
||||
def encode_csr(csr):
|
||||
"""Encode CSR as JOSE Base-64 DER."""
|
||||
return encode_cert(csr)
|
||||
return b64.b64encode(OpenSSL.crypto.dump_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, csr))
|
||||
|
||||
def decode_csr(b64der):
|
||||
"""Decode JOSE Base-64 DER-encoded CSR."""
|
||||
try:
|
||||
return util.ComparableX509(M2Crypto.X509.load_request_der_string(
|
||||
decode_b64jose(b64der)))
|
||||
except M2Crypto.X509.X509Error as error:
|
||||
return util.ComparableX509(OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, decode_b64jose(b64der)))
|
||||
except OpenSSL.crypto.Error as error:
|
||||
raise errors.DeserializationError(error)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -4,18 +4,20 @@ import os
|
|||
import pkg_resources
|
||||
import unittest
|
||||
|
||||
import M2Crypto
|
||||
import mock
|
||||
import OpenSSL
|
||||
|
||||
from acme.jose import errors
|
||||
from acme.jose import interfaces
|
||||
from acme.jose import util
|
||||
|
||||
|
||||
CERT = M2Crypto.X509.load_cert(pkg_resources.resource_filename(
|
||||
'letsencrypt.tests', os.path.join('testdata', 'cert.pem')))
|
||||
CSR = M2Crypto.X509.load_request(pkg_resources.resource_filename(
|
||||
'letsencrypt.tests', os.path.join('testdata', 'csr.pem')))
|
||||
CERT = util.ComparableX509(OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
|
||||
'letsencrypt.tests', os.path.join('testdata', 'cert.pem'))))
|
||||
CSR = util.ComparableX509(OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
|
||||
'letsencrypt.tests', os.path.join('testdata', 'csr.pem'))))
|
||||
|
||||
|
||||
class FieldTest(unittest.TestCase):
|
||||
|
|
@ -280,7 +282,7 @@ class DeEncodersTest(unittest.TestCase):
|
|||
|
||||
def test_encode_csr(self):
|
||||
from acme.jose.json_util import encode_csr
|
||||
self.assertEqual(self.b64_cert, encode_csr(CERT))
|
||||
self.assertEqual(self.b64_csr, encode_csr(CSR))
|
||||
|
||||
def test_decode_csr(self):
|
||||
from acme.jose.json_util import decode_csr
|
||||
|
|
|
|||
123
acme/jose/jwa.py
123
acme/jose/jwa.py
|
|
@ -4,20 +4,22 @@ https://tools.ietf.org/html/draft-ietf-jose-json-web-algorithms-40
|
|||
|
||||
"""
|
||||
import abc
|
||||
import logging
|
||||
|
||||
from Crypto.Hash import HMAC
|
||||
from Crypto.Hash import SHA256
|
||||
from Crypto.Hash import SHA384
|
||||
from Crypto.Hash import SHA512
|
||||
|
||||
from Crypto.Signature import PKCS1_PSS
|
||||
from Crypto.Signature import PKCS1_v1_5
|
||||
import cryptography.exceptions
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import hmac
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
|
||||
from acme.jose import errors
|
||||
from acme.jose import interfaces
|
||||
from acme.jose import jwk
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JWA(interfaces.JSONDeSerializable): # pylint: disable=abstract-method
|
||||
# pylint: disable=too-few-public-methods
|
||||
# for some reason disable=abstract-method has to be on the line
|
||||
|
|
@ -33,7 +35,12 @@ class JWASignature(JWA):
|
|||
self.name = name
|
||||
|
||||
def __eq__(self, other):
|
||||
return isinstance(other, JWASignature) and self.name == other.name
|
||||
if not isinstance(other, JWASignature):
|
||||
return NotImplemented
|
||||
return self.name == other.name
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self == other
|
||||
|
||||
@classmethod
|
||||
def register(cls, signature_cls):
|
||||
|
|
@ -66,43 +73,79 @@ class _JWAHS(JWASignature):
|
|||
|
||||
kty = jwk.JWKOct
|
||||
|
||||
def __init__(self, name, digestmod):
|
||||
def __init__(self, name, hash_):
|
||||
super(_JWAHS, self).__init__(name)
|
||||
self.digestmod = digestmod
|
||||
self.hash = hash_()
|
||||
|
||||
def sign(self, key, msg):
|
||||
return HMAC.new(key, msg, self.digestmod).digest()
|
||||
signer = hmac.HMAC(key, self.hash, backend=default_backend())
|
||||
signer.update(msg)
|
||||
return signer.finalize()
|
||||
|
||||
def verify(self, key, msg, sig):
|
||||
"""Verify the signature.
|
||||
|
||||
.. warning::
|
||||
Does not protect against timing attack (no constant compare).
|
||||
|
||||
"""
|
||||
return self.sign(key, msg) == sig
|
||||
verifier = hmac.HMAC(key, self.hash, backend=default_backend())
|
||||
verifier.update(msg)
|
||||
try:
|
||||
verifier.verify(sig)
|
||||
except cryptography.exceptions.InvalidSignature as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
class _JWARS(JWASignature):
|
||||
class _JWARSA(object):
|
||||
|
||||
kty = jwk.JWKRSA
|
||||
|
||||
def __init__(self, name, padding, digestmod):
|
||||
super(_JWARS, self).__init__(name)
|
||||
self.padding = padding
|
||||
self.digestmod = digestmod
|
||||
padding = NotImplemented
|
||||
hash = NotImplemented
|
||||
|
||||
def sign(self, key, msg):
|
||||
"""Sign the ``msg`` using ``key``."""
|
||||
try:
|
||||
return self.padding.new(key).sign(self.digestmod.new(msg))
|
||||
except TypeError:
|
||||
raise errors.Error('Key has no private part necessary for signing')
|
||||
except (AttributeError, ValueError):
|
||||
# ValueError for PS, AttributeError for RS
|
||||
raise errors.Error('Key too small ({0})'.format(key.size()))
|
||||
signer = key.signer(self.padding, self.hash)
|
||||
except AttributeError as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
raise errors.Error("Public key cannot be used for signing")
|
||||
except ValueError as error: # digest too large
|
||||
logger.debug(error, exc_info=True)
|
||||
raise errors.Error(str(error))
|
||||
signer.update(msg)
|
||||
try:
|
||||
return signer.finalize()
|
||||
except ValueError as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
raise errors.Error(str(error))
|
||||
|
||||
def verify(self, key, msg, sig):
|
||||
return self.padding.new(key).verify(self.digestmod.new(msg), sig)
|
||||
"""Verify the ``msg` and ``sig`` using ``key``."""
|
||||
verifier = key.verifier(sig, self.padding, self.hash)
|
||||
verifier.update(msg)
|
||||
try:
|
||||
verifier.verify()
|
||||
except cryptography.exceptions.InvalidSignature as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
class _JWARS(_JWARSA, JWASignature):
|
||||
|
||||
def __init__(self, name, hash_):
|
||||
super(_JWARS, self).__init__(name)
|
||||
self.padding = padding.PKCS1v15()
|
||||
self.hash = hash_()
|
||||
|
||||
|
||||
class _JWAPS(_JWARSA, JWASignature):
|
||||
|
||||
def __init__(self, name, hash_):
|
||||
super(_JWAPS, self).__init__(name)
|
||||
self.padding = padding.PSS(
|
||||
mgf=padding.MGF1(hash_()),
|
||||
salt_length=padding.PSS.MAX_LENGTH)
|
||||
self.hash = hash_()
|
||||
|
||||
|
||||
class _JWAES(JWASignature): # pylint: disable=abstract-class-not-used
|
||||
|
|
@ -116,17 +159,17 @@ class _JWAES(JWASignature): # pylint: disable=abstract-class-not-used
|
|||
raise NotImplementedError()
|
||||
|
||||
|
||||
HS256 = JWASignature.register(_JWAHS('HS256', SHA256))
|
||||
HS384 = JWASignature.register(_JWAHS('HS384', SHA384))
|
||||
HS512 = JWASignature.register(_JWAHS('HS512', SHA512))
|
||||
HS256 = JWASignature.register(_JWAHS('HS256', hashes.SHA256))
|
||||
HS384 = JWASignature.register(_JWAHS('HS384', hashes.SHA384))
|
||||
HS512 = JWASignature.register(_JWAHS('HS512', hashes.SHA512))
|
||||
|
||||
RS256 = JWASignature.register(_JWARS('RS256', PKCS1_v1_5, SHA256))
|
||||
RS384 = JWASignature.register(_JWARS('RS384', PKCS1_v1_5, SHA384))
|
||||
RS512 = JWASignature.register(_JWARS('RS512', PKCS1_v1_5, SHA512))
|
||||
RS256 = JWASignature.register(_JWARS('RS256', hashes.SHA256))
|
||||
RS384 = JWASignature.register(_JWARS('RS384', hashes.SHA384))
|
||||
RS512 = JWASignature.register(_JWARS('RS512', hashes.SHA512))
|
||||
|
||||
PS256 = JWASignature.register(_JWARS('PS256', PKCS1_PSS, SHA256))
|
||||
PS384 = JWASignature.register(_JWARS('PS384', PKCS1_PSS, SHA384))
|
||||
PS512 = JWASignature.register(_JWARS('PS512', PKCS1_PSS, SHA512))
|
||||
PS256 = JWASignature.register(_JWAPS('PS256', hashes.SHA256))
|
||||
PS384 = JWASignature.register(_JWAPS('PS384', hashes.SHA384))
|
||||
PS512 = JWASignature.register(_JWAPS('PS512', hashes.SHA512))
|
||||
|
||||
ES256 = JWASignature.register(_JWAES('ES256'))
|
||||
ES256 = JWASignature.register(_JWAES('ES384'))
|
||||
|
|
|
|||
|
|
@ -3,17 +3,17 @@ import os
|
|||
import pkg_resources
|
||||
import unittest
|
||||
|
||||
from Crypto.PublicKey import RSA
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from acme.jose import errors
|
||||
from acme.jose import jwk_test
|
||||
|
||||
|
||||
RSA256_KEY = RSA.importKey(pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa256_key.pem')))
|
||||
RSA512_KEY = RSA.importKey(pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa512_key.pem')))
|
||||
RSA1024_KEY = RSA.importKey(pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa1024_key.pem')))
|
||||
RSA1024_KEY = serialization.load_pem_private_key(
|
||||
pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa1024_key.pem')),
|
||||
password=None, backend=default_backend())
|
||||
|
||||
|
||||
class JWASignatureTest(unittest.TestCase):
|
||||
|
|
@ -37,8 +37,13 @@ class JWASignatureTest(unittest.TestCase):
|
|||
|
||||
def test_eq(self):
|
||||
self.assertEqual(self.Sig1, self.Sig1)
|
||||
|
||||
def test_ne(self):
|
||||
self.assertNotEqual(self.Sig1, self.Sig2)
|
||||
|
||||
def test_ne_other_type(self):
|
||||
self.assertNotEqual(self.Sig1, 5)
|
||||
|
||||
def test_repr(self):
|
||||
self.assertEqual('Sig1', repr(self.Sig1))
|
||||
self.assertEqual('Sig2', repr(self.Sig2))
|
||||
|
|
@ -71,14 +76,13 @@ class JWARSTest(unittest.TestCase):
|
|||
def test_sign_no_private_part(self):
|
||||
from acme.jose.jwa import RS256
|
||||
self.assertRaises(
|
||||
errors.Error, RS256.sign, RSA512_KEY.publickey(), 'foo')
|
||||
errors.Error, RS256.sign, jwk_test.RSA512_KEY.public_key(), 'foo')
|
||||
|
||||
def test_sign_key_too_small(self):
|
||||
from acme.jose.jwa import RS256
|
||||
from acme.jose.jwa import PS256
|
||||
self.assertRaises(errors.Error, RS256.sign, RSA256_KEY, 'foo')
|
||||
self.assertRaises(errors.Error, PS256.sign, RSA256_KEY, 'foo')
|
||||
self.assertRaises(errors.Error, PS256.sign, RSA512_KEY, 'foo')
|
||||
self.assertRaises(errors.Error, RS256.sign, jwk_test.RSA256_KEY, 'foo')
|
||||
self.assertRaises(errors.Error, PS256.sign, jwk_test.RSA256_KEY, 'foo')
|
||||
|
||||
def test_rs(self):
|
||||
from acme.jose.jwa import RS256
|
||||
|
|
@ -88,17 +92,17 @@ class JWARSTest(unittest.TestCase):
|
|||
'\xa4\x99\x1e\x19&\xd8\xc7\x99S\x97\xfc\x85\x0cOV\xe6\x07\x99'
|
||||
'\xd2\xb9.>}\xfd'
|
||||
)
|
||||
self.assertEqual(RS256.sign(RSA512_KEY, 'foo'), sig)
|
||||
# next tests guard that only True/False are return as oppossed
|
||||
# to e.g. 1/0
|
||||
self.assertTrue(RS256.verify(RSA512_KEY, 'foo', sig) is True)
|
||||
self.assertFalse(RS256.verify(RSA512_KEY, 'foo', sig + '!') is False)
|
||||
self.assertEqual(RS256.sign(jwk_test.RSA512_KEY, 'foo'), sig)
|
||||
self.assertTrue(RS256.verify(
|
||||
jwk_test.RSA512_KEY.public_key(), 'foo', sig))
|
||||
self.assertFalse(RS256.verify(
|
||||
jwk_test.RSA512_KEY.public_key(), 'foo', sig + '!'))
|
||||
|
||||
def test_ps(self):
|
||||
from acme.jose.jwa import PS256
|
||||
sig = PS256.sign(RSA1024_KEY, 'foo')
|
||||
self.assertTrue(PS256.verify(RSA1024_KEY, 'foo', sig) is True)
|
||||
self.assertTrue(PS256.verify(RSA1024_KEY, 'foo', sig + '!') is False)
|
||||
self.assertTrue(PS256.verify(RSA1024_KEY.public_key(), 'foo', sig))
|
||||
self.assertFalse(PS256.verify(RSA1024_KEY.public_key(), 'foo', sig + '!'))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
|||
183
acme/jose/jwk.py
183
acme/jose/jwk.py
|
|
@ -1,8 +1,13 @@
|
|||
"""JSON Web Key."""
|
||||
import abc
|
||||
import binascii
|
||||
import logging
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
import cryptography.exceptions
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import ec
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
|
||||
from acme.jose import b64
|
||||
from acme.jose import errors
|
||||
|
|
@ -10,28 +15,83 @@ from acme.jose import json_util
|
|||
from acme.jose import util
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class JWK(json_util.TypedJSONObjectWithFields):
|
||||
# pylint: disable=too-few-public-methods
|
||||
"""JSON Web Key."""
|
||||
type_field_name = 'kty'
|
||||
TYPES = {}
|
||||
|
||||
@util.abstractclassmethod
|
||||
def load(cls, string): # pragma: no cover
|
||||
"""Load key from normalized string form."""
|
||||
raise NotImplementedError()
|
||||
cryptography_key_types = ()
|
||||
"""Subclasses should override."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def public(self): # pragma: no cover
|
||||
def public_key(self): # pragma: no cover
|
||||
"""Generate JWK with public key.
|
||||
|
||||
For symmetric cryptosystems, this would return ``self``.
|
||||
|
||||
"""
|
||||
# TODO: rename publickey to stay consistent with
|
||||
# HashableRSAKey.publickey
|
||||
raise NotImplementedError()
|
||||
|
||||
@classmethod
|
||||
def _load_cryptography_key(cls, data, password=None, backend=None):
|
||||
backend = default_backend() if backend is None else backend
|
||||
exceptions = {}
|
||||
|
||||
# private key?
|
||||
for loader in (serialization.load_pem_private_key,
|
||||
serialization.load_der_private_key):
|
||||
try:
|
||||
return loader(data, password, backend)
|
||||
except (ValueError, TypeError,
|
||||
cryptography.exceptions.UnsupportedAlgorithm) as error:
|
||||
exceptions[loader] = error
|
||||
|
||||
# public key?
|
||||
for loader in (serialization.load_pem_public_key,
|
||||
serialization.load_der_public_key):
|
||||
try:
|
||||
return loader(data, backend)
|
||||
except (ValueError,
|
||||
cryptography.exceptions.UnsupportedAlgorithm) as error:
|
||||
exceptions[loader] = error
|
||||
|
||||
# no luck
|
||||
raise errors.Error("Unable to deserialize key: {0}".format(exceptions))
|
||||
|
||||
@classmethod
|
||||
def load(cls, data, password=None, backend=None):
|
||||
"""Load serialized key as JWK.
|
||||
|
||||
:param str data: Public or private key serialized as PEM or DER.
|
||||
:param str password: Optional password.
|
||||
:param backend: A `.PEMSerializationBackend` and
|
||||
`.DERSerializationBackend` provider.
|
||||
|
||||
:raises errors.Error: if unable to deserialize, or unsupported
|
||||
JWK algorithm
|
||||
|
||||
:returns: JWK of an appropriate type.
|
||||
:rtype: `JWK`
|
||||
|
||||
"""
|
||||
try:
|
||||
key = cls._load_cryptography_key(data, password, backend)
|
||||
except errors.Error as error:
|
||||
logger.debug("Loading symmetric key, assymentric failed: %s", error)
|
||||
return JWKOct(key=data)
|
||||
|
||||
if cls.typ is not NotImplemented and not isinstance(
|
||||
key, cls.cryptography_key_types):
|
||||
raise errors.Error("Unable to deserialize {0} into {1}".format(
|
||||
key.__class__, cls.__class__))
|
||||
for jwk_cls in cls.TYPES.itervalues():
|
||||
if isinstance(key, jwk_cls.cryptography_key_types):
|
||||
return jwk_cls(key=key)
|
||||
raise errors.Error("Unsupported algorithm: {0}".format(key.__class__))
|
||||
|
||||
|
||||
@JWK.register
|
||||
class JWKES(JWK): # pragma: no cover
|
||||
|
|
@ -42,6 +102,8 @@ class JWKES(JWK): # pragma: no cover
|
|||
|
||||
"""
|
||||
typ = 'ES'
|
||||
cryptography_key_types = (
|
||||
ec.EllipticCurvePublicKey, ec.EllipticCurvePrivateKey)
|
||||
|
||||
def fields_to_partial_json(self):
|
||||
raise NotImplementedError()
|
||||
|
|
@ -50,11 +112,7 @@ class JWKES(JWK): # pragma: no cover
|
|||
def fields_from_json(cls, jobj):
|
||||
raise NotImplementedError()
|
||||
|
||||
@classmethod
|
||||
def load(cls, string):
|
||||
raise NotImplementedError()
|
||||
|
||||
def public(self):
|
||||
def public_key(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
|
|
@ -75,11 +133,7 @@ class JWKOct(JWK):
|
|||
def fields_from_json(cls, jobj):
|
||||
return cls(key=jobj['k'])
|
||||
|
||||
@classmethod
|
||||
def load(cls, string):
|
||||
return cls(key=string)
|
||||
|
||||
def public(self):
|
||||
def public_key(self):
|
||||
return self
|
||||
|
||||
|
||||
|
|
@ -87,12 +141,21 @@ class JWKOct(JWK):
|
|||
class JWKRSA(JWK):
|
||||
"""RSA JWK.
|
||||
|
||||
:ivar key: `Crypto.PublicKey.RSA` wrapped in `.HashableRSAKey`
|
||||
:ivar key: `cryptography.hazmat.primitives.rsa.RSAPrivateKey`
|
||||
or `cryptography.hazmat.primitives.rsa.RSAPublicKey` wrapped
|
||||
in `.ComparableRSAKey`
|
||||
|
||||
"""
|
||||
typ = 'RSA'
|
||||
cryptography_key_types = (rsa.RSAPublicKey, rsa.RSAPrivateKey)
|
||||
__slots__ = ('key',)
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
if 'key' in kwargs and not isinstance(
|
||||
kwargs['key'], util.ComparableRSAKey):
|
||||
kwargs['key'] = util.ComparableRSAKey(kwargs['key'])
|
||||
super(JWKRSA, self).__init__(*args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def _encode_param(cls, data):
|
||||
def _leading_zeros(arg):
|
||||
|
|
@ -110,31 +173,65 @@ class JWKRSA(JWK):
|
|||
except ValueError: # invalid literal for long() with base 16
|
||||
raise errors.DeserializationError()
|
||||
|
||||
@classmethod
|
||||
def load(cls, string):
|
||||
"""Load RSA key from string.
|
||||
|
||||
:param str string: RSA key in string form.
|
||||
|
||||
:returns:
|
||||
:rtype: :class:`JWKRSA`
|
||||
|
||||
"""
|
||||
return cls(key=util.HashableRSAKey(
|
||||
Crypto.PublicKey.RSA.importKey(string)))
|
||||
|
||||
def public(self):
|
||||
return type(self)(key=self.key.publickey())
|
||||
def public_key(self):
|
||||
return type(self)(key=self.key.public_key())
|
||||
|
||||
@classmethod
|
||||
def fields_from_json(cls, jobj):
|
||||
return cls(key=util.HashableRSAKey(
|
||||
Crypto.PublicKey.RSA.construct(
|
||||
(cls._decode_param(jobj['n']),
|
||||
cls._decode_param(jobj['e'])))))
|
||||
# pylint: disable=invalid-name
|
||||
n, e = (cls._decode_param(jobj[x]) for x in ('n', 'e'))
|
||||
public_numbers = rsa.RSAPublicNumbers(e=e, n=n)
|
||||
if 'd' not in jobj: # public key
|
||||
key = public_numbers.public_key(default_backend())
|
||||
else: # private key
|
||||
d = cls._decode_param(jobj['d'])
|
||||
if ('p' in jobj or 'q' in jobj or 'dp' in jobj or
|
||||
'dq' in jobj or 'qi' in jobj or 'oth' in jobj):
|
||||
# "If the producer includes any of the other private
|
||||
# key parameters, then all of the others MUST be
|
||||
# present, with the exception of "oth", which MUST
|
||||
# only be present when more than two prime factors
|
||||
# were used."
|
||||
p, q, dp, dq, qi, = all_params = tuple(
|
||||
jobj.get(x) for x in ('p', 'q', 'dp', 'dq', 'qi'))
|
||||
if tuple(param for param in all_params if param is None):
|
||||
raise errors.Error(
|
||||
"Some private parameters are missing: {0}".format(
|
||||
all_params))
|
||||
p, q, dp, dq, qi = tuple(cls._decode_param(x) for x in all_params)
|
||||
|
||||
# TODO: check for oth
|
||||
else:
|
||||
p, q = rsa.rsa_recover_prime_factors(n, e, d) # cryptography>=0.8
|
||||
dp = rsa.rsa_crt_dmp1(d, p)
|
||||
dq = rsa.rsa_crt_dmq1(d, q)
|
||||
qi = rsa.rsa_crt_iqmp(p, q)
|
||||
|
||||
key = rsa.RSAPrivateNumbers(
|
||||
p, q, d, dp, dq, qi, public_numbers).private_key(default_backend())
|
||||
|
||||
return cls(key=key)
|
||||
|
||||
def fields_to_partial_json(self):
|
||||
return {
|
||||
'n': self._encode_param(self.key.n),
|
||||
'e': self._encode_param(self.key.e),
|
||||
}
|
||||
# pylint: disable=protected-access
|
||||
if isinstance(self.key._wrapped, rsa.RSAPublicKey):
|
||||
numbers = self.key.public_numbers()
|
||||
params = {
|
||||
'n': numbers.n,
|
||||
'e': numbers.e,
|
||||
}
|
||||
else: # rsa.RSAPrivateKey
|
||||
private = self.key.private_numbers()
|
||||
public = self.key.public_key().public_numbers()
|
||||
params = {
|
||||
'n': public.n,
|
||||
'e': public.e,
|
||||
'd': private.d,
|
||||
'p': private.p,
|
||||
'q': private.q,
|
||||
'dp': private.dmp1,
|
||||
'dq': private.dmq1,
|
||||
'qi': private.iqmp,
|
||||
}
|
||||
return dict((key, self._encode_param(value))
|
||||
for key, value in params.iteritems())
|
||||
|
|
|
|||
|
|
@ -3,16 +3,35 @@ import os
|
|||
import pkg_resources
|
||||
import unittest
|
||||
|
||||
from Crypto.PublicKey import RSA
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from acme.jose import errors
|
||||
from acme.jose import util
|
||||
|
||||
|
||||
RSA256_KEY = util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa256_key.pem'))))
|
||||
RSA512_KEY = util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa512_key.pem'))))
|
||||
DSA_PEM = pkg_resources.resource_string(
|
||||
'letsencrypt.tests', os.path.join('testdata', 'dsa512_key.pem'))
|
||||
RSA256_KEY = serialization.load_pem_private_key(
|
||||
pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa256_key.pem')),
|
||||
password=None, backend=default_backend())
|
||||
RSA512_KEY = serialization.load_pem_private_key(
|
||||
pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa512_key.pem')),
|
||||
password=None, backend=default_backend())
|
||||
|
||||
|
||||
class JWKTest(unittest.TestCase):
|
||||
"""Tests for acme.jose.jwk.JWK."""
|
||||
|
||||
def test_load(self):
|
||||
from acme.jose.jwk import JWK
|
||||
self.assertRaises(errors.Error, JWK.load, DSA_PEM)
|
||||
|
||||
def test_load_subclass_wrong_type(self):
|
||||
from acme.jose.jwk import JWKRSA
|
||||
self.assertRaises(errors.Error, JWKRSA.load, DSA_PEM)
|
||||
|
||||
|
||||
class JWKOctTest(unittest.TestCase):
|
||||
|
|
@ -38,29 +57,48 @@ class JWKOctTest(unittest.TestCase):
|
|||
from acme.jose.jwk import JWKOct
|
||||
self.assertEqual(self.jwk, JWKOct.load('foo'))
|
||||
|
||||
def test_public(self):
|
||||
self.assertTrue(self.jwk.public() is self.jwk)
|
||||
def test_public_key(self):
|
||||
self.assertTrue(self.jwk.public_key() is self.jwk)
|
||||
|
||||
|
||||
class JWKRSATest(unittest.TestCase):
|
||||
"""Tests for acme.jose.jwk.JWKRSA."""
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
|
||||
def setUp(self):
|
||||
from acme.jose.jwk import JWKRSA
|
||||
self.jwk256 = JWKRSA(key=RSA256_KEY.publickey())
|
||||
self.jwk256_private = JWKRSA(key=RSA256_KEY)
|
||||
self.jwk256 = JWKRSA(key=RSA256_KEY.public_key())
|
||||
self.jwk256json = {
|
||||
'kty': 'RSA',
|
||||
'e': 'AQAB',
|
||||
'n': 'm2Fylv-Uz7trgTW8EBHP3FQSMeZs2GNQ6VRo1sIVJEk',
|
||||
}
|
||||
self.jwk512 = JWKRSA(key=RSA512_KEY.publickey())
|
||||
self.jwk256_comparable = JWKRSA(key=util.ComparableRSAKey(
|
||||
RSA256_KEY.public_key()))
|
||||
self.jwk512 = JWKRSA(key=RSA512_KEY.public_key())
|
||||
self.jwk512json = {
|
||||
'kty': 'RSA',
|
||||
'e': 'AQAB',
|
||||
'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5'
|
||||
'80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q',
|
||||
}
|
||||
self.private = JWKRSA(key=RSA256_KEY)
|
||||
self.private_json_small = self.jwk256json.copy()
|
||||
self.private_json_small['d'] = (
|
||||
'lPQED_EPTV0UIBfNI3KP2d9Jlrc2mrMllmf946bu-CE')
|
||||
self.private_json = self.jwk256json.copy()
|
||||
self.private_json.update({
|
||||
'd': 'lPQED_EPTV0UIBfNI3KP2d9Jlrc2mrMllmf946bu-CE',
|
||||
'p': 'zUVNZn4lLLBD1R6NE8TKNQ',
|
||||
'q': 'wcfKfc7kl5jfqXArCRSURQ',
|
||||
'dp': 'CWJFq43QvT5Bm5iN8n1okQ',
|
||||
'dq': 'bHh2u7etM8LKKCF2pY2UdQ',
|
||||
'qi': 'oi45cEkbVoJjAbnQpFY87Q',
|
||||
})
|
||||
|
||||
def test_init_comparable(self):
|
||||
self.assertTrue(isinstance(self.jwk256.key, util.ComparableRSAKey))
|
||||
self.assertEqual(self.jwk256, self.jwk256_comparable)
|
||||
|
||||
def test_equals(self):
|
||||
self.assertEqual(self.jwk256, self.jwk256)
|
||||
|
|
@ -73,22 +111,33 @@ class JWKRSATest(unittest.TestCase):
|
|||
def test_load(self):
|
||||
from acme.jose.jwk import JWKRSA
|
||||
self.assertEqual(
|
||||
JWKRSA(key=util.HashableRSAKey(RSA256_KEY)), JWKRSA.load(
|
||||
pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa256_key.pem'))))
|
||||
self.private, JWKRSA.load(pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa256_key.pem'))))
|
||||
|
||||
def test_public(self):
|
||||
self.assertEqual(self.jwk256, self.jwk256_private.public())
|
||||
def test_public_key(self):
|
||||
self.assertEqual(self.jwk256, self.private.public_key())
|
||||
|
||||
def test_to_partial_json(self):
|
||||
self.assertEqual(self.jwk256.to_partial_json(), self.jwk256json)
|
||||
self.assertEqual(self.jwk512.to_partial_json(), self.jwk512json)
|
||||
self.assertEqual(self.private.to_partial_json(), self.private_json)
|
||||
|
||||
def test_from_json(self):
|
||||
from acme.jose.jwk import JWK
|
||||
self.assertEqual(self.jwk256, JWK.from_json(self.jwk256json))
|
||||
# TODO: fix schemata to allow RSA512
|
||||
#self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json))
|
||||
self.assertEqual(
|
||||
self.jwk256, JWK.from_json(self.jwk256json))
|
||||
self.assertEqual(
|
||||
self.jwk512, JWK.from_json(self.jwk512json))
|
||||
self.assertEqual(self.private, JWK.from_json(self.private_json))
|
||||
|
||||
def test_from_json_private_small(self):
|
||||
from acme.jose.jwk import JWK
|
||||
self.assertEqual(self.private, JWK.from_json(self.private_json_small))
|
||||
|
||||
def test_from_json_missing_one_additional(self):
|
||||
from acme.jose.jwk import JWK
|
||||
del self.private_json['q']
|
||||
self.assertRaises(errors.Error, JWK.from_json, self.private_json)
|
||||
|
||||
def test_from_json_hashable(self):
|
||||
from acme.jose.jwk import JWK
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import argparse
|
|||
import base64
|
||||
import sys
|
||||
|
||||
import M2Crypto
|
||||
import OpenSSL
|
||||
|
||||
from acme.jose import b64
|
||||
from acme.jose import errors
|
||||
|
|
@ -122,14 +122,16 @@ class Header(json_util.JSONObjectWithFields):
|
|||
|
||||
@x5c.encoder
|
||||
def x5c(value): # pylint: disable=missing-docstring,no-self-argument
|
||||
return [base64.b64encode(cert.as_der()) for cert in value]
|
||||
return [base64.b64encode(OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, cert)) for cert in value]
|
||||
|
||||
@x5c.decoder
|
||||
def x5c(value): # pylint: disable=missing-docstring,no-self-argument
|
||||
try:
|
||||
return tuple(util.ComparableX509(M2Crypto.X509.load_cert_der_string(
|
||||
return tuple(util.ComparableX509(OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1,
|
||||
base64.b64decode(cert))) for cert in value)
|
||||
except M2Crypto.X509.X509Error as error:
|
||||
except OpenSSL.crypto.Error as error:
|
||||
raise errors.DeserializationError(error)
|
||||
|
||||
|
||||
|
|
@ -203,7 +205,7 @@ class Signature(json_util.JSONObjectWithFields):
|
|||
header_params = kwargs
|
||||
header_params['alg'] = alg
|
||||
if include_jwk:
|
||||
header_params['jwk'] = key.public()
|
||||
header_params['jwk'] = key.public_key()
|
||||
|
||||
assert set(header_params).issubset(cls.header_cls._fields)
|
||||
assert protect.issubset(cls.header_cls._fields)
|
||||
|
|
@ -354,12 +356,12 @@ class CLI(object):
|
|||
|
||||
if args.key is not None:
|
||||
assert args.kty is not None
|
||||
key = args.kty.load(args.key.read())
|
||||
key = args.kty.load(args.key.read()).public_key()
|
||||
else:
|
||||
key = None
|
||||
|
||||
sys.stdout.write(sig.payload)
|
||||
return int(not sig.verify(key=key))
|
||||
return not sig.verify(key=key)
|
||||
|
||||
@classmethod
|
||||
def _alg_type(cls, arg):
|
||||
|
|
|
|||
|
|
@ -4,9 +4,10 @@ import os
|
|||
import pkg_resources
|
||||
import unittest
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
import M2Crypto
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import mock
|
||||
import OpenSSL
|
||||
|
||||
from acme.jose import b64
|
||||
from acme.jose import errors
|
||||
|
|
@ -15,11 +16,13 @@ from acme.jose import jwk
|
|||
from acme.jose import util
|
||||
|
||||
|
||||
CERT = util.ComparableX509(M2Crypto.X509.load_cert(
|
||||
pkg_resources.resource_filename(
|
||||
CERT = util.ComparableX509(OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
|
||||
'letsencrypt.tests', 'testdata/cert.pem')))
|
||||
RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa512_key.pem')))
|
||||
RSA512_KEY = serialization.load_pem_private_key(
|
||||
pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa512_key.pem')),
|
||||
password=None, backend=default_backend())
|
||||
|
||||
|
||||
class MediaTypeTest(unittest.TestCase):
|
||||
|
|
@ -73,10 +76,13 @@ class HeaderTest(unittest.TestCase):
|
|||
from acme.jose.jws import Header
|
||||
header = Header(x5c=(CERT, CERT))
|
||||
jobj = header.to_partial_json()
|
||||
cert_b64 = base64.b64encode(CERT.as_der())
|
||||
cert_b64 = base64.b64encode(OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, CERT))
|
||||
self.assertEqual(jobj, {'x5c': [cert_b64, cert_b64]})
|
||||
self.assertEqual(header, Header.from_json(jobj))
|
||||
jobj['x5c'][0] = base64.b64encode('xxx' + CERT.as_der())
|
||||
jobj['x5c'][0] = base64.b64encode(
|
||||
'xxx' + OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, CERT))
|
||||
self.assertRaises(errors.DeserializationError, Header.from_json, jobj)
|
||||
|
||||
def test_find_key(self):
|
||||
|
|
@ -107,7 +113,7 @@ class JWSTest(unittest.TestCase):
|
|||
|
||||
def setUp(self):
|
||||
self.privkey = jwk.JWKRSA(key=RSA512_KEY)
|
||||
self.pubkey = self.privkey.public()
|
||||
self.pubkey = self.privkey.public_key()
|
||||
|
||||
from acme.jose.jws import JWS
|
||||
self.unprotected = JWS.sign(
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
"""JOSE utilities."""
|
||||
import collections
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
import OpenSSL
|
||||
|
||||
|
||||
class abstractclassmethod(classmethod):
|
||||
# pylint: disable=invalid-name,too-few-public-methods
|
||||
|
|
@ -23,12 +26,51 @@ class abstractclassmethod(classmethod):
|
|||
|
||||
|
||||
class ComparableX509(object): # pylint: disable=too-few-public-methods
|
||||
"""Wrapper for M2Crypto.X509.* objects that supports __eq__.
|
||||
"""Wrapper for OpenSSL.crypto.X509** objects that supports __eq__.
|
||||
|
||||
Wraps around:
|
||||
|
||||
- :class:`M2Crypto.X509.X509`
|
||||
- :class:`M2Crypto.X509.Request`
|
||||
- :class:`OpenSSL.crypto.X509`
|
||||
- :class:`OpenSSL.crypto.X509Req`
|
||||
|
||||
"""
|
||||
def __init__(self, wrapped):
|
||||
assert isinstance(wrapped, OpenSSL.crypto.X509) or isinstance(
|
||||
wrapped, OpenSSL.crypto.X509Req)
|
||||
self._wrapped = wrapped
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._wrapped, name)
|
||||
|
||||
def _dump(self, filetype=OpenSSL.crypto.FILETYPE_ASN1):
|
||||
# pylint: disable=missing-docstring,protected-access
|
||||
if isinstance(self._wrapped, OpenSSL.crypto.X509):
|
||||
func = OpenSSL.crypto.dump_certificate
|
||||
else: # assert in __init__ makes sure this is X509Req
|
||||
func = OpenSSL.crypto.dump_certificate_request
|
||||
return func(filetype, self._wrapped)
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, self.__class__):
|
||||
return NotImplemented
|
||||
return self._dump() == other._dump() # pylint: disable=protected-access
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.__class__, self._dump()))
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self == other
|
||||
|
||||
def __repr__(self):
|
||||
return '<{0}({1!r})>'.format(self.__class__.__name__, self._wrapped)
|
||||
|
||||
|
||||
class ComparableRSAKey(object): # pylint: disable=too-few-public-methods
|
||||
"""Wrapper for `cryptography` RSA keys.
|
||||
|
||||
Wraps around:
|
||||
- `cryptography.hazmat.primitives.assymetric.RSAPrivateKey`
|
||||
- `cryptography.hazmat.primitives.assymetric.RSAPublicKey`
|
||||
|
||||
"""
|
||||
def __init__(self, wrapped):
|
||||
|
|
@ -38,27 +80,38 @@ class ComparableX509(object): # pylint: disable=too-few-public-methods
|
|||
return getattr(self._wrapped, name)
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.as_der() == other.as_der()
|
||||
# pylint: disable=protected-access
|
||||
if (not isinstance(other, self.__class__) or
|
||||
self._wrapped.__class__ is not other._wrapped.__class__):
|
||||
return NotImplemented
|
||||
# RSA*KeyWithSerialization requires cryptography>=0.8
|
||||
if isinstance(self._wrapped, rsa.RSAPrivateKeyWithSerialization):
|
||||
return self.private_numbers() == other.private_numbers()
|
||||
elif isinstance(self._wrapped, rsa.RSAPublicKeyWithSerialization):
|
||||
return self.public_numbers() == other.public_numbers()
|
||||
else:
|
||||
return False # we shouldn't reach here...
|
||||
|
||||
|
||||
class HashableRSAKey(object): # pylint: disable=too-few-public-methods
|
||||
"""Wrapper for `Crypto.PublicKey.RSA` objects that supports hashing."""
|
||||
|
||||
def __init__(self, wrapped):
|
||||
self._wrapped = wrapped
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._wrapped, name)
|
||||
|
||||
def __eq__(self, other):
|
||||
return self._wrapped == other
|
||||
def __ne__(self, other):
|
||||
return not self == other
|
||||
|
||||
def __hash__(self):
|
||||
return hash((type(self), self.exportKey(format='DER')))
|
||||
# public_numbers() hasn't got stable hash!
|
||||
if isinstance(self._wrapped, rsa.RSAPrivateKeyWithSerialization):
|
||||
priv = self.private_numbers()
|
||||
pub = priv.public_numbers
|
||||
return hash((self.__class__, priv.p, priv.q, priv.dmp1,
|
||||
priv.dmq1, priv.iqmp, pub.n, pub.e))
|
||||
elif isinstance(self._wrapped, rsa.RSAPublicKeyWithSerialization):
|
||||
pub = self.public_numbers()
|
||||
return hash((self.__class__, pub.n, pub.e))
|
||||
|
||||
def publickey(self):
|
||||
def __repr__(self):
|
||||
return '<{0}({1!r})>'.format(self.__class__.__name__, self._wrapped)
|
||||
|
||||
def public_key(self):
|
||||
"""Get wrapped public key."""
|
||||
return type(self)(self._wrapped.publickey())
|
||||
return self.__class__(self._wrapped.public_key())
|
||||
|
||||
|
||||
class ImmutableMap(collections.Mapping, collections.Hashable):
|
||||
|
|
|
|||
|
|
@ -4,32 +4,104 @@ import os
|
|||
import pkg_resources
|
||||
import unittest
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import OpenSSL
|
||||
|
||||
|
||||
class HashableRSAKeyTest(unittest.TestCase):
|
||||
"""Tests for acme.jose.util.HashableRSAKey."""
|
||||
class ComparableX509Test(unittest.TestCase):
|
||||
"""Tests for acme.jose.util.ComparableX509."""
|
||||
|
||||
def setUp(self):
|
||||
from acme.jose.util import HashableRSAKey
|
||||
self.key = HashableRSAKey(Crypto.PublicKey.RSA.importKey(
|
||||
pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa256_key.pem'))))
|
||||
self.key_same = HashableRSAKey(Crypto.PublicKey.RSA.importKey(
|
||||
pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa256_key.pem'))))
|
||||
from acme.jose.util import ComparableX509
|
||||
def _load(method, filename): # pylint: disable=missing-docstring
|
||||
return ComparableX509(method(
|
||||
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
|
||||
'letsencrypt.tests', os.path.join('testdata', filename))))
|
||||
|
||||
self.req1 = _load(OpenSSL.crypto.load_certificate_request, 'csr.pem')
|
||||
self.req2 = _load(OpenSSL.crypto.load_certificate_request, 'csr.pem')
|
||||
self.req_other = _load(OpenSSL.crypto.load_certificate_request, 'csr-san.pem')
|
||||
|
||||
self.cert1 = _load(OpenSSL.crypto.load_certificate, 'cert.pem')
|
||||
self.cert2 = _load(OpenSSL.crypto.load_certificate, 'cert.pem')
|
||||
self.cert_other = _load(OpenSSL.crypto.load_certificate, 'cert-san.pem')
|
||||
|
||||
def test_eq(self):
|
||||
self.assertEqual(self.req1, self.req2)
|
||||
self.assertEqual(self.cert1, self.cert2)
|
||||
|
||||
def test_ne(self):
|
||||
self.assertNotEqual(self.req1, self.req_other)
|
||||
self.assertNotEqual(self.cert1, self.cert_other)
|
||||
|
||||
def test_ne_wrong_types(self):
|
||||
self.assertNotEqual(self.req1, 5)
|
||||
self.assertNotEqual(self.cert1, 5)
|
||||
|
||||
def test_hash(self):
|
||||
self.assertEqual(hash(self.req1), hash(self.req2))
|
||||
self.assertNotEqual(hash(self.req1), hash(self.req_other))
|
||||
|
||||
self.assertEqual(hash(self.cert1), hash(self.cert2))
|
||||
self.assertNotEqual(hash(self.cert1), hash(self.cert_other))
|
||||
|
||||
def test_repr(self):
|
||||
for x509 in self.req1, self.cert1:
|
||||
self.assertTrue(repr(x509).startswith(
|
||||
'<ComparableX509(<OpenSSL.crypto.X509'))
|
||||
|
||||
|
||||
class ComparableRSAKeyTest(unittest.TestCase):
|
||||
"""Tests for acme.jose.util.ComparableRSAKey."""
|
||||
|
||||
def setUp(self):
|
||||
from acme.jose.util import ComparableRSAKey
|
||||
backend = default_backend()
|
||||
def load_key(): # pylint: disable=missing-docstring
|
||||
return ComparableRSAKey(serialization.load_pem_private_key(
|
||||
pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa256_key.pem')),
|
||||
password=None, backend=backend))
|
||||
self.key = load_key()
|
||||
self.key_same = load_key()
|
||||
self.key2 = ComparableRSAKey(serialization.load_pem_private_key(
|
||||
pkg_resources.resource_string(
|
||||
__name__, os.path.join('testdata', 'rsa512_key.pem')),
|
||||
password=None, backend=backend))
|
||||
|
||||
def test_getattr_proxy(self):
|
||||
self.assertEqual(256, self.key.key_size)
|
||||
|
||||
def test_eq(self):
|
||||
# if __eq__ is not defined, then two HashableRSAKeys with same
|
||||
# _wrapped do not equate
|
||||
self.assertEqual(self.key, self.key_same)
|
||||
|
||||
def test_ne(self):
|
||||
self.assertNotEqual(self.key, self.key2)
|
||||
|
||||
def test_ne_different_types(self):
|
||||
self.assertNotEqual(self.key, 5)
|
||||
|
||||
def test_ne_not_wrapped(self):
|
||||
# pylint: disable=protected-access
|
||||
self.assertNotEqual(self.key, self.key_same._wrapped)
|
||||
|
||||
def test_ne_no_serialization(self):
|
||||
from acme.jose.util import ComparableRSAKey
|
||||
self.assertNotEqual(ComparableRSAKey(5), ComparableRSAKey(5))
|
||||
|
||||
def test_hash(self):
|
||||
self.assertTrue(isinstance(hash(self.key), int))
|
||||
self.assertEqual(hash(self.key), hash(self.key_same))
|
||||
self.assertNotEqual(hash(self.key), hash(self.key2))
|
||||
|
||||
def test_publickey(self):
|
||||
from acme.jose.util import HashableRSAKey
|
||||
self.assertTrue(isinstance(self.key.publickey(), HashableRSAKey))
|
||||
def test_repr(self):
|
||||
self.assertTrue(repr(self.key).startswith(
|
||||
'<ComparableRSAKey(<cryptography.hazmat.'))
|
||||
|
||||
def test_public_key(self):
|
||||
from acme.jose.util import ComparableRSAKey
|
||||
self.assertTrue(isinstance(self.key.public_key(), ComparableRSAKey))
|
||||
|
||||
|
||||
class ImmutableMapTest(unittest.TestCase):
|
||||
|
|
|
|||
|
|
@ -3,14 +3,17 @@ import os
|
|||
import pkg_resources
|
||||
import unittest
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from acme import errors
|
||||
from acme import jose
|
||||
|
||||
|
||||
RSA512_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')))
|
||||
RSA512_KEY = serialization.load_pem_private_key(
|
||||
pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
|
||||
password=None, backend=default_backend())
|
||||
|
||||
|
||||
class HeaderTest(unittest.TestCase):
|
||||
|
|
@ -44,7 +47,7 @@ class JWSTest(unittest.TestCase):
|
|||
|
||||
def setUp(self):
|
||||
self.privkey = jose.JWKRSA(key=RSA512_KEY)
|
||||
self.pubkey = self.privkey.public()
|
||||
self.pubkey = self.privkey.public_key()
|
||||
self.nonce = jose.b64encode('Nonce')
|
||||
|
||||
def test_it(self):
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ class _Constant(jose.JSONDeSerializable):
|
|||
return isinstance(other, type(self)) and other.name == self.name
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
return not self == other
|
||||
|
||||
|
||||
class Status(_Constant):
|
||||
|
|
@ -312,7 +312,7 @@ class CertificateRequest(jose.JSONObjectWithFields):
|
|||
"""ACME new-cert request.
|
||||
|
||||
:ivar acme.jose.util.ComparableX509 csr:
|
||||
`M2Crypto.X509.Request` wrapped in `.ComparableX509`
|
||||
`OpenSSL.crypto.X509Req` wrapped in `.ComparableX509`
|
||||
:ivar tuple authorizations: `tuple` of URIs (`str`)
|
||||
|
||||
"""
|
||||
|
|
@ -324,7 +324,7 @@ class CertificateResource(ResourceWithURI):
|
|||
"""Certificate Resource.
|
||||
|
||||
:ivar acme.jose.util.ComparableX509 body:
|
||||
`M2Crypto.X509.X509` wrapped in `.ComparableX509`
|
||||
`OpenSSL.crypto.X509` wrapped in `.ComparableX509`
|
||||
:ivar str cert_chain_uri: URI found in the 'up' ``Link`` header
|
||||
:ivar tuple authzrs: `tuple` of `AuthorizationResource`.
|
||||
|
||||
|
|
@ -336,7 +336,7 @@ class CertificateResource(ResourceWithURI):
|
|||
class Revocation(jose.JSONObjectWithFields):
|
||||
"""Revocation message.
|
||||
|
||||
:ivar .ComparableX509 certificate: `M2Crypto.X509.X509` wrapped in
|
||||
:ivar .ComparableX509 certificate: `OpenSSL.crypto.X509` wrapped in
|
||||
`.ComparableX509`
|
||||
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -3,26 +3,27 @@ import os
|
|||
import pkg_resources
|
||||
import unittest
|
||||
|
||||
from Crypto.PublicKey import RSA
|
||||
import M2Crypto
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import mock
|
||||
import OpenSSL
|
||||
|
||||
from acme import challenges
|
||||
from acme import jose
|
||||
|
||||
|
||||
CERT = jose.ComparableX509(M2Crypto.X509.load_cert_string(
|
||||
CERT = jose.ComparableX509(OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'cert.der'))))
|
||||
CSR = jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'csr.der'))))
|
||||
KEY = serialization.load_pem_private_key(
|
||||
pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'cert.der')),
|
||||
M2Crypto.X509.FORMAT_DER))
|
||||
CSR = jose.ComparableX509(M2Crypto.X509.load_request_string(
|
||||
pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'csr.der')),
|
||||
M2Crypto.X509.FORMAT_DER))
|
||||
KEY = jose.util.HashableRSAKey(RSA.importKey(pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))))
|
||||
CERT = jose.ComparableX509(M2Crypto.X509.load_cert(
|
||||
format=M2Crypto.X509.FORMAT_DER, file=pkg_resources.resource_filename(
|
||||
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
|
||||
password=None, backend=default_backend())
|
||||
CERT = jose.ComparableX509(OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'cert.der'))))
|
||||
|
||||
|
||||
|
|
@ -109,7 +110,7 @@ class RegistrationTest(unittest.TestCase):
|
|||
"""Tests for acme.messages.Registration."""
|
||||
|
||||
def setUp(self):
|
||||
key = jose.jwk.JWKRSA(key=KEY.publickey())
|
||||
key = jose.jwk.JWKRSA(key=KEY.public_key())
|
||||
contact = (
|
||||
'mailto:admin@foo.com',
|
||||
'tel:1234',
|
||||
|
|
|
|||
|
|
@ -1,9 +1,7 @@
|
|||
"""Other ACME objects."""
|
||||
import functools
|
||||
import logging
|
||||
|
||||
import Crypto.Random
|
||||
import Crypto.PublicKey.RSA
|
||||
import os
|
||||
|
||||
from acme import jose
|
||||
|
||||
|
|
@ -43,7 +41,8 @@ class Signature(jose.JSONObjectWithFields):
|
|||
:param str msg: Message to be signed.
|
||||
|
||||
:param key: Key used for signing.
|
||||
:type key: :class:`Crypto.PublicKey.RSA`
|
||||
:type key: `cryptography.hazmat.primitives.assymetric.rsa.RSAPrivateKey`
|
||||
(optionally wrapped in `.ComparableRSAKey`).
|
||||
|
||||
:param str nonce: Nonce to be used. If None, nonce of
|
||||
``nonce_size`` will be randomly generated.
|
||||
|
|
@ -52,15 +51,14 @@ class Signature(jose.JSONObjectWithFields):
|
|||
|
||||
"""
|
||||
nonce_size = cls.NONCE_SIZE if nonce_size is None else nonce_size
|
||||
if nonce is None:
|
||||
nonce = Crypto.Random.get_random_bytes(nonce_size)
|
||||
nonce = os.urandom(nonce_size) if nonce is None else nonce
|
||||
|
||||
msg_with_nonce = nonce + msg
|
||||
sig = alg.sign(key, nonce + msg)
|
||||
logger.debug('%s signed as %s', msg_with_nonce, sig)
|
||||
|
||||
return cls(alg=alg, sig=sig, nonce=nonce,
|
||||
jwk=alg.kty(key=key.publickey()))
|
||||
jwk=alg.kty(key=key.public_key()))
|
||||
|
||||
def verify(self, msg):
|
||||
"""Verify the signature.
|
||||
|
|
|
|||
|
|
@ -3,14 +3,16 @@ import os
|
|||
import pkg_resources
|
||||
import unittest
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from acme import jose
|
||||
|
||||
|
||||
KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey(
|
||||
KEY = serialization.load_pem_private_key(
|
||||
pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'rsa512_key.pem'))))
|
||||
'acme.jose', os.path.join('testdata', 'rsa512_key.pem')),
|
||||
password=None, backend=default_backend())
|
||||
|
||||
|
||||
class SignatureTest(unittest.TestCase):
|
||||
|
|
@ -26,7 +28,7 @@ class SignatureTest(unittest.TestCase):
|
|||
self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9'
|
||||
|
||||
self.alg = jose.RS256
|
||||
self.jwk = jose.JWKRSA(key=KEY.publickey())
|
||||
self.jwk = jose.JWKRSA(key=KEY.public_key())
|
||||
|
||||
b64sig = ('SUPYKucUnhlTt8_sMxLiigOYdf_wlOLXPI-o7aRLTsOquVjDd6r'
|
||||
'AX9AFJHk-bCMQPJbSzXKjG6H1IWbvxjS2Ew')
|
||||
|
|
|
|||
|
|
@ -44,20 +44,14 @@ else
|
|||
fi
|
||||
|
||||
|
||||
# dpkg-dev: dpkg-architecture binary necessary to compile M2Crypto, c.f.
|
||||
# #276, https://github.com/martinpaljak/M2Crypto/issues/62,
|
||||
# M2Crypto setup.py:add_multiarch_paths
|
||||
|
||||
apt-get install -y --no-install-recommends \
|
||||
git-core \
|
||||
python \
|
||||
python-dev \
|
||||
"$virtualenv" \
|
||||
gcc \
|
||||
swig \
|
||||
dialog \
|
||||
libaugeas0 \
|
||||
libssl-dev \
|
||||
libffi-dev \
|
||||
ca-certificates \
|
||||
dpkg-dev \
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ yum install -y \
|
|||
python-virtualenv \
|
||||
python-devel \
|
||||
gcc \
|
||||
swig \
|
||||
dialog \
|
||||
augeas-libs \
|
||||
openssl-devel \
|
||||
|
|
|
|||
|
|
@ -1,2 +1,2 @@
|
|||
#!/bin/sh
|
||||
brew install augeas swig
|
||||
brew install augeas
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import mock
|
|||
# http://docs.readthedocs.org/en/latest/faq.html#i-get-import-errors-on-libraries-that-depend-on-c-modules
|
||||
# c.f. #262
|
||||
sys.modules.update(
|
||||
(mod_name, mock.MagicMock()) for mod_name in ['augeas', 'M2Crypto'])
|
||||
(mod_name, mock.MagicMock()) for mod_name in ['augeas'])
|
||||
|
||||
here = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
|
|
|||
|
|
@ -52,7 +52,6 @@ are provided mainly for the :ref:`developers <hacking>` reference.
|
|||
In general:
|
||||
|
||||
* ``sudo`` is required as a suggested way of running privileged process
|
||||
* `SWIG`_ is required for compiling `M2Crypto`_
|
||||
* `Augeas`_ is required for the Python bindings
|
||||
|
||||
|
||||
|
|
@ -102,14 +101,6 @@ Centos 7
|
|||
|
||||
sudo ./bootstrap/centos.sh
|
||||
|
||||
For installation run this modified command (note the trailing
|
||||
backslash):
|
||||
|
||||
.. code-block:: shell
|
||||
|
||||
SWIG_FEATURES="-includeall -D__`uname -m`__-I/usr/include/openssl" \
|
||||
./venv/bin/pip install -r requirements.txt .
|
||||
|
||||
|
||||
Installation
|
||||
============
|
||||
|
|
@ -127,13 +118,6 @@ Installation
|
|||
your operating system and are **not supported** by the
|
||||
Let's Encrypt team!
|
||||
|
||||
.. note:: If your operating system uses SWIG 3.0.5+, you will need to
|
||||
run ``pip install -r requirements-swig-3.0.5.txt -r
|
||||
requirements.txt .`` instead. Known affected systems:
|
||||
|
||||
* Fedora 22
|
||||
* some versions of Mac OS X
|
||||
|
||||
|
||||
Usage
|
||||
=====
|
||||
|
|
@ -173,6 +157,4 @@ By default, the following locations are searched:
|
|||
|
||||
|
||||
.. _Augeas: http://augeas.net/
|
||||
.. _M2Crypto: https://github.com/M2Crypto/M2Crypto
|
||||
.. _SWIG: http://www.swig.org/
|
||||
.. _Virtualenv: https://virtualenv.pypa.io
|
||||
|
|
|
|||
|
|
@ -3,8 +3,9 @@ import logging
|
|||
import os
|
||||
import pkg_resources
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
import M2Crypto
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
import OpenSSL
|
||||
|
||||
from acme import client
|
||||
from acme import messages
|
||||
|
|
@ -18,8 +19,11 @@ NEW_REG_URL = 'https://www.letsencrypt-demo.org/acme/new-reg'
|
|||
BITS = 2048 # minimum for Boulder
|
||||
DOMAIN = 'example1.com' # example.com is ignored by Boulder
|
||||
|
||||
key = jose.JWKRSA.load(
|
||||
Crypto.PublicKey.RSA.generate(BITS).exportKey(format="PEM"))
|
||||
# generate_private_key requires cryptography>=0.5
|
||||
key = jose.JWKRSA(key=rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=2048,
|
||||
backend=default_backend()))
|
||||
acme = client.Client(NEW_REG_URL, key)
|
||||
|
||||
regr = acme.register(contact=())
|
||||
|
|
@ -35,9 +39,9 @@ logging.debug(authzr)
|
|||
|
||||
authzr, authzr_response = acme.poll(authzr)
|
||||
|
||||
csr = M2Crypto.X509.load_request_string(pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'csr.der')),
|
||||
M2Crypto.X509.FORMAT_DER)
|
||||
csr = OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, pkg_resources.resource_string(
|
||||
'acme.jose', os.path.join('testdata', 'csr.der')))
|
||||
try:
|
||||
acme.request_issuance(csr, (authzr,))
|
||||
except messages.Error as error:
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ class DVSNI(AnnotatedChallenge):
|
|||
"""
|
||||
response = challenges.DVSNIResponse(s=s)
|
||||
cert_pem = crypto_util.make_ss_cert(self.key.pem, [
|
||||
self.nonce_domain, self.domain, response.z_domain(self.challb)])
|
||||
self.domain, self.nonce_domain, response.z_domain(self.challb)])
|
||||
return cert_pem, response
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -3,8 +3,7 @@ import logging
|
|||
import os
|
||||
import pkg_resources
|
||||
|
||||
import M2Crypto
|
||||
import OpenSSL.crypto
|
||||
import OpenSSL
|
||||
import zope.component
|
||||
|
||||
from acme import jose
|
||||
|
|
@ -157,8 +156,8 @@ class Client(object):
|
|||
|
||||
authzr = self.auth_handler.get_authorizations(domains)
|
||||
certr = self.network.request_issuance(
|
||||
jose.ComparableX509(
|
||||
M2Crypto.X509.load_request_der_string(csr.data)),
|
||||
jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, csr.data)),
|
||||
authzr)
|
||||
return certr, self.network.fetch_chain(certr)
|
||||
|
||||
|
|
@ -247,10 +246,12 @@ class Client(object):
|
|||
|
||||
# XXX: just to stop RenewableCert from complaining; this is
|
||||
# probably not a good solution
|
||||
chain_pem = "" if chain is None else chain.as_pem()
|
||||
chain_pem = "" if chain is None else OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, chain)
|
||||
lineage = storage.RenewableCert.new_lineage(
|
||||
domains[0], certr.body.as_pem(), key.pem, chain_pem, params,
|
||||
config, cli_config)
|
||||
domains[0], OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, certr.body),
|
||||
key.pem, chain_pem, params, config, cli_config)
|
||||
self._report_renewal_status(lineage)
|
||||
return lineage
|
||||
|
||||
|
|
@ -306,7 +307,8 @@ class Client(object):
|
|||
cert_chain_abspath = None
|
||||
cert_file, act_cert_path = le_util.unique_file(cert_path, 0o644)
|
||||
# TODO: Except
|
||||
cert_pem = certr.body.as_pem()
|
||||
cert_pem = OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, certr.body)
|
||||
try:
|
||||
cert_file.write(cert_pem)
|
||||
finally:
|
||||
|
|
@ -318,7 +320,8 @@ class Client(object):
|
|||
chain_file, act_chain_path = le_util.unique_file(
|
||||
chain_path, 0o644)
|
||||
# TODO: Except
|
||||
chain_pem = chain_cert.as_pem()
|
||||
chain_pem = OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, chain_cert)
|
||||
try:
|
||||
chain_file.write(chain_pem)
|
||||
finally:
|
||||
|
|
@ -431,8 +434,10 @@ def validate_key_csr(privkey, csr=None):
|
|||
|
||||
if csr:
|
||||
if csr.form == "der":
|
||||
csr_obj = M2Crypto.X509.load_request_der_string(csr.data)
|
||||
csr = le_util.CSR(csr.file, csr_obj.as_pem(), "der")
|
||||
csr_obj = OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, csr.data)
|
||||
csr = le_util.CSR(csr.file, OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, csr_obj), "pem")
|
||||
|
||||
# If CSR is provided, it must be readable and valid.
|
||||
if csr.data and not crypto_util.valid_csr(csr.data):
|
||||
|
|
|
|||
|
|
@ -4,17 +4,13 @@
|
|||
is capable of handling the signatures.
|
||||
|
||||
"""
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
|
||||
import Crypto.Hash.SHA256
|
||||
import Crypto.PublicKey.RSA
|
||||
import Crypto.Signature.PKCS1_v1_5
|
||||
|
||||
import M2Crypto
|
||||
import OpenSSL
|
||||
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import le_util
|
||||
|
||||
|
||||
|
|
@ -90,7 +86,7 @@ def init_save_csr(privkey, names, path, csrname="csr-letsencrypt.pem"):
|
|||
def make_csr(key_str, domains):
|
||||
"""Generate a CSR.
|
||||
|
||||
:param str key_str: RSA key.
|
||||
:param str key_str: PEM-encoded RSA key.
|
||||
:param list domains: Domains included in the certificate.
|
||||
|
||||
.. todo:: Detect duplicates in `domains`? Using a set doesn't
|
||||
|
|
@ -101,25 +97,23 @@ def make_csr(key_str, domains):
|
|||
|
||||
"""
|
||||
assert domains, "Must provide one or more hostnames for the CSR."
|
||||
rsa_key = M2Crypto.RSA.load_key_string(key_str)
|
||||
pubkey = M2Crypto.EVP.PKey()
|
||||
pubkey.assign_rsa(rsa_key)
|
||||
|
||||
csr = M2Crypto.X509.Request()
|
||||
csr.set_pubkey(pubkey)
|
||||
# TODO: what to put into csr.get_subject()?
|
||||
|
||||
extstack = M2Crypto.X509.X509_Extension_Stack()
|
||||
ext = M2Crypto.X509.new_extension(
|
||||
"subjectAltName", ", ".join("DNS:%s" % d for d in domains))
|
||||
|
||||
extstack.push(ext)
|
||||
csr.add_extensions(extstack)
|
||||
csr.sign(pubkey, "sha256")
|
||||
assert csr.verify(pubkey)
|
||||
pubkey2 = csr.get_pubkey()
|
||||
assert csr.verify(pubkey2)
|
||||
return csr.as_pem(), csr.as_der()
|
||||
pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_str)
|
||||
req = OpenSSL.crypto.X509Req()
|
||||
req.get_subject().CN = domains[0]
|
||||
# TODO: what to put into req.get_subject()?
|
||||
# TODO: put SAN if len(domains) > 1
|
||||
req.add_extensions([
|
||||
OpenSSL.crypto.X509Extension(
|
||||
"subjectAltName",
|
||||
critical=False,
|
||||
value=", ".join("DNS:%s" % d for d in domains)
|
||||
),
|
||||
])
|
||||
req.set_pubkey(pkey)
|
||||
req.sign(pkey, "sha256")
|
||||
return tuple(OpenSSL.crypto.dump_certificate_request(method, req)
|
||||
for method in (OpenSSL.crypto.FILETYPE_PEM,
|
||||
OpenSSL.crypto.FILETYPE_ASN1))
|
||||
|
||||
|
||||
# WARNING: the csr and private key file are possible attack vectors for TOCTOU
|
||||
|
|
@ -139,9 +133,11 @@ def valid_csr(csr):
|
|||
|
||||
"""
|
||||
try:
|
||||
csr_obj = M2Crypto.X509.load_request_string(csr)
|
||||
return bool(csr_obj.verify(csr_obj.get_pubkey()))
|
||||
except M2Crypto.X509.X509Error:
|
||||
req = OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_PEM, csr)
|
||||
return req.verify(req.get_pubkey())
|
||||
except OpenSSL.crypto.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
|
|
@ -149,15 +145,20 @@ def csr_matches_pubkey(csr, privkey):
|
|||
"""Does private key correspond to the subject public key in the CSR?
|
||||
|
||||
:param str csr: CSR in PEM.
|
||||
:param str privkey: Private key file contents
|
||||
:param str privkey: Private key file contents (PEM)
|
||||
|
||||
:returns: Correspondence of private key to CSR subject public key.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
csr_obj = M2Crypto.X509.load_request_string(csr)
|
||||
privkey_obj = M2Crypto.RSA.load_key_string(privkey)
|
||||
return csr_obj.get_pubkey().get_rsa().pub() == privkey_obj.pub()
|
||||
req = OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_PEM, csr)
|
||||
pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey)
|
||||
try:
|
||||
return req.verify(pkey)
|
||||
except OpenSSL.crypto.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
def make_key(bits):
|
||||
|
|
@ -169,24 +170,48 @@ def make_key(bits):
|
|||
:rtype: str
|
||||
|
||||
"""
|
||||
return Crypto.PublicKey.RSA.generate(bits).exportKey(format="PEM")
|
||||
assert bits >= 1024 # XXX
|
||||
key = OpenSSL.crypto.PKey()
|
||||
key.generate_key(OpenSSL.crypto.TYPE_RSA, bits)
|
||||
return OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
|
||||
|
||||
|
||||
def valid_privkey(privkey):
|
||||
"""Is valid RSA private key?
|
||||
|
||||
:param str privkey: Private key file contents
|
||||
:param str privkey: Private key file contents in PEM
|
||||
|
||||
:returns: Validity of private key.
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
try:
|
||||
return bool(M2Crypto.RSA.load_key_string(privkey).check_key())
|
||||
except M2Crypto.RSA.RSAError:
|
||||
return OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, privkey).check()
|
||||
except (TypeError, OpenSSL.crypto.Error):
|
||||
return False
|
||||
|
||||
|
||||
def _pyopenssl_load(data, method, types=(
|
||||
OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)):
|
||||
openssl_errors = []
|
||||
for filetype in types:
|
||||
try:
|
||||
return method(filetype, data), filetype
|
||||
except OpenSSL.crypto.Error as error: # TODO: anything else?
|
||||
openssl_errors.append(error)
|
||||
raise errors.Error("Unable to load: {0}".format(",".join(
|
||||
str(error) for error in openssl_errors)))
|
||||
|
||||
def pyopenssl_load_certificate(data):
|
||||
"""Load PEM/DER certificate.
|
||||
|
||||
:raises errors.Error:
|
||||
|
||||
"""
|
||||
return _pyopenssl_load(data, OpenSSL.crypto.load_certificate)
|
||||
|
||||
|
||||
def make_ss_cert(key_str, domains, not_before=None,
|
||||
validity=(7 * 24 * 60 * 60)):
|
||||
"""Returns new self-signed cert in PEM form.
|
||||
|
|
@ -194,44 +219,36 @@ def make_ss_cert(key_str, domains, not_before=None,
|
|||
Uses key_str and contains all domains.
|
||||
|
||||
"""
|
||||
assert domains, "Must provide one or more hostnames for the CSR."
|
||||
|
||||
rsa_key = M2Crypto.RSA.load_key_string(key_str)
|
||||
pubkey = M2Crypto.EVP.PKey()
|
||||
pubkey.assign_rsa(rsa_key)
|
||||
|
||||
cert = M2Crypto.X509.X509()
|
||||
cert.set_pubkey(pubkey)
|
||||
assert domains, "Must provide one or more hostnames for the cert."
|
||||
pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_str)
|
||||
cert = OpenSSL.crypto.X509()
|
||||
cert.set_serial_number(1337)
|
||||
cert.set_version(2)
|
||||
|
||||
current_ts = long(time.time() if not_before is None else not_before)
|
||||
current = M2Crypto.ASN1.ASN1_UTCTIME()
|
||||
current.set_time(current_ts)
|
||||
expire = M2Crypto.ASN1.ASN1_UTCTIME()
|
||||
expire.set_time(current_ts + validity)
|
||||
cert.set_not_before(current)
|
||||
cert.set_not_after(expire)
|
||||
extensions = [
|
||||
OpenSSL.crypto.X509Extension(
|
||||
"basicConstraints", True, 'CA:TRUE, pathlen:0'),
|
||||
]
|
||||
|
||||
subject = cert.get_subject()
|
||||
subject.C = "US"
|
||||
subject.ST = "Michigan"
|
||||
subject.L = "Ann Arbor"
|
||||
subject.O = "University of Michigan and the EFF"
|
||||
subject.CN = domains[0]
|
||||
cert.get_subject().CN = domains[0]
|
||||
# TODO: what to put into cert.get_subject()?
|
||||
cert.set_issuer(cert.get_subject())
|
||||
|
||||
if len(domains) > 1:
|
||||
cert.add_ext(M2Crypto.X509.new_extension(
|
||||
"basicConstraints", "CA:FALSE"))
|
||||
cert.add_ext(M2Crypto.X509.new_extension(
|
||||
"subjectAltName", ", ".join(["DNS:%s" % d for d in domains])))
|
||||
extensions.append(OpenSSL.crypto.X509Extension(
|
||||
"subjectAltName",
|
||||
critical=False,
|
||||
value=", ".join("DNS:%s" % d for d in domains)
|
||||
))
|
||||
|
||||
cert.sign(pubkey, "sha256")
|
||||
assert cert.verify(pubkey)
|
||||
assert cert.verify()
|
||||
# print check_purpose(,0
|
||||
return cert.as_pem()
|
||||
cert.add_extensions(extensions)
|
||||
|
||||
cert.gmtime_adj_notBefore(0 if not_before is None else not_before)
|
||||
cert.gmtime_adj_notAfter(validity)
|
||||
|
||||
cert.set_pubkey(pkey)
|
||||
cert.sign(pkey, "sha256")
|
||||
return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
|
||||
|
||||
|
||||
def _pyopenssl_cert_or_req_san(cert_or_req):
|
||||
|
|
@ -309,3 +326,21 @@ def get_sans_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
|
|||
"""
|
||||
return _get_sans_from_cert_or_req(
|
||||
csr, OpenSSL.crypto.load_certificate_request, typ)
|
||||
|
||||
|
||||
def asn1_generalizedtime_to_dt(timestamp):
|
||||
"""Convert ASN.1 GENERALIZEDTIME to datetime.
|
||||
|
||||
Useful for deserialization of `OpenSSL.crypto.X509.get_notAfter` and
|
||||
`OpenSSL.crypto.X509.get_notAfter` outputs.
|
||||
|
||||
.. todo:: This function support only one format: `%Y%m%d%H%M%SZ`.
|
||||
Implement remaining two.
|
||||
|
||||
"""
|
||||
return datetime.datetime.strptime(timestamp, '%Y%m%d%H%M%SZ')
|
||||
|
||||
|
||||
def pyopenssl_x509_name_as_text(x509name):
|
||||
"""Convert `OpenSSL.crypto.X509Name to text."""
|
||||
return "/".join("{0}={1}" for key, value in x509name.get_components())
|
||||
|
|
|
|||
|
|
@ -409,6 +409,7 @@ def separate_list_input(input_):
|
|||
"""
|
||||
no_commas = input_.replace(",", " ")
|
||||
# Each string is naturally unicode, this causes problems with M2Crypto SANs
|
||||
# TODO: check if above is still true when M2Crypto is gone ^
|
||||
return [str(string) for string in no_commas.split()]
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -6,9 +6,7 @@ import socket
|
|||
import sys
|
||||
import time
|
||||
|
||||
import Crypto.Random
|
||||
import OpenSSL.crypto
|
||||
import OpenSSL.SSL
|
||||
import OpenSSL
|
||||
import zope.component
|
||||
import zope.interface
|
||||
|
||||
|
|
@ -267,7 +265,6 @@ class StandaloneAuthenticator(common.Plugin):
|
|||
|
||||
sys.stdout.flush()
|
||||
fork_result = os.fork()
|
||||
Crypto.Random.atfork()
|
||||
if fork_result:
|
||||
# PARENT process (still the Let's Encrypt client process)
|
||||
self.child_pid = fork_result
|
||||
|
|
|
|||
|
|
@ -7,8 +7,7 @@ import socket
|
|||
import unittest
|
||||
|
||||
import mock
|
||||
import OpenSSL.crypto
|
||||
import OpenSSL.SSL
|
||||
import OpenSSL
|
||||
|
||||
from acme import challenges
|
||||
|
||||
|
|
@ -374,10 +373,8 @@ class StartListenerTest(unittest.TestCase):
|
|||
StandaloneAuthenticator
|
||||
self.authenticator = StandaloneAuthenticator(config=CONFIG, name=None)
|
||||
|
||||
@mock.patch("letsencrypt.plugins.standalone.authenticator."
|
||||
"Crypto.Random.atfork")
|
||||
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.fork")
|
||||
def test_start_listener_fork_parent(self, mock_fork, mock_atfork):
|
||||
def test_start_listener_fork_parent(self, mock_fork):
|
||||
self.authenticator.do_parent_process = mock.Mock()
|
||||
self.authenticator.do_parent_process.return_value = True
|
||||
mock_fork.return_value = 22222
|
||||
|
|
@ -387,12 +384,9 @@ class StartListenerTest(unittest.TestCase):
|
|||
self.assertTrue(result)
|
||||
self.assertEqual(self.authenticator.child_pid, 22222)
|
||||
self.authenticator.do_parent_process.assert_called_once_with(1717)
|
||||
mock_atfork.assert_called_once_with()
|
||||
|
||||
@mock.patch("letsencrypt.plugins.standalone.authenticator."
|
||||
"Crypto.Random.atfork")
|
||||
@mock.patch("letsencrypt.plugins.standalone.authenticator.os.fork")
|
||||
def test_start_listener_fork_child(self, mock_fork, mock_atfork):
|
||||
def test_start_listener_fork_child(self, mock_fork):
|
||||
self.authenticator.do_parent_process = mock.Mock()
|
||||
self.authenticator.do_child_process = mock.Mock()
|
||||
mock_fork.return_value = 0
|
||||
|
|
@ -400,7 +394,7 @@ class StartListenerTest(unittest.TestCase):
|
|||
self.assertEqual(self.authenticator.child_pid, os.getpid())
|
||||
self.authenticator.do_child_process.assert_called_once_with(
|
||||
1717, "key")
|
||||
mock_atfork.assert_called_once_with()
|
||||
|
||||
|
||||
class DoParentProcessTest(unittest.TestCase):
|
||||
"""Tests for do_parent_process() method."""
|
||||
|
|
|
|||
|
|
@ -1,6 +1,9 @@
|
|||
"""Proof of Possession Identifier Validation Challenge."""
|
||||
import M2Crypto
|
||||
import logging
|
||||
import os
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
import zope.component
|
||||
|
||||
from acme import challenges
|
||||
|
|
@ -11,6 +14,9 @@ from letsencrypt import interfaces
|
|||
from letsencrypt.display import util as display_util
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ProofOfPossession(object): # pylint: disable=too-few-public-methods
|
||||
"""Proof of Possession Identifier Validation Challenge.
|
||||
|
||||
|
|
@ -39,12 +45,19 @@ class ProofOfPossession(object): # pylint: disable=too-few-public-methods
|
|||
return None
|
||||
|
||||
for cert, key, _ in self.installer.get_all_certs_keys():
|
||||
der_cert_key = M2Crypto.X509.load_cert(cert).get_pubkey().as_der()
|
||||
with open(cert) as cert_file:
|
||||
cert_data = cert_file.read()
|
||||
try:
|
||||
cert_key = achall.alg.kty.load(der_cert_key)
|
||||
# If JWKES.load raises other exceptions, they should be caught here
|
||||
except (IndexError, ValueError, TypeError):
|
||||
continue
|
||||
cert_obj = x509.load_pem_x509_certificate(
|
||||
cert_data, default_backend())
|
||||
except ValueError:
|
||||
try:
|
||||
cert_obj = x509.load_der_x509_certificate(
|
||||
cert_data, default_backend())
|
||||
except ValueError:
|
||||
logger.warn("Certificate is neither PER nor DER: %s", cert)
|
||||
|
||||
cert_key = achall.alg.kty(key=cert_obj.public_key())
|
||||
if cert_key == achall.hints.jwk:
|
||||
return self._gen_response(achall, key)
|
||||
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import os
|
|||
import sys
|
||||
|
||||
import configobj
|
||||
import OpenSSL
|
||||
import zope.component
|
||||
|
||||
from letsencrypt import configuration
|
||||
|
|
@ -90,8 +91,11 @@ def renew(cert, old_version):
|
|||
# best is to have obtain_certificate return None for
|
||||
# new_key if the old key is to be used (since save_successor
|
||||
# already understands this distinction!)
|
||||
return cert.save_successor(old_version, new_certr.body.as_pem(),
|
||||
new_key.pem, new_chain.as_pem())
|
||||
return cert.save_successor(
|
||||
old_version, OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, new_certr.body),
|
||||
new_key.pem, OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, new_chain))
|
||||
# TODO: Notify results
|
||||
else:
|
||||
# TODO: Notify negative results
|
||||
|
|
|
|||
|
|
@ -13,11 +13,11 @@ import os
|
|||
import shutil
|
||||
import tempfile
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
import M2Crypto
|
||||
import OpenSSL
|
||||
|
||||
from acme.jose import util as jose_util
|
||||
|
||||
from letsencrypt import crypto_util
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import le_util
|
||||
from letsencrypt import network
|
||||
|
|
@ -70,10 +70,11 @@ class Revoker(object):
|
|||
"""
|
||||
certs = []
|
||||
try:
|
||||
clean_pem = Crypto.PublicKey.RSA.importKey(
|
||||
authkey.pem).exportKey("PEM")
|
||||
# https://www.dlitz.net/software/pycrypto/api/current/Crypto.PublicKey.RSA-module.html
|
||||
except (IndexError, ValueError, TypeError):
|
||||
clean_pem = OpenSSL.crypto.dump_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, authkey.pem))
|
||||
except OpenSSL.crypto.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
raise errors.RevokerError(
|
||||
"Invalid key file specified to revoke_from_key")
|
||||
|
||||
|
|
@ -86,9 +87,11 @@ class Revoker(object):
|
|||
# certificate.
|
||||
_, b_k = self._row_to_backup(row)
|
||||
try:
|
||||
test_pem = Crypto.PublicKey.RSA.importKey(
|
||||
open(b_k).read()).exportKey("PEM")
|
||||
except (IndexError, ValueError, TypeError):
|
||||
test_pem = OpenSSL.crypto.dump_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, open(b_k).read()))
|
||||
except OpenSSL.crypto.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
# This should never happen given the assumptions of the
|
||||
# module. If it does, it is probably best to delete the
|
||||
# the offending key/cert. For now... just raise an exception
|
||||
|
|
@ -193,10 +196,15 @@ class Revoker(object):
|
|||
|
||||
for (cert_path, _, path) in self.installer.get_all_certs_keys():
|
||||
try:
|
||||
cert_sha1 = M2Crypto.X509.load_cert(
|
||||
cert_path).get_fingerprint(md="sha1")
|
||||
except (IOError, M2Crypto.X509.X509Error):
|
||||
with open(cert_path) as cert_file:
|
||||
cert_data = cert_file.read()
|
||||
except IOError:
|
||||
continue
|
||||
try:
|
||||
cert_obj, _ = crypto_util.pyopenssl_load_certificate(cert_data)
|
||||
except errors.Error:
|
||||
continue
|
||||
cert_sha1 = cert_obj.digest("sha1")
|
||||
if cert_sha1 in csha1_vhlist:
|
||||
csha1_vhlist[cert_sha1].append(path)
|
||||
else:
|
||||
|
|
@ -243,15 +251,15 @@ class Revoker(object):
|
|||
"""
|
||||
# XXX | pylint: disable=unused-variable
|
||||
|
||||
# These will both have to change in the future away from M2Crypto
|
||||
# pylint: disable=protected-access
|
||||
certificate = jose_util.ComparableX509(cert._cert)
|
||||
try:
|
||||
with open(cert.backup_key_path, "rU") as backup_key_file:
|
||||
key = Crypto.PublicKey.RSA.importKey(backup_key_file.read())
|
||||
|
||||
key = OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, backup_key_file.read())
|
||||
# If the key file doesn't exist... or is corrupted
|
||||
except (IndexError, ValueError, TypeError):
|
||||
except OpenSSL.crypto.Error as error:
|
||||
logger.debug(error, exc_info=True)
|
||||
raise errors.RevokerError(
|
||||
"Corrupted backup key file: %s" % cert.backup_key_path)
|
||||
|
||||
|
|
@ -369,8 +377,8 @@ class Revoker(object):
|
|||
class Cert(object):
|
||||
"""Cert object used for Revocation convenience.
|
||||
|
||||
:ivar _cert: M2Crypto X509 cert
|
||||
:type _cert: :class:`M2Crypto.X509`
|
||||
:ivar _cert: Certificate
|
||||
:type _cert: :class:`OpenSSL.crypto.X509`
|
||||
|
||||
:ivar int idx: convenience index used for listing
|
||||
:ivar orig: (`str` path - original certificate, `str` status)
|
||||
|
|
@ -398,8 +406,16 @@ class Cert(object):
|
|||
|
||||
"""
|
||||
try:
|
||||
self._cert = M2Crypto.X509.load_cert(cert_path)
|
||||
except (IOError, M2Crypto.X509.X509Error):
|
||||
with open(cert_path) as cert_file:
|
||||
cert_data = cert_file.read()
|
||||
except IOError:
|
||||
raise errors.RevokerError(
|
||||
"Error loading certificate: %s" % cert_path)
|
||||
|
||||
try:
|
||||
self._cert = OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, cert_data)
|
||||
except OpenSSL.crypto.Error:
|
||||
raise errors.RevokerError(
|
||||
"Error loading certificate: %s" % cert_path)
|
||||
|
||||
|
|
@ -447,8 +463,11 @@ class Cert(object):
|
|||
if not os.path.isfile(orig):
|
||||
status = Cert.DELETED_MSG
|
||||
else:
|
||||
o_cert = M2Crypto.X509.load_cert(orig)
|
||||
if self.get_fingerprint() != o_cert.get_fingerprint(md="sha1"):
|
||||
with open(orig) as orig_file:
|
||||
orig_data = orig_file.read()
|
||||
o_cert = OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, orig_data)
|
||||
if self.get_fingerprint() != o_cert.digest("sha1"):
|
||||
status = Cert.CHANGED_MSG
|
||||
|
||||
# Verify original key path
|
||||
|
|
@ -468,47 +487,49 @@ class Cert(object):
|
|||
self.backup_path = backup
|
||||
self.backup_key_path = backup_key
|
||||
|
||||
# M2Crypto is eventually going to be replaced, hence the reason for _cert
|
||||
def get_cn(self):
|
||||
"""Get common name."""
|
||||
return self._cert.get_subject().CN
|
||||
|
||||
def get_fingerprint(self):
|
||||
"""Get SHA1 fingerprint."""
|
||||
return self._cert.get_fingerprint(md="sha1")
|
||||
return self._cert.digest("sha1")
|
||||
|
||||
def get_not_before(self):
|
||||
"""Get not_valid_before field."""
|
||||
return self._cert.get_not_before().get_datetime()
|
||||
return crypto_util.asn1_generalizedtime_to_dt(
|
||||
self._cert.get_notBefore())
|
||||
|
||||
def get_not_after(self):
|
||||
"""Get not_valid_after field."""
|
||||
return self._cert.get_not_after().get_datetime()
|
||||
return crypto_util.asn1_generalizedtime_to_dt(
|
||||
self._cert.get_notAfter())
|
||||
|
||||
def get_der(self):
|
||||
"""Get certificate in der format."""
|
||||
return self._cert.as_der()
|
||||
return OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, self._cert)
|
||||
|
||||
def get_pub_key(self):
|
||||
"""Get public key size.
|
||||
|
||||
.. todo:: M2Crypto doesn't support ECC, this will have to be updated
|
||||
.. todo:: Support for ECC
|
||||
|
||||
"""
|
||||
return "RSA " + str(self._cert.get_pubkey().size() * 8)
|
||||
return "RSA {0}".format(self._cert.get_pubkey().bits)
|
||||
|
||||
def get_san(self):
|
||||
"""Get subject alternative name if available."""
|
||||
try:
|
||||
return self._cert.get_ext("subjectAltName").get_value()
|
||||
except LookupError:
|
||||
return ""
|
||||
# pylint: disable=protected-access
|
||||
return ", ".join(crypto_util._pyopenssl_cert_or_req_san(self._cert))
|
||||
|
||||
def __str__(self):
|
||||
text = [
|
||||
"Subject: %s" % self._cert.get_subject().as_text(),
|
||||
"Subject: %s" % crypto_util.pyopenssl_x509_name_as_text(
|
||||
self._cert.get_subject()),
|
||||
"SAN: %s" % self.get_san(),
|
||||
"Issuer: %s" % self._cert.get_issuer().as_text(),
|
||||
"Issuer: %s" % crypto_util.pyopenssl_x509_name_as_text(
|
||||
self._cert.get_issuer()),
|
||||
"Public Key: %s" % self.get_pub_key(),
|
||||
"Not Before: %s" % str(self.get_not_before()),
|
||||
"Not After: %s" % str(self.get_not_after()),
|
||||
|
|
|
|||
|
|
@ -1,12 +1,13 @@
|
|||
"""Tests for letsencrypt.achallenges."""
|
||||
import os
|
||||
import pkg_resources
|
||||
import re
|
||||
import unittest
|
||||
|
||||
import M2Crypto
|
||||
import OpenSSL
|
||||
|
||||
from acme import challenges
|
||||
|
||||
from letsencrypt import crypto_util
|
||||
from letsencrypt import le_util
|
||||
from letsencrypt.tests import acme_util
|
||||
|
||||
|
|
@ -31,15 +32,13 @@ class DVSNITest(unittest.TestCase):
|
|||
def test_gen_cert_and_response(self):
|
||||
cert_pem, _ = self.achall.gen_cert_and_response(s=self.response.s)
|
||||
|
||||
cert = M2Crypto.X509.load_cert_string(cert_pem)
|
||||
self.assertEqual(cert.get_subject().CN, self.chall.nonce_domain)
|
||||
|
||||
sans = cert.get_ext("subjectAltName").get_value()
|
||||
self.assertEqual(
|
||||
set([self.chall.nonce_domain, "example.com",
|
||||
self.response.z_domain(self.chall)]),
|
||||
set(re.findall(r"DNS:([^, $]*)", sans)),
|
||||
)
|
||||
cert = OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, cert_pem)
|
||||
self.assertEqual(cert.get_subject().CN, "example.com")
|
||||
# pylint: disable=protected-access
|
||||
self.assertEqual(crypto_util._pyopenssl_cert_or_req_san(cert), [
|
||||
"example.com", self.chall.nonce_domain,
|
||||
self.response.z_domain(self.chall)])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
|
|
@ -4,23 +4,25 @@ import itertools
|
|||
import os
|
||||
import pkg_resources
|
||||
|
||||
import Crypto.PublicKey.RSA
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
|
||||
from acme import challenges
|
||||
from acme import jose
|
||||
from acme import messages
|
||||
|
||||
|
||||
KEY = jose.HashableRSAKey(Crypto.PublicKey.RSA.importKey(
|
||||
KEY = serialization.load_pem_private_key(
|
||||
pkg_resources.resource_string(
|
||||
"acme.jose", os.path.join("testdata", "rsa512_key.pem"))))
|
||||
__name__, os.path.join('testdata', 'rsa512_key.pem')),
|
||||
password=None, backend=default_backend())
|
||||
|
||||
# Challenges
|
||||
SIMPLE_HTTP = challenges.SimpleHTTP(
|
||||
token="evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA")
|
||||
DVSNI = challenges.DVSNI(
|
||||
r="O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6\xbf'\xb3"
|
||||
"\xed\x9a9nX\x0f'\\m\xe7\x12", nonce="a82d5ff8ef740d12881f6d3c2277ab2e")
|
||||
r=jose.b64decode("Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI"),
|
||||
nonce=jose.b64decode("a82d5ff8ef740d12881f6d3c2277ab2e"))
|
||||
DNS = challenges.DNS(token="17817c66b60ce2e4012dfad92657527a")
|
||||
RECOVERY_CONTACT = challenges.RecoveryContact(
|
||||
activation_url="https://example.ca/sendrecovery/a5bd99383fb0",
|
||||
|
|
@ -28,9 +30,9 @@ RECOVERY_CONTACT = challenges.RecoveryContact(
|
|||
contact="c********n@example.com")
|
||||
RECOVERY_TOKEN = challenges.RecoveryToken()
|
||||
POP = challenges.ProofOfPossession(
|
||||
alg="RS256", nonce="xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ",
|
||||
alg="RS256", nonce=jose.b64decode("eET5udtV7aoX8Xl8gYiZIA"),
|
||||
hints=challenges.ProofOfPossession.Hints(
|
||||
jwk=jose.JWKRSA(key=KEY.publickey()),
|
||||
jwk=jose.JWKRSA(key=KEY.public_key()),
|
||||
cert_fingerprints=(
|
||||
"93416768eb85e33adc4277f4c9acd63e7418fcfe",
|
||||
"16d95b7b63f1972b980b14c20291f3c0d1855d95",
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ import shutil
|
|||
import tempfile
|
||||
|
||||
import configobj
|
||||
import M2Crypto.X509
|
||||
import OpenSSL
|
||||
import mock
|
||||
|
||||
from acme import jose
|
||||
|
|
@ -51,8 +51,8 @@ class ClientTest(unittest.TestCase):
|
|||
self.client.auth_handler.get_authorizations.assert_called_once_with(
|
||||
["example.com", "www.example.com"])
|
||||
self.network.request_issuance.assert_callend_once_with(
|
||||
jose.ComparableX509(
|
||||
M2Crypto.X509.load_request_der_string(CSR_SAN)),
|
||||
jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
|
||||
OpenSSL.crypto.FILETYPE_ASN1, CSR_SAN)),
|
||||
self.client.auth_handler.get_authorizations())
|
||||
self.network().fetch_chain.assert_called_once_with(mock.sentinel.certr)
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import shutil
|
|||
import tempfile
|
||||
import unittest
|
||||
|
||||
import M2Crypto
|
||||
import OpenSSL
|
||||
import mock
|
||||
|
||||
|
|
@ -146,7 +145,8 @@ class MakeKeyTest(unittest.TestCase): # pylint: disable=too-few-public-methods
|
|||
def test_it(self): # pylint: disable=no-self-use
|
||||
from letsencrypt.crypto_util import make_key
|
||||
# Do not test larger keys as it takes too long.
|
||||
M2Crypto.RSA.load_key_string(make_key(1024))
|
||||
OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, make_key(1024))
|
||||
|
||||
|
||||
class ValidPrivkeyTest(unittest.TestCase):
|
||||
|
|
|
|||
|
|
@ -1,9 +1,11 @@
|
|||
"""Tests for letsencrypt.proof_of_possession."""
|
||||
import Crypto.PublicKey.RSA
|
||||
import os
|
||||
import pkg_resources
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
import mock
|
||||
|
||||
from acme import challenges
|
||||
|
|
@ -17,9 +19,7 @@ from letsencrypt.display import util as display_util
|
|||
|
||||
BASE_PACKAGE = "letsencrypt.tests"
|
||||
CERT0_PATH = pkg_resources.resource_filename(
|
||||
BASE_PACKAGE, os.path.join("testdata", "cert.pem"))
|
||||
CERT1_PATH = pkg_resources.resource_filename(
|
||||
BASE_PACKAGE, os.path.join("testdata", "cert-san.pem"))
|
||||
"acme.jose", os.path.join("testdata", "cert.der"))
|
||||
CERT2_PATH = pkg_resources.resource_filename(
|
||||
BASE_PACKAGE, os.path.join("testdata", "dsa_cert.pem"))
|
||||
CERT2_KEY_PATH = pkg_resources.resource_filename(
|
||||
|
|
@ -28,14 +28,17 @@ CERT3_PATH = pkg_resources.resource_filename(
|
|||
BASE_PACKAGE, os.path.join("testdata", "matching_cert.pem"))
|
||||
CERT3_KEY_PATH = pkg_resources.resource_filename(
|
||||
BASE_PACKAGE, os.path.join("testdata", "rsa512_key.pem"))
|
||||
CERT3_KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string(
|
||||
BASE_PACKAGE, os.path.join('testdata', 'rsa512_key.pem'))).publickey()
|
||||
with open(CERT3_KEY_PATH) as cert3_file:
|
||||
CERT3_KEY = serialization.load_pem_private_key(
|
||||
cert3_file.read(), password=None,
|
||||
backend=default_backend()).public_key()
|
||||
|
||||
|
||||
class ProofOfPossessionTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.installer = mock.MagicMock()
|
||||
certs = [CERT0_PATH, CERT1_PATH, CERT2_PATH, CERT3_PATH]
|
||||
self.cert1_path = tempfile.mkstemp()[1]
|
||||
certs = [CERT0_PATH, self.cert1_path, CERT2_PATH, CERT3_PATH]
|
||||
keys = [None, None, CERT2_KEY_PATH, CERT3_KEY_PATH]
|
||||
self.installer.get_all_certs_keys.return_value = zip(
|
||||
certs, keys, 4 * [None])
|
||||
|
|
@ -53,9 +56,12 @@ class ProofOfPossessionTest(unittest.TestCase):
|
|||
self.achall = achallenges.ProofOfPossession(
|
||||
challb=challb, domain="example.com")
|
||||
|
||||
def tearDown(self):
|
||||
os.remove(self.cert1_path)
|
||||
|
||||
def test_perform_bad_challenge(self):
|
||||
hints = challenges.ProofOfPossession.Hints(
|
||||
jwk=jose.jwk.JWKOct(key=CERT3_KEY), cert_fingerprints=(),
|
||||
jwk=jose.jwk.JWKOct(key="foo"), cert_fingerprints=(),
|
||||
certs=(), serial_numbers=(), subject_key_identifiers=(),
|
||||
issuers=(), authorized_for=())
|
||||
chall = challenges.ProofOfPossession(
|
||||
|
|
|
|||
|
|
@ -8,12 +8,18 @@ import unittest
|
|||
|
||||
import configobj
|
||||
import mock
|
||||
import OpenSSL
|
||||
import pytz
|
||||
|
||||
from letsencrypt import configuration
|
||||
from letsencrypt.storage import ALL_FOUR
|
||||
|
||||
|
||||
CERT = OpenSSL.crypto.load_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
|
||||
'letsencrypt.tests', os.path.join('testdata', 'cert.pem')))
|
||||
|
||||
|
||||
def unlink_all(rc_object):
|
||||
"""Unlink all four items associated with this RenewableCert."""
|
||||
for kind in ALL_FOUR:
|
||||
|
|
@ -553,7 +559,6 @@ class RenewableCertTests(unittest.TestCase):
|
|||
@mock.patch("letsencrypt.client.determine_account")
|
||||
@mock.patch("letsencrypt.client.Client")
|
||||
def test_renew(self, mock_c, mock_da, mock_pd):
|
||||
"""Tests for renew()."""
|
||||
from letsencrypt import renewer
|
||||
|
||||
test_cert = pkg_resources.resource_string(
|
||||
|
|
@ -583,9 +588,8 @@ class RenewableCertTests(unittest.TestCase):
|
|||
mock_client = mock.MagicMock()
|
||||
# pylint: disable=star-args
|
||||
mock_client.obtain_certificate.return_value = (
|
||||
mock.Mock(**{'body.as_pem.return_value': 'cert'}),
|
||||
mock.Mock(**{'as_pem.return_value': 'chain'}),
|
||||
mock.Mock(pem="key"), mock.sentinel.csr)
|
||||
mock.MagicMock(body=CERT), CERT, mock.Mock(pem="key"),
|
||||
mock.sentinel.csr)
|
||||
mock_c.return_value = mock_client
|
||||
self.assertEqual(2, renewer.renew(self.test_rc, 1))
|
||||
# TODO: We could also make several assertions about calls that should
|
||||
|
|
|
|||
|
|
@ -7,12 +7,18 @@ import tempfile
|
|||
import unittest
|
||||
|
||||
import mock
|
||||
import OpenSSL
|
||||
|
||||
from letsencrypt import errors
|
||||
from letsencrypt import le_util
|
||||
from letsencrypt.display import util as display_util
|
||||
|
||||
|
||||
KEY = OpenSSL.crypto.load_privatekey(
|
||||
OpenSSL.crypto.FILETYPE_PEM, pkg_resources.resource_string(
|
||||
__name__, os.path.join("testdata", "rsa512_key.pem")))
|
||||
|
||||
|
||||
class RevokerBase(unittest.TestCase): # pylint: disable=too-few-public-methods
|
||||
"""Base Class for Revoker Tests."""
|
||||
def setUp(self):
|
||||
|
|
@ -77,13 +83,13 @@ class RevokerTest(RevokerBase):
|
|||
|
||||
self.assertEqual(mock_net.call_count, 2)
|
||||
|
||||
@mock.patch("letsencrypt.revoker.Crypto.PublicKey.RSA.importKey")
|
||||
def test_revoke_by_invalid_keys(self, mock_import):
|
||||
mock_import.side_effect = ValueError
|
||||
@mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_privatekey")
|
||||
def test_revoke_by_invalid_keys(self, mock_load_privatekey):
|
||||
mock_load_privatekey.side_effect = OpenSSL.crypto.Error
|
||||
self.assertRaises(
|
||||
errors.RevokerError, self.revoker.revoke_from_key, self.key)
|
||||
|
||||
mock_import.side_effect = [mock.Mock(), IndexError]
|
||||
mock_load_privatekey.side_effect = [KEY, OpenSSL.crypto.Error]
|
||||
self.assertRaises(
|
||||
errors.RevokerError, self.revoker.revoke_from_key, self.key)
|
||||
|
||||
|
|
@ -192,10 +198,10 @@ class RevokerTest(RevokerBase):
|
|||
self.revoker._safe_revoke(self.certs)
|
||||
self.assertTrue(mock_log.error.called)
|
||||
|
||||
@mock.patch("letsencrypt.revoker.Crypto.PublicKey.RSA.importKey")
|
||||
def test_acme_revoke_failure(self, mock_crypto):
|
||||
@mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_privatekey")
|
||||
def test_acme_revoke_failure(self, mock_load_privatekey):
|
||||
# pylint: disable=protected-access
|
||||
mock_crypto.side_effect = ValueError
|
||||
mock_load_privatekey.side_effect = OpenSSL.crypto.Error
|
||||
self.assertRaises(
|
||||
errors.Error, self.revoker._acme_revoke, self.certs[0])
|
||||
|
||||
|
|
@ -261,18 +267,28 @@ class RevokerInstallerTest(RevokerBase):
|
|||
self.assertEqual(
|
||||
sha_vh[cert.get_fingerprint()], self.installs[i])
|
||||
|
||||
@mock.patch("letsencrypt.revoker.M2Crypto.X509.load_cert")
|
||||
def test_get_installed_load_failure(self, mock_m2):
|
||||
@mock.patch("letsencrypt.revoker.OpenSSL.crypto.load_certificate")
|
||||
def test_get_installed_load_failure(self, mock_load_certificate):
|
||||
mock_installer = mock.MagicMock()
|
||||
mock_installer.get_all_certs_keys.return_value = self.certs_keys
|
||||
|
||||
mock_m2.side_effect = IOError
|
||||
mock_load_certificate.side_effect = OpenSSL.crypto.Error
|
||||
|
||||
revoker = self._get_revoker(mock_installer)
|
||||
|
||||
# pylint: disable=protected-access
|
||||
self.assertEqual(revoker._get_installed_locations(), {})
|
||||
|
||||
def test_get_installed_load_failure_open(self):
|
||||
tmp = tempfile.mkdtemp()
|
||||
mock_installer = mock.MagicMock()
|
||||
mock_installer.get_all_certs_keys.return_value = [(
|
||||
os.path.join(tmp, 'missing'), None, None)]
|
||||
revoker = self._get_revoker(mock_installer)
|
||||
# pylint: disable=protected-access
|
||||
self.assertEqual(revoker._get_installed_locations(), {})
|
||||
os.rmdir(tmp)
|
||||
|
||||
|
||||
class RevokerClassMethodsTest(RevokerBase):
|
||||
def setUp(self):
|
||||
|
|
@ -328,6 +344,13 @@ class CertTest(unittest.TestCase):
|
|||
from letsencrypt.revoker import Cert
|
||||
self.assertRaises(errors.RevokerError, Cert, self.key_path)
|
||||
|
||||
def test_failed_load_open(self):
|
||||
tmp = tempfile.mkdtemp()
|
||||
from letsencrypt.revoker import Cert
|
||||
self.assertRaises(
|
||||
errors.RevokerError, Cert, os.path.join(tmp, 'missing'))
|
||||
os.rmdir(tmp)
|
||||
|
||||
def test_no_row(self):
|
||||
self.assertEqual(self.certs[0].get_row(), None)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,67 +0,0 @@
|
|||
# Support swig 3.0.5+
|
||||
# https://github.com/M2Crypto/M2Crypto/issues/24
|
||||
# https://github.com/M2Crypto/M2Crypto/pull/30
|
||||
git+https://github.com/M2Crypto/M2Crypto.git@d13a3a46c8934c5f50b31d5f95b23e6e06f845c3#egg=M2Crypto
|
||||
|
||||
# This requirements file will fail on Travis CI 12.04 LTS Ubuntu build
|
||||
# machine under TOX_ENV=py26 with very confusing error (full tracback
|
||||
# at https://api.travis-ci.org/jobs/66529698/log.txt?deansi=true):
|
||||
|
||||
#Traceback (most recent call last):
|
||||
# File "setup.py", line 133, in <module>
|
||||
# include_package_data=True,
|
||||
# File "/opt/python/2.6.9/lib/python2.6/distutils/core.py", line 152, in setup
|
||||
# dist.run_commands()
|
||||
# File "/opt/python/2.6.9/lib/python2.6/distutils/dist.py", line 975, in run_commands
|
||||
# self.run_command(cmd)
|
||||
# File "/opt/python/2.6.9/lib/python2.6/distutils/dist.py", line 995, in run_command
|
||||
# cmd_obj.run()
|
||||
# File "/home/travis/build/letsencrypt/lets-encrypt-preview/.tox/py26/lib/python2.6/site-packages/setuptools/command/test.py", line 142, in run
|
||||
# self.with_project_on_sys_path(self.run_tests)
|
||||
# File "/home/travis/build/letsencrypt/lets-encrypt-preview/.tox/py26/lib/python2.6/site-packages/setuptools/command/test.py", line 122, in with_project_on_sys_path
|
||||
# func()
|
||||
# File "/home/travis/build/letsencrypt/lets-encrypt-preview/.tox/py26/lib/python2.6/site-packages/setuptools/command/test.py", line 163, in run_tests
|
||||
# testRunner=self._resolve_as_ep(self.test_runner),
|
||||
# File "/opt/python/2.6.9/lib/python2.6/unittest.py", line 816, in __init__
|
||||
# self.parseArgs(argv)
|
||||
# File "/opt/python/2.6.9/lib/python2.6/unittest.py", line 843, in parseArgs
|
||||
# self.createTests()
|
||||
# File "/opt/python/2.6.9/lib/python2.6/unittest.py", line 849, in createTests
|
||||
# self.module)
|
||||
# File "/opt/python/2.6.9/lib/python2.6/unittest.py", line 613, in loadTestsFromNames
|
||||
# suites = [self.loadTestsFromName(name, module) for name in names]
|
||||
# File "/opt/python/2.6.9/lib/python2.6/unittest.py", line 587, in loadTestsFromName
|
||||
# return self.loadTestsFromModule(obj)
|
||||
# File "/home/travis/build/letsencrypt/lets-encrypt-preview/.tox/py26/lib/python2.6/site-packages/setuptools/command/test.py", line 37, in loadTestsFromModule
|
||||
# tests.append(self.loadTestsFromName(submodule))
|
||||
# File "/opt/python/2.6.9/lib/python2.6/unittest.py", line 584, in loadTestsFromName
|
||||
# parent, obj = obj, getattr(obj, part)
|
||||
#AttributeError: 'module' object has no attribute 'continuity_auth'
|
||||
|
||||
# the above error happens because letsencrypt.continuity_auth cannot import M2Crypto:
|
||||
|
||||
#>>> import M2Crypto
|
||||
#Traceback (most recent call last):
|
||||
# File "<stdin>", line 1, in <module>
|
||||
# File "/root/lets-encrypt-preview/venv/lib/python2.6/site-packages/M2Crypto-0.21.1-py2.6-linux-x86_64.egg/M2Crypto/__init__.py", line 22, in <module>
|
||||
# import m2crypto
|
||||
# File "/root/lets-encrypt-preview/venv/lib/python2.6/site-packages/M2Crypto-0.21.1-py2.6-linux-x86_64.egg/M2Crypto/m2crypto.py", line 26, in <module>
|
||||
# _m2crypto = swig_import_helper()
|
||||
# File "/root/lets-encrypt-preview/venv/lib/python2.6/site-packages/M2Crypto-0.21.1-py2.6-linux-x86_64.egg/M2Crypto/m2crypto.py", line 22, in swig_import_helper
|
||||
# _mod = imp.load_module('_m2crypto', fp, pathname, description)
|
||||
#ImportError: /root/lets-encrypt-preview/venv/lib/python2.6/site-packages/M2Crypto-0.21.1-py2.6-linux-x86_64.egg/M2Crypto/_m2crypto.so: undefined symbol: SSLv2_method
|
||||
|
||||
# For more info see:
|
||||
|
||||
# - https://github.com/martinpaljak/M2Crypto/commit/84977c532c2444c5487db57146d81bb68dd5431d
|
||||
# - http://stackoverflow.com/questions/10547332/install-m2crypto-on-a-virtualenv-without-system-packages
|
||||
# - http://stackoverflow.com/questions/8206546/undefined-symbol-sslv2-method
|
||||
|
||||
# In short: Python has been built without SSLv2 support, and
|
||||
# github.com/M2Crypto/M2Crypto version doesn't contain necessary
|
||||
# patch, but it's the only one that has a patch for newer versions of
|
||||
# swig...
|
||||
|
||||
# Problem seems not exists on Python 2.7. It's unlikely that the
|
||||
# target distribution has swig 3.0.5+ and doesn't have Python 2.7, so
|
||||
# this file should only be used in conjuction with Python 2.6.
|
||||
15
setup.py
15
setup.py
|
|
@ -35,27 +35,28 @@ changes = read_file(os.path.join(here, 'CHANGES.rst'))
|
|||
# maintainers. and will make the future migration a lot easier.
|
||||
acme_install_requires = [
|
||||
'argparse',
|
||||
# load_pem_private/public_key (>=0.6)
|
||||
# rsa_recover_prime_factors (>=0.8)
|
||||
'cryptography>=0.8',
|
||||
#'letsencrypt' # TODO: uses testdata vectors
|
||||
'mock',
|
||||
'pycrypto',
|
||||
'pyrfc3339',
|
||||
'ndg-httpsclient', # urllib3 InsecurePlatformWarning (#304)
|
||||
'pyasn1', # urllib3 InsecurePlatformWarning (#304)
|
||||
#'PyOpenSSL', # version pin would cause mismatch
|
||||
'pytz',
|
||||
'requests',
|
||||
'werkzeug',
|
||||
'M2Crypto',
|
||||
]
|
||||
letsencrypt_install_requires = [
|
||||
#'acme',
|
||||
'argparse',
|
||||
'ConfigArgParse',
|
||||
'configobj',
|
||||
'M2Crypto',
|
||||
#'cryptography>=0.7', # load_pem_x509_certificate, version pin mismatch
|
||||
'mock',
|
||||
'parsedatetime',
|
||||
'psutil>=2.1.0', # net_connections introduced in 2.1.0
|
||||
'pycrypto',
|
||||
# https://pyopenssl.readthedocs.org/en/latest/api/crypto.html#OpenSSL.crypto.X509Req.get_extensions
|
||||
'PyOpenSSL>=0.15',
|
||||
'pyrfc3339',
|
||||
|
|
@ -63,7 +64,6 @@ letsencrypt_install_requires = [
|
|||
'pytz',
|
||||
'zope.component',
|
||||
'zope.interface',
|
||||
'M2Crypto',
|
||||
]
|
||||
letsencrypt_apache_install_requires = [
|
||||
#'acme',
|
||||
|
|
@ -83,6 +83,7 @@ letsencrypt_nginx_install_requires = [
|
|||
|
||||
install_requires = [
|
||||
'argparse',
|
||||
'cryptography>=0.8',
|
||||
'ConfigArgParse',
|
||||
'configobj',
|
||||
'mock',
|
||||
|
|
@ -90,7 +91,6 @@ install_requires = [
|
|||
'parsedatetime',
|
||||
'psutil>=2.1.0', # net_connections introduced in 2.1.0
|
||||
'pyasn1', # urllib3 InsecurePlatformWarning (#304)
|
||||
'pycrypto',
|
||||
# https://pyopenssl.readthedocs.org/en/latest/api/crypto.html#OpenSSL.crypto.X509Req.get_extensions
|
||||
'PyOpenSSL>=0.15',
|
||||
'pyparsing>=1.5.5', # Python3 support; perhaps unnecessary?
|
||||
|
|
@ -102,9 +102,6 @@ install_requires = [
|
|||
'werkzeug',
|
||||
'zope.component',
|
||||
'zope.interface',
|
||||
# order of items in install_requires DOES matter and M2Crypto has
|
||||
# to go last, see #152
|
||||
'M2Crypto',
|
||||
]
|
||||
|
||||
assert set(install_requires) == set.union(*(set(ireq) for ireq in (
|
||||
|
|
|
|||
Loading…
Reference in a new issue