diff --git a/acme/acme/crypto_util.py b/acme/acme/crypto_util.py index 6a33b3e52..f86a9971a 100644 --- a/acme/acme/crypto_util.py +++ b/acme/acme/crypto_util.py @@ -149,6 +149,36 @@ def probe_sni(name, host, port=443, timeout=300, raise errors.Error(error) return client_ssl.get_peer_certificate() +def make_csr(private_key_pem, domains, must_staple=False): + """Generate a CSR containing a list of domains as subjectAltNames. + + :param buffer private_key_pem: Private key, in PEM PKCS#8 format. + :param list domains: List of DNS names to include in subjectAltNames of CSR. + :param bool must_staple: Whether to include the TLS Feature extension (aka + OCSP Must Staple: https://tools.ietf.org/html/rfc7633). + :returns: buffer PEM-encoded Certificate Signing Request. + """ + private_key = OpenSSL.crypto.load_privatekey( + OpenSSL.crypto.FILETYPE_PEM, private_key_pem) + csr = OpenSSL.crypto.X509Req() + extensions = [ + OpenSSL.crypto.X509Extension( + b'subjectAltName', + critical=False, + value=', '.join('DNS:' + d for d in domains).encode('ascii') + ), + ] + if must_staple: + extensions.append(OpenSSL.crypto.X509Extension( + b"1.3.6.1.5.5.7.1.24", + critical=False, + value=b"DER:30:03:02:01:05")) + csr.add_extensions(extensions) + csr.set_pubkey(private_key) + csr.set_version(2) + csr.sign(private_key, 'sha256') + return OpenSSL.crypto.dump_certificate_request( + OpenSSL.crypto.FILETYPE_PEM, csr) def _pyopenssl_cert_or_req_san(cert_or_req): """Get Subject Alternative Names from certificate or CSR using pyOpenSSL. diff --git a/acme/acme/crypto_util_test.py b/acme/acme/crypto_util_test.py index 9cf1f7deb..845f43914 100644 --- a/acme/acme/crypto_util_test.py +++ b/acme/acme/crypto_util_test.py @@ -151,6 +151,53 @@ class RandomSnTest(unittest.TestCase): self.serial_num.append(cert.get_serial_number()) self.assertTrue(len(set(self.serial_num)) > 1) +class MakeCSRTest(unittest.TestCase): + """Test for standalone functions.""" + + @classmethod + def _call_with_key(cls, *args, **kwargs): + privkey = OpenSSL.crypto.PKey() + privkey.generate_key(OpenSSL.crypto.TYPE_RSA, 2048) + privkey_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, privkey) + from acme.crypto_util import make_csr + return make_csr(privkey_pem, *args, **kwargs) + + def test_make_csr(self): + csr_pem = self._call_with_key(["a.example", "b.example"]) + self.assertTrue(b'--BEGIN CERTIFICATE REQUEST--' in csr_pem) + self.assertTrue(b'--END CERTIFICATE REQUEST--' in csr_pem) + csr = OpenSSL.crypto.load_certificate_request( + OpenSSL.crypto.FILETYPE_PEM, csr_pem) + # In pyopenssl 0.13 (used with TOXENV=py26-oldest and py27-oldest), csr + # objects don't have a get_extensions() method, so we skip this test if + # the method isn't available. + if hasattr(csr, 'get_extensions'): + self.assertEquals(len(csr.get_extensions()), 1) + self.assertEquals(csr.get_extensions()[0].get_data(), + OpenSSL.crypto.X509Extension( + b'subjectAltName', + critical=False, + value=b'DNS:a.example, DNS:b.example', + ).get_data(), + ) + + def test_make_csr_must_staple(self): + csr_pem = self._call_with_key(["a.example"], must_staple=True) + csr = OpenSSL.crypto.load_certificate_request( + OpenSSL.crypto.FILETYPE_PEM, csr_pem) + + # In pyopenssl 0.13 (used with TOXENV=py26-oldest and py27-oldest), csr + # objects don't have a get_extensions() method, so we skip this test if + # the method isn't available. + if hasattr(csr, 'get_extensions'): + self.assertEquals(len(csr.get_extensions()), 2) + # NOTE: Ideally we would filter by the TLS Feature OID, but + # OpenSSL.crypto.X509Extension doesn't give us the extension's raw OID, + # and the shortname field is just "UNDEF" + must_staple_exts = [e for e in csr.get_extensions() + if e.get_data() == b"0\x03\x02\x01\x05"] + self.assertEqual(len(must_staple_exts), 1, + "Expected exactly one Must Staple extension") if __name__ == '__main__': unittest.main() # pragma: no cover diff --git a/certbot/client.py b/certbot/client.py index f6de26e3d..3e9caad40 100644 --- a/certbot/client.py +++ b/certbot/client.py @@ -207,15 +207,14 @@ class Client(object): else: self.auth_handler = None - def obtain_certificate_from_csr(self, domains, csr, - typ=OpenSSL.crypto.FILETYPE_ASN1, authzr=None): + def obtain_certificate_from_csr(self, domains, csr, authzr=None): """Obtain certificate. Internal function with precondition that `domains` are consistent with identifiers present in the `csr`. :param list domains: Domain names. - :param .util.CSR csr: DER-encoded Certificate Signing + :param .util.CSR csr: PEM-encoded Certificate Signing Request. The key used to generate this CSR can be different than `authkey`. :param list authzr: List of @@ -241,7 +240,7 @@ class Client(object): certr = self.acme.request_issuance( jose.ComparableX509( - OpenSSL.crypto.load_certificate_request(typ, csr.data)), + OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr.data)), authzr) notify = zope.component.getUtility(interfaces.IDisplay).notification diff --git a/certbot/crypto_util.py b/certbot/crypto_util.py index 1ad76d503..5671341e8 100644 --- a/certbot/crypto_util.py +++ b/certbot/crypto_util.py @@ -6,7 +6,6 @@ """ import logging import os -import traceback import OpenSSL import pyrfc3339 @@ -66,7 +65,7 @@ def init_save_key(key_size, key_dir, keyname="key-certbot.pem"): return util.Key(key_path, key_pem) -def init_save_csr(privkey, names, path, csrname="csr-certbot.pem"): +def init_save_csr(privkey, names, path): """Initialize a CSR with the given private key. :param privkey: Key to include in the CSR @@ -82,8 +81,8 @@ def init_save_csr(privkey, names, path, csrname="csr-certbot.pem"): """ config = zope.component.getUtility(interfaces.IConfig) - csr_pem, csr_der = make_csr(privkey.pem, names, - must_staple=config.must_staple) + csr_pem = acme_crypto_util.make_csr( + privkey.pem, names, must_staple=config.must_staple) # Save CSR util.make_or_verify_dir(path, 0o755, os.geteuid(), @@ -93,53 +92,12 @@ def init_save_csr(privkey, names, path, csrname="csr-certbot.pem"): logger.info("Creating CSR: not saving to file") else: csr_f, csr_filename = util.unique_file( - os.path.join(path, csrname), 0o644, "wb") + os.path.join(path, "csr-certbot.pem"), 0o644, "wb") with csr_f: csr_f.write(csr_pem) logger.info("Creating CSR: %s", csr_filename) - return util.CSR(csr_filename, csr_der, "der") - - -# Lower level functions -def make_csr(key_str, domains, must_staple=False): - """Generate a CSR. - - :param str key_str: PEM-encoded RSA key. - :param list domains: Domains included in the certificate. - - .. todo:: Detect duplicates in `domains`? Using a set doesn't - preserve order... - - :returns: new CSR in PEM and DER form containing all domains - :rtype: tuple - - """ - assert domains, "Must provide one or more hostnames for the CSR." - pkey = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, key_str) - req = OpenSSL.crypto.X509Req() - req.get_subject().CN = domains[0] - # TODO: what to put into req.get_subject()? - # TODO: put SAN if len(domains) > 1 - extensions = [ - OpenSSL.crypto.X509Extension( - b"subjectAltName", - critical=False, - value=", ".join("DNS:%s" % d for d in domains).encode('ascii') - ) - ] - if must_staple: - extensions.append(OpenSSL.crypto.X509Extension( - b"1.3.6.1.5.5.7.1.24", - critical=False, - value=b"DER:30:03:02:01:05")) - req.add_extensions(extensions) - req.set_version(2) - req.set_pubkey(pkey) - req.sign(pkey, "sha256") - return tuple(OpenSSL.crypto.dump_certificate_request(method, req) - for method in (OpenSSL.crypto.FILETYPE_PEM, - OpenSSL.crypto.FILETYPE_ASN1)) + return util.CSR(csr_filename, csr_pem, "pem") # WARNING: the csr and private key file are possible attack vectors for TOCTOU @@ -193,22 +151,27 @@ def import_csr_file(csrfile, data): :param str csrfile: CSR filename :param str data: contents of the CSR file - :returns: (`OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1`, + :returns: (`OpenSSL.crypto.FILETYPE_PEM`, util.CSR object representing the CSR, list of domains requested in the CSR) :rtype: tuple """ - for form, typ in (("der", OpenSSL.crypto.FILETYPE_ASN1,), - ("pem", OpenSSL.crypto.FILETYPE_PEM,),): + PEM = OpenSSL.crypto.FILETYPE_PEM + load = OpenSSL.crypto.load_certificate_request + try: + # Try to parse as DER first, then fall back to PEM. + csr = load(OpenSSL.crypto.FILETYPE_ASN1, data) + except OpenSSL.crypto.Error: try: - domains = get_names_from_csr(data, typ) + csr = load(PEM, data) except OpenSSL.crypto.Error: - logger.debug("CSR parse error (form=%s, typ=%s):", form, typ) - logger.debug(traceback.format_exc()) - continue - return typ, util.CSR(file=csrfile, data=data, form=form), domains - raise errors.Error("Failed to parse CSR file: {0}".format(csrfile)) + raise errors.Error("Failed to parse CSR file: {0}".format(csrfile)) + + domains = _get_names_from_loaded_cert_or_req(csr) + # Internally we always use PEM, so re-encode as PEM before returning. + data_pem = OpenSSL.crypto.dump_certificate_request(PEM, csr) + return PEM, util.CSR(file=csrfile, data=data_pem, form="pem"), domains def make_key(bits): @@ -290,22 +253,12 @@ def get_sans_from_cert(cert, typ=OpenSSL.crypto.FILETYPE_PEM): cert, OpenSSL.crypto.load_certificate, typ) -def get_sans_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM): - """Get a list of Subject Alternative Names from a CSR. - - :param str csr: CSR (encoded). - :param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1` - - :returns: A list of Subject Alternative Names. - :rtype: list - - """ - return _get_sans_from_cert_or_req( - csr, OpenSSL.crypto.load_certificate_request, typ) - - def _get_names_from_cert_or_req(cert_or_req, load_func, typ): loaded_cert_or_req = _load_cert_or_req(cert_or_req, load_func, typ) + return _get_names_from_loaded_cert_or_req(loaded_cert_or_req) + + +def _get_names_from_loaded_cert_or_req(loaded_cert_or_req): common_name = loaded_cert_or_req.get_subject().CN # pylint: disable=protected-access sans = acme_crypto_util._pyopenssl_cert_or_req_san(loaded_cert_or_req) @@ -330,20 +283,6 @@ def get_names_from_cert(csr, typ=OpenSSL.crypto.FILETYPE_PEM): csr, OpenSSL.crypto.load_certificate, typ) -def get_names_from_csr(csr, typ=OpenSSL.crypto.FILETYPE_PEM): - """Get a list of domains from a CSR, including the CN if it is set. - - :param str csr: CSR (encoded). - :param typ: `OpenSSL.crypto.FILETYPE_PEM` or `OpenSSL.crypto.FILETYPE_ASN1` - - :returns: A list of domain names. - :rtype: list - - """ - return _get_names_from_cert_or_req( - csr, OpenSSL.crypto.load_certificate_request, typ) - - def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM): """Dump certificate chain into a bundle. diff --git a/certbot/main.py b/certbot/main.py index 8c3ecd6bc..64e7d07a4 100644 --- a/certbot/main.py +++ b/certbot/main.py @@ -617,8 +617,8 @@ def _csr_get_and_save_cert(config, le_client): have the privkey, and therefore can't construct the files for a lineage. So we just save the cert & chain to disk :/ """ - csr, typ = config.actual_csr - certr, chain = le_client.obtain_certificate_from_csr(config.domains, csr, typ) + csr, _ = config.actual_csr + certr, chain = le_client.obtain_certificate_from_csr(config.domains, csr) if config.dry_run: logger.debug( "Dry run: skipping saving certificate to %s", config.cert_path) diff --git a/certbot/tests/client_test.py b/certbot/tests/client_test.py index 8b72e1df7..391a407ac 100644 --- a/certbot/tests/client_test.py +++ b/certbot/tests/client_test.py @@ -18,7 +18,7 @@ import certbot.tests.util as test_util KEY = test_util.load_vector("rsa512_key.pem") -CSR_SAN = test_util.load_vector("csr-san.der") +CSR_SAN = test_util.load_vector("csr-san.pem") class ConfigHelper(object): @@ -165,7 +165,7 @@ class ClientTest(ClientTestCommon): self.acme.request_issuance.assert_called_once_with( jose.ComparableX509(OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_ASN1, CSR_SAN)), + OpenSSL.crypto.FILETYPE_PEM, CSR_SAN)), authzr) self.acme.fetch_chain.assert_called_once_with(mock.sentinel.certr) @@ -175,7 +175,7 @@ class ClientTest(ClientTestCommon): def test_obtain_certificate_from_csr(self, unused_mock_get_utility, mock_logger): self._mock_obtain_certificate() - test_csr = util.CSR(form="der", file=None, data=CSR_SAN) + test_csr = util.CSR(form="pem", file=None, data=CSR_SAN) auth_handler = self.client.auth_handler authzr = auth_handler.get_authorizations(self.eg_domains, False) @@ -246,7 +246,7 @@ class ClientTest(ClientTestCommon): mock_crypto_util): self._mock_obtain_certificate() - csr = util.CSR(form="der", file=None, data=CSR_SAN) + csr = util.CSR(form="pem", file=None, data=CSR_SAN) mock_crypto_util.init_save_csr.return_value = csr mock_crypto_util.init_save_key.return_value = mock.sentinel.key domains = ["example.com", "www.example.com"] diff --git a/certbot/tests/crypto_util_test.py b/certbot/tests/crypto_util_test.py index df5d9f0f6..c83ad96b1 100644 --- a/certbot/tests/crypto_util_test.py +++ b/certbot/tests/crypto_util_test.py @@ -74,21 +74,20 @@ class InitSaveCSRTest(test_util.TempDirTestCase): mock.Mock(strict_permissions=True, dry_run=False), interfaces.IConfig) - @mock.patch('certbot.crypto_util.make_csr') + @mock.patch('acme.crypto_util.make_csr') @mock.patch('certbot.crypto_util.util.make_or_verify_dir') def test_success(self, unused_mock_verify, mock_csr): from certbot.crypto_util import init_save_csr - mock_csr.return_value = (b'csr_pem', b'csr_der') + mock_csr.return_value = b'csr_pem' csr = init_save_csr( - mock.Mock(pem='dummy_key'), 'example.com', self.tempdir, - 'csr-certbot.pem') + mock.Mock(pem='dummy_key'), 'example.com', self.tempdir) - self.assertEqual(csr.data, b'csr_der') + self.assertEqual(csr.data, b'csr_pem') self.assertTrue('csr-certbot.pem' in csr.file) - @mock.patch('certbot.crypto_util.make_csr') + @mock.patch('acme.crypto_util.make_csr') @mock.patch('certbot.crypto_util.util.make_or_verify_dir') def test_success_dry_run(self, unused_mock_verify, mock_csr): from certbot.crypto_util import init_save_csr @@ -96,55 +95,15 @@ class InitSaveCSRTest(test_util.TempDirTestCase): zope.component.provideUtility( mock.Mock(strict_permissions=True, dry_run=True), interfaces.IConfig) - mock_csr.return_value = (b'csr_pem', b'csr_der') + mock_csr.return_value = b'csr_pem' csr = init_save_csr( - mock.Mock(pem='dummy_key'), 'example.com', self.tempdir, - 'csr-certbot.pem') + mock.Mock(pem='dummy_key'), 'example.com', self.tempdir) - self.assertEqual(csr.data, b'csr_der') + self.assertEqual(csr.data, b'csr_pem') self.assertTrue(csr.file is None) -class MakeCSRTest(unittest.TestCase): - """Tests for certbot.crypto_util.make_csr.""" - - @classmethod - def _call(cls, *args, **kwargs): - from certbot.crypto_util import make_csr - return make_csr(*args, **kwargs) - - def test_san(self): - from certbot.crypto_util import get_sans_from_csr - # TODO: Fails for RSA256_KEY - csr_pem, csr_der = self._call( - RSA512_KEY, ['example.com', 'www.example.com']) - self.assertEqual( - ['example.com', 'www.example.com'], get_sans_from_csr(csr_pem)) - self.assertEqual( - ['example.com', 'www.example.com'], get_sans_from_csr( - csr_der, OpenSSL.crypto.FILETYPE_ASN1)) - - def test_must_staple(self): - # TODO: Fails for RSA256_KEY - csr_pem, _ = self._call( - RSA512_KEY, ['example.com', 'www.example.com'], must_staple=True) - csr = OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_PEM, csr_pem) - - # In pyopenssl 0.13 (used with TOXENV=py26-oldest and py27-oldest), csr - # objects don't have a get_extensions() method, so we skip this test if - # the method isn't available. - if hasattr(csr, 'get_extensions'): - # NOTE: Ideally we would filter by the TLS Feature OID, but - # OpenSSL.crypto.X509Extension doesn't give us the extension's raw OID, - # and the shortname field is just "UNDEF" - must_staple_exts = [e for e in csr.get_extensions() - if e.get_data() == b"0\x03\x02\x01\x05"] - self.assertEqual(len(must_staple_exts), 1, - "Expected exactly one Must Staple extension") - - class ValidCSRTest(unittest.TestCase): """Tests for certbot.crypto_util.valid_csr.""" @@ -162,9 +121,6 @@ class ValidCSRTest(unittest.TestCase): def test_valid_der_false(self): self.assertFalse(self._call(test_util.load_vector('csr.der'))) - def test_valid_der_san_false(self): - self.assertFalse(self._call(test_util.load_vector('csr-san.der'))) - def test_empty_false(self): self.assertFalse(self._call('')) @@ -200,12 +156,13 @@ class ImportCSRFileTest(unittest.TestCase): def test_der_csr(self): csrfile = test_util.vector_path('csr.der') data = test_util.load_vector('csr.der') + data_pem = test_util.load_vector('csr.pem') self.assertEqual( - (OpenSSL.crypto.FILETYPE_ASN1, + (OpenSSL.crypto.FILETYPE_PEM, util.CSR(file=csrfile, - data=data, - form="der"), + data=data_pem, + form="pem"), ["example.com"],), self._call(csrfile, data)) @@ -272,36 +229,6 @@ class GetSANsFromCertTest(unittest.TestCase): self._call(test_util.load_vector('cert-san.pem'))) -class GetSANsFromCSRTest(unittest.TestCase): - """Tests for certbot.crypto_util.get_sans_from_csr.""" - - @classmethod - def _call(cls, *args, **kwargs): - from certbot.crypto_util import get_sans_from_csr - return get_sans_from_csr(*args, **kwargs) - - def test_extract_one_san(self): - self.assertEqual(['example.com'], self._call( - test_util.load_vector('csr.pem'))) - - def test_extract_two_sans(self): - self.assertEqual(['example.com', 'www.example.com'], self._call( - test_util.load_vector('csr-san.pem'))) - - def test_extract_six_sans(self): - self.assertEqual(self._call(test_util.load_vector('csr-6sans.pem')), - ["example.com", "example.org", "example.net", - "example.info", "subdomain.example.com", - "other.subdomain.example.com"]) - - def test_parse_non_csr(self): - self.assertRaises(OpenSSL.crypto.Error, self._call, "hello there") - - def test_parse_no_sans(self): - self.assertEqual( - [], self._call(test_util.load_vector('csr-nosans.pem'))) - - class GetNamesFromCertTest(unittest.TestCase): """Tests for certbot.crypto_util.get_names_from_cert.""" @@ -327,36 +254,9 @@ class GetNamesFromCertTest(unittest.TestCase): ['example.com'] + ['{0}.example.com'.format(c) for c in 'abcd'], self._call(test_util.load_vector('cert-5sans.pem'))) - -class GetNamesFromCSRTest(unittest.TestCase): - """Tests for certbot.crypto_util.get_names_from_csr.""" - @classmethod - def _call(cls, *args, **kwargs): - from certbot.crypto_util import get_names_from_csr - return get_names_from_csr(*args, **kwargs) - - def test_extract_one_san(self): - self.assertEqual(['example.com'], self._call( - test_util.load_vector('csr.pem'))) - - def test_extract_two_sans(self): - self.assertEqual(set(('example.com', 'www.example.com',)), set( - self._call(test_util.load_vector('csr-san.pem')))) - - def test_extract_six_sans(self): - self.assertEqual( - set(self._call(test_util.load_vector('csr-6sans.pem'))), - set(("example.com", "example.org", "example.net", - "example.info", "subdomain.example.com", - "other.subdomain.example.com",))) - - def test_parse_non_csr(self): + def test_parse_non_cert(self): self.assertRaises(OpenSSL.crypto.Error, self._call, "hello there") - def test_parse_no_sans(self): - self.assertEqual(["example.org"], - self._call(test_util.load_vector('csr-nosans.pem'))) - class CertLoaderTest(unittest.TestCase): """Tests for certbot.crypto_util.pyopenssl_load_certificate""" diff --git a/certbot/tests/testdata/csr-san.der b/certbot/tests/testdata/csr-san.der deleted file mode 100644 index 68fd38723..000000000 Binary files a/certbot/tests/testdata/csr-san.der and /dev/null differ