diff --git a/certbot-ci/certbot_integration_tests/assets/hook.py b/certbot-ci/certbot_integration_tests/assets/hook.py index c11704f47..1b04b2998 100755 --- a/certbot-ci/certbot_integration_tests/assets/hook.py +++ b/certbot-ci/certbot_integration_tests/assets/hook.py @@ -1,9 +1,11 @@ #!/usr/bin/env python +"""A Certbot hook for probing.""" import os import sys hook_script_type = os.path.basename(os.path.dirname(sys.argv[1])) -if hook_script_type == 'deploy' and ('RENEWED_DOMAINS' not in os.environ or 'RENEWED_LINEAGE' not in os.environ): +if hook_script_type == 'deploy' and ('RENEWED_DOMAINS' not in os.environ + or 'RENEWED_LINEAGE' not in os.environ): sys.stderr.write('Environment variables not properly set!\n') sys.exit(1) diff --git a/certbot-ci/certbot_integration_tests/certbot_tests/assertions.py b/certbot-ci/certbot_integration_tests/certbot_tests/assertions.py index c223d524c..92ce8fac8 100644 --- a/certbot-ci/certbot_integration_tests/certbot_tests/assertions.py +++ b/certbot-ci/certbot_integration_tests/certbot_tests/assertions.py @@ -1,9 +1,10 @@ """This module contains advanced assertions for the certbot integration tests.""" import io import os +from typing import Type from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey, EllipticCurve from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey from cryptography.hazmat.primitives.serialization import load_pem_private_key @@ -11,7 +12,6 @@ try: import grp POSIX_MODE = True except ImportError: - import win32api import win32security import ntsecuritycon POSIX_MODE = False @@ -21,11 +21,11 @@ SYSTEM_SID = 'S-1-5-18' ADMINS_SID = 'S-1-5-32-544' -def assert_elliptic_key(key, curve): +def assert_elliptic_key(key: str, curve: Type[EllipticCurve]) -> None: """ Asserts that the key at the given path is an EC key using the given curve. :param key: path to key - :param curve: name of the expected elliptic curve + :param EllipticCurve curve: name of the expected elliptic curve """ with open(key, 'rb') as file: privkey1 = file.read() @@ -36,10 +36,10 @@ def assert_elliptic_key(key, curve): assert isinstance(key.curve, curve) -def assert_rsa_key(key): +def assert_rsa_key(key: str) -> None: """ Asserts that the key at the given path is an RSA key. - :param key: path to key + :param str key: path to key """ with open(key, 'rb') as file: privkey1 = file.read() @@ -48,11 +48,11 @@ def assert_rsa_key(key): assert isinstance(key, RSAPrivateKey) -def assert_hook_execution(probe_path, probe_content): +def assert_hook_execution(probe_path: str, probe_content: str) -> None: """ Assert that a certbot hook has been executed - :param probe_path: path to the file that received the hook output - :param probe_content: content expected when the hook is executed + :param str probe_path: path to the file that received the hook output + :param str probe_content: content expected when the hook is executed """ encoding = 'utf-8' if POSIX_MODE else 'utf-16' with io.open(probe_path, 'rt', encoding=encoding) as file: @@ -62,22 +62,22 @@ def assert_hook_execution(probe_path, probe_content): assert probe_content in lines -def assert_saved_renew_hook(config_dir, lineage): +def assert_saved_renew_hook(config_dir: str, lineage: str) -> None: """ Assert that the renew hook configuration of a lineage has been saved. - :param config_dir: location of the certbot configuration - :param lineage: lineage domain name + :param str config_dir: location of the certbot configuration + :param str lineage: lineage domain name """ with open(os.path.join(config_dir, 'renewal', '{0}.conf'.format(lineage))) as file_h: assert 'renew_hook' in file_h.read() -def assert_cert_count_for_lineage(config_dir, lineage, count): +def assert_cert_count_for_lineage(config_dir: str, lineage: str, count: int) -> None: """ Assert the number of certificates generated for a lineage. - :param config_dir: location of the certbot configuration - :param lineage: lineage domain name - :param count: number of expected certificates + :param str config_dir: location of the certbot configuration + :param str lineage: lineage domain name + :param int count: number of expected certificates """ archive_dir = os.path.join(config_dir, 'archive') lineage_dir = os.path.join(archive_dir, lineage) @@ -85,11 +85,11 @@ def assert_cert_count_for_lineage(config_dir, lineage, count): assert len(certs) == count -def assert_equals_group_permissions(file1, file2): +def assert_equals_group_permissions(file1: str, file2: str) -> None: """ Assert that two files have the same permissions for group owner. - :param file1: first file path to compare - :param file2: second file path to compare + :param str file1: first file path to compare + :param str file2: second file path to compare """ # On Windows there is no group, so this assertion does nothing on this platform if POSIX_MODE: @@ -99,11 +99,11 @@ def assert_equals_group_permissions(file1, file2): assert mode_file1 == mode_file2 -def assert_equals_world_read_permissions(file1, file2): +def assert_equals_world_read_permissions(file1: str, file2: str) -> None: """ Assert that two files have the same read permissions for everyone. - :param file1: first file path to compare - :param file2: second file path to compare + :param str file1: first file path to compare + :param str file2: second file path to compare """ if POSIX_MODE: mode_file1 = os.stat(file1).st_mode & 0o004 @@ -134,11 +134,11 @@ def assert_equals_world_read_permissions(file1, file2): assert mode_file1 == mode_file2 -def assert_equals_group_owner(file1, file2): +def assert_equals_group_owner(file1: str, file2: str) -> None: """ Assert that two files have the same group owner. - :param file1: first file path to compare - :param file2: second file path to compare + :param str file1: first file path to compare + :param str file2: second file path to compare """ # On Windows there is no group, so this assertion does nothing on this platform if POSIX_MODE: @@ -148,10 +148,10 @@ def assert_equals_group_owner(file1, file2): assert group_owner_file1 == group_owner_file2 -def assert_world_no_permissions(file): +def assert_world_no_permissions(file: str) -> None: """ Assert that the given file is not world-readable. - :param file: path of the file to check + :param str file: path of the file to check """ if POSIX_MODE: mode_file_all = os.stat(file).st_mode & 0o007 @@ -168,10 +168,10 @@ def assert_world_no_permissions(file): assert not mode -def assert_world_read_permissions(file): +def assert_world_read_permissions(file: str) -> None: """ Assert that the given file is world-readable, but not world-writable or world-executable. - :param file: path of the file to check + :param str file: path of the file to check """ if POSIX_MODE: mode_file_all = os.stat(file).st_mode & 0o007 @@ -188,8 +188,3 @@ def assert_world_read_permissions(file): assert not mode & ntsecuritycon.FILE_GENERIC_WRITE assert not mode & ntsecuritycon.FILE_GENERIC_EXECUTE assert mode & ntsecuritycon.FILE_GENERIC_READ == ntsecuritycon.FILE_GENERIC_READ - - -def _get_current_user(): - account_name = win32api.GetUserNameEx(win32api.NameSamCompatible) - return win32security.LookupAccountName(None, account_name)[0] diff --git a/certbot-ci/certbot_integration_tests/certbot_tests/context.py b/certbot-ci/certbot_integration_tests/certbot_tests/context.py index c86a06754..fdef82252 100644 --- a/certbot-ci/certbot_integration_tests/certbot_tests/context.py +++ b/certbot-ci/certbot_integration_tests/certbot_tests/context.py @@ -3,21 +3,25 @@ import os import shutil import sys import tempfile +from typing import Iterable +from typing import Tuple + +import pytest from certbot_integration_tests.utils import certbot_call class IntegrationTestsContext: """General fixture describing a certbot integration tests context""" - def __init__(self, request): + def __init__(self, request: pytest.FixtureRequest) -> None: self.request = request if hasattr(request.config, 'workerinput'): # Worker node - self.worker_id = request.config.workerinput['workerid'] - acme_xdist = request.config.workerinput['acme_xdist'] + self.worker_id = request.config.workerinput['workerid'] # type: ignore[attr-defined] + acme_xdist = request.config.workerinput['acme_xdist'] # type: ignore[attr-defined] else: # Primary node self.worker_id = 'primary' - acme_xdist = request.config.acme_xdist + acme_xdist = request.config.acme_xdist # type: ignore[attr-defined] self.acme_server = acme_xdist['acme_server'] self.directory_url = acme_xdist['directory_url'] @@ -52,16 +56,17 @@ class IntegrationTestsContext: '"' ).format(sys.executable, self.challtestsrv_port) - def cleanup(self): + def cleanup(self) -> None: """Cleanup the integration test context.""" shutil.rmtree(self.workspace) - def certbot(self, args, force_renew=True): + def certbot(self, args: Iterable[str], force_renew: bool = True) -> Tuple[str, str]: """ Execute certbot with given args, not renewing certificates by default. :param args: args to pass to certbot - :param force_renew: set to False to not renew by default + :param bool force_renew: set to False to not renew by default :return: stdout and stderr from certbot execution + :rtype: Tuple of `str` """ command = ['--authenticator', 'standalone', '--installer', 'null'] command.extend(args) @@ -69,14 +74,15 @@ class IntegrationTestsContext: command, self.directory_url, self.http_01_port, self.tls_alpn_01_port, self.config_dir, self.workspace, force_renew=force_renew) - def get_domain(self, subdomain='le'): + def get_domain(self, subdomain: str = 'le') -> str: """ Generate a certificate domain name suitable for distributed certbot integration tests. This is a requirement to let the distribution know how to redirect the challenge check from the ACME server to the relevant pytest-xdist worker. This resolution is done by appending the pytest worker id to the subdomain, using this pattern: {subdomain}.{worker_id}.wtf - :param subdomain: the subdomain to use in the generated domain (default 'le') + :param str subdomain: the subdomain to use in the generated domain (default 'le') :return: the well-formed domain suitable for redirection on + :rtype: str """ return '{0}.{1}.wtf'.format(subdomain, self.worker_id) diff --git a/certbot-ci/certbot_integration_tests/certbot_tests/test_main.py b/certbot-ci/certbot_integration_tests/certbot_tests/test_main.py index 12c45088f..0e3a08c8c 100644 --- a/certbot-ci/certbot_integration_tests/certbot_tests/test_main.py +++ b/certbot-ci/certbot_integration_tests/certbot_tests/test_main.py @@ -1,5 +1,4 @@ """Module executing integration tests against certbot core.""" - import os from os.path import exists from os.path import join @@ -7,14 +6,18 @@ import re import shutil import subprocess import time +from typing import Iterable +from typing import Generator +from typing import Type +from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurve from cryptography.hazmat.primitives.asymmetric.ec import SECP256R1 from cryptography.hazmat.primitives.asymmetric.ec import SECP384R1 from cryptography.hazmat.primitives.asymmetric.ec import SECP521R1 from cryptography.x509 import NameOID import pytest -from certbot_integration_tests.certbot_tests import context as certbot_context +from certbot_integration_tests.certbot_tests.context import IntegrationTestsContext from certbot_integration_tests.certbot_tests.assertions import assert_cert_count_for_lineage from certbot_integration_tests.certbot_tests.assertions import assert_elliptic_key from certbot_integration_tests.certbot_tests.assertions import assert_equals_group_owner @@ -30,17 +33,17 @@ from certbot_integration_tests.utils import misc @pytest.fixture(name='context') -def test_context(request): - # pylint: disable=missing-function-docstring +def test_context(request: pytest.FixtureRequest) -> Generator[IntegrationTestsContext, None, None]: + """Fixture providing the integration test context.""" # Fixture request is a built-in pytest fixture describing current test request. - integration_test_context = certbot_context.IntegrationTestsContext(request) + integration_test_context = IntegrationTestsContext(request) try: yield integration_test_context finally: integration_test_context.cleanup() -def test_basic_commands(context): +def test_basic_commands(context: IntegrationTestsContext) -> None: """Test simple commands on Certbot CLI.""" # TMPDIR env variable is set to workspace for the certbot subprocess. # So tempdir module will create any temporary files/dirs in workspace, @@ -58,7 +61,7 @@ def test_basic_commands(context): assert initial_count_tmpfiles == new_count_tmpfiles -def test_hook_dirs_creation(context): +def test_hook_dirs_creation(context: IntegrationTestsContext) -> None: """Test all hooks directory are created during Certbot startup.""" context.certbot(['register']) @@ -66,7 +69,7 @@ def test_hook_dirs_creation(context): assert os.path.isdir(hook_dir) -def test_registration_override(context): +def test_registration_override(context: IntegrationTestsContext) -> None: """Test correct register/unregister, and registration override.""" context.certbot(['register']) context.certbot(['unregister']) @@ -76,14 +79,14 @@ def test_registration_override(context): context.certbot(['update_account', '--email', 'ex1@domain.org,ex2@domain.org']) -def test_prepare_plugins(context): +def test_prepare_plugins(context: IntegrationTestsContext) -> None: """Test that plugins are correctly instantiated and displayed.""" stdout, _ = context.certbot(['plugins', '--init', '--prepare']) assert 'webroot' in stdout -def test_http_01(context): +def test_http_01(context: IntegrationTestsContext) -> None: """Test the HTTP-01 challenge using standalone plugin.""" # We start a server listening on the port for the # TLS-SNI challenge to prevent regressions in #3601. @@ -101,7 +104,7 @@ def test_http_01(context): assert_saved_renew_hook(context.config_dir, certname) -def test_manual_http_auth(context): +def test_manual_http_auth(context: IntegrationTestsContext) -> None: """Test the HTTP-01 challenge using manual plugin.""" with misc.create_http_server(context.http_01_port) as webroot,\ misc.manual_http_hooks(webroot, context.http_01_port) as scripts: @@ -122,7 +125,7 @@ def test_manual_http_auth(context): assert_saved_renew_hook(context.config_dir, certname) -def test_manual_dns_auth(context): +def test_manual_dns_auth(context: IntegrationTestsContext) -> None: """Test the DNS-01 challenge using manual plugin.""" certname = context.get_domain('dns') context.certbot([ @@ -144,14 +147,14 @@ def test_manual_dns_auth(context): assert_cert_count_for_lineage(context.config_dir, certname, 2) -def test_certonly(context): +def test_certonly(context: IntegrationTestsContext) -> None: """Test the certonly verb on certbot.""" context.certbot(['certonly', '--cert-name', 'newname', '-d', context.get_domain('newname')]) assert_cert_count_for_lineage(context.config_dir, 'newname', 1) -def test_certonly_webroot(context): +def test_certonly_webroot(context: IntegrationTestsContext) -> None: """Test the certonly verb with webroot plugin""" with misc.create_http_server(context.http_01_port) as webroot: certname = context.get_domain('webroot') @@ -160,7 +163,7 @@ def test_certonly_webroot(context): assert_cert_count_for_lineage(context.config_dir, certname, 1) -def test_auth_and_install_with_csr(context): +def test_auth_and_install_with_csr(context: IntegrationTestsContext) -> None: """Test certificate issuance and install using an existing CSR.""" certname = context.get_domain('le3') key_path = join(context.workspace, 'key.pem') @@ -187,7 +190,7 @@ def test_auth_and_install_with_csr(context): ]) -def test_renew_files_permissions(context): +def test_renew_files_permissions(context: IntegrationTestsContext) -> None: """Test proper certificate file permissions upon renewal""" certname = context.get_domain('renew') context.certbot(['-d', certname]) @@ -207,7 +210,7 @@ def test_renew_files_permissions(context): assert_equals_group_permissions(privkey1, privkey2) -def test_renew_with_hook_scripts(context): +def test_renew_with_hook_scripts(context: IntegrationTestsContext) -> None: """Test certificate renewal with script hooks.""" certname = context.get_domain('renew') context.certbot(['-d', certname]) @@ -221,7 +224,7 @@ def test_renew_with_hook_scripts(context): assert_hook_execution(context.hook_probe, 'deploy') -def test_renew_files_propagate_permissions(context): +def test_renew_files_propagate_permissions(context: IntegrationTestsContext) -> None: """Test proper certificate renewal with custom permissions propagated on private key.""" certname = context.get_domain('renew') context.certbot(['-d', certname]) @@ -263,7 +266,7 @@ def test_renew_files_propagate_permissions(context): assert_world_no_permissions(privkey2) -def test_graceful_renew_it_is_not_time(context): +def test_graceful_renew_it_is_not_time(context: IntegrationTestsContext) -> None: """Test graceful renew is not done when it is not due time.""" certname = context.get_domain('renew') context.certbot(['-d', certname]) @@ -278,7 +281,7 @@ def test_graceful_renew_it_is_not_time(context): assert_hook_execution(context.hook_probe, 'deploy') -def test_graceful_renew_it_is_time(context): +def test_graceful_renew_it_is_time(context: IntegrationTestsContext) -> None: """Test graceful renew is done when it is due time.""" certname = context.get_domain('renew') context.certbot(['-d', certname]) @@ -298,7 +301,7 @@ def test_graceful_renew_it_is_time(context): assert_hook_execution(context.hook_probe, 'deploy') -def test_renew_with_changed_private_key_complexity(context): +def test_renew_with_changed_private_key_complexity(context: IntegrationTestsContext) -> None: """Test proper renew with updated private key complexity.""" certname = context.get_domain('renew') context.certbot(['-d', certname, '--rsa-key-size', '4096']) @@ -320,7 +323,7 @@ def test_renew_with_changed_private_key_complexity(context): assert os.stat(key3).st_size < 1800 # 2048 bits keys takes less than 1800 bytes -def test_renew_ignoring_directory_hooks(context): +def test_renew_ignoring_directory_hooks(context: IntegrationTestsContext) -> None: """Test hooks are ignored during renewal with relevant CLI flag.""" certname = context.get_domain('renew') context.certbot(['-d', certname]) @@ -335,7 +338,7 @@ def test_renew_ignoring_directory_hooks(context): assert_hook_execution(context.hook_probe, 'deploy') -def test_renew_empty_hook_scripts(context): +def test_renew_empty_hook_scripts(context: IntegrationTestsContext) -> None: """Test proper renew with empty hook scripts.""" certname = context.get_domain('renew') context.certbot(['-d', certname]) @@ -353,7 +356,7 @@ def test_renew_empty_hook_scripts(context): assert_cert_count_for_lineage(context.config_dir, certname, 2) -def test_renew_hook_override(context): +def test_renew_hook_override(context: IntegrationTestsContext) -> None: """Test correct hook override on renew.""" certname = context.get_domain('override') context.certbot([ @@ -398,7 +401,7 @@ def test_renew_hook_override(context): assert_hook_execution(context.hook_probe, 'deploy_override') -def test_invalid_domain_with_dns_challenge(context): +def test_invalid_domain_with_dns_challenge(context: IntegrationTestsContext) -> None: """Test certificate issuance failure with DNS-01 challenge.""" # Manual dns auth hooks from misc are designed to fail if the domain contains 'fail-*'. domains = ','.join([context.get_domain('dns1'), context.get_domain('fail-dns1')]) @@ -415,7 +418,7 @@ def test_invalid_domain_with_dns_challenge(context): assert context.get_domain('fail-dns1') not in stdout -def test_reuse_key(context): +def test_reuse_key(context: IntegrationTestsContext) -> None: """Test various scenarios where a key is reused.""" certname = context.get_domain('reusekey') context.certbot(['--domains', certname, '--reuse-key']) @@ -458,12 +461,12 @@ def test_reuse_key(context): assert len({cert1, cert2, cert3}) == 3 -def test_incorrect_key_type(context): +def test_incorrect_key_type(context: IntegrationTestsContext) -> None: with pytest.raises(subprocess.CalledProcessError): context.certbot(['--key-type="failwhale"']) -def test_ecdsa(context): +def test_ecdsa(context: IntegrationTestsContext) -> None: """Test issuance for ECDSA CSR based request (legacy supported mode).""" key_path = join(context.workspace, 'privkey-p384.pem') csr_path = join(context.workspace, 'csr-p384.der') @@ -484,7 +487,7 @@ def test_ecdsa(context): assert 'ASN1 OID: secp384r1' in certificate -def test_default_key_type(context): +def test_default_key_type(context: IntegrationTestsContext) -> None: """Test default key type is RSA""" certname = context.get_domain('renew') context.certbot([ @@ -495,7 +498,7 @@ def test_default_key_type(context): assert_rsa_key(filename) -def test_default_curve_type(context): +def test_default_curve_type(context: IntegrationTestsContext) -> None: """test that the curve used when not specifying any is secp256r1""" certname = context.get_domain('renew') context.certbot([ @@ -511,7 +514,8 @@ def test_default_curve_type(context): ('secp384r1', SECP384R1, []), ('secp521r1', SECP521R1, ['boulder-v1', 'boulder-v2'])] ) -def test_ecdsa_curves(context, curve, curve_cls, skip_servers): +def test_ecdsa_curves(context: IntegrationTestsContext, curve: str, curve_cls: Type[EllipticCurve], + skip_servers: Iterable[str]) -> None: """Test issuance for each supported ECDSA curve""" if context.acme_server in skip_servers: pytest.skip('ACME server {} does not support ECDSA curve {}' @@ -527,7 +531,7 @@ def test_ecdsa_curves(context, curve, curve_cls, skip_servers): assert_elliptic_key(key, curve_cls) -def test_renew_with_ec_keys(context): +def test_renew_with_ec_keys(context: IntegrationTestsContext) -> None: """Test proper renew with updated private key complexity.""" certname = context.get_domain('renew') context.certbot([ @@ -567,7 +571,7 @@ def test_renew_with_ec_keys(context): assert_rsa_key(key3) -def test_ocsp_must_staple(context): +def test_ocsp_must_staple(context: IntegrationTestsContext) -> None: """Test that OCSP Must-Staple is correctly set in the generated certificate.""" if context.acme_server == 'pebble': pytest.skip('Pebble does not support OCSP Must-Staple.') @@ -580,7 +584,7 @@ def test_ocsp_must_staple(context): assert 'status_request' in certificate or '1.3.6.1.5.5.7.1.24' in certificate -def test_revoke_simple(context): +def test_revoke_simple(context: IntegrationTestsContext) -> None: """Test various scenarios that revokes a certificate.""" # Default action after revoke is to delete the certificate. certname = context.get_domain() @@ -611,7 +615,7 @@ def test_revoke_simple(context): context.certbot(['revoke', '--cert-path', cert_path, '--key-path', key_path]) -def test_revoke_and_unregister(context): +def test_revoke_and_unregister(context: IntegrationTestsContext) -> None: """Test revoke with a reason then unregister.""" cert1 = context.get_domain('le1') cert2 = context.get_domain('le2') @@ -639,7 +643,7 @@ def test_revoke_and_unregister(context): assert cert3 in stdout -def test_revoke_mutual_exclusive_flags(context): +def test_revoke_mutual_exclusive_flags(context: IntegrationTestsContext) -> None: """Test --cert-path and --cert-name cannot be used during revoke.""" cert = context.get_domain('le1') context.certbot(['-d', cert]) @@ -651,7 +655,7 @@ def test_revoke_mutual_exclusive_flags(context): assert 'Exactly one of --cert-path or --cert-name must be specified' in error.value.stderr -def test_revoke_multiple_lineages(context): +def test_revoke_multiple_lineages(context: IntegrationTestsContext) -> None: """Test revoke does not delete certs if multiple lineages share the same dir.""" cert1 = context.get_domain('le1') context.certbot(['-d', cert1]) @@ -683,7 +687,7 @@ def test_revoke_multiple_lineages(context): assert 'Not deleting revoked certificates due to overlapping archive dirs' in f.read() -def test_wildcard_certificates(context): +def test_wildcard_certificates(context: IntegrationTestsContext) -> None: """Test wildcard certificate issuance.""" if context.acme_server == 'boulder-v1': pytest.skip('Wildcard certificates are not supported on ACME v1') @@ -700,7 +704,7 @@ def test_wildcard_certificates(context): assert exists(join(context.config_dir, 'live', certname, 'fullchain.pem')) -def test_ocsp_status_stale(context): +def test_ocsp_status_stale(context: IntegrationTestsContext) -> None: """Test retrieval of OCSP statuses for staled config""" sample_data_path = misc.load_sample_data_path(context.workspace) stdout, _ = context.certbot(['certificates', '--config-dir', sample_data_path]) @@ -711,7 +715,7 @@ def test_ocsp_status_stale(context): .format(stdout.count('EXPIRED'))) -def test_ocsp_status_live(context): +def test_ocsp_status_live(context: IntegrationTestsContext) -> None: """Test retrieval of OCSP statuses for live config""" cert = context.get_domain('ocsp-check') @@ -733,7 +737,7 @@ def test_ocsp_status_live(context): assert stdout.count('REVOKED') == 1, 'Expected {0} to be REVOKED'.format(cert) -def test_ocsp_renew(context): +def test_ocsp_renew(context: IntegrationTestsContext) -> None: """Test that revoked certificates are renewed.""" # Obtain a certificate certname = context.get_domain('ocsp-renew') @@ -750,7 +754,7 @@ def test_ocsp_renew(context): assert_cert_count_for_lineage(context.config_dir, certname, 2) -def test_dry_run_deactivate_authzs(context): +def test_dry_run_deactivate_authzs(context: IntegrationTestsContext) -> None: """Test that Certbot deactivates authorizations when performing a dry run""" name = context.get_domain('dry-run-authz-deactivation') @@ -768,7 +772,7 @@ def test_dry_run_deactivate_authzs(context): assert log_line in f.read(), 'Second order should have been recreated due to authz reuse' -def test_preferred_chain(context): +def test_preferred_chain(context: IntegrationTestsContext) -> None: """Test that --preferred-chain results in the correct chain.pem being produced""" try: issuers = misc.get_acme_issuers(context) diff --git a/certbot-ci/certbot_integration_tests/conftest.py b/certbot-ci/certbot_integration_tests/conftest.py index 5e6ed5562..deb9a5228 100644 --- a/certbot-ci/certbot_integration_tests/conftest.py +++ b/certbot-ci/certbot_integration_tests/conftest.py @@ -1,3 +1,4 @@ +# type: ignore """ General conftest for pytest execution of all integration tests lying in the certbot_integration tests package. diff --git a/certbot-ci/certbot_integration_tests/nginx_tests/context.py b/certbot-ci/certbot_integration_tests/nginx_tests/context.py index f273ed9ff..2a8881aa9 100644 --- a/certbot-ci/certbot_integration_tests/nginx_tests/context.py +++ b/certbot-ci/certbot_integration_tests/nginx_tests/context.py @@ -1,6 +1,10 @@ """Module to handle the context of nginx integration tests.""" import os import subprocess +from typing import Iterable +from typing import Tuple + +import pytest from certbot_integration_tests.certbot_tests import context as certbot_context from certbot_integration_tests.nginx_tests import nginx_config as config @@ -10,7 +14,7 @@ from certbot_integration_tests.utils import misc class IntegrationTestsContext(certbot_context.IntegrationTestsContext): """General fixture describing a certbot-nginx integration tests context""" - def __init__(self, request): + def __init__(self, request: pytest.FixtureRequest) -> None: super().__init__(request) self.nginx_root = os.path.join(self.workspace, 'nginx') @@ -22,16 +26,16 @@ class IntegrationTestsContext(certbot_context.IntegrationTestsContext): file_handler.write('Hello World!') self.nginx_config_path = os.path.join(self.nginx_root, 'nginx.conf') - self.nginx_config = None + self.nginx_config: str - default_server = request.param['default_server'] + default_server = request.param['default_server'] # type: ignore[attr-defined] self.process = self._start_nginx(default_server) - def cleanup(self): + def cleanup(self) -> None: self._stop_nginx() super().cleanup() - def certbot_test_nginx(self, args): + def certbot_test_nginx(self, args: Iterable[str]) -> Tuple[str, str]: """ Main command to execute certbot using the nginx plugin. :param list args: list of arguments to pass to nginx @@ -44,7 +48,7 @@ class IntegrationTestsContext(certbot_context.IntegrationTestsContext): command, self.directory_url, self.http_01_port, self.tls_alpn_01_port, self.config_dir, self.workspace, force_renew=True) - def _start_nginx(self, default_server): + def _start_nginx(self, default_server: bool) -> subprocess.Popen: self.nginx_config = config.construct_nginx_config( self.nginx_root, self.webroot, self.http_01_port, self.tls_alpn_01_port, self.other_port, default_server, wtf_prefix=self.worker_id) @@ -58,7 +62,7 @@ class IntegrationTestsContext(certbot_context.IntegrationTestsContext): misc.check_until_timeout('http://localhost:{0}'.format(self.http_01_port)) return process - def _stop_nginx(self): + def _stop_nginx(self) -> None: assert self.process.poll() is None self.process.terminate() self.process.wait() diff --git a/certbot-ci/certbot_integration_tests/nginx_tests/nginx_config.py b/certbot-ci/certbot_integration_tests/nginx_tests/nginx_config.py index bbbe8ea06..60f1696a8 100644 --- a/certbot-ci/certbot_integration_tests/nginx_tests/nginx_config.py +++ b/certbot-ci/certbot_integration_tests/nginx_tests/nginx_config.py @@ -1,12 +1,14 @@ # -*- coding: utf-8 -*- """General purpose nginx test configuration generator.""" import getpass +from typing import Optional import pkg_resources -def construct_nginx_config(nginx_root, nginx_webroot, http_port, https_port, other_port, - default_server, key_path=None, cert_path=None, wtf_prefix='le'): +def construct_nginx_config(nginx_root: str, nginx_webroot: str, http_port: int, https_port: int, + other_port: int, default_server: bool, key_path: Optional[str] = None, + cert_path: Optional[str] = None, wtf_prefix: str = 'le') -> str: """ This method returns a full nginx configuration suitable for integration tests. :param str nginx_root: nginx root configuration path diff --git a/certbot-ci/certbot_integration_tests/nginx_tests/test_main.py b/certbot-ci/certbot_integration_tests/nginx_tests/test_main.py index 2c52c8523..f9e6554ca 100644 --- a/certbot-ci/certbot_integration_tests/nginx_tests/test_main.py +++ b/certbot-ci/certbot_integration_tests/nginx_tests/test_main.py @@ -1,17 +1,18 @@ """Module executing integration tests against certbot with nginx plugin.""" import os import ssl +from typing import Generator from typing import List import pytest -from certbot_integration_tests.nginx_tests import context as nginx_context +from certbot_integration_tests.nginx_tests.context import IntegrationTestsContext @pytest.fixture(name='context') -def test_context(request): +def test_context(request: pytest.FixtureRequest) -> Generator[IntegrationTestsContext, None, None]: # Fixture request is a built-in pytest fixture describing current test request. - integration_test_context = nginx_context.IntegrationTestsContext(request) + integration_test_context = IntegrationTestsContext(request) try: yield integration_test_context finally: @@ -33,7 +34,7 @@ def test_context(request): ], {'default_server': False}), ], indirect=['context']) def test_certificate_deployment(certname_pattern: str, params: List[str], - context: nginx_context.IntegrationTestsContext) -> None: + context: IntegrationTestsContext) -> None: """ Test various scenarios to deploy a certificate to nginx using certbot. """ diff --git a/certbot-ci/certbot_integration_tests/py.typed b/certbot-ci/certbot_integration_tests/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/certbot-ci/certbot_integration_tests/rfc2136_tests/context.py b/certbot-ci/certbot_integration_tests/rfc2136_tests/context.py index 3d4147313..dde41f367 100644 --- a/certbot-ci/certbot_integration_tests/rfc2136_tests/context.py +++ b/certbot-ci/certbot_integration_tests/rfc2136_tests/context.py @@ -2,9 +2,12 @@ from contextlib import contextmanager import tempfile +from typing import Generator +from typing import Iterable +from typing import Tuple from pkg_resources import resource_filename -from pytest import skip +import pytest from certbot_integration_tests.certbot_tests import context as certbot_context from certbot_integration_tests.utils import certbot_call @@ -12,17 +15,17 @@ from certbot_integration_tests.utils import certbot_call class IntegrationTestsContext(certbot_context.IntegrationTestsContext): """Integration test context for certbot-dns-rfc2136""" - def __init__(self, request): + def __init__(self, request: pytest.FixtureRequest) -> None: super().__init__(request) self.request = request if hasattr(request.config, 'workerinput'): # Worker node - self._dns_xdist = request.config.workerinput['dns_xdist'] + self._dns_xdist = request.config.workerinput['dns_xdist'] # type: ignore[attr-defined] else: # Primary node - self._dns_xdist = request.config.dns_xdist + self._dns_xdist = request.config.dns_xdist # type: ignore[attr-defined] - def certbot_test_rfc2136(self, args): + def certbot_test_rfc2136(self, args: Iterable[str]) -> Tuple[str, str]: """ Main command to execute certbot using the RFC2136 DNS authenticator. :param list args: list of arguments to pass to Certbot @@ -34,7 +37,7 @@ class IntegrationTestsContext(certbot_context.IntegrationTestsContext): self.config_dir, self.workspace, force_renew=True) @contextmanager - def rfc2136_credentials(self, label='default'): + def rfc2136_credentials(self, label: str = 'default') -> Generator[str, None, None]: """ Produces the contents of a certbot-dns-rfc2136 credentials file. :param str label: which RFC2136 credential to use @@ -57,8 +60,8 @@ class IntegrationTestsContext(certbot_context.IntegrationTestsContext): fp.flush() yield fp.name - def skip_if_no_bind9_server(self): + def skip_if_no_bind9_server(self) -> None: """Skips the test if there was no RFC2136-capable DNS server configured in the test environment""" if not self._dns_xdist: - skip('No RFC2136-capable DNS server is configured') + pytest.skip('No RFC2136-capable DNS server is configured') diff --git a/certbot-ci/certbot_integration_tests/rfc2136_tests/test_main.py b/certbot-ci/certbot_integration_tests/rfc2136_tests/test_main.py index ae6c0018e..9466934a8 100644 --- a/certbot-ci/certbot_integration_tests/rfc2136_tests/test_main.py +++ b/certbot-ci/certbot_integration_tests/rfc2136_tests/test_main.py @@ -1,14 +1,16 @@ """Module executing integration tests against Certbot with the RFC2136 DNS authenticator.""" +from typing import Generator + import pytest -from certbot_integration_tests.rfc2136_tests import context as rfc2136_context +from certbot_integration_tests.rfc2136_tests.context import IntegrationTestsContext @pytest.fixture(name="context") -def pytest_context(request): +def test_context(request: pytest.FixtureRequest) -> Generator[IntegrationTestsContext, None, None]: # pylint: disable=missing-function-docstring # Fixture request is a built-in pytest fixture describing current test request. - integration_test_context = rfc2136_context.IntegrationTestsContext(request) + integration_test_context = IntegrationTestsContext(request) try: yield integration_test_context finally: @@ -16,7 +18,7 @@ def pytest_context(request): @pytest.mark.parametrize('domain', [('example.com'), ('sub.example.com')]) -def test_get_certificate(domain, context): +def test_get_certificate(domain: str, context: IntegrationTestsContext) -> None: context.skip_if_no_bind9_server() with context.rfc2136_credentials() as creds: diff --git a/certbot-ci/certbot_integration_tests/utils/acme_server.py b/certbot-ci/certbot_integration_tests/utils/acme_server.py index 65b78faad..611b33d0c 100755 --- a/certbot-ci/certbot_integration_tests/utils/acme_server.py +++ b/certbot-ci/certbot_integration_tests/utils/acme_server.py @@ -11,7 +11,15 @@ import subprocess import sys import tempfile import time +from types import TracebackType +from typing import Any +from typing import cast +from typing import Dict from typing import List +from typing import Mapping +from typing import Optional +from typing import Sequence +from typing import Type import requests @@ -34,8 +42,9 @@ class ACMEServer: ACMEServer is also a context manager, and so can be used to ensure ACME server is started/stopped upon context enter/exit. """ - def __init__(self, acme_server, nodes, http_proxy=True, stdout=False, - dns_server=None, http_01_port=DEFAULT_HTTP_01_PORT): + def __init__(self, acme_server: str, nodes: Sequence[str], http_proxy: bool = True, + stdout: bool = False, dns_server: Optional[str] = None, + http_01_port: int = DEFAULT_HTTP_01_PORT) -> None: """ Create an ACMEServer instance. :param str acme_server: the type of acme server used (boulder-v1, boulder-v2 or pebble) @@ -60,7 +69,7 @@ class ACMEServer: raise ValueError('setting http_01_port is not currently supported ' 'with boulder or the HTTP proxy') - def start(self): + def start(self) -> None: """Start the test stack""" try: if self._proxy: @@ -73,7 +82,7 @@ class ACMEServer: self.stop() raise e - def stop(self): + def stop(self) -> None: """Stop the test stack, and clean its resources""" print('=> Tear down the test infrastructure...') try: @@ -104,14 +113,15 @@ class ACMEServer: self._stdout.close() print('=> Test infrastructure stopped and cleaned up.') - def __enter__(self): + def __enter__(self) -> Dict[str, Any]: self.start() return self.acme_xdist - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type: Optional[Type[BaseException]], exc: Optional[BaseException], + traceback: Optional[TracebackType]) -> None: self.stop() - def _construct_acme_xdist(self, acme_server, nodes): + def _construct_acme_xdist(self, acme_server: str, nodes: Sequence[str]) -> None: """Generate and return the acme_xdist dict""" acme_xdist = {'acme_server': acme_server, 'challtestsrv_port': CHALLTESTSRV_PORT} @@ -138,7 +148,7 @@ class ACMEServer: self.acme_xdist = acme_xdist - def _prepare_pebble_server(self): + def _prepare_pebble_server(self) -> None: """Configure and launch the Pebble server""" print('=> Starting pebble instance deployment...') pebble_artifacts_rv = pebble_artifacts.fetch(self._workspace, self._http_01_port) @@ -174,11 +184,11 @@ class ACMEServer: # Wait for the ACME CA server to be up. print('=> Waiting for pebble instance to respond...') - misc.check_until_timeout(self.acme_xdist['directory_url']) + misc.check_until_timeout(self.acme_xdist['directory_url']) # type: ignore[arg-type] print('=> Finished pebble instance deployment.') - def _prepare_boulder_server(self): + def _prepare_boulder_server(self) -> None: """Configure and launch the Boulder server""" print('=> Starting boulder instance deployment...') instance_path = join(self._workspace, 'boulder') @@ -207,7 +217,8 @@ class ACMEServer: # Wait for the ACME CA server to be up. print('=> Waiting for boulder instance to respond...') - misc.check_until_timeout(self.acme_xdist['directory_url'], attempts=300) + misc.check_until_timeout( + self.acme_xdist['directory_url'], attempts=300) # type: ignore[arg-type] if not self._dns_server: # Configure challtestsrv to answer any A record request with ip of the docker host. @@ -226,16 +237,19 @@ class ACMEServer: print('=> Finished boulder instance deployment.') - def _prepare_http_proxy(self): + def _prepare_http_proxy(self) -> None: """Configure and launch an HTTP proxy""" print('=> Configuring the HTTP proxy...') + http_port_map = cast(Dict[str, int], self.acme_xdist['http_port']) mapping = {r'.+\.{0}\.wtf'.format(node): 'http://127.0.0.1:{0}'.format(port) - for node, port in self.acme_xdist['http_port'].items()} + for node, port in http_port_map.items()} command = [sys.executable, proxy.__file__, str(DEFAULT_HTTP_01_PORT), json.dumps(mapping)] self._launch_process(command) print('=> Finished configuring the HTTP proxy.') - def _launch_process(self, command, cwd=os.getcwd(), env=None, force_stderr=False): + def _launch_process(self, command: Sequence[str], cwd: str = os.getcwd(), + env: Optional[Mapping[str, str]] = None, + force_stderr: bool = False) -> subprocess.Popen: """Launch silently a subprocess OS command""" if not env: env = os.environ @@ -248,7 +262,7 @@ class ACMEServer: return process -def main(): +def main() -> None: # pylint: disable=missing-function-docstring parser = argparse.ArgumentParser( description='CLI tool to start a local instance of Pebble or Boulder CA server.') diff --git a/certbot-ci/certbot_integration_tests/utils/certbot_call.py b/certbot-ci/certbot_integration_tests/utils/certbot_call.py index ffd14ac4b..b79b35912 100755 --- a/certbot-ci/certbot_integration_tests/utils/certbot_call.py +++ b/certbot-ci/certbot_integration_tests/utils/certbot_call.py @@ -5,14 +5,20 @@ import os import pkg_resources import subprocess import sys +from typing import Dict +from typing import List +from typing import Mapping +from typing import Sequence +from typing import Tuple import certbot_integration_tests # pylint: disable=wildcard-import,unused-wildcard-import from certbot_integration_tests.utils.constants import * -def certbot_test(certbot_args, directory_url, http_01_port, tls_alpn_01_port, - config_dir, workspace, force_renew=True): +def certbot_test(certbot_args: Sequence[str], directory_url: str, http_01_port: int, + tls_alpn_01_port: int, config_dir: str, workspace: str, + force_renew: bool = True) -> Tuple[str, str]: """ Invoke the certbot executable available in PATH in a test context for the given args. The test context consists in running certbot in debug mode, with various flags suitable @@ -40,7 +46,7 @@ def certbot_test(certbot_args, directory_url, http_01_port, tls_alpn_01_port, return proc.stdout, proc.stderr -def _prepare_environ(workspace): +def _prepare_environ(workspace: str) -> Dict[str, str]: # pylint: disable=missing-function-docstring new_environ = os.environ.copy() @@ -78,7 +84,8 @@ def _prepare_environ(workspace): return new_environ -def _compute_additional_args(workspace, environ, force_renew): +def _compute_additional_args(workspace: str, environ: Mapping[str, str], + force_renew: bool) -> List[str]: additional_args = [] output = subprocess.check_output(['certbot', '--version'], universal_newlines=True, stderr=subprocess.STDOUT, @@ -94,8 +101,9 @@ def _compute_additional_args(workspace, environ, force_renew): return additional_args -def _prepare_args_env(certbot_args, directory_url, http_01_port, tls_alpn_01_port, - config_dir, workspace, force_renew): +def _prepare_args_env(certbot_args: Sequence[str], directory_url: str, http_01_port: int, + tls_alpn_01_port: int, config_dir: str, workspace: str, + force_renew: bool) -> Tuple[List[str], Dict[str, str]]: new_environ = _prepare_environ(workspace) additional_args = _compute_additional_args(workspace, new_environ, force_renew) @@ -126,7 +134,7 @@ def _prepare_args_env(certbot_args, directory_url, http_01_port, tls_alpn_01_por return command, new_environ -def main(): +def main() -> None: # pylint: disable=missing-function-docstring args = sys.argv[1:] diff --git a/certbot-ci/certbot_integration_tests/utils/dns_server.py b/certbot-ci/certbot_integration_tests/utils/dns_server.py index c4bbcaea1..f74a5da40 100644 --- a/certbot-ci/certbot_integration_tests/utils/dns_server.py +++ b/certbot-ci/certbot_integration_tests/utils/dns_server.py @@ -8,7 +8,11 @@ import subprocess import sys import tempfile import time +from types import TracebackType +from typing import Any, Sequence +from typing import Dict from typing import Optional +from typing import Type from pkg_resources import resource_filename @@ -30,7 +34,7 @@ class DNSServer: future to support parallelization (https://github.com/certbot/certbot/issues/8455). """ - def __init__(self, unused_nodes, show_output=False): + def __init__(self, unused_nodes: Sequence[str], show_output: bool = False) -> None: """ Create an DNSServer instance. :param list nodes: list of node names that will be setup by pytest xdist @@ -48,7 +52,7 @@ class DNSServer: # pylint: disable=consider-using-with self._output = sys.stderr if show_output else open(os.devnull, "w") - def start(self): + def start(self) -> None: """Start the DNS server""" try: self._configure_bind() @@ -57,7 +61,7 @@ class DNSServer: self.stop() raise - def stop(self): + def stop(self) -> None: """Stop the DNS server, and clean its resources""" if self.process: try: @@ -71,7 +75,7 @@ class DNSServer: if self._output != sys.stderr: self._output.close() - def _configure_bind(self): + def _configure_bind(self) -> None: """Configure the BIND9 server based on the prebaked configuration""" bind_conf_src = resource_filename( "certbot_integration_tests", "assets/bind-config" @@ -81,7 +85,7 @@ class DNSServer: os.path.join(bind_conf_src, directory), os.path.join(self.bind_root, directory) ) - def _start_bind(self): + def _start_bind(self) -> None: """Launch the BIND9 server as a Docker container""" addr_str = "{}:{}".format(BIND_BIND_ADDRESS[0], BIND_BIND_ADDRESS[1]) # pylint: disable=consider-using-with @@ -150,9 +154,10 @@ class DNSServer: "Gave up waiting for DNS server {} to respond".format(BIND_BIND_ADDRESS) ) - def __enter__(self): + def __start__(self) -> Dict[str, Any]: self.start() return self.dns_xdist - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, exc_type: Optional[Type[BaseException]], exc: Optional[BaseException], + traceback: Optional[TracebackType]) -> None: self.stop() diff --git a/certbot-ci/certbot_integration_tests/utils/misc.py b/certbot-ci/certbot_integration_tests/utils/misc.py index c92e08a74..9143804f9 100644 --- a/certbot-ci/certbot_integration_tests/utils/misc.py +++ b/certbot-ci/certbot_integration_tests/utils/misc.py @@ -15,6 +15,11 @@ import sys import tempfile import time import warnings +from typing import Generator +from typing import Iterable +from typing import List +from typing import Optional +from typing import Tuple from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import ec @@ -22,10 +27,12 @@ from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import NoEncryption from cryptography.hazmat.primitives.serialization import PrivateFormat from cryptography.x509 import load_pem_x509_certificate +from cryptography.x509 import Certificate from OpenSSL import crypto import pkg_resources import requests +from certbot_integration_tests.certbot_tests.context import IntegrationTestsContext from certbot_integration_tests.utils.constants import PEBBLE_ALTERNATE_ROOTS from certbot_integration_tests.utils.constants import PEBBLE_MANAGEMENT_URL @@ -33,7 +40,7 @@ RSA_KEY_TYPE = 'rsa' ECDSA_KEY_TYPE = 'ecdsa' -def _suppress_x509_verification_warnings(): +def _suppress_x509_verification_warnings() -> None: try: import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -44,7 +51,7 @@ def _suppress_x509_verification_warnings(): requests.packages.urllib3.disable_warnings(InsecureRequestWarning) -def check_until_timeout(url, attempts=30): +def check_until_timeout(url: str, attempts: int = 30) -> None: """ Wait and block until given url responds with status 200, or raise an exception after the specified number of attempts. @@ -72,12 +79,12 @@ class GracefulTCPServer(socketserver.TCPServer): allow_reuse_address = True -def _run_server(port): +def _run_server(port: int) -> None: GracefulTCPServer(('', port), SimpleHTTPServer.SimpleHTTPRequestHandler).serve_forever() @contextlib.contextmanager -def create_http_server(port): +def create_http_server(port: int) -> Generator[str, None, None]: """ Setup and start an HTTP server for the given TCP port. This server stays active for the lifetime of the context, and is automatically @@ -111,7 +118,7 @@ def create_http_server(port): shutil.rmtree(webroot) -def list_renewal_hooks_dirs(config_dir): +def list_renewal_hooks_dirs(config_dir: str) -> List[str]: """ Find and return paths of all hook directories for the given certbot config directory :param str config_dir: path to the certbot config directory @@ -121,14 +128,14 @@ def list_renewal_hooks_dirs(config_dir): return [os.path.join(renewal_hooks_root, item) for item in ['pre', 'deploy', 'post']] -def generate_test_file_hooks(config_dir, hook_probe): +def generate_test_file_hooks(config_dir: str, hook_probe: str) -> None: """ Create a suite of certbot hook scripts and put them in the relevant hook directory for the given certbot configuration directory. These scripts, when executed, will write specific verbs in the given hook_probe file to allow asserting they have effectively been executed. The deploy hook also checks that the renewal environment variables are set. :param str config_dir: current certbot config directory - :param hook_probe: path to the hook probe to test hook scripts execution + :param str hook_probe: path to the hook probe to test hook scripts execution """ hook_path = pkg_resources.resource_filename('certbot_integration_tests', 'assets/hook.py') @@ -163,7 +170,8 @@ set -e @contextlib.contextmanager -def manual_http_hooks(http_server_root, http_port): +def manual_http_hooks(http_server_root: str, + http_port: int) -> Generator[Tuple[str, str], None, None]: """ Generate suitable http-01 hooks command for test purpose in the given HTTP server webroot directory. These hooks command use temporary python scripts @@ -216,7 +224,8 @@ shutil.rmtree(well_known) shutil.rmtree(tempdir) -def generate_csr(domains, key_path, csr_path, key_type=RSA_KEY_TYPE): +def generate_csr(domains: Iterable[str], key_path: str, csr_path: str, + key_type: str = RSA_KEY_TYPE) -> None: """ Generate a private key, and a CSR for the given domains using this key. :param domains: the domain names to include in the CSR @@ -260,7 +269,7 @@ def generate_csr(domains, key_path, csr_path, key_type=RSA_KEY_TYPE): file_h.write(crypto.dump_certificate_request(crypto.FILETYPE_ASN1, req)) -def read_certificate(cert_path): +def read_certificate(cert_path: str) -> str: """ Load the certificate from the provided path, and return a human readable version of it (TEXT mode). @@ -274,7 +283,7 @@ def read_certificate(cert_path): return crypto.dump_certificate(crypto.FILETYPE_TEXT, cert).decode('utf-8') -def load_sample_data_path(workspace): +def load_sample_data_path(workspace: str) -> str: """ Load the certbot configuration example designed to make OCSP tests, and return its path :param str workspace: current test workspace directory path @@ -305,7 +314,7 @@ def load_sample_data_path(workspace): return copied -def echo(keyword, path=None): +def echo(keyword: str, path: Optional[str] = None) -> str: """ Generate a platform independent executable command that echoes the given keyword into the given file. @@ -320,7 +329,7 @@ def echo(keyword, path=None): os.path.basename(sys.executable), keyword, ' >> "{0}"'.format(path) if path else '') -def get_acme_issuers(context): +def get_acme_issuers(context: IntegrationTestsContext) -> List[Certificate]: """Gets the list of one or more issuer certificates from the ACME server used by the context. :param context: the testing context. diff --git a/certbot-ci/certbot_integration_tests/utils/pebble_artifacts.py b/certbot-ci/certbot_integration_tests/utils/pebble_artifacts.py index 918a5fd04..b929a14fe 100644 --- a/certbot-ci/certbot_integration_tests/utils/pebble_artifacts.py +++ b/certbot-ci/certbot_integration_tests/utils/pebble_artifacts.py @@ -3,6 +3,7 @@ import json import os import stat +from typing import Tuple import pkg_resources import requests @@ -14,7 +15,7 @@ PEBBLE_VERSION = 'v2.3.0' ASSETS_PATH = pkg_resources.resource_filename('certbot_integration_tests', 'assets') -def fetch(workspace, http_01_port=DEFAULT_HTTP_01_PORT): +def fetch(workspace: str, http_01_port: int = DEFAULT_HTTP_01_PORT) -> Tuple[str, str, str]: # pylint: disable=missing-function-docstring suffix = 'linux-amd64' if os.name != 'nt' else 'windows-amd64.exe' @@ -25,7 +26,7 @@ def fetch(workspace, http_01_port=DEFAULT_HTTP_01_PORT): return pebble_path, challtestsrv_path, pebble_config_path -def _fetch_asset(asset, suffix): +def _fetch_asset(asset: str, suffix: str) -> str: asset_path = os.path.join(ASSETS_PATH, '{0}_{1}_{2}'.format(asset, PEBBLE_VERSION, suffix)) if not os.path.exists(asset_path): asset_url = ('https://github.com/letsencrypt/pebble/releases/download/{0}/{1}_{2}' @@ -39,7 +40,7 @@ def _fetch_asset(asset, suffix): return asset_path -def _build_pebble_config(workspace, http_01_port): +def _build_pebble_config(workspace: str, http_01_port: int) -> str: config_path = os.path.join(workspace, 'pebble-config.json') with open(config_path, 'w') as file_h: file_h.write(json.dumps({ diff --git a/certbot-ci/certbot_integration_tests/utils/pebble_ocsp_server.py b/certbot-ci/certbot_integration_tests/utils/pebble_ocsp_server.py index 3458929ad..6fe966ac6 100755 --- a/certbot-ci/certbot_integration_tests/utils/pebble_ocsp_server.py +++ b/certbot-ci/certbot_integration_tests/utils/pebble_ocsp_server.py @@ -22,7 +22,7 @@ from certbot_integration_tests.utils.misc import GracefulTCPServer class _ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): # pylint: disable=missing-function-docstring - def do_POST(self): + def do_POST(self) -> None: request = requests.get(PEBBLE_MANAGEMENT_URL + '/intermediate-keys/0', verify=False) issuer_key = serialization.load_pem_private_key(request.content, None, default_backend()) diff --git a/certbot-ci/certbot_integration_tests/utils/proxy.py b/certbot-ci/certbot_integration_tests/utils/proxy.py index 7c46e640d..3f8e099cf 100644 --- a/certbot-ci/certbot_integration_tests/utils/proxy.py +++ b/certbot-ci/certbot_integration_tests/utils/proxy.py @@ -5,17 +5,19 @@ import http.server as BaseHTTPServer import json import re import sys +from typing import Mapping +from typing import Type import requests from certbot_integration_tests.utils.misc import GracefulTCPServer -def _create_proxy(mapping): +def _create_proxy(mapping: Mapping[str, str]) -> Type[BaseHTTPServer.BaseHTTPRequestHandler]: # pylint: disable=missing-function-docstring class ProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler): # pylint: disable=missing-class-docstring - def do_GET(self): + def do_GET(self) -> None: headers = {key.lower(): value for key, value in self.headers.items()} backend = [backend for pattern, backend in mapping.items() if re.match(pattern, headers['host'])][0] diff --git a/certbot-ci/snap_integration_tests/conftest.py b/certbot-ci/snap_integration_tests/conftest.py index ca2712d82..26314ab0e 100644 --- a/certbot-ci/snap_integration_tests/conftest.py +++ b/certbot-ci/snap_integration_tests/conftest.py @@ -1,3 +1,4 @@ +# type: ignore """ General conftest for pytest execution of all integration tests lying in the snap_installer_integration tests package. @@ -18,9 +19,10 @@ def pytest_addoption(parser): parser.addoption('--snap-folder', required=True, help='set the folder path where snaps to test are located') parser.addoption('--snap-arch', default='amd64', - help='set the architecture do test (default: amd64)') + help='set the architecture do test (default: amd64)') parser.addoption('--allow-persistent-changes', action='store_true', - help='needs to be set, and confirm that the test will make persistent changes on this machine') + help='needs to be set, and confirm that the test will make persistent ' + 'changes on this machine') def pytest_configure(config): @@ -30,7 +32,8 @@ def pytest_configure(config): """ if not config.option.allow_persistent_changes: raise RuntimeError('This integration test would install the Certbot snap on your machine. ' - 'Please run it again with the `--allow-persistent-changes` flag set to acknowledge.') + 'Please run it again with the `--allow-persistent-changes` flag set ' + 'to acknowledge.') def pytest_generate_tests(metafunc): @@ -40,6 +43,6 @@ def pytest_generate_tests(metafunc): if "dns_snap_path" in metafunc.fixturenames: snap_arch = metafunc.config.getoption('snap_arch') snap_folder = metafunc.config.getoption('snap_folder') - snap_dns_path_list = glob.glob(os.path.join(snap_folder, + snap_dns_path_list = glob.glob(os.path.join(snap_folder, 'certbot-dns-*_{0}.snap'.format(snap_arch))) metafunc.parametrize("dns_snap_path", snap_dns_path_list) diff --git a/certbot-ci/snap_integration_tests/dns_tests/test_main.py b/certbot-ci/snap_integration_tests/dns_tests/test_main.py index d008efc67..8dc6c4b80 100644 --- a/certbot-ci/snap_integration_tests/dns_tests/test_main.py +++ b/certbot-ci/snap_integration_tests/dns_tests/test_main.py @@ -1,14 +1,17 @@ #!/usr/bin/env python3 +"""Module executing integration tests against certbot snap.""" import glob import os import re import subprocess +from typing import Generator import pytest @pytest.fixture(autouse=True, scope="module") -def install_certbot_snap(request): +def install_certbot_snap(request: pytest.FixtureRequest) -> Generator[None, None, None]: + """Fixture ensuring the certbot snap is installed before each test.""" with pytest.raises(Exception): subprocess.check_call(['certbot', '--version']) try: @@ -22,13 +25,14 @@ def install_certbot_snap(request): subprocess.call(['snap', 'remove', 'certbot']) -def test_dns_plugin_install(dns_snap_path): +def test_dns_plugin_install(dns_snap_path: str) -> None: """ Test that each DNS plugin Certbot snap can be installed and is usable with the Certbot snap. """ - plugin_name = re.match(r'^certbot-(dns-\w+)_.*\.snap$', - os.path.basename(dns_snap_path)).group(1) + match = re.match(r'^certbot-(dns-\w+)_.*\.snap$', os.path.basename(dns_snap_path)) + assert match + plugin_name = match.group(1) snap_name = 'certbot-{0}'.format(plugin_name) assert plugin_name not in subprocess.check_output(['certbot', 'plugins', '--prepare'], universal_newlines=True) diff --git a/certbot-ci/snap_integration_tests/py.typed b/certbot-ci/snap_integration_tests/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/certbot-ci/windows_installer_integration_tests/conftest.py b/certbot-ci/windows_installer_integration_tests/conftest.py index 8a9de057f..3332fc923 100644 --- a/certbot-ci/windows_installer_integration_tests/conftest.py +++ b/certbot-ci/windows_installer_integration_tests/conftest.py @@ -1,3 +1,4 @@ +# type: ignore """ General conftest for pytest execution of all integration tests lying in the window_installer_integration tests package. @@ -21,9 +22,9 @@ def pytest_addoption(parser): default=os.path.join(ROOT_PATH, 'windows-installer', 'build', 'nsis', 'certbot-beta-installer-win32.exe'), help='set the path of the windows installer to use, default to ' - 'CERTBOT_ROOT_PATH\\windows-installer\\build\\nsis\\certbot-beta-installer-win32.exe') + 'CERTBOT_ROOT_PATH\\windows-installer\\build\\nsis\\certbot-beta-installer-win32.exe') # pylint: disable=line-too-long parser.addoption('--allow-persistent-changes', action='store_true', - help='needs to be set, and confirm that the test will make persistent changes on this machine') + help='needs to be set, and confirm that the test will make persistent changes on this machine') # pylint: disable=line-too-long def pytest_configure(config): @@ -33,4 +34,5 @@ def pytest_configure(config): """ if not config.option.allow_persistent_changes: raise RuntimeError('This integration test would install Certbot on your machine. ' - 'Please run it again with the `--allow-persistent-changes` flag set to acknowledge.') + 'Please run it again with the `--allow-persistent-changes` ' + 'flag set to acknowledge.') diff --git a/certbot-ci/windows_installer_integration_tests/py.typed b/certbot-ci/windows_installer_integration_tests/py.typed new file mode 100644 index 000000000..e69de29bb diff --git a/certbot-ci/windows_installer_integration_tests/test_main.py b/certbot-ci/windows_installer_integration_tests/test_main.py index f699b736a..ad1622bde 100644 --- a/certbot-ci/windows_installer_integration_tests/test_main.py +++ b/certbot-ci/windows_installer_integration_tests/test_main.py @@ -1,12 +1,16 @@ +"""Module executing integration tests for the windows installer.""" import os import re import subprocess import time import unittest +from typing import Any + +import pytest @unittest.skipIf(os.name != 'nt', reason='Windows installer tests must be run on Windows.') -def test_it(request): +def test_it(request: pytest.FixtureRequest) -> None: try: subprocess.check_call(['certbot', '--version']) except (subprocess.CalledProcessError, OSError): @@ -20,10 +24,12 @@ def test_it(request): # Assert certbot is installed and runnable output = subprocess.check_output(['certbot', '--version'], universal_newlines=True) - assert re.match(r'^certbot \d+\.\d+\.\d+.*$', output), 'Flag --version does not output a version.' + assert re.match(r'^certbot \d+\.\d+\.\d+.*$', + output), 'Flag --version does not output a version.' # Assert renew task is installed and ready - output = _ps('(Get-ScheduledTask -TaskName "Certbot Renew Task").State', capture_stdout=True) + output = _ps('(Get-ScheduledTask -TaskName "Certbot Renew Task").State', + capture_stdout=True) assert output.strip() == 'Ready' # Assert renew task is working @@ -32,7 +38,8 @@ def test_it(request): status = 'Running' while status != 'Ready': - status = _ps('(Get-ScheduledTask -TaskName "Certbot Renew Task").State', capture_stdout=True).strip() + status = _ps('(Get-ScheduledTask -TaskName "Certbot Renew Task").State', + capture_stdout=True).strip() time.sleep(1) log_path = os.path.join('C:\\', 'Certbot', 'log', 'letsencrypt.log') @@ -45,9 +52,10 @@ def test_it(request): assert 'no renewal failures' in data, 'Renew task did not execute properly.' finally: - # Sadly this command cannot work in non interactive mode: uninstaller will ask explicitly permission in an UAC prompt + # Sadly this command cannot work in non interactive mode: uninstaller will + # ask explicitly permission in an UAC prompt # print('Uninstalling Certbot ...') - # uninstall_path = _ps('(gci "HKLM:\\SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\CurrentVersion\\Uninstall"' + # uninstall_path = _ps('(gci "HKLM:\\SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\CurrentVersion\\Uninstall"' # pylint: disable=line-too-long # ' | foreach { gp $_.PSPath }' # ' | ? { $_ -match "Certbot" }' # ' | select UninstallString)' @@ -56,6 +64,7 @@ def test_it(request): pass -def _ps(powershell_str, capture_stdout=False): +def _ps(powershell_str: str, capture_stdout: bool = False) -> Any: fn = subprocess.check_output if capture_stdout else subprocess.check_call - return fn(['powershell.exe', '-c', powershell_str], universal_newlines=True) + return fn(['powershell.exe', '-c', powershell_str], # type: ignore[operator] + universal_newlines=True) diff --git a/linter_plugin.py b/linter_plugin.py index a19bf7df9..1eb13ffcf 100644 --- a/linter_plugin.py +++ b/linter_plugin.py @@ -6,12 +6,18 @@ deprecated modules. You can check its behavior as a reference to what is coded h See https://github.com/PyCQA/pylint/blob/b20a2984c94e2946669d727dbda78735882bf50a/pylint/checkers/imports.py#L287 See https://docs.pytest.org/en/latest/writing_plugins.html """ +import os.path +import re + from pylint.checkers import BaseChecker from pylint.interfaces import IAstroidChecker -# Modules in theses packages can import the os module. -WHITELIST_PACKAGES = [ - 'acme', 'certbot_integration_tests', 'certbot_compatibility_test', 'lock_test' +# Modules whose file is matching one of these paths can import the os module. +WHITELIST_PATHS = [ + '/acme/acme/', + '/certbot-ci/', + '/certbot-compatibility-test/', + '/tests/lock_test', ] @@ -50,5 +56,5 @@ def register(linter): def _check_disabled(node): module = node.root() - return any(package for package in WHITELIST_PACKAGES - if module.name.startswith(package + '.') or module.name == package) + return any(path for path in WHITELIST_PATHS + if os.path.normpath(path) in os.path.normpath(module.file)) diff --git a/tox.ini b/tox.ini index ee3e4d7d2..23e373bd7 100644 --- a/tox.ini +++ b/tox.ini @@ -20,8 +20,8 @@ install_and_test = python {toxinidir}/tools/install_and_test.py dns_packages = certbot-dns-cloudflare certbot-dns-cloudxns certbot-dns-digitalocean certbot-dns-dnsimple certbot-dns-dnsmadeeasy certbot-dns-gehirn certbot-dns-google certbot-dns-linode certbot-dns-luadns certbot-dns-nsone certbot-dns-ovh certbot-dns-rfc2136 certbot-dns-route53 certbot-dns-sakuracloud win_all_packages = acme[test] certbot[test] {[base]dns_packages} certbot-nginx all_packages = {[base]win_all_packages} certbot-apache -fully_typed_source_paths = acme/acme certbot/certbot -partially_typed_source_paths = certbot-ci/certbot_integration_tests certbot-apache/certbot_apache certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx tests/lock_test.py +fully_typed_source_paths = acme/acme certbot/certbot certbot-ci/certbot_integration_tests certbot-ci/snap_integration_tests certbot-ci/windows_installer_integration_tests +partially_typed_source_paths = certbot-apache/certbot_apache certbot-compatibility-test/certbot_compatibility_test certbot-dns-cloudflare/certbot_dns_cloudflare certbot-dns-cloudxns/certbot_dns_cloudxns certbot-dns-digitalocean/certbot_dns_digitalocean certbot-dns-dnsimple/certbot_dns_dnsimple certbot-dns-dnsmadeeasy/certbot_dns_dnsmadeeasy certbot-dns-gehirn/certbot_dns_gehirn certbot-dns-google/certbot_dns_google certbot-dns-linode/certbot_dns_linode certbot-dns-luadns/certbot_dns_luadns certbot-dns-nsone/certbot_dns_nsone certbot-dns-ovh/certbot_dns_ovh certbot-dns-rfc2136/certbot_dns_rfc2136 certbot-dns-route53/certbot_dns_route53 certbot-dns-sakuracloud/certbot_dns_sakuracloud certbot-nginx/certbot_nginx tests/lock_test.py [testenv] passenv =