[Unix] Create a framework for certbot integration tests: PART 2 (#6821)

* Second part: integration tests for certbot core

* Specific coverages

* Add comments

* Improve names

* Suspend fail-under until complete coverage

* Implement a minimal functional example

* Update certbot-ci/certbot_integration_tests/certbot_tests/conftest.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update certbot-ci/certbot_integration_tests/certbot_tests/context.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update certbot-ci/certbot_integration_tests/certbot_tests/context.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update certbot-ci/certbot_integration_tests/utils/misc.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update certbot-ci/certbot_integration_tests/utils/misc.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Fist set of corrections after review

* Fix test and test deploy hook flag

* Improve an assertion, remove conftest

* Add a test to cover all assertions. Remove the CSR logic for now

* Update certbot-ci/certbot_integration_tests/utils/misc.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update certbot-ci/certbot_integration_tests/utils/misc.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update certbot-ci/certbot_integration_tests/utils/misc.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update certbot-ci/certbot_integration_tests/utils/misc.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Some corrections

* Add the http-01 test to complete coverage

* Add a comment.

* Make single requirements

* Update certbot-ci/certbot_integration_tests/certbot_tests/context.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Revert "Some corrections"

This reverts commit 6f20a060e5.

# Conflicts:
#	certbot-ci/certbot_integration_tests/certbot_tests/context.py
#	certbot-ci/certbot_integration_tests/certbot_tests/test_main.py

* Clean join

* Update certbot-ci/certbot_integration_tests/certbot_tests/context.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Update certbot-ci/certbot_integration_tests/certbot_tests/context.py

Co-Authored-By: adferrand <adferrand@users.noreply.github.com>

* Change assertion name

* Rewrite http auth hook as real python scripts

* Correct output in some OS

* Try a direct execution

* Fix shebang

* Correct a script

* Update certbot config

* Call explicitly with python, to be cross platform compatible

* Avoid infinite loops. Improve documentation.

* Fix syntax
This commit is contained in:
Adrien Ferrand 2019-04-12 03:07:36 +02:00 committed by ohemorange
parent d5ea9f4486
commit 2b1c77c1ca
9 changed files with 477 additions and 44 deletions

View file

@ -5,4 +5,4 @@ disable_warnings = module-not-imported,no-data-collected
[report]
# Exclude unit tests in coverage during integration tests.
omit = **/*_test.py,**/tests/*,**/certbot_nginx/parser_obj.py
omit = **/*_test.py,**/tests/*,**/dns_common*,**/certbot_nginx/parser_obj.py

View file

@ -0,0 +1,5 @@
import pytest
# Custom assertions defined in the following package need to be registered to be properly
# displayed in a pytest report when they are failing.
pytest.register_assert_rewrite('certbot_integration_tests.certbot_tests.assertions')

View file

@ -0,0 +1,75 @@
"""This module contains advanced assertions for the certbot integration tests."""
import os
import grp
def assert_hook_execution(probe_path, probe_content):
"""
Assert that a certbot hook has been executed
:param probe_path: path to the file that received the hook output
:param probe_content: content expected when the hook is executed
"""
with open(probe_path, 'r') as file:
lines = file.readlines()
assert '{0}{1}'.format(probe_content, os.linesep) in lines
def assert_save_renew_hook(config_dir, lineage):
"""
Assert that the renew hook configuration of a lineage has been saved.
:param config_dir: location of the certbot configuration
:param lineage: lineage domain name
"""
assert os.path.isfile(os.path.join(config_dir, 'renewal/{0}.conf'.format(lineage)))
def assert_cert_count_for_lineage(config_dir, lineage, count):
"""
Assert the number of certificates generated for a lineage.
:param config_dir: location of the certbot configuration
:param lineage: lineage domain name
:param count: number of expected certificates
"""
archive_dir = os.path.join(config_dir, 'archive')
lineage_dir = os.path.join(archive_dir, lineage)
certs = [file for file in os.listdir(lineage_dir) if file.startswith('cert')]
assert len(certs) == count
def assert_equals_permissions(file1, file2, mask):
"""
Assert that permissions on two files are identical in respect to a given umask.
:param file1: first file path to compare
:param file2: second file path to compare
:param mask: 3-octal representation of a POSIX umask under which the two files mode
should match (eg. 0o074 will test RWX on group and R on world)
"""
mode_file1 = os.stat(file1).st_mode & mask
mode_file2 = os.stat(file2).st_mode & mask
assert mode_file1 == mode_file2
def assert_equals_group_owner(file1, file2):
"""
Assert that two files have the same group owner.
:param file1: first file path to compare
:param file2: second file path to compare
:return:
"""
group_owner_file1 = grp.getgrgid(os.stat(file1).st_gid)[0]
group_owner_file2 = grp.getgrgid(os.stat(file2).st_gid)[0]
assert group_owner_file1 == group_owner_file2
def assert_world_permissions(file, mode):
"""
Assert that a file has the expected world permission.
:param file: file path to check
:param mode: world permissions mode expected
"""
mode_file_all = os.stat(file).st_mode & 0o007
assert mode_file_all == mode

View file

@ -1,16 +1,138 @@
"""Module to handle the context of integration tests."""
import os
import shutil
import subprocess
import sys
import tempfile
from distutils.version import LooseVersion
from certbot_integration_tests.utils import misc
class IntegrationTestsContext(object):
"""General fixture describing a certbot integration tests context"""
def __init__(self, request):
self.request = request
if hasattr(request.config, 'slaveinput'): # Worker node
self.worker_id = request.config.slaveinput['slaveid']
self.acme_xdist = request.config.slaveinput['acme_xdist']
acme_xdist = request.config.slaveinput['acme_xdist']
else: # Primary node
self.worker_id = 'primary'
self.acme_xdist = request.config.acme_xdist
self.directory_url = self.acme_xdist['directory_url']
self.tls_alpn_01_port = self.acme_xdist['https_port'][self.worker_id]
self.http_01_port = self.acme_xdist['http_port'][self.worker_id]
acme_xdist = request.config.acme_xdist
self.acme_server =acme_xdist['acme_server']
self.directory_url = acme_xdist['directory_url']
self.tls_alpn_01_port = acme_xdist['https_port'][self.worker_id]
self.http_01_port = acme_xdist['http_port'][self.worker_id]
# Challtestsrv REST API, that exposes entrypoints to register new DNS entries,
# is listening on challtestsrv_port.
self.challtestsrv_port = acme_xdist['challtestsrv_port']
# Certbot version does not depend on the test context. But getting its value requires
# calling certbot from a subprocess. Since it will be called a lot of times through
# _common_test_no_force_renew, we cache its value as a member of the fixture context.
self.certbot_version = misc.get_certbot_version()
self.workspace = tempfile.mkdtemp()
self.config_dir = os.path.join(self.workspace, 'conf')
self.hook_probe = tempfile.mkstemp(dir=self.workspace)[1]
self.manual_dns_auth_hook = (
'{0} -c "import os; import requests; import json; '
"assert not os.environ.get('CERTBOT_DOMAIN').startswith('fail'); "
"data = {{'host':'_acme-challenge.{{0}}.'.format(os.environ.get('CERTBOT_DOMAIN')),"
"'value':os.environ.get('CERTBOT_VALIDATION')}}; "
"request = requests.post('http://localhost:{1}/set-txt', data=json.dumps(data)); "
"request.raise_for_status(); "
'"'
).format(sys.executable, self.challtestsrv_port)
self.manual_dns_cleanup_hook = (
'{0} -c "import os; import requests; import json; '
"data = {{'host':'_acme-challenge.{{0}}.'.format(os.environ.get('CERTBOT_DOMAIN'))}}; "
"request = requests.post('http://localhost:{1}/clear-txt', data=json.dumps(data)); "
"request.raise_for_status(); "
'"'
).format(sys.executable, self.challtestsrv_port)
def cleanup(self):
pass
"""Cleanup the integration test context."""
shutil.rmtree(self.workspace)
def _common_test_no_force_renew(self, args):
"""
Base command to execute certbot in a distributed integration test context,
not renewing certificates by default.
"""
new_environ = os.environ.copy()
new_environ['TMPDIR'] = self.workspace
additional_args = []
if self.certbot_version >= LooseVersion('0.30.0'):
additional_args.append('--no-random-sleep-on-renew')
command = [
'certbot',
'--server', self.directory_url,
'--no-verify-ssl',
'--http-01-port', str(self.http_01_port),
'--https-port', str(self.tls_alpn_01_port),
'--manual-public-ip-logging-ok',
'--config-dir', self.config_dir,
'--work-dir', os.path.join(self.workspace, 'work'),
'--logs-dir', os.path.join(self.workspace, 'logs'),
'--non-interactive',
'--no-redirect',
'--agree-tos',
'--register-unsafely-without-email',
'--debug',
'-vv'
]
command.extend(args)
command.extend(additional_args)
print('Invoke command:\n{0}'.format(subprocess.list2cmdline(command)))
return subprocess.check_output(command, universal_newlines=True,
cwd=self.workspace, env=new_environ)
def _common_test(self, args):
"""
Base command to execute certbot in a distributed integration test context,
renewing certificates by default.
"""
command = ['--renew-by-default']
command.extend(args)
return self._common_test_no_force_renew(command)
def certbot_no_force_renew(self, args):
"""
Execute certbot with given args, not renewing certificates by default.
:param args: args to pass to certbot
:return: output of certbot execution
"""
command = ['--authenticator', 'standalone', '--installer', 'null']
command.extend(args)
return self._common_test_no_force_renew(command)
def certbot(self, args):
"""
Execute certbot with given args, renewing certificates by default.
:param args: args to pass to certbot
:return: output of certbot execution
"""
command = ['--renew-by-default']
command.extend(args)
return self.certbot_no_force_renew(command)
def get_domain(self, subdomain='le'):
"""
Generate a certificate domain name suitable for distributed certbot integration tests.
This is a requirement to let the distribution know how to redirect the challenge check
from the ACME server to the relevant pytest-xdist worker. This resolution is done by
appending the pytest worker id to the subdomain, using this pattern:
{subdomain}.{worker_id}.wtf
:param subdomain: the subdomain to use in the generated domain (default 'le')
:return: the well-formed domain suitable for redirection on
"""
return '{0}.{1}.wtf'.format(subdomain, self.worker_id)

View file

@ -1,9 +1,16 @@
import requests
import urllib3
"""Module executing integration tests against certbot core."""
from __future__ import print_function
import os
import shutil
from os.path import join
import pytest
from certbot_integration_tests.certbot_tests import context as certbot_context
from certbot_integration_tests.certbot_tests.assertions import (
assert_hook_execution, assert_save_renew_hook, assert_cert_count_for_lineage,
assert_world_permissions, assert_equals_group_owner, assert_equals_permissions,
)
from certbot_integration_tests.utils import misc
@pytest.fixture()
@ -16,25 +23,78 @@ def context(request):
integration_test_context.cleanup()
def test_hello_1(context):
assert context.http_01_port
assert context.tls_alpn_01_port
try:
response = requests.get(context.directory_url, verify=False)
response.raise_for_status()
assert response.json()
response.close()
except urllib3.exceptions.InsecureRequestWarning:
pass
def test_manual_http_auth(context):
"""Test the HTTP-01 challenge using manual plugin."""
with misc.create_http_server(context.http_01_port) as webroot,\
misc.manual_http_hooks(webroot, context.http_01_port) as scripts:
certname = context.get_domain()
context.certbot([
'certonly', '-a', 'manual', '-d', certname,
'--cert-name', certname,
'--manual-auth-hook', scripts[0],
'--manual-cleanup-hook', scripts[1],
'--pre-hook', 'echo wtf.pre >> "{0}"'.format(context.hook_probe),
'--post-hook', 'echo wtf.post >> "{0}"'.format(context.hook_probe),
'--deploy-hook', 'echo deploy >> "{0}"'.format(context.hook_probe)
])
assert_hook_execution(context.hook_probe, 'deploy')
assert_save_renew_hook(context.config_dir, certname)
def test_hello_2(context):
assert context.http_01_port
assert context.tls_alpn_01_port
try:
response = requests.get(context.directory_url, verify=False)
response.raise_for_status()
assert response.json()
response.close()
except urllib3.exceptions.InsecureRequestWarning:
pass
def test_manual_dns_auth(context):
"""Test the DNS-01 challenge using manual plugin."""
certname = context.get_domain('dns')
context.certbot([
'-a', 'manual', '-d', certname, '--preferred-challenges', 'dns',
'run', '--cert-name', certname,
'--manual-auth-hook', context.manual_dns_auth_hook,
'--manual-cleanup-hook', context.manual_dns_cleanup_hook,
'--pre-hook', 'echo wtf.pre >> "{0}"'.format(context.hook_probe),
'--post-hook', 'echo wtf.post >> "{0}"'.format(context.hook_probe),
'--renew-hook', 'echo renew >> "{0}"'.format(context.hook_probe)
])
with pytest.raises(AssertionError):
assert_hook_execution(context.hook_probe, 'renew')
assert_save_renew_hook(context.config_dir, certname)
def test_renew_files_permissions(context):
"""Test certificate file permissions upon renewal"""
certname = context.get_domain('renew')
context.certbot(['-d', certname])
assert_cert_count_for_lineage(context.config_dir, certname, 1)
assert_world_permissions(
join(context.config_dir, 'archive', certname, 'privkey1.pem'), 0)
# Force renew. Assert certificate renewal and proper permissions.
# We assert certificate renewal and proper permissions.
context.certbot(['renew'])
assert_cert_count_for_lineage(context.config_dir, certname, 2)
assert_world_permissions(
join(context.config_dir, 'archive', certname, '/privkey2.pem'), 0)
assert_equals_group_owner(
join(context.config_dir, 'archive', certname, 'privkey1.pem'),
join(context.config_dir, 'archive', certname, 'privkey2.pem'))
assert_equals_permissions(
join(context.config_dir, 'archive', certname, 'privkey1.pem'),
join(context.config_dir, 'archive', certname, 'privkey2.pem'), 0o074)
def test_renew_with_hook_scripts(context):
"""Test certificate renewal with script hooks."""
certname = context.get_domain('renew')
context.certbot(['-d', certname])
assert_cert_count_for_lineage(context.config_dir, certname, 1)
# Force renew. Assert certificate renewal and hook scripts execution.
misc.generate_test_file_hooks(context.config_dir, context.hook_probe)
context.certbot(['renew'])
assert_cert_count_for_lineage(context.config_dir, certname, 2)
assert_hook_execution(context.hook_probe, 'deploy')

View file

@ -2,11 +2,21 @@
Misc module contains stateless functions that could be used during pytest execution,
or outside during setup/teardown of the integration tests environment.
"""
import os
import time
import contextlib
import errno
import multiprocessing
import os
import shutil
import stat
import subprocess
import sys
import tempfile
import time
from distutils.version import LooseVersion
import requests
from OpenSSL import crypto
from six.moves import socketserver, SimpleHTTPServer
def check_until_timeout(url):
@ -30,16 +40,172 @@ def check_until_timeout(url):
raise ValueError('Error, url did not respond after 150 attempts: {0}'.format(url))
@contextlib.contextmanager
def execute_in_given_cwd(cwd):
class GracefulTCPServer(socketserver.TCPServer):
"""
Context manager that will execute any command in the given cwd after entering context,
and restore current cwd when context is destroyed.
:param str cwd: the path to use as the temporary current workspace for python execution
This subclass of TCPServer allows graceful reuse of an address that has
just been released by another instance of TCPServer.
"""
allow_reuse_address = True
@contextlib.contextmanager
def create_http_server(port):
"""
Setup and start an HTTP server for the given TCP port.
This server stays active for the lifetime of the context, and is automatically
stopped with context exit, while its temporary webroot is deleted.
:param int port: the TCP port to use
:return str: the temporary webroot attached to this server
"""
current_cwd = os.getcwd()
webroot = tempfile.mkdtemp()
def run():
GracefulTCPServer(('', port), SimpleHTTPServer.SimpleHTTPRequestHandler).serve_forever()
process = multiprocessing.Process(target=run)
try:
os.chdir(cwd)
yield
# SimpleHTTPServer is designed to serve files from the current working directory at the
# time it starts. So we temporarily change the cwd to our crafted webroot before launch.
try:
os.chdir(webroot)
process.start()
finally:
os.chdir(current_cwd)
check_until_timeout('http://localhost:{0}/'.format(port))
yield webroot
finally:
os.chdir(current_cwd)
try:
if process.is_alive():
process.terminate()
process.join() # Block until process is effectively terminated
finally:
shutil.rmtree(webroot)
def list_renewal_hooks_dirs(config_dir):
"""
Find and return paths of all hook directories for the given certbot config directory
:param str config_dir: path to the certbot config directory
:return str[]: list of path to the standard hooks directory for this certbot instance
"""
renewal_hooks_root = os.path.join(config_dir, 'renewal-hooks')
return [os.path.join(renewal_hooks_root, item) for item in ['pre', 'deploy', 'post']]
def generate_test_file_hooks(config_dir, hook_probe):
"""
Create a suite of certbot hook scripts and put them in the relevant hook directory
for the given certbot configuration directory. These scripts, when executed, will write
specific verbs in the given hook_probe file to allow asserting they have effectively
been executed. The deploy hook also checks that the renewal environment variables are set.
:param str config_dir: current certbot config directory
:param hook_probe: path to the hook probe to test hook scripts execution
"""
if sys.platform == 'win32':
extension = 'bat'
else:
extension = 'sh'
renewal_hooks_dirs = list_renewal_hooks_dirs(config_dir)
renewal_deploy_hook_path = os.path.join(renewal_hooks_dirs[1], 'hook.sh')
for hook_dir in renewal_hooks_dirs:
# We want an equivalent of bash `chmod -p $HOOK_DIR, that does not fail if one folder of
# the hierarchy already exists. It is not the case of os.makedirs. Python 3 has an
# optional parameter `exists_ok` to not fail on existing dir, but Python 2.7 does not.
# So we pass through a try except pass for it. To be removed with dropped support on py27.
try:
os.makedirs(hook_dir)
except OSError as error:
if error.errno != errno.EEXIST:
raise
hook_path = os.path.join(hook_dir, 'hook.{0}'.format(extension))
if extension == 'sh':
data = '''\
#!/bin/bash -xe
if [ "$0" = "{0}" ]; then
if [ -z "$RENEWED_DOMAINS" -o -z "$RENEWED_LINEAGE" ]; then
echo "Environment variables not properly set!" >&2
exit 1
fi
fi
echo $(basename $(dirname "$0")) >> "{1}"\
'''.format(renewal_deploy_hook_path, hook_probe)
else:
# TODO: Write the equivalent bat file for Windows
data = '''\
'''
with open(hook_path, 'w') as file:
file.write(data)
os.chmod(hook_path, os.stat(hook_path).st_mode | stat.S_IEXEC)
@contextlib.contextmanager
def manual_http_hooks(http_server_root, http_port):
"""
Generate suitable http-01 hooks command for test purpose in the given HTTP
server webroot directory. These hooks command use temporary python scripts
that are deleted upon context exit.
:param str http_server_root: path to the HTTP server configured to serve http-01 challenges
:param int http_port: HTTP port that the HTTP server listen on
:return (str, str): a tuple containing the authentication hook and cleanup hook commands
"""
tempdir = tempfile.mkdtemp()
try:
auth_script_path = os.path.join(tempdir, 'auth.py')
with open(auth_script_path, 'w') as file_h:
file_h.write('''\
#!/usr/bin/env python
import os
import requests
import time
import sys
challenge_dir = os.path.join('{0}', '.well-known', 'acme-challenge')
os.makedirs(challenge_dir)
challenge_file = os.path.join(challenge_dir, os.environ.get('CERTBOT_TOKEN'))
with open(challenge_file, 'w') as file_h:
file_h.write(os.environ.get('CERTBOT_VALIDATION'))
url = 'http://localhost:{1}/.well-known/acme-challenge/' + os.environ.get('CERTBOT_TOKEN')
for _ in range(0, 10):
time.sleep(1)
try:
if request.get(url).status_code == 200:
sys.exit(0)
except requests.exceptions.ConnectionError:
pass
raise ValueError('Error, url did not respond after 10 attempts: {{0}}'.format(url))
'''.format(http_server_root, http_port))
os.chmod(auth_script_path, 0o755)
cleanup_script_path = os.path.join(tempdir, 'cleanup.py')
with open(cleanup_script_path, 'w') as file_h:
file_h.write('''\
#!/usr/bin/env python
import os
import shutil
well_known = os.path.join('{0}', '.well-known')
shutil.rmtree(well_known)
'''.format(http_server_root))
os.chmod(cleanup_script_path, 0o755)
yield ('{0} {1}'.format(sys.executable, auth_script_path),
'{0} {1}'.format(sys.executable, cleanup_script_path))
finally:
shutil.rmtree(tempdir)
def get_certbot_version():
"""
Find the version of the certbot available in PATH.
:return str: the certbot version
"""
output = subprocess.check_output(['certbot', '--version'],
universal_newlines=True, stderr=subprocess.STDOUT)
# Typical response is: output = 'certbot 0.31.0.dev0'
version_str = output.split(' ')[1].strip()
return LooseVersion(version_str)

View file

@ -5,13 +5,17 @@ from setuptools import find_packages
version = '0.32.0.dev0'
install_requires = [
'coverage',
'cryptography',
'pyopenssl',
'pytest',
'pytest-cov',
'pytest-xdist',
'pytest-rerunfailures==4.2',
'pytest-sugar',
'coverage',
'requests',
'pytest-xdist',
'pyyaml',
'requests',
'six',
]
setup(

View file

@ -57,6 +57,7 @@ pytest-cov==2.5.1
pytest-forked==0.2
pytest-xdist==1.22.5
pytest-sugar==0.9.2
pytest-rerunfailures==4.2
python-dateutil==2.6.1
python-digitalocean==1.11
PyYAML==3.13

View file

@ -259,6 +259,6 @@ commands =
--cov=acme --cov=certbot --cov=certbot_nginx --cov-report= \
--cov-config={toxinidir}/certbot-ci/certbot_integration_tests/.coveragerc \
-W 'ignore:Unverified HTTPS request'
coverage report --fail-under=65 --show-missing
coverage report --include 'certbot/*' --show-missing
passenv =
DOCKER_*