Refactor LogFile into TextFile with Grep support

Add a new Grep-like interface which can be used for searching for
regular expressions in files. Replace the prior LogFile used for named
logs with the new TextFile interface.
This commit is contained in:
Nicki Křížek 2025-10-02 17:06:25 +02:00
parent be6bae2a75
commit 7743bab5fc
17 changed files with 101 additions and 91 deletions

View file

@ -60,7 +60,7 @@ def test_tat_queries(ns1, ns6):
watcher.wait_for_line("trust-anchor-telemetry '_ta-")
# check that _ta-AAAA trust-anchor-telemetry are not sent when disabled
ns1.log.prohibit("sending trust-anchor-telemetry query '_ta")
assert "sending trust-anchor-telemetry query '_ta" not in ns1.log
# check that KEY-TAG (ednsopt 14) trust-anchor-telemetry queries are
# logged. this matches "dig . dnskey +ednsopt=KEY-TAG:ffff":

View file

@ -9,8 +9,8 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from re import compile as Re
import os
import re
import shutil
import time
@ -54,14 +54,6 @@ pytestmark = pytest.mark.extra_artifacts(
)
# helper functions
def grep_q(regex, filename):
with open(filename, "r", encoding="utf-8") as f:
blob = f.read().splitlines()
results = [x for x in blob if re.search(regex, x)]
return len(results) != 0
def getfrom(file):
with open(file, encoding="utf-8") as f:
return f.read().strip()
@ -501,13 +493,10 @@ def test_negative_validation_nsec3():
isctest.check.servfail(res2)
def test_excessive_nsec3_iterations():
assert grep_q(
"zone too-many-iterations/IN: excessive NSEC3PARAM iterations", "ns2/named.run"
)
assert grep_q(
"zone too-many-iterations/IN: excessive NSEC3PARAM iterations", "ns3/named.run"
)
def test_excessive_nsec3_iterations(ns2, ns3):
msg = "zone too-many-iterations/IN: excessive NSEC3PARAM iterations"
assert msg in ns2.log
assert msg in ns3.log
# check fallback to insecure with NSEC3 iterations is too high
msg = isctest.query.create("does-not-exist.too-many-iterations", "A")
@ -721,10 +710,11 @@ def test_negative_validation_optout():
def test_cache(ns4):
# check that key id's are logged when dumping the cache
ns4.rndc("dumpdb -cache", log=False)
assert grep_q("; key id = ", "ns4/named_dump.db")
dumpdb = isctest.text.TextFile("ns4/named_dump.db")
assert "; key id = " in dumpdb
# check for RRSIG covered type in negative cache
assert grep_q("; example. RRSIG NSEC ", "ns4/named_dump.db")
assert "; example. RRSIG NSEC " in dumpdb
# check validated data are not cached longer than originalttl
msg = isctest.query.create("a.ttlpatch.example", "A")
@ -952,7 +942,8 @@ def test_validation_recovery(ns2, ns4):
res = isctest.query.tcp(msg, "10.53.0.4")
isctest.check.servfail(res)
ns4.rndc("dumpdb", log=False)
grep_q("10.53.0.100", "ns4/named_dump.db")
dumpdb = isctest.text.TextFile("ns4/named_dump.db")
assert "10.53.0.100" in dumpdb
# then reload server with properly signed zone
shutil.copyfile(
@ -1136,7 +1127,7 @@ def test_expired_signatures(ns4):
isctest.check.servfail(res)
isctest.check.noadflag(res)
isctest.check.ede(res, EDECode.SIGNATURE_EXPIRED)
assert grep_q("expired.example/.*: RRSIG has expired", "ns4/named.run")
assert Re("expired.example/.*: RRSIG has expired") in ns4.log
# check future signatures do not validate
msg = isctest.query.create("future.example", "SOA")
@ -1144,9 +1135,7 @@ def test_expired_signatures(ns4):
isctest.check.servfail(res)
isctest.check.noadflag(res)
isctest.check.ede(res, EDECode.SIGNATURE_NOT_YET_VALID)
assert grep_q(
"future.example/.*: RRSIG validity period has not begun", "ns4/named.run"
)
assert Re("future.example/.*: RRSIG validity period has not begun") in ns4.log
# check that a dynamic zone with future signatures is re-signed on load
msg = isctest.query.create("managed-future.example", "A")

View file

@ -21,10 +21,11 @@ import re
import dns.message
import dns.rcode
from .log import debug, info, LogFile, WatchLogFromStart, WatchLogFromHere
from .log import debug, info, WatchLogFromStart, WatchLogFromHere
from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor
from .run import perl
from .query import udp
from .text import TextFile
class NamedPorts(NamedTuple):
@ -87,7 +88,7 @@ class NamedInstance:
if ports is None:
ports = NamedPorts.from_env()
self.ports = ports
self.log = LogFile(os.path.join(identifier, "named.run"))
self.log = TextFile(os.path.join(identifier, "named.run"))
self._rndc_executor = rndc_executor or RNDCBinaryExecutor()
self._rndc_logger = rndc_logger
@ -239,3 +240,6 @@ class NamedInstance:
f"{os.environ['srcdir']}/start.pl",
[self.system_test_name, self.identifier] + args,
)
def __repr__(self):
return self.identifier

View file

@ -1099,9 +1099,8 @@ def check_cdslog(server, zone, key, substr):
def check_cdslog_prohibit(server, zone, key, substr):
server.log.prohibit(
f"{substr} for key {zone}/{key.algorithm.name}/{key.tag} is now published"
)
msg = f"{substr} for key {zone}/{key.algorithm.name}/{key.tag} is now published"
assert msg not in server.log
def check_cdsdelete(rrset, expected):

View file

@ -23,4 +23,4 @@ from .basic import (
critical,
)
from .watchlog import LogFile, WatchLogFromStart, WatchLogFromHere
from .watchlog import WatchLogFromStart, WatchLogFromHere

View file

@ -15,7 +15,7 @@ import abc
import os
import time
from isctest.text import compile_pattern, FlexPattern, LineReader, LogFile
from isctest.text import compile_pattern, FlexPattern, LineReader
T = TypeVar("T")

View file

@ -14,7 +14,7 @@
import abc
import re
from re import compile as Re
from typing import Iterator, Match, Optional, Pattern, TextIO, Union
from typing import Iterator, List, Match, Optional, Pattern, TextIO, Union
FlexPattern = Union[str, Pattern]
@ -28,41 +28,60 @@ def compile_pattern(string: FlexPattern) -> Pattern:
raise TypeError("only string and re.Pattern allowed")
class LogFile:
class Grep(abc.ABC):
"""
Log file wrapper with a path and means to find a string in its contents.
Implement a grep-like interface for pattern matching in texts and files.
"""
@abc.abstractmethod
def readlines(self) -> Iterator[str]:
raise NotImplementedError
def igrep(self, pattern: FlexPattern) -> Iterator[Match]:
"""
Iterate over the lines matching the pattern.
"""
regex = compile_pattern(pattern)
for line in self.readlines():
match = regex.search(line)
if match:
yield match
def grep(self, pattern: FlexPattern) -> List[Match]:
"""
Get list of lines matching the pattern.
"""
return list(self.igrep(pattern))
def __contains__(self, pattern: FlexPattern) -> bool:
"""
Return whether any of the lines in the log contains matches the pattern.
"""
try:
next(self.igrep(pattern))
except StopIteration:
return False
return True
class TextFile(Grep):
"""
Text file wrapper with grep support.
"""
def __init__(self, path: str):
self.path = path
@property
def _lines(self) -> Iterator[str]:
def readlines(self) -> Iterator[str]:
with open(self.path, encoding="utf-8") as f:
yield from f
def __contains__(self, substring: str) -> bool:
"""
Return whether any of the lines in the log contains a given string.
"""
for line in self._lines:
if substring in line:
return True
return False
def expect(self, msg: str):
"""Check the string is present anywhere in the log file."""
if msg in self:
return
assert False, f"log message not found in log {self.path}: {msg}"
def prohibit(self, msg: str):
"""Check the string is not present in the entire log file."""
if msg in self:
assert False, f"forbidden message appeared in log {self.path}: {msg}"
def __repr__(self):
return self.path
class LineReader:
class LineReader(Grep):
"""
>>> import io

View file

@ -1601,7 +1601,7 @@ def test_kasp_zsk_retired(ns3):
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
ns3.log.prohibit(msg)
assert msg not in ns3.log
def test_kasp_purge_keys(ns4):
@ -1625,10 +1625,10 @@ def test_kasp_purge_keys(ns4):
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN/example1 (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
ns4.log.prohibit(msg)
assert msg not in ns4.log
msg = f"zone {zone}/IN/example2 (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
ns4.log.prohibit(msg)
assert msg not in ns4.log
def test_kasp_reload_restart(ns6):
@ -1730,7 +1730,7 @@ def test_kasp_manual_mode(ns3):
# Key rollover should have been be blocked.
tag = expected[1].key.tag
blockmsg = f"keymgr-manual-mode: block ZSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(blockmsg)
assert blockmsg in ns3.log
# Remove files.
for key in ksks + zsks:

View file

@ -179,7 +179,7 @@ def check_add_zsk(server, zone, keys, expected, extra_keys, extra, primary=None)
isctest.log.info(
f"- zone {zone} {server.identifier}: make sure we did not try to sign with the keys added with nsupdate"
)
server.log.prohibit(f"dns_zone_findkeys: error reading ./K{zone}")
assert f"dns_zone_findkeys: error reading ./K{zone}" not in server.log
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
@ -189,7 +189,7 @@ def check_add_zsk(server, zone, keys, expected, extra_keys, extra, primary=None)
# Check again.
isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run")
check_dnssec(server, zone, keys + extra_keys, expected + extra)
server.log.prohibit(f"dns_zone_findkeys: error reading ./K{zone}")
assert f"dns_zone_findkeys: error reading ./K{zone}" not in server.log
def _check_remove_zsk_fail(

View file

@ -82,8 +82,8 @@ def test_algoroll_csk_reconfig_step1(tld, ns6, alg, size):
tag = keys[0].key.tag
msg1 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{tag} (CSK)"
msg2 = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})"
ns6.log.expect(msg1)
ns6.log.expect(msg2)
assert msg1 in ns6.log
assert msg2 in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -175,7 +175,7 @@ def test_algoroll_csk_reconfig_step3(tld, ns6, alg, size):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns6.log.expect(msg)
assert msg in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -244,7 +244,7 @@ def test_algoroll_csk_reconfig_step4(tld, ns6, alg, size):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/RSASHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
ns6.log.expect(msg)
assert msg in ns6.log
# Force step.
tag = keys[1].key.tag

View file

@ -85,9 +85,9 @@ def test_algoroll_ksk_zsk_reconfig_step1(tld, ns6, alg, size):
msg1 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{ktag} (KSK)"
msg2 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{ztag} (ZSK)"
msg3 = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})" # twice
ns6.log.expect(msg1)
ns6.log.expect(msg2)
ns6.log.expect(msg3)
assert msg1 in ns6.log
assert msg2 in ns6.log
assert len(ns6.log.grep(msg3)) >= 2
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -184,7 +184,7 @@ def test_algoroll_ksk_zsk_reconfig_step3(tld, ns6, alg, size):
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns6.log.expect(msg)
assert msg in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -259,8 +259,8 @@ def test_algoroll_ksk_zsk_reconfig_step4(tld, ns6, alg, size):
ztag = keys[1].key.tag
msg1 = f"keymgr-manual-mode: block transition KSK {zone}/RSASHA256/{ktag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition ZSK {zone}/RSASHA256/{ztag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
ns6.log.expect(msg1)
ns6.log.expect(msg2)
assert msg1 in ns6.log
assert msg2 in ns6.log
# Force step.
ktag = keys[3].key.tag

View file

@ -125,7 +125,7 @@ def test_csk_roll1_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block CSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -185,7 +185,7 @@ def test_csk_roll1_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -274,8 +274,7 @@ def test_csk_roll1_step4(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
tag = keys[1].key.tag

View file

@ -128,7 +128,7 @@ def test_csk_roll2_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block CSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -188,7 +188,7 @@ def test_csk_roll2_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -315,8 +315,8 @@ def test_csk_roll2_step5(tld, alg, size, ns3):
tag = keys[0].key.tag
msg1 = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg1)
ns3.log.expect(msg2)
assert msg1 in ns3.log
assert msg2 in ns3.log
# Force step.
tag = keys[1].key.tag

View file

@ -74,7 +74,7 @@ def test_rollover_enable_dnssec_step1(tld, alg, size, ns3):
# Check logs.
msg = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -155,7 +155,7 @@ def test_rollover_enable_dnssec_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:

View file

@ -110,7 +110,7 @@ def test_ksk_doubleksk_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block KSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -171,7 +171,7 @@ def test_ksk_doubleksk_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -252,8 +252,8 @@ def test_ksk_doubleksk_step4(tld, alg, size, ns3):
tag = keys[1].key.tag
msg1 = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg1)
ns3.log.expect(msg2)
assert msg1 in ns3.log
assert msg2 in ns3.log
# Force step.
tag = keys[2].key.tag

View file

@ -117,7 +117,7 @@ def test_zsk_prepub_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block ZSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -176,7 +176,7 @@ def test_zsk_prepub_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition ZSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -263,7 +263,7 @@ def test_zsk_prepub_step4(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition ZSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
tag = keys[2].key.tag

View file

@ -21,7 +21,7 @@ def test_spf_log(ns1):
"zone warn/IN: loaded serial 0",
"zone nowarn/IN: loaded serial 0",
):
ns1.log.expect(msg)
assert msg in ns1.log
for msg in (
"zone nowarn/IN: 'y.nowarn' found type SPF record but no SPF TXT record found",
@ -29,4 +29,4 @@ def test_spf_log(ns1):
"zone warn/IN: 'warn' found type SPF record but no SPF TXT record found",
"zone nowarn/IN: 'nowarn' found type SPF record but no SPF TXT record found",
):
ns1.log.prohibit(msg)
assert msg not in ns1.log