[9.20] new: test: Regex support for logs and cmd output in pytest

Improve and unify the handling of regular expressions when searching in logs, files and command output in system tests.
- Use `Re()` for constructing regular expressions, which is an imported shorthand for `re.compile()` (imported as `from re import compile as Re`
- Add new `isctest.text.Text` interface which is a text wrapper that supports the `in` operator for line matching operation for both strings and regular expressions, e.g.:
  - `assert "running" in ns1.log`
  - `assert Re("a.example..*10.0.0.1") in response.out`
- Use the new `isctest.text.Text` for:
  - `isctest.run.cmd()` output, where `.out` and `.err` can be used for stdout and stderr contents
  - `NamedInstance.log` rather than the previous log interface (`.expect()` and `.prohibit()` is no longer available or needed. The `in` operator along with an `assert` statement can be used now instead.)
  - `NamedInstance.rndc()` output, which returns identical output as `isctest.run.cmd()`

Backport of MR !11054

Merge branch 'backport-nicki/pytest-grep-9.20' into 'bind-9.20'

See merge request isc-projects/bind9!11342
This commit is contained in:
Nicki Křížek 2025-12-08 17:55:54 +01:00
commit 50900e2490
44 changed files with 801 additions and 894 deletions

View file

@ -720,7 +720,7 @@ vulture:
<<: *python_triggering_rules
needs: []
script:
- vulture --exclude "*ans.py,conftest.py,isctest" --ignore-names "after_servers_start,bootstrap,pytestmark" bin/tests/system/
- vulture --exclude "*ans.py,conftest.py,re_compile_checker.py,isctest" --ignore-names "after_servers_start,bootstrap,pytestmark" bin/tests/system/
ci-variables:
<<: *precheck_job
@ -811,7 +811,7 @@ pylint:
script:
- pylint --rcfile $CI_PROJECT_DIR/.pylintrc $(git ls-files '*.py' | grep -vE '(ans\.py|dangerfile\.py|^bin/tests/system/|^contrib/)')
# Ignore Pylint wrong-import-position error in system test to enable use of pytest.importorskip
- pylint --rcfile $CI_PROJECT_DIR/.pylintrc --disable=wrong-import-position $(git ls-files 'bin/tests/system/*.py' | grep -vE '(ans\.py|vulture_ignore_list\.py)')
- pylint --rcfile $CI_PROJECT_DIR/.pylintrc --load-plugins re_compile_checker --disable=wrong-import-position $(git ls-files 'bin/tests/system/*.py' | grep -vE '(ans\.py|vulture_ignore_list\.py)')
reuse:
<<: *precheck_job

View file

@ -10,12 +10,12 @@
# information regarding copyright ownership.
import concurrent.futures
import os
import subprocess
import time
import pytest
import isctest
pytestmark = pytest.mark.extra_artifacts(
[
"ns*/*.nzf*",
@ -43,20 +43,19 @@ def rndc_loop(test_state, domain, ns3):
["delzone", domain],
]
args = [os.environ["RNDC"]] + ns3.rndc_args.split()
while not test_state["finished"]:
for command in rndc_commands:
ns3.rndc(" ".join(command), ignore_errors=True, log=False)
# avoid using ns3.rndc() directly to avoid log spam
subprocess.run(args + " ".join(command), timeout=10, check=False)
def check_if_server_is_responsive(ns3):
"""
Check if server status can be successfully retrieved using "rndc status"
"""
try:
ns3.rndc("status", log=False)
return True
except isctest.rndc.RNDCException:
return False
cmd = ns3.rndc("status", raise_on_exception=False)
return cmd.rc == 0
def test_rndc_deadlock(ns3):

View file

@ -38,125 +38,117 @@ def test_dnssecpolicy_keystore():
# Superfluous key file.
zone = "superfluous-keyfile.kz.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-superfluous-keyfile.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
assert f"zone '{zone}': wrong number of key files (3, expected 2)" in err
assert f"zone '{zone}': wrong number of key files (3, expected 2)" in cmd.out
# Missing key file.
zone = "missing-keyfile.kz.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-missing-keyfile.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
assert f"zone '{zone}': wrong number of key files (1, expected 2)" in err
assert f"zone '{zone}': wrong number of key files (1, expected 2)" in cmd.out
# Mismatch algorithm.
zone = "bad-algorithm.kz.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-algorithm.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 2
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy alternative-kz"
in err
in cmd.out
)
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[1].tag}' does not match dnssec-policy alternative-kz"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy alternative-kz key:'ksk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy alternative-kz key:'zsk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
in err
in cmd.out
)
# Mismatch length
zone = "bad-length.csk.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-length.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 1
assert (
f"zone '{zone}': key file '{zone}/RSASHA256/{keys[0].tag}' does not match dnssec-policy alternative-csk"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy alternative-csk key:'csk algorithm:RSASHA256 length:2048 tag-range:0-65535'"
in err
in cmd.out
)
# Mismatch tag range
zone = "bad-tagrange.csk.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-tagrange.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 1
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy tagrange-csk"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy tagrange-csk key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-32767'"
in err
in cmd.out
)
# Mismatch role
zone = "bad-role.kz.example"
out = isctest.run.cmd([CHECKCONF, "-k", "bad-role.conf"], raise_on_exception=False)
err = out.stdout.decode("utf-8")
cmd = isctest.run.cmd([CHECKCONF, "-k", "bad-role.conf"], raise_on_exception=False)
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 2
assert (
f"zone '{zone}': no key file found matching dnssec-policy default-kz key:'zsk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
in err
in cmd.out
)
# Mismatch algorithm (default policy)
zone = "bad-default-algorithm.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-default-algorithm.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 1
assert (
f"zone '{zone}': key file '{zone}/RSASHA256/{keys[0].tag}' does not match dnssec-policy default"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy default key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
in err
in cmd.out
)
# Mismatch role (default policy)
zone = "bad-default-kz.example"
out = isctest.run.cmd(
cmd = isctest.run.cmd(
[CHECKCONF, "-k", "bad-default-kz.conf"], raise_on_exception=False
)
err = out.stdout.decode("utf-8")
keys = isctest.kasp.keydir_to_keylist(zone)
assert len(keys) == 2
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[0].tag}' does not match dnssec-policy default"
in err
in cmd.out
)
assert (
f"zone '{zone}': key file '{zone}/ECDSAP256SHA256/{keys[1].tag}' does not match dnssec-policy default"
in err
in cmd.out
)
assert (
f"zone '{zone}': no key file found matching dnssec-policy default key:'csk algorithm:ECDSAP256SHA256 length:256 tag-range:0-65535'"
in err
in cmd.out
)
assert f"zone '{zone}': wrong number of key files (2, expected 1)" in err
assert f"zone '{zone}': wrong number of key files (2, expected 1)" in cmd.out

View file

@ -102,10 +102,10 @@ def verify_zone(zone, transfer):
verifier = isctest.run.cmd(verify_cmd)
if verifier.returncode != 0:
if verifier.rc != 0:
isctest.log.error(f"dnssec-verify {zone} failed")
return verifier.returncode == 0
return verifier.rc == 0
def read_statefile(server, zone):
@ -189,33 +189,6 @@ def keystate_check(server, zone, key):
assert val != 0
def rekey(zone):
rndc = os.getenv("RNDC")
assert rndc is not None
port = os.getenv("CONTROLPORT")
assert port is not None
# rndc loadkeys.
rndc_cmd = [
rndc,
"-c",
"../_common/rndc.conf",
"-p",
port,
"-s",
"10.53.0.9",
"loadkeys",
zone,
]
controller = isctest.run.cmd(rndc_cmd)
if controller.returncode != 0:
isctest.log.error(f"rndc loadkeys {zone} failed")
assert controller.returncode == 0
class CheckDSTest(NamedTuple):
zone: str
logs_to_wait_for: Tuple[str]
@ -472,7 +445,7 @@ def test_checkds(ns2, ns9, params):
for log_string in params.logs_to_wait_for:
line = f"zone {params.zone}/IN (signed): checkds: {log_string}"
while line not in ns9.log:
rekey(params.zone)
ns9.rndc(f"loadkeys {params.zone}")
time_remaining -= 1
assert time_remaining, f'Timed out waiting for "{log_string}" to be logged'
time.sleep(1)

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import re
from re import compile as Re
import pytest
@ -31,7 +31,7 @@ pytestmark = pytest.mark.extra_artifacts(
@pytest.fixture(scope="module")
def transfers_complete(servers):
for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]:
pattern = re.compile(
pattern = Re(
f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed"
)
for ns in ["ns2", "ns3", "ns4", "ns5"]:

View file

@ -12,7 +12,7 @@
import filecmp
import os
from pathlib import Path
import re
from re import compile as Re
import shutil
import subprocess
import tempfile
@ -53,7 +53,7 @@ else:
XDIST_WORKER = os.environ.get("PYTEST_XDIST_WORKER", "")
FILE_DIR = os.path.abspath(Path(__file__).parent)
ENV_RE = re.compile(b"([^=]+)=(.*)")
ENV_RE = Re(b"([^=]+)=(.*)")
PRIORITY_TESTS = [
# Tests that are scheduled first. Speeds up parallel execution.
"rpz/",
@ -62,9 +62,9 @@ PRIORITY_TESTS = [
"timeouts/",
"upforwd/",
]
PRIORITY_TESTS_RE = re.compile("|".join(PRIORITY_TESTS))
SYSTEM_TEST_NAME_RE = re.compile(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
SYMLINK_REPLACEMENT_RE = re.compile(r"/tests(_.*)\.py")
PRIORITY_TESTS_RE = Re("|".join(PRIORITY_TESTS))
SYSTEM_TEST_NAME_RE = Re(f"{SYSTEM_TEST_DIR_GIT_PATH}" + r"/([^/]+)")
SYMLINK_REPLACEMENT_RE = Re(r"/tests(_.*)\.py")
# ----------------------- Global requirements ----------------------------

View file

@ -33,20 +33,7 @@ pytestmark = [
]
def run_rndc(server, rndc_command):
"""
Send the specified 'rndc_command' to 'server' with a timeout of 10 seconds
"""
rndc = isctest.vars.ALL["RNDC"]
port = isctest.vars.ALL["CONTROLPORT"]
cmdline = [rndc, "-c", "../_common/rndc.conf", "-p", port, "-s", server]
cmdline.extend(rndc_command)
isctest.run.cmd(cmdline)
def test_dnstap_dispatch_socket_addresses():
def test_dnstap_dispatch_socket_addresses(ns3):
# Send some query to ns3 so that it records something in its dnstap file.
msg = isctest.query.create("mail.example.", "A")
res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR)
@ -55,13 +42,13 @@ def test_dnstap_dispatch_socket_addresses():
]
# Before continuing, roll dnstap file to ensure it is flushed to disk.
run_rndc("10.53.0.3", ["dnstap", "-roll", "1"])
ns3.rndc("dnstap -roll 1")
# Move the dnstap file aside so that it is retained for troubleshooting.
os.rename(os.path.join("ns3", "dnstap.out.0"), "dnstap.out.resolver_addresses")
# Read the contents of the dnstap file using dnstap-read.
run = isctest.run.cmd(
dnstapread = isctest.run.cmd(
[isctest.vars.ALL["DNSTAPREAD"], "dnstap.out.resolver_addresses"],
)
@ -77,7 +64,7 @@ def test_dnstap_dispatch_socket_addresses():
bad_frames = []
inspected_frames = 0
addr_regex = r"^10\.53\.0\.[0-9]+:[0-9]{1,5}$"
for line in run.stdout.decode("utf-8").splitlines():
for line in dnstapread.out.splitlines():
_, _, frame_type, addr1, _, addr2, _ = line.split(" ", 6)
# Only inspect RESOLVER_QUERY and RESOLVER_RESPONSE frames.
if frame_type not in ("RQ", "RR"):

View file

@ -25,10 +25,10 @@ def gnutls_cli_executable():
pytest.skip("gnutls-cli not found in PATH")
# Ensure gnutls-cli supports the --logfile command-line option.
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[executable, "--logfile=/dev/null"], log_stderr=False, raise_on_exception=False
).stdout
if b"illegal option" in output:
)
if "illegal option" in cmd.out:
pytest.skip("gnutls-cli does not support the --logfile option")
return executable

View file

@ -13,7 +13,6 @@ from . import check
from . import instance
from . import query
from . import kasp
from . import rndc
from . import run
from . import template
from . import log

View file

@ -13,7 +13,6 @@
from typing import List, NamedTuple, Optional
import logging
import os
from pathlib import Path
import re
@ -21,10 +20,10 @@ import re
import dns.message
import dns.rcode
from .log import debug, info, LogFile, WatchLogFromStart, WatchLogFromHere
from .rndc import RNDCBinaryExecutor, RNDCException, RNDCExecutor
from .run import perl
from .log import debug, WatchLogFromStart, WatchLogFromHere
from .run import CmdResult, EnvCmd, perl
from .query import udp
from .text import TextFile
class NamedPorts(NamedTuple):
@ -56,8 +55,6 @@ class NamedInstance:
identifier: str,
num: Optional[int] = None,
ports: Optional[NamedPorts] = None,
rndc_logger: Optional[logging.Logger] = None,
rndc_executor: Optional[RNDCExecutor] = None,
) -> None:
"""
`identifier` is the name of the instance's directory
@ -70,12 +67,6 @@ class NamedInstance:
this `named` instance is listening for various types of traffic (both
DNS traffic and RNDC commands). Defaults to ports set by the test
framework.
`rndc_logger` is the `logging.Logger` to use for logging RNDC
commands sent to this `named` instance.
`rndc_executor` is an object implementing the `RNDCExecutor` interface
that is used for executing RNDC commands on this `named` instance.
"""
self.directory = Path(identifier).absolute()
if not self.directory.is_dir():
@ -87,9 +78,15 @@ class NamedInstance:
if ports is None:
ports = NamedPorts.from_env()
self.ports = ports
self.log = LogFile(os.path.join(identifier, "named.run"))
self._rndc_executor = rndc_executor or RNDCBinaryExecutor()
self._rndc_logger = rndc_logger
self.log = TextFile(os.path.join(identifier, "named.run"))
self._rndc_conf = Path("../_common/rndc.conf").absolute()
self._rndc = EnvCmd("RNDC", self.rndc_args)
@property
def rndc_args(self) -> str:
"""Base arguments for calling RNDC to control the instance."""
return f"-c {self._rndc_conf} -s {self.ip} -p {self.ports.rndc}"
@property
def ip(self) -> str:
@ -107,52 +104,16 @@ class NamedInstance:
assert num is None or num == parsed_num, "mismatched num and identifier"
return parsed_num
def rndc(self, command: str, ignore_errors: bool = False, log: bool = True) -> str:
def rndc(self, command: str, timeout=10, **kwargs) -> CmdResult:
"""
Send `command` to this named instance using RNDC. Return the server's
response.
If the RNDC command fails, an `RNDCException` is raised unless
`ignore_errors` is set to `True`.
The RNDC command will be logged to `rndc.log` (along with the server's
response) unless `log` is set to `False`.
```python
def test_foo(servers):
# Send the "status" command to ns1. An `RNDCException` will be
# raised if the RNDC command fails. This command will be logged.
response = servers["ns1"].rndc("status")
# Send the "thaw foo" command to ns2. No exception will be raised
# in case the RNDC command fails. This command will be logged
# (even if it fails).
response = servers["ns2"].rndc("thaw foo", ignore_errors=True)
# Send the "stop" command to ns3. An `RNDCException` will be
# raised if the RNDC command fails, but this command will not be
# logged (the server's response will still be returned to the
# caller, though).
response = servers["ns3"].rndc("stop", log=False)
# Send the "halt" command to ns4 in "fire & forget mode": no
# exceptions will be raised and no logging will take place (the
# server's response will still be returned to the caller, though).
response = servers["ns4"].rndc("stop", ignore_errors=True, log=False)
```
To suppress exceptions, redirect outputs, control logging change
timeout etc. use keyword arguments which are passed to
isctest.cmd.run().
"""
try:
response = self._rndc_executor.call(self.ip, self.ports.rndc, command)
if log:
self._rndc_log(command, response)
except RNDCException as exc:
response = str(exc)
if log:
self._rndc_log(command, response)
if not ignore_errors:
raise
return response
return self._rndc(command, timeout=timeout, **kwargs)
def nsupdate(
self, update_msg: dns.message.Message, expected_rcode=dns.rcode.NOERROR
@ -198,31 +159,15 @@ class NamedInstance:
"""
return WatchLogFromHere(self.log.path, timeout)
def reconfigure(self, **kwargs) -> None:
def reconfigure(self, **kwargs) -> CmdResult:
"""
Reconfigure this named `instance` and wait until reconfiguration is
finished. Raise an `RNDCException` if reconfiguration fails.
finished.
"""
with self.watch_log_from_here() as watcher:
self.rndc("reconfig", **kwargs)
cmd = self.rndc("reconfig", **kwargs)
watcher.wait_for_line("any newly configured zones are now loaded")
def _rndc_log(self, command: str, response: str) -> None:
"""
Log an `rndc` invocation (and its output) to the `rndc.log` file in the
current working directory.
"""
fmt = '%(ip)s: "%(command)s"\n%(separator)s\n%(response)s%(separator)s'
args = {
"ip": self.ip,
"command": command,
"separator": "-" * 80,
"response": response,
}
if self._rndc_logger is None:
info(fmt, args)
else:
self._rndc_logger.info(fmt, args)
return cmd
def stop(self, args: Optional[List[str]] = None) -> None:
"""Stop the instance."""
@ -239,3 +184,6 @@ class NamedInstance:
f"{os.environ['srcdir']}/start.pl",
[self.system_test_name, self.identifier] + args,
)
def __repr__(self):
return self.identifier

View file

@ -15,7 +15,7 @@ import glob
import os
from pathlib import Path
import re
import subprocess
from re import compile as Re
import time
from typing import Dict, List, Optional, Tuple, Union
@ -518,8 +518,8 @@ class Key:
str(self.keyfile),
]
out = isctest.run.cmd(dsfromkey_command)
dsfromkey = out.stdout.decode("utf-8").split()
cmd = isctest.run.cmd(dsfromkey_command)
dsfromkey = cmd.out.split()
rdata_fromfile = " ".join(dsfromkey[:7])
rdata_fromwire = " ".join(cds[:7])
@ -822,18 +822,14 @@ def check_dnssec_verify(server, zone, tsig=None):
file.write(rr.to_text())
file.write("\n")
try:
verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
verified = isctest.run.cmd(verify_command)
except subprocess.CalledProcessError:
pass
if verified:
break
verify_command = [os.environ.get("VERIFY"), "-z", "-o", zone, zonefile]
verified = isctest.run.cmd(verify_command, raise_on_exception=False)
if verified.rc == 0:
return
time.sleep(1)
assert verified
assert False, "zone not verified"
def check_dnssecstatus(server, zone, keys, policy=None, view=None):
@ -842,19 +838,19 @@ def check_dnssecstatus(server, zone, keys, policy=None, view=None):
# policy name is returned, and if all expected keys are listed.
response = ""
if view is None:
response = server.rndc(f"dnssec -status {zone}", log=False)
response = server.rndc(f"dnssec -status {zone}")
else:
response = server.rndc(f"dnssec -status {zone} in {view}", log=False)
response = server.rndc(f"dnssec -status {zone} in {view}")
if policy is None:
assert "Zone does not have dnssec-policy" in response
assert "Zone does not have dnssec-policy" in response.out
return
assert f"dnssec-policy: {policy}" in response
assert f"dnssec-policy: {policy}" in response.out
for key in keys:
if not key.external:
assert f"key: {key.tag}" in response
assert f"key: {key.tag}" in response.out
def _check_signatures(
@ -1082,9 +1078,8 @@ def check_cdslog(server, zone, key, substr):
def check_cdslog_prohibit(server, zone, key, substr):
server.log.prohibit(
f"{substr} for key {zone}/{key.algorithm.name}/{key.tag} is now published"
)
msg = f"{substr} for key {zone}/{key.algorithm.name}/{key.tag} is now published"
assert msg not in server.log
def check_cdsdelete(rrset, expected):
@ -1458,7 +1453,7 @@ def next_key_event_equals(server, zone, next_event):
waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds"
with server.watch_log_from_start() as watcher:
watcher.wait_for_line(re.compile(waitfor))
watcher.wait_for_line(Re(waitfor))
# WMM: The with code below is extracting the line the watcher was
# waiting for. If WatchLog.wait_for_line()` returned the matched string,

View file

@ -23,4 +23,4 @@ from .basic import (
critical,
)
from .watchlog import LogFile, WatchLogFromStart, WatchLogFromHere
from .watchlog import WatchLogFromStart, WatchLogFromHere

View file

@ -9,15 +9,15 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Any, Iterator, List, Match, Optional, Pattern, TextIO, TypeVar, Union
from typing import Any, List, Match, Optional, Pattern, TextIO, TypeVar, Union
import abc
import os
import re
import time
from isctest.text import compile_pattern, FlexPattern, LineReader
FlexPattern = Union[str, Pattern]
T = TypeVar("T")
OneOrMore = Union[T, List[T]]
@ -30,128 +30,6 @@ class WatchLogTimeout(WatchLogException):
pass
class LogFile:
"""
Log file wrapper with a path and means to find a string in its contents.
"""
def __init__(self, path: str):
self.path = path
@property
def _lines(self) -> Iterator[str]:
with open(self.path, encoding="utf-8") as f:
yield from f
def __contains__(self, substring: str) -> bool:
"""
Return whether any of the lines in the log contains a given string.
"""
for line in self._lines:
if substring in line:
return True
return False
def expect(self, msg: str):
"""Check the string is present anywhere in the log file."""
if msg in self:
return
assert False, f"log message not found in log {self.path}: {msg}"
def prohibit(self, msg: str):
"""Check the string is not present in the entire log file."""
if msg in self:
assert False, f"forbidden message appeared in log {self.path}: {msg}"
class LineReader:
"""
>>> import io
>>> file = io.StringIO("complete line\\n")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
>>> file = io.StringIO("complete line\\nand then incomplete line")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
>>> file = io.StringIO("complete line\\nand then another complete line\\n")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
and then another complete line
>>> file = io.StringIO()
>>> line_reader = LineReader(file)
>>> for chunk in (
... "first line\\nsecond line\\nthi",
... "rd ",
... "line\\nfour",
... "th line\\n\\nfifth line\\n"
... ):
... print("=== OUTER ITERATION ===")
... pos = file.tell()
... print(chunk, end="", file=file)
... _ = file.seek(pos)
... for line in line_reader.readlines():
... print("--- inner iteration ---")
... print(line.strip() or "<blank>")
=== OUTER ITERATION ===
--- inner iteration ---
first line
--- inner iteration ---
second line
=== OUTER ITERATION ===
=== OUTER ITERATION ===
--- inner iteration ---
third line
=== OUTER ITERATION ===
--- inner iteration ---
fourth line
--- inner iteration ---
<blank>
--- inner iteration ---
fifth line
"""
def __init__(self, stream: TextIO):
self._stream = stream
self._linebuf = ""
def readline(self) -> Optional[str]:
"""
Wrapper around io.readline() function to handle unfinished lines.
If a line ends with newline character, it's returned immediately.
If a line doesn't end with a newline character, the read contents are
buffered until the next call of this function and None is returned
instead.
"""
read = self._stream.readline()
if not read.endswith("\n"):
self._linebuf += read
return None
read = self._linebuf + read
self._linebuf = ""
return read
def readlines(self) -> Iterator[str]:
"""
Wrapper around io.readline() which only returns finished lines.
"""
while True:
line = self.readline()
if line is None:
return
yield line
class WatchLog(abc.ABC):
"""
Wait for a log message to appear in a text file.
@ -210,15 +88,7 @@ class WatchLog(abc.ABC):
if not isinstance(strings, list):
strings = [strings]
for string in strings:
if isinstance(string, Pattern):
patterns.append(string)
elif isinstance(string, str):
pattern = re.compile(re.escape(string))
patterns.append(pattern)
else:
raise WatchLogException(
"only string and re.Pattern allowed for matching"
)
patterns.append(compile_pattern(string))
return patterns
def _wait_for_match(self, regexes: List[Pattern]) -> Match:
@ -256,13 +126,14 @@ class WatchLog(abc.ABC):
Recommended use:
```python
from re import compile as Re
import isctest
def test_foo(servers):
with servers["ns1"].watch_log_from_start() as watcher:
watcher.wait_for_line("all zones loaded")
pattern = re.compile(r"next key event in ([0-9]+) seconds")
pattern = Re(r"next key event in ([0-9]+) seconds")
with servers["ns1"].watch_log_from_here() as watcher:
# ... do stuff here ...
match = watcher.wait_for_line(pattern)
@ -321,7 +192,8 @@ class WatchLog(abc.ABC):
>>> # Different values must be returned depending on which line is
>>> # found in the log file.
>>> import tempfile
>>> patterns = [re.compile(r"bar ([0-9])"), "qux"]
>>> from re import compile as Re
>>> patterns = [Re(r"bar ([0-9])"), "qux"]
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("foo bar 3", file=file, flush=True)
... with WatchLogFromStart(file.name) as watcher:
@ -443,7 +315,8 @@ class WatchLog(abc.ABC):
>>> assert ret[1].group(0) == "foo"
>>> import tempfile
>>> bar_pattern = re.compile('bar')
>>> from re import compile as Re
>>> bar_pattern = Re('bar')
>>> patterns = ['foo', bar_pattern]
>>> with tempfile.NamedTemporaryFile("w") as file:
... print("bar", file=file, flush=True)

View file

@ -1,69 +0,0 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import abc
import os
import subprocess
class RNDCExecutor(abc.ABC):
"""
An interface which RNDC executors have to implement in order for the
`NamedInstance` class to be able to use them.
"""
@abc.abstractmethod
def call(self, ip: str, port: int, command: str) -> str:
"""
Send RNDC `command` to the `named` instance at `ip:port` and return the
server's response.
"""
class RNDCException(Exception):
"""
Raised by classes implementing the `RNDCExecutor` interface when sending an
RNDC command fails for any reason.
"""
class RNDCBinaryExecutor(RNDCExecutor):
"""
An `RNDCExecutor` which sends RNDC commands to servers using the `rndc`
binary.
"""
def __init__(self) -> None:
"""
This class needs the `RNDC` environment variable to be set to the path
to the `rndc` binary to use.
"""
rndc_path = os.environ.get("RNDC", "/bin/false")
rndc_conf = os.path.join("..", "_common", "rndc.conf")
self._base_cmdline = [rndc_path, "-c", rndc_conf]
def call(self, ip: str, port: int, command: str) -> str:
"""
Send RNDC `command` to the `named` instance at `ip:port` and return the
server's response.
"""
cmdline = self._base_cmdline[:]
cmdline.extend(["-s", ip])
cmdline.extend(["-p", str(port)])
cmdline.extend(command.split())
try:
return subprocess.check_output(
cmdline, stderr=subprocess.STDOUT, timeout=10, encoding="utf-8"
)
except subprocess.SubprocessError as exc:
msg = getattr(exc, "output", "RNDC exception occurred")
raise RNDCException(msg) from exc

View file

@ -16,6 +16,19 @@ import time
from typing import List, Optional
import isctest.log
import isctest.text
class CmdResult:
def __init__(self, proc=None):
self.proc = proc
self.rc = self.proc.returncode
self.out = isctest.text.Text("")
self.err = isctest.text.Text("")
if self.proc.stdout:
self.out = isctest.text.Text(self.proc.stdout.decode("utf-8"))
if self.proc.stderr:
self.err = isctest.text.Text(self.proc.stderr.decode("utf-8"))
def cmd(
@ -29,7 +42,7 @@ def cmd(
input_text: Optional[bytes] = None,
raise_on_exception=True,
env: Optional[dict] = None,
):
) -> CmdResult:
"""Execute a command with given args as subprocess."""
isctest.log.debug(f"isctest.run.cmd(): {' '.join(args)}")
@ -59,13 +72,26 @@ def cmd(
env=env,
)
print_debug_logs(proc)
return proc
return CmdResult(proc)
except subprocess.CalledProcessError as exc:
print_debug_logs(exc)
isctest.log.debug(f"isctest.run.cmd(): (return code) {exc.returncode}")
if raise_on_exception:
raise exc
return exc
return CmdResult(exc)
class EnvCmd:
"""Helper for executing binaries from env with optional base parameters."""
def __init__(self, name: str, base_params: str = ""):
self.bin_path = os.environ[name]
self.base_params = base_params.split()
def __call__(self, params: str, **kwargs) -> CmdResult:
"""Call the command. Keyword arguments from isctest.run.cmd() are supported."""
args = self.base_params + params.split()
return cmd([self.bin_path] + args, **kwargs)
def _run_script(
@ -103,17 +129,6 @@ def _run_script(
isctest.log.debug(" exited with %d", returncode)
class Dig:
def __init__(self, base_params: str = ""):
self.base_params = base_params
def __call__(self, params: str) -> str:
"""Run the dig command with the given parameters and return the decoded output."""
return cmd(
[os.environ.get("DIG")] + f"{self.base_params} {params}".split(),
).stdout.decode("utf-8")
def shell(script: str, args: Optional[List[str]] = None) -> None:
"""Run a given script with system's shell interpreter."""
_run_script(os.environ["SHELL"], script, args)

View file

@ -0,0 +1,178 @@
#!/usr/bin/python3
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import abc
import re
from re import compile as Re
from typing import Iterator, List, Match, Optional, Pattern, TextIO, Union
FlexPattern = Union[str, Pattern]
def compile_pattern(string: FlexPattern) -> Pattern:
if isinstance(string, Pattern):
return string
if isinstance(string, str):
return Re(re.escape(string))
raise TypeError("only string and re.Pattern allowed")
class Grep(abc.ABC):
"""
Implement a grep-like interface for pattern matching in texts and files.
"""
@abc.abstractmethod
def readlines(self) -> Iterator[str]:
raise NotImplementedError
def igrep(self, pattern: FlexPattern) -> Iterator[Match]:
"""
Iterate over the lines matching the pattern.
"""
regex = compile_pattern(pattern)
for line in self.readlines():
match = regex.search(line)
if match:
yield match
def grep(self, pattern: FlexPattern) -> List[Match]:
"""
Get list of lines matching the pattern.
"""
return list(self.igrep(pattern))
def __contains__(self, pattern: FlexPattern) -> bool:
"""
Return whether any of the lines in the log contains matches the pattern.
"""
try:
next(self.igrep(pattern))
except StopIteration:
return False
return True
class Text(Grep, str): # type: ignore
"""
Wrapper around classic string with grep support.
"""
def readlines(self):
yield from self.splitlines(keepends=True)
class TextFile(Grep):
"""
Text file wrapper with grep support.
"""
def __init__(self, path: str):
self.path = path
def readlines(self) -> Iterator[str]:
with open(self.path, encoding="utf-8") as f:
yield from f
def __repr__(self):
return self.path
class LineReader(Grep):
"""
>>> import io
>>> file = io.StringIO("complete line\\n")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
>>> file = io.StringIO("complete line\\nand then incomplete line")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
>>> file = io.StringIO("complete line\\nand then another complete line\\n")
>>> line_reader = LineReader(file)
>>> for line in line_reader.readlines():
... print(line.strip())
complete line
and then another complete line
>>> file = io.StringIO()
>>> line_reader = LineReader(file)
>>> for chunk in (
... "first line\\nsecond line\\nthi",
... "rd ",
... "line\\nfour",
... "th line\\n\\nfifth line\\n"
... ):
... print("=== OUTER ITERATION ===")
... pos = file.tell()
... print(chunk, end="", file=file)
... _ = file.seek(pos)
... for line in line_reader.readlines():
... print("--- inner iteration ---")
... print(line.strip() or "<blank>")
=== OUTER ITERATION ===
--- inner iteration ---
first line
--- inner iteration ---
second line
=== OUTER ITERATION ===
=== OUTER ITERATION ===
--- inner iteration ---
third line
=== OUTER ITERATION ===
--- inner iteration ---
fourth line
--- inner iteration ---
<blank>
--- inner iteration ---
fifth line
"""
def __init__(self, stream: TextIO):
self._stream = stream
self._linebuf = ""
def readline(self) -> Optional[str]:
"""
Wrapper around io.readline() function to handle unfinished lines.
If a line ends with newline character, it's returned immediately.
If a line doesn't end with a newline character, the read contents are
buffered until the next call of this function and None is returned
instead.
"""
read = self._stream.readline()
if not read.endswith("\n"):
self._linebuf += read
return None
read = self._linebuf + read
self._linebuf = ""
return read
def readlines(self) -> Iterator[str]:
"""
Wrapper around io.readline() which only returns finished lines.
"""
while True:
line = self.readline()
if line is None:
return
yield line

View file

@ -10,7 +10,7 @@
# information regarding copyright ownership.
import os
import re
from re import compile as Re
from typing import Optional
from .. import log
@ -33,7 +33,7 @@ def parse_openssl_config(path: Optional[str]):
return
assert os.path.isfile(path), f"{path} exists, but it's not a file"
regex = re.compile(r"([^=]+)=(.*)")
regex = Re(r"([^=]+)=(.*)")
log.debug(f"parsing openssl config: {path}")
with open(path, "r", encoding="utf-8") as conf:
for line in conf:

View file

@ -11,6 +11,7 @@
import os
import shutil
import subprocess
import time
from datetime import timedelta
@ -200,7 +201,7 @@ def cb_ixfr_is_signed(expected_updates, params, ksks=None, zsks=None):
f"expected updates {expected_updates} policy {policy} ksks {ksks} zsks {zsks}"
)
shutil.copyfile(f"ns2/{zone}.db.in2", f"ns2/{zone}.db")
servers["ns2"].rndc(f"reload {zone}", log=False)
servers["ns2"].rndc(f"reload {zone}")
def update_is_signed():
parts = update.split()
@ -314,7 +315,7 @@ def cb_remove_keyfiles(params, ksks=None, zsks=None):
os.remove(k.statefile)
with servers["ns3"].watch_log_from_here() as watcher:
servers["ns3"].rndc(f"loadkeys {zone}", log=False)
servers["ns3"].rndc(f"loadkeys {zone}")
watcher.wait_for_line(
f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
)
@ -806,9 +807,9 @@ def test_kasp_inherit_view(number, dynamic, inline_signing, txt_rdata, ns4):
isctest.kasp.check_dnssecstatus(ns4, zone, keys, policy=policy, view=view)
isctest.kasp.check_apex(ns4, zone, keys, [], tsig=tsig)
# check zonestatus
response = ns4.rndc(f"zonestatus {zone} in {view}", log=False)
assert f"dynamic: {dynamic}" in response
assert f"inline signing: {inline_signing}" in response
response = ns4.rndc(f"zonestatus {zone} in {view}")
assert f"dynamic: {dynamic}" in response.out
assert f"inline signing: {inline_signing}" in response.out
# check subdomain
fqdn = f"{zone}."
qname = f"view.{zone}."
@ -869,7 +870,7 @@ def test_kasp_default(ns3):
state_stat = os.stat(key.statefile)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"loadkeys {zone}", log=False)
ns3.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
assert privkey_stat.st_mtime == os.stat(key.privatefile).st_mtime
@ -878,7 +879,7 @@ def test_kasp_default(ns3):
# again
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"loadkeys {zone}", log=False)
ns3.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
assert privkey_stat.st_mtime == os.stat(key.privatefile).st_mtime
@ -888,7 +889,7 @@ def test_kasp_default(ns3):
# modify unsigned zone file and check that new record is signed.
isctest.log.info("check that an updated zone signs the new record")
shutil.copyfile("ns3/template2.db.in", f"ns3/{zone}.db")
ns3.rndc(f"reload {zone}", log=False)
ns3.rndc(f"reload {zone}")
def update_is_signed():
parts = update.split()
@ -909,7 +910,7 @@ def test_kasp_default(ns3):
shutil.move(f"{key.privatefile}", f"{key.path}.offline")
expectmsg = "zone_rekey:zone_verifykeys failed: some key files are missing"
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"loadkeys {zone}", log=False)
ns3.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"zone {zone}/IN (signed): {expectmsg}")
# Nothing has changed.
expected[0].private = False # noqa
@ -986,7 +987,7 @@ def test_kasp_dynamic(ns3):
# Update zone with freeze/thaw.
isctest.log.info("check dynamic zone is updated and signed after freeze and thaw")
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"freeze {zone}", log=False)
ns3.rndc(f"freeze {zone}")
watcher.wait_for_line(f"freezing zone '{zone}/IN': success")
time.sleep(1)
@ -995,7 +996,7 @@ def test_kasp_dynamic(ns3):
time.sleep(1)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"thaw {zone}", log=False)
ns3.rndc(f"thaw {zone}")
watcher.wait_for_line(f"thawing zone '{zone}/IN': success")
expected_updates = [f"a.{zone}. A 10.0.0.1", f"d.{zone}. A 10.0.0.44"]
@ -1024,7 +1025,7 @@ def test_kasp_dynamic(ns3):
"check dynamic inline-signed zone is updated and signed after freeze and thaw"
)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"freeze {zone}", log=False)
ns3.rndc(f"freeze {zone}")
watcher.wait_for_line(f"freezing zone '{zone}/IN': success")
time.sleep(1)
@ -1032,7 +1033,7 @@ def test_kasp_dynamic(ns3):
time.sleep(1)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"thaw {zone}", log=False)
ns3.rndc(f"thaw {zone}")
watcher.wait_for_line(f"thawing zone '{zone}/IN': success")
expected_updates = [f"a.{zone}. A 10.0.0.11", f"d.{zone}. A 10.0.0.44"]
@ -1089,7 +1090,7 @@ def test_kasp_checkds(ns3):
ksk = ksks[0]
isctest.log.info("check if checkds -publish correctly sets DSPublish")
ns3.rndc(f"dnssec -checkds -when {now} published {zone}", log=False)
ns3.rndc(f"dnssec -checkds -when {now} published {zone}")
metadata = f"DSPublish: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "rumoured"
@ -1097,7 +1098,7 @@ def test_kasp_checkds(ns3):
isctest.kasp.check_keys(zone, keys, expected)
isctest.log.info("check if checkds -withdrawn correctly sets DSRemoved")
ns3.rndc(f"dnssec -checkds -when {now} withdrawn {zone}", log=False)
ns3.rndc(f"dnssec -checkds -when {now} withdrawn {zone}")
metadata = f"DSRemoved: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "unretentive"
@ -1137,8 +1138,8 @@ def test_kasp_checkds_doubleksk(ns3):
isctest.log.info("check invalid checkds commands")
def check_error():
response = ns3.rndc(test["command"], log=False)
assert test["error"] in response
response = ns3.rndc(test["command"], stderr=subprocess.STDOUT)
assert test["error"] in response.out
test_cases = [
{
@ -1162,7 +1163,7 @@ def test_kasp_checkds_doubleksk(ns3):
check_error()
isctest.log.info("check if checkds -publish -key correctly sets DSPublish")
ns3.rndc(f"dnssec -checkds -when {now} -key {ksk.tag} published {zone}", log=False)
ns3.rndc(f"dnssec -checkds -when {now} -key {ksk.tag} published {zone}")
metadata = f"DSPublish: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "rumoured"
@ -1171,7 +1172,7 @@ def test_kasp_checkds_doubleksk(ns3):
isctest.log.info("check if checkds -withdrawn -key correctly sets DSRemoved")
ksk = ksks[1]
ns3.rndc(f"dnssec -checkds -when {now} -key {ksk.tag} withdrawn {zone}", log=False)
ns3.rndc(f"dnssec -checkds -when {now} -key {ksk.tag} withdrawn {zone}")
metadata = f"DSRemoved: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[1].metadata["DSState"] = "unretentive"
@ -1204,7 +1205,7 @@ def test_kasp_checkds_csk(ns3):
ksk = keys[0]
isctest.log.info("check if checkds -publish csk correctly sets DSPublish")
ns3.rndc(f"dnssec -checkds -when {now} published {zone}", log=False)
ns3.rndc(f"dnssec -checkds -when {now} published {zone}")
metadata = f"DSPublish: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "rumoured"
@ -1212,7 +1213,7 @@ def test_kasp_checkds_csk(ns3):
isctest.kasp.check_keys(zone, keys, expected)
isctest.log.info("check if checkds -withdrawn csk correctly sets DSRemoved")
ns3.rndc(f"dnssec -checkds -when {now} withdrawn {zone}", log=False)
ns3.rndc(f"dnssec -checkds -when {now} withdrawn {zone}")
metadata = f"DSRemoved: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "unretentive"
@ -1281,7 +1282,7 @@ def test_kasp_dnssec_keygen():
zone,
]
return isctest.run.cmd(keygen_command).stdout.decode("utf-8")
return isctest.run.cmd(keygen_command).out
isctest.log.info(
"check that 'dnssec-keygen -k' (configured policy) created valid files"
@ -1326,7 +1327,7 @@ def test_kasp_dnssec_keygen():
str(publish),
key.path,
]
out = isctest.run.cmd(settime).stdout.decode("utf-8")
isctest.run.cmd(settime)
isctest.check.file_contents_equal(f"{key.statefile}", f"{key.statefile}.backup")
assert key.get_metadata("Publish", file=key.privatefile) == str(publish)
@ -1377,7 +1378,7 @@ def test_kasp_dnssec_keygen():
str(now),
key.path,
]
out = isctest.run.cmd(settime).stdout.decode("utf-8")
isctest.run.cmd(settime)
isctest.kasp.check_keys("kasp", keys, expected)
isctest.kasp.check_keytimes(keys, expected)
@ -1414,7 +1415,7 @@ def test_kasp_dnssec_keygen():
str(now),
key.path,
]
out = isctest.run.cmd(settime).stdout.decode("utf-8")
isctest.run.cmd(settime)
isctest.kasp.check_keys("kasp", keys, expected)
isctest.kasp.check_keytimes(keys, expected)
@ -1462,7 +1463,7 @@ def test_kasp_dnssec_keygen():
str(soon),
key.path,
]
out = isctest.run.cmd(settime).stdout.decode("utf-8")
isctest.run.cmd(settime)
isctest.kasp.check_keys("kasp", keys, expected)
isctest.kasp.check_keytimes(keys, expected)
@ -1596,11 +1597,11 @@ def test_kasp_zsk_retired(ns3):
# Load again, make sure the purged key is not an issue when verifying keys.
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"loadkeys {zone}", log=False)
ns3.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
ns3.log.prohibit(msg)
assert msg not in ns3.log
def test_kasp_purge_keys(ns4):
@ -1620,14 +1621,14 @@ def test_kasp_purge_keys(ns4):
# Reconfig, make sure the purged key is not an issue when verifying keys.
shutil.copyfile("ns4/purgekeys2.conf", "ns4/purgekeys.conf")
with ns4.watch_log_from_here() as watcher:
ns4.rndc("reconfig", log=False)
ns4.rndc("reconfig")
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN/example1 (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
ns4.log.prohibit(msg)
assert msg not in ns4.log
msg = f"zone {zone}/IN/example2 (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
ns4.log.prohibit(msg)
assert msg not in ns4.log
def test_kasp_reload_restart(ns6):
@ -1666,7 +1667,7 @@ def test_kasp_reload_restart(ns6):
shutil.copyfile(f"ns6/{zone}2.db.in", f"ns6/{zone}.db")
with ns6.watch_log_from_here() as watcher:
ns6.rndc("reload", log=False)
ns6.rndc("reload")
watcher.wait_for_line("all zones loaded")
newttl = 300
@ -1729,7 +1730,7 @@ def test_kasp_manual_mode(ns3):
# Key rollover should have been be blocked.
tag = expected[1].key.tag
blockmsg = f"keymgr-manual-mode: block ZSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(blockmsg)
assert blockmsg in ns3.log
# Remove files.
for key in ksks + zsks:
@ -1743,7 +1744,7 @@ def test_kasp_manual_mode(ns3):
# Force step.
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"dnssec -step {zone}", log=False)
ns3.rndc(f"dnssec -step {zone}")
watcher.wait_for_line(
f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
)
@ -1756,7 +1757,7 @@ def test_kasp_manual_mode(ns3):
# Load keys.
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"loadkeys {zone}", log=False)
ns3.rndc(f"loadkeys {zone}")
watcher.wait_for_line(blockmsg)
# Check keys again, make sure no new keys are created.
@ -1767,7 +1768,7 @@ def test_kasp_manual_mode(ns3):
# Force step.
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"dnssec -step {zone}", log=False)
ns3.rndc(f"dnssec -step {zone}")
watcher.wait_for_line(
f"zone {zone}/IN (signed): zone_rekey done: key {tag}/ECDSAP256SHA256"
)

View file

@ -20,7 +20,7 @@ pytestmark = pytest.mark.extra_artifacts(
def test_dig_tcp_keepalive_handling(named_port, ns2):
def get_keepalive_options_received():
ns2.rndc("stats", log=False)
ns2.rndc("stats")
options_received = 0
with open("ns2/named.stats", "r", encoding="utf-8") as ns2_stats_file:
for line in ns2_stats_file:
@ -28,38 +28,41 @@ def test_dig_tcp_keepalive_handling(named_port, ns2):
options_received = line.split()[0]
return int(options_received)
dig = isctest.run.Dig(f"-p {str(named_port)}")
dig = isctest.run.EnvCmd("DIG", f"-p {str(named_port)}")
isctest.log.info("check that dig handles TCP keepalive in query")
assert "; TCP-KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2")
assert "; TCP-KEEPALIVE" in dig("+qr +keepalive foo.example. @10.53.0.2").out
isctest.log.info("check that dig added TCP keepalive was received")
assert get_keepalive_options_received() == 1
isctest.log.info("check that TCP keepalive is added for TCP responses")
assert "; TCP-KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2")
assert "; TCP-KEEPALIVE" in dig("+tcp +keepalive foo.example. @10.53.0.2").out
isctest.log.info("check that TCP keepalive requires TCP")
assert "; TCP-KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2")
assert "; TCP-KEEPALIVE" not in dig("+keepalive foo.example. @10.53.0.2").out
isctest.log.info("check the default keepalive value")
assert "; TCP-KEEPALIVE: 30.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.3"
assert (
"; TCP-KEEPALIVE: 30.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.3").out
)
isctest.log.info("check a keepalive configured value")
assert "; TCP-KEEPALIVE: 15.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.2"
assert (
"; TCP-KEEPALIVE: 15.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.2").out
)
isctest.log.info("check a re-configured keepalive value")
response = ns2.rndc("tcp-timeouts 300 300 300 200", log=False)
assert "tcp-initial-timeout=300" in response
assert "tcp-idle-timeout=300" in response
assert "tcp-keepalive-timeout=300" in response
assert "tcp-advertised-timeout=200" in response
assert "; TCP-KEEPALIVE: 20.0 secs" in dig(
"+tcp +keepalive foo.example. @10.53.0.2"
response = ns2.rndc("tcp-timeouts 300 300 300 200")
assert "tcp-initial-timeout=300" in response.out
assert "tcp-idle-timeout=300" in response.out
assert "tcp-keepalive-timeout=300" in response.out
assert "tcp-advertised-timeout=200" in response.out
assert (
"; TCP-KEEPALIVE: 20.0 secs"
in dig("+tcp +keepalive foo.example. @10.53.0.2").out
)
isctest.log.info("check server config entry")

View file

@ -11,7 +11,7 @@
import hashlib
import os
import re
from re import compile as Re
import shutil
import pytest
@ -74,18 +74,16 @@ def token_init_and_cleanup():
)
try:
output = isctest.run.cmd(
token_init_command, env=EMPTY_OPENSSL_CONF_ENV
).stdout.decode("utf-8")
assert "The token has been initialized and is reassigned to slot" in output
cmd = isctest.run.cmd(token_init_command, env=EMPTY_OPENSSL_CONF_ENV)
assert "The token has been initialized and is reassigned to slot" in cmd.out
yield
finally:
output = isctest.run.cmd(
cmd = isctest.run.cmd(
token_cleanup_command,
env=EMPTY_OPENSSL_CONF_ENV,
raise_on_exception=False,
).stdout.decode("utf-8")
assert re.search("Found token (.*) with matching token label", output)
)
assert Re("Found token (.*) with matching token label") in cmd.out
# pylint: disable-msg=too-many-locals
@ -125,11 +123,9 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
HSMPIN,
]
output = isctest.run.cmd(
pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV
).stdout.decode("utf-8")
cmd = isctest.run.cmd(pkcs11_command, env=EMPTY_OPENSSL_CONF_ENV)
assert "Key pair generated" in output
assert "Key pair generated" in cmd.out
def keyfromlabel(alg_name, zone, key_id, key_flag):
key_flag = key_flag.split() if key_flag else []
@ -146,12 +142,12 @@ def test_keyfromlabel(alg_name, alg_type, alg_bits):
zone,
]
output = isctest.run.cmd(keyfrlab_command)
output_decoded = output.stdout.decode("utf-8").rstrip() + ".key"
cmd = isctest.run.cmd(keyfrlab_command)
keyfile = cmd.out.rstrip() + ".key"
assert os.path.exists(output_decoded)
assert os.path.exists(keyfile)
return output_decoded
return keyfile
if f"{alg_name.upper()}_SUPPORTED" not in os.environ:
pytest.skip(f"{alg_name} is not supported")

View file

@ -85,7 +85,7 @@ def between(value, start, end):
return start < value < end
def ksr(zone, policy, action, options="", raise_on_exception=True):
def ksr(zone, policy, action, options="", raise_on_exception=True, to_file=""):
ksr_command = [
os.environ.get("KSR"),
"-l",
@ -97,8 +97,14 @@ def ksr(zone, policy, action, options="", raise_on_exception=True):
zone,
]
out = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception)
return out.stdout.decode("utf-8"), out.stderr.decode("utf-8")
if to_file:
with open(to_file, "wb") as f:
cmd = isctest.run.cmd(
ksr_command, raise_on_exception=raise_on_exception, stdout=f
)
else:
cmd = isctest.run.cmd(ksr_command, raise_on_exception=raise_on_exception)
return cmd
def check_keys(
@ -259,8 +265,9 @@ def check_rrsig_bundle(bundle_keys, bundle_lines, zone, rrtype, sigend, sigstart
assert count == len(bundle_lines)
def check_keysigningrequest(out, zsks, start, end):
lines = out.split("\n")
def check_keysigningrequest(path, zsks, start, end):
with open(path, "r", encoding="utf-8") as f:
lines = f.readlines()
line_no = 0
inception = start
@ -302,14 +309,14 @@ def check_keysigningrequest(out, zsks, start, end):
# trailing empty lines
while line_no < len(lines):
assert lines[line_no] == ""
assert lines[line_no].rstrip() == ""
line_no += 1
assert line_no == len(lines)
def check_signedkeyresponse(
out,
path,
zone,
ksks,
zsks,
@ -319,7 +326,8 @@ def check_signedkeyresponse(
cdnskey=True,
cds="SHA-256",
):
lines = out.split("\n")
with open(path, "r", encoding="utf-8") as f:
lines = f.readlines()
line_no = 0
next_bundle = end + 1
@ -341,7 +349,7 @@ def check_signedkeyresponse(
# ignore empty lines
while line_no < len(lines):
if lines[line_no] == "":
if lines[line_no].rstrip() == "":
line_no += 1
else:
break
@ -537,32 +545,32 @@ def check_signedkeyresponse(
def test_ksr_errors():
# check that 'dnssec-ksr' errors on unknown action
_, err = ksr("common.test", "common", "foobar", raise_on_exception=False)
assert "dnssec-ksr: fatal: unknown command 'foobar'" in err
cmd = ksr("common.test", "common", "foobar", raise_on_exception=False)
assert "dnssec-ksr: fatal: unknown command 'foobar'" in cmd.err
# check that 'dnssec-ksr keygen' errors on missing end date
_, err = ksr("common.test", "common", "keygen", raise_on_exception=False)
assert "dnssec-ksr: fatal: keygen requires an end date" in err
cmd = ksr("common.test", "common", "keygen", raise_on_exception=False)
assert "dnssec-ksr: fatal: keygen requires an end date" in cmd.err
# check that 'dnssec-ksr keygen' errors on zone with csk
_, err = ksr(
cmd = ksr(
"csk.test", "csk", "keygen", options="-K ns1 -e +2y", raise_on_exception=False
)
assert "dnssec-ksr: fatal: no keys created for policy 'csk'" in err
assert "dnssec-ksr: fatal: no keys created for policy 'csk'" in cmd.err
# check that 'dnssec-ksr request' errors on missing end date
_, err = ksr("common.test", "common", "request", raise_on_exception=False)
assert "dnssec-ksr: fatal: request requires an end date" in err
cmd = ksr("common.test", "common", "request", raise_on_exception=False)
assert "dnssec-ksr: fatal: request requires an end date" in cmd.err
# check that 'dnssec-ksr sign' errors on missing ksr file
_, err = ksr(
cmd = ksr(
"common.test",
"common",
"sign",
options="-K ns1/offline -i now -e +1y",
raise_on_exception=False,
)
assert "dnssec-ksr: fatal: 'sign' requires a KSR file" in err
assert "dnssec-ksr: fatal: 'sign' requires a KSR file" in cmd.err
def test_ksr_common(ns1):
@ -573,15 +581,15 @@ def test_ksr_common(ns1):
# create ksk
kskdir = "ns1/offline"
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
assert len(ksks) == 1
check_keys(ksks, None)
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
out, _ = ksr(zone, policy, "keygen", options="-i now -e +1y")
zsks = isctest.kasp.keystr_to_keylist(out)
cmd = ksr(zone, policy, "keygen", options="-i now -e +1y")
zsks = isctest.kasp.keystr_to_keylist(cmd.out)
assert len(zsks) == 2
lifetime = timedelta(days=31 * 6)
@ -590,8 +598,8 @@ def test_ksr_common(ns1):
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
# in the given key directory
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
assert len(zsks) == 2
lifetime = timedelta(days=31 * 6)
@ -608,33 +616,35 @@ def test_ksr_common(ns1):
# check that 'dnssec-ksr request' creates correct ksr
now = zsks[0].get_timing("Created")
until = now + timedelta(days=365)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until)
ksr_fname = f"{zone}.ksr.{n}"
ksr(
zone,
policy,
"request",
options=f"-K {zskdir} -i {now} -e +1y",
to_file=ksr_fname,
)
check_keysigningrequest(ksr_fname, zsks, now, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
)
fname = f"{zone}.skr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
skr_fname = f"{zone}.skr.{n}"
ksr(
zone,
policy,
"sign",
options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +1y",
to_file=skr_fname,
)
check_signedkeyresponse(skr_fname, zone, ksks, zsks, now, until, refresh)
# common test cases (2)
n = 2
# check that 'dnssec-ksr keygen' selects pregenerated keys for
# the same time bundle
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y")
selected_zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y")
selected_zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
assert len(selected_zsks) == 2
for index, key in enumerate(selected_zsks):
assert zsks[index] == key
@ -648,13 +658,13 @@ def test_ksr_common(ns1):
# check that 'dnssec-ksr keygen' generates only necessary keys for
# overlapping time bundle
out, err = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1")
overlapping_zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1")
overlapping_zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
assert len(overlapping_zsks) == 4
selected = len(re.findall("Selecting key pair", err))
generated = len(re.findall("Generating key pair", err)) - len(
re.findall("collide", err)
selected = len(re.findall("Selecting key pair", cmd.err))
generated = len(re.findall("Generating key pair", cmd.err)) - len(
re.findall("collide", cmd.err)
)
assert selected == 2
@ -673,8 +683,8 @@ def test_ksr_common(ns1):
)
# run 'dnssec-ksr keygen' again with verbosity 0
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y")
overlapping_zsks2 = isctest.kasp.keystr_to_keylist(out, zskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y")
overlapping_zsks2 = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
assert len(overlapping_zsks2) == 4
check_keys(overlapping_zsks2, lifetime)
for index, key in enumerate(overlapping_zsks2):
@ -682,47 +692,41 @@ def test_ksr_common(ns1):
# check that 'dnssec-ksr request' creates correct ksr if the
# interval is shorter
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +1y")
fname = f"{zone}.ksr.{n}.shorter"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until)
ksr_fname = f"{zone}.ksr.{n}.shorter"
ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +1y", to_file=ksr_fname)
check_keysigningrequest(ksr_fname, zsks, now, until)
# check that 'dnssec-ksr request' creates correct ksr with new interval
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +2y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
until = now + timedelta(days=365 * 2)
check_keysigningrequest(out, overlapping_zsks, now, until)
ksr_fname = f"{zone}.ksr.{n}"
ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +2y", to_file=ksr_fname)
check_keysigningrequest(ksr_fname, overlapping_zsks, now, until)
# check that 'dnssec-ksr request' errors if there are not enough keys
_, err = ksr(
cmd = ksr(
zone,
policy,
"request",
options=f"-K ns1 -i {now} -e +3y",
raise_on_exception=False,
)
error = f"no {zone}/ECDSAP256SHA256 zsk key pair found for bundle"
assert f"dnssec-ksr: fatal: {error}" in err
errmsg = (
f"dnssec-ksr: fatal: no {zone}/ECDSAP256SHA256 zsk key pair found for bundle"
)
assert errmsg in cmd.err
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +2y"
)
fname = f"{zone}.skr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
skr_fname = f"{zone}.skr.{n}"
ksr(
zone,
policy,
"sign",
options=f"-K ns1/offline -f {ksr_fname} -i {now} -e +2y",
to_file=skr_fname,
)
check_signedkeyresponse(
out,
skr_fname,
zone,
ksks,
overlapping_zsks,
@ -736,13 +740,12 @@ def test_ksr_common(ns1):
f"addzone {zone} "
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
+ "};"
)
# import skr
shutil.copyfile(fname, f"ns1/{fname}")
ns1.rndc(f"skr -import {fname} {zone}", log=False)
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output
@ -765,16 +768,16 @@ def test_ksr_lastbundle(ns1):
# create ksk
kskdir = "ns1/offline"
offset = -timedelta(days=365)
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1d -o")
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1d -o")
ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
assert len(ksks) == 1
check_keys(ksks, None, offset=offset)
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d")
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d")
zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
assert len(zsks) == 2
lifetime = timedelta(days=31 * 6)
@ -783,25 +786,27 @@ def test_ksr_lastbundle(ns1):
# check that 'dnssec-ksr request' creates correct ksr
then = zsks[0].get_timing("Created") + offset
until = then + timedelta(days=366)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1d")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, then, until)
ksr_fname = f"{zone}.ksr.{n}"
ksr(
zone,
policy,
"request",
options=f"-K {zskdir} -i {then} -e +1d",
to_file=ksr_fname,
)
check_keysigningrequest(ksr_fname, zsks, then, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1d"
)
fname = f"{zone}.skr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
skr_fname = f"{zone}.skr.{n}"
ksr(
zone,
policy,
"sign",
options=f"-K {kskdir} -f {ksr_fname} -i {then} -e +1d",
to_file=skr_fname,
)
check_signedkeyresponse(skr_fname, zone, ksks, zsks, then, until, refresh)
# add zone
ns1.rndc(
@ -809,12 +814,11 @@ def test_ksr_lastbundle(ns1):
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(fname, f"ns1/{fname}")
ns1.rndc(f"skr -import {fname} {zone}", log=False)
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output
@ -841,16 +845,16 @@ def test_ksr_inthemiddle(ns1):
# create ksk
kskdir = "ns1/offline"
offset = -timedelta(days=365)
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1y -o")
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1y -o")
ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
assert len(ksks) == 1
check_keys(ksks, None, offset=offset)
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y")
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y")
zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
assert len(zsks) == 4
lifetime = timedelta(days=31 * 6)
@ -860,25 +864,27 @@ def test_ksr_inthemiddle(ns1):
then = zsks[0].get_timing("Created")
then = then + offset
until = then + timedelta(days=365 * 2)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, then, until)
ksr_fname = f"{zone}.ksr.{n}"
ksr(
zone,
policy,
"request",
options=f"-K {zskdir} -i {then} -e +1y",
to_file=ksr_fname,
)
check_keysigningrequest(ksr_fname, zsks, then, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1y"
)
fname = f"{zone}.skr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
skr_fname = f"{zone}.skr.{n}"
ksr(
zone,
policy,
"sign",
options=f"-K {kskdir} -f {ksr_fname} -i {then} -e +1y",
to_file=skr_fname,
)
check_signedkeyresponse(skr_fname, zone, ksks, zsks, then, until, refresh)
# add zone
ns1.rndc(
@ -886,12 +892,11 @@ def test_ksr_inthemiddle(ns1):
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(fname, f"ns1/{fname}")
ns1.rndc(f"skr -import {fname} {zone}", log=False)
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output
@ -918,34 +923,38 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end):
now = KeyTimingMetadata.now()
then = now + offset
until = now + end
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i {then} -e {until} -o")
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i {then} -e {until} -o")
ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
assert len(ksks) == 1
# key generation
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}")
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}")
zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
assert len(zsks) == 2
# create request
now = zsks[0].get_timing("Created")
then = now + offset
until = now + end
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e {until}")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
# sign request
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e {until}"
ksr_fname = f"{zone}.ksr.{n}"
ksr(
zone,
policy,
"request",
options=f"-K {zskdir} -i {then} -e {until}",
to_file=ksr_fname,
)
fname = f"{zone}.skr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
# sign request
skr_fname = f"{zone}.skr.{n}"
ksr(
zone,
policy,
"sign",
options=f"-K {kskdir} -f {ksr_fname} -i {then} -e {until}",
to_file=skr_fname,
)
# add zone
server.rndc(
@ -953,12 +962,11 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end):
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(fname, f"ns1/{fname}")
server.rndc(f"skr -import {fname} {zone}", log=False)
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
server.rndc(f"skr -import {skr_fname} {zone}")
# test that rekey logs error
time_remaining = 10
@ -987,16 +995,16 @@ def test_ksr_unlimited(ns1):
# create ksk
kskdir = "ns1/offline"
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +2y -o")
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +2y -o")
ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
assert len(ksks) == 1
check_keys(ksks, None)
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y")
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y")
zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
assert len(zsks) == 1
lifetime = None
@ -1005,26 +1013,28 @@ def test_ksr_unlimited(ns1):
# check that 'dnssec-ksr request' creates correct ksr
now = zsks[0].get_timing("Created")
until = now + timedelta(days=365 * 4)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +4y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until)
ksr_fname = f"{zone}.ksr.{n}"
ksr(
zone,
policy,
"request",
options=f"-K {zskdir} -i {now} -e +4y",
to_file=ksr_fname,
)
check_keysigningrequest(ksr_fname, zsks, now, until)
# check that 'dnssec-ksr sign' creates correct skr without cdnskey
out, _ = ksr(
zone, "no-cdnskey", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
)
skrfile = f"{zone}.no-cdnskey.skr.{n}"
with open(skrfile, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
skr_fname = f"{zone}.no-cdnskey.skr.{n}"
ksr(
zone,
"no-cdnskey",
"sign",
options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +4y",
to_file=skr_fname,
)
check_signedkeyresponse(
out,
skr_fname,
zone,
ksks,
zsks,
@ -1036,17 +1046,17 @@ def test_ksr_unlimited(ns1):
)
# check that 'dnssec-ksr sign' creates correct skr without cds
out, _ = ksr(
zone, "no-cds", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
)
skrfile = f"{zone}.no-cds.skr.{n}"
with open(skrfile, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
skr_fname = f"{zone}.no-cds.skr.{n}"
ksr(
zone,
"no-cds",
"sign",
options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +4y",
to_file=skr_fname,
)
check_signedkeyresponse(
out,
skr_fname,
zone,
ksks,
zsks,
@ -1057,16 +1067,16 @@ def test_ksr_unlimited(ns1):
)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
)
skrfile = f"{zone}.{policy}.skr.{n}"
with open(skrfile, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
skr_fname = f"{zone}.{policy}.skr.{n}"
ksr(
zone,
policy,
"sign",
options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +4y",
to_file=skr_fname,
)
check_signedkeyresponse(skr_fname, zone, ksks, zsks, now, until, refresh)
# add zone
ns1.rndc(
@ -1074,12 +1084,11 @@ def test_ksr_unlimited(ns1):
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(skrfile, f"ns1/{skrfile}")
ns1.rndc(f"skr -import {skrfile} {zone}", log=False)
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output
@ -1101,8 +1110,8 @@ def test_ksr_twotone(ns1):
# create ksk
kskdir = "ns1/offline"
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
assert len(ksks) == 2
ksks_defalg = []
@ -1125,8 +1134,8 @@ def test_ksr_twotone(ns1):
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
# First algorithm keys have a lifetime of 3 months, so there should
# be 4 created keys. Second algorithm keys have a lifetime of 5
# months, so there should be 3 created keys. While only two time
@ -1157,25 +1166,27 @@ def test_ksr_twotone(ns1):
# check that 'dnssec-ksr request' creates correct ksr
now = zsks[0].get_timing("Created")
until = now + timedelta(days=365)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until)
ksr_fname = f"{zone}.ksr.{n}"
ksr(
zone,
policy,
"request",
options=f"-K {zskdir} -i {now} -e +1y",
to_file=ksr_fname,
)
check_keysigningrequest(ksr_fname, zsks, now, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
)
skrfile = f"{zone}.skr.{n}"
with open(skrfile, "w", encoding="utf-8") as file:
file.write(out)
refresh = -timedelta(days=5)
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
skr_fname = f"{zone}.skr.{n}"
ksr(
zone,
policy,
"sign",
options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +1y",
to_file=skr_fname,
)
check_signedkeyresponse(skr_fname, zone, ksks, zsks, now, until, refresh)
# add zone
ns1.rndc(
@ -1183,12 +1194,11 @@ def test_ksr_twotone(ns1):
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(skrfile, f"ns1/{skrfile}")
ns1.rndc(f"skr -import {skrfile} {zone}", log=False)
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output
@ -1216,8 +1226,8 @@ def test_ksr_kskroll(ns1):
# create ksk
kskdir = "ns1/offline"
out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
ksks = isctest.kasp.keystr_to_keylist(out, kskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o")
ksks = isctest.kasp.keystr_to_keylist(cmd.out, kskdir)
assert len(ksks) == 2
lifetime = timedelta(days=31 * 6)
@ -1225,8 +1235,8 @@ def test_ksr_kskroll(ns1):
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = isctest.kasp.keystr_to_keylist(out, zskdir)
cmd = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = isctest.kasp.keystr_to_keylist(cmd.out, zskdir)
assert len(zsks) == 1
check_keys(zsks, None)
@ -1234,25 +1244,27 @@ def test_ksr_kskroll(ns1):
# check that 'dnssec-ksr request' creates correct ksr
now = zsks[0].get_timing("Created")
until = now + timedelta(days=365)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until)
ksr_fname = f"{zone}.ksr.{n}"
ksr(
zone,
policy,
"request",
options=f"-K {zskdir} -i {now} -e +1y",
to_file=ksr_fname,
)
check_keysigningrequest(ksr_fname, zsks, now, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
)
skrfile = f"{zone}.skr.{n}"
with open(skrfile, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
skr_fname = f"{zone}.skr.{n}"
ksr(
zone,
policy,
"sign",
options=f"-K {kskdir} -f {ksr_fname} -i {now} -e +1y",
to_file=skr_fname,
)
check_signedkeyresponse(skr_fname, zone, ksks, zsks, now, until, refresh)
# add zone
ns1.rndc(
@ -1260,12 +1272,11 @@ def test_ksr_kskroll(ns1):
+ "{ type primary; file "
+ f'"{zone}.db"; dnssec-policy {policy}; '
+ "};",
log=False,
)
# import skr
shutil.copyfile(skrfile, f"ns1/{skrfile}")
ns1.rndc(f"skr -import {skrfile} {zone}", log=False)
shutil.copyfile(skr_fname, f"ns1/{skr_fname}")
ns1.rndc(f"skr -import {skr_fname} {zone}")
# test zone is correctly signed
# - check rndc dnssec -status output

View file

@ -96,7 +96,7 @@ def test_masterfile_missing_master_file_servfail():
def test_masterfile_owner_inheritance():
"""Test owner inheritance after $INCLUDE"""
checker_output = isctest.run.cmd(
cmd = isctest.run.cmd(
[
os.environ["CHECKZONE"],
"-D",
@ -104,12 +104,12 @@ def test_masterfile_owner_inheritance():
"example",
"zone/inheritownerafterinclude.db",
]
).stdout.decode("utf-8")
)
owner_inheritance_zone = """
example. 0 IN SOA . . 0 0 0 0 0
example. 0 IN TXT "this should be at the zone apex"
example. 0 IN NS .
"""
checker_zone = dns.zone.from_text(checker_output, origin="example.")
checker_zone = dns.zone.from_text(cmd.out, origin="example.")
expected = dns.zone.from_text(owner_inheritance_zone, origin="example.")
isctest.check.zones_equal(checker_zone, expected, compare_ttl=True)

View file

@ -11,7 +11,7 @@
from datetime import timedelta
import os
import re
from re import compile as Re
import pytest
@ -73,8 +73,8 @@ def dsfromkey(key):
"-w",
str(key.keyfile),
]
out = isctest.run.cmd(dsfromkey_command)
return out.stdout.decode("utf-8").split()
cmd = isctest.run.cmd(dsfromkey_command)
return cmd.out.split()
def check_dnssec(server, zone, keys, expected):
@ -102,13 +102,10 @@ def check_no_dnssec_in_journal(server, zone):
f"{server.identifier}/{zone}.db.jnl",
]
out = isctest.run.cmd(journalprint)
contents = out.stdout.decode("utf-8")
pattern = re.compile(
r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)", flags=re.MULTILINE
)
match = pattern.search(contents)
assert not match, f"{match.group(1)} record found in journal"
cmd = isctest.run.cmd(journalprint)
assert (
Re(r"^\s*(?:\S+\s+){4}(NSEC|NSEC3|NSEC3PARAM|RRSIG)") not in cmd.out
), "dnssec record found in journal"
def wait_for_serial(primary, server, zone):
@ -181,17 +178,17 @@ def check_add_zsk(server, zone, keys, expected, extra_keys, extra, primary=None)
isctest.log.info(
f"- zone {zone} {server.identifier}: make sure we did not try to sign with the keys added with nsupdate"
)
server.log.prohibit(f"dns_zone_findkeys: error reading ./K{zone}")
assert f"dns_zone_findkeys: error reading ./K{zone}" not in server.log
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run")
check_dnssec(server, zone, keys + extra_keys, expected + extra)
server.log.prohibit(f"dns_zone_findkeys: error reading ./K{zone}")
assert f"dns_zone_findkeys: error reading ./K{zone}" not in server.log
def _check_remove_zsk_fail(
@ -223,7 +220,7 @@ def _check_remove_zsk_fail(
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
@ -266,7 +263,7 @@ def check_remove_zsk(
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
@ -302,7 +299,7 @@ def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra, primary=N
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
@ -339,7 +336,7 @@ def _check_remove_cdnskey_fail(
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
@ -382,7 +379,7 @@ def check_remove_cdnskey(
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
@ -418,7 +415,7 @@ def check_add_cds(server, zone, keys, expected, extra_keys, extra, primary=None)
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
@ -455,7 +452,7 @@ def _check_remove_cds_fail(
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.
@ -498,7 +495,7 @@ def check_remove_cds(
# Trigger keymgr.
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
server.rndc(f"loadkeys {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# Check again.

View file

@ -114,4 +114,4 @@ def test_nsec3_case(ns3):
# Using rndc signing -nsec3param (should fail)
isctest.log.info(f"use rndc signing -nsec3param {zone} to change NSEC3 settings")
response = ns3.rndc(f"signing -nsec3param 1 1 12 ffff {zone}")
assert "zone uses dnssec-policy, use rndc dnssec command instead" in response
assert "zone uses dnssec-policy, use rndc dnssec command instead" in response.out

View file

@ -34,7 +34,7 @@ def test_nzd2nzf(ns1):
isctest.check.refused(res)
# add new zone into the default NZD using "rndc addzone"
ns1.rndc(f"addzone {zone_data}", log=False)
ns1.rndc(f"addzone {zone_data}")
# query for existing zone data
res = isctest.query.tcp(msg, ns1.ip)
@ -44,13 +44,11 @@ def test_nzd2nzf(ns1):
# dump "_default.nzd" to "_default.nzf" and check that it contains the expected content
cfg_dir = "ns1"
stdout = isctest.run.cmd(
[os.environ["NZD2NZF"], "_default.nzd"], cwd=cfg_dir
).stdout.decode("utf-8")
assert f"zone {zone_data}" in stdout
cmd = isctest.run.cmd([os.environ["NZD2NZF"], "_default.nzd"], cwd=cfg_dir)
assert f"zone {zone_data}" in cmd.out
nzf_filename = os.path.join(cfg_dir, "_default.nzf")
with open(nzf_filename, "w", encoding="utf-8") as nzf_file:
nzf_file.write(stdout)
nzf_file.write(cmd.out)
# delete "_default.nzd" database
nzd_filename = os.path.join(cfg_dir, "_default.nzd")

View file

@ -0,0 +1,46 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
# pylint: disable=unknown-option-value,re-compile-alias
import re
from astroid import nodes
from pylint.checkers import BaseRawFileChecker
from pylint.lint import PyLinter
class ReCompileChecker(BaseRawFileChecker):
name = "custom_raw"
msgs = {
"R9901": (
"Replace re.compile() with Re() using `from re import compile as Re`",
"re-compile-alias",
(
"Use a Re() alias instead of re.compile() by importing the "
"re.compile() function as Re()"
),
),
}
options = ()
def process_module(self, node: nodes.Module) -> None:
pattern = re.compile(r"re\.compile\(")
with node.stream() as stream:
for lineno, line in enumerate(stream):
if pattern.search(line.decode("utf-8")):
self.add_message("re-compile-alias", line=lineno)
def register(linter: PyLinter) -> None:
linter.register_checker(ReCompileChecker(linter))

View file

@ -22,18 +22,20 @@ def test_resolver_cache_reloadfails(ns1, templates):
isctest.check.noerror(res)
assert res.answer[0].ttl == 300
templates.render("ns1/named.conf", {"wrongoption": True})
try:
# The first reload fails, and the old cache list will be preserved
ns1.rndc("reload")
except isctest.rndc.RNDCException:
templates.render("ns1/named.conf", {"wrongoption": False})
# The second reload succeed, and the cache is still there, as preserved
# from the old cache list
ns1.rndc("reload")
time.sleep(3)
msg = isctest.query.create("www.example.org.", "A")
res = isctest.query.udp(msg, "10.53.0.1")
isctest.check.noerror(res)
# The ttl being lower than 300 (provided by fake authoritative) proves
# the cache is still in use
assert res.answer[0].ttl < 300
# The first reload fails, and the old cache list will be preserved
cmd = ns1.rndc("reload", raise_on_exception=False)
assert cmd.rc != 0
templates.render("ns1/named.conf", {"wrongoption": False})
# The second reload succeed, and the cache is still there, as preserved
# from the old cache list
ns1.rndc("reload")
time.sleep(3)
msg = isctest.query.create("www.example.org.", "A")
res = isctest.query.udp(msg, "10.53.0.1")
isctest.check.noerror(res)
# The ttl being lower than 300 (provided by fake authoritative) proves
# the cache is still in use
assert res.answer[0].ttl < 300

View file

@ -82,8 +82,8 @@ def test_algoroll_csk_reconfig_step1(tld, ns6, alg, size):
tag = keys[0].key.tag
msg1 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{tag} (CSK)"
msg2 = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})"
ns6.log.expect(msg1)
ns6.log.expect(msg2)
assert msg1 in ns6.log
assert msg2 in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -175,7 +175,7 @@ def test_algoroll_csk_reconfig_step3(tld, ns6, alg, size):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns6.log.expect(msg)
assert msg in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -244,7 +244,7 @@ def test_algoroll_csk_reconfig_step4(tld, ns6, alg, size):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/RSASHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
ns6.log.expect(msg)
assert msg in ns6.log
# Force step.
tag = keys[1].key.tag

View file

@ -85,9 +85,9 @@ def test_algoroll_ksk_zsk_reconfig_step1(tld, ns6, alg, size):
msg1 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{ktag} (KSK)"
msg2 = f"keymgr-manual-mode: block retire DNSKEY {zone}/RSASHA256/{ztag} (ZSK)"
msg3 = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})" # twice
ns6.log.expect(msg1)
ns6.log.expect(msg2)
ns6.log.expect(msg3)
assert msg1 in ns6.log
assert msg2 in ns6.log
assert len(ns6.log.grep(msg3)) >= 2
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -184,7 +184,7 @@ def test_algoroll_ksk_zsk_reconfig_step3(tld, ns6, alg, size):
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns6.log.expect(msg)
assert msg in ns6.log
# Force step.
with ns6.watch_log_from_here() as watcher:
@ -259,8 +259,8 @@ def test_algoroll_ksk_zsk_reconfig_step4(tld, ns6, alg, size):
ztag = keys[1].key.tag
msg1 = f"keymgr-manual-mode: block transition KSK {zone}/RSASHA256/{ktag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition ZSK {zone}/RSASHA256/{ztag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
ns6.log.expect(msg1)
ns6.log.expect(msg2)
assert msg1 in ns6.log
assert msg2 in ns6.log
# Force step.
ktag = keys[3].key.tag

View file

@ -125,7 +125,7 @@ def test_csk_roll1_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block CSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -185,7 +185,7 @@ def test_csk_roll1_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -274,8 +274,7 @@ def test_csk_roll1_step4(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
tag = keys[1].key.tag

View file

@ -128,7 +128,7 @@ def test_csk_roll2_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block CSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -188,7 +188,7 @@ def test_csk_roll2_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -315,8 +315,8 @@ def test_csk_roll2_step5(tld, alg, size, ns3):
tag = keys[0].key.tag
msg1 = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg1)
ns3.log.expect(msg2)
assert msg1 in ns3.log
assert msg2 in ns3.log
# Force step.
tag = keys[1].key.tag

View file

@ -74,7 +74,7 @@ def test_rollover_enable_dnssec_step1(tld, alg, size, ns3):
# Check logs.
msg = f"keymgr-manual-mode: block new key generation for zone {zone} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -155,7 +155,7 @@ def test_rollover_enable_dnssec_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[0].key.tag
msg = f"keymgr-manual-mode: block transition CSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:

View file

@ -110,7 +110,7 @@ def test_ksk_doubleksk_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block KSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -171,7 +171,7 @@ def test_ksk_doubleksk_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DS state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -252,8 +252,8 @@ def test_ksk_doubleksk_step4(tld, alg, size, ns3):
tag = keys[1].key.tag
msg1 = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
msg2 = f"keymgr-manual-mode: block transition KSK {zone}/ECDSAP256SHA256/{tag} type KRRSIG state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg1)
ns3.log.expect(msg2)
assert msg1 in ns3.log
assert msg2 in ns3.log
# Force step.
tag = keys[2].key.tag

View file

@ -58,7 +58,7 @@ def test_rollover_multisigner(ns3, alg, size):
zone,
]
return isctest.run.cmd(keygen_command).stdout.decode("utf-8")
return isctest.run.cmd(keygen_command).out
zone = "multisigner-model2.kasp"

View file

@ -117,7 +117,7 @@ def test_zsk_prepub_step2(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block ZSK rollover for key {zone}/ECDSAP256SHA256/{tag} (policy {policy})"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -176,7 +176,7 @@ def test_zsk_prepub_step3(tld, alg, size, ns3):
# Check logs.
tag = keys[2].key.tag
msg = f"keymgr-manual-mode: block transition ZSK {zone}/ECDSAP256SHA256/{tag} type ZRRSIG state HIDDEN to state RUMOURED"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
with ns3.watch_log_from_here() as watcher:
@ -224,7 +224,7 @@ def test_zsk_prepub_step3(tld, alg, size, ns3):
# Force full resign and check all signatures have been replaced.
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"sign {zone}", log=False)
ns3.rndc(f"sign {zone}")
watcher.wait_for_line(f"zone_needdump: zone {zone}/IN (signed): enter")
step["smooth"] = False
@ -263,7 +263,7 @@ def test_zsk_prepub_step4(tld, alg, size, ns3):
# Check logs.
tag = keys[1].key.tag
msg = f"keymgr-manual-mode: block transition ZSK {zone}/ECDSAP256SHA256/{tag} type DNSKEY state OMNIPRESENT to state UNRETENTIVE"
ns3.log.expect(msg)
assert msg in ns3.log
# Force step.
tag = keys[2].key.tag

View file

@ -154,7 +154,7 @@ def test_rollover_manual(ns3):
# Try to schedule a ZSK rollover for an inactive key (should fail).
zsk = expected[3].key
response = ns3.rndc(f"dnssec -rollover -key {zsk.tag} {zone}")
assert "key is not actively signing" in response
assert "key is not actively signing" in response.out
def test_rollover_manual_zrrsig_rumoured(ns3):

View file

@ -121,22 +121,18 @@ pytestmark = pytest.mark.extra_artifacts(
],
)
def test_rrchecker_list_standard_names(option, expected_result):
stdout = isctest.run.cmd([os.environ["RRCHECKER"], option]).stdout.decode("utf-8")
values = [line for line in stdout.split("\n") if line.strip()]
cmd = isctest.run.cmd([os.environ["RRCHECKER"], option])
values = [line for line in cmd.out.split("\n") if line.strip()]
assert sorted(values) == sorted(expected_result)
def run_rrchecker(option, rr_class, rr_type, rr_rest):
rrchecker_output = (
isctest.run.cmd(
[os.environ["RRCHECKER"], option],
input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
)
.stdout.decode("utf-8")
.strip()
cmd = isctest.run.cmd(
[os.environ["RRCHECKER"], option],
input_text=f"{rr_class} {rr_type} {rr_rest}".encode("utf-8"),
)
return rrchecker_output.split()
return cmd.out.strip().split()
@pytest.mark.parametrize(
@ -162,7 +158,7 @@ def test_rrchecker_conversions(option):
".",
tempzone_file,
],
).stdout.decode("utf-8")
).out
checkzone_output = [
line for line in checkzone_output.splitlines() if not line.startswith(";")
]

View file

@ -71,11 +71,8 @@ def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries
# helper function, 'command' is the rndc command to run
def launch_rndc(command):
try:
instance.rndc(command, log=False)
return 0
except isctest.rndc.RNDCException:
return -1
ret = instance.rndc(command, raise_on_exception=False)
return 0 if ret.rc == 0 else -1
# We're going to execute queries in parallel by means of a thread pool.
# dnspython functions block, so we need to circumvent that.

View file

@ -21,7 +21,7 @@ def test_spf_log(ns1):
"zone warn/IN: loaded serial 0",
"zone nowarn/IN: loaded serial 0",
):
ns1.log.expect(msg)
assert msg in ns1.log
for msg in (
"zone nowarn/IN: 'y.nowarn' found type SPF record but no SPF TXT record found",
@ -29,4 +29,4 @@ def test_spf_log(ns1):
"zone warn/IN: 'warn' found type SPF record but no SPF TXT record found",
"zone nowarn/IN: 'nowarn' found type SPF record but no SPF TXT record found",
):
ns1.log.prohibit(msg)
assert msg not in ns1.log

View file

@ -10,7 +10,6 @@
# information regarding copyright ownership.
import concurrent.futures
import os
import time
import dns.update
@ -27,22 +26,8 @@ pytestmark = pytest.mark.extra_artifacts(
def rndc_loop(test_state, server):
rndc = os.getenv("RNDC")
port = os.getenv("CONTROLPORT")
cmdline = [
rndc,
"-c",
"../_common/rndc.conf",
"-p",
port,
"-s",
server,
"reload",
]
while not test_state["finished"]:
isctest.run.cmd(cmdline, raise_on_exception=False)
server.rndc("reload", raise_on_exception=False)
time.sleep(1)

View file

@ -52,16 +52,12 @@ def test_nsec3_hashes(domain, nsec3hash):
algorithm = "1"
iterations = "12"
output = isctest.run.cmd(
[NSEC3HASH, salt, algorithm, iterations, domain]
).stdout.decode("utf-8")
assert nsec3hash in output
cmd = isctest.run.cmd([NSEC3HASH, salt, algorithm, iterations, domain])
assert nsec3hash in cmd.out
flags = "0"
output = isctest.run.cmd(
[NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain]
).stdout.decode("utf-8")
assert nsec3hash in output
cmd = isctest.run.cmd([NSEC3HASH, "-r", algorithm, flags, iterations, salt, domain])
assert nsec3hash in cmd.out
@pytest.mark.parametrize(
@ -78,11 +74,11 @@ def test_nsec3_empty_salt(salt_emptiness_args):
iterations = "0"
domain = "com"
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[NSEC3HASH] + salt_emptiness_args + [algorithm, iterations, domain]
).stdout.decode("utf-8")
assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
assert "salt=-" in output
)
assert "CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
assert "salt=-" in cmd.out
@pytest.mark.parametrize(
@ -98,7 +94,7 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg):
iterations = "0"
domain = "com"
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[
NSEC3HASH,
"-r",
@ -108,8 +104,8 @@ def test_nsec3_empty_salt_r(salt_emptiness_arg):
salt_emptiness_arg,
domain,
]
).stdout.decode("utf-8")
assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in output
)
assert " - CK0POJMG874LJREF7EFN8430QVIT8BSM" in cmd.out
@pytest.mark.parametrize(
@ -145,10 +141,8 @@ def test_nsec3hash_acceptable_values(domain, it, salt_bytes) -> None:
)
# calculate the hash using nsec3hash:
output = isctest.run.cmd(
[NSEC3HASH, salt_text, "1", str(it), str(domain)]
).stdout.decode("ascii")
hash2 = output.partition(" ")[0]
cmd = isctest.run.cmd([NSEC3HASH, salt_text, "1", str(it), str(domain)])
hash2 = cmd.out.partition(" ")[0]
assert hash1 == hash2

View file

@ -11,6 +11,7 @@
import os
import re
from re import compile as Re
import pytest
@ -61,14 +62,14 @@ def test_verify_good_zone_nsec_next_name_case_mismatch():
)
def get_bad_zone_output(zone):
only_opt = ["-z"] if re.match(r"[zk]sk-only", zone) else []
output = isctest.run.cmd(
def verify_bad_zone(zone):
only_opt = ["-z"] if re.search(r"^[zk]sk-only", zone) else []
cmd = isctest.run.cmd(
[VERIFY, *only_opt, "-o", zone, f"zones/{zone}.bad"],
raise_on_exception=False,
)
stream = (output.stdout + output.stderr).decode("utf-8").replace("\n", "")
return stream
assert cmd.rc != 0
return cmd
@pytest.mark.parametrize(
@ -80,7 +81,8 @@ def get_bad_zone_output(zone):
],
)
def test_verify_bad_zone_files_dnskeyonly(zone):
assert re.match(r".*DNSKEY is not signed.*", get_bad_zone_output(zone))
cmd = verify_bad_zone(zone)
assert "DNSKEY is not signed" in cmd.err
@pytest.mark.parametrize(
@ -97,10 +99,8 @@ def test_verify_bad_zone_files_dnskeyonly(zone):
],
)
def test_verify_bad_zone_files_expired(zone):
assert re.match(
r".*signature has expired.*|.*No self-signed .*DNSKEY found.*",
get_bad_zone_output(zone),
)
cmd = verify_bad_zone(zone)
assert Re("signature has expired|No self-signed DNSKEY found") in cmd.err
@pytest.mark.parametrize(
@ -112,40 +112,33 @@ def test_verify_bad_zone_files_expired(zone):
],
)
def test_verify_bad_zone_files_unexpected_nsec_rrset(zone):
assert re.match(r".*unexpected NSEC RRset at.*", get_bad_zone_output(zone))
cmd = verify_bad_zone(zone)
assert "unexpected NSEC RRset at" in cmd.err
def test_verify_bad_zone_files_bad_nsec_record():
assert re.match(
r".*Bad NSEC record for.*, next name mismatch.*",
get_bad_zone_output("ksk+zsk.nsec.broken-chain"),
)
cmd = verify_bad_zone("ksk+zsk.nsec.broken-chain")
assert Re("Bad NSEC record for.*, next name mismatch") in cmd.err
def test_verify_bad_zone_files_bad_bitmap():
assert re.match(
r".*bit map mismatch.*", get_bad_zone_output("ksk+zsk.nsec.bad-bitmap")
)
cmd = verify_bad_zone("ksk+zsk.nsec.bad-bitmap")
assert "bit map mismatch" in cmd.err
def test_verify_bad_zone_files_missing_nsec3_record():
assert re.match(
r".*Missing NSEC3 record for.*",
get_bad_zone_output("ksk+zsk.nsec3.missing-empty"),
)
cmd = verify_bad_zone("ksk+zsk.nsec3.missing-empty")
assert "Missing NSEC3 record for" in cmd.err
def test_verify_bad_zone_files_no_dnssec_keys():
assert re.match(
r".*Zone contains no DNSSEC keys.*", get_bad_zone_output("unsigned")
)
cmd = verify_bad_zone("unsigned")
assert "Zone contains no DNSSEC keys" in cmd.err
def test_verify_bad_zone_files_unequal_nsec3_chains():
assert re.match(
r".*Expected and found NSEC3 chains not equal.*",
get_bad_zone_output("ksk+zsk.nsec3.extra-nsec3"),
)
cmd = verify_bad_zone("ksk+zsk.nsec3.extra-nsec3")
assert "Expected and found NSEC3 chains not equal" in cmd.err
# checking error message when -o is not used
@ -153,27 +146,25 @@ def test_verify_bad_zone_files_unequal_nsec3_chains():
def test_verify_soa_not_at_top_error():
# when -o is not used, origin is set to zone file name,
# which should cause an error in this case
output = isctest.run.cmd(
[VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False
).stderr.decode("utf-8")
assert "not at top of zone" in output
assert "use -o to specify a different zone origin" in output
cmd = isctest.run.cmd([VERIFY, "zones/ksk+zsk.nsec.good"], raise_on_exception=False)
assert "not at top of zone" in cmd.err
assert "use -o to specify a different zone origin" in cmd.err
# checking error message when an invalid -o is specified
# and a SOA record not at top of zone is found
def test_verify_invalid_o_option_soa_not_at_top_error():
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[VERIFY, "-o", "invalid.origin", "zones/ksk+zsk.nsec.good"],
raise_on_exception=False,
).stderr.decode("utf-8")
assert "not at top of zone" in output
assert "use -o to specify a different zone origin" not in output
)
assert "not at top of zone" in cmd.err
assert "use -o to specify a different zone origin" not in cmd.err
# checking dnssec-verify -J reads journal file
def test_verify_j_reads_journal_file():
output = isctest.run.cmd(
cmd = isctest.run.cmd(
[
VERIFY,
"-o",
@ -182,5 +173,5 @@ def test_verify_j_reads_journal_file():
"zones/updated.other.jnl",
"zones/updated.other",
]
).stdout.decode("utf-8")
assert "Loading zone 'updated' from file 'zones/updated.other'" in output
)
assert "Loading zone 'updated' from file 'zones/updated.other'" in cmd.out

View file

@ -9,7 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import re
from re import compile as Re
import isctest
@ -32,7 +32,7 @@ def wait_for_initial_xfrin(ns):
def wait_for_sending_notify(ns1, ns, key_name):
pattern = re.compile(
pattern = Re(
f"zone test/IN: sending notify to {ns.ip}#[0-9]+ : TSIG \\({key_name}\\)"
)
with ns1.watch_log_from_start() as watcher:

View file

@ -12,6 +12,7 @@
import glob
import os
import re
from re import compile as Re
import shutil
import signal
import time
@ -71,7 +72,7 @@ def test_xferquota(named_port, ns1, ns2):
isctest.check.rrsets_equal(ns1response.answer, ns2response.answer)
query_and_compare(axfr_msg)
pattern = re.compile(
pattern = Re(
f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
f"Transfer completed: .*\\(serial 2\\)"
)