Refactor LogFile into TextFile with Grep support

Add a new Grep-like interface which can be used for searching for
regular expressions in files. Replace the prior LogFile used for named
logs with the new TextFile interface.

(cherry picked from commit 7743bab5fc)
This commit is contained in:
Nicki Křížek 2025-10-02 17:06:25 +02:00
parent 98f5c5774d
commit 249dbeddf8
15 changed files with 88 additions and 67 deletions

View file

@ -21,10 +21,11 @@ import re
import dns.message
import dns.rcode
from .log import debug, info, LogFile, WatchLogFromStart, WatchLogFromHere
from .log import debug, info, WatchLogFromStart, WatchLogFromHere
from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor
from .run import perl
from .query import udp
from .text import TextFile
class NamedPorts(NamedTuple):
@ -87,7 +88,7 @@ class NamedInstance:
if ports is None:
ports = NamedPorts.from_env()
self.ports = ports
self.log = LogFile(os.path.join(identifier, "named.run"))
self.log = TextFile(os.path.join(identifier, "named.run"))
self._rndc_executor = rndc_executor or RNDCBinaryExecutor()
self._rndc_logger = rndc_logger
@ -239,3 +240,6 @@ class NamedInstance:
f"{os.environ['srcdir']}/start.pl",
[self.system_test_name, self.identifier] + args,
)
def __repr__(self):
return self.identifier

View file

@ -1078,9 +1078,8 @@ def check_cdslog(server, zone, key, substr):
def check_cdslog_prohibit(server, zone, key, substr):
server.log.prohibit(
f"{substr} for key {zone}/{key.algorithm.name}/{key.tag} is now published"
)
msg = f"{substr} for key {zone}/{key.algorithm.name}/{key.tag} is now published"
assert msg not in server.log
def check_cdsdelete(rrset, expected):

View file

@ -23,4 +23,4 @@ from .basic import (
critical,
)
from .watchlog import LogFile, WatchLogFromStart, WatchLogFromHere
from .watchlog import WatchLogFromStart, WatchLogFromHere

View file

@ -15,7 +15,7 @@ import abc
import os
import time
from isctest.text import compile_pattern, FlexPattern, LineReader, LogFile
from isctest.text import compile_pattern, FlexPattern, LineReader
T = TypeVar("T")

View file

@ -14,7 +14,7 @@
import abc
import re
from re import compile as Re
from typing import Iterator, Match, Optional, Pattern, TextIO, Union
from typing import Iterator, List, Match, Optional, Pattern, TextIO, Union
FlexPattern = Union[str, Pattern]
@ -28,41 +28,60 @@ def compile_pattern(string: FlexPattern) -> Pattern:
raise TypeError("only string and re.Pattern allowed")
class LogFile:
class Grep(abc.ABC):
"""
Log file wrapper with a path and means to find a string in its contents.
Implement a grep-like interface for pattern matching in texts and files.
"""
@abc.abstractmethod
def readlines(self) -> Iterator[str]:
raise NotImplementedError
def igrep(self, pattern: FlexPattern) -> Iterator[Match]:
"""
Iterate over the lines matching the pattern.
"""
regex = compile_pattern(pattern)
for line in self.readlines():
match = regex.search(line)
if match:
yield match
def grep(self, pattern: FlexPattern) -> List[Match]:
"""
Get list of lines matching the pattern.
"""
return list(self.igrep(pattern))
def __contains__(self, pattern: FlexPattern) -> bool:
"""
Return whether any of the lines in the log contains matches the pattern.
"""
try:
next(self.igrep(pattern))
except StopIteration:
return False
return True
class TextFile(Grep):
"""
Text file wrapper with grep support.
"""
def __init__(self, path: str):
self.path = path
@property
def _lines(self) -> Iterator[str]:
def readlines(self) -> Iterator[str]:
with open(self.path, encoding="utf-8") as f:
yield from f
def __contains__(self, substring: str) -> bool:
"""
Return whether any of the lines in the log contains a given string.
"""
for line in self._lines:
if substring in line:
return True
return False
def expect(self, msg: str):
"""Check the string is present anywhere in the log file."""
if msg in self:
return
assert False, f"log message not found in log {self.path}: {msg}"
def prohibit(self, msg: str):
"""Check the string is not present in the entire log file."""
if msg in self:
assert False, f"forbidden message appeared in log {self.path}: {msg}"
def __repr__(self):
return self.path
class LineReader:
class LineReader(Grep):
"""
>>> import io

View file

@ -1600,7 +1600,7 @@ def test_kasp_zsk_retired(ns3):
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
ns3.log.prohibit(msg)
assert msg not in ns3.log
def test_kasp_purge_keys(ns4):
@ -1624,10 +1624,10 @@ def test_kasp_purge_keys(ns4):
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN/example1 (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
ns4.log.prohibit(msg)
assert msg not in ns4.log
msg = f"zone {zone}/IN/example2 (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
ns4.log.prohibit(msg)
assert msg not in ns4.log
def test_kasp_reload_restart(ns6):
@ -1729,7 +1729,7 @@ def test_kasp_manual_mode(ns3):
# Key rollover should have been be blocked.
tag = expected[1].key.tag
blockmsg = f"keymgr-manual-mode: block ZSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(blockmsg)
assert blockmsg in ns3.log
# Remove files.
for key in ksks + zsks:

View file

@ -179,7 +179,7 @@ def check_add_zsk(server, zone, keys, expected, extra_keys, extra, primary=None)
isctest.log.info(
f"- zone {zone} {server.identifier}: make sure we did not try to sign with the keys added with nsupdate"
)
server.log.prohibit(f"dns_zone_findkeys: error reading ./K{zone}")
assert f"dns_zone_findkeys: error reading ./K{zone}" not in server.log
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
@ -189,7 +189,7 @@ def check_add_zsk(server, zone, keys, expected, extra_keys, extra, primary=None)
# Check again.
isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run")
check_dnssec(server, zone, keys + extra_keys, expected + extra)
server.log.prohibit(f"dns_zone_findkeys: error reading ./K{zone}")
assert f"dns_zone_findkeys: error reading ./K{zone}" not in server.log
def _check_remove_zsk_fail(

View file

@ -82,8 +82,8 @@ def test_algoroll_csk_reconfig_step1(tld, ns6, alg, size):
tag = keys[0].key.tag
msg1 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{tag} (CSK)"
msg2 = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})"
ns6.log.expect(msg1)
ns6.log.expect(msg2)
assert msg1 in ns6.log
assert msg2 in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -175,7 +175,7 @@ def test_algoroll_csk_reconfig_step3(tld, ns6, alg, size):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns6.log.expect(msg)
assert msg in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -244,7 +244,7 @@ def test_algoroll_csk_reconfig_step4(tld, ns6, alg, size):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/RSASHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
ns6.log.expect(msg)
assert msg in ns6.log
# Force step.
tag = keys[1].key.tag

View file

@ -85,9 +85,9 @@ def test_algoroll_ksk_zsk_reconfig_step1(tld, ns6, alg, size):
msg1 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{ktag} (KSK)"
msg2 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{ztag} (ZSK)"
msg3 = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})" # twice
ns6.log.expect(msg1)
ns6.log.expect(msg2)
ns6.log.expect(msg3)
assert msg1 in ns6.log
assert msg2 in ns6.log
assert len(ns6.log.grep(msg3)) >= 2
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -184,7 +184,7 @@ def test_algoroll_ksk_zsk_reconfig_step3(tld, ns6, alg, size):
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns6.log.expect(msg)
assert msg in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -259,8 +259,8 @@ def test_algoroll_ksk_zsk_reconfig_step4(tld, ns6, alg, size):
ztag = keys[1].key.tag
msg1 = f"keymgr-manual-mode: block transition KSK {zone}/RSASHA256/{ktag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition ZSK {zone}/RSASHA256/{ztag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
ns6.log.expect(msg1)
ns6.log.expect(msg2)
assert msg1 in ns6.log
assert msg2 in ns6.log
# Force step.
ktag = keys[3].key.tag

View file

@ -125,7 +125,7 @@ def test_csk_roll1_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block CSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -185,7 +185,7 @@ def test_csk_roll1_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -274,8 +274,7 @@ def test_csk_roll1_step4(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
tag = keys[1].key.tag

View file

@ -128,7 +128,7 @@ def test_csk_roll2_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block CSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -188,7 +188,7 @@ def test_csk_roll2_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -315,8 +315,8 @@ def test_csk_roll2_step5(tld, alg, size, ns3):
tag = keys[0].key.tag
msg1 = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg1)
ns3.log.expect(msg2)
assert msg1 in ns3.log
assert msg2 in ns3.log
# Force step.
tag = keys[1].key.tag

View file

@ -74,7 +74,7 @@ def test_rollover_enable_dnssec_step1(tld, alg, size, ns3):
# Check logs.
msg = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -155,7 +155,7 @@ def test_rollover_enable_dnssec_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:

View file

@ -110,7 +110,7 @@ def test_ksk_doubleksk_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block KSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -171,7 +171,7 @@ def test_ksk_doubleksk_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -252,8 +252,8 @@ def test_ksk_doubleksk_step4(tld, alg, size, ns3):
tag = keys[1].key.tag
msg1 = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg1)
ns3.log.expect(msg2)
assert msg1 in ns3.log
assert msg2 in ns3.log
# Force step.
tag = keys[2].key.tag

View file

@ -117,7 +117,7 @@ def test_zsk_prepub_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block ZSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -176,7 +176,7 @@ def test_zsk_prepub_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition ZSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -263,7 +263,7 @@ def test_zsk_prepub_step4(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition ZSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
tag = keys[2].key.tag

View file

@ -21,7 +21,7 @@ def test_spf_log(ns1):
"zone warn/IN: loaded serial 0",
"zone nowarn/IN: loaded serial 0",
):
ns1.log.expect(msg)
assert msg in ns1.log
for msg in (
"zone nowarn/IN: 'y.nowarn' found type SPF record but no SPF TXT record found",
@ -29,4 +29,4 @@ def test_spf_log(ns1):
"zone warn/IN: 'warn' found type SPF record but no SPF TXT record found",
"zone nowarn/IN: 'nowarn' found type SPF record but no SPF TXT record found",
):
ns1.log.prohibit(msg)
assert msg not in ns1.log