certbot/certbot/renewal.py
Erica Portnoy 81fd0cd32c Implement delete command (#3913)
* organize cert_manager.py

* add delete files to cert manager and storage

* add tests

* add to main and cli

* Clean up all related files we can find, even if some are missing.

* error messages, debug logs, and remove RenewerConfiguration

* add logs for failure to remove

* remove renewer_config_file
2016-12-15 20:23:02 -08:00

359 lines
16 KiB
Python

"""Functionality for autorenewal and associated juggling of configurations"""
from __future__ import print_function
import copy
import logging
import os
import traceback
import six
import zope.component
import OpenSSL
from certbot import cli
from certbot import crypto_util
from certbot import errors
from certbot import interfaces
from certbot import util
from certbot import hooks
from certbot import storage
from certbot.plugins import disco as plugins_disco
logger = logging.getLogger(__name__)
# These are the items which get pulled out of a renewal configuration
# file's renewalparams and actually used in the client configuration
# during the renewal process. We have to record their types here because
# the renewal configuration process loses this information.
STR_CONFIG_ITEMS = ["config_dir", "logs_dir", "work_dir", "user_agent",
"server", "account", "authenticator", "installer",
"standalone_supported_challenges", "renew_hook"]
INT_CONFIG_ITEMS = ["rsa_key_size", "tls_sni_01_port", "http01_port"]
def _reconstitute(config, full_path):
"""Try to instantiate a RenewableCert, updating config with relevant items.
This is specifically for use in renewal and enforces several checks
and policies to ensure that we can try to proceed with the renewal
request. The config argument is modified by including relevant options
read from the renewal configuration file.
:param configuration.NamespaceConfig config: configuration for the
current lineage
:param str full_path: Absolute path to the configuration file that
defines this lineage
:returns: the RenewableCert object or None if a fatal error occurred
:rtype: `storage.RenewableCert` or NoneType
"""
try:
renewal_candidate = storage.RenewableCert(full_path, config)
except (errors.CertStorageError, IOError) as exc:
logger.warning(exc)
logger.warning("Renewal configuration file %s is broken. Skipping.", full_path)
logger.debug("Traceback was:\n%s", traceback.format_exc())
return None
if "renewalparams" not in renewal_candidate.configuration:
logger.warning("Renewal configuration file %s lacks "
"renewalparams. Skipping.", full_path)
return None
renewalparams = renewal_candidate.configuration["renewalparams"]
if "authenticator" not in renewalparams:
logger.warning("Renewal configuration file %s does not specify "
"an authenticator. Skipping.", full_path)
return None
# Now restore specific values along with their data types, if
# those elements are present.
try:
_restore_required_config_elements(config, renewalparams)
_restore_plugin_configs(config, renewalparams)
except (ValueError, errors.Error) as error:
logger.warning(
"An error occurred while parsing %s. The error was %s. "
"Skipping the file.", full_path, error.message)
logger.debug("Traceback was:\n%s", traceback.format_exc())
return None
try:
config.domains = [util.enforce_domain_sanity(d)
for d in renewal_candidate.names()]
except errors.ConfigurationError as error:
logger.warning("Renewal configuration file %s references a cert "
"that contains an invalid domain name. The problem "
"was: %s. Skipping.", full_path, error)
return None
return renewal_candidate
def _restore_webroot_config(config, renewalparams):
"""
webroot_map is, uniquely, a dict, and the general-purpose configuration
restoring logic is not able to correctly parse it from the serialized
form.
"""
if "webroot_map" in renewalparams:
if not cli.set_by_cli("webroot_map"):
config.namespace.webroot_map = renewalparams["webroot_map"]
elif "webroot_path" in renewalparams:
logger.debug("Ancient renewal conf file without webroot-map, restoring webroot-path")
wp = renewalparams["webroot_path"]
if isinstance(wp, str): # prior to 0.1.0, webroot_path was a string
wp = [wp]
config.namespace.webroot_path = wp
def _restore_plugin_configs(config, renewalparams):
"""Sets plugin specific values in config from renewalparams
:param configuration.NamespaceConfig config: configuration for the
current lineage
:param configobj.Section renewalparams: Parameters from the renewal
configuration file that defines this lineage
"""
# Now use parser to get plugin-prefixed items with correct types
# XXX: the current approach of extracting only prefixed items
# related to the actually-used installer and authenticator
# works as long as plugins don't need to read plugin-specific
# variables set by someone else (e.g., assuming Apache
# configurator doesn't need to read webroot_ variables).
# Note: if a parameter that used to be defined in the parser is no
# longer defined, stored copies of that parameter will be
# deserialized as strings by this logic even if they were
# originally meant to be some other type.
if renewalparams["authenticator"] == "webroot":
_restore_webroot_config(config, renewalparams)
plugin_prefixes = []
else:
plugin_prefixes = [renewalparams["authenticator"]]
if renewalparams.get("installer", None) is not None:
plugin_prefixes.append(renewalparams["installer"])
for plugin_prefix in set(plugin_prefixes):
plugin_prefix = plugin_prefix.replace('-', '_')
for config_item, config_value in six.iteritems(renewalparams):
if config_item.startswith(plugin_prefix + "_") and not cli.set_by_cli(config_item):
# Values None, True, and False need to be treated specially,
# As their types aren't handled correctly by configobj
if config_value in ("None", "True", "False"):
# bool("False") == True
# pylint: disable=eval-used
setattr(config.namespace, config_item, eval(config_value))
else:
cast = cli.argparse_type(config_item)
setattr(config.namespace, config_item, cast(config_value))
def _restore_required_config_elements(config, renewalparams):
"""Sets non-plugin specific values in config from renewalparams
:param configuration.NamespaceConfig config: configuration for the
current lineage
:param configobj.Section renewalparams: parameters from the renewal
configuration file that defines this lineage
"""
# string-valued items to add if they're present
for config_item in STR_CONFIG_ITEMS:
if config_item in renewalparams and not cli.set_by_cli(config_item):
value = renewalparams[config_item]
# Unfortunately, we've lost type information from ConfigObj,
# so we don't know if the original was NoneType or str!
if value == "None":
value = None
setattr(config.namespace, config_item, value)
# int-valued items to add if they're present
for config_item in INT_CONFIG_ITEMS:
if config_item in renewalparams and not cli.set_by_cli(config_item):
config_value = renewalparams[config_item]
# the default value for http01_port was None during private beta
if config_item == "http01_port" and config_value == "None":
logger.info("updating legacy http01_port value")
int_value = cli.flag_default("http01_port")
else:
try:
int_value = int(config_value)
except ValueError:
raise errors.Error(
"Expected a numeric value for {0}".format(config_item))
setattr(config.namespace, config_item, int_value)
def should_renew(config, lineage):
"Return true if any of the circumstances for automatic renewal apply."
if config.renew_by_default:
logger.debug("Auto-renewal forced with --force-renewal...")
return True
if lineage.should_autorenew(interactive=True):
logger.info("Cert is due for renewal, auto-renewing...")
return True
if config.dry_run:
logger.info("Cert not due for renewal, but simulating renewal for dry run")
return True
logger.info("Cert not yet due for renewal")
return False
def _avoid_invalidating_lineage(config, lineage, original_server):
"Do not renew a valid cert with one from a staging server!"
# Some lineages may have begun with --staging, but then had production certs
# added to them
latest_cert = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, open(lineage.cert).read())
# all our test certs are from happy hacker fake CA, though maybe one day
# we should test more methodically
now_valid = "fake" not in repr(latest_cert.get_issuer()).lower()
if util.is_staging(config.server):
if not util.is_staging(original_server) or now_valid:
if not config.break_my_certs:
names = ", ".join(lineage.names())
raise errors.Error(
"You've asked to renew/replace a seemingly valid certificate with "
"a test certificate (domains: {0}). We will not do that "
"unless you use the --break-my-certs flag!".format(names))
def renew_cert(config, le_client, lineage):
"Renew a certificate lineage."
renewal_params = lineage.configuration["renewalparams"]
original_server = renewal_params.get("server", cli.flag_default("server"))
_avoid_invalidating_lineage(config, lineage, original_server)
new_certr, new_chain, new_key, _ = le_client.obtain_certificate(lineage.names())
if config.dry_run:
logger.debug("Dry run: skipping updating lineage at %s",
os.path.dirname(lineage.cert))
else:
prior_version = lineage.latest_common_version()
new_cert = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, new_certr.body.wrapped)
new_chain = crypto_util.dump_pyopenssl_chain(new_chain)
# TODO: Check return value of save_successor
lineage.save_successor(prior_version, new_cert, new_key.pem, new_chain, config)
lineage.update_all_links_to(lineage.latest_common_version())
hooks.renew_hook(config, lineage.names(), lineage.live_dir)
def report(msgs, category):
"Format a results report for a category of renewal outcomes"
lines = ("%s (%s)" % (m, category) for m in msgs)
return " " + "\n ".join(lines)
def _renew_describe_results(config, renew_successes, renew_failures,
renew_skipped, parse_failures):
out = []
notify = out.append
if config.dry_run:
notify("** DRY RUN: simulating 'certbot renew' close to cert expiry")
notify("** (The test certificates below have not been saved.)")
notify("")
if renew_skipped:
notify("The following certs are not due for renewal yet:")
notify(report(renew_skipped, "skipped"))
if not renew_successes and not renew_failures:
notify("No renewals were attempted.")
elif renew_successes and not renew_failures:
notify("Congratulations, all renewals succeeded. The following certs "
"have been renewed:")
notify(report(renew_successes, "success"))
elif renew_failures and not renew_successes:
notify("All renewal attempts failed. The following certs could not be "
"renewed:")
notify(report(renew_failures, "failure"))
elif renew_failures and renew_successes:
notify("The following certs were successfully renewed:")
notify(report(renew_successes, "success"))
notify("\nThe following certs could not be renewed:")
notify(report(renew_failures, "failure"))
if parse_failures:
notify("\nAdditionally, the following renewal configuration files "
"were invalid: ")
notify(report(parse_failures, "parsefail"))
if config.dry_run:
notify("** DRY RUN: simulating 'certbot renew' close to cert expiry")
notify("** (The test certificates above have not been saved.)")
if config.quiet and not (renew_failures or parse_failures):
return
print("\n".join(out))
def handle_renewal_request(config):
"""Examine each lineage; renew if due and report results"""
# This is trivially False if config.domains is empty
if any(domain not in config.webroot_map for domain in config.domains):
# If more plugins start using cli.add_domains,
# we may want to only log a warning here
raise errors.Error("Currently, the renew verb is capable of either "
"renewing all installed certificates that are due "
"to be renewed or renewing a single certificate specified "
"by its name. If you would like to renew specific "
"certificates by their domains, use the certonly "
"command. The renew verb may provide other options "
"for selecting certificates to renew in the future.")
if config.certname:
conf_files = [storage.renewal_file_for_certname(config, config.certname)]
else:
conf_files = storage.renewal_conf_files(config)
renew_successes = []
renew_failures = []
renew_skipped = []
parse_failures = []
for renewal_file in conf_files:
disp = zope.component.getUtility(interfaces.IDisplay)
disp.notification("Processing " + renewal_file, pause=False)
lineage_config = copy.deepcopy(config)
# Note that this modifies config (to add back the configuration
# elements from within the renewal configuration file).
try:
renewal_candidate = _reconstitute(lineage_config, renewal_file)
except Exception as e: # pylint: disable=broad-except
logger.warning("Renewal configuration file %s produced an "
"unexpected error: %s. Skipping.", renewal_file, e)
logger.debug("Traceback was:\n%s", traceback.format_exc())
parse_failures.append(renewal_file)
continue
try:
if renewal_candidate is None:
parse_failures.append(renewal_file)
else:
# XXX: ensure that each call here replaces the previous one
zope.component.provideUtility(lineage_config)
renewal_candidate.ensure_deployed()
if should_renew(lineage_config, renewal_candidate):
plugins = plugins_disco.PluginsRegistry.find_all()
from certbot import main
main.obtain_cert(lineage_config, plugins, renewal_candidate)
renew_successes.append(renewal_candidate.fullchain)
else:
renew_skipped.append(renewal_candidate.fullchain)
except Exception as e: # pylint: disable=broad-except
# obtain_cert (presumably) encountered an unanticipated problem.
logger.warning("Attempting to renew cert from %s produced an "
"unexpected error: %s. Skipping.", renewal_file, e)
logger.debug("Traceback was:\n%s", traceback.format_exc())
renew_failures.append(renewal_candidate.fullchain)
# Describe all the results
_renew_describe_results(config, renew_successes, renew_failures,
renew_skipped, parse_failures)
if renew_failures or parse_failures:
raise errors.Error("{0} renew failure(s), {1} parse failure(s)".format(
len(renew_failures), len(parse_failures)))
else:
logger.debug("no renewal failures")