diff --git a/acme/acme/client.py b/acme/acme/client.py index 1838fab42..97f529aae 100644 --- a/acme/acme/client.py +++ b/acme/acme/client.py @@ -677,9 +677,6 @@ class BackwardsCompatibleClientV2(object): return getattr(self.client, name) elif name in dir(ClientBase): return getattr(self.client, name) - # temporary, for breaking changes into smaller pieces - elif name in dir(Client): - return getattr(self.client, name) else: raise AttributeError() @@ -723,10 +720,48 @@ class BackwardsCompatibleClientV2(object): authorizations = [] for domain in dnsNames: authorizations.append(self.client.request_domain_challenges(domain)) - return messages.OrderResource(authorizations=authorizations) + return messages.OrderResource(authorizations=authorizations, csr_pem=csr_pem) else: return self.client.new_order(csr_pem) + def finalize_order(self, orderr, deadline): + """Finalize an order and obtain a certificate. + + :param messages.OrderResource orderr: order to finalize + :param datetime.datetime deadline: when to stop polling and timeout + + :returns: finalized order + :rtype: messages.OrderResource + + """ + if self.acme_version == 1: + csr_pem = orderr.csr_pem + certr = self.client.request_issuance( + jose.ComparableX509( + OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr_pem)), + orderr.authorizations) + + chain = None + while datetime.datetime.now() < deadline: + try: + chain = self.client.fetch_chain(certr) + break + except errors.Error: + time.sleep(1) + + if chain is None: + raise errors.TimeoutError( + 'Failed to fetch chain. You should not deploy the generated ' + 'certificate, please rerun the command for a new one.') + + cert = OpenSSL.crypto.dump_certificate( + OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped) + chain = crypto_util.dump_pyopenssl_chain(chain) + + return orderr.update(fullchain_pem=(cert + chain)) + else: + return self.client.finalize_order(orderr, deadline) + def _acme_version_from_directory(self, directory): if hasattr(directory, 'newNonce'): return 2 diff --git a/acme/acme/client_test.py b/acme/acme/client_test.py index 1b33ca5d7..acc5193ca 100644 --- a/acme/acme/client_test.py +++ b/acme/acme/client_test.py @@ -8,6 +8,7 @@ from six.moves import http_client # pylint: disable=import-error import josepy as jose import mock +import OpenSSL import requests from acme import challenges @@ -82,6 +83,29 @@ class ClientTestBase(unittest.TestCase): class BackwardsCompatibleClientV2Test(ClientTestBase): """Tests for acme.client.BackwardsCompatibleClientV2.""" + def setUp(self): + super(BackwardsCompatibleClientV2Test, self).setUp() + # contains a loaded cert + self.certr = messages.CertificateResource( + body=messages_test.CERT) + + loaded = OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, CERT_SAN_PEM) + wrapped = jose.ComparableX509(loaded) + self.chain = [wrapped, wrapped] + + self.cert_pem = OpenSSL.crypto.dump_certificate( + OpenSSL.crypto.FILETYPE_PEM, messages_test.CERT.wrapped) + + single_chain = OpenSSL.crypto.dump_certificate( + OpenSSL.crypto.FILETYPE_PEM, loaded) + self.chain_pem = single_chain + single_chain + + self.fullchain_pem = self.cert_pem + self.chain_pem + + self.orderr = messages.OrderResource( + csr_pem=CSR_SAN_PEM) + def _init(self): uri = 'http://www.letsencrypt-demo.org/directory' from acme.client import BackwardsCompatibleClientV2 @@ -109,8 +133,6 @@ class BackwardsCompatibleClientV2Test(ClientTestBase): client = self._init() self.assertEqual(client.directory, client.client.directory) self.assertEqual(client.key, KEY) - # delete this line once we finish migrating to new API: - self.assertEqual(client.register, client.client.register) self.assertEqual(client.update_registration, client.client.update_registration) self.assertRaises(AttributeError, client.__getattr__, 'nonexistent') self.assertRaises(AttributeError, client.__getattr__, 'new_account_and_tos') @@ -182,7 +204,52 @@ class BackwardsCompatibleClientV2Test(ClientTestBase): client.new_order(mock_csr_pem) mock_client().new_order.assert_called_once_with(mock_csr_pem) + @mock.patch('acme.client.Client') + def test_finalize_order_v1_success(self, mock_client): + self.response.json.return_value = DIRECTORY_V1.to_json() + mock_client().request_issuance.return_value = self.certr + mock_client().fetch_chain.return_value = self.chain + + deadline = datetime.datetime(9999, 9, 9) + client = self._init() + result = client.finalize_order(self.orderr, deadline) + self.assertEqual(result.fullchain_pem, self.fullchain_pem) + mock_client().fetch_chain.assert_called_once_with(self.certr) + + @mock.patch('acme.client.Client') + def test_finalize_order_v1_fetch_chain_error(self, mock_client): + self.response.json.return_value = DIRECTORY_V1.to_json() + + mock_client().request_issuance.return_value = self.certr + mock_client().fetch_chain.return_value = self.chain + mock_client().fetch_chain.side_effect = [errors.Error, self.chain] + + deadline = datetime.datetime(9999, 9, 9) + client = self._init() + result = client.finalize_order(self.orderr, deadline) + self.assertEqual(result.fullchain_pem, self.fullchain_pem) + self.assertEqual(mock_client().fetch_chain.call_count, 2) + + @mock.patch('acme.client.Client') + def test_finalize_order_v1_timeout(self, mock_client): + self.response.json.return_value = DIRECTORY_V1.to_json() + + mock_client().request_issuance.return_value = self.certr + + deadline = deadline = datetime.datetime.now() - datetime.timedelta(seconds=60) + client = self._init() + self.assertRaises(errors.TimeoutError, client.finalize_order, + self.orderr, deadline) + + def test_finalize_order_v2(self): + self.response.json.return_value = DIRECTORY_V2.to_json() + mock_orderr = mock.MagicMock() + mock_deadline = mock.MagicMock() + with mock.patch('acme.client.ClientV2') as mock_client: + client = self._init() + client.finalize_order(mock_orderr, mock_deadline) + mock_client().finalize_order.assert_called_once_with(mock_orderr, mock_deadline) class ClientTest(ClientTestBase): diff --git a/acme/acme/crypto_util.py b/acme/acme/crypto_util.py index a986721f0..f13c5109c 100644 --- a/acme/acme/crypto_util.py +++ b/acme/acme/crypto_util.py @@ -8,6 +8,8 @@ import socket import sys import OpenSSL +import josepy as jose + from acme import errors @@ -280,3 +282,23 @@ def gen_ss_cert(key, domains, not_before=None, cert.set_pubkey(key) cert.sign(key, "sha256") return cert + +def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM): + """Dump certificate chain into a bundle. + + :param list chain: List of `OpenSSL.crypto.X509` (or wrapped in + :class:`josepy.util.ComparableX509`). + + """ + # XXX: returns empty string when no chain is available, which + # shuts up RenewableCert, but might not be the best solution... + + def _dump_cert(cert): + if isinstance(cert, jose.ComparableX509): + # pylint: disable=protected-access + cert = cert.wrapped + return OpenSSL.crypto.dump_certificate(filetype, cert) + + # assumes that OpenSSL.crypto.dump_certificate includes ending + # newline character + return b"".join(_dump_cert(cert) for cert in chain) diff --git a/acme/acme/crypto_util_test.py b/acme/acme/crypto_util_test.py index 14aaac8b5..e8dd3b20c 100644 --- a/acme/acme/crypto_util_test.py +++ b/acme/acme/crypto_util_test.py @@ -225,5 +225,33 @@ class MakeCSRTest(unittest.TestCase): self.assertEqual(len(must_staple_exts), 1, "Expected exactly one Must Staple extension") + +class DumpPyopensslChainTest(unittest.TestCase): + """Test for dump_pyopenssl_chain.""" + + @classmethod + def _call(cls, loaded): + # pylint: disable=protected-access + from acme.crypto_util import dump_pyopenssl_chain + return dump_pyopenssl_chain(loaded) + + def test_dump_pyopenssl_chain(self): + names = ['cert.pem', 'cert-san.pem', 'cert-idnsans.pem'] + loaded = [test_util.load_cert(name) for name in names] + length = sum( + len(OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)) + for cert in loaded) + self.assertEqual(len(self._call(loaded)), length) + + def test_dump_pyopenssl_chain_wrapped(self): + names = ['cert.pem', 'cert-san.pem', 'cert-idnsans.pem'] + loaded = [test_util.load_cert(name) for name in names] + wrap_func = jose.ComparableX509 + wrapped = [wrap_func(cert) for cert in loaded] + dump_func = OpenSSL.crypto.dump_certificate + length = sum(len(dump_func(OpenSSL.crypto.FILETYPE_PEM, cert)) for cert in loaded) + self.assertEqual(len(self._call(wrapped)), length) + + if __name__ == '__main__': unittest.main() # pragma: no cover diff --git a/certbot/client.py b/certbot/client.py index dd11f2204..fc3848a5c 100644 --- a/certbot/client.py +++ b/certbot/client.py @@ -1,4 +1,5 @@ """Certbot client API.""" +import datetime import logging import os import platform @@ -11,7 +12,6 @@ import zope.component from acme import client as acme_client from acme import crypto_util as acme_crypto_util -from acme import errors as acme_errors from acme import messages import certbot @@ -243,8 +243,7 @@ class Client(object): than `authkey`. :param acme.messages.OrderResource orderr: contains authzrs - :returns: `.CertificateResource` and certificate chain (as - returned by `.fetch_chain`). + :returns: certificate and chain as PEM strings :rtype: tuple """ @@ -264,32 +263,9 @@ class Client(object): orderr = orderr.update(authorizations=authzr) authzr = orderr.authorizations - certr = self.acme.request_issuance( - jose.ComparableX509( - OpenSSL.crypto.load_certificate_request(OpenSSL.crypto.FILETYPE_PEM, csr.data)), - authzr) - - notify = zope.component.getUtility(interfaces.IDisplay).notification - retries = 0 - chain = None - - while retries <= 1: - if retries: - notify('Failed to fetch chain, please check your network ' - 'and continue', pause=True) - try: - chain = self.acme.fetch_chain(certr) - break - except acme_errors.Error: - logger.debug('Failed to fetch chain', exc_info=True) - retries += 1 - - if chain is None: - raise acme_errors.Error( - 'Failed to fetch chain. You should not deploy the generated ' - 'certificate, please rerun the command for a new one.') - - return certr, chain + deadline = datetime.datetime.now() + datetime.timedelta(seconds=90) + orderr = self.acme.finalize_order(orderr, deadline) + return crypto_util.cert_and_chain_from_fullchain(orderr.fullchain_pem) def obtain_certificate(self, domains): """Obtains a certificate from the ACME server. @@ -298,10 +274,9 @@ class Client(object): :param list domains: domains to get a certificate - :returns: `.CertificateResource`, certificate chain (as - returned by `.fetch_chain`), and newly generated private key - (`.util.Key`) and DER-encoded Certificate Signing Request - (`.util.CSR`). + :returns: :returns: certificate as PEM string, chain as PEM string, + newly generated private key (`.util.Key`), and DER-encoded + Certificate Signing Request (`.util.CSR`). :rtype: tuple """ @@ -329,9 +304,9 @@ class Client(object): os.remove(csr.file) return self.obtain_certificate(successful_domains) else: - certr, chain = self.obtain_certificate_from_csr(csr, orderr) + cert, chain = self.obtain_certificate_from_csr(csr, orderr) - return certr, chain, key, csr + return cert, chain, key, csr # pylint: disable=no-member def obtain_and_enroll_certificate(self, domains, certname): @@ -350,7 +325,7 @@ class Client(object): be obtained, or None if doing a successful dry run. """ - certr, chain, key, _ = self.obtain_certificate(domains) + cert, chain, key, _ = self.obtain_certificate(domains) if (self.config.config_dir != constants.CLI_DEFAULTS["config_dir"] or self.config.work_dir != constants.CLI_DEFAULTS["work_dir"]): @@ -365,19 +340,16 @@ class Client(object): return None else: return storage.RenewableCert.new_lineage( - new_name, OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped), - key.pem, crypto_util.dump_pyopenssl_chain(chain), + new_name, cert, + key.pem, chain, self.config) - def save_certificate(self, certr, chain_cert, + def save_certificate(self, cert_pem, chain_pem, cert_path, chain_path, fullchain_path): """Saves the certificate received from the ACME server. - :param certr: ACME "certificate" resource. - :type certr: :class:`acme.messages.Certificate` - - :param list chain_cert: + :param str cert_pem: + :param str chain_pem: :param str cert_path: Candidate path to a certificate. :param str chain_path: Candidate path to a certificate chain. :param str fullchain_path: Candidate path to a full cert chain. @@ -394,8 +366,6 @@ class Client(object): os.path.dirname(path), 0o755, os.geteuid(), self.config.strict_permissions) - cert_pem = OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped) cert_file, abs_cert_path = _open_pem_file('cert_path', cert_path) @@ -406,20 +376,15 @@ class Client(object): logger.info("Server issued certificate; certificate written to %s", abs_cert_path) - if not chain_cert: - return abs_cert_path, None, None - else: - chain_pem = crypto_util.dump_pyopenssl_chain(chain_cert) + chain_file, abs_chain_path =\ + _open_pem_file('chain_path', chain_path) + fullchain_file, abs_fullchain_path =\ + _open_pem_file('fullchain_path', fullchain_path) - chain_file, abs_chain_path =\ - _open_pem_file('chain_path', chain_path) - fullchain_file, abs_fullchain_path =\ - _open_pem_file('fullchain_path', fullchain_path) + _save_chain(chain_pem, chain_file) + _save_chain(cert_pem + chain_pem, fullchain_file) - _save_chain(chain_pem, chain_file) - _save_chain(cert_pem + chain_pem, fullchain_file) - - return abs_cert_path, abs_chain_path, abs_fullchain_path + return abs_cert_path, abs_chain_path, abs_fullchain_path def deploy_certificate(self, domains, privkey_path, cert_path, chain_path, fullchain_path): diff --git a/certbot/crypto_util.py b/certbot/crypto_util.py index 8368855cd..11721cc10 100644 --- a/certbot/crypto_util.py +++ b/certbot/crypto_util.py @@ -14,7 +14,6 @@ import six import zope.component from cryptography.hazmat.backends import default_backend from cryptography import x509 -import josepy as jose from acme import crypto_util as acme_crypto_util @@ -367,16 +366,7 @@ def dump_pyopenssl_chain(chain, filetype=OpenSSL.crypto.FILETYPE_PEM): """ # XXX: returns empty string when no chain is available, which # shuts up RenewableCert, but might not be the best solution... - - def _dump_cert(cert): - if isinstance(cert, jose.ComparableX509): - # pylint: disable=protected-access - cert = cert.wrapped - return OpenSSL.crypto.dump_certificate(filetype, cert) - - # assumes that OpenSSL.crypto.dump_certificate includes ending - # newline character - return b"".join(_dump_cert(cert) for cert in chain) + return acme_crypto_util.dump_pyopenssl_chain(chain, filetype) def notBefore(cert_path): @@ -443,3 +433,16 @@ def sha256sum(filename): with open(filename, 'rb') as f: sha256.update(f.read()) return sha256.hexdigest() + +def cert_and_chain_from_fullchain(fullchain_pem): + """Split fullchain_pem into cert_pem and chain_pem + + :param str fullchain_pem: concatenated cert + chain + + :returns: tuple of string cert_pem and chain_pem + :rtype: tuple + """ + cert = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, + OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, fullchain_pem)) + chain = fullchain_pem[len(cert):] + return (cert, chain) diff --git a/certbot/main.py b/certbot/main.py index d01f68920..eff4c9c8f 100644 --- a/certbot/main.py +++ b/certbot/main.py @@ -1064,13 +1064,13 @@ def _csr_get_and_save_cert(config, le_client): """ csr, _ = config.actual_csr - certr, chain = le_client.obtain_certificate_from_csr(csr) + cert, chain = le_client.obtain_certificate_from_csr(csr) if config.dry_run: logger.debug( "Dry run: skipping saving certificate to %s", config.cert_path) return None, None cert_path, _, fullchain_path = le_client.save_certificate( - certr, chain, config.cert_path, config.chain_path, config.fullchain_path) + cert, chain, config.cert_path, config.chain_path, config.fullchain_path) return cert_path, fullchain_path def renew_cert(config, plugins, lineage): diff --git a/certbot/renewal.py b/certbot/renewal.py index 024a815cc..ea5d87a5e 100644 --- a/certbot/renewal.py +++ b/certbot/renewal.py @@ -294,15 +294,12 @@ def renew_cert(config, domains, le_client, lineage): _avoid_invalidating_lineage(config, lineage, original_server) if not domains: domains = lineage.names() - new_certr, new_chain, new_key, _ = le_client.obtain_certificate(domains) + new_cert, new_chain, new_key, _ = le_client.obtain_certificate(domains) if config.dry_run: logger.debug("Dry run: skipping updating lineage at %s", os.path.dirname(lineage.cert)) else: prior_version = lineage.latest_common_version() - new_cert = OpenSSL.crypto.dump_certificate( - OpenSSL.crypto.FILETYPE_PEM, new_certr.body.wrapped) - new_chain = crypto_util.dump_pyopenssl_chain(new_chain) # TODO: Check return value of save_successor lineage.save_successor(prior_version, new_cert, new_key.pem, new_chain, config) lineage.update_all_links_to(lineage.latest_common_version()) diff --git a/certbot/tests/client_test.py b/certbot/tests/client_test.py index a65341692..ed9c140e7 100644 --- a/certbot/tests/client_test.py +++ b/certbot/tests/client_test.py @@ -4,12 +4,8 @@ import shutil import tempfile import unittest -import josepy as jose -import OpenSSL import mock -from acme import errors as acme_errors - from certbot import account from certbot import errors from certbot import util @@ -134,7 +130,10 @@ class ClientTest(ClientTestCommon): self.config.allow_subset_of_names = False self.config.dry_run = False self.eg_domains = ["example.com", "www.example.com"] - self.eg_order = mock.MagicMock(authorizations=[None]) + self.eg_order = mock.MagicMock( + authorizations=[None], + fullchain_pem=mock.sentinel.fullchain_pem, + csr_pem=mock.sentinel.csr_pem) def test_init_acme_verify_ssl(self): net = self.acme_client.call_args[0][0] @@ -143,9 +142,9 @@ class ClientTest(ClientTestCommon): def _mock_obtain_certificate(self): self.client.auth_handler = mock.MagicMock() self.client.auth_handler.handle_authorizations.return_value = [None] - self.acme.request_issuance.return_value = mock.sentinel.certr - self.acme.fetch_chain.return_value = mock.sentinel.chain + self.acme.finalize_order.return_value = self.eg_order self.acme.new_order.return_value = self.eg_order + self.eg_order.update.return_value = self.eg_order def _check_obtain_certificate(self, auth_count=1): if auth_count == 1: @@ -155,27 +154,24 @@ class ClientTest(ClientTestCommon): else: self.assertEqual(self.client.auth_handler.handle_authorizations.call_count, auth_count) - authzr = self.client.auth_handler.handle_authorizations() - - self.acme.request_issuance.assert_called_once_with( - jose.ComparableX509(OpenSSL.crypto.load_certificate_request( - OpenSSL.crypto.FILETYPE_PEM, CSR_SAN)), - authzr) - - self.acme.fetch_chain.assert_called_once_with(mock.sentinel.certr) + self.acme.finalize_order.assert_called_once_with( + self.eg_order, mock.ANY) + @mock.patch("certbot.client.crypto_util") @mock.patch("certbot.client.logger") @test_util.patch_get_utility() def test_obtain_certificate_from_csr(self, unused_mock_get_utility, - mock_logger): + mock_logger, mock_crypto_util): self._mock_obtain_certificate() test_csr = util.CSR(form="pem", file=None, data=CSR_SAN) auth_handler = self.client.auth_handler + mock_crypto_util.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert, + mock.sentinel.chain) orderr = self.acme.new_order(test_csr.data) auth_handler.handle_authorizations(orderr, False) self.assertEqual( - (mock.sentinel.certr, mock.sentinel.chain), + (mock.sentinel.cert, mock.sentinel.chain), self.client.obtain_certificate_from_csr( test_csr, orderr=orderr)) @@ -184,7 +180,7 @@ class ClientTest(ClientTestCommon): # Test for orderr=None self.assertEqual( - (mock.sentinel.certr, mock.sentinel.chain), + (mock.sentinel.cert, mock.sentinel.chain), self.client.obtain_certificate_from_csr( test_csr, orderr=None)) @@ -198,41 +194,13 @@ class ClientTest(ClientTestCommon): test_csr) mock_logger.warning.assert_called_once_with(mock.ANY) - @test_util.patch_get_utility() - def test_obtain_certificate_from_csr_retry_succeeded( - self, mock_get_utility): - self._mock_obtain_certificate() - self.acme.fetch_chain.side_effect = [acme_errors.Error, - mock.sentinel.chain] - test_csr = util.CSR(form="der", file=None, data=CSR_SAN) - - orderr = self.acme.new_order(test_csr.data) - self.assertEqual( - (mock.sentinel.certr, mock.sentinel.chain), - self.client.obtain_certificate_from_csr( - test_csr, - orderr=orderr)) - self.assertEqual(1, mock_get_utility().notification.call_count) - - @test_util.patch_get_utility() - def test_obtain_certificate_from_csr_retry_failed(self, mock_get_utility): - self._mock_obtain_certificate() - self.acme.fetch_chain.side_effect = acme_errors.Error - test_csr = util.CSR(form="der", file=None, data=CSR_SAN) - - orderr = self.acme.new_order(test_csr.data) - self.assertRaises( - acme_errors.Error, - self.client.obtain_certificate_from_csr, - test_csr, - orderr=orderr) - self.assertEqual(1, mock_get_utility().notification.call_count) - @mock.patch("certbot.client.crypto_util") def test_obtain_certificate(self, mock_crypto_util): csr = util.CSR(form="pem", file=None, data=CSR_SAN) mock_crypto_util.init_save_csr.return_value = csr mock_crypto_util.init_save_key.return_value = mock.sentinel.key + mock_crypto_util.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert, + mock.sentinel.chain) self._test_obtain_certificate_common(mock.sentinel.key, csr) @@ -240,6 +208,8 @@ class ClientTest(ClientTestCommon): self.config.rsa_key_size, self.config.key_dir) mock_crypto_util.init_save_csr.assert_called_once_with( mock.sentinel.key, self.eg_domains, self.config.csr_dir) + mock_crypto_util.cert_and_chain_from_fullchain.assert_called_once_with( + mock.sentinel.fullchain_pem) @mock.patch("certbot.client.crypto_util") @mock.patch("os.remove") @@ -248,6 +218,8 @@ class ClientTest(ClientTestCommon): key = util.CSR(form="pem", file=mock.sentinel.key_file, data=CSR_SAN) mock_crypto_util.init_save_csr.return_value = csr mock_crypto_util.init_save_key.return_value = key + mock_crypto_util.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert, + mock.sentinel.chain) authzr = self._authzr_from_domains(["example.com"]) self._test_obtain_certificate_common(key, csr, authzr_ret=authzr, auth_count=2) @@ -255,6 +227,7 @@ class ClientTest(ClientTestCommon): self.assertEqual(mock_crypto_util.init_save_key.call_count, 2) self.assertEqual(mock_crypto_util.init_save_csr.call_count, 2) self.assertEqual(mock_remove.call_count, 2) + self.assertEqual(mock_crypto_util.cert_and_chain_from_fullchain.call_count, 1) @mock.patch("certbot.client.crypto_util") @mock.patch("certbot.client.acme_crypto_util") @@ -263,6 +236,8 @@ class ClientTest(ClientTestCommon): mock_acme_crypto.make_csr.return_value = CSR_SAN mock_crypto.make_key.return_value = mock.sentinel.key_pem key = util.Key(file=None, pem=mock.sentinel.key_pem) + mock_crypto.cert_and_chain_from_fullchain.return_value = (mock.sentinel.cert, + mock.sentinel.chain) self.client.config.dry_run = True self._test_obtain_certificate_common(key, csr) @@ -272,6 +247,7 @@ class ClientTest(ClientTestCommon): mock.sentinel.key_pem, self.eg_domains, self.config.must_staple) mock_crypto.init_save_key.assert_not_called() mock_crypto.init_save_csr.assert_not_called() + self.assertEqual(mock_crypto.cert_and_chain_from_fullchain.call_count, 1) def _authzr_from_domains(self, domains): authzr = [] @@ -294,7 +270,6 @@ class ClientTest(ClientTestCommon): authzr = authzr_ret or self._authzr_from_domains(self.eg_domains) self.eg_order.authorizations = authzr - self.eg_order.update().authorizations = authzr self.client.auth_handler.handle_authorizations.return_value = authzr with test_util.patch_get_utility(): @@ -302,13 +277,12 @@ class ClientTest(ClientTestCommon): self.assertEqual( result, - (mock.sentinel.certr, mock.sentinel.chain, key, csr)) + (mock.sentinel.cert, mock.sentinel.chain, key, csr)) self._check_obtain_certificate(auth_count) @mock.patch('certbot.client.Client.obtain_certificate') @mock.patch('certbot.storage.RenewableCert.new_lineage') - @mock.patch('OpenSSL.crypto.dump_certificate') - def test_obtain_and_enroll_certificate(self, mock_dump_certificate, + def test_obtain_and_enroll_certificate(self, mock_storage, mock_obtain_certificate): domains = ["example.com", "www.example.com"] mock_obtain_certificate.return_value = (mock.MagicMock(), @@ -324,7 +298,6 @@ class ClientTest(ClientTestCommon): self.assertFalse(self.client.obtain_and_enroll_certificate(domains, None)) self.assertTrue(mock_storage.call_count == 2) - self.assertTrue(mock_dump_certificate.call_count == 2) @mock.patch("certbot.cli.helpful_parser") def test_save_certificate(self, mock_parser): @@ -333,9 +306,8 @@ class ClientTest(ClientTestCommon): tmp_path = tempfile.mkdtemp() os.chmod(tmp_path, 0o755) # TODO: really?? - certr = mock.MagicMock(body=test_util.load_comparable_cert(certs[0])) - chain_cert = [test_util.load_comparable_cert(certs[0]), - test_util.load_comparable_cert(certs[1])] + cert_pem = test_util.load_vector(certs[0]) + chain_pem = (test_util.load_vector(certs[0]) + test_util.load_vector(certs[1])) candidate_cert_path = os.path.join(tmp_path, "certs", "cert_512.pem") candidate_chain_path = os.path.join(tmp_path, "chains", "chain.pem") candidate_fullchain_path = os.path.join(tmp_path, "chains", "fullchain.pem") @@ -345,7 +317,7 @@ class ClientTest(ClientTestCommon): "--fullchain-path", candidate_fullchain_path] cert_path, chain_path, fullchain_path = self.client.save_certificate( - certr, chain_cert, candidate_cert_path, candidate_chain_path, + cert_pem, chain_pem, candidate_cert_path, candidate_chain_path, candidate_fullchain_path) self.assertEqual(os.path.dirname(cert_path), diff --git a/certbot/tests/crypto_util_test.py b/certbot/tests/crypto_util_test.py index f0e2c017e..00303fab3 100644 --- a/certbot/tests/crypto_util_test.py +++ b/certbot/tests/crypto_util_test.py @@ -373,5 +373,18 @@ class Sha256sumTest(unittest.TestCase): '914ffed8daf9e2c99d90ac95c77d54f32cbd556672facac380f0c063498df84e') +class CertAndChainFromFullchainTest(unittest.TestCase): + """Tests for certbot.crypto_util.cert_and_chain_from_fullchain""" + + def test_cert_and_chain_from_fullchain(self): + cert_pem = CERT + chain_pem = CERT + SS_CERT + fullchain_pem = cert_pem + chain_pem + from certbot.crypto_util import cert_and_chain_from_fullchain + cert_out, chain_out = cert_and_chain_from_fullchain(fullchain_pem) + self.assertEqual(cert_out, cert_pem) + self.assertEqual(chain_out, chain_pem) + + if __name__ == '__main__': unittest.main() # pragma: no cover diff --git a/certbot/tests/util.py b/certbot/tests/util.py index 60d8d6084..8434d11de 100644 --- a/certbot/tests/util.py +++ b/certbot/tests/util.py @@ -57,11 +57,6 @@ def load_cert(*names): return OpenSSL.crypto.load_certificate(loader, load_vector(*names)) -def load_comparable_cert(*names): - """Load ComparableX509 cert.""" - return jose.ComparableX509(load_cert(*names)) - - def load_csr(*names): """Load certificate request.""" loader = _guess_loader(