Basic fix for #411

This commit is contained in:
Seth Schoen 2015-09-08 01:33:03 -07:00
parent aed29760a8
commit 7aa9fe845a
6 changed files with 186 additions and 8 deletions

View file

@ -1,7 +1,9 @@
"""Let's Encrypt CLI."""
# TODO: Sanity check all input. Be sure to avoid shell code etc...
import argparse
import atexit
import configobj
import functools
import logging
import logging.handlers
@ -12,6 +14,7 @@ import time
import traceback
import configargparse
import OpenSSL
import zope.component
import zope.interface.exceptions
import zope.interface.verify
@ -27,6 +30,7 @@ from letsencrypt import interfaces
from letsencrypt import le_util
from letsencrypt import log
from letsencrypt import reporter
from letsencrypt import storage
from letsencrypt.display import util as display_util
from letsencrypt.display import ops as display_ops
@ -69,7 +73,7 @@ Choice of server for authentication/installation:
More detailed help:
-h, --help [topic] print this message, or detailed help on a topic;
-h, --help [topic] print this message, or detailed help on a topic;
the available topics are:
all, apache, automation, nginx, paths, security, testing, or any of the
@ -159,7 +163,8 @@ def _init_le_client(args, config, authenticator, installer):
return client.Client(config, acc, authenticator, installer, acme=acme)
def run(args, config, plugins):
def run(args, config, plugins): # pylint: disable=too-many-locals,too-many-branches,too-many-statements
"""Obtain a certificate and install."""
if args.configurator is not None and (args.installer is not None or
args.authenticator is not None):
@ -181,15 +186,109 @@ def run(args, config, plugins):
return "Configurator could not be determined"
domains = _find_domains(args, installer)
domains_set = set(domains)
renew_config = configuration.RenewerConfiguration(config)
# I am not sure whether that correctly reads the systemwide
# configuration file.
configs_dir = renew_config.renewal_configs_dir
identical_names_cert = None
subset_names_cert = None
for renewal_file in os.listdir(configs_dir):
full_path = os.path.join(configs_dir, renewal_file)
rc_config = configobj.ConfigObj(renew_config.renewer_config_file)
rc_config.merge(configobj.ConfigObj(full_path))
rc_config.filename = full_path
cli_config = configuration.RenewerConfiguration(config.namespace)
candidate_lineage = storage.RenewableCert(rc_config, None, cli_config)
# TODO: Handle these differently depending on whether they are
# expired or still valid?
candidate_names = set(candidate_lineage.names())
if candidate_names == domains_set:
identical_names_cert = (renewal_file, candidate_lineage)
elif candidate_names.issubset(domains_set):
subset_names_cert = (renewal_file, candidate_lineage,
candidate_lineage.names())
treat_as_renewal = False
question = None
same_cert_question = "You have an existing certificate that contains "
same_cert_question += "exactly the same domains you requested.\n\n{0}"
same_cert_question += "\n\nDo you want to renew and replace this "
same_cert_question += "certificate with a newly-issued one?"
subset_cert_question = "You have an existing certificate that contains "
subset_cert_question += "a portion of the domains you requested (ref: {0})"
subset_cert_question += "\n\nIt contains these names: {1}\n\nYou "
subset_cert_question += "requested these names for the new certificate: "
subset_cert_question += "{2}.\n\nDo you want to replace this existing "
subset_cert_question += "certificate with the new certificate?"
if identical_names_cert:
question = same_cert_question.format(identical_names_cert[0])
elif subset_names_cert:
question = subset_cert_question.format(subset_names_cert[0],
", ".join(subset_names_cert[2]),
", ".join(domains))
if question:
if zope.component.getUtility(interfaces.IDisplay).yesno(question,
"Replace",
"Cancel"):
treat_as_renewal = True
else:
# TODO: Once the --duplicate (?) option is implemented, this
# exit will be suppressed and we will not treat this
# as a renewal. This should ideally be done by leaving
# question as None above so that we don't even prompt
# the user with the question. (We might say "if question
# and not duplicate_option:" instead of "if question".)
msg = "To obtain a new certificate that {0} an existing "
msg += "certificate in its domain-name coverage, consult the "
msg += "documentation about the --XXX-TODO option."
what = "duplicates" if identical_names_cert else "overlaps with"
msg = msg.format(what)
reporter_util = zope.component.getUtility(interfaces.IReporter)
reporter_util.add_message(msg, reporter_util.HIGH_PRIORITY, True)
sys.exit(1)
# TODO: Handle errors from _init_le_client?
le_client = _init_le_client(args, config, authenticator, installer)
lineage = le_client.obtain_and_enroll_certificate(
domains, authenticator, installer, plugins)
if not lineage:
return "Certificate could not be obtained"
if treat_as_renewal:
# TREAT AS RENEWAL
if identical_names_cert:
lineage = identical_names_cert[1]
else:
lineage = subset_names_cert[1]
# TODO: Use existing privkey instead of generating a new one
new_certr, new_chain, new_key, _ = le_client.obtain_certificate(domains)
# TODO: Check whether it worked!
old_version = lineage.latest_common_version()
lineage.save_successor(old_version,
OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM,
new_certr.body),
new_key.pem,
OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM,
new_chain))
lineage.update_all_links_to(lineage.latest_common_version())
# TODO: Check return value of save_successor
# TODO: Also update lineage renewal config with any relevant
# configuration values from this attempt?
else:
# TREAT AS NEW REQUEST
lineage = le_client.obtain_and_enroll_certificate(
domains, authenticator, installer, plugins)
if not lineage:
return "Certificate could not be obtained"
# TODO: This treats the key as changed even when it wasn't
le_client.deploy_certificate(
domains, lineage.privkey, lineage.cert, lineage.chain)
le_client.enhance_config(domains, args.redirect)
if treat_as_renewal:
display_ops.success_renewal(domains)
else:
display_ops.success_installation(domains)
def auth(args, config, plugins):

View file

@ -378,8 +378,6 @@ class Client(object):
# sites may have been enabled / final cleanup
self.installer.restart()
display_ops.success_installation(domains)
def enhance_config(self, domains, redirect=None):
"""Enhance the configuration.

View file

@ -233,6 +233,26 @@ def success_installation(domains):
pause=False)
def success_renewal(domains):
"""Display a box confirming the renewal of an existing certificate.
.. todo:: This should be centered on the screen
:param list domains: domain names which were renewed
"""
util(interfaces.IDisplay).notification(
"Your existing certificate has been successfully renewed, and the "
"new certificate has been installed.{1}{1}"
"The new certificate covers the following domains: {0}{1}{1}"
"You should test your configuration at:{1}{2}".format(
_gen_https_names(domains),
os.linesep,
os.linesep.join(_gen_ssl_lab_urls(domains))),
height=(14 + len(domains)),
pause=False)
def _gen_ssl_lab_urls(domains):
"""Returns a list of urls.

View file

@ -11,6 +11,7 @@ import pytz
import pyrfc3339
from letsencrypt import constants
from letsencrypt import crypto_util
from letsencrypt import errors
from letsencrypt import le_util
@ -421,6 +422,24 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
"""
return self._notafterbefore(lambda x509: x509.get_notAfter(), version)
def names(self, version=None):
"""What are the subject names of this certificate?
(If no version is specified, use the current version.)
:param int version: the desired version number
:returns: the subject names
:rtype: `list` of `str`
"""
if version is None:
target = self.current_target("cert")
else:
target = self.version("cert", version)
with open(target) as f:
sans = crypto_util.get_sans_from_cert(f.read())
return sans
def should_autodeploy(self):
"""Should this lineage now automatically deploy a newer version?

View file

@ -383,5 +383,27 @@ class SuccessInstallationTest(unittest.TestCase):
self.assertTrue(name in arg)
class SuccessRenewalTest(unittest.TestCase):
# pylint: disable=too-few-public-methods
"""Test the success renewal message."""
@classmethod
def _call(cls, names):
from letsencrypt.display.ops import success_renewal
success_renewal(names)
@mock.patch("letsencrypt.display.ops.util")
def test_success_renewal(self, mock_util):
mock_util().notification.return_value = None
names = ["example.com", "abc.com"]
self._call(names)
self.assertEqual(mock_util().notification.call_count, 1)
arg = mock_util().notification.call_args_list[0][0][0]
for name in names:
self.assertTrue(name in arg)
if __name__ == "__main__":
unittest.main() # pragma: no cover

View file

@ -295,6 +295,26 @@ class RenewableCertTests(unittest.TestCase):
else:
self.assertFalse(self.test_rc.has_pending_deployment())
def test_names(self):
# Trying the current version
test_cert = test_util.load_vector("cert-san.pem")
os.symlink(os.path.join("..", "..", "archive", "example.org",
"cert12.pem"), self.test_rc.cert)
with open(self.test_rc.cert, "w") as f:
f.write(test_cert)
self.assertEqual(self.test_rc.names(),
["example.com", "www.example.com"])
# Trying a non-current version
test_cert = test_util.load_vector("cert.pem")
os.unlink(self.test_rc.cert)
os.symlink(os.path.join("..", "..", "archive", "example.org",
"cert15.pem"), self.test_rc.cert)
with open(self.test_rc.cert, "w") as f:
f.write(test_cert)
self.assertEqual(self.test_rc.names(12),
["example.com", "www.example.com"])
def _test_notafterbefore(self, function, timestamp):
test_cert = test_util.load_vector("cert.pem")
os.symlink(os.path.join("..", "..", "archive", "example.org",