mirror of
https://github.com/isc-projects/bind9.git
synced 2026-05-28 04:34:54 -04:00
Use Re() for creating regular expressions
It's a fairly common pattern to use regular expression in our tests.
Instead of using the fairly verbose re.compile(), import that function
as Re() instead to allow for more brevity in the test syntax.
(cherry picked from commit ac7127d620)
This commit is contained in:
parent
182c7c83a8
commit
631e366dcf
8 changed files with 25 additions and 20 deletions
|
|
@ -9,7 +9,7 @@
|
|||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
import re
|
||||
from re import compile as Re
|
||||
|
||||
import pytest
|
||||
|
||||
|
|
@ -31,7 +31,7 @@ pytestmark = pytest.mark.extra_artifacts(
|
|||
@pytest.fixture(scope="module")
|
||||
def transfers_complete(servers):
|
||||
for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]:
|
||||
pattern = re.compile(
|
||||
pattern = Re(
|
||||
f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed"
|
||||
)
|
||||
for ns in ["ns2", "ns3", "ns4", "ns5"]:
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@
|
|||
import filecmp
|
||||
import os
|
||||
from pathlib import Path
|
||||
import re
|
||||
from re import compile as Re
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
|
@ -53,7 +53,7 @@ else:
|
|||
|
||||
XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "")
|
||||
FILE_DIR = os.path.abspath(Path(__file__).parent)
|
||||
ENV_RE = re.compile(b"([^=]+)=(.*)")
|
||||
ENV_RE = Re(b"([^=]+)=(.*)")
|
||||
PRIORITY_TESTS = [
|
||||
# Tests that are scheduled first. Speeds up parallel execution.
|
||||
"rpz/",
|
||||
|
|
@ -62,9 +62,9 @@ PRIORITY_TESTS = [
|
|||
"timeouts/",
|
||||
"upforwd/",
|
||||
]
|
||||
PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS))
|
||||
SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
|
||||
SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py")
|
||||
PRIORITY_TESTS_RE = Re("|".join(PRIORITY_TESTS))
|
||||
SYSTEM_TEST_NAME_RE = Re(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
|
||||
SYMLINK_REPLACEMENT_RE = Re(r"/tests(_.*)\.py")
|
||||
|
||||
# ----------------------- Global requirements ----------------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import glob
|
|||
import os
|
||||
from pathlib import Path
|
||||
import re
|
||||
from re import compile as Re
|
||||
import time
|
||||
from typing import Dict, List, Optional, Tuple, Union
|
||||
|
||||
|
|
@ -1453,7 +1454,7 @@ def next_key_event_equals(server, zone, next_event):
|
|||
waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds"
|
||||
|
||||
with server.watch_log_from_start() as watcher:
|
||||
watcher.wait_for_line(re.compile(waitfor))
|
||||
watcher.wait_for_line(Re(waitfor))
|
||||
|
||||
# WMM: The with code below is extracting the line the watcher was
|
||||
# waiting for. If WatchLog.wait_for_line()` returned the matched string,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ from typing import Any, Iterator, List, Match, Optional, Pattern, TextIO, TypeVa
|
|||
import abc
|
||||
import os
|
||||
import re
|
||||
from re import compile as Re
|
||||
import time
|
||||
|
||||
|
||||
|
|
@ -213,7 +214,7 @@ class WatchLog(abc.ABC):
|
|||
if isinstance(string, Pattern):
|
||||
patterns.append(string)
|
||||
elif isinstance(string, str):
|
||||
pattern = re.compile(re.escape(string))
|
||||
pattern = Re(re.escape(string))
|
||||
patterns.append(pattern)
|
||||
else:
|
||||
raise WatchLogException(
|
||||
|
|
@ -256,13 +257,14 @@ class WatchLog(abc.ABC):
|
|||
Recommended use:
|
||||
|
||||
```python
|
||||
from re import compile as Re
|
||||
import isctest
|
||||
|
||||
def test_foo(servers):
|
||||
with servers["ns1"].watch_log_from_start() as watcher:
|
||||
watcher.wait_for_line("all zones loaded")
|
||||
|
||||
pattern = re.compile(r"next key event in ([0-9]+) seconds")
|
||||
pattern = Re(r"next key event in ([0-9]+) seconds")
|
||||
with servers["ns1"].watch_log_from_here() as watcher:
|
||||
# ... do stuff here ...
|
||||
match = watcher.wait_for_line(pattern)
|
||||
|
|
@ -321,7 +323,8 @@ class WatchLog(abc.ABC):
|
|||
>>> # Different values must be returned depending on which line is
|
||||
>>> # found in the log file.
|
||||
>>> import tempfile
|
||||
>>> patterns = [re.compile(r"bar ([0-9])"), "qux"]
|
||||
>>> from re import compile as Re
|
||||
>>> patterns = [Re(r"bar ([0-9])"), "qux"]
|
||||
>>> with tempfile.NamedTemporaryFile("w") as file:
|
||||
... print("foo bar 3", file=file, flush=True)
|
||||
... with WatchLogFromStart(file.name) as watcher:
|
||||
|
|
@ -443,7 +446,8 @@ class WatchLog(abc.ABC):
|
|||
>>> assert ret[1].group(0) == "foo"
|
||||
|
||||
>>> import tempfile
|
||||
>>> bar_pattern = re.compile('bar')
|
||||
>>> from re import compile as Re
|
||||
>>> bar_pattern = Re('bar')
|
||||
>>> patterns = ['foo', bar_pattern]
|
||||
>>> with tempfile.NamedTemporaryFile("w") as file:
|
||||
... print("bar", file=file, flush=True)
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@
|
|||
# information regarding copyright ownership.
|
||||
|
||||
import os
|
||||
import re
|
||||
from re import compile as Re
|
||||
from typing import Optional
|
||||
|
||||
from .. import log
|
||||
|
|
@ -33,7 +33,7 @@ def parse_openssl_config(path: Optional[str]):
|
|||
return
|
||||
assert os.path.isfile(path), f"{path} exists, but it's not a file"
|
||||
|
||||
regex = re.compile(r"([^=]+)=(.*)")
|
||||
regex = Re(r"([^=]+)=(.*)")
|
||||
log.debug(f"parsing openssl config: {path}")
|
||||
with open(path, "r", encoding="utf-8") as conf:
|
||||
for line in conf:
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@
|
|||
from datetime import timedelta
|
||||
import os
|
||||
import re
|
||||
from re import compile as Re
|
||||
|
||||
import pytest
|
||||
|
||||
|
|
@ -103,9 +104,7 @@ def check_no_dnssec_in_journal(server, zone):
|
|||
]
|
||||
|
||||
cmd = isctest.run.cmd(journalprint)
|
||||
pattern = re.compile(
|
||||
r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE
|
||||
)
|
||||
pattern = Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE)
|
||||
match = pattern.search(cmd.out)
|
||||
assert not match, f"{match.group(1)} record found in journal"
|
||||
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@
|
|||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
import re
|
||||
from re import compile as Re
|
||||
|
||||
import isctest
|
||||
|
||||
|
|
@ -32,7 +32,7 @@ def wait_for_initial_xfrin(ns):
|
|||
|
||||
|
||||
def wait_for_sending_notify(ns1, ns, key_name):
|
||||
pattern = re.compile(
|
||||
pattern = Re(
|
||||
f"zone test/IN: sending notify to {ns.ip}#[0-9]+ : TSIG \\({key_name}\\)"
|
||||
)
|
||||
with ns1.watch_log_from_start() as watcher:
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@
|
|||
import glob
|
||||
import os
|
||||
import re
|
||||
from re import compile as Re
|
||||
import shutil
|
||||
import signal
|
||||
import time
|
||||
|
|
@ -71,7 +72,7 @@ def test_xferquota(named_port, ns1, ns2):
|
|||
isctest.check.rrsets_equal(ns1response.answer, ns2response.answer)
|
||||
|
||||
query_and_compare(axfr_msg)
|
||||
pattern = re.compile(
|
||||
pattern = Re(
|
||||
f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
|
||||
f"Transfer completed: .*\\(serial 2\\)"
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in a new issue