diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index a23aed8cb6..97499fdab5 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -675,7 +675,7 @@ vulture: <<: *python_triggering_rules needs: [] script: - - vulture --exclude "*/ans*/ans.py,conftest.py,get_algorithms.py,isctest" --ignore-names "after_servers_start,bootstrap,pytestmark" bin/tests/system/ + - vulture --exclude "*/ans*/ans.py,conftest.py,get_algorithms.py,re_compile_checker.py,isctest" --ignore-names "after_servers_start,bootstrap,pytestmark" bin/tests/system/ ci-variables: <<: *precheck_job @@ -766,7 +766,7 @@ pylint: script: - pylint --rcfile $CI_PROJECT_DIR/.pylintrc $(git ls-files '*.py' | grep -vE '(ans\.py|dangerfile\.py|^bin/tests/system/|^contrib/)') # Ignore Pylint wrong-import-position error in system test to enable use of pytest.importorskip - - pylint --rcfile $CI_PROJECT_DIR/.pylintrc --disable=wrong-import-position $(git ls-files 'bin/tests/system/*.py' | grep -vE '(ans\.py|vulture_ignore_list\.py)') + - pylint --rcfile $CI_PROJECT_DIR/.pylintrc --load-plugins re_compile_checker --disable=wrong-import-position $(git ls-files 'bin/tests/system/*.py' | grep -vE '(ans\.py|vulture_ignore_list\.py)') reuse: <<: *precheck_job diff --git a/bin/tests/system/addzone/tests_rndc_deadlock.py b/bin/tests/system/addzone/tests_rndc_deadlock.py index 3b987d3912..b65a074401 100755 --- a/bin/tests/system/addzone/tests_rndc_deadlock.py +++ b/bin/tests/system/addzone/tests_rndc_deadlock.py @@ -10,12 +10,12 @@ # information regarding copyright ownership. import concurrent.futures +import os +import subprocess import time import pytest -import isctest - pytestmark = pytest.mark.extra_artifacts( [ "ns*/*.nzf*", @@ -43,20 +43,19 @@ def rndc_loop(test_state, domain, ns3): ["delzone", domain], ] + args = [os.environ["RNDC"]] + ns3.rndc_args.split() while not test_state["finished"]: for command in rndc_commands: - ns3.rndc(" ".join(command), ignore_errors=True, log=False) + # avoid using ns3.rndc() directly to avoid log spam + subprocess.run(args + " ".join(command), timeout=10, check=False) def check_if_server_is_responsive(ns3): """ Check if server status can be successfully retrieved using "rndc status" """ - try: - ns3.rndc("status", log=False) - return True - except isctest.rndc.RNDCException: - return False + cmd = ns3.rndc("status", raise_on_exception=False) + return cmd.rc == 0 def test_rndc_deadlock(servers): diff --git a/bin/tests/system/checkds/tests_checkds.py b/bin/tests/system/checkds/tests_checkds.py index d946634913..8e6660a6ef 100755 --- a/bin/tests/system/checkds/tests_checkds.py +++ b/bin/tests/system/checkds/tests_checkds.py @@ -100,10 +100,10 @@ def verify_zone(zone, transfer): verifier = isctest.run.cmd(verify_cmd) - if verifier.returncode != 0: + if verifier.rc != 0: isctest.log.error(f"dnssec-verify {zone}. failed") - return verifier.returncode == 0 + return verifier.rc == 0 def read_statefile(server, zone): diff --git a/bin/tests/system/conftest.py b/bin/tests/system/conftest.py index b3b790c11f..3516e56173 100644 --- a/bin/tests/system/conftest.py +++ b/bin/tests/system/conftest.py @@ -13,7 +13,7 @@ from functools import partial import filecmp import os from pathlib import Path -import re +from re import compile as Re import shutil import subprocess import tempfile @@ -49,7 +49,7 @@ else: XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "") FILE_DIR = os.path.abspath(Path(__file__).parent) -ENV_RE = re.compile(b"([^=]+)=(.*)") +ENV_RE = Re(b"([^=]+)=(.*)") PORT_MIN = 5001 PORT_MAX = 32767 PORTS_PER_TEST = 20 @@ -62,10 +62,10 @@ PRIORITY_TESTS = [ "timeouts/", "upforwd/", ] -PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS)) +PRIORITY_TESTS_RE = Re("|".join(PRIORITY_TESTS)) SYSTEM_TEST_DIR_GIT_PATH = "bin/tests/system" -SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)") -SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py") +SYSTEM_TEST_NAME_RE = Re(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)") +SYMLINK_REPLACEMENT_RE = Re(r"/tests(_.*)\.py") # ---------------------- Module initialization --------------------------- diff --git a/bin/tests/system/dnstap/tests_dnstap.py b/bin/tests/system/dnstap/tests_dnstap.py index 9dec5ddcc1..3c179c2825 100644 --- a/bin/tests/system/dnstap/tests_dnstap.py +++ b/bin/tests/system/dnstap/tests_dnstap.py @@ -13,7 +13,6 @@ import os import re -import subprocess import isctest import pytest @@ -30,20 +29,7 @@ pytestmark = pytest.mark.extra_artifacts( ) -def run_rndc(server, rndc_command): - """ - Send the specified 'rndc_command' to 'server' with a timeout of 10 seconds - """ - rndc = os.getenv("RNDC") - port = os.getenv("CONTROLPORT") - - cmdline = [rndc, "-c", "../_common/rndc.conf", "-p", port, "-s", server] - cmdline.extend(rndc_command) - - subprocess.check_output(cmdline, stderr=subprocess.STDOUT, timeout=10) - - -def test_dnstap_dispatch_socket_addresses(): +def test_dnstap_dispatch_socket_addresses(ns3): # Send some query to ns3 so that it records something in its dnstap file. msg = isctest.query.create("mail.example.", "A") res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR) @@ -52,14 +38,14 @@ def test_dnstap_dispatch_socket_addresses(): ] # Before continuing, roll dnstap file to ensure it is flushed to disk. - run_rndc("10.53.0.3", ["dnstap", "-roll", "1"]) + ns3.rndc("dnstap -roll 1") # Move the dnstap file aside so that it is retained for troubleshooting. os.rename(os.path.join("ns3", "dnstap.out.0"), "dnstap.out.resolver_addresses") # Read the contents of the dnstap file using dnstap-read. - output = subprocess.check_output( - [os.getenv("DNSTAPREAD"), "dnstap.out.resolver_addresses"], encoding="utf-8" + dnstapread = isctest.run.cmd( + [os.getenv("DNSTAPREAD"), "dnstap.out.resolver_addresses"], ) # Check whether all frames contain the expected addresses. @@ -74,7 +60,7 @@ def test_dnstap_dispatch_socket_addresses(): bad_frames = [] inspected_frames = 0 addr_regex = r"^10\.53\.0\.[0-9]+:[0-9]{1,5}$" - for line in output.splitlines(): + for line in dnstapread.out.splitlines(): _, _, frame_type, addr1, _, addr2, _ = line.split(" ", 6) # Only inspect RESOLVER_QUERY and RESOLVER_RESPONSE frames. if frame_type not in ("RQ", "RR"): diff --git a/bin/tests/system/doth/conftest.py b/bin/tests/system/doth/conftest.py index 27dca7ee8a..b95baeaab5 100644 --- a/bin/tests/system/doth/conftest.py +++ b/bin/tests/system/doth/conftest.py @@ -25,10 +25,10 @@ def gnutls_cli_executable(): pytest.skip("gnutls-cli not found in PATH") # Ensure gnutls-cli supports the --logfile command-line option. - output = isctest.run.cmd( + cmd = isctest.run.cmd( [executable, "--logfile=/dev/null"], log_stderr=False, raise_on_exception=False - ).stdout - if b"illegal option" in output: + ) + if "illegal option" in cmd.out: pytest.skip("gnutls-cli does not support the --logfile option") return executable diff --git a/bin/tests/system/isctest/__init__.py b/bin/tests/system/isctest/__init__.py index be94f1edfb..27b915d672 100644 --- a/bin/tests/system/isctest/__init__.py +++ b/bin/tests/system/isctest/__init__.py @@ -12,7 +12,6 @@ from . import check from . import instance from . import query -from . import rndc from . import run from . import template from . import log diff --git a/bin/tests/system/isctest/instance.py b/bin/tests/system/isctest/instance.py index 763a8dbb59..63eb21d127 100644 --- a/bin/tests/system/isctest/instance.py +++ b/bin/tests/system/isctest/instance.py @@ -11,14 +11,15 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import NamedTuple, Optional +from typing import NamedTuple -import logging import os +from pathlib import Path import re -from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor -from .log import info, LogFile, WatchLogFromStart, WatchLogFromHere +from .log import WatchLogFromStart, WatchLogFromHere +from .run import CmdResult, EnvCmd +from .text import TextFile class NamedPorts(NamedTuple): @@ -42,8 +43,6 @@ class NamedInstance: self, identifier: str, ports: NamedPorts = NamedPorts(), - rndc_logger: Optional[logging.Logger] = None, - rndc_executor: Optional[RNDCExecutor] = None, ) -> None: """ `identifier` must be an `ns` string, where `` is an integer @@ -52,18 +51,18 @@ class NamedInstance: `ports` is the `NamedPorts` instance listing the UDP/TCP ports on which this `named` instance is listening for various types of traffic (both DNS traffic and RNDC commands). - - `rndc_logger` is the `logging.Logger` to use for logging RNDC - commands sent to this `named` instance. - - `rndc_executor` is an object implementing the `RNDCExecutor` interface - that is used for executing RNDC commands on this `named` instance. """ self.ip = self._identifier_to_ip(identifier) self.ports = ports - self.log = LogFile(os.path.join(identifier, "named.run")) - self._rndc_executor = rndc_executor or RNDCBinaryExecutor() - self._rndc_logger = rndc_logger + self.log = TextFile(os.path.join(identifier, "named.run")) + + self._rndc_conf = Path("../_common/rndc.conf").absolute() + self._rndc = EnvCmd("RNDC", self.rndc_args) + + @property + def rndc_args(self) -> str: + """Base arguments for calling RNDC to control the instance.""" + return f"-c {self._rndc_conf} -s {self.ip} -p {self.ports.rndc}" @staticmethod def _identifier_to_ip(identifier: str) -> str: @@ -72,52 +71,16 @@ class NamedInstance: raise ValueError("Invalid named instance identifier" + identifier) return "10.53.0." + regex_match.group("index") - def rndc(self, command: str, ignore_errors: bool = False, log: bool = True) -> str: + def rndc(self, command: str, timeout=10, **kwargs) -> CmdResult: """ Send `command` to this named instance using RNDC. Return the server's response. - If the RNDC command fails, an `RNDCException` is raised unless - `ignore_errors` is set to `True`. - - The RNDC command will be logged to `rndc.log` (along with the server's - response) unless `log` is set to `False`. - - ```python - def test_foo(servers): - # Send the "status" command to ns1. An `RNDCException` will be - # raised if the RNDC command fails. This command will be logged. - response = servers["ns1"].rndc("status") - - # Send the "thaw foo" command to ns2. No exception will be raised - # in case the RNDC command fails. This command will be logged - # (even if it fails). - response = servers["ns2"].rndc("thaw foo", ignore_errors=True) - - # Send the "stop" command to ns3. An `RNDCException` will be - # raised if the RNDC command fails, but this command will not be - # logged (the server's response will still be returned to the - # caller, though). - response = servers["ns3"].rndc("stop", log=False) - - # Send the "halt" command to ns4 in "fire & forget mode": no - # exceptions will be raised and no logging will take place (the - # server's response will still be returned to the caller, though). - response = servers["ns4"].rndc("stop", ignore_errors=True, log=False) - ``` + To suppress exceptions, redirect outputs, control logging change + timeout etc. use keyword arguments which are passed to + isctest.cmd.run(). """ - try: - response = self._rndc_executor.call(self.ip, self.ports.rndc, command) - if log: - self._rndc_log(command, response) - except RNDCException as exc: - response = str(exc) - if log: - self._rndc_log(command, response) - if not ignore_errors: - raise - - return response + return self._rndc(command, timeout=timeout, **kwargs) def watch_log_from_start( self, timeout: float = WatchLogFromStart.DEFAULT_TIMEOUT @@ -137,28 +100,12 @@ class NamedInstance: """ return WatchLogFromHere(self.log.path, timeout) - def reconfigure(self, **kwargs) -> None: + def reconfigure(self, **kwargs) -> CmdResult: """ Reconfigure this named `instance` and wait until reconfiguration is - finished. Raise an `RNDCException` if reconfiguration fails. + finished. """ with self.watch_log_from_here() as watcher: - self.rndc("reconfig", **kwargs) + cmd = self.rndc("reconfig", **kwargs) watcher.wait_for_line("any newly configured zones are now loaded") - - def _rndc_log(self, command: str, response: str) -> None: - """ - Log an `rndc` invocation (and its output) to the `rndc.log` file in the - current working directory. - """ - fmt = '%(ip)s: "%(command)s"\n%(separator)s\n%(response)s%(separator)s' - args = { - "ip": self.ip, - "command": command, - "separator": "-" * 80, - "response": response, - } - if self._rndc_logger is None: - info(fmt, args) - else: - self._rndc_logger.info(fmt, args) + return cmd diff --git a/bin/tests/system/isctest/log/__init__.py b/bin/tests/system/isctest/log/__init__.py index 5f285577c2..702a9562d0 100644 --- a/bin/tests/system/isctest/log/__init__.py +++ b/bin/tests/system/isctest/log/__init__.py @@ -21,4 +21,4 @@ from .basic import ( critical, ) -from .watchlog import LogFile, WatchLogFromStart, WatchLogFromHere +from .watchlog import WatchLogFromStart, WatchLogFromHere diff --git a/bin/tests/system/isctest/log/watchlog.py b/bin/tests/system/isctest/log/watchlog.py index 6d85e9817d..447879662a 100644 --- a/bin/tests/system/isctest/log/watchlog.py +++ b/bin/tests/system/isctest/log/watchlog.py @@ -9,15 +9,15 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import Any, Iterator, List, Match, Optional, Pattern, TextIO, TypeVar, Union +from typing import Any, List, Match, Optional, Pattern, TextIO, TypeVar, Union import abc import os -import re import time +from isctest.text import compile_pattern, FlexPattern, LineReader + -FlexPattern = Union[str, Pattern] T = TypeVar("T") OneOrMore = Union[T, List[T]] @@ -30,128 +30,6 @@ class WatchLogTimeout(WatchLogException): pass -class LogFile: - """ - Log file wrapper with a path and means to find a string in its contents. - """ - - def __init__(self, path: str): - self.path = path - - @property - def _lines(self) -> Iterator[str]: - with open(self.path, encoding="utf-8") as f: - yield from f - - def __contains__(self, substring: str) -> bool: - """ - Return whether any of the lines in the log contains a given string. - """ - for line in self._lines: - if substring in line: - return True - return False - - def expect(self, msg: str): - """Check the string is present anywhere in the log file.""" - if msg in self: - return - assert False, f"log message not found in log {self.path}: {msg}" - - def prohibit(self, msg: str): - """Check the string is not present in the entire log file.""" - if msg in self: - assert False, f"forbidden message appeared in log {self.path}: {msg}" - - -class LineReader: - """ - >>> import io - - >>> file = io.StringIO("complete line\\n") - >>> line_reader = LineReader(file) - >>> for line in line_reader.readlines(): - ... print(line.strip()) - complete line - - >>> file = io.StringIO("complete line\\nand then incomplete line") - >>> line_reader = LineReader(file) - >>> for line in line_reader.readlines(): - ... print(line.strip()) - complete line - - >>> file = io.StringIO("complete line\\nand then another complete line\\n") - >>> line_reader = LineReader(file) - >>> for line in line_reader.readlines(): - ... print(line.strip()) - complete line - and then another complete line - - >>> file = io.StringIO() - >>> line_reader = LineReader(file) - >>> for chunk in ( - ... "first line\\nsecond line\\nthi", - ... "rd ", - ... "line\\nfour", - ... "th line\\n\\nfifth line\\n" - ... ): - ... print("=== OUTER ITERATION ===") - ... pos = file.tell() - ... print(chunk, end="", file=file) - ... _ = file.seek(pos) - ... for line in line_reader.readlines(): - ... print("--- inner iteration ---") - ... print(line.strip() or "") - === OUTER ITERATION === - --- inner iteration --- - first line - --- inner iteration --- - second line - === OUTER ITERATION === - === OUTER ITERATION === - --- inner iteration --- - third line - === OUTER ITERATION === - --- inner iteration --- - fourth line - --- inner iteration --- - - --- inner iteration --- - fifth line - """ - - def __init__(self, stream: TextIO): - self._stream = stream - self._linebuf = "" - - def readline(self) -> Optional[str]: - """ - Wrapper around io.readline() function to handle unfinished lines. - - If a line ends with newline character, it's returned immediately. - If a line doesn't end with a newline character, the read contents are - buffered until the next call of this function and None is returned - instead. - """ - read = self._stream.readline() - if not read.endswith("\n"): - self._linebuf += read - return None - read = self._linebuf + read - self._linebuf = "" - return read - - def readlines(self) -> Iterator[str]: - """ - Wrapper around io.readline() which only returns finished lines. - """ - while True: - line = self.readline() - if line is None: - return - yield line - - class WatchLog(abc.ABC): """ Wait for a log message to appear in a text file. @@ -210,15 +88,7 @@ class WatchLog(abc.ABC): if not isinstance(strings, list): strings = [strings] for string in strings: - if isinstance(string, Pattern): - patterns.append(string) - elif isinstance(string, str): - pattern = re.compile(re.escape(string)) - patterns.append(pattern) - else: - raise WatchLogException( - "only string and re.Pattern allowed for matching" - ) + patterns.append(compile_pattern(string)) return patterns def _wait_for_match(self, regexes: List[Pattern]) -> Match: @@ -256,13 +126,14 @@ class WatchLog(abc.ABC): Recommended use: ```python + from re import compile as Re import isctest def test_foo(servers): with servers["ns1"].watch_log_from_start() as watcher: watcher.wait_for_line("all zones loaded") - pattern = re.compile(r"next key event in ([0-9]+) seconds") + pattern = Re(r"next key event in ([0-9]+) seconds") with servers["ns1"].watch_log_from_here() as watcher: # ... do stuff here ... match = watcher.wait_for_line(pattern) @@ -321,7 +192,8 @@ class WatchLog(abc.ABC): >>> # Different values must be returned depending on which line is >>> # found in the log file. >>> import tempfile - >>> patterns = [re.compile(r"bar ([0-9])"), "qux"] + >>> from re import compile as Re + >>> patterns = [Re(r"bar ([0-9])"), "qux"] >>> with tempfile.NamedTemporaryFile("w") as file: ... print("foo bar 3", file=file, flush=True) ... with WatchLogFromStart(file.name) as watcher: @@ -443,7 +315,8 @@ class WatchLog(abc.ABC): >>> assert ret[1].group(0) == "foo" >>> import tempfile - >>> bar_pattern = re.compile('bar') + >>> from re import compile as Re + >>> bar_pattern = Re('bar') >>> patterns = ['foo', bar_pattern] >>> with tempfile.NamedTemporaryFile("w") as file: ... print("bar", file=file, flush=True) diff --git a/bin/tests/system/isctest/rndc.py b/bin/tests/system/isctest/rndc.py deleted file mode 100644 index d4a0a1bd77..0000000000 --- a/bin/tests/system/isctest/rndc.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright (C) Internet Systems Consortium, Inc. ("ISC") -# -# SPDX-License-Identifier: MPL-2.0 -# -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, you can obtain one at https://mozilla.org/MPL/2.0/. -# -# See the COPYRIGHT file distributed with this work for additional -# information regarding copyright ownership. - -import abc -import os -import subprocess - - -class RNDCExecutor(abc.ABC): - """ - An interface which RNDC executors have to implement in order for the - `NamedInstance` class to be able to use them. - """ - - @abc.abstractmethod - def call(self, ip: str, port: int, command: str) -> str: - """ - Send RNDC `command` to the `named` instance at `ip:port` and return the - server's response. - """ - - -class RNDCException(Exception): - """ - Raised by classes implementing the `RNDCExecutor` interface when sending an - RNDC command fails for any reason. - """ - - -class RNDCBinaryExecutor(RNDCExecutor): - """ - An `RNDCExecutor` which sends RNDC commands to servers using the `rndc` - binary. - """ - - def __init__(self) -> None: - """ - This class needs the `RNDC` environment variable to be set to the path - to the `rndc` binary to use. - """ - rndc_path = os.environ.get("RNDC", "/bin/false") - rndc_conf = os.path.join("..", "_common", "rndc.conf") - self._base_cmdline = [rndc_path, "-c", rndc_conf] - - def call(self, ip: str, port: int, command: str) -> str: - """ - Send RNDC `command` to the `named` instance at `ip:port` and return the - server's response. - """ - cmdline = self._base_cmdline[:] - cmdline.extend(["-s", ip]) - cmdline.extend(["-p", str(port)]) - cmdline.extend(command.split()) - - try: - return subprocess.check_output( - cmdline, stderr=subprocess.STDOUT, timeout=10, encoding="utf-8" - ) - except subprocess.SubprocessError as exc: - msg = getattr(exc, "output", "RNDC exception occurred") - raise RNDCException(msg) from exc diff --git a/bin/tests/system/isctest/run.py b/bin/tests/system/isctest/run.py index f33fa3cad2..3fbc6ac341 100644 --- a/bin/tests/system/isctest/run.py +++ b/bin/tests/system/isctest/run.py @@ -15,11 +15,24 @@ import time from typing import Optional import isctest.log +import isctest.text from isctest.compat import dns_rcode import dns.message +class CmdResult: + def __init__(self, proc=None): + self.proc = proc + self.rc = self.proc.returncode + self.out = isctest.text.Text("") + self.err = isctest.text.Text("") + if self.proc.stdout: + self.out = isctest.text.Text(self.proc.stdout.decode("utf-8")) + if self.proc.stderr: + self.err = isctest.text.Text(self.proc.stderr.decode("utf-8")) + + def cmd( args, cwd=None, @@ -31,7 +44,7 @@ def cmd( input_text: Optional[bytes] = None, raise_on_exception=True, env: Optional[dict] = None, -): +) -> CmdResult: """Execute a command with given args as subprocess.""" isctest.log.debug(f"isctest.run.cmd(): {' '.join(args)}") @@ -61,24 +74,26 @@ def cmd( env=env, ) print_debug_logs(proc) - return proc + return CmdResult(proc) except subprocess.CalledProcessError as exc: print_debug_logs(exc) isctest.log.debug(f"isctest.run.cmd(): (return code) {exc.returncode}") if raise_on_exception: raise exc - return exc + return CmdResult(exc) -class Dig: - def __init__(self, base_params: str = ""): - self.base_params = base_params +class EnvCmd: + """Helper for executing binaries from env with optional base parameters.""" - def __call__(self, params: str) -> str: - """Run the dig command with the given parameters and return the decoded output.""" - return cmd( - [os.environ.get("DIG")] + f"{self.base_params} {params}".split(), - ).stdout.decode("utf-8") + def __init__(self, name: str, base_params: str = ""): + self.bin_path = os.environ[name] + self.base_params = base_params.split() + + def __call__(self, params: str, **kwargs) -> CmdResult: + """Call the command. Keyword arguments from isctest.run.cmd() are supported.""" + args = self.base_params + params.split() + return cmd([self.bin_path] + args, **kwargs) def retry_with_timeout(func, timeout, delay=1, msg=None): diff --git a/bin/tests/system/isctest/text.py b/bin/tests/system/isctest/text.py new file mode 100644 index 0000000000..ca3cc835e7 --- /dev/null +++ b/bin/tests/system/isctest/text.py @@ -0,0 +1,178 @@ +#!/usr/bin/python3 + +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, you can obtain one at https://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +import abc +import re +from re import compile as Re +from typing import Iterator, List, Match, Optional, Pattern, TextIO, Union + + +FlexPattern = Union[str, Pattern] + + +def compile_pattern(string: FlexPattern) -> Pattern: + if isinstance(string, Pattern): + return string + if isinstance(string, str): + return Re(re.escape(string)) + raise TypeError("only string and re.Pattern allowed") + + +class Grep(abc.ABC): + """ + Implement a grep-like interface for pattern matching in texts and files. + """ + + @abc.abstractmethod + def readlines(self) -> Iterator[str]: + raise NotImplementedError + + def igrep(self, pattern: FlexPattern) -> Iterator[Match]: + """ + Iterate over the lines matching the pattern. + """ + regex = compile_pattern(pattern) + + for line in self.readlines(): + match = regex.search(line) + if match: + yield match + + def grep(self, pattern: FlexPattern) -> List[Match]: + """ + Get list of lines matching the pattern. + """ + return list(self.igrep(pattern)) + + def __contains__(self, pattern: FlexPattern) -> bool: + """ + Return whether any of the lines in the log contains matches the pattern. + """ + try: + next(self.igrep(pattern)) + except StopIteration: + return False + return True + + +class Text(Grep, str): # type: ignore + """ + Wrapper around classic string with grep support. + """ + + def readlines(self): + yield from self.splitlines(keepends=True) + + +class TextFile(Grep): + """ + Text file wrapper with grep support. + """ + + def __init__(self, path: str): + self.path = path + + def readlines(self) -> Iterator[str]: + with open(self.path, encoding="utf-8") as f: + yield from f + + def __repr__(self): + return self.path + + +class LineReader(Grep): + """ + >>> import io + + >>> file = io.StringIO("complete line\\n") + >>> line_reader = LineReader(file) + >>> for line in line_reader.readlines(): + ... print(line.strip()) + complete line + + >>> file = io.StringIO("complete line\\nand then incomplete line") + >>> line_reader = LineReader(file) + >>> for line in line_reader.readlines(): + ... print(line.strip()) + complete line + + >>> file = io.StringIO("complete line\\nand then another complete line\\n") + >>> line_reader = LineReader(file) + >>> for line in line_reader.readlines(): + ... print(line.strip()) + complete line + and then another complete line + + >>> file = io.StringIO() + >>> line_reader = LineReader(file) + >>> for chunk in ( + ... "first line\\nsecond line\\nthi", + ... "rd ", + ... "line\\nfour", + ... "th line\\n\\nfifth line\\n" + ... ): + ... print("=== OUTER ITERATION ===") + ... pos = file.tell() + ... print(chunk, end="", file=file) + ... _ = file.seek(pos) + ... for line in line_reader.readlines(): + ... print("--- inner iteration ---") + ... print(line.strip() or "") + === OUTER ITERATION === + --- inner iteration --- + first line + --- inner iteration --- + second line + === OUTER ITERATION === + === OUTER ITERATION === + --- inner iteration --- + third line + === OUTER ITERATION === + --- inner iteration --- + fourth line + --- inner iteration --- + + --- inner iteration --- + fifth line + """ + + def __init__(self, stream: TextIO): + self._stream = stream + self._linebuf = "" + + def readline(self) -> Optional[str]: + """ + Wrapper around io.readline() function to handle unfinished lines. + + If a line ends with newline character, it's returned immediately. + If a line doesn't end with a newline character, the read contents are + buffered until the next call of this function and None is returned + instead. + """ + read = self._stream.readline() + if not read.endswith("\n"): + self._linebuf += read + return None + read = self._linebuf + read + self._linebuf = "" + return read + + def readlines(self) -> Iterator[str]: + """ + Wrapper around io.readline() which only returns finished lines. + """ + while True: + line = self.readline() + if line is None: + return + yield line diff --git a/bin/tests/system/keepalive/tests_keepalive.py b/bin/tests/system/keepalive/tests_keepalive.py index 5fc8aaaf5b..d09d566f6e 100644 --- a/bin/tests/system/keepalive/tests_keepalive.py +++ b/bin/tests/system/keepalive/tests_keepalive.py @@ -20,7 +20,7 @@ pytestmark = pytest.mark.extra_artifacts( def test_dig_tcp_keepalive_handling(named_port, servers): def get_keepalive_options_received(): - servers["ns2"].rndc("stats", log=False) + servers["ns2"].rndc("stats") options_received = 0 with open("ns2/named.stats", "r", encoding="utf-8") as ns2_stats_file: for line in ns2_stats_file: @@ -28,38 +28,41 @@ def test_dig_tcp_keepalive_handling(named_port, servers): options_received = line.split()[0] return int(options_received) - dig = isctest.run.Dig(f"-p {str(named_port)}") + dig = isctest.run.EnvCmd("DIG", f"-p {str(named_port)}") isctest.log.info("check that dig handles TCP keepalive in query") - assert "; TCP KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2") + assert "; TCP KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2").out isctest.log.info("check that dig added TCP keepalive was received") assert get_keepalive_options_received() == 1 isctest.log.info("check that TCP keepalive is added for TCP responses") - assert "; TCP KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2") + assert "; TCP KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2").out isctest.log.info("check that TCP keepalive requires TCP") - assert "; TCP KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2") + assert "; TCP KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2").out isctest.log.info("check the default keepalive value") - assert "; TCP KEEPALIVE: 30.0 secs" in dig( - "+tcp +keepalive foo.example. @10.53.0.3" + assert ( + "; TCP KEEPALIVE: 30.0 secs" + in dig("+tcp +keepalive foo.example. @10.53.0.3").out ) isctest.log.info("check a keepalive configured value") - assert "; TCP KEEPALIVE: 15.0 secs" in dig( - "+tcp +keepalive foo.example. @10.53.0.2" + assert ( + "; TCP KEEPALIVE: 15.0 secs" + in dig("+tcp +keepalive foo.example. @10.53.0.2").out ) isctest.log.info("check a re-configured keepalive value") - response = servers["ns2"].rndc("tcp-timeouts 300 300 300 200", log=False) - assert "tcp-initial-timeout=300" in response - assert "tcp-idle-timeout=300" in response - assert "tcp-keepalive-timeout=300" in response - assert "tcp-advertised-timeout=200" in response - assert "; TCP KEEPALIVE: 20.0 secs" in dig( - "+tcp +keepalive foo.example. @10.53.0.2" + response = servers["ns2"].rndc("tcp-timeouts 300 300 300 200") + assert "tcp-initial-timeout=300" in response.out + assert "tcp-idle-timeout=300" in response.out + assert "tcp-keepalive-timeout=300" in response.out + assert "tcp-advertised-timeout=200" in response.out + assert ( + "; TCP KEEPALIVE: 20.0 secs" + in dig("+tcp +keepalive foo.example. @10.53.0.2").out ) isctest.log.info("check server config entry") diff --git a/bin/tests/system/keyfromlabel/tests_keyfromlabel.py b/bin/tests/system/keyfromlabel/tests_keyfromlabel.py index cc62ec6a54..f2c1c0bea2 100644 --- a/bin/tests/system/keyfromlabel/tests_keyfromlabel.py +++ b/bin/tests/system/keyfromlabel/tests_keyfromlabel.py @@ -11,7 +11,7 @@ import hashlib import os -import re +from re import compile as Re import shutil import pytest @@ -75,18 +75,16 @@ def token_init_and_cleanup(): ) try: - output = isctest.run.cmd( - token_init_command, env=EMPTY_OPENSSL_CONF_ENV - ).stdout.decode("utf-8") - assert "The token has been initialized and is reassigned to slot" in output + cmd = isctest.run.cmd(token_init_command, env=EMPTY_OPENSSL_CONF_ENV) + assert "The token has been initialized and is reassigned to slot" in cmd.out yield finally: - output = isctest.run.cmd( + cmd = isctest.run.cmd( token_cleanup_command, env=EMPTY_OPENSSL_CONF_ENV, raise_on_exception=False, - ).stdout.decode("utf-8") - assert re.search("Found token (.*) with matching token label", output) + ) + assert Re("Found token (.*) with matching token label") in cmd.out # pylint: disable-msg=too-many-locals @@ -126,11 +124,9 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits): HSMPIN, ] - output = isctest.run.cmd( - pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV - ).stdout.decode("utf-8") + cmd = isctest.run.cmd(pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV) - assert "Key pair generated" in output + assert "Key pair generated" in cmd.out def keyfromlabel(alg_name, zone, key_id, key_flag): key_flag = key_flag.split() if key_flag else [] @@ -148,18 +144,18 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits): zone, ] - output = isctest.run.cmd(keyfrlab_command) - output_decoded = output.stdout.decode("utf-8").rstrip() + ".key" + cmd = isctest.run.cmd(keyfrlab_command) + keyfile = cmd.out.rstrip() + ".key" - assert os.path.exists(output_decoded) + assert os.path.exists(keyfile) - return output_decoded + return keyfile if ( isctest.run.cmd( [os.environ["SHELL"], "../testcrypto.sh", alg_name], raise_on_exception=False, - ).returncode + ).rc != 0 ): pytest.skip(f"{alg_name} is not supported") diff --git a/bin/tests/system/masterfile/tests_masterfile.py b/bin/tests/system/masterfile/tests_masterfile.py index 5d7baf58b3..e408bf235c 100644 --- a/bin/tests/system/masterfile/tests_masterfile.py +++ b/bin/tests/system/masterfile/tests_masterfile.py @@ -96,7 +96,7 @@ def test_masterfile_missing_master_file_servfail(): def test_masterfile_owner_inheritance(): """Test owner inheritance after $INCLUDE""" - checker_output = isctest.run.cmd( + cmd = isctest.run.cmd( [ os.environ["CHECKZONE"], "-D", @@ -104,12 +104,12 @@ def test_masterfile_owner_inheritance(): "example", "zone/inheritownerafterinclude.db", ] - ).stdout.decode("utf-8") + ) owner_inheritance_zone = """ example. 0 IN SOA . . 0 0 0 0 0 example. 0 IN TXT "this should be at the zone apex" example. 0 IN NS . """ - checker_zone = dns.zone.from_text(checker_output, origin="example.") + checker_zone = dns.zone.from_text(cmd.out, origin="example.") expected = dns.zone.from_text(owner_inheritance_zone, origin="example.") isctest.check.zones_equal(checker_zone, expected, compare_ttl=True) diff --git a/bin/tests/system/optout/tests_optout.py b/bin/tests/system/optout/tests_optout.py index d7cbe1845f..31bef7c333 100755 --- a/bin/tests/system/optout/tests_optout.py +++ b/bin/tests/system/optout/tests_optout.py @@ -86,10 +86,10 @@ def verify_zone(zone, transfer): verifier = isctest.run.cmd(verify_cmd) - if verifier.returncode != 0: + if verifier.rc != 0: isctest.log.error(f"dnssec-verify {zone}. failed") - return verifier.returncode == 0 + return verifier.rc == 0 def test_optout(ns2): diff --git a/bin/tests/system/re_compile_checker.py b/bin/tests/system/re_compile_checker.py new file mode 100644 index 0000000000..efa0e9a038 --- /dev/null +++ b/bin/tests/system/re_compile_checker.py @@ -0,0 +1,46 @@ +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, you can obtain one at https://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +# pylint: disable=unknown-option-value,re-compile-alias + +import re + +from astroid import nodes + +from pylint.checkers import BaseRawFileChecker +from pylint.lint import PyLinter + + +class ReCompileChecker(BaseRawFileChecker): + + name = "custom_raw" + msgs = { + "R9901": ( + "Replace re.compile() with Re() using `from re import compile as Re`", + "re-compile-alias", + ( + "Use a Re() alias instead of re.compile() by importing the " + "re.compile() function as Re()" + ), + ), + } + options = () + + def process_module(self, node: nodes.Module) -> None: + pattern = re.compile(r"re\.compile\(") + with node.stream() as stream: + for lineno, line in enumerate(stream): + if pattern.search(line.decode("utf-8")): + self.add_message("re-compile-alias", line=lineno) + + +def register(linter: PyLinter) -> None: + linter.register_checker(ReCompileChecker(linter)) diff --git a/bin/tests/system/rrchecker/tests_rrchecker.py b/bin/tests/system/rrchecker/tests_rrchecker.py index 5889224e91..a321765295 100644 --- a/bin/tests/system/rrchecker/tests_rrchecker.py +++ b/bin/tests/system/rrchecker/tests_rrchecker.py @@ -121,22 +121,18 @@ pytestmark = pytest.mark.extra_artifacts( ], ) def test_rrchecker_list_standard_names(option, expected_result): - stdout = isctest.run.cmd([os.environ["RRCHECKER"], option]).stdout.decode("utf-8") - values = [line for line in stdout.split("\n") if line.strip()] + cmd = isctest.run.cmd([os.environ["RRCHECKER"], option]) + values = [line for line in cmd.out.split("\n") if line.strip()] assert sorted(values) == sorted(expected_result) def run_rrchecker(option, rr_class, rr_type, rr_rest): - rrchecker_output = ( - isctest.run.cmd( - [os.environ["RRCHECKER"], option], - input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"), - ) - .stdout.decode("utf-8") - .strip() + cmd = isctest.run.cmd( + [os.environ["RRCHECKER"], option], + input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"), ) - return rrchecker_output.split() + return cmd.out.strip().split() @pytest.mark.parametrize( @@ -162,7 +158,7 @@ def test_rrchecker_conversions(option): ".", tempzone_file, ], - ).stdout.decode("utf-8") + ).out checkzone_output = [ line for line in checkzone_output.splitlines() if not line.startswith(";") ] diff --git a/bin/tests/system/shutdown/tests_shutdown.py b/bin/tests/system/shutdown/tests_shutdown.py index 3d0c410ee0..52e574543e 100755 --- a/bin/tests/system/shutdown/tests_shutdown.py +++ b/bin/tests/system/shutdown/tests_shutdown.py @@ -71,11 +71,8 @@ def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries # helper function, 'command' is the rndc command to run def launch_rndc(command): - try: - instance.rndc(command, log=False) - return 0 - except isctest.rndc.RNDCException: - return -1 + ret = instance.rndc(command, raise_on_exception=False) + return 0 if ret.rc == 0 else -1 # We're going to execute queries in parallel by means of a thread pool. # dnspython functions block, so we need to circumvent that. diff --git a/bin/tests/system/spf/tests_spf_zones.py b/bin/tests/system/spf/tests_spf_zones.py index 550704ba51..5cb7348c3c 100644 --- a/bin/tests/system/spf/tests_spf_zones.py +++ b/bin/tests/system/spf/tests_spf_zones.py @@ -13,7 +13,7 @@ import pytest @pytest.mark.requires_zones_loaded("ns1") -def test_spf_log(servers): +def test_spf_log(ns1): for msg in ( "zone spf/IN: 'y.spf' found type SPF record but no SPF TXT record found", "zone warn/IN: 'y.warn' found type SPF record but no SPF TXT record found", @@ -21,7 +21,7 @@ def test_spf_log(servers): "zone warn/IN: loaded serial 0", "zone nowarn/IN: loaded serial 0", ): - servers["ns1"].log.expect(msg) + assert msg in ns1.log for msg in ( "zone nowarn/IN: 'y.nowarn' found type SPF record but no SPF TXT record found", @@ -29,4 +29,4 @@ def test_spf_log(servers): "zone warn/IN: 'warn' found type SPF record but no SPF TXT record found", "zone nowarn/IN: 'nowarn' found type SPF record but no SPF TXT record found", ): - servers["ns1"].log.prohibit(msg) + assert msg not in ns1.log diff --git a/bin/tests/system/stress/tests_stress_update.py b/bin/tests/system/stress/tests_stress_update.py index 5bb413df33..ef93c57fa0 100644 --- a/bin/tests/system/stress/tests_stress_update.py +++ b/bin/tests/system/stress/tests_stress_update.py @@ -10,7 +10,6 @@ # information regarding copyright ownership. import concurrent.futures -import os import time import dns.update @@ -28,22 +27,8 @@ pytestmark = pytest.mark.extra_artifacts( def rndc_loop(test_state, server): - rndc = os.getenv("RNDC") - port = os.getenv("CONTROLPORT") - - cmdline = [ - rndc, - "-c", - "../_common/rndc.conf", - "-p", - port, - "-s", - server, - "reload", - ] - while not test_state["finished"]: - isctest.run.cmd(cmdline, raise_on_exception=False) + server.rndc("reload", raise_on_exception=False) time.sleep(1) diff --git a/bin/tests/system/tools/tests_tools_nsec3hash.py b/bin/tests/system/tools/tests_tools_nsec3hash.py index f1384d6ceb..93670bc48c 100644 --- a/bin/tests/system/tools/tests_tools_nsec3hash.py +++ b/bin/tests/system/tools/tests_tools_nsec3hash.py @@ -51,16 +51,12 @@ def test_nsec3_hashes(domain, nsec3hash): algorithm = "1" iterations = "12" - output = isctest.run.cmd( - [NSEC3HASH, salt, algorithm, iterations, domain] - ).stdout.decode("utf-8") - assert nsec3hash in output + cmd = isctest.run.cmd([NSEC3HASH, salt, algorithm, iterations, domain]) + assert nsec3hash in cmd.out flags = "0" - output = isctest.run.cmd( - [NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain] - ).stdout.decode("utf-8") - assert nsec3hash in output + cmd = isctest.run.cmd([NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain]) + assert nsec3hash in cmd.out @pytest.mark.parametrize( @@ -77,11 +73,11 @@ def test_nsec3_empty_salt(salt_emptiness_args): iterations = "0" domain = "com" - output = isctest.run.cmd( + cmd = isctest.run.cmd( [NSEC3HASH] + salt_emptiness_args + [algorithm, iterations, domain] - ).stdout.decode("utf-8") - assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in output - assert "salt=-" in output + ) + assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out + assert "salt=-" in cmd.out @pytest.mark.parametrize( @@ -97,7 +93,7 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg): iterations = "0" domain = "com" - output = isctest.run.cmd( + cmd = isctest.run.cmd( [ NSEC3HASH, "-r", @@ -107,8 +103,8 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg): salt_emptiness_arg, domain, ] - ).stdout.decode("utf-8") - assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in output + ) + assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out @pytest.mark.parametrize( @@ -144,10 +140,8 @@ def test_nsec3hash_acceptable_values(domain, it, salt_bytes) -> None: ) # calculate the hash using nsec3hash: - output = isctest.run.cmd( - [NSEC3HASH, salt_text, "1", str(it), str(domain)] - ).stdout.decode("ascii") - hash2 = output.partition(" ")[0] + cmd = isctest.run.cmd([NSEC3HASH, salt_text, "1", str(it), str(domain)]) + hash2 = cmd.out.partition(" ")[0] assert hash1 == hash2 diff --git a/bin/tests/system/verify/tests_verify.py b/bin/tests/system/verify/tests_verify.py index d900e56039..6d82bfca89 100644 --- a/bin/tests/system/verify/tests_verify.py +++ b/bin/tests/system/verify/tests_verify.py @@ -11,6 +11,7 @@ import os import re +from re import compile as Re import pytest @@ -61,14 +62,14 @@ def test_verify_good_zone_nsec_next_name_case_mismatch(): ) -def get_bad_zone_output(zone): - only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else [] - output = isctest.run.cmd( +def verify_bad_zone(zone): + only_opt = ["-z"] if re.search(r"^[zk]sk-only", zone) else [] + cmd = isctest.run.cmd( [VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"], raise_on_exception=False, ) - stream = (output.stdout + output.stderr).decode("utf-8").replace("\n", "") - return stream + assert cmd.rc != 0 + return cmd @pytest.mark.parametrize( @@ -80,7 +81,8 @@ def get_bad_zone_output(zone): ], ) def test_verify_bad_zone_files_dnskeyonly(zone): - assert re.match(r".*DNSKEY is not signed.*", get_bad_zone_output(zone)) + cmd = verify_bad_zone(zone) + assert "DNSKEY is not signed" in cmd.err @pytest.mark.parametrize( @@ -97,10 +99,8 @@ def test_verify_bad_zone_files_dnskeyonly(zone): ], ) def test_verify_bad_zone_files_expired(zone): - assert re.match( - r".*signature has expired.*|.*No self-signed .*DNSKEY found.*", - get_bad_zone_output(zone), - ) + cmd = verify_bad_zone(zone) + assert Re("signature has expired|No self-signed DNSKEY found") in cmd.err @pytest.mark.parametrize( @@ -112,40 +112,33 @@ def test_verify_bad_zone_files_expired(zone): ], ) def test_verify_bad_zone_files_unexpected_nsec_rrset(zone): - assert re.match(r".*unexpected NSEC RRset at.*", get_bad_zone_output(zone)) + cmd = verify_bad_zone(zone) + assert "unexpected NSEC RRset at" in cmd.err def test_verify_bad_zone_files_bad_nsec_record(): - assert re.match( - r".*Bad NSEC record for.*, next name mismatch.*", - get_bad_zone_output("ksk+zsk.nsec.broken-chain"), - ) + cmd = verify_bad_zone("ksk+zsk.nsec.broken-chain") + assert Re("Bad NSEC record for.*, next name mismatch") in cmd.err def test_verify_bad_zone_files_bad_bitmap(): - assert re.match( - r".*bit map mismatch.*", get_bad_zone_output("ksk+zsk.nsec.bad-bitmap") - ) + cmd = verify_bad_zone("ksk+zsk.nsec.bad-bitmap") + assert "bit map mismatch" in cmd.err def test_verify_bad_zone_files_missing_nsec3_record(): - assert re.match( - r".*Missing NSEC3 record for.*", - get_bad_zone_output("ksk+zsk.nsec3.missing-empty"), - ) + cmd = verify_bad_zone("ksk+zsk.nsec3.missing-empty") + assert "Missing NSEC3 record for" in cmd.err def test_verify_bad_zone_files_no_dnssec_keys(): - assert re.match( - r".*Zone contains no DNSSEC keys.*", get_bad_zone_output("unsigned") - ) + cmd = verify_bad_zone("unsigned") + assert "Zone contains no DNSSEC keys" in cmd.err def test_verify_bad_zone_files_unequal_nsec3_chains(): - assert re.match( - r".*Expected and found NSEC3 chains not equal.*", - get_bad_zone_output("ksk+zsk.nsec3.extra-nsec3"), - ) + cmd = verify_bad_zone("ksk+zsk.nsec3.extra-nsec3") + assert "Expected and found NSEC3 chains not equal" in cmd.err # checking error message when -o is not used @@ -153,19 +146,17 @@ def test_verify_bad_zone_files_unequal_nsec3_chains(): def test_verify_soa_not_at_top_error(): # when -o is not used, origin is set to zone file name, # which should cause an error in this case - output = isctest.run.cmd( - [VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False - ).stderr.decode("utf-8") - assert "not at top of zone" in output - assert "use -o to specify a different zone origin" in output + cmd = isctest.run.cmd([VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False) + assert "not at top of zone" in cmd.err + assert "use -o to specify a different zone origin" in cmd.err # checking error message when an invalid -o is specified # and a SOA record not at top of zone is found def test_verify_invalid_o_option_soa_not_at_top_error(): - output = isctest.run.cmd( + cmd = isctest.run.cmd( [VERIFY, "-o", "invalid.origin", "zones/ksk+zsk.nsec.good"], raise_on_exception=False, - ).stderr.decode("utf-8") - assert "not at top of zone" in output - assert "use -o to specify a different zone origin" not in output + ) + assert "not at top of zone" in cmd.err + assert "use -o to specify a different zone origin" not in cmd.err diff --git a/bin/tests/system/xferquota/tests_xferquota.py b/bin/tests/system/xferquota/tests_xferquota.py index 1cb169156e..230135cd52 100644 --- a/bin/tests/system/xferquota/tests_xferquota.py +++ b/bin/tests/system/xferquota/tests_xferquota.py @@ -12,6 +12,7 @@ import glob import os import re +from re import compile as Re import shutil import signal import time @@ -71,7 +72,7 @@ def test_xferquota(named_port, servers): isctest.check.rrsets_equal(ns1response.answer, ns2response.answer) query_and_compare(axfr_msg) - pattern = re.compile( + pattern = Re( f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: " f"Transfer completed: .*\\(serial 2\\)" )