Use CmdResult to decode stdout/stderr from isctest.run.cmd()

Avoid repeating the .decode("utf-8") snippet when processing command
output and provide a helper instead, which leads to more concise code.
This commit is contained in:
Nicki Křížek 2025-10-01 17:53:18 +02:00
parent ac2be27f8f
commit ac998da3f6
20 changed files with 180 additions and 202 deletions

View file

@ -38,125 +38,117 @@ def test_dnssecpolicy_keystore():
# Superfluous key file.
zone = "superfluous-keyfile.kz.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-superfluous-keyfile.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
assert f"zone '{zone}': wrong number of key files (3, expected 2)" in err
assert f"zone '{zone}': wrong number of key files (3, expected 2)" in cmd.out
# Missing key file.
zone = "missing-keyfile.kz.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-missing-keyfile.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
assert f"zone '{zone}': wrong number of key files (1, expected 2)" in err
assert f"zone '{zone}': wrong number of key files (1, expected 2)" in cmd.out
# Mismatch algorithm.
zone = "bad-algorithm.kz.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-algorithm.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 2
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy alternative-kz"
in err
in cmd.out
)
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[1].tag}' does not match dnssec-policy alternative-kz"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy alternative-kz key:'ksk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy alternative-kz key:'zsk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
in err
in cmd.out
)
# Mismatch length
zone = "bad-length.csk.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-length.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 1
assert (
f"zone '{zone}': key file '{zone}/RSASHA256/{keys[0].tag}' does not match dnssec-policy alternative-csk"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy alternative-csk key:'csk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
in err
in cmd.out
)
# Mismatch tag range
zone = "bad-tagrange.csk.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-tagrange.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 1
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy tagrange-csk"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy tagrange-csk key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-32767'"
in err
in cmd.out
)
# Mismatch role
zone = "bad-role.kz.example"
out = isctest.run.cmd([CHECKCONF, "-k", "bad-role.conf"], raise_on_exception=False)
err = out.stdout.decode("utf-8")
cmd = isctest.run.cmd([CHECKCONF, "-k", "bad-role.conf"], raise_on_exception=False)
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 2
assert (
f"zone '{zone}': no key file found matching dnssec-policy default-kz key:'zsk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
in err
in cmd.out
)
# Mismatch algorithm (default policy)
zone = "bad-default-algorithm.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-default-algorithm.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 1
assert (
f"zone '{zone}': key file '{zone}/RSASHA256/{keys[0].tag}' does not match dnssec-policy default"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy default key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
in err
in cmd.out
)
# Mismatch role (default policy)
zone = "bad-default-kz.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-default-kz.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 2
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy default"
in err
in cmd.out
)
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[1].tag}' does not match dnssec-policy default"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy default key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
in err
in cmd.out
)
assert f"zone '{zone}': wrong number of key files (2, expected 1)" in err
assert f"zone '{zone}': wrong number of key files (2, expected 1)" in cmd.out

View file

@ -15,25 +15,23 @@ import isctest
def test_checkconf_effective():
proc = isctest.run.cmd([os.environ["CHECKCONF"], "-e", "effective.conf"])
checkconf_output = proc.stdout.decode()
assert "listen-on port 5353 {\n\t\t127.1.2.3/32;\n\t};" in checkconf_output
assert 'view "_bind" chaos {' in checkconf_output
assert 'remote-servers "_default_iana_root_zone_primaries" {' in checkconf_output
assert 'view "foo" {\n}' in checkconf_output
cmd = isctest.run.cmd([os.environ["CHECKCONF"], "-e", "effective.conf"])
assert b"listen-on port 5353 {\n\t\t127.1.2.3/32;\n\t};" in cmd.proc.stdout
assert 'view "_bind" chaos {' in cmd.out
assert 'remote-servers "_default_iana_root_zone_primaries" {' in cmd.out
assert b'view "foo" {\n}' in cmd.proc.stdout
# builtin-trust-anchors is non documented and internal clause only, it must
# not be visible.
assert "builtin-trust-anchors" not in checkconf_output
assert "builtin-trust-anchors" not in cmd.out
def test_checkconf_builtin():
proc = isctest.run.cmd([os.environ["CHECKCONF"], "-b"])
checkconf_output = proc.stdout.decode()
assert 'listen-on {\n\t\t"any";\n\t};' in checkconf_output
assert 'view "_bind" chaos {' in checkconf_output
assert 'remote-servers "_default_iana_root_zone_primaries" {' in checkconf_output
cmd = isctest.run.cmd([os.environ["CHECKCONF"], "-b"])
assert b'listen-on {\n\t\t"any";\n\t};' in cmd.proc.stdout
assert 'view "_bind" chaos {' in cmd.out
assert 'remote-servers "_default_iana_root_zone_primaries" {' in cmd.out
# builtin-trust-anchors is non documented and internal clause only, it must
# not be visible.
assert "builtin-trust-anchors" not in checkconf_output
assert "builtin-trust-anchors" not in cmd.out

View file

@ -102,10 +102,10 @@ def verify_zone(zone, transfer):
verifier = isctest.run.cmd(verify_cmd)
if verifier.returncode != 0:
if verifier.rc != 0:
isctest.log.error(f"dnssec-verify {zone} failed")
return verifier.returncode == 0
return verifier.rc == 0
def read_statefile(server, zone):
@ -210,10 +210,10 @@ def rekey(zone):
]
controller = isctest.run.cmd(rndc_cmd)
if controller.returncode != 0:
if controller.rc != 0:
isctest.log.error(f"rndc loadkeys {zone} failed")
assert controller.returncode == 0
assert controller.rc == 0
class CheckDSTest(NamedTuple):

View file

@ -64,125 +64,121 @@ def delv(*args, tkeys=False):
delv_cmd.extend(["@10.53.0.4", "-a", tfile, "-p", os.environ["PORT"]])
delv_cmd.extend(args)
return (
isctest.run.cmd(delv_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
.stdout.decode("utf-8")
.strip()
)
return isctest.run.cmd(delv_cmd, stderr=subprocess.STDOUT)
def test_positive_validation_delv():
# check positive validation NSEC
response = delv("a", "a.example")
assert grep_c("a.example..*10.0.0.1", response)
assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response)
assert grep_c("a.example..*10.0.0.1", response.out)
assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
# check positive validation NSEC (trsuted-keys)
response = delv("a", "a.example", tkeys=True)
assert grep_c("a.example..*10.0.0.1", response)
assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response)
assert grep_c("a.example..*10.0.0.1", response.out)
assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
# check positive validation NSEC3
response = delv("a", "a.nsec3.example")
assert grep_c("a.nsec3.example..*10.0.0.1", response)
assert grep_c("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
assert grep_c("a.nsec3.example..*10.0.0.1", response.out)
assert grep_c("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
# check positive validation OPTOUT
response = delv("a", "a.optout.example")
assert grep_c("a.optout.example..*10.0.0.1", response)
assert grep_c("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
assert grep_c("a.optout.example..*10.0.0.1", response.out)
assert grep_c("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
# check positive wildcard validation NSEC
response = delv("a", "a.wild.example")
assert grep_c("a.wild.example..*10.0.0.27", response)
assert grep_c("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response)
assert grep_c("a.wild.example..*10.0.0.27", response.out)
assert grep_c("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
# check positive wildcard validation NSEC3
response = delv("a", "a.wild.nsec3.example")
assert grep_c("a.wild.nsec3.example..*10.0.0.6", response)
assert grep_c("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
assert grep_c("a.wild.nsec3.example..*10.0.0.6", response.out)
assert grep_c("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
# check positive wildcard validation OPTOUT
response = delv("a", "a.wild.optout.example")
assert grep_c("a.wild.optout.example..*10.0.0.6", response)
assert grep_c("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response)
assert grep_c("a.wild.optout.example..*10.0.0.6", response.out)
assert grep_c("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
def test_negative_validation_delv():
# checking negative validation NXDOMAIN NSEC
response = delv("a", "q.example")
assert grep_c("resolution failed: ncache nxdomain", response)
assert grep_c("resolution failed: ncache nxdomain", response.out)
# checking negative validation NODATA NSEC
response = delv("txt", "a.example")
assert grep_c("resolution failed: ncache nxrrset", response)
assert grep_c("resolution failed: ncache nxrrset", response.out)
# checking negative validation NXDOMAIN NSEC3
response = delv("a", "q.nsec3.example")
assert grep_c("resolution failed: ncache nxdomain", response)
assert grep_c("resolution failed: ncache nxdomain", response.out)
# checking negative validation NODATA NSEC3
response = delv("txt", "a.nsec3.example")
assert grep_c("resolution failed: ncache nxrrset", response)
assert grep_c("resolution failed: ncache nxrrset", response.out)
# checking negative validation NXDOMAIN OPTOUT
response = delv("a", "q.optout.example")
assert grep_c("resolution failed: ncache nxdomain", response)
assert grep_c("resolution failed: ncache nxdomain", response.out)
# checking negative validation NODATA OPTOUT
response = delv("txt", "a.optout.example")
assert grep_c("resolution failed: ncache nxrrset", response)
assert grep_c("resolution failed: ncache nxrrset", response.out)
# checking negative wildcard validation NSEC
response = delv("txt", "b.wild.example")
assert grep_c("resolution failed: ncache nxrrset", response)
assert grep_c("resolution failed: ncache nxrrset", response.out)
# checking negative wildcard validation NSEC3
response = delv("txt", "b.wild.nsec3.example")
assert grep_c("resolution failed: ncache nxrrset", response)
assert grep_c("resolution failed: ncache nxrrset", response.out)
# checking negative wildcard validation OPTOUT
response = delv("txt", "b.wild.optout.example")
assert grep_c("resolution failed: ncache nxrrset", response)
assert grep_c("resolution failed: ncache nxrrset", response.out)
def test_insecure_validation_delv():
# check 1-server insecurity proof NSEC
response = delv("a", "a.insecure.example")
assert grep_c("a.insecure.example..*10.0.0.1", response)
assert grep_c("a.insecure.example..*10.0.0.1", response.out)
# check 1-server insecurity proof NSEC3
response = delv("a", "a.insecure.nsec3.example")
assert grep_c("a.insecure.nsec3.example..*10.0.0.1", response)
assert grep_c("a.insecure.nsec3.example..*10.0.0.1", response.out)
# check 1-server insecurity proof NSEC3
response = delv("a", "a.insecure.optout.example")
assert grep_c("a.insecure.optout.example..*10.0.0.1", response)
assert grep_c("a.insecure.optout.example..*10.0.0.1", response.out)
# check 1-server negative insecurity proof NSEC
response = delv("a", "q.insecure.example")
assert grep_c("resolution failed: ncache nxdomain", response)
assert grep_c("resolution failed: ncache nxdomain", response.out)
# check 1-server negative insecurity proof NSEC3
response = delv("a", "q.insecure.nsec3.example")
assert grep_c("resolution failed: ncache nxdomain", response)
assert grep_c("resolution failed: ncache nxdomain", response.out)
# check 1-server negative insecurity proof OPTOUT
response = delv("a", "q.insecure.optout.example")
assert grep_c("resolution failed: ncache nxdomain", response)
assert grep_c("resolution failed: ncache nxdomain", response.out)
def test_validation_failure_delv():
# check failed validation due to bogus data
response = delv("+cd", "a", "a.bogus.example")
assert grep_c("resolution failed: RRSIG failed to verify", response)
assert grep_c("resolution failed: RRSIG failed to verify", response.out)
# check failed validation due to missing key record
response = delv("+cd", "a", "a.b.keyless.example")
assert grep_c("resolution failed: insecurity proof failed", response)
assert grep_c("resolution failed: insecurity proof failed", response.out)
def test_revoked_key_delv():
# check failed validation succeeds when a revoked key is encountered
response = delv("+cd", "soa", "revkey.example")
assert grep_c("fully validated", response)
assert grep_c("fully validated", response.out)

View file

@ -63,14 +63,14 @@ def grep_c(regex, filename):
def keygen(*args):
keygen_cmd = [os.environ.get("KEYGEN")]
keygen_cmd.extend(args)
return isctest.run.cmd(keygen_cmd, log_stdout=True).stdout.decode("utf-8").strip()
return isctest.run.cmd(keygen_cmd).out.strip()
# run dnssec-settime
def settime(*args):
settime_cmd = [os.environ.get("SETTIME")]
settime_cmd.extend(args)
return isctest.run.cmd(settime_cmd, log_stdout=True).stdout.decode("utf-8").strip()
return isctest.run.cmd(settime_cmd).out.strip()
@pytest.mark.parametrize(

View file

@ -48,7 +48,7 @@ def test_dnstap_dispatch_socket_addresses(ns3):
os.rename(os.path.join("ns3", "dnstap.out.0"), "dnstap.out.resolver_addresses")
# Read the contents of the dnstap file using dnstap-read.
run = isctest.run.cmd(
dnstapread = isctest.run.cmd(
[isctest.vars.ALL["DNSTAPREAD"], "dnstap.out.resolver_addresses"],
)
@ -64,7 +64,7 @@ def test_dnstap_dispatch_socket_addresses(ns3):
bad_frames = []
inspected_frames = 0
addr_regex = r"^10\.53\.0\.[0-9]+:[0-9]{1,5}$"
for line in run.stdout.decode("utf-8").splitlines():
for line in dnstapread.out.splitlines():
_, _, frame_type, addr1, _, addr2, _ = line.split(" ", 6)
# Only inspect RESOLVER_QUERY and RESOLVER_RESPONSE frames.
if frame_type not in ("RQ", "RR"):

View file

@ -25,10 +25,10 @@ def gnutls_cli_executable():
pytest.skip("gnutls-cli not found in PATH")
# Ensure gnutls-cli supports the --logfile command-line option.
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[executable, "--logfile=/dev/null"], log_stderr=False, raise_on_exception=False
).stdout
if b"illegal option" in output:
)
if "illegal option" in cmd.out:
pytest.skip("gnutls-cli does not support the --logfile option")
return executable

View file

@ -15,7 +15,6 @@ import glob
import os
from pathlib import Path
import re
import subprocess
import time
from typing import Dict, List, Optional, Tuple, Union
@ -533,8 +532,8 @@ class Key:
str(self.keyfile),
]
out = isctest.run.cmd(dsfromkey_command)
dsfromkey = out.stdout.decode("utf-8").split()
cmd = isctest.run.cmd(dsfromkey_command)
dsfromkey = cmd.out.split()
rdata_fromfile = " ".join(dsfromkey[:7])
rdata_fromwire = " ".join(cds[:7])
@ -837,18 +836,14 @@ def check_dnssec_verify(server, zone, tsig=None):
file.write(rr.to_text())
file.write("\n")
try:
verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
verified = isctest.run.cmd(verify_command)
except subprocess.CalledProcessError:
pass
if verified:
break
verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
verified = isctest.run.cmd(verify_command, raise_on_exception=False)
if verified.rc == 0:
return
time.sleep(1)
assert verified
assert False, "zone not verified"
def check_dnssecstatus(server, zone, keys, policy=None, view=None, verbose=False):

View file

@ -18,6 +18,18 @@ from typing import List, Optional
import isctest.log
class CmdResult:
def __init__(self, proc=None):
self.proc = proc
self.rc = self.proc.returncode
self.out = ""
self.err = ""
if self.proc.stdout:
self.out = self.proc.stdout.decode("utf-8")
if self.proc.stderr:
self.err = self.proc.stderr.decode("utf-8")
def cmd(
args,
cwd=None,
@ -29,7 +41,7 @@ def cmd(
input_text: Optional[bytes] = None,
raise_on_exception=True,
env: Optional[dict] = None,
):
) -> CmdResult:
"""Execute a command with given args as subprocess."""
isctest.log.debug(f"isctest.run.cmd(): {' '.join(args)}")
@ -59,13 +71,13 @@ def cmd(
env=env,
)
print_debug_logs(proc)
return proc
return CmdResult(proc)
except subprocess.CalledProcessError as exc:
print_debug_logs(exc)
isctest.log.debug(f"isctest.run.cmd(): (return code) {exc.returncode}")
if raise_on_exception:
raise exc
return exc
return CmdResult(exc)
def _run_script(
@ -107,11 +119,11 @@ class Dig:
def __init__(self, base_params: str = ""):
self.base_params = base_params
def __call__(self, params: str) -> str:
"""Run the dig command with the given parameters and return the decoded output."""
def __call__(self, params: str) -> CmdResult:
"""Run the dig command with the given parameters."""
return cmd(
[os.environ.get("DIG")] + f"{self.base_params} {params}".split(),
).stdout.decode("utf-8")
)
def shell(script: str, args: Optional[List[str]] = None) -> None:

View file

@ -1282,7 +1282,7 @@ def test_kasp_dnssec_keygen():
zone,
]
return isctest.run.cmd(keygen_command).stdout.decode("utf-8")
return isctest.run.cmd(keygen_command).out
isctest.log.info(
"check that 'dnssec-keygen -k' (configured policy) created valid files"
@ -1327,7 +1327,7 @@ def test_kasp_dnssec_keygen():
str(publish),
key.path,
]
out = isctest.run.cmd(settime).stdout.decode("utf-8")
isctest.run.cmd(settime)
isctest.check.file_contents_equal(f"{key.statefile}", f"{key.statefile}.backup")
assert key.get_metadata("Publish", file=key.privatefile) == str(publish)
@ -1378,7 +1378,7 @@ def test_kasp_dnssec_keygen():
str(now),
key.path,
]
out = isctest.run.cmd(settime).stdout.decode("utf-8")
isctest.run.cmd(settime)
isctest.kasp.check_keys("kasp", keys, expected)
isctest.kasp.check_keytimes(keys, expected)
@ -1415,7 +1415,7 @@ def test_kasp_dnssec_keygen():
str(now),
key.path,
]
out = isctest.run.cmd(settime).stdout.decode("utf-8")
isctest.run.cmd(settime)
isctest.kasp.check_keys("kasp", keys, expected)
isctest.kasp.check_keytimes(keys, expected)
@ -1463,7 +1463,7 @@ def test_kasp_dnssec_keygen():
str(soon),
key.path,
]
out = isctest.run.cmd(settime).stdout.decode("utf-8")
isctest.run.cmd(settime)
isctest.kasp.check_keys("kasp", keys, expected)
isctest.kasp.check_keytimes(keys, expected)

View file

@ -31,25 +31,27 @@ def test_dig_tcp_keepalive_handling(named_port, ns2):
dig = isctest.run.Dig(f"-p {str(named_port)}")
isctest.log.info("check that dig handles TCP keepalive in query")
assert "; TCP-KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2")
assert "; TCP-KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2").out
isctest.log.info("check that dig added TCP keepalive was received")
assert get_keepalive_options_received() == 1
isctest.log.info("check that TCP keepalive is added for TCP responses")
assert "; TCP-KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2")
assert "; TCP-KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2").out
isctest.log.info("check that TCP keepalive requires TCP")
assert "; TCP-KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2")
assert "; TCP-KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2").out
isctest.log.info("check the default keepalive value")
assert "; TCP-KEEPALIVE: 30.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.3"
assert (
"; TCP-KEEPALIVE: 30.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.3").out
)
isctest.log.info("check a keepalive configured value")
assert "; TCP-KEEPALIVE: 15.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.2"
assert (
"; TCP-KEEPALIVE: 15.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.2").out
)
isctest.log.info("check a re-configured keepalive value")
@ -59,8 +61,9 @@ def test_dig_tcp_keepalive_handling(named_port, ns2):
assert "tcp-keepalive-timeout=300" in response
assert "tcp-advertised-timeout=200" in response
assert "tcp-primaries-timeout=100" in response
assert "; TCP-KEEPALIVE: 20.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.2"
assert (
"; TCP-KEEPALIVE: 20.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.2").out
)
isctest.log.info("check server config entry")

View file

@ -74,18 +74,16 @@ def token_init_and_cleanup():
)
try:
output = isctest.run.cmd(
token_init_command, env=EMPTY_OPENSSL_CONF_ENV
).stdout.decode("utf-8")
assert "The token has been initialized and is reassigned to slot" in output
cmd = isctest.run.cmd(token_init_command, env=EMPTY_OPENSSL_CONF_ENV)
assert "The token has been initialized and is reassigned to slot" in cmd.out
yield
finally:
output = isctest.run.cmd(
cmd = isctest.run.cmd(
token_cleanup_command,
env=EMPTY_OPENSSL_CONF_ENV,
raise_on_exception=False,
).stdout.decode("utf-8")
assert re.search("Found token (.*) with matching token label", output)
)
assert re.search("Found token (.*) with matching token label", cmd.out)
# pylint: disable-msg=too-many-locals
@ -125,11 +123,9 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
HSMPIN,
]
output = isctest.run.cmd(
pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV
).stdout.decode("utf-8")
cmd = isctest.run.cmd(pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV)
assert "Key pair generated" in output
assert "Key pair generated" in cmd.out
def keyfromlabel(alg_name, zone, key_id, key_flag):
key_flag = key_flag.split() if key_flag else []
@ -145,12 +141,12 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
zone,
]
output = isctest.run.cmd(keyfrlab_command)
output_decoded = output.stdout.decode("utf-8").rstrip() + ".key"
cmd = isctest.run.cmd(keyfrlab_command)
keyfile = cmd.out.rstrip() + ".key"
assert os.path.exists(output_decoded)
assert os.path.exists(keyfile)
return output_decoded
return keyfile
if f"{alg_name.upper()}_SUPPORTED" not in os.environ:
pytest.skip(f"{alg_name} is not supported")

View file

@ -97,8 +97,8 @@ def ksr(zone, policy, action, options="", raise_on_exception=True):
zone,
]
out = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception)
return out.stdout.decode("utf-8"), out.stderr.decode("utf-8")
cmd = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception)
return cmd.out, cmd.err # TODO return cmd instead
def check_keys(

View file

@ -149,7 +149,7 @@ def test_masterfile_missing_master_file_servfail():
def test_masterfile_owner_inheritance():
"""Test owner inheritance after $INCLUDE"""
checker_output = isctest.run.cmd(
cmd = isctest.run.cmd(
[
os.environ["CHECKZONE"],
"-D",
@ -157,12 +157,12 @@ def test_masterfile_owner_inheritance():
"example",
"zone/inheritownerafterinclude.db",
]
).stdout.decode("utf-8")
)
owner_inheritance_zone = """
example. 0 IN SOA . . 0 0 0 0 0
example. 0 IN TXT "this should be at the zone apex"
example. 0 IN NS .
"""
checker_zone = dns.zone.from_text(checker_output, origin="example.")
checker_zone = dns.zone.from_text(cmd.out, origin="example.")
expected = dns.zone.from_text(owner_inheritance_zone, origin="example.")
isctest.check.zones_equal(checker_zone, expected, compare_ttl=True)

View file

@ -73,8 +73,8 @@ def dsfromkey(key):
"-w",
str(key.keyfile),
]
out = isctest.run.cmd(dsfromkey_command)
return out.stdout.decode("utf-8").split()
cmd = isctest.run.cmd(dsfromkey_command)
return cmd.out.split()
def check_dnssec(server, zone, keys, expected):
@ -102,12 +102,11 @@ def check_no_dnssec_in_journal(server, zone):
f"{server.identifier}/{zone}.db.jnl",
]
out = isctest.run.cmd(journalprint)
contents = out.stdout.decode("utf-8")
cmd = isctest.run.cmd(journalprint)
pattern = re.compile(
r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE
)
match = pattern.search(contents)
match = pattern.search(cmd.out)
assert not match, f"{match.group(1)} record found in journal"

View file

@ -44,13 +44,11 @@ def test_nzd2nzf(ns1):
# dump "_default.nzd" to "_default.nzf" and check that it contains the expected content
cfg_dir = "ns1"
stdout = isctest.run.cmd(
[os.environ["NZD2NZF"], "_default.nzd"], cwd=cfg_dir
).stdout.decode("utf-8")
assert f"zone {zone_data}" in stdout
cmd = isctest.run.cmd([os.environ["NZD2NZF"], "_default.nzd"], cwd=cfg_dir)
assert f"zone {zone_data}" in cmd.out
nzf_filename = os.path.join(cfg_dir, "_default.nzf")
with open(nzf_filename, "w", encoding="utf-8") as nzf_file:
nzf_file.write(stdout)
nzf_file.write(cmd.out)
# delete "_default.nzd" database
nzd_filename = os.path.join(cfg_dir, "_default.nzd")

View file

@ -58,7 +58,7 @@ def test_rollover_multisigner(ns3, alg, size):
zone,
]
return isctest.run.cmd(keygen_command).stdout.decode("utf-8")
return isctest.run.cmd(keygen_command).out
zone = "multisigner-model2.kasp"

View file

@ -121,22 +121,18 @@ pytestmark = pytest.mark.extra_artifacts(
],
)
def test_rrchecker_list_standard_names(option, expected_result):
stdout = isctest.run.cmd([os.environ["RRCHECKER"], option]).stdout.decode("utf-8")
values = [line for line in stdout.split("\n") if line.strip()]
cmd = isctest.run.cmd([os.environ["RRCHECKER"], option])
values = [line for line in cmd.out.split("\n") if line.strip()]
assert sorted(values) == sorted(expected_result)
def run_rrchecker(option, rr_class, rr_type, rr_rest):
rrchecker_output = (
isctest.run.cmd(
[os.environ["RRCHECKER"], option],
input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
)
.stdout.decode("utf-8")
.strip()
cmd = isctest.run.cmd(
[os.environ["RRCHECKER"], option],
input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
)
return rrchecker_output.split()
return cmd.out.strip().split()
@pytest.mark.parametrize(
@ -162,7 +158,7 @@ def test_rrchecker_conversions(option):
".",
tempzone_file,
],
).stdout.decode("utf-8")
).out
checkzone_output = [
line for line in checkzone_output.splitlines() if not line.startswith(";")
]

View file

@ -52,16 +52,12 @@ def test_nsec3_hashes(domain, nsec3hash):
algorithm = "1"
iterations = "12"
output = isctest.run.cmd(
[NSEC3HASH, salt, algorithm, iterations, domain]
).stdout.decode("utf-8")
assert nsec3hash in output
cmd = isctest.run.cmd([NSEC3HASH, salt, algorithm, iterations, domain])
assert nsec3hash in cmd.out
flags = "0"
output = isctest.run.cmd(
[NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain]
).stdout.decode("utf-8")
assert nsec3hash in output
cmd = isctest.run.cmd([NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain])
assert nsec3hash in cmd.out
@pytest.mark.parametrize(
@ -78,11 +74,11 @@ def test_nsec3_empty_salt(salt_emptiness_args):
iterations = "0"
domain = "com"
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[NSEC3HASH] + salt_emptiness_args + [algorithm, iterations, domain]
).stdout.decode("utf-8")
assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
assert "salt=-" in output
)
assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
assert "salt=-" in cmd.out
@pytest.mark.parametrize(
@ -98,7 +94,7 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg):
iterations = "0"
domain = "com"
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[
NSEC3HASH,
"-r",
@ -108,8 +104,8 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg):
salt_emptiness_arg,
domain,
]
).stdout.decode("utf-8")
assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
)
assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
@pytest.mark.parametrize(
@ -145,10 +141,8 @@ def test_nsec3hash_acceptable_values(domain, it, salt_bytes) -> None:
)
# calculate the hash using nsec3hash:
output = isctest.run.cmd(
[NSEC3HASH, salt_text, "1", str(it), str(domain)]
).stdout.decode("ascii")
hash2 = output.partition(" ")[0]
cmd = isctest.run.cmd([NSEC3HASH, salt_text, "1", str(it), str(domain)])
hash2 = cmd.out.partition(" ")[0]
assert hash1 == hash2

View file

@ -11,6 +11,7 @@
import os
import re
import subprocess
import pytest
@ -63,12 +64,12 @@ def test_verify_good_zone_nsec_next_name_case_mismatch():
def get_bad_zone_output(zone):
only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else []
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"],
stderr=subprocess.STDOUT,
raise_on_exception=False,
)
stream = (output.stdout + output.stderr).decode("utf-8").replace("\n", "")
return stream
return cmd.out
@pytest.mark.parametrize(
@ -153,27 +154,25 @@ def test_verify_bad_zone_files_unequal_nsec3_chains():
def test_verify_soa_not_at_top_error():
# when -o is not used, origin is set to zone file name,
# which should cause an error in this case
output = isctest.run.cmd(
[VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False
).stderr.decode("utf-8")
assert "not at top of zone" in output
assert "use -o to specify a different zone origin" in output
cmd = isctest.run.cmd([VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False)
assert "not at top of zone" in cmd.err
assert "use -o to specify a different zone origin" in cmd.err
# checking error message when an invalid -o is specified
# and a SOA record not at top of zone is found
def test_verify_invalid_o_option_soa_not_at_top_error():
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[VERIFY, "-o", "invalid.origin", "zones/ksk+zsk.nsec.good"],
raise_on_exception=False,
).stderr.decode("utf-8")
assert "not at top of zone" in output
assert "use -o to specify a different zone origin" not in output
)
assert "not at top of zone" in cmd.err
assert "use -o to specify a different zone origin" not in cmd.err
# checking dnssec-verify -J reads journal file
def test_verify_j_reads_journal_file():
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[
VERIFY,
"-o",
@ -182,5 +181,5 @@ def test_verify_j_reads_journal_file():
"zones/updated.other.jnl",
"zones/updated.other",
]
).stdout.decode("utf-8")
assert "Loading zone 'updated' from file 'zones/updated.other'" in output
)
assert "Loading zone 'updated' from file 'zones/updated.other'" in cmd.out