Move make_csr into acme.crypto_util (#4165)

It's relatively finicky to make a CSR appropriate to pass to poll_and_request_issuance. I think most users want to be able to give a list of domains and a private key, and get back a CSR. This branch adds that functionality to crypto_util.

Note that the two new functions take arguments, and return values, as PEM-encoded buffers. This is a departure from some existing ACME interfaces that take PyOpenSSL types. I've discussed with the Certbot team, and we agree that this is broadly the direction the ACME API should take, so that users of the module don't need to import PyOpenSSL themselves, or use its primitives.

* Add make_csr.

* accept privkey

* Tweak API.

* Remove make_csr from certbot package.

* Skip test in older Pythons.

* Move get_Extensions call under protection.

* Remove assertIn because not backwards-compatible.

* Fix encoding, and use PEM.

* Fix test

* Fix tests on py35.

* Fix error in test.

* Make import_csr_file always return PEM.

Also delete get_sans_from_csr (unused) and get_names_from_csr (newly unused).

* Fix function doc.

* Fix indent

* Fix call of obtain_certificate_from_Csr

* lint

* Handle review feedback.

* Fix test.
This commit is contained in:
Jacob Hoffman-Andrews 2017-04-07 09:43:33 -07:00 committed by Brad Warren
parent 2e8a5ef477
commit cacee80c51
8 changed files with 122 additions and 207 deletions

View file

@ -149,6 +149,36 @@ def probe_sni(name, host, port=443, timeout=300,
raise errors.Error(error)
return client_ssl.get_peer_certificate()
def make_csr(private_key_pem, domains, must_staple=False):
"""Generate a CSR containing a list of domains as subjectAltNames.
:param buffer private_key_pem: Private key, in PEM PKCS#8 format.
:param list domains: List of DNS names to include in subjectAltNames of CSR.
:param bool must_staple: Whether to include the TLS Feature extension (aka
OCSP Must Staple: https://tools.ietf.org/html/rfc7633).
:returns: buffer PEM-encoded Certificate Signing Request.
"""
private_key = OpenSSL.crypto.load_privatekey(
OpenSSL.crypto.FILETYPE_PEM, private_key_pem)
csr = OpenSSL.crypto.X509Req()
extensions = [
OpenSSL.crypto.X509Extension(
b'subjectAltName',
critical=False,
value=', '.join('DNS:' + d for d in domains).encode('ascii')
),
]
if must_staple:
extensions.append(OpenSSL.crypto.X509Extension(
b"1.3.6.1.5.5.7.1.24",
critical=False,
value=b"DER:30:03:02:01:05"))
csr.add_extensions(extensions)
csr.set_pubkey(private_key)
csr.set_version(2)
csr.sign(private_key, 'sha256')
return OpenSSL.crypto.dump_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr)
def _pyopenssl_cert_or_req_san(cert_or_req):
"""Get Subject Alternative Names from certificate or CSR using pyOpenSSL.

View file

@ -151,6 +151,53 @@ class RandomSnTest(unittest.TestCase):
self.serial_num.append(cert.get_serial_number())
self.assertTrue(len(set(self.serial_num)) > 1)
class MakeCSRTest(unittest.TestCase):
"""Test for standalone functions."""
@classmethod
def _call_with_key(cls, *args, **kwargs):
privkey = OpenSSL.crypto.PKey()
privkey.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)
privkey_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey)
from acme.crypto_util import make_csr
return make_csr(privkey_pem, *args, **kwargs)
def test_make_csr(self):
csr_pem = self._call_with_key(["a.example", "b.example"])
self.assertTrue(b'--BEGIN CERTIFICATE REQUEST--' in csr_pem)
self.assertTrue(b'--END CERTIFICATE REQUEST--' in csr_pem)
csr = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr_pem)
# In pyopenssl 0.13 (used with TOXENV=py26-oldest and py27-oldest), csr
# objects don't have a get_extensions() method, so we skip this test if
# the method isn't available.
if hasattr(csr, 'get_extensions'):
self.assertEquals(len(csr.get_extensions()), 1)
self.assertEquals(csr.get_extensions()[0].get_data(),
OpenSSL.crypto.X509Extension(
b'subjectAltName',
critical=False,
value=b'DNS:a.example, DNS:b.example',
).get_data(),
)
def test_make_csr_must_staple(self):
csr_pem = self._call_with_key(["a.example"], must_staple=True)
csr = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr_pem)
# In pyopenssl 0.13 (used with TOXENV=py26-oldest and py27-oldest), csr
# objects don't have a get_extensions() method, so we skip this test if
# the method isn't available.
if hasattr(csr, 'get_extensions'):
self.assertEquals(len(csr.get_extensions()), 2)
# NOTE: Ideally we would filter by the TLS Feature OID, but
# OpenSSL.crypto.X509Extension doesn't give us the extension's raw OID,
# and the shortname field is just "UNDEF"
must_staple_exts = [e for e in csr.get_extensions()
if e.get_data() == b"0\x03\x02\x01\x05"]
self.assertEqual(len(must_staple_exts), 1,
"Expected exactly one Must Staple extension")
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -207,15 +207,14 @@ class Client(object):
else:
self.auth_handler = None
def obtain_certificate_from_csr(self, domains, csr,
typ=OpenSSL.crypto.FILETYPE_ASN1, authzr=None):
def obtain_certificate_from_csr(self, domains, csr, authzr=None):
"""Obtain certificate.
Internal function with precondition that `domains` are
consistent with identifiers present in the `csr`.
:param list domains: Domain names.
:param .util.CSR csr: DER-encoded Certificate Signing
:param .util.CSR csr: PEM-encoded Certificate Signing
Request. The key used to generate this CSR can be different
than `authkey`.
:param list authzr: List of
@ -241,7 +240,7 @@ class Client(object):
certr = self.acme.request_issuance(
jose.ComparableX509(
OpenSSL.crypto.load_certificate_request(typ, csr.data)),
OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr.data)),
authzr)
notify = zope.component.getUtility(interfaces.IDisplay).notification

View file

@ -6,7 +6,6 @@
"""
import logging
import os
import traceback
import OpenSSL
import pyrfc3339
@ -66,7 +65,7 @@ def init_save_key(key_size, key_dir, keyname="key-certbot.pem"):
return util.Key(key_path, key_pem)
def init_save_csr(privkey, names, path, csrname="csr-certbot.pem"):
def init_save_csr(privkey, names, path):
"""Initialize a CSR with the given private key.
:param privkey: Key to include in the CSR
@ -82,8 +81,8 @@ def init_save_csr(privkey, names, path, csrname="csr-certbot.pem"):
"""
config = zope.component.getUtility(interfaces.IConfig)
csr_pem, csr_der = make_csr(privkey.pem, names,
must_staple=config.must_staple)
csr_pem = acme_crypto_util.make_csr(
privkey.pem, names, must_staple=config.must_staple)
# Save CSR
util.make_or_verify_dir(path, 0o755, os.geteuid(),
@ -93,53 +92,12 @@ def init_save_csr(privkey, names, path, csrname="csr-certbot.pem"):
logger.info("Creating CSR: not saving to file")
else:
csr_f, csr_filename = util.unique_file(
os.path.join(path, csrname), 0o644, "wb")
os.path.join(path, "csr-certbot.pem"), 0o644, "wb")
with csr_f:
csr_f.write(csr_pem)
logger.info("Creating CSR: %s", csr_filename)
return util.CSR(csr_filename, csr_der, "der")
# Lower level functions
def make_csr(key_str, domains, must_staple=False):
"""Generate a CSR.
:param str key_str: PEM-encoded RSA key.
:param list domains: Domains included in the certificate.
.. todo:: Detect duplicates in `domains`? Using a set doesn't
preserve order...
:returns: new CSR in PEM and DER form containing all domains
:rtype: tuple
"""
assert domains, "Must provide one or more hostnames for the CSR."
pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_str)
req = OpenSSL.crypto.X509Req()
req.get_subject().CN = domains[0]
# TODO: what to put into req.get_subject()?
# TODO: put SAN if len(domains) > 1
extensions = [
OpenSSL.crypto.X509Extension(
b"subjectAltName",
critical=False,
value=", ".join("DNS:%s" % d for d in domains).encode('ascii')
)
]
if must_staple:
extensions.append(OpenSSL.crypto.X509Extension(
b"1.3.6.1.5.5.7.1.24",
critical=False,
value=b"DER:30:03:02:01:05"))
req.add_extensions(extensions)
req.set_version(2)
req.set_pubkey(pkey)
req.sign(pkey, "sha256")
return tuple(OpenSSL.crypto.dump_certificate_request(method, req)
for method in (OpenSSL.crypto.FILETYPE_PEM,
OpenSSL.crypto.FILETYPE_ASN1))
return util.CSR(csr_filename, csr_pem, "pem")
# WARNING: the csr and private key file are possible attack vectors for TOCTOU
@ -193,22 +151,27 @@ def import_csr_file(csrfile, data):
:param str csrfile: CSR filename
:param str data: contents of the CSR file
:returns: (`OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`,
:returns: (`OpenSSL.crypto.FILETYPE_PEM`,
util.CSR object representing the CSR,
list of domains requested in the CSR)
:rtype: tuple
"""
for form, typ in (("der", OpenSSL.crypto.FILETYPE_ASN1,),
("pem", OpenSSL.crypto.FILETYPE_PEM,),):
PEM = OpenSSL.crypto.FILETYPE_PEM
load = OpenSSL.crypto.load_certificate_request
try:
# Try to parse as DER first, then fall back to PEM.
csr = load(OpenSSL.crypto.FILETYPE_ASN1, data)
except OpenSSL.crypto.Error:
try:
domains = get_names_from_csr(data, typ)
csr = load(PEM, data)
except OpenSSL.crypto.Error:
logger.debug("CSR parse error (form=%s, typ=%s):", form, typ)
logger.debug(traceback.format_exc())
continue
return typ, util.CSR(file=csrfile, data=data, form=form), domains
raise errors.Error("Failed to parse CSR file: {0}".format(csrfile))
raise errors.Error("Failed to parse CSR file: {0}".format(csrfile))
domains = _get_names_from_loaded_cert_or_req(csr)
# Internally we always use PEM, so re-encode as PEM before returning.
data_pem = OpenSSL.crypto.dump_certificate_request(PEM, csr)
return PEM, util.CSR(file=csrfile, data=data_pem, form="pem"), domains
def make_key(bits):
@ -290,22 +253,12 @@ def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM):
cert, OpenSSL.crypto.load_certificate, typ)
def get_sans_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
"""Get a list of Subject Alternative Names from a CSR.
:param str csr: CSR (encoded).
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
:returns: A list of Subject Alternative Names.
:rtype: list
"""
return _get_sans_from_cert_or_req(
csr, OpenSSL.crypto.load_certificate_request, typ)
def _get_names_from_cert_or_req(cert_or_req, load_func, typ):
loaded_cert_or_req = _load_cert_or_req(cert_or_req, load_func, typ)
return _get_names_from_loaded_cert_or_req(loaded_cert_or_req)
def _get_names_from_loaded_cert_or_req(loaded_cert_or_req):
common_name = loaded_cert_or_req.get_subject().CN
# pylint: disable=protected-access
sans = acme_crypto_util._pyopenssl_cert_or_req_san(loaded_cert_or_req)
@ -330,20 +283,6 @@ def get_names_from_cert(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
csr, OpenSSL.crypto.load_certificate, typ)
def get_names_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM):
"""Get a list of domains from a CSR, including the CN if it is set.
:param str csr: CSR (encoded).
:param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`
:returns: A list of domain names.
:rtype: list
"""
return _get_names_from_cert_or_req(
csr, OpenSSL.crypto.load_certificate_request, typ)
def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM):
"""Dump certificate chain into a bundle.

View file

@ -617,8 +617,8 @@ def _csr_get_and_save_cert(config, le_client):
have the privkey, and therefore can't construct the files for a lineage.
So we just save the cert & chain to disk :/
"""
csr, typ = config.actual_csr
certr, chain = le_client.obtain_certificate_from_csr(config.domains, csr, typ)
csr, _ = config.actual_csr
certr, chain = le_client.obtain_certificate_from_csr(config.domains, csr)
if config.dry_run:
logger.debug(
"Dry run: skipping saving certificate to %s", config.cert_path)

View file

@ -18,7 +18,7 @@ import certbot.tests.util as test_util
KEY = test_util.load_vector("rsa512_key.pem")
CSR_SAN = test_util.load_vector("csr-san.der")
CSR_SAN = test_util.load_vector("csr-san.pem")
class ConfigHelper(object):
@ -165,7 +165,7 @@ class ClientTest(ClientTestCommon):
self.acme.request_issuance.assert_called_once_with(
jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, CSR_SAN)),
OpenSSL.crypto.FILETYPE_PEM, CSR_SAN)),
authzr)
self.acme.fetch_chain.assert_called_once_with(mock.sentinel.certr)
@ -175,7 +175,7 @@ class ClientTest(ClientTestCommon):
def test_obtain_certificate_from_csr(self, unused_mock_get_utility,
mock_logger):
self._mock_obtain_certificate()
test_csr = util.CSR(form="der", file=None, data=CSR_SAN)
test_csr = util.CSR(form="pem", file=None, data=CSR_SAN)
auth_handler = self.client.auth_handler
authzr = auth_handler.get_authorizations(self.eg_domains, False)
@ -246,7 +246,7 @@ class ClientTest(ClientTestCommon):
mock_crypto_util):
self._mock_obtain_certificate()
csr = util.CSR(form="der", file=None, data=CSR_SAN)
csr = util.CSR(form="pem", file=None, data=CSR_SAN)
mock_crypto_util.init_save_csr.return_value = csr
mock_crypto_util.init_save_key.return_value = mock.sentinel.key
domains = ["example.com", "www.example.com"]

View file

@ -74,21 +74,20 @@ class InitSaveCSRTest(test_util.TempDirTestCase):
mock.Mock(strict_permissions=True, dry_run=False),
interfaces.IConfig)
@mock.patch('certbot.crypto_util.make_csr')
@mock.patch('acme.crypto_util.make_csr')
@mock.patch('certbot.crypto_util.util.make_or_verify_dir')
def test_success(self, unused_mock_verify, mock_csr):
from certbot.crypto_util import init_save_csr
mock_csr.return_value = (b'csr_pem', b'csr_der')
mock_csr.return_value = b'csr_pem'
csr = init_save_csr(
mock.Mock(pem='dummy_key'), 'example.com', self.tempdir,
'csr-certbot.pem')
mock.Mock(pem='dummy_key'), 'example.com', self.tempdir)
self.assertEqual(csr.data, b'csr_der')
self.assertEqual(csr.data, b'csr_pem')
self.assertTrue('csr-certbot.pem' in csr.file)
@mock.patch('certbot.crypto_util.make_csr')
@mock.patch('acme.crypto_util.make_csr')
@mock.patch('certbot.crypto_util.util.make_or_verify_dir')
def test_success_dry_run(self, unused_mock_verify, mock_csr):
from certbot.crypto_util import init_save_csr
@ -96,55 +95,15 @@ class InitSaveCSRTest(test_util.TempDirTestCase):
zope.component.provideUtility(
mock.Mock(strict_permissions=True, dry_run=True),
interfaces.IConfig)
mock_csr.return_value = (b'csr_pem', b'csr_der')
mock_csr.return_value = b'csr_pem'
csr = init_save_csr(
mock.Mock(pem='dummy_key'), 'example.com', self.tempdir,
'csr-certbot.pem')
mock.Mock(pem='dummy_key'), 'example.com', self.tempdir)
self.assertEqual(csr.data, b'csr_der')
self.assertEqual(csr.data, b'csr_pem')
self.assertTrue(csr.file is None)
class MakeCSRTest(unittest.TestCase):
"""Tests for certbot.crypto_util.make_csr."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.crypto_util import make_csr
return make_csr(*args, **kwargs)
def test_san(self):
from certbot.crypto_util import get_sans_from_csr
# TODO: Fails for RSA256_KEY
csr_pem, csr_der = self._call(
RSA512_KEY, ['example.com', 'www.example.com'])
self.assertEqual(
['example.com', 'www.example.com'], get_sans_from_csr(csr_pem))
self.assertEqual(
['example.com', 'www.example.com'], get_sans_from_csr(
csr_der, OpenSSL.crypto.FILETYPE_ASN1))
def test_must_staple(self):
# TODO: Fails for RSA256_KEY
csr_pem, _ = self._call(
RSA512_KEY, ['example.com', 'www.example.com'], must_staple=True)
csr = OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_PEM, csr_pem)
# In pyopenssl 0.13 (used with TOXENV=py26-oldest and py27-oldest), csr
# objects don't have a get_extensions() method, so we skip this test if
# the method isn't available.
if hasattr(csr, 'get_extensions'):
# NOTE: Ideally we would filter by the TLS Feature OID, but
# OpenSSL.crypto.X509Extension doesn't give us the extension's raw OID,
# and the shortname field is just "UNDEF"
must_staple_exts = [e for e in csr.get_extensions()
if e.get_data() == b"0\x03\x02\x01\x05"]
self.assertEqual(len(must_staple_exts), 1,
"Expected exactly one Must Staple extension")
class ValidCSRTest(unittest.TestCase):
"""Tests for certbot.crypto_util.valid_csr."""
@ -162,9 +121,6 @@ class ValidCSRTest(unittest.TestCase):
def test_valid_der_false(self):
self.assertFalse(self._call(test_util.load_vector('csr.der')))
def test_valid_der_san_false(self):
self.assertFalse(self._call(test_util.load_vector('csr-san.der')))
def test_empty_false(self):
self.assertFalse(self._call(''))
@ -200,12 +156,13 @@ class ImportCSRFileTest(unittest.TestCase):
def test_der_csr(self):
csrfile = test_util.vector_path('csr.der')
data = test_util.load_vector('csr.der')
data_pem = test_util.load_vector('csr.pem')
self.assertEqual(
(OpenSSL.crypto.FILETYPE_ASN1,
(OpenSSL.crypto.FILETYPE_PEM,
util.CSR(file=csrfile,
data=data,
form="der"),
data=data_pem,
form="pem"),
["example.com"],),
self._call(csrfile, data))
@ -272,36 +229,6 @@ class GetSANsFromCertTest(unittest.TestCase):
self._call(test_util.load_vector('cert-san.pem')))
class GetSANsFromCSRTest(unittest.TestCase):
"""Tests for certbot.crypto_util.get_sans_from_csr."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.crypto_util import get_sans_from_csr
return get_sans_from_csr(*args, **kwargs)
def test_extract_one_san(self):
self.assertEqual(['example.com'], self._call(
test_util.load_vector('csr.pem')))
def test_extract_two_sans(self):
self.assertEqual(['example.com', 'www.example.com'], self._call(
test_util.load_vector('csr-san.pem')))
def test_extract_six_sans(self):
self.assertEqual(self._call(test_util.load_vector('csr-6sans.pem')),
["example.com", "example.org", "example.net",
"example.info", "subdomain.example.com",
"other.subdomain.example.com"])
def test_parse_non_csr(self):
self.assertRaises(OpenSSL.crypto.Error, self._call, "hello there")
def test_parse_no_sans(self):
self.assertEqual(
[], self._call(test_util.load_vector('csr-nosans.pem')))
class GetNamesFromCertTest(unittest.TestCase):
"""Tests for certbot.crypto_util.get_names_from_cert."""
@ -327,36 +254,9 @@ class GetNamesFromCertTest(unittest.TestCase):
['example.com'] + ['{0}.example.com'.format(c) for c in 'abcd'],
self._call(test_util.load_vector('cert-5sans.pem')))
class GetNamesFromCSRTest(unittest.TestCase):
"""Tests for certbot.crypto_util.get_names_from_csr."""
@classmethod
def _call(cls, *args, **kwargs):
from certbot.crypto_util import get_names_from_csr
return get_names_from_csr(*args, **kwargs)
def test_extract_one_san(self):
self.assertEqual(['example.com'], self._call(
test_util.load_vector('csr.pem')))
def test_extract_two_sans(self):
self.assertEqual(set(('example.com', 'www.example.com',)), set(
self._call(test_util.load_vector('csr-san.pem'))))
def test_extract_six_sans(self):
self.assertEqual(
set(self._call(test_util.load_vector('csr-6sans.pem'))),
set(("example.com", "example.org", "example.net",
"example.info", "subdomain.example.com",
"other.subdomain.example.com",)))
def test_parse_non_csr(self):
def test_parse_non_cert(self):
self.assertRaises(OpenSSL.crypto.Error, self._call, "hello there")
def test_parse_no_sans(self):
self.assertEqual(["example.org"],
self._call(test_util.load_vector('csr-nosans.pem')))
class CertLoaderTest(unittest.TestCase):
"""Tests for certbot.crypto_util.pyopenssl_load_certificate"""

Binary file not shown.