From 0a7ca2f32e89a65b93bca0c0e540ff004633e059 Mon Sep 17 00:00:00 2001 From: Erica Portnoy Date: Thu, 8 Dec 2016 11:53:30 -0800 Subject: [PATCH] Implement the --cert-name flag to select a lineage by its name, and the rename verb. (#3785) * Rename and simplify main functions * pass certname to auth method * find cert by certname flag * Implement --cert-name command * don't ask to confirm new cert when we have domains and no existing certs with the lineage name * Refactor and add --new-cert-name flag * add interactivity to rename verb * allow noninteractive and more descriptive function names --- certbot/cert_manager.py | 109 +++++++++++++ certbot/cli.py | 23 ++- certbot/client.py | 8 +- certbot/main.py | 136 ++++++++++------- certbot/renewal.py | 10 +- certbot/storage.py | 33 +++- certbot/tests/cert_manager_test.py | 236 +++++++++++++++++++++++++++++ certbot/tests/cli_test.py | 1 + certbot/tests/client_test.py | 28 +++- certbot/tests/main_test.py | 130 +++++++++------- certbot/util.py | 1 - 11 files changed, 590 insertions(+), 125 deletions(-) diff --git a/certbot/cert_manager.py b/certbot/cert_manager.py index f16d19201..0f6f4c730 100644 --- a/certbot/cert_manager.py +++ b/certbot/cert_manager.py @@ -1,14 +1,19 @@ """Tools for managing certificates.""" import datetime import logging +import os import pytz import traceback import zope.component from certbot import configuration +from certbot import errors from certbot import interfaces from certbot import renewal from certbot import storage +from certbot import util + +from certbot.display import util as display_util logger = logging.getLogger(__name__) @@ -30,6 +35,43 @@ def update_live_symlinks(config): configuration.RenewerConfiguration(renewer_config), update_symlinks=True) +def rename_lineage(config): + """Rename the specified lineage to the new name. + + :param config: Configuration. + :type config: :class:`certbot.interfaces.IConfig` + + """ + disp = zope.component.getUtility(interfaces.IDisplay) + renewer_config = configuration.RenewerConfiguration(config) + + certname = config.certname + if not certname: + filenames = renewal.renewal_conf_files(renewer_config) + choices = [storage.lineagename_for_filename(name) for name in filenames] + if not choices: + raise errors.Error("No existing certificates found.") + code, index = disp.menu("Which certificate would you like to rename?", + choices, ok_label="Select", flag="--cert-name") + if code != display_util.OK or not index in range(0, len(choices)): + raise errors.Error("User ended interaction.") + certname = choices[index] + + new_certname = config.new_certname + if not new_certname: + code, new_certname = disp.input("Enter the new name for certificate {0}" + .format(certname), flag="--updated-cert-name") + if code != display_util.OK or not new_certname: + raise errors.Error("User ended interaction.") + + lineage = lineage_for_certname(config, certname) + if not lineage: + raise errors.ConfigurationError("No existing certificate with name " + "{0} found.".format(certname)) + storage.rename_renewal_config(certname, new_certname, renewer_config) + disp.notification("Successfully renamed {0} to {1}." + .format(certname, new_certname), pause=False) + def _report_lines(msgs): """Format a results report for a category of single-line renewal outcomes""" return " " + "\n ".join(str(msg) for msg in msgs) @@ -106,3 +148,70 @@ def certificates(config): # Describe all the certs _describe_certs(parsed_certs, parse_failures) + +def _search_lineages(config, func, initial_rv): + """Iterate func over unbroken lineages, allowing custom return conditions. + + Allows flexible customization of return values, including multiple + return values and complex checks. + """ + cli_config = configuration.RenewerConfiguration(config) + configs_dir = cli_config.renewal_configs_dir + # Verify the directory is there + util.make_or_verify_dir(configs_dir, mode=0o755, uid=os.geteuid()) + + rv = initial_rv + for renewal_file in renewal.renewal_conf_files(cli_config): + try: + candidate_lineage = storage.RenewableCert(renewal_file, cli_config) + except (errors.CertStorageError, IOError): + logger.debug("Renewal conf file %s is broken. Skipping.", renewal_file) + logger.debug("Traceback was:\n%s", traceback.format_exc()) + continue + rv = func(candidate_lineage, rv) + return rv + +def lineage_for_certname(config, certname): + """Find a lineage object with name certname.""" + def update_cert_for_name_match(candidate_lineage, rv): + """Return cert if it has name certname, else return rv + """ + matching_lineage_name_cert = rv + if candidate_lineage.lineagename == certname: + matching_lineage_name_cert = candidate_lineage + return matching_lineage_name_cert + return _search_lineages(config, update_cert_for_name_match, None) + +def domains_for_certname(config, certname): + """Find the domains in the cert with name certname.""" + def update_domains_for_name_match(candidate_lineage, rv): + """Return domains if certname matches, else return rv + """ + matching_domains = rv + if candidate_lineage.lineagename == certname: + matching_domains = candidate_lineage.names() + return matching_domains + return _search_lineages(config, update_domains_for_name_match, None) + +def find_duplicative_certs(config, domains): + """Find existing certs that duplicate the request.""" + def update_certs_for_domain_matches(candidate_lineage, rv): + """Return cert as identical_names_cert if it matches, + or subset_names_cert if it matches as subset + """ + # TODO: Handle these differently depending on whether they are + # expired or still valid? + identical_names_cert, subset_names_cert = rv + candidate_names = set(candidate_lineage.names()) + if candidate_names == set(domains): + identical_names_cert = candidate_lineage + elif candidate_names.issubset(set(domains)): + # This logic finds and returns the largest subset-names cert + # in the case where there are several available. + if subset_names_cert is None: + subset_names_cert = candidate_lineage + elif len(candidate_names) > len(subset_names_cert.names()): + subset_names_cert = candidate_lineage + return (identical_names_cert, subset_names_cert) + + return _search_lineages(config, update_certs_for_domain_matches, (None, None)) diff --git a/certbot/cli.py b/certbot/cli.py index d327240f4..356a03764 100644 --- a/certbot/cli.py +++ b/certbot/cli.py @@ -68,6 +68,7 @@ cert. Major SUBCOMMANDS are: rollback Rollback server configuration changes made during install config_changes Show changes made to server config during installation update_symlinks Update cert symlinks based on renewal config file + rename Update a certificate's name plugins Display information about installed plugins certificates Display information about certs configured with Certbot @@ -326,7 +327,7 @@ class HelpfulArgumentParser(object): "register": main.register, "renew": main.renew, "revoke": main.revoke, "rollback": main.rollback, "everything": main.run, "update_symlinks": main.update_symlinks, - "certificates": main.certificates} + "certificates": main.certificates, "rename": main.rename} # List of topics for which additional help can be provided HELP_TOPICS = ["all", "security", "paths", "automation", "testing"] + list(self.VERBS) @@ -686,6 +687,19 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False): # pylint: dis help="Domain names to apply. For multiple domains you can use " "multiple -d flags or enter a comma separated list of domains " "as a parameter.") + helpful.add( + [None, "run", "certonly"], + "--cert-name", dest="certname", + metavar="CERTNAME", default=None, + help="Certificate name to apply. Only one certificate name can be used " + "per Certbot run. To see certificate names, run 'certbot certificates'." + "If there is no existing certificate with this name and " + "domains are requested, create a new certificate with this name.") + helpful.add( + "rename", + "--updated-cert-name", dest="new_certname", + metavar="NEW_CERTNAME", default=None, + help="New name for the certificate. Must be a valid filename.") helpful.add( [None, "testing", "renew", "certonly"], "--dry-run", action="store_true", dest="dry_run", @@ -738,6 +752,12 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False): # pylint: dis "regardless of whether it is near expiry. (Often " "--keep-until-expiring is more appropriate). Also implies " "--expand.") + helpful.add( + "automation", "--renew-with-new-domains", + action="store_true", dest="renew_with_new_domains", help="If a " + "certificate already exists for the requested certificate name " + "but does not match the requested domains, renew it now, " + "regardless of whether it is near expiry.") helpful.add( ["automation", "renew", "certonly"], "--allow-subset-of-names", action="store_true", @@ -1015,7 +1035,6 @@ class _DomainsAction(argparse.Action): """Just wrap add_domains in argparseese.""" add_domains(namespace, domain) - def add_domains(args_or_config, domains): """Registers new domains to be used during the current client run. diff --git a/certbot/client.py b/certbot/client.py index 4d6de6375..d58f9457f 100644 --- a/certbot/client.py +++ b/certbot/client.py @@ -263,7 +263,7 @@ class Client(object): return (self.obtain_certificate_from_csr(domains, csr, authzr=authzr) + (key, csr)) - def obtain_and_enroll_certificate(self, domains): + def obtain_and_enroll_certificate(self, domains, certname): """Obtain and enroll certificate. Get a new certificate for the specified domains using the specified @@ -272,6 +272,7 @@ class Client(object): :param list domains: Domains to request. :param plugins: A PluginsFactory object. + :param str certname: Name of new cert :returns: A new :class:`certbot.storage.RenewableCert` instance referred to the enrolled cert lineage, False if the cert could not @@ -286,13 +287,14 @@ class Client(object): "Non-standard path(s), might not work with crontab installed " "by your operating system package manager") + new_name = certname if certname else domains[0] if self.config.dry_run: logger.debug("Dry run: Skipping creating new lineage for %s", - domains[0]) + new_name) return None else: return storage.RenewableCert.new_lineage( - domains[0], OpenSSL.crypto.dump_certificate( + new_name, OpenSSL.crypto.dump_certificate( OpenSSL.crypto.FILETYPE_PEM, certr.body.wrapped), key.pem, crypto_util.dump_pyopenssl_chain(chain), configuration.RenewerConfiguration(self.config.namespace)) diff --git a/certbot/main.py b/certbot/main.py index 471dcd838..e3c271162 100644 --- a/certbot/main.py +++ b/certbot/main.py @@ -30,7 +30,6 @@ from certbot import interfaces from certbot import util from certbot import reporter from certbot import renewal -from certbot import storage from certbot.display import util as display_util, ops as display_ops from certbot.plugins import disco as plugins_disco @@ -42,6 +41,9 @@ _PERM_ERR_FMT = os.linesep.join(( "If running as non-root, set --config-dir, " "--logs-dir, and --work-dir to writeable paths.")) +USER_CANCELLED = ("User chose to cancel the operation and may " + "reinvoke the client.") + logger = logging.getLogger(__name__) @@ -68,17 +70,21 @@ def _report_successful_dry_run(config): reporter_util.HIGH_PRIORITY, on_crash=False) -def _auth_from_domains(le_client, config, domains, lineage=None): +def _auth_from_available(le_client, config, domains=None, certname=None, lineage=None): """Authenticate and enroll certificate. - :returns: Tuple of (str action, cert_or_None) as per _treat_as_renewal + This method finds the relevant lineage, figures out what to do with it, + then performs that action. Includes calls to hooks, various reports, + checks, and requests for user input. + + :returns: Tuple of (str action, cert_or_None) as per _find_lineage_for_domains_and_certname action can be: "newcert" | "renew" | "reinstall" """ # If lineage is specified, use that one instead of looking around for # a matching one. if lineage is None: # This will find a relevant matching lineage that exists - action, lineage = _treat_as_renewal(config, domains) + action, lineage = _find_lineage_for_domains_and_certname(config, domains, certname) else: # Renewal, where we already know the specific lineage we're # interested in @@ -94,11 +100,11 @@ def _auth_from_domains(le_client, config, domains, lineage=None): try: if action == "renew": logger.info("Renewing an existing certificate") - renewal.renew_cert(config, domains, le_client, lineage) + renewal.renew_cert(config, le_client, lineage) elif action == "newcert": # TREAT AS NEW REQUEST logger.info("Obtaining a new certificate") - lineage = le_client.obtain_and_enroll_certificate(domains) + lineage = le_client.obtain_and_enroll_certificate(domains, certname) if lineage is False: raise errors.Error("Certificate could not be obtained") finally: @@ -115,7 +121,7 @@ def _handle_subset_cert_request(config, domains, cert): :param storage.RenewableCert cert: - :returns: Tuple of (str action, cert_or_None) as per _treat_as_renewal + :returns: Tuple of (str action, cert_or_None) as per _find_lineage_for_domains_and_certname action can be: "newcert" | "renew" | "reinstall" :rtype: tuple @@ -147,9 +153,7 @@ def _handle_subset_cert_request(config, domains, cert): br=os.linesep ), reporter_util.HIGH_PRIORITY) - raise errors.Error( - "User chose to cancel the operation and may " - "reinvoke the client.") + raise errors.Error(USER_CANCELLED) def _handle_identical_cert_request(config, lineage): @@ -157,7 +161,7 @@ def _handle_identical_cert_request(config, lineage): :param storage.RenewableCert lineage: - :returns: Tuple of (str action, cert_or_None) as per _treat_as_renewal + :returns: Tuple of (str action, cert_or_None) as per _find_lineage_for_domains_and_certname action can be: "newcert" | "renew" | "reinstall" :rtype: tuple @@ -171,8 +175,8 @@ def _handle_identical_cert_request(config, lineage): # reinstalled without further prompting. return "reinstall", lineage question = ( - "You have an existing certificate that contains exactly the same " - "domains you requested and isn't close to expiry." + "You have an existing certificate that has exactly the same " + "domains or certificate name you requested and isn't close to expiry." "{br}(ref: {0}){br}{br}What would you like to do?" ).format(lineage.configfile.filename, br=os.linesep) @@ -198,8 +202,7 @@ def _handle_identical_cert_request(config, lineage): else: assert False, "This is impossible" - -def _treat_as_renewal(config, domains): +def _find_lineage_for_domains(config, domains): """Determine whether there are duplicated names and how to handle them (renew, reinstall, newcert, or raising an error to stop the client run if the user chooses to cancel the operation when @@ -219,7 +222,7 @@ def _treat_as_renewal(config, domains): if config.duplicate: return "newcert", None # TODO: Also address superset case - ident_names_cert, subset_names_cert = _find_duplicative_certs(config, domains) + ident_names_cert, subset_names_cert = cert_manager.find_duplicative_certs(config, domains) # XXX ^ schoen is not sure whether that correctly reads the systemwide # configuration file. if ident_names_cert is None and subset_names_cert is None: @@ -230,51 +233,66 @@ def _treat_as_renewal(config, domains): elif subset_names_cert is not None: return _handle_subset_cert_request(config, domains, subset_names_cert) +def _find_lineage_for_domains_and_certname(config, domains, certname): + """Find appropriate lineage based on given domains and/or certname. -def _find_duplicative_certs(config, domains): - """Find existing certs that duplicate the request.""" + :returns: Two-element tuple containing desired new-certificate behavior as + a string token ("reinstall", "renew", or "newcert"), plus either + a RenewableCert instance or None if renewal shouldn't occur. - identical_names_cert, subset_names_cert = None, None + :raises .Error: If the user would like to rerun the client again. - cli_config = configuration.RenewerConfiguration(config) - configs_dir = cli_config.renewal_configs_dir - # Verify the directory is there - util.make_or_verify_dir(configs_dir, mode=0o755, uid=os.geteuid()) + """ + if not certname: + return _find_lineage_for_domains(config, domains) + else: + lineage = cert_manager.lineage_for_certname(config, certname) + if lineage: + if domains: + if set(cert_manager.domains_for_certname(config, certname)) != set(domains): + _ask_user_to_confirm_new_names(config, domains, certname, + lineage.names()) # raises if no + return "renew", lineage + # unnecessarily specified domains or no domains specified + return _handle_identical_cert_request(config, lineage) + else: + if domains: + return "newcert", None + else: + raise errors.ConfigurationError("No certificate with name {0} found. " + "Use -d to specify domains, or run certbot --certificates to see " + "possible certificate names.".format(certname)) - for renewal_file in renewal.renewal_conf_files(cli_config): - try: - candidate_lineage = storage.RenewableCert(renewal_file, cli_config) - except (errors.CertStorageError, IOError): - logger.warning("Renewal conf file %s is broken. Skipping.", renewal_file) - logger.debug("Traceback was:\n%s", traceback.format_exc()) - continue - # TODO: Handle these differently depending on whether they are - # expired or still valid? - candidate_names = set(candidate_lineage.names()) - if candidate_names == set(domains): - identical_names_cert = candidate_lineage - elif candidate_names.issubset(set(domains)): - # This logic finds and returns the largest subset-names cert - # in the case where there are several available. - if subset_names_cert is None: - subset_names_cert = candidate_lineage - elif len(candidate_names) > len(subset_names_cert.names()): - subset_names_cert = candidate_lineage +def _ask_user_to_confirm_new_names(config, new_domains, certname, old_domains): + """Ask user to confirm update cert certname to contain new_domains. + """ + if config.renew_with_new_domains: + return + msg = ("Confirm that you intend to update certificate {0} " + "to include domains {1}. Note that it previously " + "contained domains {2}.".format( + certname, + new_domains, + old_domains)) + obj = zope.component.getUtility(interfaces.IDisplay) + if not obj.yesno(msg, "Update cert", "Cancel", default=True): + raise errors.ConfigurationError("Specified mismatched cert name and domains.") - return identical_names_cert, subset_names_cert - - -def _find_domains(config, installer): +def _find_domains_or_certname(config, installer): + """Retrieve domains and certname from config or user input. + """ + domains = None if config.domains: domains = config.domains - else: + elif not config.certname: domains = display_ops.choose_names(installer) - if not domains: + if not domains and not config.certname: raise errors.Error("Please specify --domains, or --installer that " - "will help in domain names autodiscovery") + "will help in domain names autodiscovery, or " + "--cert-name for an existing certificate name.") - return domains + return domains, config.certname def _report_new_cert(config, cert_path, fullchain_path): @@ -429,7 +447,7 @@ def install(config, plugins): except errors.PluginSelectionError as e: return e.message - domains = _find_domains(config, installer) + domains, _ = _find_domains_or_certname(config, installer) le_client = _init_le_client(config, authenticator=None, installer=installer) assert config.cert_path is not None # required=True in the subparser le_client.deploy_certificate( @@ -485,6 +503,14 @@ def update_symlinks(config, unused_plugins): """ cert_manager.update_live_symlinks(config) +def rename(config, unused_plugins): + """Rename a certificate + + Use the information in the config file to rename an existing + lineage. + """ + cert_manager.rename_lineage(config) + def certificates(config, unused_plugins): """Display information about certs configured with Certbot """ @@ -521,12 +547,12 @@ def run(config, plugins): # pylint: disable=too-many-branches,too-many-locals except errors.PluginSelectionError as e: return e.message - domains = _find_domains(config, installer) + domains, certname = _find_domains_or_certname(config, installer) # TODO: Handle errors from _init_le_client? le_client = _init_le_client(config, authenticator, installer) - action, lineage = _auth_from_domains(le_client, config, domains) + action, lineage = _auth_from_available(le_client, config, domains, certname) le_client.deploy_certificate( domains, lineage.privkey, lineage.cert, @@ -576,8 +602,8 @@ def obtain_cert(config, plugins, lineage=None): # SHOWTIME: Possibly obtain/renew a cert, and set action to renew | newcert | reinstall if config.csr is None: # the common case - domains = _find_domains(config, installer) - action, _ = _auth_from_domains(le_client, config, domains, lineage) + domains, certname = _find_domains_or_certname(config, installer) + action, _ = _auth_from_available(le_client, config, domains, certname, lineage) else: assert lineage is None, "Did not expect a CSR with a RenewableCert" _csr_obtain_cert(config, le_client) diff --git a/certbot/renewal.py b/certbot/renewal.py index 09a58ed1b..064df8bd2 100644 --- a/certbot/renewal.py +++ b/certbot/renewal.py @@ -226,12 +226,12 @@ def _avoid_invalidating_lineage(config, lineage, original_server): "unless you use the --break-my-certs flag!".format(names)) -def renew_cert(config, domains, le_client, lineage): +def renew_cert(config, le_client, lineage): "Renew a certificate lineage." renewal_params = lineage.configuration["renewalparams"] original_server = renewal_params.get("server", cli.flag_default("server")) _avoid_invalidating_lineage(config, lineage, original_server) - new_certr, new_chain, new_key, _ = le_client.obtain_certificate(domains) + new_certr, new_chain, new_key, _ = le_client.obtain_certificate(lineage.names()) if config.dry_run: logger.debug("Dry run: skipping updating lineage at %s", os.path.dirname(lineage.cert)) @@ -245,7 +245,7 @@ def renew_cert(config, domains, le_client, lineage): lineage.save_successor(prior_version, new_cert, new_key.pem, new_chain, renewal_conf) lineage.update_all_links_to(lineage.latest_common_version()) - hooks.renew_hook(config, domains, lineage.live_dir) + hooks.renew_hook(config, lineage.names(), lineage.live_dir) def report(msgs, category): @@ -300,12 +300,12 @@ def renew_all_lineages(config): """Examine each lineage; renew if due and report results""" # This is trivially False if config.domains is empty - if any(domain not in config.webroot_map for domain in config.domains): + if any(domain not in config.webroot_map for domain in config.domains) or config.certname: # If more plugins start using cli.add_domains, # we may want to only log a warning here raise errors.Error("Currently, the renew verb is only capable of " "renewing all installed certificates that are due " - "to be renewed; individual domains cannot be " + "to be renewed; individual domains or lineages cannot be " "specified with this action. If you would like to " "renew specific certificates, use the certonly " "command. The renew verb may provide other options " diff --git a/certbot/storage.py b/certbot/storage.py index 2536e59ca..61ab69ff7 100644 --- a/certbot/storage.py +++ b/certbot/storage.py @@ -96,6 +96,25 @@ def write_renewal_config(o_filename, n_filename, archive_dir, target, relevant_d config.write(outfile=f) return config +def rename_renewal_config(prev_name, new_name, cli_config): + """Renames cli_config.certname's config to cli_config.new_certname. + + :param .RenewerConfiguration cli_config: parsed command line + arguments + """ + prev_filename = os.path.join( + cli_config.renewal_configs_dir, prev_name) + ".conf" + new_filename = os.path.join( + cli_config.renewal_configs_dir, new_name) + ".conf" + if os.path.exists(new_filename): + raise errors.ConfigurationError("The new certificate name " + "is already in use.") + try: + os.rename(prev_filename, new_filename) + except OSError: + raise errors.ConfigurationError("Please specify a valid filename " + "for the new certificate name.") + def update_configuration(lineagename, archive_dir, target, cli_config): """Modifies lineagename's config to contain the specified values. @@ -171,6 +190,14 @@ def relevant_values(all_values): for option, value in six.iteritems(all_values) if _relevant(option) and cli.option_was_set(option, value)) +def lineagename_for_filename(config_filename): + """Returns the lineagename for a configuration filename. + """ + if not config_filename.endswith(".conf"): + raise errors.CertStorageError( + "renewal config file name must end in .conf") + return os.path.basename(config_filename[:-len(".conf")]) + class RenewableCert(object): # pylint: disable=too-many-instance-attributes,too-many-public-methods @@ -220,11 +247,7 @@ class RenewableCert(object): """ self.cli_config = cli_config - if not config_filename.endswith(".conf"): - raise errors.CertStorageError( - "renewal config file name must end in .conf") - self.lineagename = os.path.basename( - config_filename[:-len(".conf")]) + self.lineagename = lineagename_for_filename(config_filename) # self.configuration should be used to read parameters that # may have been chosen based on default values from the diff --git a/certbot/tests/cert_manager_test.py b/certbot/tests/cert_manager_test.py index 68e095cd6..f3569dc00 100644 --- a/certbot/tests/cert_manager_test.py +++ b/certbot/tests/cert_manager_test.py @@ -8,8 +8,15 @@ import unittest import configobj import mock +from certbot import configuration +from certbot import errors + +from certbot.display import util as display_util from certbot.storage import ALL_FOUR +from certbot.tests import storage_test +from certbot.tests import util as test_util + class BaseCertManagerTest(unittest.TestCase): """Base class for setting up Cert Manager tests. """ @@ -63,6 +70,7 @@ class BaseCertManagerTest(unittest.TestCase): def tearDown(self): shutil.rmtree(self.tempdir) + class UpdateLiveSymlinksTest(BaseCertManagerTest): """Tests for certbot.cert_manager.update_live_symlinks """ @@ -96,6 +104,7 @@ class UpdateLiveSymlinksTest(BaseCertManagerTest): self.assertEqual(os.readlink(self.configs[domain][kind]), archive_paths[domain][kind]) + class CertificatesTest(BaseCertManagerTest): """Tests for certbot.cert_manager.certificates """ @@ -186,5 +195,232 @@ class CertificatesTest(BaseCertManagerTest): out = cert_manager._report_human_readable(parsed_certs) self.assertTrue('INVALID: TEST CERT' in out) + +class SearchLineagesTest(unittest.TestCase): + """Tests for certbot.cert_manager._search_lineages.""" + + @mock.patch('certbot.configuration.RenewerConfiguration') + @mock.patch('certbot.util.make_or_verify_dir') + @mock.patch('certbot.renewal.renewal_conf_files') + @mock.patch('certbot.storage.RenewableCert') + def test_cert_storage_error(self, mock_renewable_cert, mock_renewal_conf_files, + mock_make_or_verify_dir, mock_renewer_config): + mock_renewal_conf_files.return_value = ["badfile"] + mock_renewable_cert.side_effect = errors.CertStorageError + from certbot import cert_manager + # pylint: disable=protected-access + self.assertEqual(cert_manager._search_lineages(None, lambda x: x, "check"), "check") + self.assertTrue(mock_make_or_verify_dir.called) + self.assertTrue(mock_renewer_config) + + +class LineageForCertnameTest(unittest.TestCase): + """Tests for certbot.cert_manager.lineage_for_certname""" + + @mock.patch('certbot.configuration.RenewerConfiguration') + @mock.patch('certbot.util.make_or_verify_dir') + @mock.patch('certbot.renewal.renewal_conf_files') + @mock.patch('certbot.storage.RenewableCert') + def test_found_match(self, mock_renewable_cert, mock_renewal_conf_files, + mock_make_or_verify_dir, mock_renewer_config): + mock_renewal_conf_files.return_value = ["somefile.conf"] + mock_match = mock.Mock(lineagename="example.com") + mock_renewable_cert.return_value = mock_match + from certbot import cert_manager + self.assertEqual(cert_manager.lineage_for_certname(None, "example.com"), mock_match) + self.assertTrue(mock_make_or_verify_dir.called) + self.assertTrue(mock_renewer_config) + + @mock.patch('certbot.configuration.RenewerConfiguration') + @mock.patch('certbot.util.make_or_verify_dir') + @mock.patch('certbot.renewal.renewal_conf_files') + @mock.patch('certbot.storage.RenewableCert') + def test_no_match(self, mock_renewable_cert, mock_renewal_conf_files, + mock_make_or_verify_dir, mock_renewer_config): + mock_renewal_conf_files.return_value = ["somefile.conf"] + mock_match = mock.Mock(lineagename="other.com") + mock_renewable_cert.return_value = mock_match + from certbot import cert_manager + self.assertEqual(cert_manager.lineage_for_certname(None, "example.com"), None) + self.assertTrue(mock_make_or_verify_dir.called) + self.assertTrue(mock_renewer_config) + + +class DomainsForCertnameTest(unittest.TestCase): + """Tests for certbot.cert_manager.domains_for_certname""" + + @mock.patch('certbot.configuration.RenewerConfiguration') + @mock.patch('certbot.util.make_or_verify_dir') + @mock.patch('certbot.renewal.renewal_conf_files') + @mock.patch('certbot.storage.RenewableCert') + def test_found_match(self, mock_renewable_cert, mock_renewal_conf_files, + mock_make_or_verify_dir, mock_renewer_config): + mock_renewal_conf_files.return_value = ["somefile.conf"] + mock_match = mock.Mock(lineagename="example.com") + domains = ["example.com", "example.org"] + mock_match.names.return_value = domains + mock_renewable_cert.return_value = mock_match + from certbot import cert_manager + self.assertEqual(cert_manager.domains_for_certname(None, "example.com"), domains) + self.assertTrue(mock_make_or_verify_dir.called) + self.assertTrue(mock_renewer_config) + + @mock.patch('certbot.configuration.RenewerConfiguration') + @mock.patch('certbot.util.make_or_verify_dir') + @mock.patch('certbot.renewal.renewal_conf_files') + @mock.patch('certbot.storage.RenewableCert') + def test_no_match(self, mock_renewable_cert, mock_renewal_conf_files, + mock_make_or_verify_dir, mock_renewer_config): + mock_renewal_conf_files.return_value = ["somefile.conf"] + mock_match = mock.Mock(lineagename="example.com") + domains = ["example.com", "example.org"] + mock_match.names.return_value = domains + mock_renewable_cert.return_value = mock_match + from certbot import cert_manager + self.assertEqual(cert_manager.domains_for_certname(None, "other.com"), None) + self.assertTrue(mock_make_or_verify_dir.called) + self.assertTrue(mock_renewer_config) + + +class RenameLineageTest(BaseCertManagerTest): + """Tests for certbot.cert_manager.rename_lineage""" + + def setUp(self): + super(RenameLineageTest, self).setUp() + self.mock_config = configuration.RenewerConfiguration( + namespace=mock.MagicMock( + config_dir=self.tempdir, + work_dir=self.tempdir, + logs_dir=self.tempdir, + certname="example.org", + new_certname="after", + ) + ) + + def _call(self, *args, **kwargs): + from certbot import cert_manager + return cert_manager.rename_lineage(*args, **kwargs) + + @mock.patch('certbot.renewal.renewal_conf_files') + @mock.patch('certbot.main.zope.component.getUtility') + def test_no_certname(self, mock_get_utility, mock_renewal_conf_files): + mock_config = mock.Mock(certname=None, new_certname="two") + + # if not choices + mock_renewal_conf_files.return_value = [] + self.assertRaises(errors.Error, self._call, mock_config) + + mock_renewal_conf_files.return_value = ["one.conf"] + util_mock = mock.Mock() + util_mock.menu.return_value = (display_util.CANCEL, 0) + mock_get_utility.return_value = util_mock + self.assertRaises(errors.Error, self._call, mock_config) + + util_mock.menu.return_value = (display_util.OK, -1) + self.assertRaises(errors.Error, self._call, mock_config) + + @mock.patch('certbot.main.zope.component.getUtility') + def test_no_new_certname(self, mock_get_utility): + mock_config = mock.Mock(certname="one", new_certname=None) + + util_mock = mock.Mock() + util_mock.input.return_value = (display_util.CANCEL, "name") + mock_get_utility.return_value = util_mock + self.assertRaises(errors.Error, self._call, mock_config) + + util_mock = mock.Mock() + util_mock.input.return_value = (display_util.OK, None) + mock_get_utility.return_value = util_mock + self.assertRaises(errors.Error, self._call, mock_config) + + @mock.patch('certbot.main.zope.component.getUtility') + @mock.patch('certbot.cert_manager.lineage_for_certname') + def test_no_existing_certname(self, mock_lineage_for_certname, unused_get_utility): + mock_config = mock.Mock(certname="one", new_certname="two") + mock_lineage_for_certname.return_value = None + self.assertRaises(errors.ConfigurationError, + self._call, mock_config) + + @mock.patch('certbot.main.zope.component.getUtility') + @mock.patch("certbot.storage.RenewableCert._check_symlinks") + def test_rename_cert(self, mock_check, unused_get_utility): + mock_check.return_value = True + mock_config = self.mock_config + self._call(mock_config) + from certbot import cert_manager + updated_lineage = cert_manager.lineage_for_certname(mock_config, mock_config.new_certname) + self.assertTrue(updated_lineage is not None) + self.assertEqual(updated_lineage.lineagename, mock_config.new_certname) + + @mock.patch('certbot.main.zope.component.getUtility') + @mock.patch("certbot.storage.RenewableCert._check_symlinks") + def test_rename_cert_interactive_certname(self, mock_check, mock_get_utility): + mock_check.return_value = True + mock_config = self.mock_config + mock_config.certname = None + util_mock = mock.Mock() + util_mock.menu.return_value = (display_util.OK, 0) + mock_get_utility.return_value = util_mock + self._call(mock_config) + from certbot import cert_manager + updated_lineage = cert_manager.lineage_for_certname(mock_config, mock_config.new_certname) + self.assertTrue(updated_lineage is not None) + self.assertEqual(updated_lineage.lineagename, mock_config.new_certname) + + @mock.patch('certbot.main.zope.component.getUtility') + @mock.patch("certbot.storage.RenewableCert._check_symlinks") + def test_rename_cert_bad_new_certname(self, mock_check, unused_get_utility): + mock_check.return_value = True + mock_config = self.mock_config + + # for example, don't rename to existing certname + mock_config.new_certname = "example.org" + self.assertRaises(errors.ConfigurationError, self._call, mock_config) + + mock_config.new_certname = "one/two" + self.assertRaises(errors.ConfigurationError, self._call, mock_config) + + +class DuplicativeCertsTest(storage_test.BaseRenewableCertTest): + """Test to avoid duplicate lineages.""" + + def setUp(self): + super(DuplicativeCertsTest, self).setUp() + self.config.write() + self._write_out_ex_kinds() + + def tearDown(self): + shutil.rmtree(self.tempdir) + + @mock.patch('certbot.util.make_or_verify_dir') + def test_find_duplicative_names(self, unused_makedir): + from certbot.cert_manager import find_duplicative_certs + test_cert = test_util.load_vector('cert-san.pem') + with open(self.test_rc.cert, 'wb') as f: + f.write(test_cert) + + # No overlap at all + result = find_duplicative_certs( + self.cli_config, ['wow.net', 'hooray.org']) + self.assertEqual(result, (None, None)) + + # Totally identical + result = find_duplicative_certs( + self.cli_config, ['example.com', 'www.example.com']) + self.assertTrue(result[0].configfile.filename.endswith('example.org.conf')) + self.assertEqual(result[1], None) + + # Superset + result = find_duplicative_certs( + self.cli_config, ['example.com', 'www.example.com', 'something.new']) + self.assertEqual(result[0], None) + self.assertTrue(result[1].configfile.filename.endswith('example.org.conf')) + + # Partial overlap doesn't count + result = find_duplicative_certs( + self.cli_config, ['example.com', 'something.new']) + self.assertEqual(result, (None, None)) + + if __name__ == "__main__": unittest.main() # pragma: no cover diff --git a/certbot/tests/cli_test.py b/certbot/tests/cli_test.py index 03c8c6fbd..72aea50ea 100644 --- a/certbot/tests/cli_test.py +++ b/certbot/tests/cli_test.py @@ -125,6 +125,7 @@ class ParseTest(unittest.TestCase): self.assertTrue("--key-path" not in out) out = self._help_output(['-h']) + self.assertTrue(cli.usage_strings(self.plugins)[0] in out) def test_parse_domains(self): diff --git a/certbot/tests/client_test.py b/certbot/tests/client_test.py index b9f5cf15b..bf091a478 100644 --- a/certbot/tests/client_test.py +++ b/certbot/tests/client_test.py @@ -106,9 +106,14 @@ class RegisterTest(unittest.TestCase): class ClientTestCommon(unittest.TestCase): """Common base class for certbot.client.Client tests.""" def setUp(self): + self.config = mock.MagicMock( + no_verify_ssl=False, + config_dir="/etc/letsencrypt", + work_dir="/var/lib/letsencrypt", + allow_subset_of_names=False) + # pylint: disable=star-args self.account = mock.MagicMock(**{"key.pem": KEY}) - self.config = mock.MagicMock(no_verify_ssl=False) from certbot.client import Client with mock.patch("certbot.client.acme_client.Client") as acme: @@ -221,6 +226,27 @@ class ClientTest(ClientTestCommon): mock.sentinel.key, domains, self.config.csr_dir) self._check_obtain_certificate() + @mock.patch('certbot.client.Client.obtain_certificate') + @mock.patch('certbot.storage.RenewableCert.new_lineage') + @mock.patch('OpenSSL.crypto.dump_certificate') + def test_obtain_and_enroll_certificate(self, mock_dump_certificate, + mock_storage, mock_obtain_certificate): + domains = ["example.com", "www.example.com"] + mock_obtain_certificate.return_value = (mock.MagicMock(), + mock.MagicMock(), mock.MagicMock(), None) + + self.client.config.dry_run = False + self.assertTrue(self.client.obtain_and_enroll_certificate(domains, "example_cert")) + + self.assertTrue(self.client.obtain_and_enroll_certificate(domains, None)) + + self.client.config.dry_run = True + + self.assertFalse(self.client.obtain_and_enroll_certificate(domains, None)) + + self.assertTrue(mock_storage.call_count == 2) + self.assertTrue(mock_dump_certificate.call_count == 2) + @mock.patch("certbot.cli.helpful_parser") def test_save_certificate(self, mock_parser): # pylint: disable=too-many-locals diff --git a/certbot/tests/main_test.py b/certbot/tests/main_test.py index 68e5ef00a..5e1ce1ab5 100644 --- a/certbot/tests/main_test.py +++ b/certbot/tests/main_test.py @@ -30,7 +30,6 @@ from certbot import util from certbot.plugins import disco from certbot.plugins import manual -from certbot.tests import storage_test import certbot.tests.util as test_util CERT_PATH = test_util.vector_path('cert.pem') @@ -56,7 +55,7 @@ class RunTest(unittest.TestCase): def setUp(self): self.domain = 'example.org' self.patches = [ - mock.patch('certbot.main._auth_from_domains'), + mock.patch('certbot.main._auth_from_available'), mock.patch('certbot.main.display_ops.success_installation'), mock.patch('certbot.main.display_ops.success_renewal'), mock.patch('certbot.main._init_le_client'), @@ -118,7 +117,7 @@ class ObtainCertTest(unittest.TestCase): return mock_init() # returns the client - @mock.patch('certbot.main._auth_from_domains') + @mock.patch('certbot.main._auth_from_available') def test_no_reinstall_text_pause(self, mock_auth): mock_notification = self.mock_get_utility().notification mock_notification.side_effect = self._assert_no_pause @@ -129,6 +128,71 @@ class ObtainCertTest(unittest.TestCase): # pylint: disable=unused-argument self.assertFalse(pause) + @mock.patch('certbot.cert_manager.lineage_for_certname') + @mock.patch('certbot.cert_manager.domains_for_certname') + @mock.patch('certbot.renewal.renew_cert') + @mock.patch('certbot.main._report_new_cert') + def test_find_lineage_for_domains_and_certname(self, mock_report_cert, + mock_renew_cert, mock_domains, mock_lineage): + domains = ['example.com', 'test.org'] + mock_domains.return_value = domains + mock_lineage.names.return_value = domains + self._call(('certonly --webroot -d example.com -d test.org ' + '--cert-name example.com').split()) + self.assertTrue(mock_lineage.call_count == 1) + self.assertTrue(mock_domains.call_count == 1) + self.assertTrue(mock_renew_cert.call_count == 1) + self.assertTrue(mock_report_cert.call_count == 1) + + # user confirms updating lineage with new domains + self._call(('certonly --webroot -d example.com -d test.com ' + '--cert-name example.com').split()) + self.assertTrue(mock_lineage.call_count == 2) + self.assertTrue(mock_domains.call_count == 2) + self.assertTrue(mock_renew_cert.call_count == 2) + self.assertTrue(mock_report_cert.call_count == 2) + + # error in _ask_user_to_confirm_new_names + util_mock = mock.Mock() + util_mock.yesno.return_value = False + self.mock_get_utility.return_value = util_mock + self.assertRaises(errors.ConfigurationError, self._call, + ('certonly --webroot -d example.com -d test.com --cert-name example.com').split()) + + @mock.patch('certbot.cert_manager.lineage_for_certname') + @mock.patch('certbot.main._report_new_cert') + def test_find_lineage_for_domains_new_certname(self, mock_report_cert, + mock_lineage): + mock_lineage.return_value = None + + # no lineage with this name but we specified domains so create a new cert + self._call(('certonly --webroot -d example.com -d test.com ' + '--cert-name example.com').split()) + self.assertTrue(mock_lineage.call_count == 1) + self.assertTrue(mock_report_cert.call_count == 1) + + # no lineage with this name and we didn't give domains + self.assertRaises(errors.ConfigurationError, self._call, + ('certonly --webroot --cert-name example.com').split()) + +class FindDomainsOrCertnameTest(unittest.TestCase): + """Tests for certbot.main._find_domains_or_certname.""" + + @mock.patch('certbot.display.ops.choose_names') + def test_display_ops(self, mock_choose_names): + mock_config = mock.Mock(domains=None, certname=None) + mock_choose_names.return_value = "domainname" + # pylint: disable=protected-access + self.assertEqual(main._find_domains_or_certname(mock_config, None), + ("domainname", None)) + + @mock.patch('certbot.display.ops.choose_names') + def test_no_results(self, mock_choose_names): + mock_config = mock.Mock(domains=None, certname=None) + mock_choose_names.return_value = [] + # pylint: disable=protected-access + self.assertRaises(errors.Error, main._find_domains_or_certname, mock_config, None) + class RevokeTest(unittest.TestCase): """Tests for certbot.main.revoke.""" @@ -333,47 +397,6 @@ class DetermineAccountTest(unittest.TestCase): self.assertEqual('other email', self.config.email) -class DuplicativeCertsTest(storage_test.BaseRenewableCertTest): - """Test to avoid duplicate lineages.""" - - def setUp(self): - super(DuplicativeCertsTest, self).setUp() - self.config.write() - self._write_out_ex_kinds() - - def tearDown(self): - shutil.rmtree(self.tempdir) - - @mock.patch('certbot.util.make_or_verify_dir') - def test_find_duplicative_names(self, unused_makedir): - from certbot.main import _find_duplicative_certs - test_cert = test_util.load_vector('cert-san.pem') - with open(self.test_rc.cert, 'wb') as f: - f.write(test_cert) - - # No overlap at all - result = _find_duplicative_certs( - self.cli_config, ['wow.net', 'hooray.org']) - self.assertEqual(result, (None, None)) - - # Totally identical - result = _find_duplicative_certs( - self.cli_config, ['example.com', 'www.example.com']) - self.assertTrue(result[0].configfile.filename.endswith('example.org.conf')) - self.assertEqual(result[1], None) - - # Superset - result = _find_duplicative_certs( - self.cli_config, ['example.com', 'www.example.com', 'something.new']) - self.assertEqual(result[0], None) - self.assertTrue(result[1].configfile.filename.endswith('example.org.conf')) - - # Partial overlap doesn't count - result = _find_duplicative_certs( - self.cli_config, ['example.com', 'something.new']) - self.assertEqual(result, (None, None)) - - class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods """Tests for different commands.""" @@ -445,7 +468,7 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods self._cli_missing_flag(args, "specify a plugin") args.extend(['--standalone', '-d', 'eg.is']) self._cli_missing_flag(args, "register before running") - with mock.patch('certbot.main._auth_from_domains'): + with mock.patch('certbot.main._auth_from_available'): with mock.patch('certbot.main.client.acme_from_config_key'): args.extend(['--email', 'io@io.is']) self._cli_missing_flag(args, "--agree-tos") @@ -453,14 +476,14 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods @mock.patch('certbot.main.client.acme_client.Client') @mock.patch('certbot.main._determine_account') @mock.patch('certbot.main.client.Client.obtain_and_enroll_certificate') - @mock.patch('certbot.main._auth_from_domains') - def test_user_agent(self, afd, _obt, det, _client): + @mock.patch('certbot.main._auth_from_available') + def test_user_agent(self, afa, _obt, det, _client): # Normally the client is totally mocked out, but here we need more # arguments to automate it... args = ["--standalone", "certonly", "-m", "none@none.com", "-d", "example.com", '--agree-tos'] + self.standard_args det.return_value = mock.MagicMock(), None - afd.return_value = "newcert", mock.MagicMock() + afa.return_value = "newcert", mock.MagicMock() with mock.patch('certbot.main.client.acme_client.ClientNetwork') as acme_net: self._call_no_clientmock(args) @@ -512,8 +535,8 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods self._cli_missing_flag(["--standalone"], "With the standalone plugin, you probably") with mock.patch("certbot.main._init_le_client") as mock_init: - with mock.patch("certbot.main._auth_from_domains") as mock_afd: - mock_afd.return_value = (mock.MagicMock(), mock.MagicMock()) + with mock.patch("certbot.main._auth_from_available") as mock_afa: + mock_afa.return_value = (mock.MagicMock(), mock.MagicMock()) self._call(["certonly", "--manual", "-d", "foo.bar"]) unused_config, auth, unused_installer = mock_init.call_args[0] self.assertTrue(isinstance(auth, manual.Authenticator)) @@ -664,7 +687,7 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods 'certonly -d example.org --csr {0}'.format(CSR).split()) def _certonly_new_request_common(self, mock_client, args=None): - with mock.patch('certbot.main._treat_as_renewal') as mock_renewal: + with mock.patch('certbot.main._find_lineage_for_domains_and_certname') as mock_renewal: mock_renewal.return_value = ("newcert", None) with mock.patch('certbot.main._init_le_client') as mock_init: mock_init.return_value = mock_client @@ -718,6 +741,7 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods mock_lineage = mock.MagicMock(cert=cert_path, fullchain=chain_path) mock_lineage.should_autorenew.return_value = due_for_renewal mock_lineage.has_pending_deployment.return_value = False + mock_lineage.names.return_value = ['isnot.org'] mock_certr = mock.MagicMock() mock_key = mock.MagicMock(pem='pem_key') mock_client = mock.MagicMock() @@ -725,7 +749,7 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods mock_client.obtain_certificate.return_value = (mock_certr, 'chain', mock_key, 'csr') try: - with mock.patch('certbot.main._find_duplicative_certs') as mock_fdc: + with mock.patch('certbot.cert_manager.find_duplicative_certs') as mock_fdc: mock_fdc.return_value = (mock_lineage, None) with mock.patch('certbot.main._init_le_client') as mock_init: mock_init.return_value = mock_client @@ -945,7 +969,7 @@ class MainTest(unittest.TestCase): # pylint: disable=too-many-public-methods should_renew=False, error_expected=True) @mock.patch('certbot.main.zope.component.getUtility') - @mock.patch('certbot.main._treat_as_renewal') + @mock.patch('certbot.main._find_lineage_for_domains_and_certname') @mock.patch('certbot.main._init_le_client') def test_certonly_reinstall(self, mock_init, mock_renewal, mock_get_utility): mock_renewal.return_value = ('reinstall', mock.MagicMock()) diff --git a/certbot/util.py b/certbot/util.py index 1597c5bd8..7d49a66a3 100644 --- a/certbot/util.py +++ b/certbot/util.py @@ -425,7 +425,6 @@ def enforce_le_validity(domain): label, domain)) return domain - def enforce_domain_sanity(domain): """Method which validates domain value and errors out if the requirements are not met.