Merge remote-tracking branch 'letsencrypt/master'

This commit is contained in:
Amjad Mashaal 2016-03-25 01:32:39 +02:00
commit 5df0a1fa66
60 changed files with 1411 additions and 1182 deletions

View file

@ -38,7 +38,7 @@ load-plugins=linter_plugin
# --enable=similarities". If you want to run only the classes checker, but have
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
disable=fixme,locally-disabled,abstract-class-not-used,abstract-class-little-used,bad-continuation,too-few-public-methods,no-self-use,invalid-name,too-many-instance-attributes
disable=fixme,locally-disabled,abstract-class-not-used,abstract-class-little-used,bad-continuation,too-few-public-methods,no-self-use,invalid-name,too-many-instance-attributes,cyclic-import
# abstract-class-not-used cannot be disabled locally (at least in
# pylint 1.4.1), same for abstract-class-little-used

View file

@ -18,16 +18,17 @@ The Let's Encrypt Client is a fully-featured, extensible client for the Let's
Encrypt CA (or any other CA that speaks the `ACME
<https://github.com/ietf-wg-acme/acme/blob/master/draft-ietf-acme-acme.md>`_
protocol) that can automate the tasks of obtaining certificates and
configuring webservers to use them.
configuring webservers to use them. This client runs on Unix-based operating
systems.
Installation
------------
If ``letsencrypt`` is packaged for your OS, you can install it from there, and
run it by typing ``letsencrypt``. Because not all operating systems have
packages yet, we provide a temporary solution via the ``letsencrypt-auto``
wrapper script, which obtains some dependencies from your OS and puts others
in a python virtual environment::
If ``letsencrypt`` is packaged for your Unix OS, you can install it from
there, and run it by typing ``letsencrypt``. Because not all operating
systems have packages yet, we provide a temporary solution via the
``letsencrypt-auto`` wrapper script, which obtains some dependencies
from your OS and puts others in a python virtual environment::
user@webserver:~$ git clone https://github.com/letsencrypt/letsencrypt
user@webserver:~$ cd letsencrypt

View file

@ -545,6 +545,7 @@ class ApacheConfigurator(augeas_configurator.AugeasConfigurator):
paths = self.aug.match(
("/files%s//*[label()=~regexp('%s')]" %
(vhost_path, parser.case_i("VirtualHost"))))
paths = [path for path in paths if os.path.basename(path) == "VirtualHost"]
for path in paths:
new_vhost = self._create_vhost(path)
realpath = os.path.realpath(new_vhost.filep)

View file

@ -83,7 +83,8 @@ def _vhost_menu(domain, vhosts):
code, tag = zope.component.getUtility(interfaces.IDisplay).menu(
"We were unable to find a vhost with a ServerName "
"or Address of {0}.{1}Which virtual host would you "
"like to choose?".format(domain, os.linesep),
"like to choose?\n(note: conf files with multiple "
"vhosts are not yet supported)".format(domain, os.linesep),
choices, help_label="More Info", ok_label="Select")
except errors.MissingCommandlineFlag as e:
msg = ("Failed to run Apache plugin non-interactively{1}{0}{1}"

View file

@ -20,7 +20,7 @@ class AugeasConfiguratorTest(util.ApacheTest):
self.config_path, self.vhost_path, self.config_dir, self.work_dir)
self.vh_truth = util.get_vh_truth(
self.temp_dir, "debian_apache_2_4/two_vhost_80")
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
def tearDown(self):
shutil.rmtree(self.config_dir)

View file

@ -20,17 +20,17 @@ from letsencrypt_apache import obj
from letsencrypt_apache.tests import util
class TwoVhost80Test(util.ApacheTest):
class MultipleVhostsTest(util.ApacheTest):
"""Test two standard well-configured HTTP vhosts."""
def setUp(self): # pylint: disable=arguments-differ
super(TwoVhost80Test, self).setUp()
super(MultipleVhostsTest, self).setUp()
self.config = util.get_apache_configurator(
self.config_path, self.vhost_path, self.config_dir, self.work_dir)
self.config = self.mock_deploy_cert(self.config)
self.vh_truth = util.get_vh_truth(
self.temp_dir, "debian_apache_2_4/two_vhost_80")
self.temp_dir, "debian_apache_2_4/multiple_vhosts")
def mock_deploy_cert(self, config):
"""A test for a mock deploy cert"""

View file

@ -20,7 +20,7 @@ class SelectVhostTest(unittest.TestCase):
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
self.base_dir = "/example_path"
self.vhosts = util.get_vh_truth(
self.base_dir, "debian_apache_2_4/two_vhost_80")
self.base_dir, "debian_apache_2_4/multiple_vhosts")
@classmethod
def _call(cls, vhosts):

View file

@ -193,7 +193,7 @@ class ParserInitTest(util.ApacheTest):
"update_runtime_variables"):
path = os.path.join(
self.temp_dir,
"debian_apache_2_4/////two_vhost_80/../two_vhost_80/apache2")
"debian_apache_2_4/////multiple_vhosts/../multiple_vhosts/apache2")
parser = ApacheParser(self.aug, path,
"/dummy/vhostpath")

View file

@ -22,9 +22,9 @@ from letsencrypt_apache import obj
class ApacheTest(unittest.TestCase): # pylint: disable=too-few-public-methods
def setUp(self, test_dir="debian_apache_2_4/two_vhost_80",
config_root="debian_apache_2_4/two_vhost_80/apache2",
vhost_root="debian_apache_2_4/two_vhost_80/apache2/sites-available"):
def setUp(self, test_dir="debian_apache_2_4/multiple_vhosts",
config_root="debian_apache_2_4/multiple_vhosts/apache2",
vhost_root="debian_apache_2_4/multiple_vhosts/apache2/sites-available"):
# pylint: disable=arguments-differ
super(ApacheTest, self).setUp()
@ -59,9 +59,9 @@ class ApacheTest(unittest.TestCase): # pylint: disable=too-few-public-methods
class ParserTest(ApacheTest): # pytlint: disable=too-few-public-methods
def setUp(self, test_dir="debian_apache_2_4/two_vhost_80",
config_root="debian_apache_2_4/two_vhost_80/apache2",
vhost_root="debian_apache_2_4/two_vhost_80/apache2/sites-available"):
def setUp(self, test_dir="debian_apache_2_4/multiple_vhosts",
config_root="debian_apache_2_4/multiple_vhosts/apache2",
vhost_root="debian_apache_2_4/multiple_vhosts/apache2/sites-available"):
super(ParserTest, self).setUp(test_dir, config_root, vhost_root)
zope.component.provideUtility(display_util.FileDisplay(sys.stdout))
@ -116,7 +116,7 @@ def get_apache_configurator(
def get_vh_truth(temp_dir, config_name):
"""Return the ground truth for the specified directory."""
if config_name == "debian_apache_2_4/two_vhost_80":
if config_name == "debian_apache_2_4/multiple_vhosts":
prefix = os.path.join(
temp_dir, config_name, "apache2/sites-available")
aug_pre = "/files" + prefix

View file

@ -52,9 +52,8 @@ class AuthHandler(object):
:param bool best_effort: Whether or not all authorizations are
required (this is useful in renewal)
:returns: tuple of lists of authorization resources. Takes the
form of (`completed`, `failed`)
:rtype: tuple
:returns: List of authorization resources
:rtype: list
:raises .AuthorizationError: If unable to retrieve all
authorizations
@ -76,9 +75,16 @@ class AuthHandler(object):
# Just make sure all decisions are complete.
self.verify_authzr_complete()
# Only return valid authorizations
return [authzr for authzr in self.authzr.values()
if authzr.body.status == messages.STATUS_VALID]
retVal = [authzr for authzr in self.authzr.values()
if authzr.body.status == messages.STATUS_VALID]
if not retVal:
raise errors.AuthorizationError(
"Challenges failed for all domains")
return retVal
def _choose_challenges(self, domains):
"""Retrieve necessary challenges to satisfy server."""
@ -175,9 +181,11 @@ class AuthHandler(object):
chall_update[domain].remove(achall)
# We failed some challenges... damage control
else:
# Right now... just assume a loss and carry on...
if best_effort:
comp_domains.add(domain)
logger.warning(
"Challenge failed for domain %s",
domain)
else:
all_failed_achalls.update(
updated for _, updated in failed_achalls)

File diff suppressed because it is too large Load diff

View file

@ -146,8 +146,7 @@ def perform_registration(acme, config):
try:
return acme.register(messages.NewRegistration.from_data(email=config.email))
except messages.Error as e:
err = repr(e)
if "MX record" in err or "Validation of contact mailto" in err:
if e.typ == "urn:acme:error:invalidEmail":
config.namespace.email = display_ops.get_email(more=True, invalid=True)
return perform_registration(acme, config)
else:
@ -189,7 +188,7 @@ class Client(object):
self.auth_handler = None
def obtain_certificate_from_csr(self, domains, csr,
typ=OpenSSL.crypto.FILETYPE_ASN1):
typ=OpenSSL.crypto.FILETYPE_ASN1, authzr=None):
"""Obtain certificate.
Internal function with precondition that `domains` are
@ -199,6 +198,8 @@ class Client(object):
:param .le_util.CSR csr: DER-encoded Certificate Signing
Request. The key used to generate this CSR can be different
than `authkey`.
:param list authzr: List of
:class:`acme.messages.AuthorizationResource`
:returns: `.CertificateResource` and certificate chain (as
returned by `.fetch_chain`).
@ -215,14 +216,15 @@ class Client(object):
logger.debug("CSR: %s, domains: %s", csr, domains)
authzr = self.auth_handler.get_authorizations(domains)
if authzr is None:
authzr = self.auth_handler.get_authorizations(domains)
certr = self.acme.request_issuance(
jose.ComparableX509(
OpenSSL.crypto.load_certificate_request(typ, csr.data)),
authzr)
authzr)
return certr, self.acme.fetch_chain(certr)
def obtain_certificate(self, domains):
"""Obtains a certificate from the ACME server.
@ -237,12 +239,20 @@ class Client(object):
:rtype: tuple
"""
authzr = self.auth_handler.get_authorizations(
domains,
self.config.allow_subset_of_names)
domains = [a.body.identifier.value.encode('ascii')
for a in authzr]
# Create CSR from names
key = crypto_util.init_save_key(
self.config.rsa_key_size, self.config.key_dir)
csr = crypto_util.init_save_csr(key, domains, self.config.csr_dir)
return self.obtain_certificate_from_csr(domains, csr) + (key, csr)
return (self.obtain_certificate_from_csr(domains, csr, authzr=authzr)
+ (key, csr))
def obtain_and_enroll_certificate(self, domains):
"""Obtain and enroll certificate.

714
letsencrypt/main.py Normal file
View file

@ -0,0 +1,714 @@
"""Let's Encrypt main entry point."""
from __future__ import print_function
import atexit
import functools
import logging.handlers
import os
import sys
import time
import traceback
import OpenSSL
import zope.component
from acme import jose
import letsencrypt
from letsencrypt import account
from letsencrypt import client
from letsencrypt import cli
from letsencrypt import crypto_util
from letsencrypt import colored_logging
from letsencrypt import configuration
from letsencrypt import constants
from letsencrypt import errors
from letsencrypt import interfaces
from letsencrypt import le_util
from letsencrypt import log
from letsencrypt import reporter
from letsencrypt import renewal
from letsencrypt import storage
from letsencrypt.display import util as display_util, ops as display_ops
from letsencrypt.plugins import disco as plugins_disco
logger = logging.getLogger(__name__)
def _suggest_donation_if_appropriate(config, action):
"""Potentially suggest a donation to support Let's Encrypt."""
if config.staging or config.verb == "renew":
# --dry-run implies --staging
return
if action not in ["renew", "newcert"]:
return
reporter_util = zope.component.getUtility(interfaces.IReporter)
msg = ("If you like Let's Encrypt, please consider supporting our work by:\n\n"
"Donating to ISRG / Let's Encrypt: https://letsencrypt.org/donate\n"
"Donating to EFF: https://eff.org/donate-le\n\n")
reporter_util.add_message(msg, reporter_util.LOW_PRIORITY)
def _avoid_invalidating_lineage(config, lineage, original_server):
"Do not renew a valid cert with one from a staging server!"
def _is_staging(srv):
return srv == constants.STAGING_URI or "staging" in srv
# Some lineages may have begun with --staging, but then had production certs
# added to them
latest_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
open(lineage.cert).read())
# all our test certs are from happy hacker fake CA, though maybe one day
# we should test more methodically
now_valid = "fake" not in repr(latest_cert.get_issuer()).lower()
if _is_staging(config.server):
if not _is_staging(original_server) or now_valid:
if not config.break_my_certs:
names = ", ".join(lineage.names())
raise errors.Error(
"You've asked to renew/replace a seemingly valid certificate with "
"a test certificate (domains: {0}). We will not do that "
"unless you use the --break-my-certs flag!".format(names))
def _report_successful_dry_run(config):
reporter_util = zope.component.getUtility(interfaces.IReporter)
if config.verb != "renew":
reporter_util.add_message("The dry run was successful.",
reporter_util.HIGH_PRIORITY, on_crash=False)
def _auth_from_domains(le_client, config, domains, lineage=None):
"""Authenticate and enroll certificate."""
# Note: This can raise errors... caught above us though. This is now
# a three-way case: reinstall (which results in a no-op here because
# although there is a relevant lineage, we don't do anything to it
# inside this function -- we don't obtain a new certificate), renew
# (which results in treating the request as a renewal), or newcert
# (which results in treating the request as a new certificate request).
# If lineage is specified, use that one instead of looking around for
# a matching one.
if lineage is None:
# This will find a relevant matching lineage that exists
action, lineage = _treat_as_renewal(config, domains)
else:
# Renewal, where we already know the specific lineage we're
# interested in
action = "renew"
if action == "reinstall":
# The lineage already exists; allow the caller to try installing
# it without getting a new certificate at all.
return lineage, "reinstall"
elif action == "renew":
original_server = lineage.configuration["renewalparams"]["server"]
_avoid_invalidating_lineage(config, lineage, original_server)
# TODO: schoen wishes to reuse key - discussion
# https://github.com/letsencrypt/letsencrypt/pull/777/files#r40498574
new_certr, new_chain, new_key, _ = le_client.obtain_certificate(domains)
# TODO: Check whether it worked! <- or make sure errors are thrown (jdk)
if config.dry_run:
logger.info("Dry run: skipping updating lineage at %s",
os.path.dirname(lineage.cert))
else:
lineage.save_successor(
lineage.latest_common_version(), OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM, new_certr.body.wrapped),
new_key.pem, crypto_util.dump_pyopenssl_chain(new_chain),
configuration.RenewerConfiguration(config.namespace))
lineage.update_all_links_to(lineage.latest_common_version())
# TODO: Check return value of save_successor
# TODO: Also update lineage renewal config with any relevant
# configuration values from this attempt? <- Absolutely (jdkasten)
elif action == "newcert":
# TREAT AS NEW REQUEST
lineage = le_client.obtain_and_enroll_certificate(domains)
if lineage is False:
raise errors.Error("Certificate could not be obtained")
if not config.dry_run and not config.verb == "renew":
_report_new_cert(lineage.cert, lineage.fullchain)
return lineage, action
def _handle_subset_cert_request(config, domains, cert):
"""Figure out what to do if a previous cert had a subset of the names now requested
:param storage.RenewableCert cert:
:returns: Tuple of (string, cert_or_None) as per _treat_as_renewal
:rtype: tuple
"""
existing = ", ".join(cert.names())
question = (
"You have an existing certificate that contains a portion of "
"the domains you requested (ref: {0}){br}{br}It contains these "
"names: {1}{br}{br}You requested these names for the new "
"certificate: {2}.{br}{br}Do you want to expand and replace this existing "
"certificate with the new certificate?"
).format(cert.configfile.filename,
existing,
", ".join(domains),
br=os.linesep)
if config.expand or config.renew_by_default or zope.component.getUtility(
interfaces.IDisplay).yesno(question, "Expand", "Cancel",
cli_flag="--expand"):
return "renew", cert
else:
reporter_util = zope.component.getUtility(interfaces.IReporter)
reporter_util.add_message(
"To obtain a new certificate that contains these names without "
"replacing your existing certificate for {0}, you must use the "
"--duplicate option.{br}{br}"
"For example:{br}{br}{1} --duplicate {2}".format(
existing,
sys.argv[0], " ".join(sys.argv[1:]),
br=os.linesep
),
reporter_util.HIGH_PRIORITY)
raise errors.Error(
"User chose to cancel the operation and may "
"reinvoke the client.")
def _handle_identical_cert_request(config, cert):
"""Figure out what to do if a cert has the same names as a previously obtained one
:param storage.RenewableCert cert:
:returns: Tuple of (string, cert_or_None) as per _treat_as_renewal
:rtype: tuple
"""
if renewal.should_renew(config, cert):
return "renew", cert
if config.reinstall:
# Set with --reinstall, force an identical certificate to be
# reinstalled without further prompting.
return "reinstall", cert
question = (
"You have an existing certificate that contains exactly the same "
"domains you requested and isn't close to expiry."
"{br}(ref: {0}){br}{br}What would you like to do?"
).format(cert.configfile.filename, br=os.linesep)
if config.verb == "run":
keep_opt = "Attempt to reinstall this existing certificate"
elif config.verb == "certonly":
keep_opt = "Keep the existing certificate for now"
choices = [keep_opt,
"Renew & replace the cert (limit ~5 per 7 days)"]
display = zope.component.getUtility(interfaces.IDisplay)
response = display.menu(question, choices, "OK", "Cancel", default=0)
if response[0] == display_util.CANCEL:
# TODO: Add notification related to command-line options for
# skipping the menu for this case.
raise errors.Error(
"User chose to cancel the operation and may "
"reinvoke the client.")
elif response[1] == 0:
return "reinstall", cert
elif response[1] == 1:
return "renew", cert
else:
assert False, "This is impossible"
def _treat_as_renewal(config, domains):
"""Determine whether there are duplicated names and how to handle
them (renew, reinstall, newcert, or raising an error to stop
the client run if the user chooses to cancel the operation when
prompted).
:returns: Two-element tuple containing desired new-certificate behavior as
a string token ("reinstall", "renew", or "newcert"), plus either
a RenewableCert instance or None if renewal shouldn't occur.
:raises .Error: If the user would like to rerun the client again.
"""
# Considering the possibility that the requested certificate is
# related to an existing certificate. (config.duplicate, which
# is set with --duplicate, skips all of this logic and forces any
# kind of certificate to be obtained with renewal = False.)
if config.duplicate:
return "newcert", None
# TODO: Also address superset case
ident_names_cert, subset_names_cert = _find_duplicative_certs(config, domains)
# XXX ^ schoen is not sure whether that correctly reads the systemwide
# configuration file.
if ident_names_cert is None and subset_names_cert is None:
return "newcert", None
if ident_names_cert is not None:
return _handle_identical_cert_request(config, ident_names_cert)
elif subset_names_cert is not None:
return _handle_subset_cert_request(config, domains, subset_names_cert)
def _find_duplicative_certs(config, domains):
"""Find existing certs that duplicate the request."""
identical_names_cert, subset_names_cert = None, None
cli_config = configuration.RenewerConfiguration(config)
configs_dir = cli_config.renewal_configs_dir
# Verify the directory is there
le_util.make_or_verify_dir(configs_dir, mode=0o755, uid=os.geteuid())
for renewal_file in renewal.renewal_conf_files(cli_config):
try:
candidate_lineage = storage.RenewableCert(renewal_file, cli_config)
except (errors.CertStorageError, IOError):
logger.warning("Renewal conf file %s is broken. Skipping.", renewal_file)
logger.debug("Traceback was:\n%s", traceback.format_exc())
continue
# TODO: Handle these differently depending on whether they are
# expired or still valid?
candidate_names = set(candidate_lineage.names())
if candidate_names == set(domains):
identical_names_cert = candidate_lineage
elif candidate_names.issubset(set(domains)):
# This logic finds and returns the largest subset-names cert
# in the case where there are several available.
if subset_names_cert is None:
subset_names_cert = candidate_lineage
elif len(candidate_names) > len(subset_names_cert.names()):
subset_names_cert = candidate_lineage
return identical_names_cert, subset_names_cert
def _find_domains(config, installer):
if not config.domains:
domains = display_ops.choose_names(installer)
# record in config.domains (so that it can be serialised in renewal config files),
# and set webroot_map entries if applicable
for d in domains:
cli.process_domain(config, d)
else:
domains = config.domains
if not domains:
raise errors.Error("Please specify --domains, or --installer that "
"will help in domain names autodiscovery")
return domains
def _report_new_cert(cert_path, fullchain_path):
"""Reports the creation of a new certificate to the user.
:param str cert_path: path to cert
:param str fullchain_path: path to full chain
"""
expiry = crypto_util.notAfter(cert_path).date()
reporter_util = zope.component.getUtility(interfaces.IReporter)
if fullchain_path:
# Print the path to fullchain.pem because that's what modern webservers
# (Nginx and Apache2.4) will want.
and_chain = "and chain have"
path = fullchain_path
else:
# Unless we're in .csr mode and there really isn't one
and_chain = "has "
path = cert_path
# XXX Perhaps one day we could detect the presence of known old webservers
# and say something more informative here.
msg = ("Congratulations! Your certificate {0} been saved at {1}."
" Your cert will expire on {2}. To obtain a new version of the "
"certificate in the future, simply run Let's Encrypt again."
.format(and_chain, path, expiry))
reporter_util.add_message(msg, reporter_util.MEDIUM_PRIORITY)
def _determine_account(config):
"""Determine which account to use.
In order to make the renewer (configuration de/serialization) happy,
if ``config.account`` is ``None``, it will be updated based on the
user input. Same for ``config.email``.
:param argparse.Namespace config: CLI arguments
:param letsencrypt.interface.IConfig config: Configuration object
:param .AccountStorage account_storage: Account storage.
:returns: Account and optionally ACME client API (biproduct of new
registration).
:rtype: `tuple` of `letsencrypt.account.Account` and
`acme.client.Client`
"""
account_storage = account.AccountFileStorage(config)
acme = None
if config.account is not None:
acc = account_storage.load(config.account)
else:
accounts = account_storage.find_all()
if len(accounts) > 1:
acc = display_ops.choose_account(accounts)
elif len(accounts) == 1:
acc = accounts[0]
else: # no account registered yet
if config.email is None and not config.register_unsafely_without_email:
config.namespace.email = display_ops.get_email()
def _tos_cb(regr):
if config.tos:
return True
msg = ("Please read the Terms of Service at {0}. You "
"must agree in order to register with the ACME "
"server at {1}".format(
regr.terms_of_service, config.server))
obj = zope.component.getUtility(interfaces.IDisplay)
return obj.yesno(msg, "Agree", "Cancel", cli_flag="--agree-tos")
try:
acc, acme = client.register(
config, account_storage, tos_cb=_tos_cb)
except errors.MissingCommandlineFlag:
raise
except errors.Error as error:
logger.debug(error, exc_info=True)
raise errors.Error(
"Unable to register an account with ACME server")
config.namespace.account = acc.id
return acc, acme
def _init_le_client(config, authenticator, installer):
if authenticator is not None:
# if authenticator was given, then we will need account...
acc, acme = _determine_account(config)
logger.debug("Picked account: %r", acc)
# XXX
#crypto_util.validate_key_csr(acc.key)
else:
acc, acme = None, None
return client.Client(config, acc, authenticator, installer, acme=acme)
def install(config, plugins):
"""Install a previously obtained cert in a server."""
# XXX: Update for renewer/RenewableCert
# FIXME: be consistent about whether errors are raised or returned from
# this function ...
try:
installer, _ = cli.choose_configurator_plugins(config, plugins, "install")
except errors.PluginSelectionError as e:
return e.message
domains = _find_domains(config, installer)
le_client = _init_le_client(config, authenticator=None, installer=installer)
assert config.cert_path is not None # required=True in the subparser
le_client.deploy_certificate(
domains, config.key_path, config.cert_path, config.chain_path,
config.fullchain_path)
le_client.enhance_config(domains, config)
def plugins_cmd(config, plugins): # TODO: Use IDisplay rather than print
"""List server software plugins."""
logger.debug("Expected interfaces: %s", config.ifaces)
ifaces = [] if config.ifaces is None else config.ifaces
filtered = plugins.visible().ifaces(ifaces)
logger.debug("Filtered plugins: %r", filtered)
if not config.init and not config.prepare:
print(str(filtered))
return
filtered.init(config)
verified = filtered.verify(ifaces)
logger.debug("Verified plugins: %r", verified)
if not config.prepare:
print(str(verified))
return
verified.prepare()
available = verified.available()
logger.debug("Prepared plugins: %s", available)
print(str(available))
def rollback(config, plugins):
"""Rollback server configuration changes made during install."""
client.rollback(config.installer, config.checkpoints, config, plugins)
def config_changes(config, unused_plugins):
"""Show changes made to server config during installation
View checkpoints and associated configuration changes.
"""
client.view_config_changes(config, num=config.num)
def revoke(config, unused_plugins): # TODO: coop with renewal config
"""Revoke a previously obtained certificate."""
# For user-agent construction
config.namespace.installer = config.namespace.authenticator = "None"
if config.key_path is not None: # revocation by cert key
logger.debug("Revoking %s using cert key %s",
config.cert_path[0], config.key_path[0])
key = jose.JWK.load(config.key_path[1])
else: # revocation by account key
logger.debug("Revoking %s using Account Key", config.cert_path[0])
acc, _ = _determine_account(config)
key = acc.key
acme = client.acme_from_config_key(config, key)
cert = crypto_util.pyopenssl_load_certificate(config.cert_path[1])[0]
acme.revoke(jose.ComparableX509(cert))
def run(config, plugins): # pylint: disable=too-many-branches,too-many-locals
"""Obtain a certificate and install."""
# TODO: Make run as close to auth + install as possible
# Possible difficulties: config.csr was hacked into auth
try:
installer, authenticator = cli.choose_configurator_plugins(config, plugins, "run")
except errors.PluginSelectionError as e:
return e.message
domains = _find_domains(config, installer)
# TODO: Handle errors from _init_le_client?
le_client = _init_le_client(config, authenticator, installer)
lineage, action = _auth_from_domains(le_client, config, domains)
le_client.deploy_certificate(
domains, lineage.privkey, lineage.cert,
lineage.chain, lineage.fullchain)
le_client.enhance_config(domains, config)
if len(lineage.available_versions("cert")) == 1:
display_ops.success_installation(domains)
else:
display_ops.success_renewal(domains, action)
_suggest_donation_if_appropriate(config, action)
def obtain_cert(config, plugins, lineage=None):
"""Implements "certonly": authenticate & obtain cert, but do not install it."""
# pylint: disable=too-many-locals
try:
# installers are used in auth mode to determine domain names
installer, authenticator = cli.choose_configurator_plugins(config, plugins, "certonly")
except errors.PluginSelectionError as e:
logger.info("Could not choose appropriate plugin: %s", e)
raise
# TODO: Handle errors from _init_le_client?
le_client = _init_le_client(config, authenticator, installer)
action = "newcert"
# This is a special case; cert and chain are simply saved
if config.csr is not None:
assert lineage is None, "Did not expect a CSR with a RenewableCert"
csr, typ = config.actual_csr
certr, chain = le_client.obtain_certificate_from_csr(config.domains, csr, typ)
if config.dry_run:
logger.info(
"Dry run: skipping saving certificate to %s", config.cert_path)
else:
cert_path, _, cert_fullchain = le_client.save_certificate(
certr, chain, config.cert_path, config.chain_path, config.fullchain_path)
_report_new_cert(cert_path, cert_fullchain)
else:
domains = _find_domains(config, installer)
_, action = _auth_from_domains(le_client, config, domains, lineage)
if config.dry_run:
_report_successful_dry_run(config)
elif config.verb == "renew":
if installer is None:
# Tell the user that the server was not restarted.
print("new certificate deployed without reload, fullchain is",
lineage.fullchain)
else:
# In case of a renewal, reload server to pick up new certificate.
# In principle we could have a configuration option to inhibit this
# from happening.
installer.restart()
print("new certificate deployed with reload of",
config.installer, "server; fullchain is", lineage.fullchain)
_suggest_donation_if_appropriate(config, action)
def renew(config, unused_plugins):
"""Renew previously-obtained certificates."""
renewal.renew_all_lineages(config)
def setup_log_file_handler(config, logfile, fmt):
"""Setup file debug logging."""
log_file_path = os.path.join(config.logs_dir, logfile)
handler = logging.handlers.RotatingFileHandler(
log_file_path, maxBytes=2 ** 20, backupCount=10)
# rotate on each invocation, rollover only possible when maxBytes
# is nonzero and backupCount is nonzero, so we set maxBytes as big
# as possible not to overrun in single CLI invocation (1MB).
handler.doRollover() # TODO: creates empty letsencrypt.log.1 file
handler.setLevel(logging.DEBUG)
handler_formatter = logging.Formatter(fmt=fmt)
handler_formatter.converter = time.gmtime # don't use localtime
handler.setFormatter(handler_formatter)
return handler, log_file_path
def _cli_log_handler(config, level, fmt):
if config.text_mode or config.noninteractive_mode or config.verb == "renew":
handler = colored_logging.StreamHandler()
handler.setFormatter(logging.Formatter(fmt))
else:
handler = log.DialogHandler()
# dialog box is small, display as less as possible
handler.setFormatter(logging.Formatter("%(message)s"))
handler.setLevel(level)
return handler
def setup_logging(config, cli_handler_factory, logfile):
"""Setup logging."""
fmt = "%(asctime)s:%(levelname)s:%(name)s:%(message)s"
level = -config.verbose_count * 10
file_handler, log_file_path = setup_log_file_handler(
config, logfile=logfile, fmt=fmt)
cli_handler = cli_handler_factory(config, level, fmt)
# TODO: use fileConfig?
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG) # send all records to handlers
root_logger.addHandler(cli_handler)
root_logger.addHandler(file_handler)
logger.debug("Root logging level set at %d", level)
logger.info("Saving debug log to %s", log_file_path)
def _handle_exception(exc_type, exc_value, trace, config):
"""Logs exceptions and reports them to the user.
Config is used to determine how to display exceptions to the user. In
general, if config.debug is True, then the full exception and traceback is
shown to the user, otherwise it is suppressed. If config itself is None,
then the traceback and exception is attempted to be written to a logfile.
If this is successful, the traceback is suppressed, otherwise it is shown
to the user. sys.exit is always called with a nonzero status.
"""
logger.debug(
"Exiting abnormally:%s%s",
os.linesep,
"".join(traceback.format_exception(exc_type, exc_value, trace)))
if issubclass(exc_type, Exception) and (config is None or not config.debug):
if config is None:
logfile = "letsencrypt.log"
try:
with open(logfile, "w") as logfd:
traceback.print_exception(
exc_type, exc_value, trace, file=logfd)
except: # pylint: disable=bare-except
sys.exit("".join(
traceback.format_exception(exc_type, exc_value, trace)))
if issubclass(exc_type, errors.Error):
sys.exit(exc_value)
else:
# Here we're passing a client or ACME error out to the client at the shell
# Tell the user a bit about what happened, without overwhelming
# them with a full traceback
err = traceback.format_exception_only(exc_type, exc_value)[0]
# Typical error from the ACME module:
# acme.messages.Error: urn:acme:error:malformed :: The request message was
# malformed :: Error creating new registration :: Validation of contact
# mailto:none@longrandomstring.biz failed: Server failure at resolver
if (("urn:acme" in err and ":: " in err and
config.verbose_count <= cli.flag_default("verbose_count"))):
# prune ACME error code, we have a human description
_code, _sep, err = err.partition(":: ")
msg = "An unexpected error occurred:\n" + err + "Please see the "
if config is None:
msg += "logfile '{0}' for more details.".format(logfile)
else:
msg += "logfiles in {0} for more details.".format(config.logs_dir)
sys.exit(msg)
else:
sys.exit("".join(
traceback.format_exception(exc_type, exc_value, trace)))
def main(cli_args=sys.argv[1:]):
"""Command line argument parsing and main script execution."""
sys.excepthook = functools.partial(_handle_exception, config=None)
plugins = plugins_disco.PluginsRegistry.find_all()
# note: arg parser internally handles --help (and exits afterwards)
args = cli.prepare_and_parse_args(plugins, cli_args)
config = configuration.NamespaceConfig(args)
zope.component.provideUtility(config)
# Setup logging ASAP, otherwise "No handlers could be found for
# logger ..." TODO: this should be done before plugins discovery
for directory in config.config_dir, config.work_dir:
le_util.make_or_verify_dir(
directory, constants.CONFIG_DIRS_MODE, os.geteuid(),
"--strict-permissions" in cli_args)
# TODO: logs might contain sensitive data such as contents of the
# private key! #525
le_util.make_or_verify_dir(
config.logs_dir, 0o700, os.geteuid(), "--strict-permissions" in cli_args)
setup_logging(config, _cli_log_handler, logfile='letsencrypt.log')
logger.debug("letsencrypt version: %s", letsencrypt.__version__)
# do not log `config`, as it contains sensitive data (e.g. revoke --key)!
logger.debug("Arguments: %r", cli_args)
logger.debug("Discovered plugins: %r", plugins)
sys.excepthook = functools.partial(_handle_exception, config=config)
# Displayer
if config.noninteractive_mode:
displayer = display_util.NoninteractiveDisplay(sys.stdout)
elif config.text_mode:
displayer = display_util.FileDisplay(sys.stdout)
elif config.verb == "renew":
config.noninteractive_mode = True
displayer = display_util.NoninteractiveDisplay(sys.stdout)
else:
displayer = display_util.NcursesDisplay()
zope.component.provideUtility(displayer)
# Reporter
report = reporter.Reporter()
zope.component.provideUtility(report)
atexit.register(report.atexit_print_messages)
return config.func(config, plugins)
if __name__ == "__main__":
err_string = main()
if err_string:
logger.warn("Exiting with message %s", err_string)
sys.exit(err_string) # pragma: no cover

View file

@ -52,7 +52,7 @@ class Plugin(object):
NOTE: if you add argpase arguments such that users setting them can
create a config entry that python's bool() would consider false (ie,
the use might set the variable to "", [], 0, etc), please ensure that
cli._set_by_cli() works for your variable.
cli.set_by_cli() works for your variable.
"""

View file

@ -4,6 +4,7 @@ import logging
import pkg_resources
import zope.interface
import zope.interface.verify
from letsencrypt import constants
from letsencrypt import errors

View file

@ -152,5 +152,8 @@ to serve all files under specified web root ({0})."""
if exc.errno == errno.ENOTEMPTY:
logger.debug("Challenges cleaned up but %s not empty",
root_path)
elif exc.errno == errno.EACCES:
logger.debug("Challenges cleaned up but no permissions for %s",
root_path)
else:
raise

View file

@ -158,7 +158,7 @@ class AuthenticatorTest(unittest.TestCase):
os.rmdir(leftover_path)
@mock.patch('os.rmdir')
def test_cleanup_oserror(self, mock_rmdir):
def test_cleanup_permission_denied(self, mock_rmdir):
self.auth.prepare()
self.auth.perform([self.achall])
@ -166,10 +166,22 @@ class AuthenticatorTest(unittest.TestCase):
os_error.errno = errno.EACCES
mock_rmdir.side_effect = os_error
self.auth.cleanup([self.achall])
self.assertFalse(os.path.exists(self.validation_path))
self.assertTrue(os.path.exists(self.root_challenge_path))
@mock.patch('os.rmdir')
def test_cleanup_oserror(self, mock_rmdir):
self.auth.prepare()
self.auth.perform([self.achall])
os_error = OSError()
os_error.errno = errno.ENOENT
mock_rmdir.side_effect = os_error
self.assertRaises(OSError, self.auth.cleanup, [self.achall])
self.assertFalse(os.path.exists(self.validation_path))
self.assertTrue(os.path.exists(self.root_challenge_path))
if __name__ == "__main__":
unittest.main() # pragma: no cover

298
letsencrypt/renewal.py Normal file
View file

@ -0,0 +1,298 @@
"""Functionality for autorenewal and associated juggling of configurations"""
from __future__ import print_function
import copy
import glob
import logging
import os
import traceback
import six
import zope.component
from letsencrypt import configuration
from letsencrypt import cli
from letsencrypt import errors
from letsencrypt import storage
from letsencrypt.plugins import disco as plugins_disco
logger = logging.getLogger(__name__)
# These are the items which get pulled out of a renewal configuration
# file's renewalparams and actually used in the client configuration
# during the renewal process. We have to record their types here because
# the renewal configuration process loses this information.
STR_CONFIG_ITEMS = ["config_dir", "logs_dir", "work_dir", "user_agent",
"server", "account", "authenticator", "installer",
"standalone_supported_challenges"]
INT_CONFIG_ITEMS = ["rsa_key_size", "tls_sni_01_port", "http01_port"]
def renewal_conf_files(config):
"""Return /path/to/*.conf in the renewal conf directory"""
return glob.glob(os.path.join(config.renewal_configs_dir, "*.conf"))
def _reconstitute(config, full_path):
"""Try to instantiate a RenewableCert, updating config with relevant items.
This is specifically for use in renewal and enforces several checks
and policies to ensure that we can try to proceed with the renwal
request. The config argument is modified by including relevant options
read from the renewal configuration file.
:param configuration.NamespaceConfig config: configuration for the
current lineage
:param str full_path: Absolute path to the configuration file that
defines this lineage
:returns: the RenewableCert object or None if a fatal error occurred
:rtype: `storage.RenewableCert` or NoneType
"""
try:
renewal_candidate = storage.RenewableCert(
full_path, configuration.RenewerConfiguration(config))
except (errors.CertStorageError, IOError):
logger.warning("Renewal configuration file %s is broken. Skipping.", full_path)
logger.debug("Traceback was:\n%s", traceback.format_exc())
return None
if "renewalparams" not in renewal_candidate.configuration:
logger.warning("Renewal configuration file %s lacks "
"renewalparams. Skipping.", full_path)
return None
renewalparams = renewal_candidate.configuration["renewalparams"]
if "authenticator" not in renewalparams:
logger.warning("Renewal configuration file %s does not specify "
"an authenticator. Skipping.", full_path)
return None
# Now restore specific values along with their data types, if
# those elements are present.
try:
_restore_required_config_elements(config, renewalparams)
_restore_plugin_configs(config, renewalparams)
except (ValueError, errors.Error) as error:
logger.warning(
"An error occured while parsing %s. The error was %s. "
"Skipping the file.", full_path, error.message)
logger.debug("Traceback was:\n%s", traceback.format_exc())
return None
try:
for d in renewal_candidate.names():
cli.process_domain(config, d)
except errors.ConfigurationError as error:
logger.warning("Renewal configuration file %s references a cert "
"that contains an invalid domain name. The problem "
"was: %s. Skipping.", full_path, error)
return None
return renewal_candidate
def _restore_webroot_config(config, renewalparams):
"""
webroot_map is, uniquely, a dict, and the general-purpose configuration
restoring logic is not able to correctly parse it from the serialized
form.
"""
if "webroot_map" in renewalparams:
# if the user does anything that would create a new webroot map on the
# CLI, don't use the old one
if not (cli.set_by_cli("webroot_map") or cli.set_by_cli("webroot_path")):
setattr(config.namespace, "webroot_map", renewalparams["webroot_map"])
elif "webroot_path" in renewalparams:
logger.info("Ancient renewal conf file without webroot-map, restoring webroot-path")
wp = renewalparams["webroot_path"]
if isinstance(wp, str): # prior to 0.1.0, webroot_path was a string
wp = [wp]
setattr(config.namespace, "webroot_path", wp)
def _restore_plugin_configs(config, renewalparams):
"""Sets plugin specific values in config from renewalparams
:param configuration.NamespaceConfig config: configuration for the
current lineage
:param configobj.Section renewalparams: Parameters from the renewal
configuration file that defines this lineage
"""
# Now use parser to get plugin-prefixed items with correct types
# XXX: the current approach of extracting only prefixed items
# related to the actually-used installer and authenticator
# works as long as plugins don't need to read plugin-specific
# variables set by someone else (e.g., assuming Apache
# configurator doesn't need to read webroot_ variables).
# Note: if a parameter that used to be defined in the parser is no
# longer defined, stored copies of that parameter will be
# deserialized as strings by this logic even if they were
# originally meant to be some other type.
if renewalparams["authenticator"] == "webroot":
_restore_webroot_config(config, renewalparams)
plugin_prefixes = []
else:
plugin_prefixes = [renewalparams["authenticator"]]
if renewalparams.get("installer", None) is not None:
plugin_prefixes.append(renewalparams["installer"])
for plugin_prefix in set(plugin_prefixes):
for config_item, config_value in six.iteritems(renewalparams):
if config_item.startswith(plugin_prefix + "_") and not cli.set_by_cli(config_item):
# Values None, True, and False need to be treated specially,
# As their types aren't handled correctly by configobj
if config_value in ("None", "True", "False"):
# bool("False") == True
# pylint: disable=eval-used
setattr(config.namespace, config_item, eval(config_value))
else:
cast = cli.argparse_type(config_item)
setattr(config.namespace, config_item, cast(config_value))
def _restore_required_config_elements(config, renewalparams):
"""Sets non-plugin specific values in config from renewalparams
:param configuration.NamespaceConfig config: configuration for the
current lineage
:param configobj.Section renewalparams: parameters from the renewal
configuration file that defines this lineage
"""
# string-valued items to add if they're present
for config_item in STR_CONFIG_ITEMS:
if config_item in renewalparams and not cli.set_by_cli(config_item):
value = renewalparams[config_item]
# Unfortunately, we've lost type information from ConfigObj,
# so we don't know if the original was NoneType or str!
if value == "None":
value = None
setattr(config.namespace, config_item, value)
# int-valued items to add if they're present
for config_item in INT_CONFIG_ITEMS:
if config_item in renewalparams and not cli.set_by_cli(config_item):
config_value = renewalparams[config_item]
# the default value for http01_port was None during private beta
if config_item == "http01_port" and config_value == "None":
logger.info("updating legacy http01_port value")
int_value = cli.flag_default("http01_port")
else:
try:
int_value = int(config_value)
except ValueError:
raise errors.Error(
"Expected a numeric value for {0}".format(config_item))
setattr(config.namespace, config_item, int_value)
def should_renew(config, lineage):
"Return true if any of the circumstances for automatic renewal apply."
if config.renew_by_default:
logger.info("Auto-renewal forced with --force-renewal...")
return True
if lineage.should_autorenew(interactive=True):
logger.info("Cert is due for renewal, auto-renewing...")
return True
if config.dry_run:
logger.info("Cert not due for renewal, but simulating renewal for dry run")
return True
logger.info("Cert not yet due for renewal")
return False
def _renew_describe_results(config, renew_successes, renew_failures,
renew_skipped, parse_failures):
def _status(msgs, category):
return " " + "\n ".join("%s (%s)" % (m, category) for m in msgs)
if config.dry_run:
print("** DRY RUN: simulating 'letsencrypt renew' close to cert expiry")
print("** (The test certificates below have not been saved.)")
print()
if renew_skipped:
print("The following certs are not due for renewal yet:")
print(_status(renew_skipped, "skipped"))
if not renew_successes and not renew_failures:
print("No renewals were attempted.")
elif renew_successes and not renew_failures:
print("Congratulations, all renewals succeeded. The following certs "
"have been renewed:")
print(_status(renew_successes, "success"))
elif renew_failures and not renew_successes:
print("All renewal attempts failed. The following certs could not be "
"renewed:")
print(_status(renew_failures, "failure"))
elif renew_failures and renew_successes:
print("The following certs were successfully renewed:")
print(_status(renew_successes, "success"))
print("\nThe following certs could not be renewed:")
print(_status(renew_failures, "failure"))
if parse_failures:
print("\nAdditionally, the following renewal configuration files "
"were invalid: ")
print(_status(parse_failures, "parsefail"))
if config.dry_run:
print("** DRY RUN: simulating 'letsencrypt renew' close to cert expiry")
print("** (The test certificates above have not been saved.)")
def renew_all_lineages(config):
"""Examine each lineage; renew if due and report results"""
if config.domains != []:
raise errors.Error("Currently, the renew verb is only capable of "
"renewing all installed certificates that are due "
"to be renewed; individual domains cannot be "
"specified with this action. If you would like to "
"renew specific certificates, use the certonly "
"command. The renew verb may provide other options "
"for selecting certificates to renew in the future.")
renewer_config = configuration.RenewerConfiguration(config)
renew_successes = []
renew_failures = []
renew_skipped = []
parse_failures = []
for renewal_file in renewal_conf_files(renewer_config):
print("Processing " + renewal_file)
lineage_config = copy.deepcopy(config)
# Note that this modifies config (to add back the configuration
# elements from within the renewal configuration file).
try:
renewal_candidate = _reconstitute(lineage_config, renewal_file)
except Exception as e: # pylint: disable=broad-except
logger.warning("Renewal configuration file %s produced an "
"unexpected error: %s. Skipping.", renewal_file, e)
logger.debug("Traceback was:\n%s", traceback.format_exc())
parse_failures.append(renewal_file)
continue
try:
if renewal_candidate is None:
parse_failures.append(renewal_file)
else:
# XXX: ensure that each call here replaces the previous one
zope.component.provideUtility(lineage_config)
if should_renew(lineage_config, renewal_candidate):
plugins = plugins_disco.PluginsRegistry.find_all()
from letsencrypt import main
main.obtain_cert(lineage_config, plugins, renewal_candidate)
renew_successes.append(renewal_candidate.fullchain)
else:
renew_skipped.append(renewal_candidate.fullchain)
except Exception as e: # pylint: disable=broad-except
# obtain_cert (presumably) encountered an unanticipated problem.
logger.warning("Attempting to renew cert from %s produced an "
"unexpected error: %s. Skipping.", renewal_file, e)
logger.debug("Traceback was:\n%s", traceback.format_exc())
renew_failures.append(renewal_candidate.fullchain)
# Describe all the results
_renew_describe_results(config, renew_successes, renew_failures,
renew_skipped, parse_failures)
if renew_failures or parse_failures:
raise errors.Error("{0} renew failure(s), {1} parse failure(s)".format(
len(renew_failures), len(parse_failures)))
else:
logger.debug("no renewal failures")

View file

@ -50,13 +50,12 @@ def add_time_interval(base_time, interval, textparser=parsedatetime.Calendar()):
return textparser.parseDT(interval, base_time, tzinfo=tzinfo)[0]
def write_renewal_config(filename, target, cli_config):
def write_renewal_config(filename, target, relevant_data):
"""Writes a renewal config file with the specified name and values.
:param str filename: Absolute path to the config file
:param dict target: Maps ALL_FOUR to their symlink paths
:param .RenewerConfiguration cli_config: parsed command line
arguments
:param dict relevant_data: Renewal configuration options to save
:returns: Configuration object for the new config file
:rtype: configobj.ConfigObj
@ -67,18 +66,11 @@ def write_renewal_config(filename, target, cli_config):
for kind in ALL_FOUR:
config[kind] = target[kind]
# XXX: We clearly need a more general and correct way of getting
# options into the configobj for the RenewableCert instance.
# This is a quick-and-dirty way to do it to allow integration
# testing to start. (Note that the config parameter to new_lineage
# ideally should be a ConfigObj, but in this case a dict will be
# accepted in practice.)
renewalparams = vars(cli_config.namespace)
if renewalparams:
config["renewalparams"] = renewalparams
if relevant_data:
config["renewalparams"] = relevant_data
config.comments["renewalparams"] = ["",
"Options and defaults used"
" in the renewal process"]
"Options used in "
"the renewal process"]
# TODO: add human-readable comments explaining other available
# parameters
@ -106,7 +98,10 @@ def update_configuration(lineagename, target, cli_config):
# If an existing tempfile exists, delete it
if os.path.exists(temp_filename):
os.unlink(temp_filename)
write_renewal_config(temp_filename, target, cli_config)
# Save only the config items that are relevant to renewal
values = relevant_values(vars(cli_config.namespace))
write_renewal_config(temp_filename, target, values)
os.rename(temp_filename, config_filename)
return configobj.ConfigObj(config_filename)
@ -127,6 +122,60 @@ def get_link_target(link):
return os.path.abspath(target)
def _relevant(option):
"""
Is this option one that could be restored for future renewal purposes?
:param str option: the name of the option
:rtype: bool
"""
# The list() here produces a list of the plugin names as strings.
from letsencrypt import renewal
from letsencrypt.plugins import disco as plugins_disco
plugins = list(plugins_disco.PluginsRegistry.find_all())
return (option in renewal.STR_CONFIG_ITEMS
or option in renewal.INT_CONFIG_ITEMS
or any(option.startswith(x + "_") for x in plugins))
def relevant_values(all_values):
"""Return a new dict containing only items relevant for renewal.
:param dict all_values: The original values.
:returns: A new dictionary containing items that can be used in renewal.
:rtype dict:"""
from letsencrypt import cli
def _is_cli_default(option, value):
# Look through the CLI parser defaults and see if this option is
# both present and equal to the specified value. If not, return
# False.
# pylint: disable=protected-access
for x in cli.helpful_parser.parser._actions:
if x.dest == option:
if x.default == value:
return True
else:
break
return False
values = dict()
for option, value in all_values.iteritems():
# Try to find reasons to store this item in the
# renewal config. It can be stored if it is relevant and
# (it is set_by_cli() or flag_default() is different
# from the value or flag_default() doesn't exist).
if _relevant(option):
if (cli.set_by_cli(option)
or not _is_cli_default(option, value)):
# or option not in constants.CLI_DEFAULTS
# or constants.CLI_DEFAULTS[option] != value):
values[option] = value
return values
class RenewableCert(object): # pylint: disable=too-many-instance-attributes
"""Renewable certificate.
@ -690,6 +739,7 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
:rtype: :class:`storage.renewableCert`
"""
# Examine the configuration and find the new lineage's name
for i in (cli_config.renewal_configs_dir, cli_config.archive_dir,
cli_config.live_dir):
@ -744,7 +794,11 @@ class RenewableCert(object): # pylint: disable=too-many-instance-attributes
# Document what we've done in a new renewal config file
config_file.close()
new_config = write_renewal_config(config_filename, target, cli_config)
# Save only the config items that are relevant to renewal
values = relevant_values(vars(cli_config.namespace))
new_config = write_renewal_config(config_filename, target, values)
return cls(new_config.filename, cli_config)
def save_successor(self, prior_version, new_cert,

View file

@ -126,6 +126,7 @@ class GetAuthorizationsTest(unittest.TestCase):
for achall in self.mock_auth.cleanup.call_args[0][0]:
self.assertTrue(achall.typ in ["tls-sni-01", "http-01", "dns"])
# Length of authorizations list
self.assertEqual(len(authzr), 1)
@mock.patch("letsencrypt.auth_handler.AuthHandler._poll_challenges")
@ -162,6 +163,9 @@ class GetAuthorizationsTest(unittest.TestCase):
self.assertRaises(
errors.AuthorizationError, self.handler.get_authorizations, ["0"])
def test_no_domains(self):
self.assertRaises(errors.AuthorizationError, self.handler.get_authorizations, [])
def _validate_all(self, unused_1, unused_2):
for dom in self.handler.authzr.keys():
azr = self.handler.authzr[dom]

View file

@ -1,5 +1,4 @@
"""Tests for letsencrypt.cli."""
from __future__ import print_function
import argparse
@ -23,6 +22,8 @@ from letsencrypt import constants
from letsencrypt import crypto_util
from letsencrypt import errors
from letsencrypt import le_util
from letsencrypt import main
from letsencrypt import renewal
from letsencrypt import storage
from letsencrypt.plugins import disco
@ -51,19 +52,22 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
def tearDown(self):
shutil.rmtree(self.tmp_dir)
# Reset globals in cli
# pylint: disable=protected-access
cli._parser = cli.set_by_cli.detector = None
def _call(self, args):
"Run the cli with output streams and actual client mocked out"
with mock.patch('letsencrypt.cli.client') as client:
with mock.patch('letsencrypt.main.client') as client:
ret, stdout, stderr = self._call_no_clientmock(args)
return ret, stdout, stderr, client
def _call_no_clientmock(self, args):
"Run the client with output streams mocked out"
args = self.standard_args + args
with mock.patch('letsencrypt.cli.sys.stdout') as stdout:
with mock.patch('letsencrypt.cli.sys.stderr') as stderr:
ret = cli.main(args[:]) # NOTE: parser can alter its args!
with mock.patch('letsencrypt.main.sys.stdout') as stdout:
with mock.patch('letsencrypt.main.sys.stderr') as stderr:
ret = main.main(args[:]) # NOTE: parser can alter its args!
return ret, stdout, stderr
def _call_stdout(self, args):
@ -72,20 +76,21 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
caller.
"""
args = self.standard_args + args
with mock.patch('letsencrypt.cli.sys.stderr') as stderr:
with mock.patch('letsencrypt.cli.client') as client:
ret = cli.main(args[:]) # NOTE: parser can alter its args!
with mock.patch('letsencrypt.main.sys.stderr') as stderr:
with mock.patch('letsencrypt.main.client') as client:
ret = main.main(args[:]) # NOTE: parser can alter its args!
return ret, None, stderr, client
def test_no_flags(self):
with MockedVerb("run") as mock_run:
with mock.patch('letsencrypt.main.run') as mock_run:
self._call([])
self.assertEqual(1, mock_run.call_count)
def _help_output(self, args):
"Run a command, and return the ouput string for scrutiny"
output = six.StringIO()
with mock.patch('letsencrypt.cli.sys.stdout', new=output):
with mock.patch('letsencrypt.main.sys.stdout', new=output):
self.assertRaises(SystemExit, self._call_stdout, args)
out = output.getvalue()
return out
@ -133,13 +138,12 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
out = self._help_output(['-h'])
self.assertTrue(cli.usage_strings(plugins)[0] in out)
def _cli_missing_flag(self, args, message):
"Ensure that a particular error raises a missing cli flag error containing message"
exc = None
try:
with mock.patch('letsencrypt.cli.sys.stderr'):
cli.main(self.standard_args + args[:]) # NOTE: parser can alter its args!
with mock.patch('letsencrypt.main.sys.stderr'):
main.main(self.standard_args + args[:]) # NOTE: parser can alter its args!
except errors.MissingCommandlineFlag as exc:
self.assertTrue(message in str(exc))
self.assertTrue(exc is not None)
@ -149,15 +153,15 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self._cli_missing_flag(args, "specify a plugin")
args.extend(['--standalone', '-d', 'eg.is'])
self._cli_missing_flag(args, "register before running")
with mock.patch('letsencrypt.cli._auth_from_domains'):
with mock.patch('letsencrypt.cli.client.acme_from_config_key'):
with mock.patch('letsencrypt.main._auth_from_domains'):
with mock.patch('letsencrypt.main.client.acme_from_config_key'):
args.extend(['--email', 'io@io.is'])
self._cli_missing_flag(args, "--agree-tos")
@mock.patch('letsencrypt.cli.client.acme_client.Client')
@mock.patch('letsencrypt.cli._determine_account')
@mock.patch('letsencrypt.cli.client.Client.obtain_and_enroll_certificate')
@mock.patch('letsencrypt.cli._auth_from_domains')
@mock.patch('letsencrypt.main.client.acme_client.Client')
@mock.patch('letsencrypt.main._determine_account')
@mock.patch('letsencrypt.main.client.Client.obtain_and_enroll_certificate')
@mock.patch('letsencrypt.main._auth_from_domains')
def test_user_agent(self, afd, _obt, det, _client):
# Normally the client is totally mocked out, but here we need more
# arguments to automate it...
@ -166,7 +170,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
det.return_value = mock.MagicMock(), None
afd.return_value = mock.MagicMock(), "newcert"
with mock.patch('letsencrypt.cli.client.acme_client.ClientNetwork') as acme_net:
with mock.patch('letsencrypt.main.client.acme_client.ClientNetwork') as acme_net:
self._call_no_clientmock(args)
os_ver = " ".join(le_util.get_os_info())
ua = acme_net.call_args[1]["user_agent"]
@ -176,7 +180,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
if "linux" in plat.lower():
self.assertTrue(platform.linux_distribution()[0] in ua)
with mock.patch('letsencrypt.cli.client.acme_client.ClientNetwork') as acme_net:
with mock.patch('letsencrypt.main.client.acme_client.ClientNetwork') as acme_net:
ua = "bandersnatch"
args += ["--user-agent", ua]
self._call_no_clientmock(args)
@ -188,7 +192,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
chain = 'chain'
fullchain = 'fullchain'
with MockedVerb('install') as mock_install:
with mock.patch('letsencrypt.main.install') as mock_install:
self._call(['install', '--cert-path', cert, '--key-path', 'key',
'--chain-path', 'chain',
'--fullchain-path', 'fullchain'])
@ -199,8 +203,8 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self.assertEqual(args.chain_path, os.path.abspath(chain))
self.assertEqual(args.fullchain_path, os.path.abspath(fullchain))
@mock.patch('letsencrypt.cli.record_chosen_plugins')
@mock.patch('letsencrypt.cli.display_ops')
@mock.patch('letsencrypt.main.cli.record_chosen_plugins')
@mock.patch('letsencrypt.main.cli.display_ops')
def test_installer_selection(self, mock_display_ops, _rec):
self._call(['install', '--domains', 'foo.bar', '--cert-path', 'cert',
'--key-path', 'key', '--chain-path', 'chain'])
@ -239,14 +243,14 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self._cli_missing_flag(["--standalone"], "With the standalone plugin, you probably")
with mock.patch("letsencrypt.cli._init_le_client") as mock_init:
with mock.patch("letsencrypt.cli._auth_from_domains") as mock_afd:
with mock.patch("letsencrypt.main._init_le_client") as mock_init:
with mock.patch("letsencrypt.main._auth_from_domains") as mock_afd:
mock_afd.return_value = (mock.MagicMock(), mock.MagicMock())
self._call(["certonly", "--manual", "-d", "foo.bar"])
unused_config, auth, unused_installer = mock_init.call_args[0]
self.assertTrue(isinstance(auth, manual.Authenticator))
with MockedVerb("certonly") as mock_certonly:
with mock.patch('letsencrypt.main.obtain_cert') as mock_certonly:
self._call(["auth", "--standalone"])
self.assertEqual(1, mock_certonly.call_count)
@ -269,8 +273,8 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
for r in xrange(len(flags)))):
self._call(['plugins'] + list(args))
@mock.patch('letsencrypt.cli.plugins_disco')
@mock.patch('letsencrypt.cli.HelpfulArgumentParser.determine_help_topics')
@mock.patch('letsencrypt.main.plugins_disco')
@mock.patch('letsencrypt.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_no_args(self, _det, mock_disco):
ifaces = []
plugins = mock_disco.PluginsRegistry.find_all()
@ -281,8 +285,8 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
filtered = plugins.visible().ifaces()
stdout.write.called_once_with(str(filtered))
@mock.patch('letsencrypt.cli.plugins_disco')
@mock.patch('letsencrypt.cli.HelpfulArgumentParser.determine_help_topics')
@mock.patch('letsencrypt.main.plugins_disco')
@mock.patch('letsencrypt.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_init(self, _det, mock_disco):
ifaces = []
plugins = mock_disco.PluginsRegistry.find_all()
@ -296,8 +300,8 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
verified = filtered.verify()
stdout.write.called_once_with(str(verified))
@mock.patch('letsencrypt.cli.plugins_disco')
@mock.patch('letsencrypt.cli.HelpfulArgumentParser.determine_help_topics')
@mock.patch('letsencrypt.main.plugins_disco')
@mock.patch('letsencrypt.main.cli.HelpfulArgumentParser.determine_help_topics')
def test_plugins_prepare(self, _det, mock_disco):
ifaces = []
plugins = mock_disco.PluginsRegistry.find_all()
@ -319,7 +323,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
chain = 'chain'
fullchain = 'fullchain'
with MockedVerb('certonly') as mock_obtaincert:
with mock.patch('letsencrypt.main.obtain_cert') as mock_obtaincert:
self._call(['certonly', '--cert-path', cert, '--key-path', 'key',
'--chain-path', 'chain',
'--fullchain-path', 'fullchain'])
@ -360,6 +364,10 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self._call,
['-d', '204.11.231.35'])
def test_csr_with_besteffort(self):
args = ["--csr", CSR, "--allow-subset-of-names"]
self.assertRaises(errors.Error, self._call, args)
def test_run_with_csr(self):
# This is an error because you can only use --csr with certonly
try:
@ -462,7 +470,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
if domains_arg:
webroot_map_args.extend(["-d", domains_arg])
namespace = parse(webroot_map_args)
domains = cli._find_domains(namespace, mock.MagicMock()) # pylint: disable=protected-access
domains = main._find_domains(namespace, mock.MagicMock()) # pylint: disable=protected-access
self.assertEqual(namespace.webroot_map, expected_map)
self.assertEqual(set(domains), set(expectect_domains))
@ -506,16 +514,16 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
{"eg.com": "/tmp", "www.eg.com": "/tmp", "eg.is": "/tmp2"})
def _certonly_new_request_common(self, mock_client, args=None):
with mock.patch('letsencrypt.cli._treat_as_renewal') as mock_renewal:
with mock.patch('letsencrypt.main._treat_as_renewal') as mock_renewal:
mock_renewal.return_value = ("newcert", None)
with mock.patch('letsencrypt.cli._init_le_client') as mock_init:
with mock.patch('letsencrypt.main._init_le_client') as mock_init:
mock_init.return_value = mock_client
if args is None:
args = []
args += '-d foo.bar -a standalone certonly'.split()
self._call(args)
@mock.patch('letsencrypt.cli.zope.component.getUtility')
@mock.patch('letsencrypt.main.zope.component.getUtility')
def test_certonly_dry_run_new_request_success(self, mock_get_utility):
mock_client = mock.MagicMock()
mock_client.obtain_and_enroll_certificate.return_value = None
@ -528,7 +536,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self.assertEqual(mock_get_utility().add_message.call_count, 1)
@mock.patch('letsencrypt.crypto_util.notAfter')
@mock.patch('letsencrypt.cli.zope.component.getUtility')
@mock.patch('letsencrypt.main.zope.component.getUtility')
def test_certonly_new_request_success(self, mock_get_utility, mock_notAfter):
cert_path = '/etc/letsencrypt/live/foo.bar'
date = '1970-01-01'
@ -553,7 +561,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self._certonly_new_request_common, mock_client)
def _test_renewal_common(self, due_for_renewal, extra_args, log_out=None,
args=None, renew=True, error_expected=False):
args=None, should_renew=True, error_expected=False):
# pylint: disable=too-many-locals,too-many-arguments
cert_path = 'letsencrypt/tests/testdata/cert.pem'
chain_path = '/etc/letsencrypt/live/foo.bar/fullchain.pem'
@ -565,17 +573,17 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
mock_client.obtain_certificate.return_value = (mock_certr, 'chain',
mock_key, 'csr')
try:
with mock.patch('letsencrypt.cli._find_duplicative_certs') as mock_fdc:
with mock.patch('letsencrypt.main._find_duplicative_certs') as mock_fdc:
mock_fdc.return_value = (mock_lineage, None)
with mock.patch('letsencrypt.cli._init_le_client') as mock_init:
with mock.patch('letsencrypt.main._init_le_client') as mock_init:
mock_init.return_value = mock_client
get_utility_path = 'letsencrypt.cli.zope.component.getUtility'
get_utility_path = 'letsencrypt.main.zope.component.getUtility'
with mock.patch(get_utility_path) as mock_get_utility:
with mock.patch('letsencrypt.cli.OpenSSL') as mock_ssl:
with mock.patch('letsencrypt.main.OpenSSL') as mock_ssl:
mock_latest = mock.MagicMock()
mock_latest.get_issuer.return_value = "Fake fake"
mock_ssl.crypto.load_certificate.return_value = mock_latest
with mock.patch('letsencrypt.cli.crypto_util'):
with mock.patch('letsencrypt.main.crypto_util'):
if not args:
args = ['-d', 'isnot.org', '-a', 'standalone', 'certonly']
if extra_args:
@ -592,7 +600,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
"Unexpected renewal error:\n" +
traceback.format_exc())
if renew:
if should_renew:
mock_client.obtain_certificate.assert_called_once_with(['isnot.org'])
else:
self.assertEqual(mock_client.obtain_certificate.call_count, 0)
@ -627,7 +635,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self.assertEqual(get_utility().add_message.call_count, 1)
_, _ = self._test_renewal_common(False, ['-tvv', '--debug', '--keep'],
log_out="not yet due", renew=False)
log_out="not yet due", should_renew=False)
def _dump_log(self):
with open(os.path.join(self.logs_dir, "letsencrypt.log")) as lf:
@ -650,9 +658,9 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
def test_renew_verb(self):
self._make_test_renewal_conf('sample-renewal.conf')
args = ["renew", "--dry-run", "-tvv"]
self._test_renewal_common(True, [], args=args, renew=True)
self._test_renewal_common(True, [], args=args, should_renew=True)
@mock.patch("letsencrypt.cli._set_by_cli")
@mock.patch("letsencrypt.cli.set_by_cli")
def test_ancient_webroot_renewal_conf(self, mock_set_by_cli):
mock_set_by_cli.return_value = False
rc_path = self._make_test_renewal_conf('sample-renewal-ancient.conf')
@ -662,7 +670,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
configuration.RenewerConfiguration(config))
renewalparams = lineage.configuration["renewalparams"]
# pylint: disable=protected-access
cli._restore_webroot_config(config, renewalparams)
renewal._restore_webroot_config(config, renewalparams)
self.assertEqual(config.webroot_path, ["/var/www/"])
def test_renew_verb_empty_config(self):
@ -672,7 +680,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
with open(os.path.join(rd, 'empty.conf'), 'w'):
pass # leave the file empty
args = ["renew", "--dry-run", "-tvv"]
self._test_renewal_common(False, [], args=args, renew=False, error_expected=True)
self._test_renewal_common(False, [], args=args, should_renew=False, error_expected=True)
def _make_dummy_renewal_config(self):
renewer_configs_dir = os.path.join(self.config_dir, 'renewal')
@ -680,8 +688,8 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
with open(os.path.join(renewer_configs_dir, 'test.conf'), 'w') as f:
f.write("My contents don't matter")
def _test_renew_common(self, renewalparams=None, error_expected=False,
names=None, assert_oc_called=None):
def _test_renew_common(self, renewalparams=None, names=None,
assert_oc_called=None, **kwargs):
self._make_dummy_renewal_config()
with mock.patch('letsencrypt.storage.RenewableCert') as mock_rc:
mock_lineage = mock.MagicMock()
@ -691,9 +699,10 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
if names is not None:
mock_lineage.names.return_value = names
mock_rc.return_value = mock_lineage
with mock.patch('letsencrypt.cli.obtain_cert') as mock_obtain_cert:
self._test_renewal_common(True, None, error_expected=error_expected,
args=['renew'], renew=False)
with mock.patch('letsencrypt.main.obtain_cert') as mock_obtain_cert:
kwargs.setdefault('args', ['renew'])
self._test_renewal_common(True, None, should_renew=False, **kwargs)
if assert_oc_called is not None:
if assert_oc_called:
self.assertTrue(mock_obtain_cert.called)
@ -716,7 +725,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
def test_renew_with_nonetype_http01(self):
renewalparams = {'authenticator': 'webroot',
'http01_port': 'None'}
self._test_renew_common(renewalparams=renewalparams, error_expected=False,
self._test_renew_common(renewalparams=renewalparams,
assert_oc_called=True)
def test_renew_with_bad_domain(self):
@ -725,6 +734,12 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self._test_renew_common(renewalparams=renewalparams, error_expected=True,
names=names, assert_oc_called=False)
def test_renew_with_configurator(self):
renewalparams = {'authenticator': 'webroot'}
self._test_renew_common(
renewalparams=renewalparams, assert_oc_called=True,
args='renew --configurator apache'.split())
def test_renew_plugin_config_restoration(self):
renewalparams = {'authenticator': 'webroot',
'webroot_path': 'None',
@ -734,7 +749,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
def test_renew_reconstitute_error(self):
# pylint: disable=protected-access
with mock.patch('letsencrypt.cli._reconstitute') as mock_reconstitute:
with mock.patch('letsencrypt.main.renewal._reconstitute') as mock_reconstitute:
mock_reconstitute.side_effect = Exception
self._test_renew_common(assert_oc_called=False, error_expected=True)
@ -746,20 +761,20 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
mock_rc.return_value = mock_lineage
mock_lineage.configuration = {
'renewalparams': {'authenticator': 'webroot'}}
with mock.patch('letsencrypt.cli.obtain_cert') as mock_obtain_cert:
with mock.patch('letsencrypt.main.obtain_cert') as mock_obtain_cert:
mock_obtain_cert.side_effect = Exception
self._test_renewal_common(True, None, error_expected=True,
args=['renew'], renew=False)
args=['renew'], should_renew=False)
def test_renew_with_bad_cli_args(self):
self._test_renewal_common(True, None, args='renew -d example.com'.split(),
renew=False, error_expected=True)
should_renew=False, error_expected=True)
self._test_renewal_common(True, None, args='renew --csr {0}'.format(CSR).split(),
renew=False, error_expected=True)
should_renew=False, error_expected=True)
@mock.patch('letsencrypt.cli.zope.component.getUtility')
@mock.patch('letsencrypt.cli._treat_as_renewal')
@mock.patch('letsencrypt.cli._init_le_client')
@mock.patch('letsencrypt.main.zope.component.getUtility')
@mock.patch('letsencrypt.main._treat_as_renewal')
@mock.patch('letsencrypt.main._init_le_client')
def test_certonly_reinstall(self, mock_init, mock_renewal, mock_get_utility):
mock_renewal.return_value = ('reinstall', mock.MagicMock())
mock_init.return_value = mock_client = mock.MagicMock()
@ -776,9 +791,9 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
mock_client.obtain_certificate_from_csr.return_value = (certr, chain)
cert_path = '/etc/letsencrypt/live/example.com/cert.pem'
mock_client.save_certificate.return_value = cert_path, None, None
with mock.patch('letsencrypt.cli._init_le_client') as mock_init:
with mock.patch('letsencrypt.main._init_le_client') as mock_init:
mock_init.return_value = mock_client
get_utility_path = 'letsencrypt.cli.zope.component.getUtility'
get_utility_path = 'letsencrypt.main.zope.component.getUtility'
with mock.patch(get_utility_path) as mock_get_utility:
chain_path = '/etc/letsencrypt/live/example.com/chain.pem'
full_path = '/etc/letsencrypt/live/example.com/fullchain.pem'
@ -787,7 +802,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
CSR, cert_path, chain_path, full_path).split()
if extra_args:
args += extra_args
with mock.patch('letsencrypt.cli.crypto_util'):
with mock.patch('letsencrypt.main.crypto_util'):
self._call(args)
if '--dry-run' in args:
@ -811,7 +826,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self.assertTrue(
'dry run' in mock_get_utility().add_message.call_args[0][0])
@mock.patch('letsencrypt.cli.client.acme_client')
@mock.patch('letsencrypt.main.client.acme_client')
def test_revoke_with_key(self, mock_acme_client):
server = 'foo.bar'
self._call_no_clientmock(['--cert-path', CERT, '--key-path', KEY,
@ -824,7 +839,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
mock_revoke = mock_acme_client.Client().revoke
mock_revoke.assert_called_once_with(jose.ComparableX509(cert))
@mock.patch('letsencrypt.cli._determine_account')
@mock.patch('letsencrypt.main._determine_account')
def test_revoke_without_key(self, mock_determine_account):
mock_determine_account.return_value = (mock.MagicMock(), None)
_, _, _, client = self._call(['--cert-path', CERT, 'revoke'])
@ -833,7 +848,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
mock_revoke = client.acme_from_config_key().revoke
mock_revoke.assert_called_once_with(jose.ComparableX509(cert))
@mock.patch('letsencrypt.cli.sys')
@mock.patch('letsencrypt.main.sys')
def test_handle_exception(self, mock_sys):
# pylint: disable=protected-access
from acme import messages
@ -841,20 +856,20 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
config = mock.MagicMock()
mock_open = mock.mock_open()
with mock.patch('letsencrypt.cli.open', mock_open, create=True):
with mock.patch('letsencrypt.main.open', mock_open, create=True):
exception = Exception('detail')
config.verbose_count = 1
cli._handle_exception(
main._handle_exception(
Exception, exc_value=exception, trace=None, config=None)
mock_open().write.assert_called_once_with(''.join(
traceback.format_exception_only(Exception, exception)))
error_msg = mock_sys.exit.call_args_list[0][0][0]
self.assertTrue('unexpected error' in error_msg)
with mock.patch('letsencrypt.cli.open', mock_open, create=True):
with mock.patch('letsencrypt.main.open', mock_open, create=True):
mock_open.side_effect = [KeyboardInterrupt]
error = errors.Error('detail')
cli._handle_exception(
main._handle_exception(
errors.Error, exc_value=error, trace=None, config=None)
# assert_any_call used because sys.exit doesn't exit in cli.py
mock_sys.exit.assert_any_call(''.join(
@ -863,7 +878,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
exception = messages.Error(detail='alpha', typ='urn:acme:error:triffid',
title='beta')
config = mock.MagicMock(debug=False, verbose_count=-3)
cli._handle_exception(
main._handle_exception(
messages.Error, exc_value=exception, trace=None, config=config)
error_msg = mock_sys.exit.call_args_list[-1][0][0]
self.assertTrue('unexpected error' in error_msg)
@ -871,7 +886,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self.assertTrue('alpha' in error_msg)
self.assertTrue('beta' in error_msg)
config = mock.MagicMock(debug=False, verbose_count=1)
cli._handle_exception(
main._handle_exception(
messages.Error, exc_value=exception, trace=None, config=config)
error_msg = mock_sys.exit.call_args_list[-1][0][0]
self.assertTrue('unexpected error' in error_msg)
@ -879,7 +894,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self.assertTrue('alpha' in error_msg)
interrupt = KeyboardInterrupt('detail')
cli._handle_exception(
main._handle_exception(
KeyboardInterrupt, exc_value=interrupt, trace=None, config=None)
mock_sys.exit.assert_called_with(''.join(
traceback.format_exception_only(KeyboardInterrupt, interrupt)))
@ -898,7 +913,7 @@ class CLITest(unittest.TestCase): # pylint: disable=too-many-public-methods
self.assertEqual(contents, test_contents)
def test_agree_dev_preview_config(self):
with MockedVerb('run') as mocked_run:
with mock.patch('letsencrypt.main.run') as mocked_run:
self._call(['-c', test_util.vector_path('cli.ini')])
self.assertTrue(mocked_run.called)
@ -915,8 +930,8 @@ class DetermineAccountTest(unittest.TestCase):
def _call(self):
# pylint: disable=protected-access
from letsencrypt.cli import _determine_account
with mock.patch('letsencrypt.cli.account.AccountFileStorage') as mock_storage:
from letsencrypt.main import _determine_account
with mock.patch('letsencrypt.main.account.AccountFileStorage') as mock_storage:
mock_storage.return_value = self.account_storage
return _determine_account(self.config)
@ -948,7 +963,7 @@ class DetermineAccountTest(unittest.TestCase):
def test_no_accounts_no_email(self, mock_get_email):
mock_get_email.return_value = 'foo@bar.baz'
with mock.patch('letsencrypt.cli.client') as client:
with mock.patch('letsencrypt.main.client') as client:
client.register.return_value = (
self.accs[0], mock.sentinel.acme)
self.assertEqual((self.accs[0], mock.sentinel.acme), self._call())
@ -960,7 +975,7 @@ class DetermineAccountTest(unittest.TestCase):
def test_no_accounts_email(self):
self.config.email = 'other email'
with mock.patch('letsencrypt.cli.client') as client:
with mock.patch('letsencrypt.main.client') as client:
client.register.return_value = (self.accs[1], mock.sentinel.acme)
self._call()
self.assertEqual(self.accs[1].id, self.config.account)
@ -980,7 +995,7 @@ class DuplicativeCertsTest(storage_test.BaseRenewableCertTest):
@mock.patch('letsencrypt.le_util.make_or_verify_dir')
def test_find_duplicative_names(self, unused_makedir):
from letsencrypt.cli import _find_duplicative_certs
from letsencrypt.main import _find_duplicative_certs
test_cert = test_util.load_vector('cert-san.pem')
with open(self.test_rc.cert, 'w') as f:
f.write(test_cert)
@ -1008,34 +1023,5 @@ class DuplicativeCertsTest(storage_test.BaseRenewableCertTest):
self.assertEqual(result, (None, None))
class MockedVerb(object):
"""Simple class that can be used for mocking out verbs/subcommands.
Storing a dictionary of verbs and the functions that implement them
in letsencrypt.cli makes mocking much more complicated. This class
can be used as a simple context manager for mocking out verbs in CLI
tests. For example:
with MockedVerb("run") as mock_run:
self._call([])
self.assertEqual(1, mock_run.call_count)
"""
def __init__(self, verb_name):
self.verb_dict = cli.HelpfulArgumentParser.VERBS
self.verb_func = None
self.verb_name = verb_name
def __enter__(self):
self.verb_func = self.verb_dict[self.verb_name]
mocked_func = mock.MagicMock()
self.verb_dict[self.verb_name] = mocked_func
return mocked_func
def __exit__(self, unused_type, unused_value, unused_trace):
self.verb_dict[self.verb_name] = self.verb_func
if __name__ == '__main__':
unittest.main() # pragma: no cover

View file

@ -63,8 +63,8 @@ class RegisterTest(unittest.TestCase):
@mock.patch("letsencrypt.client.display_ops.get_email")
def test_email_retry(self, _rep, mock_get_email):
from acme import messages
msg = "Validation of contact mailto:sousaphone@improbablylongggstring.tld failed"
mx_err = messages.Error(detail=msg, typ="malformed", title="title")
msg = "DNS problem: NXDOMAIN looking up MX for example.com"
mx_err = messages.Error(detail=msg, typ="urn:acme:error:invalidEmail")
with mock.patch("letsencrypt.client.acme_client.Client") as mock_client:
mock_client().register.side_effect = [mx_err, mock.MagicMock()]
self._call()
@ -96,7 +96,7 @@ class ClientTest(unittest.TestCase):
def setUp(self):
self.config = mock.MagicMock(
no_verify_ssl=False, config_dir="/etc/letsencrypt")
no_verify_ssl=False, config_dir="/etc/letsencrypt", allow_subset_of_names=False)
# pylint: disable=star-args
self.account = mock.MagicMock(**{"key.pem": KEY})
self.eg_domains = ["example.com", "www.example.com"]
@ -115,20 +115,27 @@ class ClientTest(unittest.TestCase):
def _mock_obtain_certificate(self):
self.client.auth_handler = mock.MagicMock()
self.client.auth_handler.get_authorizations.return_value = [None]
self.acme.request_issuance.return_value = mock.sentinel.certr
self.acme.fetch_chain.return_value = mock.sentinel.chain
def _check_obtain_certificate(self):
self.client.auth_handler.get_authorizations.assert_called_once_with(self.eg_domains)
self.client.auth_handler.get_authorizations.assert_called_once_with(
self.eg_domains,
self.config.allow_subset_of_names)
authzr = self.client.auth_handler.get_authorizations()
self.acme.request_issuance.assert_called_once_with(
jose.ComparableX509(OpenSSL.crypto.load_certificate_request(
OpenSSL.crypto.FILETYPE_ASN1, CSR_SAN)),
self.client.auth_handler.get_authorizations())
authzr)
self.acme.fetch_chain.assert_called_once_with(mock.sentinel.certr)
# FIXME move parts of this to test_cli.py...
@mock.patch("letsencrypt.client.logger")
@mock.patch("letsencrypt.cli._process_domain")
@mock.patch("letsencrypt.cli.process_domain")
def test_obtain_certificate_from_csr(self, mock_process_domain, mock_logger):
self._mock_obtain_certificate()
from letsencrypt import cli
@ -151,12 +158,28 @@ class ClientTest(unittest.TestCase):
self.assertRaises(errors.ConfigurationError,
cli.HelpfulArgumentParser.handle_csr, mock_parser, mock_parsed_args)
authzr = self.client.auth_handler.get_authorizations(self.eg_domains, False)
self.assertEqual(
(mock.sentinel.certr, mock.sentinel.chain),
self.client.obtain_certificate_from_csr(self.eg_domains, test_csr))
self.client.obtain_certificate_from_csr(
self.eg_domains,
test_csr,
authzr=authzr))
# and that the cert was obtained correctly
self._check_obtain_certificate()
# Test for authzr=None
self.assertEqual(
(mock.sentinel.certr, mock.sentinel.chain),
self.client.obtain_certificate_from_csr(
self.eg_domains,
test_csr,
authzr=None))
self.client.auth_handler.get_authorizations.assert_called_with(
self.eg_domains)
# Test for no auth_handler
self.client.auth_handler = None
self.assertRaises(
@ -175,6 +198,21 @@ class ClientTest(unittest.TestCase):
mock_crypto_util.init_save_key.return_value = mock.sentinel.key
domains = ["example.com", "www.example.com"]
# return_value is essentially set to (None, None) in
# _mock_obtain_certificate(), which breaks this test.
# Thus fixed by the next line.
authzr = []
for domain in domains:
authzr.append(
mock.MagicMock(
body=mock.MagicMock(
identifier=mock.MagicMock(
value=domain))))
self.client.auth_handler.get_authorizations.return_value = authzr
self.assertEqual(
self.client.obtain_certificate(domains),
(mock.sentinel.certr, mock.sentinel.chain, mock.sentinel.key, csr))

View file

@ -493,7 +493,12 @@ class RenewableCertTests(BaseRenewableCertTest):
self.assertTrue(self.test_rc.should_autorenew())
mock_ocsp.return_value = False
def test_save_successor(self):
@mock.patch("letsencrypt.storage.relevant_values")
def test_save_successor(self, mock_rv):
# Mock relevant_values() to claim that all values are relevant here
# (to avoid instantiating parser)
mock_rv.side_effect = lambda x: x
for ver in xrange(1, 6):
for kind in ALL_FOUR:
where = getattr(self.test_rc, kind)
@ -557,8 +562,47 @@ class RenewableCertTests(BaseRenewableCertTest):
self.assertFalse(os.path.islink(self.test_rc.version("privkey", 10)))
self.assertFalse(os.path.exists(temp_config_file))
def test_new_lineage(self):
@mock.patch("letsencrypt.cli.helpful_parser")
def test_relevant_values(self, mock_parser):
"""Test that relevant_values() can reject an irrelevant value."""
# pylint: disable=protected-access
from letsencrypt import storage
mock_parser.verb = "certonly"
mock_parser.args = ["--standalone"]
mock_action = mock.Mock(dest="rsa_key_size", default=2048)
mock_parser.parser._actions = [mock_action]
self.assertEqual(storage.relevant_values({"hello": "there"}), {})
@mock.patch("letsencrypt.cli.helpful_parser")
def test_relevant_values_default(self, mock_parser):
"""Test that relevant_values() can reject a default value."""
# pylint: disable=protected-access
from letsencrypt import storage
mock_parser.verb = "certonly"
mock_parser.args = ["--standalone"]
mock_action = mock.Mock(dest="rsa_key_size", default=2048)
mock_parser.parser._actions = [mock_action]
self.assertEqual(storage.relevant_values({"rsa_key_size": 2048}), {})
@mock.patch("letsencrypt.cli.helpful_parser")
def test_relevant_values_nondefault(self, mock_parser):
"""Test that relevant_values() can retain a non-default value."""
# pylint: disable=protected-access
from letsencrypt import storage
mock_parser.verb = "certonly"
mock_parser.args = ["--standalone"]
mock_action = mock.Mock(dest="rsa_key_size", default=2048)
mock_parser.parser._actions = [mock_action]
self.assertEqual(storage.relevant_values({"rsa_key_size": 12}),
{"rsa_key_size": 12})
@mock.patch("letsencrypt.storage.relevant_values")
def test_new_lineage(self, mock_rv):
"""Test for new_lineage() class method."""
# Mock relevant_values to say everything is relevant here (so we
# don't have to mock the parser to help it decide!)
mock_rv.side_effect = lambda x: x
from letsencrypt import storage
result = storage.RenewableCert.new_lineage(
"the-lineage.com", "cert", "privkey", "chain", self.cli_config)
@ -592,8 +636,13 @@ class RenewableCertTests(BaseRenewableCertTest):
# TODO: Conceivably we could test that the renewal parameters actually
# got saved
def test_new_lineage_nonexistent_dirs(self):
@mock.patch("letsencrypt.storage.relevant_values")
def test_new_lineage_nonexistent_dirs(self, mock_rv):
"""Test that directories can be created if they don't exist."""
# Mock relevant_values to say everything is relevant here (so we
# don't have to mock the parser to help it decide!)
mock_rv.side_effect = lambda x: x
from letsencrypt import storage
shutil.rmtree(self.cli_config.renewal_configs_dir)
shutil.rmtree(self.cli_config.archive_dir)

View file

@ -14,7 +14,7 @@ apache_dismod = a2dismod
register_unsafely_without_email = False
apache_handle_modules = True
uir = None
installer = none
installer = None
nginx_ctl = nginx
config_dir = MAGICDIR
text_mode = False

View file

@ -14,7 +14,7 @@ apache_dismod = a2dismod
register_unsafely_without_email = False
apache_handle_modules = True
uir = None
installer = none
installer = None
nginx_ctl = nginx
config_dir = MAGICDIR
text_mode = False

View file

@ -127,7 +127,7 @@ setup(
entry_points={
'console_scripts': [
'letsencrypt = letsencrypt.cli:main',
'letsencrypt = letsencrypt.main:main',
],
'letsencrypt.plugins': [
'manual = letsencrypt.plugins.manual:Authenticator',