Merge pull request #607 from kuba/dvsni-verify

Verify DVSNI
This commit is contained in:
James Kasten 2015-07-19 02:30:51 -07:00
commit 4b8651274f
18 changed files with 563 additions and 140 deletions

View file

@ -4,9 +4,15 @@ import functools
import hashlib
import logging
import os
import socket
from cryptography.hazmat.backends import default_backend
from cryptography import x509
import OpenSSL
import requests
from acme import errors
from acme import crypto_util
from acme import fields
from acme import jose
from acme import other
@ -191,6 +197,18 @@ class DVSNI(DVChallenge):
"""
return binascii.hexlify(self.nonce) + self.DOMAIN_SUFFIX
def probe_cert(self, domain, **kwargs):
"""Probe DVSNI challenge certificate."""
host = socket.gethostbyname(domain)
logging.debug('%s resolved to %s', domain, host)
kwargs.setdefault("host", host)
kwargs.setdefault("port", self.PORT)
kwargs["name"] = self.nonce_domain
# TODO: try different methods?
# pylint: disable=protected-access
return crypto_util._probe_sni(**kwargs)
@ChallengeResponse.register
class DVSNIResponse(ChallengeResponse):
@ -229,9 +247,79 @@ class DVSNIResponse(ChallengeResponse):
return z.hexdigest().encode()
def z_domain(self, chall):
"""Domain name for certificate subjectAltName."""
"""Domain name for certificate subjectAltName.
:rtype bytes:
"""
return self.z(chall) + self.DOMAIN_SUFFIX
def gen_cert(self, chall, domain, key):
"""Generate DVSNI certificate.
:param .DVSNI chall: Corresponding challenge.
:param unicode domain:
:param OpenSSL.crypto.PKey
"""
return crypto_util.gen_ss_cert(key, [
domain, chall.nonce_domain.decode(), self.z_domain(chall).decode()])
def simple_verify(self, chall, domain, public_key, **kwargs):
"""Simple verify.
Probes DVSNI certificate and checks it using `verify_cert`;
hence all arguments documented in `verify_cert`.
"""
try:
cert = chall.probe_cert(domain=domain, **kwargs)
except errors.Error as error:
logger.debug(error, exc_info=True)
return False
return self.verify_cert(chall, domain, public_key, cert)
def verify_cert(self, chall, domain, public_key, cert):
"""Verify DVSNI certificate.
:param .challenges.DVSNI chall: Corresponding challenge.
:param str domain: Domain name being validated.
:param public_key: Public key for the key pair
being authorized. If ``None`` key verification is not
performed!
:type public_key:
`~cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey`
or
`~cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey`
wrapped in `.ComparableKey
:param OpenSSL.crypto.X509 cert:
:returns: ``True`` iff client's control of the domain has been
verified, ``False`` otherwise.
:rtype: bool
"""
# TODO: check "It is a valid self-signed certificate" and
# return False if not
# pylint: disable=protected-access
sans = crypto_util._pyopenssl_cert_or_req_san(cert)
logging.debug('Certificate %s. SANs: %s', cert.digest('sha1'), sans)
cert = x509.load_der_x509_certificate(
OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, cert),
default_backend())
if public_key is None:
logging.warn('No key verification is performed')
elif public_key != jose.ComparableKey(cert.public_key()):
return False
return domain in sans and self.z_domain(chall).decode() in sans
@Challenge.register
class RecoveryContact(ContinuityChallenge):
"""ACME "recoveryContact" challenge.

View file

@ -7,6 +7,7 @@ import requests
from six.moves.urllib import parse as urllib_parse # pylint: disable=import-error
from acme import errors
from acme import jose
from acme import other
from acme import test_util
@ -177,31 +178,63 @@ class DVSNITest(unittest.TestCase):
self.assertRaises(
jose.DeserializationError, DVSNI.from_json, self.jmsg)
@mock.patch('acme.challenges.socket.gethostbyname')
@mock.patch('acme.challenges.crypto_util._probe_sni')
def test_probe_cert(self, mock_probe_sni, mock_gethostbyname):
mock_gethostbyname.return_value = '127.0.0.1'
self.msg.probe_cert('foo.com')
mock_gethostbyname.assert_called_once_with('foo.com')
mock_probe_sni.assert_called_once_with(
host='127.0.0.1', port=self.msg.PORT,
name=b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid')
self.msg.probe_cert('foo.com', host='8.8.8.8')
mock_probe_sni.assert_called_with(
host='8.8.8.8', port=mock.ANY, name=mock.ANY)
self.msg.probe_cert('foo.com', port=1234)
mock_probe_sni.assert_called_with(
host=mock.ANY, port=1234, name=mock.ANY)
self.msg.probe_cert('foo.com', bar='baz')
mock_probe_sni.assert_called_with(
host=mock.ANY, port=mock.ANY, name=mock.ANY, bar='baz')
self.msg.probe_cert('foo.com', name=b'xxx')
mock_probe_sni.assert_called_with(
host=mock.ANY, port=mock.ANY,
name=b'a82d5ff8ef740d12881f6d3c2277ab2e.acme.invalid')
class DVSNIResponseTest(unittest.TestCase):
def setUp(self):
from acme.challenges import DVSNIResponse
self.msg = DVSNIResponse(
s=b'\xf5\xd6\xe3\xb2]\xe0L\x0bN\x9cKJ\x14I\xa1K\xa3#\xf9\xa8'
b'\xcd\x8c7\x0e\x99\x19)\xdc\xb7\xf3\x9bw')
# pylint: disable=invalid-name
s = '9dbjsl3gTAtOnEtKFEmhS6Mj-ajNjDcOmRkp3Lfzm3c'
self.msg = DVSNIResponse(s=jose.decode_b64jose(s))
self.jmsg = {
'resource': 'challenge',
'type': 'dvsni',
's': '9dbjsl3gTAtOnEtKFEmhS6Mj-ajNjDcOmRkp3Lfzm3c',
's': s,
}
def test_z_and_domain(self):
from acme.challenges import DVSNI
challenge = DVSNI(
r=b"O*\xb4-\xad\xec\x95>\xed\xa9\r0\x94\xe8\x97\x9c&6"
b"\xbf'\xb3\xed\x9a9nX\x0f'\\m\xe7\x12",
nonce=int('439736375371401115242521957580409149254868992063'
'44333654741504362774620418661'))
self.chall = DVSNI(
r=jose.decode_b64jose('Tyq0La3slT7tqQ0wlOiXnCY2vyez7Zo5blgPJ1xt5xI'),
nonce=jose.decode_b64jose('a82d5ff8ef740d12881f6d3c2277ab2e'))
self.z = (b'38e612b0397cc2624a07d351d7ef50e4'
b'6134c0213d9ed52f7d7c611acaeed41b')
self.domain = 'foo.com'
self.key = test_util.load_pyopenssl_private_key('rsa512_key.pem')
self.public_key = test_util.load_rsa_private_key(
'rsa512_key.pem').public_key()
def test_z_and_domain(self):
# pylint: disable=invalid-name
z = b'38e612b0397cc2624a07d351d7ef50e46134c0213d9ed52f7d7c611acaeed41b'
self.assertEqual(z, self.msg.z(challenge))
self.assertEqual(z + b'.acme.invalid', self.msg.z_domain(challenge))
self.assertEqual(self.z, self.msg.z(self.chall))
self.assertEqual(
self.z + b'.acme.invalid', self.msg.z_domain(self.chall))
def test_to_partial_json(self):
self.assertEqual(self.jmsg, self.msg.to_partial_json())
@ -214,6 +247,45 @@ class DVSNIResponseTest(unittest.TestCase):
from acme.challenges import DVSNIResponse
hash(DVSNIResponse.from_json(self.jmsg))
@mock.patch('acme.challenges.DVSNIResponse.verify_cert')
def test_simple_verify(self, mock_verify_cert):
chall = mock.Mock()
chall.probe_cert.return_value = mock.sentinel.cert
mock_verify_cert.return_value = 'x'
self.assertEqual('x', self.msg.simple_verify(
chall, mock.sentinel.domain, mock.sentinel.key))
chall.probe_cert.assert_called_once_with(domain=mock.sentinel.domain)
self.msg.verify_cert.assert_called_once_with(
chall, mock.sentinel.domain, mock.sentinel.key,
mock.sentinel.cert)
def test_simple_verify_false_on_probe_error(self):
chall = mock.Mock()
chall.probe_cert.side_effect = errors.Error
self.assertFalse(self.msg.simple_verify(
chall=chall, domain=None, public_key=None))
def test_gen_verify_cert_postive_no_key(self):
cert = self.msg.gen_cert(self.chall, self.domain, self.key)
self.assertTrue(self.msg.verify_cert(
self.chall, self.domain, public_key=None, cert=cert))
def test_gen_verify_cert_postive_with_key(self):
cert = self.msg.gen_cert(self.chall, self.domain, self.key)
self.assertTrue(self.msg.verify_cert(
self.chall, self.domain, public_key=self.public_key, cert=cert))
def test_gen_verify_cert_negative_with_wrong_key(self):
cert = self.msg.gen_cert(self.chall, self.domain, self.key)
key = test_util.load_rsa_private_key('rsa256_key.pem').public_key()
self.assertFalse(self.msg.verify_cert(
self.chall, self.domain, public_key=key, cert=cert))
def test_gen_verify_cert_negative(self):
cert = self.msg.gen_cert(self.chall, self.domain + 'x', self.key)
self.assertFalse(self.msg.verify_cert(
self.chall, self.domain, public_key=None, cert=cert))
class RecoveryContactTest(unittest.TestCase):

195
acme/acme/crypto_util.py Normal file
View file

@ -0,0 +1,195 @@
"""Crypto utilities."""
import contextlib
import logging
import socket
import sys
from six.moves import range # pylint: disable=import-error,redefined-builtin
import OpenSSL
from acme import errors
logger = logging.getLogger(__name__)
# DVSNI certificate serving and probing is not affected by SSL
# vulnerabilities: prober needs to check certificate for expected
# contents anyway. Working SNI is the only thing that's necessary for
# the challenge and thus scoping down SSL/TLS method (version) would
# cause interoperability issues: TLSv1_METHOD is only compatible with
# TLSv1_METHOD, while SSLv23_METHOD is compatible with all other
# methods, including TLSv2_METHOD (read more at
# https://www.openssl.org/docs/ssl/SSLv23_method.html). _serve_sni
# should be changed to use "set_options" to disable SSLv2 and SSLv3,
# in case it's used for things other than probing/serving!
_DEFAULT_DVSNI_SSL_METHOD = OpenSSL.SSL.SSLv23_METHOD
def _serve_sni(certs, sock, reuseaddr=True, method=_DEFAULT_DVSNI_SSL_METHOD,
accept=None):
"""Start SNI-enabled server, that drops connection after handshake.
:param certs: Mapping from SNI name to ``(key, cert)`` `tuple`.
:param sock: Already bound socket.
:param bool reuseaddr: Should `socket.SO_REUSEADDR` be set?
:param method: See `OpenSSL.SSL.Context` for allowed values.
:param accept: Callable that doesn't take any arguments and
returns ``True`` if more connections should be served.
"""
def _pick_certificate(connection):
try:
key, cert = certs[connection.get_servername()]
except KeyError:
return
new_context = OpenSSL.SSL.Context(method)
new_context.use_privatekey(key)
new_context.use_certificate(cert)
connection.set_context(new_context)
if reuseaddr:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.listen(1) # TODO: add func arg?
while accept is None or accept():
server, addr = sock.accept()
logger.debug('Received connection from %s', addr)
with contextlib.closing(server):
context = OpenSSL.SSL.Context(method)
context.set_tlsext_servername_callback(_pick_certificate)
server_ssl = OpenSSL.SSL.Connection(context, server)
server_ssl.set_accept_state()
try:
server_ssl.do_handshake()
server_ssl.shutdown()
except OpenSSL.SSL.Error as error:
raise errors.Error(error)
def _probe_sni(name, host, port=443, timeout=300,
method=_DEFAULT_DVSNI_SSL_METHOD, source_address=('0', 0)):
"""Probe SNI server for SSL certificate.
:param bytes name: Byte string to send as the server name in the
client hello message.
:param bytes host: Host to connect to.
:param int port: Port to connect to.
:param int timeout: Timeout in seconds.
:param method: See `OpenSSL.SSL.Context` for allowed values.
:param tuple source_address: Enables multi-path probing (selection
of source interface). See `socket.creation_connection` for more
info. Available only in Python 2.7+.
:raises acme.errors.Error: In case of any problems.
:returns: SSL certificate presented by the server.
:rtype: OpenSSL.crypto.X509
"""
context = OpenSSL.SSL.Context(method)
context.set_timeout(timeout)
socket_kwargs = {} if sys.version_info < (2, 7) else {
'source_address': source_address}
try:
# pylint: disable=star-args
sock = socket.create_connection((host, port), **socket_kwargs)
except socket.error as error:
raise errors.Error(error)
with contextlib.closing(sock) as client:
client_ssl = OpenSSL.SSL.Connection(context, client)
client_ssl.set_connect_state()
client_ssl.set_tlsext_host_name(name) # pyOpenSSL>=0.13
try:
client_ssl.do_handshake()
client_ssl.shutdown()
except OpenSSL.SSL.Error as error:
raise errors.Error(error)
return client_ssl.get_peer_certificate()
def _pyopenssl_cert_or_req_san(cert_or_req):
"""Get Subject Alternative Names from certificate or CSR using pyOpenSSL.
.. todo:: Implement directly in PyOpenSSL!
.. note:: Although this is `acme` internal API, it is used by
`letsencrypt`.
:param cert_or_req: Certificate or CSR.
:type cert_or_req: `OpenSSL.crypto.X509` or `OpenSSL.crypto.X509Req`.
:returns: A list of Subject Alternative Names.
:rtype: `list` of `unicode`
"""
# constants based on implementation of
# OpenSSL.crypto.X509Error._subjectAltNameString
parts_separator = ", "
part_separator = ":"
extension_short_name = b"subjectAltName"
if hasattr(cert_or_req, 'get_extensions'): # X509Req
extensions = cert_or_req.get_extensions()
else: # X509
extensions = [cert_or_req.get_extension(i)
for i in range(cert_or_req.get_extension_count())]
# pylint: disable=protected-access,no-member
label = OpenSSL.crypto.X509Extension._prefixes[OpenSSL.crypto._lib.GEN_DNS]
assert parts_separator not in label
prefix = label + part_separator
san_extensions = [
ext._subjectAltNameString().split(parts_separator)
for ext in extensions if ext.get_short_name() == extension_short_name]
# WARNING: this function assumes that no SAN can include
# parts_separator, hence the split!
return [part.split(part_separator)[1] for parts in san_extensions
for part in parts if part.startswith(prefix)]
def gen_ss_cert(key, domains, not_before=None, validity=(7 * 24 * 60 * 60)):
"""Generate new self-signed certificate.
:type domains: `list` of `unicode`
:param OpenSSL.crypto.PKey key:
Uses key and contains all domains.
"""
assert domains, "Must provide one or more hostnames for the cert."
cert = OpenSSL.crypto.X509()
cert.set_serial_number(1337)
cert.set_version(2)
extensions = [
OpenSSL.crypto.X509Extension(
b"basicConstraints", True, b"CA:TRUE, pathlen:0"),
]
cert.get_subject().CN = domains[0]
# TODO: what to put into cert.get_subject()?
cert.set_issuer(cert.get_subject())
if len(domains) > 1:
extensions.append(OpenSSL.crypto.X509Extension(
b"subjectAltName",
critical=False,
value=b", ".join(b"DNS:" + d.encode() for d in domains)
))
cert.add_extensions(extensions)
cert.gmtime_adj_notBefore(0 if not_before is None else not_before)
cert.gmtime_adj_notAfter(validity)
cert.set_pubkey(key)
cert.sign(key, "sha256")
return cert

View file

@ -0,0 +1,104 @@
"""Tests for acme.crypto_util."""
import socket
import threading
import time
import unittest
import mock
import OpenSSL
from acme import errors
from acme import jose
from acme import test_util
class ServeProbeSNITest(unittest.TestCase):
"""Tests for acme.crypto_util._serve_sni/_probe_sni."""
def setUp(self):
self.cert = test_util.load_cert('cert.pem')
key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM,
test_util.load_vector('rsa512_key.pem'))
# pylint: disable=protected-access
certs = {b'foo': (key, self.cert._wrapped)}
sock = socket.socket()
sock.bind(('', 0)) # pick random port
self.port = sock.getsockname()[1]
self.server = threading.Thread(target=self._run_server, args=(certs, sock))
self.server.start()
time.sleep(1) # TODO: avoid race conditions in other way
@classmethod
def _run_server(cls, certs, sock):
from acme.crypto_util import _serve_sni
# TODO: improve testing of server errors and their conditions
try:
return _serve_sni(
certs, sock, accept=mock.Mock(side_effect=[True, False]))
except errors.Error:
pass
def tearDown(self):
self.server.join()
def _probe(self, name):
from acme.crypto_util import _probe_sni
return jose.ComparableX509(_probe_sni(
name, host='127.0.0.1', port=self.port))
def test_probe_ok(self):
self.assertEqual(self.cert, self._probe(b'foo'))
def test_probe_not_recognized_name(self):
self.assertRaises(errors.Error, self._probe, b'bar')
def test_probe_connection_error(self):
self._probe(b'foo')
time.sleep(1) # TODO: avoid race conditions in other way
self.assertRaises(errors.Error, self._probe, b'bar')
class PyOpenSSLCertOrReqSANTest(unittest.TestCase):
"""Test for acme.crypto_util._pyopenssl_cert_or_req_san."""
@classmethod
def _call(cls, loader, name):
# pylint: disable=protected-access
from acme.crypto_util import _pyopenssl_cert_or_req_san
return _pyopenssl_cert_or_req_san(loader(name))
def _call_cert(self, name):
return self._call(test_util.load_cert, name)
def _call_csr(self, name):
return self._call(test_util.load_csr, name)
def test_cert_no_sans(self):
self.assertEqual(self._call_cert('cert.pem'), [])
def test_cert_two_sans(self):
self.assertEqual(self._call_cert('cert-san.pem'),
['example.com', 'www.example.com'])
def test_csr_no_sans(self):
self.assertEqual(self._call_csr('csr-nosans.pem'), [])
def test_csr_one_san(self):
self.assertEqual(self._call_csr('csr.pem'), ['example.com'])
def test_csr_two_sans(self):
self.assertEqual(self._call_csr('csr-san.pem'),
['example.com', 'www.example.com'])
def test_csr_six_sans(self):
self.assertEqual(self._call_csr('csr-6sans.pem'),
["example.com", "example.org", "example.net",
"example.info", "subdomain.example.com",
"other.subdomain.example.com"])
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -76,6 +76,7 @@ from acme.jose.jws import (
from acme.jose.util import (
ComparableX509,
ComparableKey,
ComparableRSAKey,
ImmutableMap,
)

View file

@ -66,14 +66,14 @@ class ComparableX509(object): # pylint: disable=too-few-public-methods
return '<{0}({1!r})>'.format(self.__class__.__name__, self._wrapped)
class ComparableRSAKey(object): # pylint: disable=too-few-public-methods
"""Wrapper for `cryptography` RSA keys.
class ComparableKey(object): # pylint: disable=too-few-public-methods
"""Comparable wrapper for `cryptography` keys.
Wraps around:
- `cryptography.hazmat.primitives.assymetric.RSAPrivateKey`
- `cryptography.hazmat.primitives.assymetric.RSAPublicKey`
See https://github.com/pyca/cryptography/issues/2122.
"""
__hash__ = NotImplemented
def __init__(self, wrapped):
self._wrapped = wrapped
@ -85,19 +85,36 @@ class ComparableRSAKey(object): # pylint: disable=too-few-public-methods
if (not isinstance(other, self.__class__) or
self._wrapped.__class__ is not other._wrapped.__class__):
return NotImplemented
# RSA*KeyWithSerialization requires cryptography>=0.8
if isinstance(self._wrapped, rsa.RSAPrivateKeyWithSerialization):
elif hasattr(self._wrapped, 'private_numbers'):
return self.private_numbers() == other.private_numbers()
elif isinstance(self._wrapped, rsa.RSAPublicKeyWithSerialization):
elif hasattr(self._wrapped, 'public_numbers'):
return self.public_numbers() == other.public_numbers()
else:
return False # we shouldn't reach here...
return NotImplemented
def __ne__(self, other):
return not self == other
def __repr__(self):
return '<{0}({1!r})>'.format(self.__class__.__name__, self._wrapped)
def public_key(self):
"""Get wrapped public key."""
return self.__class__(self._wrapped.public_key())
class ComparableRSAKey(ComparableKey): # pylint: disable=too-few-public-methods
"""Wrapper for `cryptography` RSA keys.
Wraps around:
- `cryptography.hazmat.primitives.assymetric.RSAPrivateKey`
- `cryptography.hazmat.primitives.assymetric.RSAPublicKey`
"""
def __hash__(self):
# public_numbers() hasn't got stable hash!
# https://github.com/pyca/cryptography/issues/2143
if isinstance(self._wrapped, rsa.RSAPrivateKeyWithSerialization):
priv = self.private_numbers()
pub = priv.public_numbers
@ -107,13 +124,6 @@ class ComparableRSAKey(object): # pylint: disable=too-few-public-methods
pub = self.public_numbers()
return hash((self.__class__, pub.n, pub.e))
def __repr__(self):
return '<{0}({1!r})>'.format(self.__class__.__name__, self._wrapped)
def public_key(self):
"""Get wrapped public key."""
return self.__class__(self._wrapped.public_key())
class ImmutableMap(collections.Mapping, collections.Hashable):
# pylint: disable=too-few-public-methods

View file

@ -55,3 +55,9 @@ def load_rsa_private_key(*names):
serialization.load_der_private_key)
return jose.ComparableRSAKey(loader(
load_vector(*names), password=None, backend=default_backend()))
def load_pyopenssl_private_key(*names):
"""Load pyOpenSSL private key."""
loader = _guess_loader(
names[-1], OpenSSL.crypto.FILETYPE_PEM, OpenSSL.crypto.FILETYPE_ASN1)
return OpenSSL.crypto.load_privatekey(loader, load_vector(*names))

12
acme/acme/testdata/csr-6sans.pem vendored Normal file
View file

@ -0,0 +1,12 @@
-----BEGIN CERTIFICATE REQUEST-----
MIIBuzCCAWUCAQAweTELMAkGA1UEBhMCVVMxETAPBgNVBAgTCE1pY2hpZ2FuMRIw
EAYDVQQHEwlBbm4gQXJib3IxDDAKBgNVBAoTA0VGRjEfMB0GA1UECxMWVW5pdmVy
c2l0eSBvZiBNaWNoaWdhbjEUMBIGA1UEAxMLZXhhbXBsZS5jb20wXDANBgkqhkiG
9w0BAQEFAANLADBIAkEA9LYRcVE3Nr+qleecEcX8JwVDnjeG1X7ucsCasuuZM0e0
9cmYuUzxIkMjO/9x4AVcvXXRXPEV+LzWWkfkTlzRMwIDAQABoIGGMIGDBgkqhkiG
9w0BCQ4xdjB0MHIGA1UdEQRrMGmCC2V4YW1wbGUuY29tggtleGFtcGxlLm9yZ4IL
ZXhhbXBsZS5uZXSCDGV4YW1wbGUuaW5mb4IVc3ViZG9tYWluLmV4YW1wbGUuY29t
ghtvdGhlci5zdWJkb21haW4uZXhhbXBsZS5jb20wDQYJKoZIhvcNAQELBQADQQBd
k4BE5qvEvkYoZM/2++Xd9RrQ6wsdj0QiJQCozfsI4lQx6ZJnbtNc7HpDrX4W6XIv
IvzVBz/nD11drfz/RNuX
-----END CERTIFICATE REQUEST-----

8
acme/acme/testdata/csr-nosans.pem vendored Normal file
View file

@ -0,0 +1,8 @@
-----BEGIN CERTIFICATE REQUEST-----
MIIBFTCBwAIBADBbMQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEh
MB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMRQwEgYDVQQDDAtleGFt
cGxlLm9yZzBcMA0GCSqGSIb3DQEBAQUAA0sAMEgCQQD0thFxUTc2v6qV55wRxfwn
BUOeN4bVfu5ywJqy65kzR7T1yZi5TPEiQyM7/3HgBVy9ddFc8RX4vNZaR+ROXNEz
AgMBAAGgADANBgkqhkiG9w0BAQsFAANBAMikGL8Ch7hQCStXH7chhDp6+pt2+VSo
wgsrPQ2Bw4veDMlSemUrH+4e0TwbbntHfvXTDHWs9P3BiIDJLxFrjuA=
-----END CERTIFICATE REQUEST-----

View file

@ -13,7 +13,8 @@ install_requires = [
'pyrfc3339',
'ndg-httpsclient', # urllib3 InsecurePlatformWarning (#304)
'pyasn1', # urllib3 InsecurePlatformWarning (#304)
'PyOpenSSL',
# Connection.set_tlsext_host_name (>=0.13), X509Req.get_extensions (>=0.15)
'PyOpenSSL>=0.15',
'pytz',
'requests',
'six',

View file

@ -7,9 +7,11 @@ import socket
import subprocess
import sys
import OpenSSL
import zope.interface
from acme import challenges
from acme import crypto_util as acme_crypto_util
from letsencrypt import achallenges
from letsencrypt import constants as core_constants
@ -271,14 +273,17 @@ class NginxConfigurator(common.Plugin):
def _get_snakeoil_paths(self):
# TODO: generate only once
tmp_dir = os.path.join(self.config.work_dir, "snakeoil")
key = crypto_util.init_save_key(
le_key = crypto_util.init_save_key(
key_size=1024, key_dir=tmp_dir, keyname="key.pem")
cert_pem = crypto_util.make_ss_cert(
key.pem, domains=[socket.gethostname()])
cert = os.path.join(tmp_dir, "cert.pem")
with open(cert, 'w') as cert_file:
key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, le_key.pem)
cert = acme_crypto_util.gen_ss_cert(key, domains=[socket.gethostname()])
cert_path = os.path.join(tmp_dir, "cert.pem")
cert_pem = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert)
with open(cert_path, 'w') as cert_file:
cert_file.write(cert_pem)
return cert, key.file
return cert_path, le_key.file
def _make_server_ssl(self, vhost):
"""Make a server SSL.

View file

@ -17,6 +17,8 @@ Note, that all annotated challenges act as a proxy objects::
achall.token == challb.token
"""
import OpenSSL
from acme import challenges
from acme.jose import util as jose_util
@ -48,7 +50,7 @@ class DVSNI(AnnotatedChallenge):
acme_type = challenges.DVSNI
def gen_cert_and_response(self, s=None): # pylint: disable=invalid-name
"""Generate a DVSNI cert and save it to filepath.
"""Generate a DVSNI cert and response.
:returns: ``(cert_pem, response)`` tuple, where ``cert_pem`` is the PEM
encoded certificate and ``response`` is an instance
@ -56,9 +58,12 @@ class DVSNI(AnnotatedChallenge):
:rtype: tuple
"""
key = crypto_util.private_jwk_to_pyopenssl(self.key)
response = challenges.DVSNIResponse(s=s)
cert_pem = crypto_util.make_ss_cert(self.key, [
self.domain, self.nonce_domain, response.z_domain(self.challb)])
cert = response.gen_cert(self.challb.chall, self.domain, key)
cert_pem = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, cert)
return cert_pem, response

View file

@ -11,7 +11,7 @@ import os
from cryptography.hazmat.primitives import serialization
import OpenSSL
from acme import jose
from acme import crypto_util as acme_crypto_util
from letsencrypt import errors
from letsencrypt import le_util
@ -215,88 +215,13 @@ def pyopenssl_load_certificate(data):
return _pyopenssl_load(data, OpenSSL.crypto.load_certificate)
def make_ss_cert(key, domains, not_before=None,
validity=(7 * 24 * 60 * 60)):
"""Returns new self-signed cert in PEM form.
Uses key and contains all domains.
"""
if isinstance(key, jose.JWK):
key = key.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
assert domains, "Must provide one or more hostnames for the cert."
pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
cert = OpenSSL.crypto.X509()
cert.set_serial_number(1337)
cert.set_version(2)
extensions = [
OpenSSL.crypto.X509Extension(
"basicConstraints", True, 'CA:TRUE, pathlen:0'),
]
cert.get_subject().CN = domains[0]
# TODO: what to put into cert.get_subject()?
cert.set_issuer(cert.get_subject())
if len(domains) > 1:
extensions.append(OpenSSL.crypto.X509Extension(
"subjectAltName",
critical=False,
value=", ".join("DNS:%s" % d for d in domains)
))
cert.add_extensions(extensions)
cert.gmtime_adj_notBefore(0 if not_before is None else not_before)
cert.gmtime_adj_notAfter(validity)
cert.set_pubkey(pkey)
cert.sign(pkey, "sha256")
return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
def _pyopenssl_cert_or_req_san(cert_or_req):
"""Get Subject Alternative Names from certificate or CSR using pyOpenSSL.
.. todo:: Implement directly in PyOpenSSL!
:param cert_or_req: Certificate or CSR.
:type cert_or_req: `OpenSSL.crypto.X509` or `OpenSSL.crypto.X509Req`.
:returns: A list of Subject Alternative Names.
:rtype: list
"""
# constants based on implementation of
# OpenSSL.crypto.X509Error._subjectAltNameString
parts_separator = ", "
part_separator = ":"
extension_short_name = "subjectAltName"
if hasattr(cert_or_req, 'get_extensions'): # X509Req
extensions = cert_or_req.get_extensions()
else: # X509
extensions = [cert_or_req.get_extension(i)
for i in xrange(cert_or_req.get_extension_count())]
# pylint: disable=protected-access,no-member
label = OpenSSL.crypto.X509Extension._prefixes[OpenSSL.crypto._lib.GEN_DNS]
assert parts_separator not in label
prefix = label + part_separator
san_extensions = [
ext._subjectAltNameString().split(parts_separator)
for ext in extensions if ext.get_short_name() == extension_short_name]
# WARNING: this function assumes that no SAN can include
# parts_separator, hence the split!
return [part.split(part_separator)[1] for parts in san_extensions
for part in parts if part.startswith(prefix)]
def private_jwk_to_pyopenssl(jwk):
"""Convert private JWK to pyOpenSSL key."""
key_pem = jwk.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption())
return OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_pem)
def _get_sans_from_cert_or_req(
@ -306,7 +231,8 @@ def _get_sans_from_cert_or_req(
except OpenSSL.crypto.Error as error:
logger.exception(error)
raise
return _pyopenssl_cert_or_req_san(cert_or_req)
# pylint: disable=protected-access
return acme_crypto_util._pyopenssl_cert_or_req_san(cert_or_req)
def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM):

View file

@ -182,7 +182,7 @@ class Dvsni(object):
# Write out challenge key
key_pem = achall.key.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
with le_util.safe_open(key_path, 'wb', chmod=0o400) as key_file:
key_file.write(key_pem)

View file

@ -16,6 +16,7 @@ import tempfile
import OpenSSL
from acme import client as acme_client
from acme import crypto_util as acme_crypto_util
from acme.jose import util as jose_util
from letsencrypt import crypto_util
@ -520,7 +521,7 @@ class Cert(object):
def get_san(self):
"""Get subject alternative name if available."""
# pylint: disable=protected-access
return ", ".join(crypto_util._pyopenssl_cert_or_req_san(self._cert))
return ", ".join(acme_crypto_util._pyopenssl_cert_or_req_san(self._cert))
def __str__(self):
text = [

View file

@ -4,10 +4,9 @@ import unittest
import OpenSSL
from acme import challenges
from acme import crypto_util as acme_crypto_util
from acme import jose
from letsencrypt import crypto_util
from letsencrypt.tests import acme_util
from letsencrypt.tests import test_util
@ -35,7 +34,7 @@ class DVSNITest(unittest.TestCase):
OpenSSL.crypto.FILETYPE_PEM, cert_pem)
self.assertEqual(cert.get_subject().CN, "example.com")
# pylint: disable=protected-access
self.assertEqual(crypto_util._pyopenssl_cert_or_req_san(cert), [
self.assertEqual(acme_crypto_util._pyopenssl_cert_or_req_san(cert), [
"example.com", self.chall.nonce_domain,
self.response.z_domain(self.chall)])

View file

@ -160,15 +160,6 @@ class ValidPrivkeyTest(unittest.TestCase):
self.assertFalse(self._call('foo bar'))
class MakeSSCertTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
"""Tests for letsencrypt.crypto_util.make_ss_cert."""
def test_it(self): # pylint: disable=no-self-use
from letsencrypt.crypto_util import make_ss_cert
make_ss_cert(RSA512_KEY, ['example.com', 'www.example.com'])
class GetSANsFromCertTest(unittest.TestCase):
"""Tests for letsencrypt.crypto_util.get_sans_from_cert."""

View file

@ -37,8 +37,7 @@ install_requires = [
'mock<1.1.0', # py26
'parsedatetime',
'psutil>=2.1.0', # net_connections introduced in 2.1.0
# https://pyopenssl.readthedocs.org/en/latest/api/crypto.html#OpenSSL.crypto.X509Req.get_extensions
'PyOpenSSL>=0.15',
'PyOpenSSL',
'pyrfc3339',
'python2-pythondialog>=3.2.2rc1', # Debian squeeze support, cf. #280
'pytz',