certbot/certbot-compatibility-test/certbot_compatibility_test/test_driver.py

378 lines
12 KiB
Python
Raw Permalink Normal View History

"""Tests Certbot plugins against different server configurations."""
2015-07-14 21:04:43 -04:00
import argparse
2015-07-21 21:14:57 -04:00
import filecmp
2015-07-14 21:04:43 -04:00
import logging
2015-07-21 21:14:57 -04:00
import os
import shutil
import tempfile
2015-08-03 21:32:32 -04:00
import time
import sys
2015-07-14 21:04:43 -04:00
2015-07-21 21:14:57 -04:00
import OpenSSL
from acme import challenges
from acme import crypto_util
from acme import messages
from certbot import achallenges
from certbot import errors as le_errors
from certbot.tests import acme_util
from certbot_compatibility_test import errors
from certbot_compatibility_test import util
from certbot_compatibility_test import validator
from certbot_compatibility_test.configurators.apache import common as a_common
from certbot_compatibility_test.configurators.nginx import common as n_common
2015-07-14 21:04:43 -04:00
2015-07-17 19:21:17 -04:00
2015-07-14 21:04:43 -04:00
# Program description handed to argparse in get_args().
DESCRIPTION = """
Tests Certbot plugins against different server configurations. It is
assumed that Docker is already installed. If no test type is specified, all
tests that the plugin supports are performed.
"""

# Maps the name accepted by --plugin to the proxy class that main()
# instantiates for it.
PLUGINS = {"apache": a_common.Proxy, "nginx": n_common.Proxy}

logger = logging.getLogger(__name__)
2015-07-21 21:14:57 -04:00
def test_authenticator(plugin, config, temp_dir):
    """Tests authenticator, returning True if the tests are successful

    A backup of ``config`` is taken before any challenge is performed so
    that the configuration can be compared against it after cleanup.

    :param plugin: plugin proxy under test; ``perform``, ``cleanup`` and
        ``https_port`` are used here
    :param config: path of the configuration directory being tested
    :param temp_dir: scratch directory that receives the backup

    :returns: True if all challenges were completed and verified and the
        cleanup left the configuration equal to the backup, else False

    """
    backup = _create_backup(config, temp_dir)

    achalls = _create_achalls(plugin)
    if not achalls:
        logger.error("The plugin and this program support no common "
                     "challenge types")
        return False

    try:
        responses = plugin.perform(achalls)
    except le_errors.Error as error:
        logger.error("Performing challenges on %s caused an error:", config)
        logger.exception(error)
        return False

    success = True
    # Responses are matched to challenges by position.
    for achall, response in zip(achalls, responses):
        if not response:
            logger.error(
                "Plugin failed to complete %s for %s in %s",
                type(achall), achall.domain, config)
            success = False
        elif isinstance(response, challenges.TLSSNI01Response):
            verified = response.simple_verify(achall.chall,
                                              achall.domain,
                                              util.JWK.public_key(),
                                              host="127.0.0.1",
                                              port=plugin.https_port)
            if verified:
                logger.info(
                    "tls-sni-01 verification for %s succeeded", achall.domain)
            else:
                logger.error(
                    "**** tls-sni-01 verification for %s in %s failed",
                    achall.domain, config)
                success = False

    # Cleanup is only exercised when every challenge succeeded.
    if success:
        try:
            plugin.cleanup(achalls)
        except le_errors.Error as error:
            logger.error("Challenge cleanup for %s caused an error:", config)
            logger.exception(error)
            success = False

        # The directory comparison runs even if cleanup() raised above.
        if _dirs_are_unequal(config, backup):
            logger.error("Challenge cleanup failed for %s", config)
            return False
        else:
            logger.info("Challenge cleanup succeeded")

    return success
2015-07-21 21:14:57 -04:00
def _create_achalls(plugin):
"""Returns a list of annotated challenges to test on plugin"""
achalls = list()
names = plugin.get_testable_domain_names()
for domain in names:
prefs = plugin.get_chall_pref(domain)
for chall_type in prefs:
2015-11-07 13:10:56 -05:00
if chall_type == challenges.TLSSNI01:
chall = challenges.TLSSNI01(
token=os.urandom(challenges.TLSSNI01.TOKEN_SIZE))
2015-07-21 21:14:57 -04:00
challb = acme_util.chall_to_challb(
chall, messages.STATUS_PENDING)
2015-11-07 13:10:56 -05:00
achall = achallenges.KeyAuthorizationAnnotatedChallenge(
challb=challb, domain=domain, account_key=util.JWK)
2015-07-21 21:14:57 -04:00
achalls.append(achall)
return achalls
2015-07-22 21:25:09 -04:00
def test_installer(args, plugin, config, temp_dir):
    """Exercise ``plugin`` as an installer against ``config``.

    Checks the names reported by the plugin, deploys a certificate,
    optionally tests enhancements, and finally tests rollback against a
    backup taken at the start.

    :param args: parsed command line arguments; ``args.enhance`` enables
        the enhancement tests
    :param plugin: plugin proxy under test
    :param config: path of the configuration directory being tested
    :param temp_dir: scratch directory for the backup and certificate

    :returns: truthy only if the name check, the deployment (and
        enhancement) tests and the rollback test all succeeded

    """
    pristine_copy = _create_backup(config, temp_dir)

    reported_names = plugin.get_all_names()
    expected_names = plugin.get_all_names_answer()
    names_match = reported_names == expected_names
    if not names_match:
        logger.error("**** get_all_names test failed for config %s", config)
    else:
        logger.info("get_all_names test succeeded")

    domains = list(plugin.get_testable_domain_names())
    success = test_deploy_cert(plugin, temp_dir, domains)
    # Enhancements are skipped when the deployment already failed
    if success and args.enhance:
        success = test_enhancements(plugin, domains)

    # Rollback is tested regardless of the earlier results
    good_rollback = test_rollback(plugin, config, pristine_copy)
    return names_match and success and good_rollback
2015-07-22 21:25:09 -04:00
def test_deploy_cert(plugin, temp_dir, domains):
    """Tests deploy_cert returning True if the tests are successful

    A certificate for ``domains`` is generated with ``util.KEY``, written to
    ``<temp_dir>/cert.pem`` and deployed for each domain. After the server
    has been saved and restarted, the certificate served on
    ``plugin.https_port`` is validated for every domain.

    :param plugin: plugin proxy under test
    :param temp_dir: directory in which ``cert.pem`` is written
    :param domains: domain names to deploy the certificate for

    :returns: True if deployment, restart and validation all succeeded

    """
    cert = crypto_util.gen_ss_cert(util.KEY, domains)
    cert_path = os.path.join(temp_dir, "cert.pem")
    # dump_certificate() produces bytes, so the file must be opened in
    # binary mode; text mode raises TypeError on Python 3.
    with open(cert_path, "wb") as f:
        f.write(OpenSSL.crypto.dump_certificate(
            OpenSSL.crypto.FILETYPE_PEM, cert))

    for domain in domains:
        try:
            # The same PEM file is passed for every certificate path argument
            plugin.deploy_cert(
                domain, cert_path, util.KEY_PATH, cert_path, cert_path)
            plugin.save()  # Needed by the Apache plugin
        except le_errors.Error as error:
            logger.error(
                "**** Plugin failed to deploy certificate for %s:", domain)
            logger.exception(error)
            return False

    if not _save_and_restart(plugin, "deployed"):
        return False

    success = True
    # NOTE(review): presumably gives the restarted server time to come up
    # before it is queried -- confirm the required delay.
    time.sleep(3)
    for domain in domains:
        verified = validator.Validator().certificate(
            cert, domain, "127.0.0.1", plugin.https_port)
        if not verified:
            logger.error(
                "**** Could not verify certificate for domain %s", domain)
            success = False

    if success:
        logger.info("HTTPS validation succeeded")

    return success
def test_enhancements(plugin, domains):
    """Check the plugin's "redirect" enhancement for every domain.

    Before enabling the redirect, each domain is probed on
    ``plugin.http_port`` to record whether a redirect was already present;
    only domains without a prior redirect are verified after the server
    has been saved and restarted.

    :param plugin: plugin proxy under test
    :param domains: domain names to enable the redirect for

    :returns: True if the plugin supports "redirect", the save/restart
        worked and every verified domain redirects properly, else False

    """
    if "redirect" not in plugin.supported_enhancements():
        logger.error("The plugin and this program support no common "
                     "enhancements")
        return False

    # Each domain is paired with a list that receives its probe result
    probes = [(domain, []) for domain in domains]

    for domain, observed in probes:
        try:
            had_redirect = validator.Validator().any_redirect(
                "localhost", plugin.http_port, headers={"Host": domain})
            observed.append(had_redirect)
            plugin.enhance(domain, "redirect")
            plugin.save()  # Needed by the Apache plugin
        except le_errors.PluginError as error:
            # Don't immediately fail because a redirect may already be enabled
            logger.warning(
                "*** Plugin failed to enable redirect for %s:", domain)
            logger.warning("%s", error)
        except le_errors.Error as error:
            logger.error(
                "*** An error occurred while enabling redirect for %s:",
                domain)
            logger.exception(error)

    if not _save_and_restart(plugin, "enhanced"):
        return False

    all_redirected = True
    for domain, observed in probes:
        if observed[0]:
            # A redirect existed before the test, so nothing is verified
            continue
        redirected = validator.Validator().redirect(
            "localhost", plugin.http_port, headers={"Host": domain})
        if not redirected:
            logger.error("*** Improper redirect for domain %s", domain)
            all_redirected = False

    if all_redirected:
        logger.info("Enhancements test succeeded")

    return all_redirected
def _save_and_restart(plugin, title=None):
"""Saves and restart the plugin, returning True if no errors occurred"""
try:
plugin.save(title)
plugin.restart()
return True
except le_errors.Error as error:
logger.error("*** Plugin failed to save and restart server:")
2015-07-23 20:09:09 -04:00
logger.exception(error)
return False
2015-07-22 21:25:09 -04:00
def test_rollback(plugin, config, backup):
    """Roll the plugin back and check that ``config`` matches ``backup``.

    :param plugin: plugin proxy providing ``rollback_checkpoints``
    :param config: path of the configuration directory being tested
    :param backup: path of the backup taken before the tests ran

    :returns: True if the rollback raised no ``le_errors.Error`` and the
        two directories compare equal afterwards, else False

    """
    try:
        # NOTE(review): 1337 is presumably just "more checkpoints than the
        # tests could have created" -- confirm.
        plugin.rollback_checkpoints(1337)
    except le_errors.Error as error:
        logger.error("*** Plugin raised an exception during rollback:")
        logger.exception(error)
        return False

    restored = not _dirs_are_unequal(config, backup)
    if restored:
        logger.info("Rollback succeeded")
    else:
        logger.error("*** Rollback failed for config `%s`", config)
    return restored
2015-07-22 21:25:09 -04:00
def _create_backup(config, temp_dir):
"""Creates a backup of config in temp_dir"""
backup = os.path.join(temp_dir, "backup")
shutil.rmtree(backup, ignore_errors=True)
shutil.copytree(config, backup, symlinks=True)
return backup
2015-07-21 21:14:57 -04:00
def _dirs_are_unequal(dir1, dir2):
2015-08-03 22:23:01 -04:00
"""Returns True if dir1 and dir2 are unequal"""
dircmps = [filecmp.dircmp(dir1, dir2)]
while len(dircmps):
dircmp = dircmps.pop()
if dircmp.left_only or dircmp.right_only:
logger.error("The following files and directories are only "
"present in one directory")
if dircmp.left_only:
logger.error(dircmp.left_only)
else:
logger.error(dircmp.right_only)
return True
elif dircmp.common_funny or dircmp.funny_files:
logger.error("The following files and directories could not be "
"compared:")
if dircmp.common_funny:
logger.error(dircmp.common_funny)
else:
logger.error(dircmp.funny_files)
return True
elif dircmp.diff_files:
logger.error("The following files differ:")
logger.error(dircmp.diff_files)
return True
for subdir in dircmp.subdirs.itervalues():
dircmps.append(subdir)
2015-07-21 21:14:57 -04:00
2015-08-03 22:23:01 -04:00
return False
2015-07-21 21:14:57 -04:00
2015-07-14 21:04:43 -04:00
def get_args():
    """Returns parsed command line arguments.

    Every class in ``PLUGINS`` is given the chance to register its own
    arguments. ``--enhance`` implies ``--install``; if none of ``--auth``,
    ``--install`` and ``--enhance`` is given, all three are enabled.

    :returns: the namespace produced by ``argparse``

    """
    parser = argparse.ArgumentParser(
        description=DESCRIPTION,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    group = parser.add_argument_group("general")
    group.add_argument(
        "-c", "--configs", default="configs.tar.gz",
        help="a directory or tarball containing server configurations")
    group.add_argument(
        "-p", "--plugin", default="apache", help="the plugin to be tested")
    group.add_argument(
        "-v", "--verbose", dest="verbose_count", action="count",
        default=0, help="you know how to use this")
    group.add_argument(
        "-a", "--auth", action="store_true",
        help="tests the challenges the plugin supports")
    group.add_argument(
        "-i", "--install", action="store_true",
        help="tests the plugin as an installer")
    group.add_argument(
        "-e", "--enhance", action="store_true", help="tests the enhancements "
        "the plugin supports (implicitly includes installer tests)")

    # values() exists on Python 2 and 3, unlike itervalues()
    for plugin in PLUGINS.values():
        plugin.add_parser_arguments(parser)

    args = parser.parse_args()
    if args.enhance:
        args.install = True
    elif not (args.auth or args.install):
        # No test type requested: run everything
        args.auth = args.install = args.enhance = True

    return args
2015-07-17 19:21:17 -04:00
def setup_logging(args):
    """Configure the root logger for the program.

    The level starts at ``logging.ERROR`` and is lowered by 10 for every
    ``-v`` given; a ``StreamHandler`` is attached to the root logger.

    :param args: parsed command line arguments; ``args.verbose_count`` is
        read

    """
    level = logging.ERROR - 10 * args.verbose_count

    root = logging.getLogger()
    root.setLevel(level)
    root.addHandler(logging.StreamHandler())
2015-07-10 14:42:10 -04:00
def main():
    """Main test script execution.

    Parses the arguments, instantiates the requested plugin proxy and runs
    the selected tests against every configuration the proxy provides. The
    process exits with status 0 if all tests succeeded and 1 otherwise.

    :raises errors.Error: if the requested plugin is unknown

    """
    args = get_args()
    setup_logging(args)

    if args.plugin not in PLUGINS:
        raise errors.Error("Unknown plugin {0}".format(args.plugin))

    temp_dir = tempfile.mkdtemp()
    overall_success = True
    try:
        plugin = PLUGINS[args.plugin](args)
        try:
            while plugin.has_more_configs():
                success = True
                # Reset every iteration so that a failing load_config() is
                # reported without touching an unbound name or blaming the
                # previously loaded configuration.
                config = None

                try:
                    config = plugin.load_config()
                    logger.info("Loaded configuration: %s", config)
                    if args.auth:
                        success = test_authenticator(plugin, config, temp_dir)
                    if success and args.install:
                        success = test_installer(
                            args, plugin, config, temp_dir)
                except errors.Error as error:
                    logger.error("Tests on %s raised:", config)
                    logger.exception(error)
                    success = False

                if success:
                    logger.info("All tests on %s succeeded", config)
                else:
                    overall_success = False
                    logger.error("Tests on %s failed", config)
        finally:
            plugin.cleanup_from_tests()
    finally:
        # The scratch directory holding backups and the test certificate is
        # no longer needed once the plugin has cleaned up.
        shutil.rmtree(temp_dir, ignore_errors=True)

    if overall_success:
        logger.warning("All compatibility tests succeeded")
        sys.exit(0)
    else:
        logger.warning("One or more compatibility tests failed")
        sys.exit(1)
2015-07-10 14:42:10 -04:00
2015-07-16 02:00:18 -04:00
# Run the tests only when executed as a script, not when imported.
if __name__ == "__main__":
    main()