auth use renewal

This commit is contained in:
James Kasten 2015-09-15 22:48:36 -07:00
parent d9cf160aca
commit c025c17b5d
3 changed files with 121 additions and 83 deletions

View file

@ -196,8 +196,101 @@ def _find_duplicative_certs(domains, config, renew_config):
return identical_names_cert, subset_names_cert
def _treat_as_renewal(config, domains):
"""Determine whether or not the call should be treated as a renewal.
:returns: RenewableCert or None if renewal shouldn't occur.
:rtype: :class:`.storage.RenewableCert`
:raises .Error: If the user would like to rerun the client again.
"""
renewal = False
# Considering the possibility that the requested certificate is
# related to an existing certificate. (config.duplicate, which
# is set with --duplicate, skips all of this logic and forces any
# kind of certificate to be obtained with renewal = False.)
if not config.duplicate:
ident_names_cert, subset_names_cert = _find_duplicative_certs(
domains, config, configuration.RenewerConfiguration(config))
# I am not sure whether that correctly reads the systemwide
# configuration file.
question = None
if ident_names_cert is not None:
question = (
"You have an existing certificate that contains exactly the "
"same domains you requested (ref: {0})\n\nDo you want to "
"renew and replace this certificate with a newly-issued one?"
).format(ident_names_cert.configfile.filename)
elif subset_names_cert is not None:
question = (
"You have an existing certificate that contains a portion of "
"the domains you requested (ref: {0})\n\nIt contains these "
"names: {1}\n\nYou requested these names for the new "
"certificate: {2}.\n\nDo you want to replace this existing "
"certificate with the new certificate?"
).format(subset_names_cert.configfile.filename,
", ".join(subset_names_cert.names()),
", ".join(domains))
if question is None:
# We aren't in a duplicative-names situation at all, so we don't
# have to tell or ask the user anything about this.
pass
elif zope.component.getUtility(interfaces.IDisplay).yesno(
question, "Replace", "Cancel"):
renewal = True
else:
reporter_util = zope.component.getUtility(interfaces.IReporter)
reporter_util.add_message(
"To obtain a new certificate that {0} an existing certificate "
"in its domain-name coverage, you must use the --duplicate "
"option.\n\nFor example:\n\n{1} --duplicate {2}".format(
"duplicates" if ident_names_cert is not None else
"overlaps with", sys.argv[0], " ".join(sys.argv[1:])),
reporter_util.HIGH_PRIORITY)
raise errors.Error(
"BUser did not use proper CLI and would like "
"to reinvoke the client.")
if renewal:
return ident_names_cert if ident_names_cert is not None else subset_names_cert
return None
def auth_from_domains(le_client, config, domains, plugins):
"""Authenticate and enroll certificate."""
# Note: This can raise errors... caught above us though.
lineage = _treat_as_renewal(config, domains)
if lineage is not None:
new_certr, new_chain, new_key, _ = le_client.obtain_certificate(domains)
# TODO: Check whether it worked!
lineage.save_successor(
lineage.latest_common_version(), OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, new_certr.body),
new_key.pem, OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, new_chain))
lineage.update_all_links_to(lineage.latest_common_version())
# TODO: Check return value of save_successor
# TODO: Also update lineage renewal config with any relevant
# configuration values from this attempt? - YES
else:
# TREAT AS NEW REQUEST
lineage = le_client.obtain_and_enroll_certificate(
domains, le_client.dv_auth, le_client.installer, plugins)
if not lineage:
raise errors.Error("Certificate could not be obtained")
return lineage
# TODO: Make run as close to auth + install as possible
# Possible difficulties: args.csr was hacked into auth
def run(args, config, plugins): # pylint: disable=too-many-branches,too-many-locals
"""Obtain a certificate and install."""
# Begin authenticator and installer setup
if args.configurator is not None and (args.installer is not None or
args.authenticator is not None):
return ("Either --configurator or --authenticator/--installer"
@ -216,88 +309,28 @@ def run(args, config, plugins): # pylint: disable=too-many-branches,too-many-lo
if installer is None or authenticator is None:
return "Configurator could not be determined"
# End authenticator and installer setup
domains = _find_domains(args, installer)
treat_as_renewal = False
# Considering the possibility that the requested certificate is
# related to an existing certificate. (config.duplicate, which
# is set with --duplicate, skips all of this logic and forces any
# kind of certificate to be obtained with treat_as_renewal = False.)
if not config.duplicate:
identical_names_cert, subset_names_cert = _find_duplicative_certs(
domains, config, configuration.RenewerConfiguration(config))
# I am not sure whether that correctly reads the systemwide
# configuration file.
question = None
if identical_names_cert is not None:
question = (
"You have an existing certificate that contains exactly the "
"same domains you requested (ref: {0})\n\nDo you want to "
"renew and replace this certificate with a newly-issued one?"
).format(identical_names_cert.configfile.filename)
elif subset_names_cert is not None:
question = (
"You have an existing certificate that contains a portion of "
"the domains you requested (ref: {0})\n\nIt contains these "
"names: {1}\n\nYou requested these names for the new "
"certificate: {2}.\n\nDo you want to replace this existing "
"certificate with the new certificate?"
).format(subset_names_cert.configfile.filename,
", ".join(subset_names_cert.names()),
", ".join(domains))
if question is None:
# We aren't in a duplicative-names situation at all, so we don't
# have to tell or ask the user anything about this.
pass
elif zope.component.getUtility(interfaces.IDisplay).yesno(
question, "Replace", "Cancel"):
treat_as_renewal = True
else:
reporter_util = zope.component.getUtility(interfaces.IReporter)
reporter_util.add_message(
"To obtain a new certificate that {0} an existing certificate "
"in its domain-name coverage, you must use the --duplicate "
"option.\n\nFor example:\n\n{1} --duplicate {2}".format(
"duplicates" if identical_names_cert is not None else
"overlaps with", sys.argv[0], " ".join(sys.argv[1:])),
reporter_util.HIGH_PRIORITY)
return 1
# Attempting to obtain the certificate
# TODO: Handle errors from _init_le_client?
le_client = _init_le_client(args, config, authenticator, installer)
if treat_as_renewal:
lineage = identical_names_cert if identical_names_cert is not None else subset_names_cert
# TODO: Use existing privkey instead of generating a new one
new_certr, new_chain, new_key, _ = le_client.obtain_certificate(domains)
# TODO: Check whether it worked!
lineage.save_successor(
lineage.latest_common_version(), OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, new_certr.body),
new_key.pem, OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, new_chain))
lineage.update_all_links_to(lineage.latest_common_version())
# TODO: Check return value of save_successor
# TODO: Also update lineage renewal config with any relevant
# configuration values from this attempt?
le_client.deploy_certificate(
domains, lineage.privkey, lineage.cert, lineage.chain)
display_ops.success_renewal(domains)
else:
# TREAT AS NEW REQUEST
lineage = le_client.obtain_and_enroll_certificate(
domains, authenticator, installer, plugins)
if not lineage:
return "Certificate could not be obtained"
# TODO: This treats the key as changed even when it wasn't
# TODO: We also need to pass the fullchain (for Nginx)
le_client.deploy_certificate(
domains, lineage.privkey, lineage.cert, lineage.chain)
le_client.enhance_config(domains, args.redirect)
try:
lineage = auth_from_domains(le_client, config, domains, plugins)
except errors.Error as err:
return str(err)
# TODO: We also need to pass the fullchain (for Nginx)
le_client.deploy_certificate(
domains, lineage.privkey, lineage.cert, lineage.chain)
le_client.enhance_config(domains, args.redirect)
if lineage.available_versions("cert") == 1:
display_ops.success_installation(domains)
else:
display_ops.success_renewal(domains)
def auth(args, config, plugins):
@ -322,6 +355,7 @@ def auth(args, config, plugins):
# TODO: Handle errors from _init_le_client?
le_client = _init_le_client(args, config, authenticator, installer)
# This is a special case; cert and chain are simply saved
if args.csr is not None:
certr, chain = le_client.obtain_certificate_from_csr(le_util.CSR(
file=args.csr[0], data=args.csr[1], form="der"))
@ -329,9 +363,10 @@ def auth(args, config, plugins):
certr, chain, args.cert_path, args.chain_path)
else:
domains = _find_domains(args, installer)
if not le_client.obtain_and_enroll_certificate(
domains, authenticator, installer, plugins):
return "Certificate could not be obtained"
try:
auth_from_domains(le_client, config, domains, plugins)
except errors.Error as err:
return str(err)
def install(args, config, plugins):

View file

@ -111,6 +111,8 @@ class Client(object):
:ivar .AuthHandler auth_handler: Authorizations handler that will
dispatch DV and Continuity challenges to appropriate
authenticators (providing `.IAuthenticator` interface).
:ivar .IAuthenticator dv_auth: Prepared (`.IAuthenticator.prepare`)
authenticator that can solve the `.constants.DV_CHALLENGES`.
:ivar .IInstaller installer: Installer.
:ivar acme.client.Client acme: Optional ACME client API handle.
You might already have one from `register`.
@ -118,14 +120,10 @@ class Client(object):
"""
def __init__(self, config, account_, dv_auth, installer, acme=None):
"""Initialize a client.
:param .IAuthenticator dv_auth: Prepared (`.IAuthenticator.prepare`)
authenticator that can solve the `.constants.DV_CHALLENGES`.
"""
"""Initialize a client."""
self.config = config
self.account = account_
self.dv_auth = dv_auth
self.installer = installer
# Initialize ACME if account is provided

View file

@ -33,7 +33,12 @@ def fill_with_sample_data(rc_object):
class BaseRenewableCertTest(unittest.TestCase):
"""Base class for setting up Renewable Cert tests.
.. note:: It may be required to write out self.config for
your test. Check :class:`.cli_test.DuplicateCertTest` for an example.
"""
def setUp(self):
from letsencrypt import storage
self.tempdir = tempfile.mkdtemp()