mirror of
https://github.com/isc-projects/bind9.git
synced 2026-05-28 04:34:54 -04:00
Use Text with Grep support in isctest.run.cmd()
When commands are executed using the isctest.run.cmd() command, allow the output to be Grep-able like logs and text files.
This commit is contained in:
parent
7743bab5fc
commit
4b6a86b029
7 changed files with 75 additions and 90 deletions
|
|
@ -10,7 +10,7 @@
|
|||
# information regarding copyright ownership.
|
||||
|
||||
import os
|
||||
import re
|
||||
from re import compile as Re
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
|
|
@ -49,13 +49,6 @@ pytestmark = pytest.mark.extra_artifacts(
|
|||
)
|
||||
|
||||
|
||||
# helper functions
|
||||
def grep_c(regex, data):
|
||||
blob = data.splitlines()
|
||||
results = [x for x in blob if re.search(regex, x)]
|
||||
return len(results)
|
||||
|
||||
|
||||
# run delv
|
||||
def delv(*args, tkeys=False):
|
||||
delv_cmd = [os.environ.get("DELV")]
|
||||
|
|
@ -70,115 +63,115 @@ def delv(*args, tkeys=False):
|
|||
def test_positive_validation_delv():
|
||||
# check positive validation NSEC
|
||||
response = delv("a", "a.example")
|
||||
assert grep_c("a.example..*10.0.0.1", response.out)
|
||||
assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
|
||||
assert Re("a.example..*10.0.0.1") in response.out
|
||||
assert Re("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*") in response.out
|
||||
|
||||
# check positive validation NSEC (trsuted-keys)
|
||||
response = delv("a", "a.example", tkeys=True)
|
||||
assert grep_c("a.example..*10.0.0.1", response.out)
|
||||
assert grep_c("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
|
||||
assert Re("a.example..*10.0.0.1") in response.out
|
||||
assert Re("a.example..*.RRSIG.A [0-9][0-9]* 2 300 .*") in response.out
|
||||
|
||||
# check positive validation NSEC3
|
||||
response = delv("a", "a.nsec3.example")
|
||||
assert grep_c("a.nsec3.example..*10.0.0.1", response.out)
|
||||
assert grep_c("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
|
||||
assert Re("a.nsec3.example..*10.0.0.1") in response.out
|
||||
assert Re("a.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out
|
||||
|
||||
# check positive validation OPTOUT
|
||||
response = delv("a", "a.optout.example")
|
||||
assert grep_c("a.optout.example..*10.0.0.1", response.out)
|
||||
assert grep_c("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
|
||||
assert Re("a.optout.example..*10.0.0.1") in response.out
|
||||
assert Re("a.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out
|
||||
|
||||
# check positive wildcard validation NSEC
|
||||
response = delv("a", "a.wild.example")
|
||||
assert grep_c("a.wild.example..*10.0.0.27", response.out)
|
||||
assert grep_c("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*", response.out)
|
||||
assert Re("a.wild.example..*10.0.0.27") in response.out
|
||||
assert Re("a.wild.example..*.RRSIG.A [0-9][0-9]* 2 300 .*") in response.out
|
||||
|
||||
# check positive wildcard validation NSEC3
|
||||
response = delv("a", "a.wild.nsec3.example")
|
||||
assert grep_c("a.wild.nsec3.example..*10.0.0.6", response.out)
|
||||
assert grep_c("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
|
||||
assert Re("a.wild.nsec3.example..*10.0.0.6") in response.out
|
||||
assert Re("a.wild.nsec3.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out
|
||||
|
||||
# check positive wildcard validation OPTOUT
|
||||
response = delv("a", "a.wild.optout.example")
|
||||
assert grep_c("a.wild.optout.example..*10.0.0.6", response.out)
|
||||
assert grep_c("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*", response.out)
|
||||
assert Re("a.wild.optout.example..*10.0.0.6") in response.out
|
||||
assert Re("a.wild.optout.example..*.RRSIG.A [0-9][0-9]* 3 300 .*") in response.out
|
||||
|
||||
|
||||
def test_negative_validation_delv():
|
||||
# checking negative validation NXDOMAIN NSEC
|
||||
response = delv("a", "q.example")
|
||||
assert grep_c("resolution failed: ncache nxdomain", response.out)
|
||||
assert "resolution failed: ncache nxdomain" in response.out
|
||||
|
||||
# checking negative validation NODATA NSEC
|
||||
response = delv("txt", "a.example")
|
||||
assert grep_c("resolution failed: ncache nxrrset", response.out)
|
||||
assert "resolution failed: ncache nxrrset" in response.out
|
||||
|
||||
# checking negative validation NXDOMAIN NSEC3
|
||||
response = delv("a", "q.nsec3.example")
|
||||
assert grep_c("resolution failed: ncache nxdomain", response.out)
|
||||
assert "resolution failed: ncache nxdomain" in response.out
|
||||
|
||||
# checking negative validation NODATA NSEC3
|
||||
response = delv("txt", "a.nsec3.example")
|
||||
assert grep_c("resolution failed: ncache nxrrset", response.out)
|
||||
assert "resolution failed: ncache nxrrset" in response.out
|
||||
|
||||
# checking negative validation NXDOMAIN OPTOUT
|
||||
response = delv("a", "q.optout.example")
|
||||
assert grep_c("resolution failed: ncache nxdomain", response.out)
|
||||
assert "resolution failed: ncache nxdomain" in response.out
|
||||
|
||||
# checking negative validation NODATA OPTOUT
|
||||
response = delv("txt", "a.optout.example")
|
||||
assert grep_c("resolution failed: ncache nxrrset", response.out)
|
||||
assert "resolution failed: ncache nxrrset" in response.out
|
||||
|
||||
# checking negative wildcard validation NSEC
|
||||
response = delv("txt", "b.wild.example")
|
||||
assert grep_c("resolution failed: ncache nxrrset", response.out)
|
||||
assert "resolution failed: ncache nxrrset" in response.out
|
||||
|
||||
# checking negative wildcard validation NSEC3
|
||||
response = delv("txt", "b.wild.nsec3.example")
|
||||
assert grep_c("resolution failed: ncache nxrrset", response.out)
|
||||
assert "resolution failed: ncache nxrrset" in response.out
|
||||
|
||||
# checking negative wildcard validation OPTOUT
|
||||
response = delv("txt", "b.wild.optout.example")
|
||||
assert grep_c("resolution failed: ncache nxrrset", response.out)
|
||||
assert "resolution failed: ncache nxrrset" in response.out
|
||||
|
||||
|
||||
def test_insecure_validation_delv():
|
||||
# check 1-server insecurity proof NSEC
|
||||
response = delv("a", "a.insecure.example")
|
||||
assert grep_c("a.insecure.example..*10.0.0.1", response.out)
|
||||
assert Re("a.insecure.example..*10.0.0.1") in response.out
|
||||
|
||||
# check 1-server insecurity proof NSEC3
|
||||
response = delv("a", "a.insecure.nsec3.example")
|
||||
assert grep_c("a.insecure.nsec3.example..*10.0.0.1", response.out)
|
||||
assert Re("a.insecure.nsec3.example..*10.0.0.1") in response.out
|
||||
|
||||
# check 1-server insecurity proof NSEC3
|
||||
response = delv("a", "a.insecure.optout.example")
|
||||
assert grep_c("a.insecure.optout.example..*10.0.0.1", response.out)
|
||||
assert Re("a.insecure.optout.example..*10.0.0.1") in response.out
|
||||
|
||||
# check 1-server negative insecurity proof NSEC
|
||||
response = delv("a", "q.insecure.example")
|
||||
assert grep_c("resolution failed: ncache nxdomain", response.out)
|
||||
assert "resolution failed: ncache nxdomain" in response.out
|
||||
|
||||
# check 1-server negative insecurity proof NSEC3
|
||||
response = delv("a", "q.insecure.nsec3.example")
|
||||
assert grep_c("resolution failed: ncache nxdomain", response.out)
|
||||
assert "resolution failed: ncache nxdomain" in response.out
|
||||
|
||||
# check 1-server negative insecurity proof OPTOUT
|
||||
response = delv("a", "q.insecure.optout.example")
|
||||
assert grep_c("resolution failed: ncache nxdomain", response.out)
|
||||
assert "resolution failed: ncache nxdomain" in response.out
|
||||
|
||||
|
||||
def test_validation_failure_delv():
|
||||
# check failed validation due to bogus data
|
||||
response = delv("+cd", "a", "a.bogus.example")
|
||||
assert grep_c("resolution failed: RRSIG failed to verify", response.out)
|
||||
assert "resolution failed: RRSIG failed to verify" in response.out
|
||||
|
||||
# check failed validation due to missing key record
|
||||
response = delv("+cd", "a", "a.b.keyless.example")
|
||||
assert grep_c("resolution failed: insecurity proof failed", response.out)
|
||||
assert "resolution failed: insecurity proof failed" in response.out
|
||||
|
||||
|
||||
def test_revoked_key_delv():
|
||||
# check failed validation succeeds when a revoked key is encountered
|
||||
response = delv("+cd", "soa", "revkey.example")
|
||||
assert grep_c("fully validated", response.out)
|
||||
assert "fully validated" in response.out
|
||||
|
|
|
|||
|
|
@ -52,14 +52,6 @@ pytestmark = pytest.mark.extra_artifacts(
|
|||
)
|
||||
|
||||
|
||||
# helper functions
|
||||
def grep_c(regex, filename):
|
||||
with open(filename, "r", encoding="utf-8") as f:
|
||||
blob = f.read().splitlines()
|
||||
results = [x for x in blob if re.search(regex, x)]
|
||||
return len(results)
|
||||
|
||||
|
||||
# run dnssec-keygen
|
||||
def keygen(*args):
|
||||
keygen_cmd = [os.environ.get("KEYGEN")]
|
||||
|
|
@ -126,7 +118,7 @@ def test_split_dnssec():
|
|||
isctest.check.adflag(res2)
|
||||
|
||||
|
||||
def test_expiring_rrsig():
|
||||
def test_expiring_rrsig(ns3):
|
||||
# check soon-to-expire RRSIGs without a replacement private
|
||||
# key aren't deleted. this response has to have an RRSIG:
|
||||
msg = isctest.query.create("expiring.example.", "NS")
|
||||
|
|
@ -135,8 +127,7 @@ def test_expiring_rrsig():
|
|||
assert sigs
|
||||
|
||||
# check that named doesn't loop when private keys are not available
|
||||
n = grep_c("reading private key file expiring.example", "ns3/named.run")
|
||||
assert n < 15
|
||||
assert len(ns3.log.grep("reading private key file expiring.example")) < 15
|
||||
|
||||
# check expired signatures stay place when updates are disabled
|
||||
msg = isctest.query.create("expired.example", "SOA")
|
||||
|
|
|
|||
|
|
@ -16,18 +16,19 @@ import time
|
|||
from typing import List, Optional
|
||||
|
||||
import isctest.log
|
||||
import isctest.text
|
||||
|
||||
|
||||
class CmdResult:
|
||||
def __init__(self, proc=None):
|
||||
self.proc = proc
|
||||
self.rc = self.proc.returncode
|
||||
self.out = ""
|
||||
self.err = ""
|
||||
self.out = isctest.text.Text("")
|
||||
self.err = isctest.text.Text("")
|
||||
if self.proc.stdout:
|
||||
self.out = self.proc.stdout.decode("utf-8")
|
||||
self.out = isctest.text.Text(self.proc.stdout.decode("utf-8"))
|
||||
if self.proc.stderr:
|
||||
self.err = self.proc.stderr.decode("utf-8")
|
||||
self.err = isctest.text.Text(self.proc.stderr.decode("utf-8"))
|
||||
|
||||
|
||||
def cmd(
|
||||
|
|
|
|||
|
|
@ -65,6 +65,15 @@ class Grep(abc.ABC):
|
|||
return True
|
||||
|
||||
|
||||
class Text(Grep, str): # type: ignore
|
||||
"""
|
||||
Wrapper around classic string with grep support.
|
||||
"""
|
||||
|
||||
def readlines(self):
|
||||
yield from self.splitlines(keepends=True)
|
||||
|
||||
|
||||
class TextFile(Grep):
|
||||
"""
|
||||
Text file wrapper with grep support.
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@
|
|||
|
||||
import hashlib
|
||||
import os
|
||||
import re
|
||||
from re import compile as Re
|
||||
import shutil
|
||||
|
||||
import pytest
|
||||
|
|
@ -83,7 +83,7 @@ def token_init_and_cleanup():
|
|||
env=EMPTY_OPENSSL_CONF_ENV,
|
||||
raise_on_exception=False,
|
||||
)
|
||||
assert re.search("Found token (.*) with matching token label", cmd.out)
|
||||
assert Re("Found token (.*) with matching token label") in cmd.out
|
||||
|
||||
|
||||
# pylint: disable-msg=too-many-locals
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@
|
|||
|
||||
from datetime import timedelta
|
||||
import os
|
||||
import re
|
||||
from re import compile as Re
|
||||
|
||||
import pytest
|
||||
|
|
@ -104,9 +103,9 @@ def check_no_dnssec_in_journal(server, zone):
|
|||
]
|
||||
|
||||
cmd = isctest.run.cmd(journalprint)
|
||||
pattern = Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE)
|
||||
match = pattern.search(cmd.out)
|
||||
assert not match, f"{match.group(1)} record found in journal"
|
||||
assert (
|
||||
Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)") not in cmd.out
|
||||
), "dnssec record found in journal"
|
||||
|
||||
|
||||
def wait_for_serial(primary, server, zone):
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@
|
|||
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
from re import compile as Re
|
||||
|
||||
import pytest
|
||||
|
||||
|
|
@ -62,14 +62,14 @@ def test_verify_good_zone_nsec_next_name_case_mismatch():
|
|||
)
|
||||
|
||||
|
||||
def get_bad_zone_output(zone):
|
||||
only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else []
|
||||
def verify_bad_zone(zone):
|
||||
only_opt = ["-z"] if re.search(r"^[zk]sk-only", zone) else []
|
||||
cmd = isctest.run.cmd(
|
||||
[VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"],
|
||||
stderr=subprocess.STDOUT,
|
||||
raise_on_exception=False,
|
||||
)
|
||||
return cmd.out
|
||||
assert cmd.rc != 0
|
||||
return cmd
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
|
@ -81,7 +81,8 @@ def get_bad_zone_output(zone):
|
|||
],
|
||||
)
|
||||
def test_verify_bad_zone_files_dnskeyonly(zone):
|
||||
assert re.match(r".*DNSKEY is not signed.*", get_bad_zone_output(zone))
|
||||
cmd = verify_bad_zone(zone)
|
||||
assert "DNSKEY is not signed" in cmd.err
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
|
@ -98,10 +99,8 @@ def test_verify_bad_zone_files_dnskeyonly(zone):
|
|||
],
|
||||
)
|
||||
def test_verify_bad_zone_files_expired(zone):
|
||||
assert re.match(
|
||||
r".*signature has expired.*|.*No self-signed .*DNSKEY found.*",
|
||||
get_bad_zone_output(zone),
|
||||
)
|
||||
cmd = verify_bad_zone(zone)
|
||||
assert Re("signature has expired|No self-signed DNSKEY found") in cmd.err
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
|
@ -113,40 +112,33 @@ def test_verify_bad_zone_files_expired(zone):
|
|||
],
|
||||
)
|
||||
def test_verify_bad_zone_files_unexpected_nsec_rrset(zone):
|
||||
assert re.match(r".*unexpected NSEC RRset at.*", get_bad_zone_output(zone))
|
||||
cmd = verify_bad_zone(zone)
|
||||
assert "unexpected NSEC RRset at" in cmd.err
|
||||
|
||||
|
||||
def test_verify_bad_zone_files_bad_nsec_record():
|
||||
assert re.match(
|
||||
r".*Bad NSEC record for.*, next name mismatch.*",
|
||||
get_bad_zone_output("ksk+zsk.nsec.broken-chain"),
|
||||
)
|
||||
cmd = verify_bad_zone("ksk+zsk.nsec.broken-chain")
|
||||
assert Re("Bad NSEC record for.*, next name mismatch") in cmd.err
|
||||
|
||||
|
||||
def test_verify_bad_zone_files_bad_bitmap():
|
||||
assert re.match(
|
||||
r".*bit map mismatch.*", get_bad_zone_output("ksk+zsk.nsec.bad-bitmap")
|
||||
)
|
||||
cmd = verify_bad_zone("ksk+zsk.nsec.bad-bitmap")
|
||||
assert "bit map mismatch" in cmd.err
|
||||
|
||||
|
||||
def test_verify_bad_zone_files_missing_nsec3_record():
|
||||
assert re.match(
|
||||
r".*Missing NSEC3 record for.*",
|
||||
get_bad_zone_output("ksk+zsk.nsec3.missing-empty"),
|
||||
)
|
||||
cmd = verify_bad_zone("ksk+zsk.nsec3.missing-empty")
|
||||
assert "Missing NSEC3 record for" in cmd.err
|
||||
|
||||
|
||||
def test_verify_bad_zone_files_no_dnssec_keys():
|
||||
assert re.match(
|
||||
r".*Zone contains no DNSSEC keys.*", get_bad_zone_output("unsigned")
|
||||
)
|
||||
cmd = verify_bad_zone("unsigned")
|
||||
assert "Zone contains no DNSSEC keys" in cmd.err
|
||||
|
||||
|
||||
def test_verify_bad_zone_files_unequal_nsec3_chains():
|
||||
assert re.match(
|
||||
r".*Expected and found NSEC3 chains not equal.*",
|
||||
get_bad_zone_output("ksk+zsk.nsec3.extra-nsec3"),
|
||||
)
|
||||
cmd = verify_bad_zone("ksk+zsk.nsec3.extra-nsec3")
|
||||
assert "Expected and found NSEC3 chains not equal" in cmd.err
|
||||
|
||||
|
||||
# checking error message when -o is not used
|
||||
|
|
|
|||
Loading…
Reference in a new issue