bind9/bin/tests/system/nsprocessinglimit/tests_nsprocessinglimit.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

123 lines
3.8 KiB
Python
Raw Permalink Normal View History

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
import isctest
import isctest.mark
pytestmark = [isctest.mark.with_dnstap]
def line_to_ips_and_queries(line):
# dnstap-read output line example
# 05-Feb-2026 11:00:57.853 RQ 10.53.0.4:38507 -> 10.53.0.3:22047 TCP 56b sub.example.tld/IN/NS
_, _, _, _, _, dst, _, _, query = line.split(" ", 9)
ip, _ = dst.split(":", 1)
return (ip, query)
def extract_dnstap(ns, expectedlen):
ns.rndc("dnstap -roll 1")
path = os.path.join(ns.identifier, "dnstap.out.0")
dnstapread = isctest.run.cmd(
[isctest.vars.ALL["DNSTAPREAD"], path],
)
lines = dnstapread.out.splitlines()
assert expectedlen == len(lines)
return map(line_to_ips_and_queries, lines)
def expect_query(expected_query, expected_query_count, ips_and_queries):
count = 0
for _, query in ips_and_queries:
if query == expected_query:
count += 1
assert count == expected_query_count
def expect_next_ip_and_query(expected_ips_and_queries, ips_and_queries):
for expected_ip, expected_query in expected_ips_and_queries:
ip, query = next(ips_and_queries)
assert ip == expected_ip
assert query == expected_query
def check_nsprocessinglimit(ns, queries_count):
msg = isctest.query.create("a.sub.example.tld.", "A")
res = isctest.query.tcp(msg, ns.ip)
isctest.check.servfail(res)
# The 4 formers lines are request to find sub.example.tld NSs.
# The latest are queries to sub.example.tld NSs.
ips_and_queries = extract_dnstap(ns, queries_count)
# Checking the begining of the resulution
expect_next_ip_and_query(
[
("10.53.0.1", "./IN/NS"),
("10.53.0.1", "tld/IN/NS"),
("10.53.0.2", "example.tld/IN/NS"),
("10.53.0.3", "sub.example.tld/IN/NS"),
],
ips_and_queries,
)
expect_query("a.sub.example.tld/IN/A", queries_count - 4, ips_and_queries)
def test_nsprocessinglimit_default(ns4):
check_nsprocessinglimit(ns4, 17)
def reconfig_maxdelegationservers(ns, templates, count):
templates.render(
"ns4/named.conf", {"maxdelegationservers": f"max-delegation-servers {count};"}
)
with ns.watch_log_from_here() as watcher:
ns.rndc("flush")
ns.rndc("reload")
watcher.wait_for_line("running")
def reconfig_maxdelegationservers_failure(ns, templates, count):
templates.render(
"ns4/named.conf", {"maxdelegationservers": f"max-delegation-servers {count};"}
)
with ns.watch_log_from_here() as watcher:
# Reload will fail, so do not raise the exception so the config line
# can be checked.
ns.rndc("reload", raise_on_exception=False)
watcher.wait_for_line("reloading configuration failed: out of range")
def test_nsprocessinglimit_13ns(ns4, templates):
reconfig_maxdelegationservers(ns4, templates, 13)
check_nsprocessinglimit(ns4, 17)
def test_nsprocessinglimit_5ns(ns4, templates):
reconfig_maxdelegationservers(ns4, templates, 5)
check_nsprocessinglimit(ns4, 9)
def test_nsprocessinglimit_20ns(ns4, templates):
reconfig_maxdelegationservers(ns4, templates, 20)
check_nsprocessinglimit(ns4, 24)
def test_nsprocessinglimit_lower(ns4, templates):
reconfig_maxdelegationservers_failure(ns4, templates, 0)
def test_nsprocessinglimit_upper(ns4, templates):
reconfig_maxdelegationservers_failure(ns4, templates, 101)