From 7aa9fe845ae6ba2f55c373d22a99ec9966ce725b Mon Sep 17 00:00:00 2001 From: Seth Schoen Date: Tue, 8 Sep 2015 01:33:03 -0700 Subject: [PATCH] Basic fix for #411 --- letsencrypt/cli.py | 111 ++++++++++++++++++++++++-- letsencrypt/client.py | 2 - letsencrypt/display/ops.py | 20 +++++ letsencrypt/storage.py | 19 +++++ letsencrypt/tests/display/ops_test.py | 22 +++++ letsencrypt/tests/renewer_test.py | 20 +++++ 6 files changed, 186 insertions(+), 8 deletions(-) diff --git a/letsencrypt/cli.py b/letsencrypt/cli.py index 066aa388d..4844c4691 100644 --- a/letsencrypt/cli.py +++ b/letsencrypt/cli.py @@ -1,7 +1,9 @@ """Let's Encrypt CLI.""" # TODO: Sanity check all input. Be sure to avoid shell code etc... + import argparse import atexit +import configobj import functools import logging import logging.handlers @@ -12,6 +14,7 @@ import time import traceback import configargparse +import OpenSSL import zope.component import zope.interface.exceptions import zope.interface.verify @@ -27,6 +30,7 @@ from letsencrypt import interfaces from letsencrypt import le_util from letsencrypt import log from letsencrypt import reporter +from letsencrypt import storage from letsencrypt.display import util as display_util from letsencrypt.display import ops as display_ops @@ -69,7 +73,7 @@ Choice of server for authentication/installation: More detailed help: - -h, --help [topic] print this message, or detailed help on a topic; + -h, --help [topic] print this message, or detailed help on a topic; the available topics are: all, apache, automation, nginx, paths, security, testing, or any of the @@ -159,7 +163,8 @@ def _init_le_client(args, config, authenticator, installer): return client.Client(config, acc, authenticator, installer, acme=acme) -def run(args, config, plugins): +def run(args, config, plugins): # pylint: disable=too-many-locals,too-many-branches,too-many-statements + """Obtain a certificate and install.""" if args.configurator is not None and (args.installer is not None or args.authenticator is not None): @@ -181,15 +186,109 @@ def run(args, config, plugins): return "Configurator could not be determined" domains = _find_domains(args, installer) + domains_set = set(domains) + renew_config = configuration.RenewerConfiguration(config) + # I am not sure whether that correctly reads the systemwide + # configuration file. + + configs_dir = renew_config.renewal_configs_dir + identical_names_cert = None + subset_names_cert = None + + for renewal_file in os.listdir(configs_dir): + full_path = os.path.join(configs_dir, renewal_file) + rc_config = configobj.ConfigObj(renew_config.renewer_config_file) + rc_config.merge(configobj.ConfigObj(full_path)) + rc_config.filename = full_path + cli_config = configuration.RenewerConfiguration(config.namespace) + + candidate_lineage = storage.RenewableCert(rc_config, None, cli_config) + # TODO: Handle these differently depending on whether they are + # expired or still valid? + candidate_names = set(candidate_lineage.names()) + if candidate_names == domains_set: + identical_names_cert = (renewal_file, candidate_lineage) + elif candidate_names.issubset(domains_set): + subset_names_cert = (renewal_file, candidate_lineage, + candidate_lineage.names()) + treat_as_renewal = False + question = None + same_cert_question = "You have an existing certificate that contains " + same_cert_question += "exactly the same domains you requested.\n\n{0}" + same_cert_question += "\n\nDo you want to renew and replace this " + same_cert_question += "certificate with a newly-issued one?" + subset_cert_question = "You have an existing certificate that contains " + subset_cert_question += "a portion of the domains you requested (ref: {0})" + subset_cert_question += "\n\nIt contains these names: {1}\n\nYou " + subset_cert_question += "requested these names for the new certificate: " + subset_cert_question += "{2}.\n\nDo you want to replace this existing " + subset_cert_question += "certificate with the new certificate?" + + if identical_names_cert: + question = same_cert_question.format(identical_names_cert[0]) + elif subset_names_cert: + question = subset_cert_question.format(subset_names_cert[0], + ", ".join(subset_names_cert[2]), + ", ".join(domains)) + if question: + if zope.component.getUtility(interfaces.IDisplay).yesno(question, + "Replace", + "Cancel"): + treat_as_renewal = True + else: + # TODO: Once the --duplicate (?) option is implemented, this + # exit will be suppressed and we will not treat this + # as a renewal. This should ideally be done by leaving + # question as None above so that we don't even prompt + # the user with the question. (We might say "if question + # and not duplicate_option:" instead of "if question".) + msg = "To obtain a new certificate that {0} an existing " + msg += "certificate in its domain-name coverage, consult the " + msg += "documentation about the --XXX-TODO option." + what = "duplicates" if identical_names_cert else "overlaps with" + msg = msg.format(what) + reporter_util = zope.component.getUtility(interfaces.IReporter) + reporter_util.add_message(msg, reporter_util.HIGH_PRIORITY, True) + sys.exit(1) + # TODO: Handle errors from _init_le_client? le_client = _init_le_client(args, config, authenticator, installer) - lineage = le_client.obtain_and_enroll_certificate( - domains, authenticator, installer, plugins) - if not lineage: - return "Certificate could not be obtained" + if treat_as_renewal: + # TREAT AS RENEWAL + if identical_names_cert: + lineage = identical_names_cert[1] + else: + lineage = subset_names_cert[1] + # TODO: Use existing privkey instead of generating a new one + new_certr, new_chain, new_key, _ = le_client.obtain_certificate(domains) + # TODO: Check whether it worked! + old_version = lineage.latest_common_version() + lineage.save_successor(old_version, + OpenSSL.crypto.dump_certificate( + OpenSSL.crypto.FILETYPE_PEM, + new_certr.body), + new_key.pem, + OpenSSL.crypto.dump_certificate( + OpenSSL.crypto.FILETYPE_PEM, + new_chain)) + lineage.update_all_links_to(lineage.latest_common_version()) + # TODO: Check return value of save_successor + # TODO: Also update lineage renewal config with any relevant + # configuration values from this attempt? + else: + # TREAT AS NEW REQUEST + lineage = le_client.obtain_and_enroll_certificate( + domains, authenticator, installer, plugins) + if not lineage: + return "Certificate could not be obtained" + # TODO: This treats the key as changed even when it wasn't le_client.deploy_certificate( domains, lineage.privkey, lineage.cert, lineage.chain) le_client.enhance_config(domains, args.redirect) + if treat_as_renewal: + display_ops.success_renewal(domains) + else: + display_ops.success_installation(domains) def auth(args, config, plugins): diff --git a/letsencrypt/client.py b/letsencrypt/client.py index e8dd08c8e..5dad04c09 100644 --- a/letsencrypt/client.py +++ b/letsencrypt/client.py @@ -378,8 +378,6 @@ class Client(object): # sites may have been enabled / final cleanup self.installer.restart() - display_ops.success_installation(domains) - def enhance_config(self, domains, redirect=None): """Enhance the configuration. diff --git a/letsencrypt/display/ops.py b/letsencrypt/display/ops.py index a220d07d9..8370750db 100644 --- a/letsencrypt/display/ops.py +++ b/letsencrypt/display/ops.py @@ -233,6 +233,26 @@ def success_installation(domains): pause=False) +def success_renewal(domains): + """Display a box confirming the renewal of an existing certificate. + + .. todo:: This should be centered on the screen + + :param list domains: domain names which were renewed + + """ + util(interfaces.IDisplay).notification( + "Your existing certificate has been successfully renewed, and the " + "new certificate has been installed.{1}{1}" + "The new certificate covers the following domains: {0}{1}{1}" + "You should test your configuration at:{1}{2}".format( + _gen_https_names(domains), + os.linesep, + os.linesep.join(_gen_ssl_lab_urls(domains))), + height=(14 + len(domains)), + pause=False) + + def _gen_ssl_lab_urls(domains): """Returns a list of urls. diff --git a/letsencrypt/storage.py b/letsencrypt/storage.py index 431f56aff..2cdc8b765 100644 --- a/letsencrypt/storage.py +++ b/letsencrypt/storage.py @@ -11,6 +11,7 @@ import pytz import pyrfc3339 from letsencrypt import constants +from letsencrypt import crypto_util from letsencrypt import errors from letsencrypt import le_util @@ -421,6 +422,24 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes """ return self._notafterbefore(lambda x509: x509.get_notAfter(), version) + def names(self, version=None): + """What are the subject names of this certificate? + + (If no version is specified, use the current version.) + + :param int version: the desired version number + :returns: the subject names + :rtype: `list` of `str` + + """ + if version is None: + target = self.current_target("cert") + else: + target = self.version("cert", version) + with open(target) as f: + sans = crypto_util.get_sans_from_cert(f.read()) + return sans + def should_autodeploy(self): """Should this lineage now automatically deploy a newer version? diff --git a/letsencrypt/tests/display/ops_test.py b/letsencrypt/tests/display/ops_test.py index fc4013bed..696deb3b0 100644 --- a/letsencrypt/tests/display/ops_test.py +++ b/letsencrypt/tests/display/ops_test.py @@ -383,5 +383,27 @@ class SuccessInstallationTest(unittest.TestCase): self.assertTrue(name in arg) +class SuccessRenewalTest(unittest.TestCase): + # pylint: disable=too-few-public-methods + """Test the success renewal message.""" + @classmethod + def _call(cls, names): + from letsencrypt.display.ops import success_renewal + success_renewal(names) + + @mock.patch("letsencrypt.display.ops.util") + def test_success_renewal(self, mock_util): + mock_util().notification.return_value = None + names = ["example.com", "abc.com"] + + self._call(names) + + self.assertEqual(mock_util().notification.call_count, 1) + arg = mock_util().notification.call_args_list[0][0][0] + + for name in names: + self.assertTrue(name in arg) + + if __name__ == "__main__": unittest.main() # pragma: no cover diff --git a/letsencrypt/tests/renewer_test.py b/letsencrypt/tests/renewer_test.py index 1b58d9e0f..0169f1159 100644 --- a/letsencrypt/tests/renewer_test.py +++ b/letsencrypt/tests/renewer_test.py @@ -295,6 +295,26 @@ class RenewableCertTests(unittest.TestCase): else: self.assertFalse(self.test_rc.has_pending_deployment()) + def test_names(self): + # Trying the current version + test_cert = test_util.load_vector("cert-san.pem") + os.symlink(os.path.join("..", "..", "archive", "example.org", + "cert12.pem"), self.test_rc.cert) + with open(self.test_rc.cert, "w") as f: + f.write(test_cert) + self.assertEqual(self.test_rc.names(), + ["example.com", "www.example.com"]) + + # Trying a non-current version + test_cert = test_util.load_vector("cert.pem") + os.unlink(self.test_rc.cert) + os.symlink(os.path.join("..", "..", "archive", "example.org", + "cert15.pem"), self.test_rc.cert) + with open(self.test_rc.cert, "w") as f: + f.write(test_cert) + self.assertEqual(self.test_rc.names(12), + ["example.com", "www.example.com"]) + def _test_notafterbefore(self, function, timestamp): test_cert = test_util.load_vector("cert.pem") os.symlink(os.path.join("..", "..", "archive", "example.org",