[9.18] new: test: Regex support for logs and cmd output in pytest

Improve and unify the handling of regular expressions when searching in logs, files and command output in system tests.
- Use `Re()` for constructing regular expressions, which is an imported shorthand for `re.compile()` (imported as `from re import compile as Re`
- Add new `isctest.text.Text` interface which is a text wrapper that supports the `in` operator for line matching operation for both strings and regular expressions, e.g.:
  - `assert "running" in ns1.log`
  - `assert Re("a.example..*10.0.0.1") in response.out`
- Use the new `isctest.text.Text` for:
  - `isctest.run.cmd()` output, where `.out` and `.err` can be used for stdout and stderr contents
  - `NamedInstance.log` rather than the previous log interface (`.expect()` and `.prohibit()` is no longer available or needed. The `in` operator along with an `assert` statement can be used now instead.)
  - `NamedInstance.rndc()` output, which returns identical output as `isctest.run.cmd()`

Backport of MR !11054

Merge branch 'backport-nicki/pytest-grep-9.18' into 'bind-9.18'

See merge request isc-projects/bind9!11343
This commit is contained in:
Nicki Křížek 2025-12-08 20:05:57 +01:00
commit 963c9e0cc1
25 changed files with 402 additions and 465 deletions

View file

@ -675,7 +675,7 @@ vulture:
<<: *python_triggering_rules
needs: []
script:
- vulture --exclude "*/ans*/ans.py,conftest.py,get_algorithms.py,isctest" --ignore-names "after_servers_start,bootstrap,pytestmark" bin/tests/system/
- vulture --exclude "*/ans*/ans.py,conftest.py,get_algorithms.py,re_compile_checker.py,isctest" --ignore-names "after_servers_start,bootstrap,pytestmark" bin/tests/system/
ci-variables:
<<: *precheck_job
@ -766,7 +766,7 @@ pylint:
script:
- pylint --rcfile $CI_PROJECT_DIR/.pylintrc $(git ls-files '*.py' | grep -vE '(ans\.py|dangerfile\.py|^bin/tests/system/|^contrib/)')
# Ignore Pylint wrong-import-position error in system test to enable use of pytest.importorskip
- pylint --rcfile $CI_PROJECT_DIR/.pylintrc --disable=wrong-import-position $(git ls-files 'bin/tests/system/*.py' | grep -vE '(ans\.py|vulture_ignore_list\.py)')
- pylint --rcfile $CI_PROJECT_DIR/.pylintrc --load-plugins re_compile_checker --disable=wrong-import-position $(git ls-files 'bin/tests/system/*.py' | grep -vE '(ans\.py|vulture_ignore_list\.py)')
reuse:
<<: *precheck_job

View file

@ -10,12 +10,12 @@
# information regarding copyright ownership.
import concurrent.futures
import os
import subprocess
import time
import pytest
import isctest
pytestmark = pytest.mark.extra_artifacts(
[
"ns*/*.nzf*",
@ -43,20 +43,19 @@ def rndc_loop(test_state, domain, ns3):
["delzone", domain],
]
args = [os.environ["RNDC"]] + ns3.rndc_args.split()
while not test_state["finished"]:
for command in rndc_commands:
ns3.rndc(" ".join(command), ignore_errors=True, log=False)
# avoid using ns3.rndc() directly to avoid log spam
subprocess.run(args + " ".join(command), timeout=10, check=False)
def check_if_server_is_responsive(ns3):
"""
Check if server status can be successfully retrieved using "rndc status"
"""
try:
ns3.rndc("status", log=False)
return True
except isctest.rndc.RNDCException:
return False
cmd = ns3.rndc("status", raise_on_exception=False)
return cmd.rc == 0
def test_rndc_deadlock(servers):

View file

@ -100,10 +100,10 @@ def verify_zone(zone, transfer):
verifier = isctest.run.cmd(verify_cmd)
if verifier.returncode != 0:
if verifier.rc != 0:
isctest.log.error(f"dnssec-verify {zone}. failed")
return verifier.returncode == 0
return verifier.rc == 0
def read_statefile(server, zone):

View file

@ -13,7 +13,7 @@ from functools import partial
import filecmp
import os
from pathlib import Path
import re
from re import compile as Re
import shutil
import subprocess
import tempfile
@ -49,7 +49,7 @@ else:
XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "")
FILE_DIR = os.path.abspath(Path(__file__).parent)
ENV_RE = re.compile(b"([^=]+)=(.*)")
ENV_RE = Re(b"([^=]+)=(.*)")
PORT_MIN = 5001
PORT_MAX = 32767
PORTS_PER_TEST = 20
@ -62,10 +62,10 @@ PRIORITY_TESTS = [
"timeouts/",
"upforwd/",
]
PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS))
PRIORITY_TESTS_RE = Re("|".join(PRIORITY_TESTS))
SYSTEM_TEST_DIR_GIT_PATH = "bin/tests/system"
SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py")
SYSTEM_TEST_NAME_RE = Re(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
SYMLINK_REPLACEMENT_RE = Re(r"/tests(_.*)\.py")
# ---------------------- Module initialization ---------------------------

View file

@ -13,7 +13,6 @@
import os
import re
import subprocess
import isctest
import pytest
@ -30,20 +29,7 @@ pytestmark = pytest.mark.extra_artifacts(
)
def run_rndc(server, rndc_command):
"""
Send the specified 'rndc_command' to 'server' with a timeout of 10 seconds
"""
rndc = os.getenv("RNDC")
port = os.getenv("CONTROLPORT")
cmdline = [rndc, "-c", "../_common/rndc.conf", "-p", port, "-s", server]
cmdline.extend(rndc_command)
subprocess.check_output(cmdline, stderr=subprocess.STDOUT, timeout=10)
def test_dnstap_dispatch_socket_addresses():
def test_dnstap_dispatch_socket_addresses(ns3):
# Send some query to ns3 so that it records something in its dnstap file.
msg = isctest.query.create("mail.example.", "A")
res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR)
@ -52,14 +38,14 @@ def test_dnstap_dispatch_socket_addresses():
]
# Before continuing, roll dnstap file to ensure it is flushed to disk.
run_rndc("10.53.0.3", ["dnstap", "-roll", "1"])
ns3.rndc("dnstap -roll 1")
# Move the dnstap file aside so that it is retained for troubleshooting.
os.rename(os.path.join("ns3", "dnstap.out.0"), "dnstap.out.resolver_addresses")
# Read the contents of the dnstap file using dnstap-read.
output = subprocess.check_output(
[os.getenv("DNSTAPREAD"), "dnstap.out.resolver_addresses"], encoding="utf-8"
dnstapread = isctest.run.cmd(
[os.getenv("DNSTAPREAD"), "dnstap.out.resolver_addresses"],
)
# Check whether all frames contain the expected addresses.
@ -74,7 +60,7 @@ def test_dnstap_dispatch_socket_addresses():
bad_frames = []
inspected_frames = 0
addr_regex = r"^10\.53\.0\.[0-9]+:[0-9]{1,5}$"
for line in output.splitlines():
for line in dnstapread.out.splitlines():
_, _, frame_type, addr1, _, addr2, _ = line.split(" ", 6)
# Only inspect RESOLVER_QUERY and RESOLVER_RESPONSE frames.
if frame_type not in ("RQ", "RR"):

View file

@ -25,10 +25,10 @@ def gnutls_cli_executable():
pytest.skip("gnutls-cli not found in PATH")
# Ensure gnutls-cli supports the --logfile command-line option.
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[executable, "--logfile=/dev/null"], log_stderr=False, raise_on_exception=False
).stdout
if b"illegal option" in output:
)
if "illegal option" in cmd.out:
pytest.skip("gnutls-cli does not support the --logfile option")
return executable

View file

@ -12,7 +12,6 @@
from . import check
from . import instance
from . import query
from . import rndc
from . import run
from . import template
from . import log

View file

@ -11,14 +11,15 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import NamedTuple, Optional
from typing import NamedTuple
import logging
import os
from pathlib import Path
import re
from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor
from .log import info, LogFile, WatchLogFromStart, WatchLogFromHere
from .log import WatchLogFromStart, WatchLogFromHere
from .run import CmdResult, EnvCmd
from .text import TextFile
class NamedPorts(NamedTuple):
@ -42,8 +43,6 @@ class NamedInstance:
self,
identifier: str,
ports: NamedPorts = NamedPorts(),
rndc_logger: Optional[logging.Logger] = None,
rndc_executor: Optional[RNDCExecutor] = None,
) -> None:
"""
`identifier` must be an `ns<X>` string, where `<X>` is an integer
@ -52,18 +51,18 @@ class NamedInstance:
`ports` is the `NamedPorts` instance listing the UDP/TCP ports on which
this `named` instance is listening for various types of traffic (both
DNS traffic and RNDC commands).
`rndc_logger` is the `logging.Logger` to use for logging RNDC
commands sent to this `named` instance.
`rndc_executor` is an object implementing the `RNDCExecutor` interface
that is used for executing RNDC commands on this `named` instance.
"""
self.ip = self._identifier_to_ip(identifier)
self.ports = ports
self.log = LogFile(os.path.join(identifier, "named.run"))
self._rndc_executor = rndc_executor or RNDCBinaryExecutor()
self._rndc_logger = rndc_logger
self.log = TextFile(os.path.join(identifier, "named.run"))
self._rndc_conf = Path("../_common/rndc.conf").absolute()
self._rndc = EnvCmd("RNDC", self.rndc_args)
@property
def rndc_args(self) -> str:
"""Base arguments for calling RNDC to control the instance."""
return f"-c {self._rndc_conf} -s {self.ip} -p {self.ports.rndc}"
@staticmethod
def _identifier_to_ip(identifier: str) -> str:
@ -72,52 +71,16 @@ class NamedInstance:
raise ValueError("Invalid named instance identifier" + identifier)
return "10.53.0." + regex_match.group("index")
def rndc(self, command: str, ignore_errors: bool = False, log: bool = True) -> str:
def rndc(self, command: str, timeout=10, **kwargs) -> CmdResult:
"""
Send `command` to this named instance using RNDC. Return the server's
response.
If the RNDC command fails, an `RNDCException` is raised unless
`ignore_errors` is set to `True`.
The RNDC command will be logged to `rndc.log` (along with the server's
response) unless `log` is set to `False`.
```python
def test_foo(servers):
# Send the "status" command to ns1. An `RNDCException` will be
# raised if the RNDC command fails. This command will be logged.
response = servers["ns1"].rndc("status")
# Send the "thaw foo" command to ns2. No exception will be raised
# in case the RNDC command fails. This command will be logged
# (even if it fails).
response = servers["ns2"].rndc("thaw foo", ignore_errors=True)
# Send the "stop" command to ns3. An `RNDCException` will be
# raised if the RNDC command fails, but this command will not be
# logged (the server's response will still be returned to the
# caller, though).
response = servers["ns3"].rndc("stop", log=False)
# Send the "halt" command to ns4 in "fire & forget mode": no
# exceptions will be raised and no logging will take place (the
# server's response will still be returned to the caller, though).
response = servers["ns4"].rndc("stop", ignore_errors=True, log=False)
```
To suppress exceptions, redirect outputs, control logging change
timeout etc. use keyword arguments which are passed to
isctest.cmd.run().
"""
try:
response = self._rndc_executor.call(self.ip, self.ports.rndc, command)
if log:
self._rndc_log(command, response)
except RNDCException as exc:
response = str(exc)
if log:
self._rndc_log(command, response)
if not ignore_errors:
raise
return response
return self._rndc(command, timeout=timeout, **kwargs)
def watch_log_from_start(
self, timeout: float = WatchLogFromStart.DEFAULT_TIMEOUT
@ -137,28 +100,12 @@ class NamedInstance:
"""
return WatchLogFromHere(self.log.path, timeout)
def reconfigure(self, **kwargs) -> None:
def reconfigure(self, **kwargs) -> CmdResult:
"""
Reconfigure this named `instance` and wait until reconfiguration is
finished. Raise an `RNDCException` if reconfiguration fails.
finished.
"""
with self.watch_log_from_here() as watcher:
self.rndc("reconfig", **kwargs)
cmd = self.rndc("reconfig", **kwargs)
watcher.wait_for_line("any newly configured zones are now loaded")
def _rndc_log(self, command: str, response: str) -> None:
"""
Log an `rndc` invocation (and its output) to the `rndc.log` file in the
current working directory.
"""
fmt = '%(ip)s: "%(command)s"\n%(separator)s\n%(response)s%(separator)s'
args = {
"ip": self.ip,
"command": command,
"separator": "-" * 80,
"response": response,
}
if self._rndc_logger is None:
info(fmt, args)
else:
self._rndc_logger.info(fmt, args)
return cmd

View file

@ -21,4 +21,4 @@ from .basic import (
critical,
)
from .watchlog import LogFile, WatchLogFromStart, WatchLogFromHere
from .watchlog import WatchLogFromStart, WatchLogFromHere

View file

@ -9,15 +9,15 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Any, Iterator, List, Match, Optional, Pattern, TextIO, TypeVar, Union
from typing import Any, List, Match, Optional, Pattern, TextIO, TypeVar, Union
import abc
import os
import re
import time
from isctest.text import compile_pattern, FlexPattern, LineReader
FlexPattern = Union[str, Pattern]
T = TypeVar("T")
OneOrMore = Union[T, List[T]]
@ -30,128 +30,6 @@ class WatchLogTimeout(WatchLogException):
pass
class LogFile:
"""
Log file wrapper with a path and means to find a string in its contents.
"""
def __init__(self, path: str):
self.path = path
@property
def _lines(self) -> Iterator[str]:
with open(self.path, encoding="utf-8") as f:
yield from f
def __contains__(self, substring: str) -> bool:
"""
Return whether any of the lines in the log contains a given string.
"""
for line in self._lines:
if substring in line:
return True
return False
def expect(self, msg: str):
"""Check the string is present anywhere in the log file."""
if msg in self:
return
assert False, f"log message not found in log {self.path}: {msg}"
def prohibit(self, msg: str):
"""Check the string is not present in the entire log file."""
if msg in self:
assert False, f"forbidden message appeared in log {self.path}: {msg}"
class LineReader:
"""
>>> import io
>>> file = io.StringIO("complete line\\n")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
>>> file = io.StringIO("complete line\\nand then incomplete line")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
>>> file = io.StringIO("complete line\\nand then another complete line\\n")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
and then another complete line
>>> file = io.StringIO()
>>> line_reader = LineReader(file)
>>> for chunk in (
... "first line\\nsecond line\\nthi",
... "rd ",
... "line\\nfour",
... "th line\\n\\nfifth line\\n"
... ):
... print("=== OUTER ITERATION ===")
... pos = file.tell()
... print(chunk, end="", file=file)
... _ = file.seek(pos)
... for line in line_reader.readlines():
... print("--- inner iteration ---")
... print(line.strip() or "<blank>")
=== OUTER ITERATION ===
--- inner iteration ---
first line
--- inner iteration ---
second line
=== OUTER ITERATION ===
=== OUTER ITERATION ===
--- inner iteration ---
third line
=== OUTER ITERATION ===
--- inner iteration ---
fourth line
--- inner iteration ---
<blank>
--- inner iteration ---
fifth line
"""
def __init__(self, stream: TextIO):
self._stream = stream
self._linebuf = ""
def readline(self) -> Optional[str]:
"""
Wrapper around io.readline() function to handle unfinished lines.
If a line ends with newline character, it's returned immediately.
If a line doesn't end with a newline character, the read contents are
buffered until the next call of this function and None is returned
instead.
"""
read = self._stream.readline()
if not read.endswith("\n"):
self._linebuf += read
return None
read = self._linebuf + read
self._linebuf = ""
return read
def readlines(self) -> Iterator[str]:
"""
Wrapper around io.readline() which only returns finished lines.
"""
while True:
line = self.readline()
if line is None:
return
yield line
class WatchLog(abc.ABC):
"""
Wait for a log message to appear in a text file.
@ -210,15 +88,7 @@ class WatchLog(abc.ABC):
if not isinstance(strings, list):
strings = [strings]
for string in strings:
if isinstance(string, Pattern):
patterns.append(string)
elif isinstance(string, str):
pattern = re.compile(re.escape(string))
patterns.append(pattern)
else:
raise WatchLogException(
"only string and re.Pattern allowed for matching"
)
patterns.append(compile_pattern(string))
return patterns
def _wait_for_match(self, regexes: List[Pattern]) -> Match:
@ -256,13 +126,14 @@ class WatchLog(abc.ABC):
Recommended use:
```python
from re import compile as Re
import isctest
def test_foo(servers):
with servers["ns1"].watch_log_from_start() as watcher:
watcher.wait_for_line("all zones loaded")
pattern = re.compile(r"next key event in ([0-9]+) seconds")
pattern = Re(r"next key event in ([0-9]+) seconds")
with servers["ns1"].watch_log_from_here() as watcher:
# ... do stuff here ...
match = watcher.wait_for_line(pattern)
@ -321,7 +192,8 @@ class WatchLog(abc.ABC):
>>> # Different values must be returned depending on which line is
>>> # found in the log file.
>>> import tempfile
>>> patterns = [re.compile(r"bar ([0-9])"), "qux"]
>>> from re import compile as Re
>>> patterns = [Re(r"bar ([0-9])"), "qux"]
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("foo bar 3", file=file, flush=True)
... with WatchLogFromStart(file.name) as watcher:
@ -443,7 +315,8 @@ class WatchLog(abc.ABC):
>>> assert ret[1].group(0) == "foo"
>>> import tempfile
>>> bar_pattern = re.compile('bar')
>>> from re import compile as Re
>>> bar_pattern = Re('bar')
>>> patterns = ['foo', bar_pattern]
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("bar", file=file, flush=True)

View file

@ -1,69 +0,0 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import abc
import os
import subprocess
class RNDCExecutor(abc.ABC):
"""
An interface which RNDC executors have to implement in order for the
`NamedInstance` class to be able to use them.
"""
@abc.abstractmethod
def call(self, ip: str, port: int, command: str) -> str:
"""
Send RNDC `command` to the `named` instance at `ip:port` and return the
server's response.
"""
class RNDCException(Exception):
"""
Raised by classes implementing the `RNDCExecutor` interface when sending an
RNDC command fails for any reason.
"""
class RNDCBinaryExecutor(RNDCExecutor):
"""
An `RNDCExecutor` which sends RNDC commands to servers using the `rndc`
binary.
"""
def __init__(self) -> None:
"""
This class needs the `RNDC` environment variable to be set to the path
to the `rndc` binary to use.
"""
rndc_path = os.environ.get("RNDC", "/bin/false")
rndc_conf = os.path.join("..", "_common", "rndc.conf")
self._base_cmdline = [rndc_path, "-c", rndc_conf]
def call(self, ip: str, port: int, command: str) -> str:
"""
Send RNDC `command` to the `named` instance at `ip:port` and return the
server's response.
"""
cmdline = self._base_cmdline[:]
cmdline.extend(["-s", ip])
cmdline.extend(["-p", str(port)])
cmdline.extend(command.split())
try:
return subprocess.check_output(
cmdline, stderr=subprocess.STDOUT, timeout=10, encoding="utf-8"
)
except subprocess.SubprocessError as exc:
msg = getattr(exc, "output", "RNDC exception occurred")
raise RNDCException(msg) from exc

View file

@ -15,11 +15,24 @@ import time
from typing import Optional
import isctest.log
import isctest.text
from isctest.compat import dns_rcode
import dns.message
class CmdResult:
def __init__(self, proc=None):
self.proc = proc
self.rc = self.proc.returncode
self.out = isctest.text.Text("")
self.err = isctest.text.Text("")
if self.proc.stdout:
self.out = isctest.text.Text(self.proc.stdout.decode("utf-8"))
if self.proc.stderr:
self.err = isctest.text.Text(self.proc.stderr.decode("utf-8"))
def cmd(
args,
cwd=None,
@ -31,7 +44,7 @@ def cmd(
input_text: Optional[bytes] = None,
raise_on_exception=True,
env: Optional[dict] = None,
):
) -> CmdResult:
"""Execute a command with given args as subprocess."""
isctest.log.debug(f"isctest.run.cmd(): {' '.join(args)}")
@ -61,24 +74,26 @@ def cmd(
env=env,
)
print_debug_logs(proc)
return proc
return CmdResult(proc)
except subprocess.CalledProcessError as exc:
print_debug_logs(exc)
isctest.log.debug(f"isctest.run.cmd(): (return code) {exc.returncode}")
if raise_on_exception:
raise exc
return exc
return CmdResult(exc)
class Dig:
def __init__(self, base_params: str = ""):
self.base_params = base_params
class EnvCmd:
"""Helper for executing binaries from env with optional base parameters."""
def __call__(self, params: str) -> str:
"""Run the dig command with the given parameters and return the decoded output."""
return cmd(
[os.environ.get("DIG")] + f"{self.base_params} {params}".split(),
).stdout.decode("utf-8")
def __init__(self, name: str, base_params: str = ""):
self.bin_path = os.environ[name]
self.base_params = base_params.split()
def __call__(self, params: str, **kwargs) -> CmdResult:
"""Call the command. Keyword arguments from isctest.run.cmd() are supported."""
args = self.base_params + params.split()
return cmd([self.bin_path] + args, **kwargs)
def retry_with_timeout(func, timeout, delay=1, msg=None):

View file

@ -0,0 +1,178 @@
#!/usr/bin/python3
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import abc
import re
from re import compile as Re
from typing import Iterator, List, Match, Optional, Pattern, TextIO, Union
FlexPattern = Union[str, Pattern]
def compile_pattern(string: FlexPattern) -> Pattern:
if isinstance(string, Pattern):
return string
if isinstance(string, str):
return Re(re.escape(string))
raise TypeError("only string and re.Pattern allowed")
class Grep(abc.ABC):
"""
Implement a grep-like interface for pattern matching in texts and files.
"""
@abc.abstractmethod
def readlines(self) -> Iterator[str]:
raise NotImplementedError
def igrep(self, pattern: FlexPattern) -> Iterator[Match]:
"""
Iterate over the lines matching the pattern.
"""
regex = compile_pattern(pattern)
for line in self.readlines():
match = regex.search(line)
if match:
yield match
def grep(self, pattern: FlexPattern) -> List[Match]:
"""
Get list of lines matching the pattern.
"""
return list(self.igrep(pattern))
def __contains__(self, pattern: FlexPattern) -> bool:
"""
Return whether any of the lines in the log contains matches the pattern.
"""
try:
next(self.igrep(pattern))
except StopIteration:
return False
return True
class Text(Grep, str): # type: ignore
"""
Wrapper around classic string with grep support.
"""
def readlines(self):
yield from self.splitlines(keepends=True)
class TextFile(Grep):
"""
Text file wrapper with grep support.
"""
def __init__(self, path: str):
self.path = path
def readlines(self) -> Iterator[str]:
with open(self.path, encoding="utf-8") as f:
yield from f
def __repr__(self):
return self.path
class LineReader(Grep):
"""
>>> import io
>>> file = io.StringIO("complete line\\n")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
>>> file = io.StringIO("complete line\\nand then incomplete line")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
>>> file = io.StringIO("complete line\\nand then another complete line\\n")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
and then another complete line
>>> file = io.StringIO()
>>> line_reader = LineReader(file)
>>> for chunk in (
... "first line\\nsecond line\\nthi",
... "rd ",
... "line\\nfour",
... "th line\\n\\nfifth line\\n"
... ):
... print("=== OUTER ITERATION ===")
... pos = file.tell()
... print(chunk, end="", file=file)
... _ = file.seek(pos)
... for line in line_reader.readlines():
... print("--- inner iteration ---")
... print(line.strip() or "<blank>")
=== OUTER ITERATION ===
--- inner iteration ---
first line
--- inner iteration ---
second line
=== OUTER ITERATION ===
=== OUTER ITERATION ===
--- inner iteration ---
third line
=== OUTER ITERATION ===
--- inner iteration ---
fourth line
--- inner iteration ---
<blank>
--- inner iteration ---
fifth line
"""
def __init__(self, stream: TextIO):
self._stream = stream
self._linebuf = ""
def readline(self) -> Optional[str]:
"""
Wrapper around io.readline() function to handle unfinished lines.
If a line ends with newline character, it's returned immediately.
If a line doesn't end with a newline character, the read contents are
buffered until the next call of this function and None is returned
instead.
"""
read = self._stream.readline()
if not read.endswith("\n"):
self._linebuf += read
return None
read = self._linebuf + read
self._linebuf = ""
return read
def readlines(self) -> Iterator[str]:
"""
Wrapper around io.readline() which only returns finished lines.
"""
while True:
line = self.readline()
if line is None:
return
yield line

View file

@ -20,7 +20,7 @@ pytestmark = pytest.mark.extra_artifacts(
def test_dig_tcp_keepalive_handling(named_port, servers):
def get_keepalive_options_received():
servers["ns2"].rndc("stats", log=False)
servers["ns2"].rndc("stats")
options_received = 0
with open("ns2/named.stats", "r", encoding="utf-8") as ns2_stats_file:
for line in ns2_stats_file:
@ -28,38 +28,41 @@ def test_dig_tcp_keepalive_handling(named_port, servers):
options_received = line.split()[0]
return int(options_received)
dig = isctest.run.Dig(f"-p {str(named_port)}")
dig = isctest.run.EnvCmd("DIG", f"-p {str(named_port)}")
isctest.log.info("check that dig handles TCP keepalive in query")
assert "; TCP KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2")
assert "; TCP KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2").out
isctest.log.info("check that dig added TCP keepalive was received")
assert get_keepalive_options_received() == 1
isctest.log.info("check that TCP keepalive is added for TCP responses")
assert "; TCP KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2")
assert "; TCP KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2").out
isctest.log.info("check that TCP keepalive requires TCP")
assert "; TCP KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2")
assert "; TCP KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2").out
isctest.log.info("check the default keepalive value")
assert "; TCP KEEPALIVE: 30.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.3"
assert (
"; TCP KEEPALIVE: 30.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.3").out
)
isctest.log.info("check a keepalive configured value")
assert "; TCP KEEPALIVE: 15.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.2"
assert (
"; TCP KEEPALIVE: 15.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.2").out
)
isctest.log.info("check a re-configured keepalive value")
response = servers["ns2"].rndc("tcp-timeouts 300 300 300 200", log=False)
assert "tcp-initial-timeout=300" in response
assert "tcp-idle-timeout=300" in response
assert "tcp-keepalive-timeout=300" in response
assert "tcp-advertised-timeout=200" in response
assert "; TCP KEEPALIVE: 20.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.2"
response = servers["ns2"].rndc("tcp-timeouts 300 300 300 200")
assert "tcp-initial-timeout=300" in response.out
assert "tcp-idle-timeout=300" in response.out
assert "tcp-keepalive-timeout=300" in response.out
assert "tcp-advertised-timeout=200" in response.out
assert (
"; TCP KEEPALIVE: 20.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.2").out
)
isctest.log.info("check server config entry")

View file

@ -11,7 +11,7 @@
import hashlib
import os
import re
from re import compile as Re
import shutil
import pytest
@ -75,18 +75,16 @@ def token_init_and_cleanup():
)
try:
output = isctest.run.cmd(
token_init_command, env=EMPTY_OPENSSL_CONF_ENV
).stdout.decode("utf-8")
assert "The token has been initialized and is reassigned to slot" in output
cmd = isctest.run.cmd(token_init_command, env=EMPTY_OPENSSL_CONF_ENV)
assert "The token has been initialized and is reassigned to slot" in cmd.out
yield
finally:
output = isctest.run.cmd(
cmd = isctest.run.cmd(
token_cleanup_command,
env=EMPTY_OPENSSL_CONF_ENV,
raise_on_exception=False,
).stdout.decode("utf-8")
assert re.search("Found token (.*) with matching token label", output)
)
assert Re("Found token (.*) with matching token label") in cmd.out
# pylint: disable-msg=too-many-locals
@ -126,11 +124,9 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
HSMPIN,
]
output = isctest.run.cmd(
pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV
).stdout.decode("utf-8")
cmd = isctest.run.cmd(pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV)
assert "Key pair generated" in output
assert "Key pair generated" in cmd.out
def keyfromlabel(alg_name, zone, key_id, key_flag):
key_flag = key_flag.split() if key_flag else []
@ -148,18 +144,18 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
zone,
]
output = isctest.run.cmd(keyfrlab_command)
output_decoded = output.stdout.decode("utf-8").rstrip() + ".key"
cmd = isctest.run.cmd(keyfrlab_command)
keyfile = cmd.out.rstrip() + ".key"
assert os.path.exists(output_decoded)
assert os.path.exists(keyfile)
return output_decoded
return keyfile
if (
isctest.run.cmd(
[os.environ["SHELL"], "../testcrypto.sh", alg_name],
raise_on_exception=False,
).returncode
).rc
!= 0
):
pytest.skip(f"{alg_name} is not supported")

View file

@ -96,7 +96,7 @@ def test_masterfile_missing_master_file_servfail():
def test_masterfile_owner_inheritance():
"""Test owner inheritance after $INCLUDE"""
checker_output = isctest.run.cmd(
cmd = isctest.run.cmd(
[
os.environ["CHECKZONE"],
"-D",
@ -104,12 +104,12 @@ def test_masterfile_owner_inheritance():
"example",
"zone/inheritownerafterinclude.db",
]
).stdout.decode("utf-8")
)
owner_inheritance_zone = """
example. 0 IN SOA . . 0 0 0 0 0
example. 0 IN TXT "this should be at the zone apex"
example. 0 IN NS .
"""
checker_zone = dns.zone.from_text(checker_output, origin="example.")
checker_zone = dns.zone.from_text(cmd.out, origin="example.")
expected = dns.zone.from_text(owner_inheritance_zone, origin="example.")
isctest.check.zones_equal(checker_zone, expected, compare_ttl=True)

View file

@ -86,10 +86,10 @@ def verify_zone(zone, transfer):
verifier = isctest.run.cmd(verify_cmd)
if verifier.returncode != 0:
if verifier.rc != 0:
isctest.log.error(f"dnssec-verify {zone}. failed")
return verifier.returncode == 0
return verifier.rc == 0
def test_optout(ns2):

View file

@ -0,0 +1,46 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
# pylint: disable=unknown-option-value,re-compile-alias
import re
from astroid import nodes
from pylint.checkers import BaseRawFileChecker
from pylint.lint import PyLinter
class ReCompileChecker(BaseRawFileChecker):
name = "custom_raw"
msgs = {
"R9901": (
"Replace re.compile() with Re() using `from re import compile as Re`",
"re-compile-alias",
(
"Use a Re() alias instead of re.compile() by importing the "
"re.compile() function as Re()"
),
),
}
options = ()
def process_module(self, node: nodes.Module) -> None:
pattern = re.compile(r"re\.compile\(")
with node.stream() as stream:
for lineno, line in enumerate(stream):
if pattern.search(line.decode("utf-8")):
self.add_message("re-compile-alias", line=lineno)
def register(linter: PyLinter) -> None:
linter.register_checker(ReCompileChecker(linter))

View file

@ -121,22 +121,18 @@ pytestmark = pytest.mark.extra_artifacts(
],
)
def test_rrchecker_list_standard_names(option, expected_result):
stdout = isctest.run.cmd([os.environ["RRCHECKER"], option]).stdout.decode("utf-8")
values = [line for line in stdout.split("\n") if line.strip()]
cmd = isctest.run.cmd([os.environ["RRCHECKER"], option])
values = [line for line in cmd.out.split("\n") if line.strip()]
assert sorted(values) == sorted(expected_result)
def run_rrchecker(option, rr_class, rr_type, rr_rest):
rrchecker_output = (
isctest.run.cmd(
[os.environ["RRCHECKER"], option],
input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
)
.stdout.decode("utf-8")
.strip()
cmd = isctest.run.cmd(
[os.environ["RRCHECKER"], option],
input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
)
return rrchecker_output.split()
return cmd.out.strip().split()
@pytest.mark.parametrize(
@ -162,7 +158,7 @@ def test_rrchecker_conversions(option):
".",
tempzone_file,
],
).stdout.decode("utf-8")
).out
checkzone_output = [
line for line in checkzone_output.splitlines() if not line.startswith(";")
]

View file

@ -71,11 +71,8 @@ def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries
# helper function, 'command' is the rndc command to run
def launch_rndc(command):
try:
instance.rndc(command, log=False)
return 0
except isctest.rndc.RNDCException:
return -1
ret = instance.rndc(command, raise_on_exception=False)
return 0 if ret.rc == 0 else -1
# We're going to execute queries in parallel by means of a thread pool.
# dnspython functions block, so we need to circumvent that.

View file

@ -13,7 +13,7 @@ import pytest
@pytest.mark.requires_zones_loaded("ns1")
def test_spf_log(servers):
def test_spf_log(ns1):
for msg in (
"zone spf/IN: 'y.spf' found type SPF record but no SPF TXT record found",
"zone warn/IN: 'y.warn' found type SPF record but no SPF TXT record found",
@ -21,7 +21,7 @@ def test_spf_log(servers):
"zone warn/IN: loaded serial 0",
"zone nowarn/IN: loaded serial 0",
):
servers["ns1"].log.expect(msg)
assert msg in ns1.log
for msg in (
"zone nowarn/IN: 'y.nowarn' found type SPF record but no SPF TXT record found",
@ -29,4 +29,4 @@ def test_spf_log(servers):
"zone warn/IN: 'warn' found type SPF record but no SPF TXT record found",
"zone nowarn/IN: 'nowarn' found type SPF record but no SPF TXT record found",
):
servers["ns1"].log.prohibit(msg)
assert msg not in ns1.log

View file

@ -10,7 +10,6 @@
# information regarding copyright ownership.
import concurrent.futures
import os
import time
import dns.update
@ -28,22 +27,8 @@ pytestmark = pytest.mark.extra_artifacts(
def rndc_loop(test_state, server):
rndc = os.getenv("RNDC")
port = os.getenv("CONTROLPORT")
cmdline = [
rndc,
"-c",
"../_common/rndc.conf",
"-p",
port,
"-s",
server,
"reload",
]
while not test_state["finished"]:
isctest.run.cmd(cmdline, raise_on_exception=False)
server.rndc("reload", raise_on_exception=False)
time.sleep(1)

View file

@ -51,16 +51,12 @@ def test_nsec3_hashes(domain, nsec3hash):
algorithm = "1"
iterations = "12"
output = isctest.run.cmd(
[NSEC3HASH, salt, algorithm, iterations, domain]
).stdout.decode("utf-8")
assert nsec3hash in output
cmd = isctest.run.cmd([NSEC3HASH, salt, algorithm, iterations, domain])
assert nsec3hash in cmd.out
flags = "0"
output = isctest.run.cmd(
[NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain]
).stdout.decode("utf-8")
assert nsec3hash in output
cmd = isctest.run.cmd([NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain])
assert nsec3hash in cmd.out
@pytest.mark.parametrize(
@ -77,11 +73,11 @@ def test_nsec3_empty_salt(salt_emptiness_args):
iterations = "0"
domain = "com"
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[NSEC3HASH] + salt_emptiness_args + [algorithm, iterations, domain]
).stdout.decode("utf-8")
assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
assert "salt=-" in output
)
assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
assert "salt=-" in cmd.out
@pytest.mark.parametrize(
@ -97,7 +93,7 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg):
iterations = "0"
domain = "com"
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[
NSEC3HASH,
"-r",
@ -107,8 +103,8 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg):
salt_emptiness_arg,
domain,
]
).stdout.decode("utf-8")
assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
)
assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
@pytest.mark.parametrize(
@ -144,10 +140,8 @@ def test_nsec3hash_acceptable_values(domain, it, salt_bytes) -> None:
)
# calculate the hash using nsec3hash:
output = isctest.run.cmd(
[NSEC3HASH, salt_text, "1", str(it), str(domain)]
).stdout.decode("ascii")
hash2 = output.partition(" ")[0]
cmd = isctest.run.cmd([NSEC3HASH, salt_text, "1", str(it), str(domain)])
hash2 = cmd.out.partition(" ")[0]
assert hash1 == hash2

View file

@ -11,6 +11,7 @@
import os
import re
from re import compile as Re
import pytest
@ -61,14 +62,14 @@ def test_verify_good_zone_nsec_next_name_case_mismatch():
)
def get_bad_zone_output(zone):
only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else []
output = isctest.run.cmd(
def verify_bad_zone(zone):
only_opt = ["-z"] if re.search(r"^[zk]sk-only", zone) else []
cmd = isctest.run.cmd(
[VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"],
raise_on_exception=False,
)
stream = (output.stdout + output.stderr).decode("utf-8").replace("\n", "")
return stream
assert cmd.rc != 0
return cmd
@pytest.mark.parametrize(
@ -80,7 +81,8 @@ def get_bad_zone_output(zone):
],
)
def test_verify_bad_zone_files_dnskeyonly(zone):
assert re.match(r".*DNSKEY is not signed.*", get_bad_zone_output(zone))
cmd = verify_bad_zone(zone)
assert "DNSKEY is not signed" in cmd.err
@pytest.mark.parametrize(
@ -97,10 +99,8 @@ def test_verify_bad_zone_files_dnskeyonly(zone):
],
)
def test_verify_bad_zone_files_expired(zone):
assert re.match(
r".*signature has expired.*|.*No self-signed .*DNSKEY found.*",
get_bad_zone_output(zone),
)
cmd = verify_bad_zone(zone)
assert Re("signature has expired|No self-signed DNSKEY found") in cmd.err
@pytest.mark.parametrize(
@ -112,40 +112,33 @@ def test_verify_bad_zone_files_expired(zone):
],
)
def test_verify_bad_zone_files_unexpected_nsec_rrset(zone):
assert re.match(r".*unexpected NSEC RRset at.*", get_bad_zone_output(zone))
cmd = verify_bad_zone(zone)
assert "unexpected NSEC RRset at" in cmd.err
def test_verify_bad_zone_files_bad_nsec_record():
assert re.match(
r".*Bad NSEC record for.*, next name mismatch.*",
get_bad_zone_output("ksk+zsk.nsec.broken-chain"),
)
cmd = verify_bad_zone("ksk+zsk.nsec.broken-chain")
assert Re("Bad NSEC record for.*, next name mismatch") in cmd.err
def test_verify_bad_zone_files_bad_bitmap():
assert re.match(
r".*bit map mismatch.*", get_bad_zone_output("ksk+zsk.nsec.bad-bitmap")
)
cmd = verify_bad_zone("ksk+zsk.nsec.bad-bitmap")
assert "bit map mismatch" in cmd.err
def test_verify_bad_zone_files_missing_nsec3_record():
assert re.match(
r".*Missing NSEC3 record for.*",
get_bad_zone_output("ksk+zsk.nsec3.missing-empty"),
)
cmd = verify_bad_zone("ksk+zsk.nsec3.missing-empty")
assert "Missing NSEC3 record for" in cmd.err
def test_verify_bad_zone_files_no_dnssec_keys():
assert re.match(
r".*Zone contains no DNSSEC keys.*", get_bad_zone_output("unsigned")
)
cmd = verify_bad_zone("unsigned")
assert "Zone contains no DNSSEC keys" in cmd.err
def test_verify_bad_zone_files_unequal_nsec3_chains():
assert re.match(
r".*Expected and found NSEC3 chains not equal.*",
get_bad_zone_output("ksk+zsk.nsec3.extra-nsec3"),
)
cmd = verify_bad_zone("ksk+zsk.nsec3.extra-nsec3")
assert "Expected and found NSEC3 chains not equal" in cmd.err
# checking error message when -o is not used
@ -153,19 +146,17 @@ def test_verify_bad_zone_files_unequal_nsec3_chains():
def test_verify_soa_not_at_top_error():
# when -o is not used, origin is set to zone file name,
# which should cause an error in this case
output = isctest.run.cmd(
[VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False
).stderr.decode("utf-8")
assert "not at top of zone" in output
assert "use -o to specify a different zone origin" in output
cmd = isctest.run.cmd([VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False)
assert "not at top of zone" in cmd.err
assert "use -o to specify a different zone origin" in cmd.err
# checking error message when an invalid -o is specified
# and a SOA record not at top of zone is found
def test_verify_invalid_o_option_soa_not_at_top_error():
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[VERIFY, "-o", "invalid.origin", "zones/ksk+zsk.nsec.good"],
raise_on_exception=False,
).stderr.decode("utf-8")
assert "not at top of zone" in output
assert "use -o to specify a different zone origin" not in output
)
assert "not at top of zone" in cmd.err
assert "use -o to specify a different zone origin" not in cmd.err

View file

@ -12,6 +12,7 @@
import glob
import os
import re
from re import compile as Re
import shutil
import signal
import time
@ -71,7 +72,7 @@ def test_xferquota(named_port, servers):
isctest.check.rrsets_equal(ns1response.answer, ns2response.answer)
query_and_compare(axfr_msg)
pattern = re.compile(
pattern = Re(
f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
f"Transfer completed: .*\\(serial 2\\)"
)