diff --git a/bin/tests/system/cipher-suites/tests_cipher_suites.py b/bin/tests/system/cipher-suites/tests_cipher_suites.py index edc3c09598..86adc29da1 100644 --- a/bin/tests/system/cipher-suites/tests_cipher_suites.py +++ b/bin/tests/system/cipher-suites/tests_cipher_suites.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import re +from re import compile as Re import pytest @@ -31,7 +31,7 @@ pytestmark = pytest.mark.extra_artifacts( @pytest.fixture(scope="module") def transfers_complete(servers): for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]: - pattern = re.compile( + pattern = Re( f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed" ) for ns in ["ns2", "ns3", "ns4", "ns5"]: diff --git a/bin/tests/system/conftest.py b/bin/tests/system/conftest.py index 45728db241..51446ffa73 100644 --- a/bin/tests/system/conftest.py +++ b/bin/tests/system/conftest.py @@ -12,7 +12,7 @@ import filecmp import os from pathlib import Path -import re +from re import compile as Re import shutil import subprocess import tempfile @@ -53,7 +53,7 @@ else: XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "") FILE_DIR = os.path.abspath(Path(__file__).parent) -ENV_RE = re.compile(b"([^=]+)=(.*)") +ENV_RE = Re(b"([^=]+)=(.*)") PRIORITY_TESTS = [ # Tests that are scheduled first. Speeds up parallel execution. "rpz/", @@ -62,9 +62,9 @@ PRIORITY_TESTS = [ "timeouts/", "upforwd/", ] -PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS)) -SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)") -SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py") +PRIORITY_TESTS_RE = Re("|".join(PRIORITY_TESTS)) +SYSTEM_TEST_NAME_RE = Re(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)") +SYMLINK_REPLACEMENT_RE = Re(r"/tests(_.*)\.py") # ----------------------- Global requirements ---------------------------- diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index a213ead4a3..06df6d44b3 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -15,6 +15,7 @@ import glob import os from pathlib import Path import re +from re import compile as Re import time from typing import Dict, List, Optional, Tuple, Union @@ -1453,7 +1454,7 @@ def next_key_event_equals(server, zone, next_event): waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds" with server.watch_log_from_start() as watcher: - watcher.wait_for_line(re.compile(waitfor)) + watcher.wait_for_line(Re(waitfor)) # WMM: The with code below is extracting the line the watcher was # waiting for. If WatchLog.wait_for_line()` returned the matched string, diff --git a/bin/tests/system/isctest/log/watchlog.py b/bin/tests/system/isctest/log/watchlog.py index 6d85e9817d..d2d8521ebd 100644 --- a/bin/tests/system/isctest/log/watchlog.py +++ b/bin/tests/system/isctest/log/watchlog.py @@ -14,6 +14,7 @@ from typing import Any, Iterator, List, Match, Optional, Pattern, TextIO, TypeVa import abc import os import re +from re import compile as Re import time @@ -213,7 +214,7 @@ class WatchLog(abc.ABC): if isinstance(string, Pattern): patterns.append(string) elif isinstance(string, str): - pattern = re.compile(re.escape(string)) + pattern = Re(re.escape(string)) patterns.append(pattern) else: raise WatchLogException( @@ -256,13 +257,14 @@ class WatchLog(abc.ABC): Recommended use: ```python + from re import compile as Re import isctest def test_foo(servers): with servers["ns1"].watch_log_from_start() as watcher: watcher.wait_for_line("all zones loaded") - pattern = re.compile(r"next key event in ([0-9]+) seconds") + pattern = Re(r"next key event in ([0-9]+) seconds") with servers["ns1"].watch_log_from_here() as watcher: # ... do stuff here ... match = watcher.wait_for_line(pattern) @@ -321,7 +323,8 @@ class WatchLog(abc.ABC): >>> # Different values must be returned depending on which line is >>> # found in the log file. >>> import tempfile - >>> patterns = [re.compile(r"bar ([0-9])"), "qux"] + >>> from re import compile as Re + >>> patterns = [Re(r"bar ([0-9])"), "qux"] >>> with tempfile.NamedTemporaryFile("w") as file: ... print("foo bar 3", file=file, flush=True) ... with WatchLogFromStart(file.name) as watcher: @@ -443,7 +446,8 @@ class WatchLog(abc.ABC): >>> assert ret[1].group(0) == "foo" >>> import tempfile - >>> bar_pattern = re.compile('bar') + >>> from re import compile as Re + >>> bar_pattern = Re('bar') >>> patterns = ['foo', bar_pattern] >>> with tempfile.NamedTemporaryFile("w") as file: ... print("bar", file=file, flush=True) diff --git a/bin/tests/system/isctest/vars/openssl.py b/bin/tests/system/isctest/vars/openssl.py index 5df12b7247..c83fc4e5d3 100644 --- a/bin/tests/system/isctest/vars/openssl.py +++ b/bin/tests/system/isctest/vars/openssl.py @@ -10,7 +10,7 @@ # information regarding copyright ownership. import os -import re +from re import compile as Re from typing import Optional from .. import log @@ -33,7 +33,7 @@ def parse_openssl_config(path: Optional[str]): return assert os.path.isfile(path), f"{path} exists, but it's not a file" - regex = re.compile(r"([^=]+)=(.*)") + regex = Re(r"([^=]+)=(.*)") log.debug(f"parsing openssl config: {path}") with open(path, "r", encoding="utf-8") as conf: for line in conf: diff --git a/bin/tests/system/multisigner/tests_multisigner.py b/bin/tests/system/multisigner/tests_multisigner.py index c356595afe..900cc7e7b4 100644 --- a/bin/tests/system/multisigner/tests_multisigner.py +++ b/bin/tests/system/multisigner/tests_multisigner.py @@ -12,6 +12,7 @@ from datetime import timedelta import os import re +from re import compile as Re import pytest @@ -103,9 +104,7 @@ def check_no_dnssec_in_journal(server, zone): ] cmd = isctest.run.cmd(journalprint) - pattern = re.compile( - r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE - ) + pattern = Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE) match = pattern.search(cmd.out) assert not match, f"{match.group(1)} record found in journal" diff --git a/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py b/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py index 612be36b1d..54cf2fcaf2 100644 --- a/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py +++ b/bin/tests/system/xfer-servers-list/tests_xfer_servers_list.py @@ -9,7 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -import re +from re import compile as Re import isctest @@ -32,7 +32,7 @@ def wait_for_initial_xfrin(ns): def wait_for_sending_notify(ns1, ns, key_name): - pattern = re.compile( + pattern = Re( f"zone test/IN: sending notify to {ns.ip}#[0-9]+ : TSIG \\({key_name}\\)" ) with ns1.watch_log_from_start() as watcher: diff --git a/bin/tests/system/xferquota/tests_xferquota.py b/bin/tests/system/xferquota/tests_xferquota.py index a4edc5998f..a0d042d135 100644 --- a/bin/tests/system/xferquota/tests_xferquota.py +++ b/bin/tests/system/xferquota/tests_xferquota.py @@ -12,6 +12,7 @@ import glob import os import re +from re import compile as Re import shutil import signal import time @@ -71,7 +72,7 @@ def test_xferquota(named_port, ns1, ns2): isctest.check.rrsets_equal(ns1response.answer, ns2response.answer) query_and_compare(axfr_msg) - pattern = re.compile( + pattern = Re( f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: " f"Transfer completed: .*\\(serial 2\\)" )