More thoroughly rename during certbot rename. (#4320)

* rename more files in rename command

* Revert "Hide rename command (#4007)"

This reverts commit 8c14de13a5.

* Rename files in configuration files

* Delete new files if we fail during the renaming process

* update tests and error catching

* More expressive error message
This commit is contained in:
Erica Portnoy 2017-04-04 11:20:58 -07:00 committed by GitHub
parent e194e0dd5f
commit 43dccfc671
5 changed files with 157 additions and 27 deletions

View file

@ -13,6 +13,7 @@ from certbot import storage
from certbot import util
from certbot.display import util as display_util
from certbot.plugins import disco as plugins_disco
logger = logging.getLogger(__name__)
@ -46,6 +47,7 @@ def rename_lineage(config):
certname = _get_certname(config, "rename")
# what is the new name we want to use?
new_certname = config.new_certname
if not new_certname:
code, new_certname = disp.input(
@ -54,13 +56,34 @@ def rename_lineage(config):
if code != display_util.OK or not new_certname:
raise errors.Error("User ended interaction.")
lineage = lineage_for_certname(config, certname)
if not lineage:
raise errors.ConfigurationError("No existing certificate with name "
"{0} found.".format(certname))
storage.rename_renewal_config(certname, new_certname, config)
disp.notification("Successfully renamed {0} to {1}."
.format(certname, new_certname), pause=False)
try:
# copy files to new name
new_lineage = storage.duplicate_lineage(config, certname, new_certname)
# install the new name's files
config.certname = new_certname
plugins = plugins_disco.PluginsRegistry.find_all()
from certbot.main import install
install(config, plugins, new_lineage, False)
except (errors.CertStorageError, errors.ConfigurationError, IOError, OSError) as e:
# delete the new files
config.certname = new_certname
# we might not have created anything to delete
try:
storage.delete_files(config, new_certname)
except errors.CertStorageError:
pass
reporter = zope.component.getUtility(interfaces.IReporter)
reporter.add_message("Unable to rename certificate", reporter.HIGH_PRIORITY)
raise e
else:
# delete old files
config.certname = certname
storage.delete_files(config, certname)
disp.notification("Renamed files for {0} to {1}."
.format(certname, new_certname), pause=False)
finally:
config.certname = certname
def certificates(config):
"""Display information about certs configured with Certbot

View file

@ -81,6 +81,7 @@ obtain, install, and renew certificates:
manage certificates:
certificates Display information about certs you have from Certbot
revoke Revoke a certificate (supply --cert-path)
rename Rename a certificate
delete Delete a certificate
manage your account with Let's Encrypt:
@ -363,6 +364,10 @@ VERB_HELP = [
"opts": "Options for revocation of certs",
"usage": "\n\n certbot revoke --cert-path /path/to/fullchain.pem [options]\n\n"
}),
("rename", {
"short": "Change a certificate's name (for management purposes)",
"opts": "Options for changing certificate names"
}),
("register", {
"short": "Register for account with Let's Encrypt / other ACME server",
"opts": "Options for account registration & modification"
@ -420,6 +425,7 @@ class HelpfulArgumentParser(object):
"plugins": main.plugins_cmd,
"register": main.register,
"unregister": main.unregister,
"rename": main.rename,
"renew": main.renew,
"revoke": main.revoke,
"rollback": main.rollback,
@ -790,7 +796,7 @@ def _add_all_groups(helpful):
helpful.add_group("paths", description="Arguments changing execution paths & servers")
helpful.add_group("manage",
description="Various subcommands and flags are available for managing your certificates:",
verbs=["certificates", "delete", "renew", "revoke", "update_symlinks"])
verbs=["certificates", "delete", "rename", "renew", "revoke", "update_symlinks"])
# VERBS
for verb, docs in VERB_HELP:
@ -843,12 +849,17 @@ def prepare_and_parse_args(plugins, args, detect_defaults=False): # pylint: dis
"multiple -d flags or enter a comma separated list of domains "
"as a parameter. (default: Ask)")
helpful.add(
[None, "run", "certonly", "manage", "delete", "certificates"],
[None, "run", "certonly", "manage", "rename", "delete", "certificates"],
"--cert-name", dest="certname",
metavar="CERTNAME", default=None,
help="Certificate name to apply. Only one certificate name can be used "
"per Certbot run. To see certificate names, run 'certbot certificates'. "
"When creating a new certificate, specifies the new certificate's name.")
helpful.add(
["rename", "manage"],
"--updated-cert-name", dest="new_certname",
metavar="NEW_CERTNAME", default=None,
help="New name for the certificate. Must be a valid filename.")
helpful.add(
[None, "testing", "renew", "certonly"],
"--dry-run", action="store_true", dest="dry_run",

View file

@ -460,15 +460,16 @@ def register(config, unused_plugins):
eff.handle_subscription(config)
add_msg("Your e-mail address was updated to {0}.".format(config.email))
def _install_cert(config, le_client, domains, lineage=None):
def _install_cert(config, le_client, domains, lineage=None, enhance=True):
path_provider = lineage if lineage else config
assert path_provider.cert_path is not None
le_client.deploy_certificate(domains, path_provider.key_path,
path_provider.cert_path, path_provider.chain_path, path_provider.fullchain_path)
le_client.enhance_config(domains, path_provider.chain_path)
if enhance:
le_client.enhance_config(domains, path_provider.chain_path)
def install(config, plugins):
def install(config, plugins, lineage=None, enhance=True):
"""Install a previously obtained cert in a server."""
# XXX: Update for renewer/RenewableCert
# FIXME: be consistent about whether errors are raised or returned from
@ -481,7 +482,7 @@ def install(config, plugins):
domains, _ = _find_domains_or_certname(config, installer)
le_client = _init_le_client(config, authenticator=None, installer=installer)
_install_cert(config, le_client, domains)
_install_cert(config, le_client, domains, lineage, enhance)
def plugins_cmd(config, plugins): # TODO: Use IDisplay rather than print

View file

@ -34,7 +34,9 @@ def renewal_conf_files(config):
return glob.glob(os.path.join(config.renewal_configs_dir, "*.conf"))
def renewal_file_for_certname(config, certname):
"""Return /path/to/certname.conf in the renewal conf directory"""
"""Return /path/to/certname.conf in the renewal conf directory
:raises .CertStorageError: if file is missing
"""
path = os.path.join(config.renewal_configs_dir, "{0}.conf".format(certname))
if not os.path.exists(path):
raise errors.CertStorageError("No certificate found with name {0} (expected "
@ -130,6 +132,8 @@ def rename_renewal_config(prev_name, new_name, cli_config):
except OSError:
raise errors.ConfigurationError("Please specify a valid filename "
"for the new certificate name.")
else:
return new_filename
def update_configuration(lineagename, archive_dir, target, cli_config):
@ -146,19 +150,25 @@ def update_configuration(lineagename, archive_dir, target, cli_config):
"""
config_filename = renewal_filename_for_lineagename(cli_config, lineagename)
temp_filename = config_filename + ".new"
def _save_renewal_values(unused_config, temp_filename):
# Save only the config items that are relevant to renewal
values = relevant_values(vars(cli_config.namespace))
write_renewal_config(config_filename, temp_filename, archive_dir, target, values)
_modify_config_with_tempfile(config_filename, _save_renewal_values)
return configobj.ConfigObj(config_filename)
def _modify_config_with_tempfile(filename, function):
temp_filename = filename + ".new"
# If an existing tempfile exists, delete it
if os.path.exists(temp_filename):
os.unlink(temp_filename)
# Save only the config items that are relevant to renewal
values = relevant_values(vars(cli_config.namespace))
write_renewal_config(config_filename, temp_filename, archive_dir, target, values)
os.rename(temp_filename, config_filename)
return configobj.ConfigObj(config_filename)
config = configobj.ConfigObj(filename)
function(config, temp_filename)
os.rename(temp_filename, filename)
def get_link_target(link):
"""Get an absolute path to the target of link.
@ -243,6 +253,7 @@ def delete_files(config, certname):
"""Delete all files related to the certificate.
If some files are not found, ignore them and continue.
:raises .CertStorageError: if lineage is missing
"""
renewal_filename = renewal_file_for_certname(config, certname)
# file exists
@ -304,6 +315,79 @@ def delete_files(config, certname):
except OSError:
logger.debug("Unable to remove %s", archive_path)
def duplicate_lineage(config, certname, new_certname):
"""Create a duplicate of certname with name new_certname
:raises .CertStorageError: for storage errors
:raises .ConfigurationError: for cli and renewal configuration errors
:raises IOError: for filename errors
:raises OSError: for OS errors
"""
# copy renewal config file
prev_filename = renewal_filename_for_lineagename(config, certname)
new_filename = renewal_filename_for_lineagename(config, new_certname)
if os.path.exists(new_filename):
raise errors.ConfigurationError("The new certificate name "
"is already in use.")
try:
shutil.copy2(prev_filename, new_filename)
except (OSError, IOError):
raise errors.ConfigurationError("Please specify a valid filename "
"for the new certificate name.")
logger.debug("Copied %s to %s", prev_filename, new_filename)
# load config file
try:
renewal_config = configobj.ConfigObj(new_filename)
except configobj.ConfigObjError:
# config is corrupted
logger.warning("Could not parse %s. Only the certificate has been renamed.",
new_filename)
raise errors.CertStorageError(
"error parsing {0}".format(new_filename))
def copy_to_new_dir(prev_dir):
"""Replace certname with new_certname in prev_dir"""
new_dir = prev_dir.replace(certname, new_certname)
# make dir iff it doesn't exist
shutil.copytree(prev_dir, new_dir, symlinks=True)
logger.debug("Copied %s to %s", prev_dir, new_dir)
return new_dir
# archive dir
prev_archive_dir = _full_archive_path(renewal_config, config, certname)
new_archive_dir = prev_archive_dir
if not certname in prev_archive_dir:
raise errors.CertStorageError("Archive directory does not conform to defaults: "
"{0} not in {1}", certname, prev_archive_dir)
else:
new_archive_dir = copy_to_new_dir(prev_archive_dir)
# live dir
# if things aren't in their default places, don't try to change things.
prev_live_dir = _full_live_path(config, certname)
prev_links = dict((kind, renewal_config.get(kind)) for kind in ALL_FOUR)
if (certname not in prev_live_dir or
len(set(os.path.dirname(renewal_config.get(kind)) for kind in ALL_FOUR)) != 1):
raise errors.CertStorageError("Live directory does not conform to defaults.")
else:
copy_to_new_dir(prev_live_dir)
new_links = dict((k, prev_links[k].replace(certname, new_certname)) for k in prev_links)
# Update renewal config file
def _update_and_write(renewal_config, temp_filename):
renewal_config["archive_dir"] = new_archive_dir
renewal_config["version"] = certbot.__version__
for kind in ALL_FOUR:
renewal_config[kind] = new_links[kind]
with open(temp_filename, "wb") as f:
renewal_config.write(outfile=f)
_modify_config_with_tempfile(new_filename, _update_and_write)
# Update symlinks
return RenewableCert(new_filename, config, update_symlinks=True)
class RenewableCert(object):
# pylint: disable=too-many-instance-attributes,too-many-public-methods

View file

@ -384,14 +384,19 @@ class RenameLineageTest(BaseCertManagerTest):
@test_util.patch_get_utility()
@mock.patch('certbot.cert_manager.lineage_for_certname')
def test_no_existing_certname(self, mock_lineage_for_certname, unused_get_utility):
mock_config = mock.Mock(certname="one", new_certname="two")
mock_config = mock.Mock(certname="one", new_certname="two",
renewal_configs_dir="/tmp/etc/letsencrypt/renewal/")
mock_lineage_for_certname.return_value = None
self.assertRaises(errors.ConfigurationError,
self._call, mock_config)
self.assertRaises(errors.ConfigurationError, self._call, mock_config)
@mock.patch("certbot.storage.RenewableCert._update_symlinks")
@test_util.patch_get_utility()
@mock.patch("certbot.storage.RenewableCert._check_symlinks")
def test_rename_cert(self, mock_check, unused_get_utility):
@mock.patch("certbot.storage.relevant_values")
def test_rename_cert(self, mock_rv, mock_check, unused_get_utility, unused_update_symlinks):
# Mock relevant_values() to claim that all values are relevant here
# (to avoid instantiating parser)
mock_rv.side_effect = lambda x: x
mock_check.return_value = True
mock_config = self.mock_config
self._call(mock_config)
@ -400,9 +405,15 @@ class RenameLineageTest(BaseCertManagerTest):
self.assertTrue(updated_lineage is not None)
self.assertEqual(updated_lineage.lineagename, mock_config.new_certname)
@mock.patch("certbot.storage.RenewableCert._update_symlinks")
@test_util.patch_get_utility()
@mock.patch("certbot.storage.RenewableCert._check_symlinks")
def test_rename_cert_interactive_certname(self, mock_check, mock_get_utility):
@mock.patch("certbot.storage.relevant_values")
def test_rename_cert_interactive_certname(self, mock_rv, mock_check, mock_get_utility,
unused_update_symlinks):
# python 3.4 and 3.5 order things differently, so remove other.com for this test
os.remove(self.configs["other.com"].filename)
mock_rv.side_effect = lambda x: x
mock_check.return_value = True
mock_config = self.mock_config
mock_config.certname = None