diff --git a/bin/tests/system/checkds/tests_checkds.py b/bin/tests/system/checkds/tests_checkds.py index d946634913..8e6660a6ef 100755 --- a/bin/tests/system/checkds/tests_checkds.py +++ b/bin/tests/system/checkds/tests_checkds.py @@ -100,10 +100,10 @@ def verify_zone(zone, transfer): verifier = isctest.run.cmd(verify_cmd) - if verifier.returncode != 0: + if verifier.rc != 0: isctest.log.error(f"dnssec-verify {zone}. failed") - return verifier.returncode == 0 + return verifier.rc == 0 def read_statefile(server, zone): diff --git a/bin/tests/system/dnstap/tests_dnstap.py b/bin/tests/system/dnstap/tests_dnstap.py index 57ba7e88ce..3c179c2825 100644 --- a/bin/tests/system/dnstap/tests_dnstap.py +++ b/bin/tests/system/dnstap/tests_dnstap.py @@ -13,7 +13,6 @@ import os import re -import subprocess import isctest import pytest @@ -45,8 +44,8 @@ def test_dnstap_dispatch_socket_addresses(ns3): os.rename(os.path.join("ns3", "dnstap.out.0"), "dnstap.out.resolver_addresses") # Read the contents of the dnstap file using dnstap-read. - output = subprocess.check_output( - [os.getenv("DNSTAPREAD"), "dnstap.out.resolver_addresses"], encoding="utf-8" + dnstapread = isctest.run.cmd( + [os.getenv("DNSTAPREAD"), "dnstap.out.resolver_addresses"], ) # Check whether all frames contain the expected addresses. @@ -61,7 +60,7 @@ def test_dnstap_dispatch_socket_addresses(ns3): bad_frames = [] inspected_frames = 0 addr_regex = r"^10\.53\.0\.[0-9]+:[0-9]{1,5}$" - for line in output.splitlines(): + for line in dnstapread.out.splitlines(): _, _, frame_type, addr1, _, addr2, _ = line.split(" ", 6) # Only inspect RESOLVER_QUERY and RESOLVER_RESPONSE frames. if frame_type not in ("RQ", "RR"): diff --git a/bin/tests/system/doth/conftest.py b/bin/tests/system/doth/conftest.py index 27dca7ee8a..b95baeaab5 100644 --- a/bin/tests/system/doth/conftest.py +++ b/bin/tests/system/doth/conftest.py @@ -25,10 +25,10 @@ def gnutls_cli_executable(): pytest.skip("gnutls-cli not found in PATH") # Ensure gnutls-cli supports the --logfile command-line option. - output = isctest.run.cmd( + cmd = isctest.run.cmd( [executable, "--logfile=/dev/null"], log_stderr=False, raise_on_exception=False - ).stdout - if b"illegal option" in output: + ) + if "illegal option" in cmd.out: pytest.skip("gnutls-cli does not support the --logfile option") return executable diff --git a/bin/tests/system/isctest/run.py b/bin/tests/system/isctest/run.py index f33fa3cad2..422877be06 100644 --- a/bin/tests/system/isctest/run.py +++ b/bin/tests/system/isctest/run.py @@ -20,6 +20,18 @@ from isctest.compat import dns_rcode import dns.message +class CmdResult: + def __init__(self, proc=None): + self.proc = proc + self.rc = self.proc.returncode + self.out = "" + self.err = "" + if self.proc.stdout: + self.out = self.proc.stdout.decode("utf-8") + if self.proc.stderr: + self.err = self.proc.stderr.decode("utf-8") + + def cmd( args, cwd=None, @@ -31,7 +43,7 @@ def cmd( input_text: Optional[bytes] = None, raise_on_exception=True, env: Optional[dict] = None, -): +) -> CmdResult: """Execute a command with given args as subprocess.""" isctest.log.debug(f"isctest.run.cmd(): {' '.join(args)}") @@ -61,24 +73,24 @@ def cmd( env=env, ) print_debug_logs(proc) - return proc + return CmdResult(proc) except subprocess.CalledProcessError as exc: print_debug_logs(exc) isctest.log.debug(f"isctest.run.cmd(): (return code) {exc.returncode}") if raise_on_exception: raise exc - return exc + return CmdResult(exc) class Dig: def __init__(self, base_params: str = ""): self.base_params = base_params - def __call__(self, params: str) -> str: - """Run the dig command with the given parameters and return the decoded output.""" + def __call__(self, params: str) -> CmdResult: + """Run the dig command with the given parameters.""" return cmd( [os.environ.get("DIG")] + f"{self.base_params} {params}".split(), - ).stdout.decode("utf-8") + ) def retry_with_timeout(func, timeout, delay=1, msg=None): diff --git a/bin/tests/system/keepalive/tests_keepalive.py b/bin/tests/system/keepalive/tests_keepalive.py index 5fc8aaaf5b..243525636c 100644 --- a/bin/tests/system/keepalive/tests_keepalive.py +++ b/bin/tests/system/keepalive/tests_keepalive.py @@ -31,25 +31,27 @@ def test_dig_tcp_keepalive_handling(named_port, servers): dig = isctest.run.Dig(f"-p {str(named_port)}") isctest.log.info("check that dig handles TCP keepalive in query") - assert "; TCP KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2") + assert "; TCP KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2").out isctest.log.info("check that dig added TCP keepalive was received") assert get_keepalive_options_received() == 1 isctest.log.info("check that TCP keepalive is added for TCP responses") - assert "; TCP KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2") + assert "; TCP KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2").out isctest.log.info("check that TCP keepalive requires TCP") - assert "; TCP KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2") + assert "; TCP KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2").out isctest.log.info("check the default keepalive value") - assert "; TCP KEEPALIVE: 30.0 secs" in dig( - "+tcp +keepalive foo.example. @10.53.0.3" + assert ( + "; TCP KEEPALIVE: 30.0 secs" + in dig("+tcp +keepalive foo.example. @10.53.0.3").out ) isctest.log.info("check a keepalive configured value") - assert "; TCP KEEPALIVE: 15.0 secs" in dig( - "+tcp +keepalive foo.example. @10.53.0.2" + assert ( + "; TCP KEEPALIVE: 15.0 secs" + in dig("+tcp +keepalive foo.example. @10.53.0.2").out ) isctest.log.info("check a re-configured keepalive value") @@ -58,8 +60,9 @@ def test_dig_tcp_keepalive_handling(named_port, servers): assert "tcp-idle-timeout=300" in response assert "tcp-keepalive-timeout=300" in response assert "tcp-advertised-timeout=200" in response - assert "; TCP KEEPALIVE: 20.0 secs" in dig( - "+tcp +keepalive foo.example. @10.53.0.2" + assert ( + "; TCP KEEPALIVE: 20.0 secs" + in dig("+tcp +keepalive foo.example. @10.53.0.2").out ) isctest.log.info("check server config entry") diff --git a/bin/tests/system/keyfromlabel/tests_keyfromlabel.py b/bin/tests/system/keyfromlabel/tests_keyfromlabel.py index cc62ec6a54..91cc89ca95 100644 --- a/bin/tests/system/keyfromlabel/tests_keyfromlabel.py +++ b/bin/tests/system/keyfromlabel/tests_keyfromlabel.py @@ -75,18 +75,16 @@ def token_init_and_cleanup(): ) try: - output = isctest.run.cmd( - token_init_command, env=EMPTY_OPENSSL_CONF_ENV - ).stdout.decode("utf-8") - assert "The token has been initialized and is reassigned to slot" in output + cmd = isctest.run.cmd(token_init_command, env=EMPTY_OPENSSL_CONF_ENV) + assert "The token has been initialized and is reassigned to slot" in cmd.out yield finally: - output = isctest.run.cmd( + cmd = isctest.run.cmd( token_cleanup_command, env=EMPTY_OPENSSL_CONF_ENV, raise_on_exception=False, - ).stdout.decode("utf-8") - assert re.search("Found token (.*) with matching token label", output) + ) + assert re.search("Found token (.*) with matching token label", cmd.out) # pylint: disable-msg=too-many-locals @@ -126,11 +124,9 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits): HSMPIN, ] - output = isctest.run.cmd( - pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV - ).stdout.decode("utf-8") + cmd = isctest.run.cmd(pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV) - assert "Key pair generated" in output + assert "Key pair generated" in cmd.out def keyfromlabel(alg_name, zone, key_id, key_flag): key_flag = key_flag.split() if key_flag else [] @@ -148,18 +144,18 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits): zone, ] - output = isctest.run.cmd(keyfrlab_command) - output_decoded = output.stdout.decode("utf-8").rstrip() + ".key" + cmd = isctest.run.cmd(keyfrlab_command) + keyfile = cmd.out.rstrip() + ".key" - assert os.path.exists(output_decoded) + assert os.path.exists(keyfile) - return output_decoded + return keyfile if ( isctest.run.cmd( [os.environ["SHELL"], "../testcrypto.sh", alg_name], raise_on_exception=False, - ).returncode + ).rc != 0 ): pytest.skip(f"{alg_name} is not supported") diff --git a/bin/tests/system/masterfile/tests_masterfile.py b/bin/tests/system/masterfile/tests_masterfile.py index 5d7baf58b3..e408bf235c 100644 --- a/bin/tests/system/masterfile/tests_masterfile.py +++ b/bin/tests/system/masterfile/tests_masterfile.py @@ -96,7 +96,7 @@ def test_masterfile_missing_master_file_servfail(): def test_masterfile_owner_inheritance(): """Test owner inheritance after $INCLUDE""" - checker_output = isctest.run.cmd( + cmd = isctest.run.cmd( [ os.environ["CHECKZONE"], "-D", @@ -104,12 +104,12 @@ def test_masterfile_owner_inheritance(): "example", "zone/inheritownerafterinclude.db", ] - ).stdout.decode("utf-8") + ) owner_inheritance_zone = """ example. 0 IN SOA . . 0 0 0 0 0 example. 0 IN TXT "this should be at the zone apex" example. 0 IN NS . """ - checker_zone = dns.zone.from_text(checker_output, origin="example.") + checker_zone = dns.zone.from_text(cmd.out, origin="example.") expected = dns.zone.from_text(owner_inheritance_zone, origin="example.") isctest.check.zones_equal(checker_zone, expected, compare_ttl=True) diff --git a/bin/tests/system/optout/tests_optout.py b/bin/tests/system/optout/tests_optout.py index d7cbe1845f..31bef7c333 100755 --- a/bin/tests/system/optout/tests_optout.py +++ b/bin/tests/system/optout/tests_optout.py @@ -86,10 +86,10 @@ def verify_zone(zone, transfer): verifier = isctest.run.cmd(verify_cmd) - if verifier.returncode != 0: + if verifier.rc != 0: isctest.log.error(f"dnssec-verify {zone}. failed") - return verifier.returncode == 0 + return verifier.rc == 0 def test_optout(ns2): diff --git a/bin/tests/system/rrchecker/tests_rrchecker.py b/bin/tests/system/rrchecker/tests_rrchecker.py index 5889224e91..a321765295 100644 --- a/bin/tests/system/rrchecker/tests_rrchecker.py +++ b/bin/tests/system/rrchecker/tests_rrchecker.py @@ -121,22 +121,18 @@ pytestmark = pytest.mark.extra_artifacts( ], ) def test_rrchecker_list_standard_names(option, expected_result): - stdout = isctest.run.cmd([os.environ["RRCHECKER"], option]).stdout.decode("utf-8") - values = [line for line in stdout.split("\n") if line.strip()] + cmd = isctest.run.cmd([os.environ["RRCHECKER"], option]) + values = [line for line in cmd.out.split("\n") if line.strip()] assert sorted(values) == sorted(expected_result) def run_rrchecker(option, rr_class, rr_type, rr_rest): - rrchecker_output = ( - isctest.run.cmd( - [os.environ["RRCHECKER"], option], - input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"), - ) - .stdout.decode("utf-8") - .strip() + cmd = isctest.run.cmd( + [os.environ["RRCHECKER"], option], + input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"), ) - return rrchecker_output.split() + return cmd.out.strip().split() @pytest.mark.parametrize( @@ -162,7 +158,7 @@ def test_rrchecker_conversions(option): ".", tempzone_file, ], - ).stdout.decode("utf-8") + ).out checkzone_output = [ line for line in checkzone_output.splitlines() if not line.startswith(";") ] diff --git a/bin/tests/system/tools/tests_tools_nsec3hash.py b/bin/tests/system/tools/tests_tools_nsec3hash.py index f1384d6ceb..93670bc48c 100644 --- a/bin/tests/system/tools/tests_tools_nsec3hash.py +++ b/bin/tests/system/tools/tests_tools_nsec3hash.py @@ -51,16 +51,12 @@ def test_nsec3_hashes(domain, nsec3hash): algorithm = "1" iterations = "12" - output = isctest.run.cmd( - [NSEC3HASH, salt, algorithm, iterations, domain] - ).stdout.decode("utf-8") - assert nsec3hash in output + cmd = isctest.run.cmd([NSEC3HASH, salt, algorithm, iterations, domain]) + assert nsec3hash in cmd.out flags = "0" - output = isctest.run.cmd( - [NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain] - ).stdout.decode("utf-8") - assert nsec3hash in output + cmd = isctest.run.cmd([NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain]) + assert nsec3hash in cmd.out @pytest.mark.parametrize( @@ -77,11 +73,11 @@ def test_nsec3_empty_salt(salt_emptiness_args): iterations = "0" domain = "com" - output = isctest.run.cmd( + cmd = isctest.run.cmd( [NSEC3HASH] + salt_emptiness_args + [algorithm, iterations, domain] - ).stdout.decode("utf-8") - assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in output - assert "salt=-" in output + ) + assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out + assert "salt=-" in cmd.out @pytest.mark.parametrize( @@ -97,7 +93,7 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg): iterations = "0" domain = "com" - output = isctest.run.cmd( + cmd = isctest.run.cmd( [ NSEC3HASH, "-r", @@ -107,8 +103,8 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg): salt_emptiness_arg, domain, ] - ).stdout.decode("utf-8") - assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in output + ) + assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out @pytest.mark.parametrize( @@ -144,10 +140,8 @@ def test_nsec3hash_acceptable_values(domain, it, salt_bytes) -> None: ) # calculate the hash using nsec3hash: - output = isctest.run.cmd( - [NSEC3HASH, salt_text, "1", str(it), str(domain)] - ).stdout.decode("ascii") - hash2 = output.partition(" ")[0] + cmd = isctest.run.cmd([NSEC3HASH, salt_text, "1", str(it), str(domain)]) + hash2 = cmd.out.partition(" ")[0] assert hash1 == hash2 diff --git a/bin/tests/system/verify/tests_verify.py b/bin/tests/system/verify/tests_verify.py index d900e56039..4b50fe4ee5 100644 --- a/bin/tests/system/verify/tests_verify.py +++ b/bin/tests/system/verify/tests_verify.py @@ -11,6 +11,7 @@ import os import re +import subprocess import pytest @@ -63,12 +64,12 @@ def test_verify_good_zone_nsec_next_name_case_mismatch(): def get_bad_zone_output(zone): only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else [] - output = isctest.run.cmd( + cmd = isctest.run.cmd( [VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"], + stderr=subprocess.STDOUT, raise_on_exception=False, ) - stream = (output.stdout + output.stderr).decode("utf-8").replace("\n", "") - return stream + return cmd.out @pytest.mark.parametrize( @@ -153,19 +154,17 @@ def test_verify_bad_zone_files_unequal_nsec3_chains(): def test_verify_soa_not_at_top_error(): # when -o is not used, origin is set to zone file name, # which should cause an error in this case - output = isctest.run.cmd( - [VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False - ).stderr.decode("utf-8") - assert "not at top of zone" in output - assert "use -o to specify a different zone origin" in output + cmd = isctest.run.cmd([VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False) + assert "not at top of zone" in cmd.err + assert "use -o to specify a different zone origin" in cmd.err # checking error message when an invalid -o is specified # and a SOA record not at top of zone is found def test_verify_invalid_o_option_soa_not_at_top_error(): - output = isctest.run.cmd( + cmd = isctest.run.cmd( [VERIFY, "-o", "invalid.origin", "zones/ksk+zsk.nsec.good"], raise_on_exception=False, - ).stderr.decode("utf-8") - assert "not at top of zone" in output - assert "use -o to specify a different zone origin" not in output + ) + assert "not at top of zone" in cmd.err + assert "use -o to specify a different zone origin" not in cmd.err