bind9/bin/tests/system/statschannel/generic_dnspython.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

129 lines
3.7 KiB
Python
Raw Normal View History

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from collections import defaultdict
import dns.message
import dns.query
import dns.rcode
TIMEOUT = 10
def create_msg(qname, qtype):
msg = dns.message.make_query(
qname, qtype, want_dnssec=True, use_edns=0, payload=4096
)
return msg
def udp_query(ip, port, msg):
ans = dns.query.udp(msg, ip, TIMEOUT, port=port)
assert ans.rcode() == dns.rcode.NOERROR
return ans
def tcp_query(ip, port, msg):
ans = dns.query.tcp(msg, ip, TIMEOUT, port=port)
assert ans.rcode() == dns.rcode.NOERROR
return ans
def create_expected(data):
expected = {
"dns-tcp-requests-sizes-received-ipv4": defaultdict(int),
"dns-tcp-responses-sizes-sent-ipv4": defaultdict(int),
"dns-tcp-requests-sizes-received-ipv6": defaultdict(int),
"dns-tcp-responses-sizes-sent-ipv6": defaultdict(int),
"dns-udp-requests-sizes-received-ipv4": defaultdict(int),
"dns-udp-requests-sizes-received-ipv6": defaultdict(int),
"dns-udp-responses-sizes-sent-ipv4": defaultdict(int),
"dns-udp-responses-sizes-sent-ipv6": defaultdict(int),
}
for k, v in data.items():
for kk, vv in v.items():
expected[k][kk] += vv
return expected
def update_expected(expected, key, msg):
msg_len = len(msg.to_wire())
bucket_num = (msg_len // 16) * 16
bucket = "{}-{}".format(bucket_num, bucket_num + 15)
expected[key][bucket] += 1
def check_traffic(data, expected):
def ordered(obj):
if isinstance(obj, dict):
return sorted((k, ordered(v)) for k, v in obj.items())
if isinstance(obj, list):
return sorted(ordered(x) for x in obj)
return obj
ordered_data = ordered(data)
ordered_expected = ordered(expected)
assert len(ordered_data) == 8
assert len(ordered_expected) == 8
assert len(data) == len(ordered_data)
assert len(expected) == len(ordered_expected)
assert ordered_data == ordered_expected
Refactor "statschannel" test's helper modules The "statschannel" system test contains two Python helper modules: - generic.py: test functions directly invoked by both tests-json.py and test-xml.py, - helper.py: helper functions invoked by test functions in generic.py. The above logic for splitting helper functions into Python modules prevents selective test skipping from working due to unconditional import statements being present in both helper modules. For example, if dnspython is not available on the test host, tests-json.py imports generic.py, which in turn imports helper.py, which in turn attempts to import various dnspython modules, triggering ImportError exceptions during test initialization. Various decorators used for some tests (like @pytest.mark.dnspython) suggest that such a scenario should be handled gracefully, but that is not the case - modifying the test collection in conftest.py does not prevent pytest from failing due to import errors. Fix by moving helper functions around to achieve a different split: - generic.py: helper functions only relying on the Python standard library, - generic_dnspython.py: helper functions requiring dnspython. Only two tests in tests-{json,xml}.py need dnspython to work (test_traffic_json(), test_traffic_xml()). Since all dnspython-dependent code is now present in generic_dnspython.py, employ pytest.importorskip() in those two tests to ensure they can be selectively skipped when dnspython is not available. Adjust other code to account for the revised Python helper module layout. Remove all occurrences of the @pytest.mark.dnspython decorator (and all associated code) from the "statschannel" system test to prevent confusion.
2022-03-14 03:59:32 -04:00
def test_traffic(fetch_traffic, **kwargs):
statsip = kwargs["statsip"]
statsport = kwargs["statsport"]
port = kwargs["port"]
Refactor "statschannel" test's helper modules The "statschannel" system test contains two Python helper modules: - generic.py: test functions directly invoked by both tests-json.py and test-xml.py, - helper.py: helper functions invoked by test functions in generic.py. The above logic for splitting helper functions into Python modules prevents selective test skipping from working due to unconditional import statements being present in both helper modules. For example, if dnspython is not available on the test host, tests-json.py imports generic.py, which in turn imports helper.py, which in turn attempts to import various dnspython modules, triggering ImportError exceptions during test initialization. Various decorators used for some tests (like @pytest.mark.dnspython) suggest that such a scenario should be handled gracefully, but that is not the case - modifying the test collection in conftest.py does not prevent pytest from failing due to import errors. Fix by moving helper functions around to achieve a different split: - generic.py: helper functions only relying on the Python standard library, - generic_dnspython.py: helper functions requiring dnspython. Only two tests in tests-{json,xml}.py need dnspython to work (test_traffic_json(), test_traffic_xml()). Since all dnspython-dependent code is now present in generic_dnspython.py, employ pytest.importorskip() in those two tests to ensure they can be selectively skipped when dnspython is not available. Adjust other code to account for the revised Python helper module layout. Remove all occurrences of the @pytest.mark.dnspython decorator (and all associated code) from the "statschannel" system test to prevent confusion.
2022-03-14 03:59:32 -04:00
data = fetch_traffic(statsip, statsport)
exp = create_expected(data)
msg = create_msg("short.example.", "TXT")
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
ans = udp_query(statsip, port, msg)
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)
msg = create_msg("long.example.", "TXT")
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
ans = udp_query(statsip, port, msg)
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)
msg = create_msg("short.example.", "TXT")
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
ans = tcp_query(statsip, port, msg)
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)
msg = create_msg("long.example.", "TXT")
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
ans = tcp_query(statsip, port, msg)
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)