Use CmdResult to decode stdout/stderr from isctest.run.cmd()

Avoid repeating the .decode("utf-8") snippet when processing command
output and provide a helper instead, which leads to more concise code.

(cherry picked from commit ac998da3f6)
This commit is contained in:
Nicki Křížek 2025-10-01 17:53:18 +02:00
parent e26fcd330e
commit deb8b39dc6
11 changed files with 86 additions and 87 deletions

View file

@ -100,10 +100,10 @@ def verify_zone(zone, transfer):
verifier = isctest.run.cmd(verify_cmd)
if verifier.returncode != 0:
if verifier.rc != 0:
isctest.log.error(f"dnssec-verify {zone}. failed")
return verifier.returncode == 0
return verifier.rc == 0
def read_statefile(server, zone):

View file

@ -13,7 +13,6 @@
import os
import re
import subprocess
import isctest
import pytest
@ -45,8 +44,8 @@ def test_dnstap_dispatch_socket_addresses(ns3):
os.rename(os.path.join("ns3", "dnstap.out.0"), "dnstap.out.resolver_addresses")
# Read the contents of the dnstap file using dnstap-read.
output = subprocess.check_output(
[os.getenv("DNSTAPREAD"), "dnstap.out.resolver_addresses"], encoding="utf-8"
dnstapread = isctest.run.cmd(
[os.getenv("DNSTAPREAD"), "dnstap.out.resolver_addresses"],
)
# Check whether all frames contain the expected addresses.
@ -61,7 +60,7 @@ def test_dnstap_dispatch_socket_addresses(ns3):
bad_frames = []
inspected_frames = 0
addr_regex = r"^10\.53\.0\.[0-9]+:[0-9]{1,5}$"
for line in output.splitlines():
for line in dnstapread.out.splitlines():
_, _, frame_type, addr1, _, addr2, _ = line.split(" ", 6)
# Only inspect RESOLVER_QUERY and RESOLVER_RESPONSE frames.
if frame_type not in ("RQ", "RR"):

View file

@ -25,10 +25,10 @@ def gnutls_cli_executable():
pytest.skip("gnutls-cli not found in PATH")
# Ensure gnutls-cli supports the --logfile command-line option.
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[executable, "--logfile=/dev/null"], log_stderr=False, raise_on_exception=False
).stdout
if b"illegal option" in output:
)
if "illegal option" in cmd.out:
pytest.skip("gnutls-cli does not support the --logfile option")
return executable

View file

@ -20,6 +20,18 @@ from isctest.compat import dns_rcode
import dns.message
class CmdResult:
def __init__(self, proc=None):
self.proc = proc
self.rc = self.proc.returncode
self.out = ""
self.err = ""
if self.proc.stdout:
self.out = self.proc.stdout.decode("utf-8")
if self.proc.stderr:
self.err = self.proc.stderr.decode("utf-8")
def cmd(
args,
cwd=None,
@ -31,7 +43,7 @@ def cmd(
input_text: Optional[bytes] = None,
raise_on_exception=True,
env: Optional[dict] = None,
):
) -> CmdResult:
"""Execute a command with given args as subprocess."""
isctest.log.debug(f"isctest.run.cmd(): {' '.join(args)}")
@ -61,24 +73,24 @@ def cmd(
env=env,
)
print_debug_logs(proc)
return proc
return CmdResult(proc)
except subprocess.CalledProcessError as exc:
print_debug_logs(exc)
isctest.log.debug(f"isctest.run.cmd(): (return code) {exc.returncode}")
if raise_on_exception:
raise exc
return exc
return CmdResult(exc)
class Dig:
def __init__(self, base_params: str = ""):
self.base_params = base_params
def __call__(self, params: str) -> str:
"""Run the dig command with the given parameters and return the decoded output."""
def __call__(self, params: str) -> CmdResult:
"""Run the dig command with the given parameters."""
return cmd(
[os.environ.get("DIG")] + f"{self.base_params} {params}".split(),
).stdout.decode("utf-8")
)
def retry_with_timeout(func, timeout, delay=1, msg=None):

View file

@ -31,25 +31,27 @@ def test_dig_tcp_keepalive_handling(named_port, servers):
dig = isctest.run.Dig(f"-p {str(named_port)}")
isctest.log.info("check that dig handles TCP keepalive in query")
assert "; TCP KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2")
assert "; TCP KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2").out
isctest.log.info("check that dig added TCP keepalive was received")
assert get_keepalive_options_received() == 1
isctest.log.info("check that TCP keepalive is added for TCP responses")
assert "; TCP KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2")
assert "; TCP KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2").out
isctest.log.info("check that TCP keepalive requires TCP")
assert "; TCP KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2")
assert "; TCP KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2").out
isctest.log.info("check the default keepalive value")
assert "; TCP KEEPALIVE: 30.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.3"
assert (
"; TCP KEEPALIVE: 30.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.3").out
)
isctest.log.info("check a keepalive configured value")
assert "; TCP KEEPALIVE: 15.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.2"
assert (
"; TCP KEEPALIVE: 15.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.2").out
)
isctest.log.info("check a re-configured keepalive value")
@ -58,8 +60,9 @@ def test_dig_tcp_keepalive_handling(named_port, servers):
assert "tcp-idle-timeout=300" in response
assert "tcp-keepalive-timeout=300" in response
assert "tcp-advertised-timeout=200" in response
assert "; TCP KEEPALIVE: 20.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.2"
assert (
"; TCP KEEPALIVE: 20.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.2").out
)
isctest.log.info("check server config entry")

View file

@ -75,18 +75,16 @@ def token_init_and_cleanup():
)
try:
output = isctest.run.cmd(
token_init_command, env=EMPTY_OPENSSL_CONF_ENV
).stdout.decode("utf-8")
assert "The token has been initialized and is reassigned to slot" in output
cmd = isctest.run.cmd(token_init_command, env=EMPTY_OPENSSL_CONF_ENV)
assert "The token has been initialized and is reassigned to slot" in cmd.out
yield
finally:
output = isctest.run.cmd(
cmd = isctest.run.cmd(
token_cleanup_command,
env=EMPTY_OPENSSL_CONF_ENV,
raise_on_exception=False,
).stdout.decode("utf-8")
assert re.search("Found token (.*) with matching token label", output)
)
assert re.search("Found token (.*) with matching token label", cmd.out)
# pylint: disable-msg=too-many-locals
@ -126,11 +124,9 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
HSMPIN,
]
output = isctest.run.cmd(
pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV
).stdout.decode("utf-8")
cmd = isctest.run.cmd(pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV)
assert "Key pair generated" in output
assert "Key pair generated" in cmd.out
def keyfromlabel(alg_name, zone, key_id, key_flag):
key_flag = key_flag.split() if key_flag else []
@ -148,18 +144,18 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
zone,
]
output = isctest.run.cmd(keyfrlab_command)
output_decoded = output.stdout.decode("utf-8").rstrip() + ".key"
cmd = isctest.run.cmd(keyfrlab_command)
keyfile = cmd.out.rstrip() + ".key"
assert os.path.exists(output_decoded)
assert os.path.exists(keyfile)
return output_decoded
return keyfile
if (
isctest.run.cmd(
[os.environ["SHELL"], "../testcrypto.sh", alg_name],
raise_on_exception=False,
).returncode
).rc
!= 0
):
pytest.skip(f"{alg_name} is not supported")

View file

@ -96,7 +96,7 @@ def test_masterfile_missing_master_file_servfail():
def test_masterfile_owner_inheritance():
"""Test owner inheritance after $INCLUDE"""
checker_output = isctest.run.cmd(
cmd = isctest.run.cmd(
[
os.environ["CHECKZONE"],
"-D",
@ -104,12 +104,12 @@ def test_masterfile_owner_inheritance():
"example",
"zone/inheritownerafterinclude.db",
]
).stdout.decode("utf-8")
)
owner_inheritance_zone = """
example. 0 IN SOA . . 0 0 0 0 0
example. 0 IN TXT "this should be at the zone apex"
example. 0 IN NS .
"""
checker_zone = dns.zone.from_text(checker_output, origin="example.")
checker_zone = dns.zone.from_text(cmd.out, origin="example.")
expected = dns.zone.from_text(owner_inheritance_zone, origin="example.")
isctest.check.zones_equal(checker_zone, expected, compare_ttl=True)

View file

@ -86,10 +86,10 @@ def verify_zone(zone, transfer):
verifier = isctest.run.cmd(verify_cmd)
if verifier.returncode != 0:
if verifier.rc != 0:
isctest.log.error(f"dnssec-verify {zone}. failed")
return verifier.returncode == 0
return verifier.rc == 0
def test_optout(ns2):

View file

@ -121,22 +121,18 @@ pytestmark = pytest.mark.extra_artifacts(
],
)
def test_rrchecker_list_standard_names(option, expected_result):
stdout = isctest.run.cmd([os.environ["RRCHECKER"], option]).stdout.decode("utf-8")
values = [line for line in stdout.split("\n") if line.strip()]
cmd = isctest.run.cmd([os.environ["RRCHECKER"], option])
values = [line for line in cmd.out.split("\n") if line.strip()]
assert sorted(values) == sorted(expected_result)
def run_rrchecker(option, rr_class, rr_type, rr_rest):
rrchecker_output = (
isctest.run.cmd(
[os.environ["RRCHECKER"], option],
input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
)
.stdout.decode("utf-8")
.strip()
cmd = isctest.run.cmd(
[os.environ["RRCHECKER"], option],
input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
)
return rrchecker_output.split()
return cmd.out.strip().split()
@pytest.mark.parametrize(
@ -162,7 +158,7 @@ def test_rrchecker_conversions(option):
".",
tempzone_file,
],
).stdout.decode("utf-8")
).out
checkzone_output = [
line for line in checkzone_output.splitlines() if not line.startswith(";")
]

View file

@ -51,16 +51,12 @@ def test_nsec3_hashes(domain, nsec3hash):
algorithm = "1"
iterations = "12"
output = isctest.run.cmd(
[NSEC3HASH, salt, algorithm, iterations, domain]
).stdout.decode("utf-8")
assert nsec3hash in output
cmd = isctest.run.cmd([NSEC3HASH, salt, algorithm, iterations, domain])
assert nsec3hash in cmd.out
flags = "0"
output = isctest.run.cmd(
[NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain]
).stdout.decode("utf-8")
assert nsec3hash in output
cmd = isctest.run.cmd([NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain])
assert nsec3hash in cmd.out
@pytest.mark.parametrize(
@ -77,11 +73,11 @@ def test_nsec3_empty_salt(salt_emptiness_args):
iterations = "0"
domain = "com"
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[NSEC3HASH] + salt_emptiness_args + [algorithm, iterations, domain]
).stdout.decode("utf-8")
assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
assert "salt=-" in output
)
assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
assert "salt=-" in cmd.out
@pytest.mark.parametrize(
@ -97,7 +93,7 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg):
iterations = "0"
domain = "com"
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[
NSEC3HASH,
"-r",
@ -107,8 +103,8 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg):
salt_emptiness_arg,
domain,
]
).stdout.decode("utf-8")
assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
)
assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
@pytest.mark.parametrize(
@ -144,10 +140,8 @@ def test_nsec3hash_acceptable_values(domain, it, salt_bytes) -> None:
)
# calculate the hash using nsec3hash:
output = isctest.run.cmd(
[NSEC3HASH, salt_text, "1", str(it), str(domain)]
).stdout.decode("ascii")
hash2 = output.partition(" ")[0]
cmd = isctest.run.cmd([NSEC3HASH, salt_text, "1", str(it), str(domain)])
hash2 = cmd.out.partition(" ")[0]
assert hash1 == hash2

View file

@ -11,6 +11,7 @@
import os
import re
import subprocess
import pytest
@ -63,12 +64,12 @@ def test_verify_good_zone_nsec_next_name_case_mismatch():
def get_bad_zone_output(zone):
only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else []
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"],
stderr=subprocess.STDOUT,
raise_on_exception=False,
)
stream = (output.stdout + output.stderr).decode("utf-8").replace("\n", "")
return stream
return cmd.out
@pytest.mark.parametrize(
@ -153,19 +154,17 @@ def test_verify_bad_zone_files_unequal_nsec3_chains():
def test_verify_soa_not_at_top_error():
# when -o is not used, origin is set to zone file name,
# which should cause an error in this case
output = isctest.run.cmd(
[VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False
).stderr.decode("utf-8")
assert "not at top of zone" in output
assert "use -o to specify a different zone origin" in output
cmd = isctest.run.cmd([VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False)
assert "not at top of zone" in cmd.err
assert "use -o to specify a different zone origin" in cmd.err
# checking error message when an invalid -o is specified
# and a SOA record not at top of zone is found
def test_verify_invalid_o_option_soa_not_at_top_error():
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[VERIFY, "-o", "invalid.origin", "zones/ksk+zsk.nsec.good"],
raise_on_exception=False,
).stderr.decode("utf-8")
assert "not at top of zone" in output
assert "use -o to specify a different zone origin" not in output
)
assert "not at top of zone" in cmd.err
assert "use -o to specify a different zone origin" not in cmd.err