Add isctest.transfer.transfer_message() helper and convert tests

Add a new helper function, isctest.transfer.transfer_message(), to
bin/tests/system/isctest/transfer.py that generates the log message
produced by xfrin_log() in lib/dns/xfrin.c for an incoming zone
transfer:

    transfer of '<zone>/IN' from <source_ns>#<port>: <msg>

The helper always returns a compiled re.Pattern.  source_ns and port
each accept None to match any source address / port.  msg accepts
either a plain str (regex-escaped automatically) or a compiled
re.Pattern (spliced into the regex as-is), so callers that need regex
syntax in the message part can pass Re(r"...") without having to
wrap the whole result.

source_ns is passed through re.escape() when provided, so dots in
IPv4 addresses (e.g. "10.53.0.1") match a literal dot rather than
any character.

Convert the existing call sites across the system tests to use the
new helper.

Co-Authored-By: Nicki Křížek <nicki@isc.org>
Assisted-by: Claude:claude-sonnet-4-6
Assisted-by: Claude:claude-opus-4-7
This commit is contained in:
Michal Nowak 2026-05-11 13:24:22 +02:00
parent f65c5b0ab6
commit 27ee27d4e3
No known key found for this signature in database
11 changed files with 205 additions and 50 deletions

View file

@ -9,8 +9,6 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from re import compile as Re
import dns.rcode
import pytest
@ -27,8 +25,8 @@ pytestmark = pytest.mark.extra_artifacts(
@pytest.fixture(scope="module")
def transfers_complete(servers):
for zone in ["example", "example-aes-128", "example-aes-256", "example-chacha-20"]:
pattern = Re(
f"transfer of '{zone}/IN' from 10.53.0.1#[0-9]+: Transfer completed"
pattern = isctest.transfer.transfer_message(
zone, "10.53.0.1", "Transfer completed"
)
for ns in ["ns2", "ns3", "ns4", "ns5"]:
with servers[ns].watch_log_from_start() as watcher:

View file

@ -27,9 +27,13 @@ def check_soa_servfail_ede24(edemsg):
isctest.check.ede(res, EDECode.INVALID_DATA, edemsg)
def check_ns2_ready(ns2):
def check_ns2_ready(ns2, named_port):
# Sanity check that everything works first, once we're sure the foo.fr zone
# has transfered to ns2.
with ns2.watch_log_from_start() as watcher:
watcher.wait_for_line("Transfer status: success")
watcher.wait_for_line(
isctest.transfer.transfer_message(
"foo.fr", "10.53.0.1", "Transfer status: success", named_port
)
)
check_soa_noerror()

View file

@ -13,9 +13,11 @@ import os
from ede24.common import check_ns2_ready, check_soa_noerror, check_soa_servfail_ede24
import isctest
def test_ede24_expired(ns1, ns2):
check_ns2_ready(ns2)
def test_ede24_expired(named_port, ns1, ns2):
check_ns2_ready(ns2, named_port)
# Stop the primary and wait for expiration of the zone in the secondary.
with ns2.watch_log_from_here() as watcher:
@ -32,5 +34,9 @@ def test_ede24_expired(ns1, ns2):
# Restart the primary and wait for the zone to be back up again.
with ns2.watch_log_from_here() as watcher:
ns1.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
watcher.wait_for_line("Transfer status: success")
watcher.wait_for_line(
isctest.transfer.transfer_message(
"foo.fr", "10.53.0.1", "Transfer status: success", named_port
)
)
check_soa_noerror()

View file

@ -14,8 +14,8 @@ import os
from ede24.common import check_ns2_ready, check_soa_servfail_ede24
def test_ede24_noloaded(ns1, ns2):
check_ns2_ready(ns2)
def test_ede24_noloaded(named_port, ns1, ns2):
check_ns2_ready(ns2, named_port)
# Stop all servers, and we'll restart only ns2.
ns1.stop()

View file

@ -18,6 +18,7 @@ from . import ( # pylint: disable=redefined-builtin
query,
run,
template,
transfer,
vars,
)
@ -35,5 +36,6 @@ __all__ = [
"query",
"run",
"template",
"transfer",
"vars",
]

View file

@ -0,0 +1,47 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from re import Pattern
import re
def transfer_message(
zone: str, source_ns: str | None, msg: str | Pattern, port: int | None = None
) -> Pattern:
"""Return the expected log message for an incoming zone transfer.
Mirrors the format produced by xfrin_log() in lib/dns/xfrin.c:
transfer of '<zone>/IN' from <source_ns>#<port>: <msg>
Always returns a compiled Pattern. When source_ns or port is None,
the unknown part is replaced by a wildcard in the regex.
Args:
zone: Zone name (without class, e.g. "example.com").
source_ns: Source nameserver IP address string (e.g. "10.53.0.1"),
or None to match any source address.
msg: Transfer status or other message as a plain string
(e.g. "Transfer status: success"), which is regex-escaped,
or a compiled Pattern whose .pattern is spliced in as-is
for callers that need regex syntax in the message part.
port: Source port number, or None to match any port.
"""
source_str = re.escape(source_ns) if source_ns is not None else ".*"
port_str = str(port) if port is not None else "[0-9]+"
msg_str = msg.pattern if isinstance(msg, Pattern) else re.escape(msg)
return re.compile(
re.escape(f"transfer of '{zone}/IN' from ")
+ f"{source_str}#{port_str}: "
+ msg_str
)

View file

@ -13,6 +13,8 @@
from isctest.instance import NamedInstance
from isctest.mark import live_internet_test
import isctest
@live_internet_test
def test_mirror_root_zone(servers: dict[str, NamedInstance]):
@ -23,4 +25,6 @@ def test_mirror_root_zone(servers: dict[str, NamedInstance]):
ns1 = servers["ns1"]
with ns1.watch_log_from_start() as watch_log:
# TimeoutError is raised if the line is not found and the test will fail.
watch_log.wait_for_line("Transfer status: success")
watch_log.wait_for_line(
isctest.transfer.transfer_message(".", None, "Transfer status: success", 53)
)

View file

@ -28,7 +28,9 @@ def test_wait_for_zone_retransfer(named_port, ns6):
with ns6.watch_log_from_here() as watcher:
ns6.rndc("retransfer axfr-rndc-retransfer-force.")
watcher.wait_for_line(
f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: received"
isctest.transfer.transfer_message(
"axfr-rndc-retransfer-force", "10.53.0.1", "received", named_port
)
)
@ -40,11 +42,21 @@ def test_cancel_ongoing_retransfer(named_port, ns6):
with ns6.watch_log_from_here() as watcher_transfer_shutting_down:
ns6.rndc("retransfer -force axfr-rndc-retransfer-force.")
watcher_transfer_shutting_down.wait_for_line(
f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: Transfer status: shutting down"
isctest.transfer.transfer_message(
"axfr-rndc-retransfer-force",
"10.53.0.1",
"Transfer status: shutting down",
named_port,
)
)
isctest.log.info("Wait for the new transfer to complete successfully")
watcher_transfer_success.wait_for_line(
f"'axfr-rndc-retransfer-force/IN' from 10.53.0.1#{named_port}: Transfer status: success"
isctest.transfer.transfer_message(
"axfr-rndc-retransfer-force",
"10.53.0.1",
"Transfer status: success",
named_port,
)
)

View file

@ -40,18 +40,26 @@ def send_switch_control_command(command):
@pytest.fixture(scope="module", autouse=True)
def after_servers_start(templates, ns4):
def after_servers_start(templates, named_port, ns4):
# initial correctly-signed transfer should succeed
send_switch_control_command("goodaxfr")
with ns4.watch_log_from_here() as watcher:
templates.render("ns4/named.conf", {"ns4_as_secondary_for_nil": True})
ns4.reconfigure()
watcher.wait_for_line("Transfer status: success")
watcher.wait_for_line(
isctest.transfer.transfer_message(
"nil", "10.53.0.5", "Transfer status: success", named_port
)
)
with ns4.watch_log_from_here() as watcher_retransfer_nil_success:
ns4.rndc("retransfer nil.")
watcher_retransfer_nil_success.wait_for_line("Transfer status: success")
watcher_retransfer_nil_success.wait_for_line(
isctest.transfer.transfer_message(
"nil", "10.53.0.5", "Transfer status: success", named_port
)
)
def get_response(msg, server_ip, allow_empty_answer=False):
@ -297,7 +305,7 @@ def test_make_ns4_secondary_for_nil():
check_rdata_in_txt_record("initial AXFR")
def test_handle_ixfr_notimp(ns4):
def test_handle_ixfr_notimp(named_port, ns4):
send_switch_control_command("ixfrnotimp")
with ns4.watch_log_from_here() as watcher_transfer_success:
with ns4.watch_log_from_here() as watcher_requesting_ixfr:
@ -305,76 +313,105 @@ def test_handle_ixfr_notimp(ns4):
watcher_requesting_ixfr.wait_for_line(
"zone nil/IN: requesting IXFR from 10.53.0.5"
)
watcher_transfer_success.wait_for_line("Transfer status: success")
watcher_transfer_success.wait_for_line(
isctest.transfer.transfer_message(
"nil", "10.53.0.5", "Transfer status: success", named_port
)
)
check_rdata_in_txt_record("IXFR NOTIMP")
@pytest.mark.parametrize(
"command_file,expected_rdata,named_log_line",
"command_file,expected_rdata,named_log_line,xfrin_msg",
[
param(
"unsigned",
"unsigned AXFR",
"Transfer status: expected a TSIG or SIG(0)",
True,
),
param(
"badkeydata",
"bad keydata AXFR",
"Transfer status: tsig verify failure",
True,
),
param(
"partial",
"partially signed AXFR",
"Transfer status: expected a TSIG or SIG(0)",
True,
),
param(
"unknownkey",
"unknown key AXFR",
"tsig key 'tsig_key': key name and algorithm do not match",
False,
),
param(
"wrongkey",
"incorrect key AXFR",
"tsig key 'tsig_key': key name and algorithm do not match",
False,
),
param(
"wrongname",
"wrong question AXFR",
"question name mismatch",
True,
),
param(
"badmessageid",
"bad message id",
"Transfer status: unexpected error",
True,
),
param(
"soamismatch",
"SOA mismatch AXFR",
"Transfer status: FORMERR",
True,
),
],
)
def test_under_signed_transfer(command_file, expected_rdata, named_log_line, ns4):
def test_under_signed_transfer(
command_file, expected_rdata, named_log_line, xfrin_msg, named_port, ns4
):
send_switch_control_command(command_file)
with ns4.watch_log_from_here() as watcher:
ns4.rndc("retransfer nil.")
watcher.wait_for_line(named_log_line)
if xfrin_msg:
watcher.wait_for_line(
isctest.transfer.transfer_message(
"nil", "10.53.0.5", named_log_line, named_port
)
)
else:
watcher.wait_for_line(named_log_line)
check_rdata_in_txt_record(expected_rdata, should_exist=False)
def test_handle_edns_notimp(ns4):
def test_handle_edns_notimp(named_port, ns4):
send_switch_control_command("ednsnotimp")
with ns4.watch_log_from_here() as watcher:
ns4.rndc("retransfer nil.")
watcher.wait_for_line("Transfer status: NOTIMP")
watcher.wait_for_line(
isctest.transfer.transfer_message(
"nil", "10.53.0.5", "Transfer status: NOTIMP", named_port
)
)
def test_handle_edns_formerr(ns4):
def test_handle_edns_formerr(named_port, ns4):
send_switch_control_command("ednsformerr")
with ns4.watch_log_from_here() as watcher:
ns4.rndc("retransfer nil.")
watcher.wait_for_line("Transfer status: success")
watcher.wait_for_line(
isctest.transfer.transfer_message(
"nil", "10.53.0.5", "Transfer status: success", named_port
)
)
check_rdata_in_txt_record("EDNS FORMERR")
@ -436,9 +473,16 @@ def test_mapped_zone(named_port, ns3):
# test that a zone with too many records is rejected (AXFR)
def test_axfr_too_many_records(ns6):
def test_axfr_too_many_records(named_port, ns6):
with ns6.watch_log_from_start() as watcher:
watcher.wait_for_line(Re("'axfr-too-big/IN'.*: too many records"))
watcher.wait_for_line(
isctest.transfer.transfer_message(
"axfr-too-big",
"10.53.0.1",
"Transfer status: too many records",
named_port,
)
)
# test that a zone with too many records is rejected (IXFR)
@ -451,7 +495,14 @@ def test_ixfr_too_many_records(named_port, ns6):
send
"""
nsupdate(nsupdate_config)
watcher.wait_for_line("Transfer status: too many records")
watcher.wait_for_line(
isctest.transfer.transfer_message(
"ixfr-too-big",
"10.53.0.1",
"Transfer status: too many records",
named_port,
)
)
# test that a zone with too many diffs (IXFR) is retried with AXFR
@ -466,7 +517,12 @@ def test_ixfr_too_many_diffs(named_port, ns6):
update add the-35th-record.ixfr-too-many-diffs 0 TXT ixfr
send
"""
log_sequence = ["too many diffs, retrying with AXFR", "Transfer status: success"]
log_sequence = [
"too many diffs, retrying with AXFR",
isctest.transfer.transfer_message(
"ixfr-too-many-diffs", "10.53.0.1", "Transfer status: success", named_port
),
]
with ns6.watch_log_from_here() as watcher_transfer_status:
nsupdate(nsupdate_config)
watcher_transfer_status.wait_for_sequence(log_sequence)
@ -477,10 +533,14 @@ def test_dig_and_named_axfr_stats(named_port, ns3):
# Use ns3 logs for checking incoming transfer statistics as ns3 is a
# secondary server (for ns1) for "xfer-stats".
with ns3.watch_log_from_start() as watcher_transfer_completed:
pattern_transfer_completed = (
"Transfer completed: 16 messages, 10003 records, 218403 bytes"
watcher_transfer_completed.wait_for_line(
isctest.transfer.transfer_message(
"xfer-stats",
"10.53.0.1",
"Transfer completed: 16 messages, 10003 records, 218403 bytes",
named_port,
)
)
watcher_transfer_completed.wait_for_line(pattern_transfer_completed)
# Loop until the secondary server manages to transfer the "xfer-stats" zone so
# that we can both check dig output and immediately proceed with the next test.
@ -508,11 +568,18 @@ def test_dig_and_named_axfr_stats(named_port, ns3):
# First, test that named tries the next primary in the list when the first one
# fails (XoT -> Do53). Then, test that named tries the next primary in the list
# when the first one is already marked as unreachable (XoT -> Do53).
def test_xot_primary_try_next(ns6):
def test_xot_primary_try_next(named_port, ns6):
def retransfer_and_check_log():
with ns6.watch_log_from_here(timeout=60) as watcher:
ns6.rndc("retransfer xot-primary-try-next.")
watcher.wait_for_line("Transfer status: success")
watcher.wait_for_line(
isctest.transfer.transfer_message(
"xot-primary-try-next",
"10.53.0.1",
"Transfer status: success",
named_port,
)
)
retransfer_and_check_log()
retransfer_and_check_log()

View file

@ -25,9 +25,13 @@ def check_soa(ns, serial):
)
def wait_for_initial_xfrin(ns):
def wait_for_initial_xfrin(ns, named_port):
with ns.watch_log_from_start() as watcher:
watcher.wait_for_line("Transfer status: success")
watcher.wait_for_line(
isctest.transfer.transfer_message(
"test", "10.53.0.1", "Transfer status: success", named_port
)
)
check_soa(ns, 1)
@ -39,11 +43,11 @@ def wait_for_sending_notify(ns1, ns, key_name):
watcher.wait_for_line(pattern)
def test_xfer_servers_list(ns1, ns2, ns3, ns4, templates):
# First, wait for ns2, ns3 and ns4 to xfrin foo.fr and answer it
wait_for_initial_xfrin(ns2)
wait_for_initial_xfrin(ns3)
wait_for_initial_xfrin(ns4)
def test_xfer_servers_list(named_port, ns1, ns2, ns3, ns4, templates):
# First, wait for ns2, ns3 and ns4 to xfrin test. and answer it
wait_for_initial_xfrin(ns2, named_port)
wait_for_initial_xfrin(ns3, named_port)
wait_for_initial_xfrin(ns4, named_port)
# ns1 initially notifies the secondaries using the respectively configured keys
# - 10.53.0.2 has the key defined where `secondaries` is used
@ -56,13 +60,25 @@ def test_xfer_servers_list(ns1, ns2, ns3, ns4, templates):
for ns, key_name in seq:
wait_for_sending_notify(ns1, ns, key_name)
# Then, ns1 update foo.fr. It notifies ns2, ns3 and ns4 about it
# Then, ns1 update test. It notifies ns2, ns3 and ns4 about it
templates.render("ns1/test.db", {"serial": 2})
with ns2.watch_log_from_here() as ns2_watcher, ns3.watch_log_from_here() as ns3_watcher, ns4.watch_log_from_here() as ns4_watcher:
ns1.rndc("reload")
ns2_watcher.wait_for_line("Transfer status: success")
ns3_watcher.wait_for_line("Transfer status: success")
ns4_watcher.wait_for_line("Transfer status: success")
ns2_watcher.wait_for_line(
isctest.transfer.transfer_message(
"test", "10.53.0.1", "Transfer status: success", named_port
)
)
ns3_watcher.wait_for_line(
isctest.transfer.transfer_message(
"test", "10.53.0.1", "Transfer status: success", named_port
)
)
ns4_watcher.wait_for_line(
isctest.transfer.transfer_message(
"test", "10.53.0.1", "Transfer status: success", named_port
)
)
check_soa(ns2, 2)
check_soa(ns3, 2)
check_soa(ns4, 2)

View file

@ -73,9 +73,8 @@ def test_xferquota(named_port, ns1, ns2):
isctest.check.rrsets_equal(ns1response.answer, ns2response.answer)
query_and_compare(axfr_msg)
pattern = Re(
f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
f"Transfer completed: .*\\(serial 2\\)"
pattern = isctest.transfer.transfer_message(
"changing", "10.53.0.1", Re(r"Transfer completed: .*\(serial 2\)"), named_port
)
with ns2.watch_log_from_start(timeout=30) as watcher:
watcher.wait_for_line(pattern)