mirror of
https://github.com/certbot/certbot.git
synced 2026-06-08 08:12:15 -04:00
commit
a314b40101
26 changed files with 1378 additions and 18 deletions
|
|
@ -337,15 +337,16 @@ class DVSNIResponse(ChallengeResponse):
|
|||
:param unicode domain:
|
||||
|
||||
"""
|
||||
host = socket.gethostbyname(domain)
|
||||
logging.debug('%s resolved to %s', domain, host)
|
||||
if "host" not in kwargs:
|
||||
host = socket.gethostbyname(domain)
|
||||
logging.debug('%s resolved to %s', domain, host)
|
||||
kwargs["host"] = host
|
||||
|
||||
kwargs.setdefault("host", host)
|
||||
kwargs.setdefault("port", self.PORT)
|
||||
kwargs["name"] = self.z_domain
|
||||
# TODO: try different methods?
|
||||
# pylint: disable=protected-access
|
||||
return crypto_util._probe_sni(**kwargs)
|
||||
return crypto_util.probe_sni(**kwargs)
|
||||
|
||||
def verify_cert(self, cert):
|
||||
"""Verify DVSNI challenge certificate."""
|
||||
|
|
|
|||
|
|
@ -266,7 +266,7 @@ class DVSNIResponseTest(unittest.TestCase):
|
|||
hash(DVSNIResponse.from_json(self.jmsg_from))
|
||||
|
||||
@mock.patch('acme.challenges.socket.gethostbyname')
|
||||
@mock.patch('acme.challenges.crypto_util._probe_sni')
|
||||
@mock.patch('acme.challenges.crypto_util.probe_sni')
|
||||
def test_probe_cert(self, mock_probe_sni, mock_gethostbyname):
|
||||
mock_gethostbyname.return_value = '127.0.0.1'
|
||||
self.msg.probe_cert('foo.com')
|
||||
|
|
|
|||
|
|
@ -69,8 +69,8 @@ def _serve_sni(certs, sock, reuseaddr=True, method=_DEFAULT_DVSNI_SSL_METHOD,
|
|||
raise errors.Error(error)
|
||||
|
||||
|
||||
def _probe_sni(name, host, port=443, timeout=300,
|
||||
method=_DEFAULT_DVSNI_SSL_METHOD, source_address=('0', 0)):
|
||||
def probe_sni(name, host, port=443, timeout=300,
|
||||
method=_DEFAULT_DVSNI_SSL_METHOD, source_address=('0', 0)):
|
||||
"""Probe SNI server for SSL certificate.
|
||||
|
||||
:param bytes name: Byte string to send as the server name in the
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ from acme import test_util
|
|||
|
||||
|
||||
class ServeProbeSNITest(unittest.TestCase):
|
||||
"""Tests for acme.crypto_util._serve_sni/_probe_sni."""
|
||||
"""Tests for acme.crypto_util._serve_sni/probe_sni."""
|
||||
|
||||
def setUp(self):
|
||||
self.cert = test_util.load_cert('cert.pem')
|
||||
|
|
@ -45,8 +45,8 @@ class ServeProbeSNITest(unittest.TestCase):
|
|||
self.server.join()
|
||||
|
||||
def _probe(self, name):
|
||||
from acme.crypto_util import _probe_sni
|
||||
return jose.ComparableX509(_probe_sni(
|
||||
from acme.crypto_util import probe_sni
|
||||
return jose.ComparableX509(probe_sni(
|
||||
name, host='127.0.0.1', port=self.port))
|
||||
|
||||
def test_probe_ok(self):
|
||||
|
|
|
|||
1
letsencrypt-compatibility-test/MANIFEST.in
Normal file
1
letsencrypt-compatibility-test/MANIFEST.in
Normal file
|
|
@ -0,0 +1 @@
|
|||
recursive-include letsencrypt_compatibility_test/testdata *
|
||||
|
|
@ -0,0 +1 @@
|
|||
"""Let's Encrypt compatibility test"""
|
||||
|
|
@ -0,0 +1 @@
|
|||
"""Let's Encrypt compatibility test configurators"""
|
||||
|
|
@ -0,0 +1,20 @@
|
|||
FROM httpd
|
||||
MAINTAINER Brad Warren <bradmw@umich.edu>
|
||||
|
||||
RUN mkdir /var/run/apache2
|
||||
|
||||
ENV APACHE_RUN_USER=daemon \
|
||||
APACHE_RUN_GROUP=daemon \
|
||||
APACHE_PID_FILE=/usr/local/apache2/logs/httpd.pid \
|
||||
APACHE_RUN_DIR=/var/run/apache2 \
|
||||
APACHE_LOCK_DIR=/var/lock \
|
||||
APACHE_LOG_DIR=/usr/local/apache2/logs
|
||||
|
||||
COPY letsencrypt-compatibility-test/letsencrypt_compatibility_test/configurators/apache/a2enmod.sh /usr/local/bin/
|
||||
COPY letsencrypt-compatibility-test/letsencrypt_compatibility_test/configurators/apache/a2dismod.sh /usr/local/bin/
|
||||
COPY letsencrypt-compatibility-test/letsencrypt_compatibility_test/testdata/rsa1024_key2.pem /usr/local/apache2/conf/
|
||||
COPY letsencrypt-compatibility-test/letsencrypt_compatibility_test/testdata/empty_cert.pem /usr/local/apache2/conf/
|
||||
|
||||
# Note: this only exposes the port to other docker containers. You
|
||||
# still have to bind to 443@host at runtime.
|
||||
EXPOSE 443
|
||||
|
|
@ -0,0 +1 @@
|
|||
"""Let's Encrypt compatibility test Apache configurators"""
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
#!/bin/bash
|
||||
# An extremely simplified version of `a2enmod` for disabling modules in the
|
||||
# httpd docker image. First argument is the server_root and the second is the
|
||||
# module to be disabled.
|
||||
|
||||
apache_confdir=$1
|
||||
module=$2
|
||||
|
||||
sed -i "/.*"$module".*/d" "$apache_confdir/test.conf"
|
||||
enabled_conf="$apache_confdir/mods-enabled/"$module".conf"
|
||||
if [ -e "$enabled_conf" ]
|
||||
then
|
||||
rm $enabled_conf
|
||||
fi
|
||||
|
|
@ -0,0 +1,33 @@
|
|||
#!/bin/bash
|
||||
# An extremely simplified version of `a2enmod` for enabling modules in the
|
||||
# httpd docker image. First argument is the server_root and the second is the
|
||||
# module to be enabled.
|
||||
|
||||
APACHE_CONFDIR=$1
|
||||
|
||||
enable () {
|
||||
echo "LoadModule "$1"_module /usr/local/apache2/modules/mod_"$1".so" >> \
|
||||
$APACHE_CONFDIR"/test.conf"
|
||||
available_base="/mods-available/"$1".conf"
|
||||
available_conf=$APACHE_CONFDIR$available_base
|
||||
enabled_dir=$APACHE_CONFDIR"/mods-enabled"
|
||||
enabled_conf=$enabled_dir"/"$1".conf"
|
||||
if [ -e "$available_conf" -a -d "$enabled_dir" -a ! -e "$enabled_conf" ]
|
||||
then
|
||||
ln -s "..$available_base" $enabled_conf
|
||||
fi
|
||||
}
|
||||
|
||||
if [ $2 == "ssl" ]
|
||||
then
|
||||
# Enables ssl and all its dependencies
|
||||
enable "setenvif"
|
||||
enable "mime"
|
||||
enable "socache_shmcb"
|
||||
enable "ssl"
|
||||
elif [ $2 == "rewrite" ]
|
||||
then
|
||||
enable "rewrite"
|
||||
else
|
||||
exit 1
|
||||
fi
|
||||
|
|
@ -0,0 +1,64 @@
|
|||
"""Proxies ApacheConfigurator for Apache 2.4 tests"""
|
||||
|
||||
import zope.interface
|
||||
|
||||
from letsencrypt_compatibility_test import errors
|
||||
from letsencrypt_compatibility_test import interfaces
|
||||
from letsencrypt_compatibility_test.configurators.apache import common as apache_common
|
||||
|
||||
|
||||
# The docker image doesn't actually have the watchdog module, but unless the
|
||||
# config uses mod_heartbeat or mod_heartmonitor (which aren't installed and
|
||||
# therefore the config won't be loaded), I believe this isn't a problem
|
||||
# http://httpd.apache.org/docs/2.4/mod/mod_watchdog.html
|
||||
STATIC_MODULES = {"core", "so", "http", "mpm_event", "watchdog",}
|
||||
|
||||
|
||||
SHARED_MODULES = {
|
||||
"log_config", "logio", "version", "unixd", "access_compat", "actions",
|
||||
"alias", "allowmethods", "auth_basic", "auth_digest", "auth_form",
|
||||
"authn_anon", "authn_core", "authn_dbd", "authn_dbm", "authn_file",
|
||||
"authn_socache", "authnz_ldap", "authz_core", "authz_dbd", "authz_dbm",
|
||||
"authz_groupfile", "authz_host", "authz_owner", "authz_user", "autoindex",
|
||||
"buffer", "cache", "cache_disk", "cache_socache", "cgid", "dav", "dav_fs",
|
||||
"dbd", "deflate", "dir", "dumpio", "env", "expires", "ext_filter",
|
||||
"file_cache", "filter", "headers", "include", "info", "lbmethod_bybusyness",
|
||||
"lbmethod_byrequests", "lbmethod_bytraffic", "lbmethod_heartbeat", "ldap",
|
||||
"log_debug", "macro", "mime", "negotiation", "proxy", "proxy_ajp",
|
||||
"proxy_balancer", "proxy_connect", "proxy_express", "proxy_fcgi",
|
||||
"proxy_ftp", "proxy_http", "proxy_scgi", "proxy_wstunnel", "ratelimit",
|
||||
"remoteip", "reqtimeout", "request", "rewrite", "sed", "session",
|
||||
"session_cookie", "session_crypto", "session_dbd", "setenvif",
|
||||
"slotmem_shm", "socache_dbm", "socache_memcache", "socache_shmcb",
|
||||
"speling", "ssl", "status", "substitute", "unique_id", "userdir",
|
||||
"vhost_alias",}
|
||||
|
||||
|
||||
class Proxy(apache_common.Proxy):
|
||||
"""Wraps the ApacheConfigurator for Apache 2.4 tests"""
|
||||
|
||||
zope.interface.implements(interfaces.IConfiguratorProxy)
|
||||
|
||||
def __init__(self, args):
|
||||
"""Initializes the plugin with the given command line args"""
|
||||
super(Proxy, self).__init__(args)
|
||||
# Running init isn't ideal, but the Docker container needs to survive
|
||||
# Apache restarts
|
||||
self.start_docker("bradmw/apache2.4", "init")
|
||||
|
||||
def preprocess_config(self, server_root):
|
||||
"""Prepares the configuration for use in the Docker"""
|
||||
super(Proxy, self).preprocess_config(server_root)
|
||||
if self.version[1] != 4:
|
||||
raise errors.Error("Apache version not 2.4")
|
||||
|
||||
with open(self.test_conf, "a") as f:
|
||||
for module in self.modules:
|
||||
if module not in STATIC_MODULES:
|
||||
if module in SHARED_MODULES:
|
||||
f.write(
|
||||
"LoadModule {0}_module /usr/local/apache2/modules/"
|
||||
"mod_{0}.so\n".format(module))
|
||||
else:
|
||||
raise errors.Error(
|
||||
"Unsupported module {0}".format(module))
|
||||
|
|
@ -0,0 +1,284 @@
|
|||
"""Provides a common base for Apache proxies"""
|
||||
import re
|
||||
import os
|
||||
import subprocess
|
||||
|
||||
import mock
|
||||
import zope.interface
|
||||
|
||||
from letsencrypt import configuration
|
||||
from letsencrypt import errors as le_errors
|
||||
from letsencrypt_apache import configurator
|
||||
from letsencrypt_compatibility_test import errors
|
||||
from letsencrypt_compatibility_test import interfaces
|
||||
from letsencrypt_compatibility_test import util
|
||||
from letsencrypt_compatibility_test.configurators import common as configurators_common
|
||||
|
||||
|
||||
APACHE_VERSION_REGEX = re.compile(r"Apache/([0-9\.]*)", re.IGNORECASE)
|
||||
APACHE_COMMANDS = ["apachectl", "a2enmod", "a2dismod"]
|
||||
|
||||
|
||||
class Proxy(configurators_common.Proxy):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
"""A common base for Apache test configurators"""
|
||||
|
||||
zope.interface.implements(interfaces.IConfiguratorProxy)
|
||||
|
||||
def __init__(self, args):
|
||||
"""Initializes the plugin with the given command line args"""
|
||||
super(Proxy, self).__init__(args)
|
||||
self.le_config.apache_le_vhost_ext = "-le-ssl.conf"
|
||||
|
||||
self._setup_mock()
|
||||
|
||||
self.modules = self.server_root = self.test_conf = self.version = None
|
||||
self._apache_configurator = self._all_names = self._test_names = None
|
||||
|
||||
def _setup_mock(self):
|
||||
"""Replaces specific modules with mock.MagicMock"""
|
||||
mock_subprocess = mock.MagicMock()
|
||||
mock_subprocess.check_call = self.check_call
|
||||
mock_subprocess.Popen = self.popen
|
||||
|
||||
mock.patch(
|
||||
"letsencrypt_apache.configurator.subprocess",
|
||||
mock_subprocess).start()
|
||||
mock.patch(
|
||||
"letsencrypt_apache.parser.subprocess",
|
||||
mock_subprocess).start()
|
||||
mock.patch(
|
||||
"letsencrypt.le_util.subprocess",
|
||||
mock_subprocess).start()
|
||||
mock.patch(
|
||||
"letsencrypt_apache.configurator.le_util.exe_exists",
|
||||
_is_apache_command).start()
|
||||
|
||||
patch = mock.patch(
|
||||
"letsencrypt_apache.configurator.display_ops.select_vhost")
|
||||
mock_display = patch.start()
|
||||
mock_display.side_effect = le_errors.PluginError(
|
||||
"Unable to determine vhost")
|
||||
|
||||
def check_call(self, command, *args, **kwargs):
|
||||
"""If command is an Apache command, command is executed in the
|
||||
running docker image. Otherwise, subprocess.check_call is used.
|
||||
|
||||
"""
|
||||
if _is_apache_command(command):
|
||||
command = _modify_command(command)
|
||||
return super(Proxy, self).check_call(command, *args, **kwargs)
|
||||
else:
|
||||
return subprocess.check_call(command, *args, **kwargs)
|
||||
|
||||
def popen(self, command, *args, **kwargs):
|
||||
"""If command is an Apache command, command is executed in the
|
||||
running docker image. Otherwise, subprocess.Popen is used.
|
||||
|
||||
"""
|
||||
if _is_apache_command(command):
|
||||
command = _modify_command(command)
|
||||
return super(Proxy, self).popen(command, *args, **kwargs)
|
||||
else:
|
||||
return subprocess.Popen(command, *args, **kwargs)
|
||||
|
||||
def __getattr__(self, name):
|
||||
"""Wraps the Apache Configurator methods"""
|
||||
method = getattr(self._apache_configurator, name, None)
|
||||
if callable(method):
|
||||
return method
|
||||
else:
|
||||
raise AttributeError()
|
||||
|
||||
def load_config(self):
|
||||
"""Loads the next configuration for the plugin to test"""
|
||||
if hasattr(self.le_config, "apache_init_script"):
|
||||
try:
|
||||
self.check_call([self.le_config.apache_init_script, "stop"])
|
||||
except errors.Error:
|
||||
raise errors.Error(
|
||||
"Failed to stop previous apache config from running")
|
||||
|
||||
config = super(Proxy, self).load_config()
|
||||
self.modules = _get_modules(config)
|
||||
self.version = _get_version(config)
|
||||
self._all_names, self._test_names = _get_names(config)
|
||||
|
||||
server_root = _get_server_root(config)
|
||||
with open(os.path.join(config, "config_file")) as f:
|
||||
config_file = os.path.join(server_root, f.readline().rstrip())
|
||||
self.test_conf = _create_test_conf(server_root, config_file)
|
||||
|
||||
self.preprocess_config(server_root)
|
||||
self._prepare_configurator(server_root, config_file)
|
||||
|
||||
try:
|
||||
self.check_call("apachectl -d {0} -f {1} -k start".format(
|
||||
server_root, config_file))
|
||||
except errors.Error:
|
||||
raise errors.Error(
|
||||
"Apache failed to load {0} before tests started".format(
|
||||
config))
|
||||
|
||||
return config
|
||||
|
||||
def preprocess_config(self, server_root):
|
||||
# pylint: disable=anomalous-backslash-in-string, no-self-use
|
||||
"""Prepares the configuration for use in the Docker"""
|
||||
|
||||
find = subprocess.Popen(
|
||||
["find", server_root, "-type", "f"],
|
||||
stdout=subprocess.PIPE)
|
||||
subprocess.check_call([
|
||||
"xargs", "sed", "-e", "s/DocumentRoot.*/DocumentRoot "
|
||||
"\/usr\/local\/apache2\/htdocs/I",
|
||||
"-e", "s/SSLPassPhraseDialog.*/SSLPassPhraseDialog builtin/I",
|
||||
"-e", "s/TypesConfig.*/TypesConfig "
|
||||
"\/usr\/local\/apache2\/conf\/mime.types/I",
|
||||
"-e", "s/LoadModule/#LoadModule/I",
|
||||
"-e", "s/SSLCertificateFile.*/SSLCertificateFile "
|
||||
"\/usr\/local\/apache2\/conf\/empty_cert.pem/I",
|
||||
"-e", "s/SSLCertificateKeyFile.*/SSLCertificateKeyFile "
|
||||
"\/usr\/local\/apache2\/conf\/rsa1024_key2.pem/I",
|
||||
"-i"], stdin=find.stdout)
|
||||
|
||||
def _prepare_configurator(self, server_root, config_file):
|
||||
"""Prepares the Apache plugin for testing"""
|
||||
self.le_config.apache_server_root = server_root
|
||||
self.le_config.apache_ctl = "apachectl -d {0} -f {1}".format(
|
||||
server_root, config_file)
|
||||
self.le_config.apache_enmod = "a2enmod.sh {0}".format(server_root)
|
||||
self.le_config.apache_dismod = "a2dismod.sh {0}".format(server_root)
|
||||
self.le_config.apache_init_script = self.le_config.apache_ctl + " -k"
|
||||
|
||||
self._apache_configurator = configurator.ApacheConfigurator(
|
||||
config=configuration.NamespaceConfig(self.le_config),
|
||||
name="apache")
|
||||
self._apache_configurator.prepare()
|
||||
|
||||
def cleanup_from_tests(self):
|
||||
"""Performs any necessary cleanup from running plugin tests"""
|
||||
super(Proxy, self).cleanup_from_tests()
|
||||
mock.patch.stopall()
|
||||
|
||||
def get_all_names_answer(self):
|
||||
"""Returns the set of domain names that the plugin should find"""
|
||||
if self._all_names:
|
||||
return self._all_names
|
||||
else:
|
||||
raise errors.Error("No configuration file loaded")
|
||||
|
||||
def get_testable_domain_names(self):
|
||||
"""Returns the set of domain names that can be tested against"""
|
||||
if self._test_names:
|
||||
return self._test_names
|
||||
else:
|
||||
return {"example.com"}
|
||||
|
||||
def deploy_cert(self, domain, cert_path, key_path, chain_path=None):
|
||||
"""Installs cert"""
|
||||
cert_path, key_path, chain_path = self.copy_certs_and_keys(
|
||||
cert_path, key_path, chain_path)
|
||||
self._apache_configurator.deploy_cert(
|
||||
domain, cert_path, key_path, chain_path)
|
||||
|
||||
|
||||
def _is_apache_command(command):
|
||||
"""Returns true if command is an Apache command"""
|
||||
if isinstance(command, list):
|
||||
command = command[0]
|
||||
|
||||
for apache_command in APACHE_COMMANDS:
|
||||
if command.startswith(apache_command):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _modify_command(command):
|
||||
"""Modifies command so configtest works inside the docker image"""
|
||||
if isinstance(command, list):
|
||||
for i in xrange(len(command)):
|
||||
if command[i] == "configtest":
|
||||
command[i] = "-t"
|
||||
else:
|
||||
command = command.replace("configtest", "-t")
|
||||
|
||||
return command
|
||||
|
||||
|
||||
def _create_test_conf(server_root, apache_config):
|
||||
"""Creates a test config file and adds it to the Apache config"""
|
||||
test_conf = os.path.join(server_root, "test.conf")
|
||||
open(test_conf, "w").close()
|
||||
subprocess.check_call(
|
||||
["sed", "-i", "1iInclude test.conf", apache_config])
|
||||
return test_conf
|
||||
|
||||
|
||||
def _get_server_root(config):
|
||||
"""Returns the server root directory in config"""
|
||||
subdirs = [
|
||||
name for name in os.listdir(config)
|
||||
if os.path.isdir(os.path.join(config, name))]
|
||||
|
||||
if len(subdirs) != 1:
|
||||
errors.Error("Malformed configuration directiory {0}".format(config))
|
||||
|
||||
return os.path.join(config, subdirs[0].rstrip())
|
||||
|
||||
|
||||
def _get_names(config):
|
||||
"""Returns all and testable domain names in config"""
|
||||
all_names = set()
|
||||
non_ip_names = set()
|
||||
with open(os.path.join(config, "vhosts")) as f:
|
||||
for line in f:
|
||||
# If parsing a specific vhost
|
||||
if line[0].isspace():
|
||||
words = line.split()
|
||||
if words[0] == "alias":
|
||||
all_names.add(words[1])
|
||||
non_ip_names.add(words[1])
|
||||
# If for port 80 and not IP vhost
|
||||
elif words[1] == "80" and not util.IP_REGEX.match(words[3]):
|
||||
all_names.add(words[3])
|
||||
non_ip_names.add(words[3])
|
||||
elif "NameVirtualHost" not in line:
|
||||
words = line.split()
|
||||
if (words[0].endswith("*") or words[0].endswith("80") and
|
||||
not util.IP_REGEX.match(words[1]) and
|
||||
words[1].find(".") != -1):
|
||||
all_names.add(words[1])
|
||||
return all_names, non_ip_names
|
||||
|
||||
|
||||
def _get_modules(config):
|
||||
"""Returns the list of modules found in module_list"""
|
||||
modules = []
|
||||
with open(os.path.join(config, "modules")) as f:
|
||||
for line in f:
|
||||
# Modules list is indented, everything else is headers/footers
|
||||
if line[0].isspace():
|
||||
words = line.split()
|
||||
# Modules redundantly end in "_module" which we can discard
|
||||
modules.append(words[0][:-7])
|
||||
|
||||
return modules
|
||||
|
||||
|
||||
def _get_version(config):
|
||||
"""Return version of Apache Server.
|
||||
|
||||
Version is returned as tuple. (ie. 2.4.7 = (2, 4, 7)). Code taken from
|
||||
the Apache plugin.
|
||||
|
||||
"""
|
||||
with open(os.path.join(config, "version")) as f:
|
||||
# Should be on first line of input
|
||||
matches = APACHE_VERSION_REGEX.findall(f.readline())
|
||||
|
||||
if len(matches) != 1:
|
||||
raise errors.Error("Unable to find Apache version")
|
||||
|
||||
return tuple([int(i) for i in matches[0].split(".")])
|
||||
|
|
@ -0,0 +1,146 @@
|
|||
"""Provides a common base for configurator proxies"""
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
import docker
|
||||
|
||||
from letsencrypt import constants
|
||||
from letsencrypt_compatibility_test import errors
|
||||
from letsencrypt_compatibility_test import util
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Proxy(object):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
"""A common base for compatibility test configurators"""
|
||||
|
||||
_NOT_ADDED_ARGS = True
|
||||
|
||||
@classmethod
|
||||
def add_parser_arguments(cls, parser):
|
||||
"""Adds command line arguments needed by the plugin"""
|
||||
if Proxy._NOT_ADDED_ARGS:
|
||||
group = parser.add_argument_group("docker")
|
||||
group.add_argument(
|
||||
"--docker-url", default="unix://var/run/docker.sock",
|
||||
help="URL of the docker server")
|
||||
group.add_argument(
|
||||
"--no-remove", action="store_true",
|
||||
help="do not delete container on program exit")
|
||||
Proxy._NOT_ADDED_ARGS = False
|
||||
|
||||
def __init__(self, args):
|
||||
"""Initializes the plugin with the given command line args"""
|
||||
self._temp_dir = tempfile.mkdtemp()
|
||||
self.le_config = util.create_le_config(self._temp_dir)
|
||||
config_dir = util.extract_configs(args.configs, self._temp_dir)
|
||||
self._configs = [
|
||||
os.path.join(config_dir, config)
|
||||
for config in os.listdir(config_dir)]
|
||||
|
||||
self.args = args
|
||||
self._docker_client = docker.Client(
|
||||
base_url=self.args.docker_url, version="auto")
|
||||
self.http_port, self.https_port = util.get_two_free_ports()
|
||||
self._container_id = None
|
||||
|
||||
def has_more_configs(self):
|
||||
"""Returns true if there are more configs to test"""
|
||||
return bool(self._configs)
|
||||
|
||||
def cleanup_from_tests(self):
|
||||
"""Performs any necessary cleanup from running plugin tests"""
|
||||
self._docker_client.stop(self._container_id, 0)
|
||||
if not self.args.no_remove:
|
||||
self._docker_client.remove_container(self._container_id)
|
||||
|
||||
def load_config(self):
|
||||
"""Returns the next config directory to be tested"""
|
||||
shutil.rmtree(self.le_config.work_dir, ignore_errors=True)
|
||||
backup = os.path.join(self.le_config.work_dir, constants.BACKUP_DIR)
|
||||
os.makedirs(backup)
|
||||
return self._configs.pop()
|
||||
|
||||
def start_docker(self, image_name, command):
|
||||
"""Creates and runs a Docker container with the specified image"""
|
||||
logger.warning("Pulling Docker image. This may take a minute.")
|
||||
for line in self._docker_client.pull(image_name, stream=True):
|
||||
logger.debug(line)
|
||||
|
||||
host_config = docker.utils.create_host_config(
|
||||
binds={
|
||||
self._temp_dir : {"bind" : self._temp_dir, "mode" : "rw"}},
|
||||
port_bindings={
|
||||
80 : ("127.0.0.1", self.http_port),
|
||||
443 : ("127.0.0.1", self.https_port)},)
|
||||
container = self._docker_client.create_container(
|
||||
image_name, command, ports=[80, 443], volumes=self._temp_dir,
|
||||
host_config=host_config)
|
||||
if container["Warnings"]:
|
||||
logger.warning(container["Warnings"])
|
||||
self._container_id = container["Id"]
|
||||
self._docker_client.start(self._container_id)
|
||||
|
||||
def check_call(self, command, *args, **kwargs):
|
||||
# pylint: disable=unused-argument
|
||||
"""Simulates a call to check_call but executes the command in the
|
||||
running docker image
|
||||
|
||||
"""
|
||||
if self.popen(command).returncode:
|
||||
raise errors.Error(
|
||||
"{0} exited with a nonzero value".format(command))
|
||||
|
||||
def popen(self, command, *args, **kwargs):
|
||||
# pylint: disable=unused-argument
|
||||
"""Simulates a call to Popen but executes the command in the
|
||||
running docker image
|
||||
|
||||
"""
|
||||
class SimplePopen(object):
|
||||
# pylint: disable=too-few-public-methods
|
||||
"""Simplified Popen object"""
|
||||
def __init__(self, returncode, output):
|
||||
self.returncode = returncode
|
||||
self._stdout = output
|
||||
self._stderr = output
|
||||
|
||||
def communicate(self):
|
||||
"""Returns stdout and stderr"""
|
||||
return self._stdout, self._stderr
|
||||
|
||||
if isinstance(command, list):
|
||||
command = " ".join(command)
|
||||
|
||||
returncode, output = self.execute_in_docker(command)
|
||||
return SimplePopen(returncode, output)
|
||||
|
||||
def execute_in_docker(self, command):
|
||||
"""Executes command inside the running docker image"""
|
||||
logger.debug("Executing '%s'", command)
|
||||
exec_id = self._docker_client.exec_create(self._container_id, command)
|
||||
output = self._docker_client.exec_start(exec_id)
|
||||
returncode = self._docker_client.exec_inspect(exec_id)["ExitCode"]
|
||||
return returncode, output
|
||||
|
||||
def copy_certs_and_keys(self, cert_path, key_path, chain_path=None):
|
||||
"""Copies certs and keys into the temporary directory"""
|
||||
cert_and_key_dir = os.path.join(self._temp_dir, "certs_and_keys")
|
||||
if not os.path.isdir(cert_and_key_dir):
|
||||
os.mkdir(cert_and_key_dir)
|
||||
|
||||
cert = os.path.join(cert_and_key_dir, "cert")
|
||||
shutil.copy(cert_path, cert)
|
||||
key = os.path.join(cert_and_key_dir, "key")
|
||||
shutil.copy(key_path, key)
|
||||
if chain_path:
|
||||
chain = os.path.join(cert_and_key_dir, "chain")
|
||||
shutil.copy(chain_path, chain)
|
||||
else:
|
||||
chain = None
|
||||
|
||||
return cert, key, chain
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
"""Let's Encrypt compatibility test errors"""
|
||||
|
||||
|
||||
class Error(Exception):
|
||||
"""Generic Let's Encrypt compatibility test error"""
|
||||
|
|
@ -0,0 +1,52 @@
|
|||
"""Let's Encrypt compatibility test interfaces"""
|
||||
import zope.interface
|
||||
|
||||
import letsencrypt.interfaces
|
||||
|
||||
# pylint: disable=no-self-argument,no-method-argument
|
||||
|
||||
|
||||
class IPluginProxy(zope.interface.Interface):
|
||||
"""Wraps a Let's Encrypt plugin"""
|
||||
http_port = zope.interface.Attribute(
|
||||
"The port to connect to on localhost for HTTP traffic")
|
||||
|
||||
https_port = zope.interface.Attribute(
|
||||
"The port to connect to on localhost for HTTPS traffic")
|
||||
|
||||
def add_parser_arguments(cls, parser):
|
||||
"""Adds command line arguments needed by the parser"""
|
||||
|
||||
def __init__(args):
|
||||
"""Initializes the plugin with the given command line args"""
|
||||
|
||||
def cleanup_from_tests():
|
||||
"""Performs any necessary cleanup from running plugin tests.
|
||||
|
||||
This is guarenteed to be called before the program exits.
|
||||
|
||||
"""
|
||||
|
||||
def has_more_configs():
|
||||
"""Returns True if there are more configs to test"""
|
||||
|
||||
def load_config():
|
||||
"""Loads the next config and returns its name"""
|
||||
|
||||
def get_testable_domain_names():
|
||||
"""Returns the domain names that can be used in testing"""
|
||||
|
||||
|
||||
class IAuthenticatorProxy(IPluginProxy, letsencrypt.interfaces.IAuthenticator):
|
||||
"""Wraps a Let's Encrypt authenticator"""
|
||||
|
||||
|
||||
class IInstallerProxy(IPluginProxy, letsencrypt.interfaces.IInstaller):
|
||||
"""Wraps a Let's Encrypt installer"""
|
||||
|
||||
def get_all_names_answer():
|
||||
"""Returns all names that should be found by the installer"""
|
||||
|
||||
|
||||
class IConfiguratorProxy(IAuthenticatorProxy, IInstallerProxy):
|
||||
"""Wraps a Let's Encrypt configurator"""
|
||||
|
|
@ -0,0 +1,368 @@
|
|||
"""Tests Let's Encrypt plugins against different server configurations."""
|
||||
import argparse
|
||||
import filecmp
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import OpenSSL
|
||||
|
||||
from acme import challenges
|
||||
from acme import crypto_util
|
||||
from acme import messages
|
||||
from letsencrypt import achallenges
|
||||
from letsencrypt import errors as le_errors
|
||||
from letsencrypt import validator
|
||||
from letsencrypt.tests import acme_util
|
||||
|
||||
from letsencrypt_compatibility_test import errors
|
||||
from letsencrypt_compatibility_test import util
|
||||
from letsencrypt_compatibility_test.configurators.apache import apache24
|
||||
|
||||
|
||||
DESCRIPTION = """
|
||||
Tests Let's Encrypt plugins against different server configuratons. It is
|
||||
assumed that Docker is already installed. If no test types is specified, all
|
||||
tests that the plugin supports are performed.
|
||||
|
||||
"""
|
||||
|
||||
PLUGINS = {"apache" : apache24.Proxy}
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_authenticator(plugin, config, temp_dir):
|
||||
"""Tests authenticator, returning True if the tests are successful"""
|
||||
backup = _create_backup(config, temp_dir)
|
||||
|
||||
achalls = _create_achalls(plugin)
|
||||
if not achalls:
|
||||
logger.error("The plugin and this program support no common "
|
||||
"challenge types")
|
||||
return False
|
||||
|
||||
try:
|
||||
responses = plugin.perform(achalls)
|
||||
except le_errors.Error as error:
|
||||
logger.error("Performing challenges on %s caused an error:", config)
|
||||
logger.exception(error)
|
||||
return False
|
||||
|
||||
success = True
|
||||
for i in xrange(len(responses)):
|
||||
if not responses[i]:
|
||||
logger.error(
|
||||
"Plugin failed to complete %s for %s in %s",
|
||||
type(achalls[i]), achalls[i].domain, config)
|
||||
success = False
|
||||
elif isinstance(responses[i], challenges.DVSNIResponse):
|
||||
verify = functools.partial(responses[i].simple_verify, achalls[i],
|
||||
achalls[i].domain,
|
||||
util.JWK.public_key(),
|
||||
host="127.0.0.1",
|
||||
port=plugin.https_port)
|
||||
if _try_until_true(verify):
|
||||
logger.info(
|
||||
"DVSNI verification for %s succeeded", achalls[i].domain)
|
||||
else:
|
||||
logger.error(
|
||||
"DVSNI verification for %s in %s failed",
|
||||
achalls[i].domain, config)
|
||||
success = False
|
||||
|
||||
if success:
|
||||
try:
|
||||
plugin.cleanup(achalls)
|
||||
except le_errors.Error as error:
|
||||
logger.error("Challenge cleanup for %s caused an error:", config)
|
||||
logger.exception(error)
|
||||
success = False
|
||||
|
||||
if _dirs_are_unequal(config, backup):
|
||||
logger.error("Challenge cleanup failed for %s", config)
|
||||
return False
|
||||
else:
|
||||
logger.info("Challenge cleanup succeeded")
|
||||
|
||||
return success
|
||||
|
||||
|
||||
def _create_achalls(plugin):
|
||||
"""Returns a list of annotated challenges to test on plugin"""
|
||||
achalls = list()
|
||||
names = plugin.get_testable_domain_names()
|
||||
for domain in names:
|
||||
prefs = plugin.get_chall_pref(domain)
|
||||
for chall_type in prefs:
|
||||
if chall_type == challenges.DVSNI:
|
||||
chall = challenges.DVSNI(
|
||||
token=os.urandom(challenges.DVSNI.TOKEN_SIZE))
|
||||
challb = acme_util.chall_to_challb(
|
||||
chall, messages.STATUS_PENDING)
|
||||
achall = achallenges.DVSNI(
|
||||
challb=challb, domain=domain, account_key=util.JWK)
|
||||
achalls.append(achall)
|
||||
|
||||
return achalls
|
||||
|
||||
|
||||
def test_installer(args, plugin, config, temp_dir):
|
||||
"""Tests plugin as an installer"""
|
||||
backup = _create_backup(config, temp_dir)
|
||||
|
||||
names_match = plugin.get_all_names() == plugin.get_all_names_answer()
|
||||
if names_match:
|
||||
logger.info("get_all_names test succeeded")
|
||||
else:
|
||||
logger.error("get_all_names test failed for config %s", config)
|
||||
|
||||
domains = list(plugin.get_testable_domain_names())
|
||||
success = test_deploy_cert(plugin, temp_dir, domains)
|
||||
|
||||
if success and args.enhance:
|
||||
success = test_enhancements(plugin, domains)
|
||||
|
||||
good_rollback = test_rollback(plugin, config, backup)
|
||||
return names_match and success and good_rollback
|
||||
|
||||
|
||||
def test_deploy_cert(plugin, temp_dir, domains):
|
||||
"""Tests deploy_cert returning True if the tests are successful"""
|
||||
cert = crypto_util.gen_ss_cert(util.KEY, domains)
|
||||
cert_path = os.path.join(temp_dir, "cert.pem")
|
||||
with open(cert_path, "w") as f:
|
||||
f.write(OpenSSL.crypto.dump_certificate(
|
||||
OpenSSL.crypto.FILETYPE_PEM, cert))
|
||||
|
||||
for domain in domains:
|
||||
try:
|
||||
plugin.deploy_cert(domain, cert_path, util.KEY_PATH)
|
||||
except le_errors.Error as error:
|
||||
logger.error("Plugin failed to deploy ceritificate for %s:", domain)
|
||||
logger.exception(error)
|
||||
return False
|
||||
|
||||
if not _save_and_restart(plugin, "deployed"):
|
||||
return False
|
||||
|
||||
success = True
|
||||
for domain in domains:
|
||||
verify = functools.partial(validator.Validator().certificate, cert,
|
||||
domain, "127.0.0.1", plugin.https_port)
|
||||
if not _try_until_true(verify):
|
||||
logger.error("Could not verify certificate for domain %s", domain)
|
||||
success = False
|
||||
|
||||
if success:
|
||||
logger.info("HTTPS validation succeeded")
|
||||
|
||||
return success
|
||||
|
||||
|
||||
def test_enhancements(plugin, domains):
|
||||
"""Tests supported enhancements returning True if successful"""
|
||||
supported = plugin.supported_enhancements()
|
||||
|
||||
if "redirect" not in supported:
|
||||
logger.error("The plugin and this program support no common "
|
||||
"enhancements")
|
||||
return False
|
||||
|
||||
for domain in domains:
|
||||
try:
|
||||
plugin.enhance(domain, "redirect")
|
||||
except le_errors.PluginError as error:
|
||||
# Don't immediately fail because a redirect may already be enabled
|
||||
logger.warning("Plugin failed to enable redirect for %s:", domain)
|
||||
logger.warning("%s", error)
|
||||
except le_errors.Error as error:
|
||||
logger.error("An error occurred while enabling redirect for %s:",
|
||||
domain)
|
||||
logger.exception(error)
|
||||
|
||||
if not _save_and_restart(plugin, "enhanced"):
|
||||
return False
|
||||
|
||||
success = True
|
||||
for domain in domains:
|
||||
verify = functools.partial(validator.Validator().redirect, "localhost",
|
||||
plugin.http_port, headers={"Host" : domain})
|
||||
if not _try_until_true(verify):
|
||||
logger.error("Improper redirect for domain %s", domain)
|
||||
success = False
|
||||
|
||||
if success:
|
||||
logger.info("Enhancments test succeeded")
|
||||
|
||||
return success
|
||||
|
||||
|
||||
def _try_until_true(func, max_tries=5, sleep_time=0.5):
|
||||
"""Calls func up to max_tries times until it returns True"""
|
||||
for _ in xrange(0, max_tries):
|
||||
if func():
|
||||
return True
|
||||
else:
|
||||
time.sleep(sleep_time)
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _save_and_restart(plugin, title=None):
|
||||
"""Saves and restart the plugin, returning True if no errors occurred"""
|
||||
try:
|
||||
plugin.save(title)
|
||||
plugin.restart()
|
||||
return True
|
||||
except le_errors.Error as error:
|
||||
logger.error("Plugin failed to save and restart server:")
|
||||
logger.exception(error)
|
||||
return False
|
||||
|
||||
|
||||
def test_rollback(plugin, config, backup):
|
||||
"""Tests the rollback checkpoints function"""
|
||||
try:
|
||||
plugin.rollback_checkpoints(1337)
|
||||
except le_errors.Error as error:
|
||||
logger.error("Plugin raised an exception during rollback:")
|
||||
logger.exception(error)
|
||||
return False
|
||||
|
||||
if _dirs_are_unequal(config, backup):
|
||||
logger.error("Rollback failed for config `%s`", config)
|
||||
return False
|
||||
else:
|
||||
logger.info("Rollback succeeded")
|
||||
return True
|
||||
|
||||
|
||||
def _create_backup(config, temp_dir):
|
||||
"""Creates a backup of config in temp_dir"""
|
||||
backup = os.path.join(temp_dir, "backup")
|
||||
shutil.rmtree(backup, ignore_errors=True)
|
||||
shutil.copytree(config, backup, symlinks=True)
|
||||
|
||||
return backup
|
||||
|
||||
|
||||
def _dirs_are_unequal(dir1, dir2):
|
||||
"""Returns True if dir1 and dir2 are unequal"""
|
||||
dircmps = [filecmp.dircmp(dir1, dir2)]
|
||||
while len(dircmps):
|
||||
dircmp = dircmps.pop()
|
||||
if dircmp.left_only or dircmp.right_only:
|
||||
logger.error("The following files and directories are only "
|
||||
"present in one directory")
|
||||
if dircmp.left_only:
|
||||
logger.error(dircmp.left_only)
|
||||
else:
|
||||
logger.error(dircmp.right_only)
|
||||
return True
|
||||
elif dircmp.common_funny or dircmp.funny_files:
|
||||
logger.error("The following files and directories could not be "
|
||||
"compared:")
|
||||
if dircmp.common_funny:
|
||||
logger.error(dircmp.common_funny)
|
||||
else:
|
||||
logger.error(dircmp.funny_files)
|
||||
return True
|
||||
elif dircmp.diff_files:
|
||||
logger.error("The following files differ:")
|
||||
logger.error(dircmp.diff_files)
|
||||
return True
|
||||
|
||||
for subdir in dircmp.subdirs.itervalues():
|
||||
dircmps.append(subdir)
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def get_args():
|
||||
"""Returns parsed command line arguments."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description=DESCRIPTION,
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
|
||||
group = parser.add_argument_group("general")
|
||||
group.add_argument(
|
||||
"-c", "--configs", default="configs.tar.gz",
|
||||
help="a directory or tarball containing server configurations")
|
||||
group.add_argument(
|
||||
"-p", "--plugin", default="apache", help="the plugin to be tested")
|
||||
group.add_argument(
|
||||
"-v", "--verbose", dest="verbose_count", action="count",
|
||||
default=0, help="you know how to use this")
|
||||
group.add_argument(
|
||||
"-a", "--auth", action="store_true",
|
||||
help="tests the challenges the plugin supports")
|
||||
group.add_argument(
|
||||
"-i", "--install", action="store_true",
|
||||
help="tests the plugin as an installer")
|
||||
group.add_argument(
|
||||
"-e", "--enhance", action="store_true", help="tests the enhancements "
|
||||
"the plugin supports (implicitly includes installer tests)")
|
||||
|
||||
for plugin in PLUGINS.itervalues():
|
||||
plugin.add_parser_arguments(parser)
|
||||
|
||||
args = parser.parse_args()
|
||||
if args.enhance:
|
||||
args.install = True
|
||||
elif not (args.auth or args.install):
|
||||
args.auth = args.install = args.enhance = True
|
||||
|
||||
return args
|
||||
|
||||
|
||||
def setup_logging(args):
|
||||
"""Prepares logging for the program"""
|
||||
handler = logging.StreamHandler()
|
||||
|
||||
root_logger = logging.getLogger()
|
||||
root_logger.setLevel(logging.ERROR - args.verbose_count * 10)
|
||||
root_logger.addHandler(handler)
|
||||
|
||||
|
||||
def main():
|
||||
"""Main test script execution."""
|
||||
args = get_args()
|
||||
setup_logging(args)
|
||||
|
||||
if args.plugin not in PLUGINS:
|
||||
raise errors.Error("Unknown plugin {0}".format(args.plugin))
|
||||
|
||||
temp_dir = tempfile.mkdtemp()
|
||||
plugin = PLUGINS[args.plugin](args)
|
||||
try:
|
||||
plugin.execute_in_docker("mkdir -p /var/log/apache2")
|
||||
while plugin.has_more_configs():
|
||||
success = True
|
||||
|
||||
try:
|
||||
config = plugin.load_config()
|
||||
logger.info("Loaded configuration: %s", config)
|
||||
if args.auth:
|
||||
success = test_authenticator(plugin, config, temp_dir)
|
||||
if success and args.install:
|
||||
success = test_installer(args, plugin, config, temp_dir)
|
||||
except errors.Error as error:
|
||||
logger.error("Tests on %s raised:", config)
|
||||
logger.exception(error)
|
||||
success = False
|
||||
|
||||
if success:
|
||||
logger.info("All tests on %s succeeded", config)
|
||||
else:
|
||||
logger.error("Tests on %s failed", config)
|
||||
finally:
|
||||
plugin.cleanup_from_tests()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
13
letsencrypt-compatibility-test/letsencrypt_compatibility_test/testdata/empty_cert.pem
vendored
Normal file
13
letsencrypt-compatibility-test/letsencrypt_compatibility_test/testdata/empty_cert.pem
vendored
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
-----BEGIN CERTIFICATE-----
|
||||
MIICATCCAWoCCQCvMbKu4FHZ6zANBgkqhkiG9w0BAQsFADBFMQswCQYDVQQGEwJB
|
||||
VTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0
|
||||
cyBQdHkgTHRkMB4XDTE1MDcyMzIzMjc1MFoXDTE2MDcyMjIzMjc1MFowRTELMAkG
|
||||
A1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0
|
||||
IFdpZGdpdHMgUHR5IEx0ZDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAws3o
|
||||
y46PMLM9Gr68pbex0MhdPr7Cq4rRe9BBpnOuHFdF35Ak0aPrzFwVzLlGOir94U11
|
||||
e5JYJDWJi+4FwLBRkOAfanjJ5GJ9BnEHSOdbtO+sv9uhbt+7iYOOUOngKSiJyUrM
|
||||
i1THAE+B1CenxZ1KHRQCke708zkK8jVuxLeIAOMCAwEAATANBgkqhkiG9w0BAQsF
|
||||
AAOBgQCC3LUP3MHk+IBmwHHZAZCX+6p4lop9SP6y6rDpWgnqEEeb9oFleHi2Rvzq
|
||||
7gxl6nS5AsaSzfAygJ3zWKTwVAZyU4GOQ8QTK+nHk3+LO1X4cDbUlQfm5+YuwKDa
|
||||
4LFKeovmrK6BiMLIc1J+MxUjLfCeVHYSdkZULTVXue0zif0BUA==
|
||||
-----END CERTIFICATE-----
|
||||
15
letsencrypt-compatibility-test/letsencrypt_compatibility_test/testdata/rsa1024_key.pem
vendored
Normal file
15
letsencrypt-compatibility-test/letsencrypt_compatibility_test/testdata/rsa1024_key.pem
vendored
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIICXAIBAAKBgQCsREbM+UcfsgDy2w56AVGyxsO0HVsbEZHHoEzv7qksIwFgRYMp
|
||||
rowwIxD450RQQqjvw9IoXlMVmr1t5szn5KXn9JRO9T5KNCCy3VPx75WBcp6kzd9Q
|
||||
2HS1OEOtpilNnDkZ+TJfdgFWPUBYj2o4Md1hPmcvagiIJY5U6speka2bjwIDAQAB
|
||||
AoGANCMZ9pF/mDUsmlP4Rq69hkkoFAxKdZ/UqkF256so4mXZ1cRUFTpxzWPfkCWW
|
||||
hGAYdzCiG3uo08IYkPmojIqkN1dk5Hcq5eQAmshaPkQHQCHjmPjjcNvgjIXQoGUf
|
||||
TpDU2hbY4UAlJlj4ZLh+jGP5Zq8/WrNi8RsI3v9Nagfp/FECQQDgi2q8p1gX0TNh
|
||||
d1aEKmSXkR3bxkyFk6oS+pBrAG3+yX27ZayN6Rx6DOs/FcBsOu7fX3PYBziDeEWe
|
||||
Lkf1P743AkEAxGYT/LY3puglSz4iJZZzWmRCrVOg41yhfQ+F1BRX43/2vtoU5GyM
|
||||
2lUn1vQ2e/rfmnAvfJxc90GeZCIHB1ihaQJBALH8UMLxMtbOMJgVbDKfF9U8ZhqK
|
||||
+KT5A1q/2jG2yXmoZU1hroFeQgBMtTvwFfK0VBwjIUQflSBA+Y4EyW0Q9ckCQGvd
|
||||
jHitM1+N/H2YwHRYbz5j9mLvnVuCEod3MQ9LpQGj1Eb5y6OxIqL/RgQ+2HW7UXem
|
||||
yc3sqvp5pZ5lOesE+JECQETPI64gqxlTIs3nErNMpMynUuTWpaElOcIJTT6icLzB
|
||||
Xix67kKXjROO5D58GEYkM0Yi5k7YdUPoQBW7MoIrSIA=
|
||||
-----END RSA PRIVATE KEY-----
|
||||
15
letsencrypt-compatibility-test/letsencrypt_compatibility_test/testdata/rsa1024_key2.pem
vendored
Normal file
15
letsencrypt-compatibility-test/letsencrypt_compatibility_test/testdata/rsa1024_key2.pem
vendored
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
-----BEGIN RSA PRIVATE KEY-----
|
||||
MIICXAIBAAKBgQDCzejLjo8wsz0avrylt7HQyF0+vsKritF70EGmc64cV0XfkCTR
|
||||
o+vMXBXMuUY6Kv3hTXV7klgkNYmL7gXAsFGQ4B9qeMnkYn0GcQdI51u076y/26Fu
|
||||
37uJg45Q6eApKInJSsyLVMcAT4HUJ6fFnUodFAKR7vTzOQryNW7Et4gA4wIDAQAB
|
||||
AoGAKiAU40/krwdTg2ETslJS5W8ums7tkeLnAfs69x+02vQUbA/jpmHoL70KCcdW
|
||||
5GU/mWUCrsIqxUm+gL/sBosaV/TF256qUBt2qQCZTN8MbDaNSYiiMnucOfbWdIqx
|
||||
Zgls6GUoXQvPic9RUoFSlgfSjo5ezz6el5ihvRMp+wbk24ECQQD3oz4hN029DSZo
|
||||
Y3+flmBn77gA0BMUvLa6hmt9b3xT5U/ToCLfbmUvpx7zV1g5era2y9qt/o3UtAbW
|
||||
1zCVETgzAkEAyWHv/+RnSXp8/D4YwTVWyeWi862uNBPkuLGP/0zASdwBfBK3uBls
|
||||
+VumfSCtp0kt2AXXmScg1fkHdeAVT6AkkQJBAJb2XRnCrRFiwtdAULzo3zx9Vp6o
|
||||
OfmaUYrEByMgo5pBYLiSFrA+jFDQgH238YCY3mnxPA517+CLHuA5rtQw+yECQCfm
|
||||
gL/pyFE1tLfhsdPuNpDwL9YqLl7hJis1+zrxQRQhRCYKK16NoxrQ/u7B38ZKaIvp
|
||||
tGsC5q2elszTJkXNjBECQCVE9QCVx056vHVdPWM8z3GAeV3sJQ01HLLjebTEEz6G
|
||||
jH54gk+YYPp4kjCvVUykbnB58BY2n88GQt5Jj5eLuMo=
|
||||
-----END RSA PRIVATE KEY-----
|
||||
|
|
@ -0,0 +1,62 @@
|
|||
"""Utility functions for Let"s Encrypt plugin tests."""
|
||||
import argparse
|
||||
import copy
|
||||
import contextlib
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import socket
|
||||
import tarfile
|
||||
|
||||
from acme import jose
|
||||
from acme import test_util
|
||||
from letsencrypt import constants
|
||||
|
||||
from letsencrypt_compatibility_test import errors
|
||||
|
||||
|
||||
_KEY_BASE = "rsa1024_key.pem"
|
||||
KEY_PATH = test_util.vector_path(_KEY_BASE)
|
||||
KEY = test_util.load_pyopenssl_private_key(_KEY_BASE)
|
||||
JWK = jose.JWKRSA(key=test_util.load_rsa_private_key(_KEY_BASE))
|
||||
IP_REGEX = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$")
|
||||
|
||||
|
||||
def create_le_config(parent_dir):
|
||||
"""Sets up LE dirs in parent_dir and returns the config dict"""
|
||||
config = copy.deepcopy(constants.CLI_DEFAULTS)
|
||||
|
||||
le_dir = os.path.join(parent_dir, "letsencrypt")
|
||||
config["config_dir"] = os.path.join(le_dir, "config")
|
||||
config["work_dir"] = os.path.join(le_dir, "work")
|
||||
config["logs_dir"] = os.path.join(le_dir, "logs_dir")
|
||||
os.makedirs(config["config_dir"])
|
||||
os.mkdir(config["work_dir"])
|
||||
os.mkdir(config["logs_dir"])
|
||||
|
||||
return argparse.Namespace(**config) # pylint: disable=star-args
|
||||
|
||||
|
||||
def extract_configs(configs, parent_dir):
|
||||
"""Extracts configs to a new dir under parent_dir and returns it"""
|
||||
config_dir = os.path.join(parent_dir, "configs")
|
||||
|
||||
if os.path.isdir(configs):
|
||||
shutil.copytree(configs, config_dir, symlinks=True)
|
||||
elif tarfile.is_tarfile(configs):
|
||||
with tarfile.open(configs, "r") as tar:
|
||||
tar.extractall(config_dir)
|
||||
else:
|
||||
raise errors.Error("Unknown configurations file type")
|
||||
|
||||
return config_dir
|
||||
|
||||
|
||||
def get_two_free_ports():
|
||||
"""Returns two free ports to use for the tests"""
|
||||
with contextlib.closing(socket.socket()) as sock1:
|
||||
with contextlib.closing(socket.socket()) as sock2:
|
||||
sock1.bind(("", 0))
|
||||
sock2.bind(("", 0))
|
||||
|
||||
return sock1.getsockname()[1], sock2.getsockname()[1]
|
||||
23
letsencrypt-compatibility-test/setup.py
Normal file
23
letsencrypt-compatibility-test/setup.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
from setuptools import setup
|
||||
from setuptools import find_packages
|
||||
|
||||
|
||||
install_requires = [
|
||||
'letsencrypt',
|
||||
'letsencrypt-apache',
|
||||
'letsencrypt-nginx',
|
||||
'docker-py',
|
||||
'mock<1.1.0', # py26
|
||||
'zope.interface',
|
||||
]
|
||||
|
||||
setup(
|
||||
name='letsencrypt-compatibility-test',
|
||||
packages=find_packages(),
|
||||
install_requires=install_requires,
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'letsencrypt-compatibility-test = letsencrypt_compatibility_test.test_driver:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
|
@ -414,17 +414,52 @@ class IDisplay(zope.interface.Interface):
|
|||
class IValidator(zope.interface.Interface):
|
||||
"""Configuration validator."""
|
||||
|
||||
def redirect(name):
|
||||
"""Verify redirect to HTTPS."""
|
||||
def certificate(cert, name, alt_host=None, port=443):
|
||||
"""Verifies the certificate presented at name is cert
|
||||
|
||||
def ocsp_stapling(name):
|
||||
"""Verify ocsp stapling for domain."""
|
||||
:param OpenSSL.crypto.X509 cert: Expected certificate
|
||||
:param str name: Server's domain name
|
||||
:param bytes alt_host: Host to connect to instead of the IP
|
||||
address of host
|
||||
:param int port: Port to connect to
|
||||
|
||||
:returns: True if the certificate was verified successfully
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
|
||||
def redirect(name, port=80, headers=None):
|
||||
"""Verify redirect to HTTPS
|
||||
|
||||
:param str name: Server's domain name
|
||||
:param int port: Port to connect to
|
||||
:param dict headers: HTTP headers to include in request
|
||||
|
||||
:returns: True if redirect is successfully enabled
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
|
||||
def https(names):
|
||||
"""Verify HTTPS is enabled for domain."""
|
||||
|
||||
def hsts(name):
|
||||
"""Verify HSTS header is enabled."""
|
||||
"""Verify HSTS header is enabled
|
||||
|
||||
:param str name: Server's domain name
|
||||
|
||||
:returns: True if HSTS header is successfully enabled
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
|
||||
def ocsp_stapling(name):
|
||||
"""Verify ocsp stapling for domain
|
||||
|
||||
:param str name: Server's domain name
|
||||
|
||||
:returns: True if ocsp stapling is successfully enabled
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class IReporter(zope.interface.Interface):
|
||||
|
|
|
|||
121
letsencrypt/tests/validator_test.py
Normal file
121
letsencrypt/tests/validator_test.py
Normal file
|
|
@ -0,0 +1,121 @@
|
|||
"""Tests for letsencrypt.validator."""
|
||||
import requests
|
||||
import unittest
|
||||
|
||||
import mock
|
||||
import OpenSSL
|
||||
|
||||
from acme import errors as acme_errors
|
||||
from letsencrypt import validator
|
||||
|
||||
|
||||
class ValidatorTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.validator = validator.Validator()
|
||||
|
||||
@mock.patch("letsencrypt.validator.crypto_util.probe_sni")
|
||||
def test_certificate_success(self, mock_probe_sni):
|
||||
cert = OpenSSL.crypto.X509()
|
||||
mock_probe_sni.return_value = cert
|
||||
self.assertTrue(self.validator.certificate(
|
||||
cert, "test.com", "127.0.0.1"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.crypto_util.probe_sni")
|
||||
def test_certificate_error(self, mock_probe_sni):
|
||||
cert = OpenSSL.crypto.X509()
|
||||
mock_probe_sni.side_effect = [acme_errors.Error]
|
||||
self.assertFalse(self.validator.certificate(
|
||||
cert, "test.com", "127.0.0.1"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.crypto_util.probe_sni")
|
||||
def test_certificate_failure(self, mock_probe_sni):
|
||||
cert = OpenSSL.crypto.X509()
|
||||
cert.set_serial_number(1337)
|
||||
mock_probe_sni.return_value = OpenSSL.crypto.X509()
|
||||
self.assertFalse(self.validator.certificate(
|
||||
cert, "test.com", "127.0.0.1"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_succesful_redirect(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
301, {"location" : "https://test.com"})
|
||||
self.assertTrue(self.validator.redirect("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_redirect_with_headers(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
301, {"location" : "https://test.com"})
|
||||
self.assertTrue(self.validator.redirect(
|
||||
"test.com", headers={"Host" : "test.com"}))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_redirect_missing_location(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(301)
|
||||
self.assertFalse(self.validator.redirect("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_redirect_wrong_status_code(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
201, {"location" : "https://test.com"})
|
||||
self.assertFalse(self.validator.redirect("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_redirect_wrong_redirect_code(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
303, {"location" : "https://test.com"})
|
||||
self.assertFalse(self.validator.redirect("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts_empty(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": ""})
|
||||
self.assertFalse(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts_malformed(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "sdfal"})
|
||||
self.assertFalse(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts_bad_max_age(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "max-age=not-an-int"})
|
||||
self.assertFalse(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts_expire(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "max-age=3600"})
|
||||
self.assertFalse(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security": "max-age=31536000"})
|
||||
self.assertTrue(self.validator.hsts("test.com"))
|
||||
|
||||
@mock.patch("letsencrypt.validator.requests.get")
|
||||
def test_hsts_include_subdomains(self, mock_get_request):
|
||||
mock_get_request.return_value = create_response(
|
||||
headers={"strict-transport-security":
|
||||
"max-age=31536000;includeSubDomains"})
|
||||
self.assertTrue(self.validator.hsts("test.com"))
|
||||
|
||||
def test_ocsp_stapling(self):
|
||||
self.assertRaises(
|
||||
NotImplementedError, self.validator.ocsp_stapling, "test.com")
|
||||
|
||||
def create_response(status_code=200, headers=None):
|
||||
"""Creates a requests.Response object for testing"""
|
||||
response = requests.Response()
|
||||
response.status_code = status_code
|
||||
|
||||
if headers:
|
||||
response.headers = headers
|
||||
|
||||
return response
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main() # pragma: no cover
|
||||
84
letsencrypt/validator.py
Normal file
84
letsencrypt/validator.py
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
"""Validators to determine the current webserver configuration"""
|
||||
import logging
|
||||
import socket
|
||||
import requests
|
||||
import zope.interface
|
||||
|
||||
from acme import crypto_util
|
||||
from acme import errors as acme_errors
|
||||
from letsencrypt import interfaces
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Validator(object):
|
||||
# pylint: disable=no-self-use
|
||||
"""Collection of functions to test a live webserver's configuration"""
|
||||
zope.interface.implements(interfaces.IValidator)
|
||||
|
||||
def certificate(self, cert, name, alt_host=None, port=443):
|
||||
"""Verifies the certificate presented at name is cert"""
|
||||
host = alt_host if alt_host else socket.gethostbyname(name)
|
||||
try:
|
||||
presented_cert = crypto_util.probe_sni(name, host, port)
|
||||
except acme_errors.Error as error:
|
||||
logger.exception(error)
|
||||
return False
|
||||
|
||||
return presented_cert.digest("sha256") == cert.digest("sha256")
|
||||
|
||||
def redirect(self, name, port=80, headers=None):
|
||||
"""Test whether webserver redirects to secure connection."""
|
||||
url = "http://{0}:{1}".format(name, port)
|
||||
if headers:
|
||||
response = requests.get(url, headers=headers, allow_redirects=False)
|
||||
else:
|
||||
response = requests.get(url, allow_redirects=False)
|
||||
|
||||
if response.status_code not in (301, 303):
|
||||
return False
|
||||
|
||||
redirect_location = response.headers.get("location", "")
|
||||
if not redirect_location.startswith("https://"):
|
||||
return False
|
||||
|
||||
if response.status_code != 301:
|
||||
logger.error("Server did not redirect with permanent code")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def hsts(self, name):
|
||||
"""Test for HTTP Strict Transport Security header"""
|
||||
headers = requests.get("https://" + name).headers
|
||||
hsts_header = headers.get("strict-transport-security")
|
||||
|
||||
if not hsts_header:
|
||||
return False
|
||||
|
||||
# Split directives following RFC6797, section 6.1
|
||||
directives = [d.split("=") for d in hsts_header.split(";")]
|
||||
max_age = [d for d in directives if d[0] == "max-age"]
|
||||
|
||||
if not max_age:
|
||||
logger.error("Server responded with invalid HSTS header field")
|
||||
return False
|
||||
|
||||
try:
|
||||
_, max_age_value = max_age[0]
|
||||
max_age_value = int(max_age_value)
|
||||
except ValueError:
|
||||
logger.error("Server responded with invalid HSTS header field")
|
||||
return False
|
||||
|
||||
# Test whether HSTS does not expire for at least two weeks.
|
||||
if max_age_value <= (2 * 7 * 24 * 3600):
|
||||
logger.error("HSTS should not expire in less than two weeks")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def ocsp_stapling(self, name):
|
||||
"""Verify ocsp stapling for domain."""
|
||||
raise NotImplementedError()
|
||||
3
tox.ini
3
tox.ini
|
|
@ -35,8 +35,9 @@ basepython = python2.7
|
|||
# duplicate code checking; if one of the commands fails, others will
|
||||
# continue, but tox return code will reflect previous error
|
||||
commands =
|
||||
pip install -r requirements.txt -e acme -e .[dev] -e letsencrypt-apache -e letsencrypt-nginx
|
||||
pip install -r requirements.txt -e acme -e .[dev] -e letsencrypt-apache -e letsencrypt-nginx -e letsencrypt-compatibility-test
|
||||
pylint --rcfile=.pylintrc letsencrypt
|
||||
pylint --rcfile=.pylintrc acme/acme
|
||||
pylint --rcfile=.pylintrc letsencrypt-apache/letsencrypt_apache
|
||||
pylint --rcfile=.pylintrc letsencrypt-nginx/letsencrypt_nginx
|
||||
pylint --rcfile=.pylintrc letsencrypt-compatibility-test/letsencrypt_compatibility_test
|
||||
|
|
|
|||
Loading…
Reference in a new issue