mirror of
https://github.com/isc-projects/bind9.git
synced 2026-06-11 03:09:59 -04:00
Verify that the resolver selects authoritative servers in increasing SRTT order. Four servers are configured with increasing response delays. 100 queries are sent, expecting most to go to the fastest server (ns2). Then ns2 stops responding, another 100 queries are sent and should go to ns3 (the next fastest), and so on through ns4 and ns5. Each query uses a unique name to avoid cache hits.
89 lines
2.7 KiB
Python
89 lines
2.7 KiB
Python
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
|
#
|
|
# SPDX-License-Identifier: MPL-2.0
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
|
#
|
|
# See the COPYRIGHT file distributed with this work for additional
|
|
# information regarding copyright ownership.
|
|
|
|
import os
|
|
|
|
import isctest
|
|
import isctest.mark
|
|
|
|
pytestmark = [isctest.mark.with_dnstap]
|
|
|
|
|
|
def line_to_dst_ips(line):
|
|
# dnstap-read output line example
|
|
# 05-Feb-2026 11:00:57.853 RQ 10.53.0.6:38507 -> 10.53.0.3:22047 TCP 56b fooXXX.example./IN/NS
|
|
_, _, _, _, _, dst, _, _, _ = line.split(" ", 9)
|
|
ip, _ = dst.split(":", 1)
|
|
return ip
|
|
|
|
|
|
def extract_dnstap(ns):
|
|
ns.rndc("dnstap -roll 1")
|
|
path = os.path.join(ns.identifier, "dnstap.out.0")
|
|
dnstapread = isctest.run.cmd(
|
|
[isctest.vars.ALL["DNSTAPREAD"], path],
|
|
)
|
|
|
|
lines = dnstapread.out.splitlines()
|
|
return map(line_to_dst_ips, lines)
|
|
|
|
|
|
def assert_used_auth(ns, authip):
|
|
ips = extract_dnstap(ns)
|
|
queries = 0
|
|
matches = 0
|
|
for ip in ips:
|
|
queries += 1
|
|
if ip == authip:
|
|
matches += 1
|
|
assert matches > 85
|
|
assert queries <= 115
|
|
|
|
|
|
def test_srtt(ns6):
|
|
for i in range(1, 100):
|
|
msg = isctest.query.create(f"foo{i}.example.", "A")
|
|
res = isctest.query.udp(msg, ns6.ip)
|
|
isctest.check.noerror(res)
|
|
assert len(res.answer[0]) == 1
|
|
res.answer[0].ttl = 300
|
|
assert str(res.answer[0]) == f"foo{i}.example. 300 IN A 10.53.9.9"
|
|
|
|
assert_used_auth(ns6, "10.53.0.2")
|
|
|
|
for i in range(100, 200):
|
|
msg = isctest.query.create(f"foo{i}.example.", "A")
|
|
res = isctest.query.udp(msg, ns6.ip)
|
|
isctest.check.noerror(res)
|
|
assert len(res.answer[0]) == 1
|
|
res.answer[0].ttl = 300
|
|
assert str(res.answer[0]) == f"foo{i}.example. 300 IN A 10.53.9.9"
|
|
|
|
assert_used_auth(ns6, "10.53.0.3")
|
|
|
|
for i in range(200, 300):
|
|
msg = isctest.query.create(f"foo{i}.example.", "A")
|
|
res = isctest.query.udp(msg, ns6.ip)
|
|
isctest.check.noerror(res)
|
|
assert len(res.answer[0]) == 1
|
|
res.answer[0].ttl = 300
|
|
assert str(res.answer[0]) == f"foo{i}.example. 300 IN A 10.53.9.9"
|
|
|
|
assert_used_auth(ns6, "10.53.0.4")
|
|
|
|
for i in range(300, 400):
|
|
msg = isctest.query.create(f"foo{i}.example.", "A")
|
|
res = isctest.query.udp(msg, ns6.ip)
|
|
isctest.check.noerror(res)
|
|
assert len(res.answer[0]) == 1
|
|
res.answer[0].ttl = 300
|
|
assert str(res.answer[0]) == f"foo{i}.example. 300 IN A 10.53.9.9"
|
|
assert_used_auth(ns6, "10.53.0.5")
|