Merge branch 'michal/tidy-setup-of-python-based-tests-v9_18' into 'v9_18'

[v9_18] Tidy setup of Python-based tests

See merge request isc-projects/bind9!5973
This commit is contained in:
Michał Kępień 2022-03-14 08:55:15 +00:00
commit ed7c208616
24 changed files with 223 additions and 662 deletions

View file

@ -214,20 +214,18 @@ endif HAVE_PERLMOD_NET_DNS
if HAVE_PYTHON
TESTS += kasp keymgr2kasp tcp pipelined
if HAVE_PYMOD_DNS
TESTS += checkds dispatch qmin cookie timeouts
if HAVE_PYTEST
TESTS += checkds dispatch rpzextra shutdown timeouts
endif
if HAVE_PYMOD_DNS
TESTS += qmin cookie
if HAVE_PERLMOD_NET_DNS
TESTS += dnssec
if HAVE_PERLMOD_NET_DNS_NAMESERVER
TESTS += chain
endif HAVE_PERLMOD_NET_DNS_NAMESERVER
endif HAVE_PERLMOD_NET_DNS
if HAVE_PYTEST
TESTS += rpzextra shutdown
endif
endif HAVE_PYMOD_DNS
endif HAVE_PYTHON

View file

@ -1,71 +0,0 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
import pytest
def pytest_configure(config):
config.addinivalue_line(
"markers", "dnspython: mark tests that need dnspython to function"
)
config.addinivalue_line(
"markers", "dnspython2: mark tests that need dnspython >= 2.0.0"
)
def pytest_collection_modifyitems(config, items):
# pylint: disable=unused-argument,unused-import,too-many-branches
# pylint: disable=import-outside-toplevel
# Test for dnspython module
skip_dnspython = pytest.mark.skip(
reason="need dnspython module to run")
try:
import dns.query # noqa: F401
except ModuleNotFoundError:
for item in items:
if "dnspython" in item.keywords:
item.add_marker(skip_dnspython)
# Test for dnspython >= 2.0.0 module
skip_dnspython2 = pytest.mark.skip(
reason="need dnspython >= 2.0.0 module to run")
try:
from dns.query import udp_with_fallback # noqa: F401
except ImportError:
for item in items:
if "dnspython2" in item.keywords:
item.add_marker(skip_dnspython2)
@pytest.fixture
def named_port(request):
# pylint: disable=unused-argument
port = os.getenv("PORT")
if port is None:
port = 5301
else:
port = int(port)
return port
@pytest.fixture
def control_port(request):
# pylint: disable=unused-argument
port = os.getenv("CONTROLPORT")
if port is None:
port = 5301
else:
port = int(port)
return port

View file

@ -17,9 +17,18 @@ import subprocess
import sys
import time
import dns.resolver
import pytest
pytest.importorskip('dns', minversion='2.0.0')
import dns.exception
import dns.message
import dns.name
import dns.query
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.resolver
def has_signed_apex_nsec(zone, response):
has_nsec = False
@ -220,8 +229,6 @@ def wait_for_log(filename, log):
assert found
@pytest.mark.dnspython
@pytest.mark.dnspython2
def test_checkds_dspublished(named_port):
# We create resolver instances that will be used to send queries.
server = dns.resolver.Resolver()
@ -304,8 +311,6 @@ def test_checkds_dspublished(named_port):
# TBD: Check with TLS
@pytest.mark.dnspython
@pytest.mark.dnspython2
def test_checkds_dswithdrawn(named_port):
# We create resolver instances that will be used to send queries.
server = dns.resolver.Resolver()

View file

@ -1,3 +1,5 @@
#!/usr/bin/python3
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
@ -10,16 +12,20 @@
# information regarding copyright ownership.
import os
import pytest
@pytest.fixture
def port(request):
# pylint: disable=unused-argument
env_port = os.getenv("PORT")
if env_port is None:
env_port = 5300
else:
env_port = int(env_port)
@pytest.fixture(scope='session')
def named_port():
return int(os.environ.get('PORT', default=5300))
return env_port
@pytest.fixture(scope='session')
def named_tlsport():
return int(os.environ.get('TLSPORT', default=8853))
@pytest.fixture(scope='session')
def control_port():
return int(os.environ.get('CONTROLPORT', default=9953))

View file

@ -19,8 +19,8 @@ import dns.query
import dns.rcode
def test_connreset(port):
def test_connreset(named_port):
msg = dns.message.make_query("sub.example.", "A", want_dnssec=True,
use_edns=0, payload=1232)
ans = dns.query.udp(msg, "10.53.0.2", timeout=10, port=port)
ans = dns.query.udp(msg, "10.53.0.2", timeout=10, port=named_port)
assert ans.rcode() == dns.rcode.SERVFAIL

View file

@ -11,7 +11,6 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
import shutil
import subprocess
@ -36,8 +35,3 @@ def gnutls_cli_executable():
pytest.skip('gnutls-cli does not support the --logfile option')
return executable
@pytest.fixture
def named_tlsport():
return int(os.environ.get('TLSPORT', '853'))

View file

@ -21,6 +21,8 @@ import pytest
pytest.importorskip('dns')
import dns.exception
import dns.message
import dns.name
import dns.rdataclass
import dns.rdatatype

View file

@ -14,7 +14,11 @@
# This script is a 'port' broker. It keeps track of ports given to the
# individual system subtests, so every test is given a unique port range.
total_tests=$(find . -maxdepth 1 -mindepth 1 -type d | wc -l)
get_sorted_test_names() {
find . -maxdepth 2 -mindepth 2 -type f \( -name "tests.sh" -o -name "tests*.py" \) | cut -d/ -f2 | sort -u
}
total_tests=$(get_sorted_test_names | wc -l)
ports_per_test=20
port_min=5001
@ -33,7 +37,7 @@ while getopts "p:t:-:" OPT; do
case "$OPT" in
p | port) baseport=$OPTARG ;;
t | test)
test_index=$(find . -maxdepth 1 -mindepth 1 -type d | sort | grep -F -x -n "./${OPTARG}" | cut -d: -f1)
test_index=$(get_sorted_test_names | awk "/^${OPTARG}\$/ { print NR }")
if [ -z "${test_index}" ]; then
echo "Test '${OPTARG}' not found" >&2
exit 1

View file

@ -1,3 +1,5 @@
#!/usr/bin/python3
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
@ -10,9 +12,9 @@
# information regarding copyright ownership.
import os
import pytest
@pytest.fixture(scope='module')
def named_port():
return int(os.environ.get("PORT", default=5300))
long_test = pytest.mark.skipif(not os.environ.get('CI_ENABLE_ALL_TESTS'),
reason='CI_ENABLE_ALL_TESTS not set')

View file

@ -1,54 +0,0 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
import pytest
try:
import dns.resolver # noqa: F401 # pylint: disable=unused-import
except ModuleNotFoundError:
dns_resolver_module_found = False
else:
dns_resolver_module_found = True
def pytest_configure(config):
config.addinivalue_line(
"markers", "dnspython: mark tests that need dnspython to function"
)
def pytest_collection_modifyitems(config, items):
# pylint: disable=unused-argument
# Test for dnspython module
if not dns_resolver_module_found:
skip_requests = pytest.mark.skip(reason="need dnspython module to run")
for item in items:
if "dnspython" in item.keywords:
item.add_marker(skip_requests)
# Test if JSON statistics channel was enabled
no_jsonstats = pytest.mark.skip(reason="need JSON statistics to be enabled")
if os.getenv("HAVEJSONSTATS") is None:
for item in items:
if "json" in item.keywords:
item.add_marker(no_jsonstats)
@pytest.fixture
def named_port(request):
# pylint: disable=unused-argument
port = os.getenv("PORT")
if port is None:
port = 5301
else:
port = int(port)
return port

View file

@ -12,11 +12,13 @@
# information regarding copyright ownership.
import os
import pytest
pytest.importorskip('dns')
import dns.resolver
# @pytest.mark.dnspython
def test_rpz_passthru_logging(named_port):
resolver = dns.resolver.Resolver()
resolver.nameservers = ['10.53.0.1']

View file

@ -112,7 +112,7 @@ if [ "${srcdir}" != "${builddir}" ]; then
cp -a "${srcdir}/common" "${builddir}"
fi
# Some tests require additional files to work for out-of-tree test runs.
for file in ckdnsrps.sh digcomp.pl ditch.pl fromhex.pl kasp.sh packet.pl start.pl stop.pl testcrypto.sh; do
for file in ckdnsrps.sh conftest.py digcomp.pl ditch.pl fromhex.pl kasp.sh packet.pl pytest_custom_markers.py start.pl stop.pl testcrypto.sh; do
if [ ! -r "${file}" ]; then
cp -a "${srcdir}/${file}" "${builddir}"
fi

View file

@ -1,58 +0,0 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
import pytest
def pytest_configure(config):
config.addinivalue_line(
"markers", "dnspython: mark tests that need dnspython to function"
)
def pytest_collection_modifyitems(config, items):
# pylint: disable=unused-argument,unused-import,too-many-branches
# pylint: disable=import-outside-toplevel
# Test for dnspython module
skip_dnspython = pytest.mark.skip(
reason="need dnspython module to run")
try:
import dns.query # noqa: F401
except ModuleNotFoundError:
for item in items:
if "dnspython" in item.keywords:
item.add_marker(skip_dnspython)
@pytest.fixture
def named_port(request):
# pylint: disable=unused-argument
port = os.getenv("PORT")
if port is None:
port = 5301
else:
port = int(port)
return port
@pytest.fixture
def control_port(request):
# pylint: disable=unused-argument
port = os.getenv("CONTROLPORT")
if port is None:
port = 5301
else:
port = int(port)
return port

View file

@ -19,9 +19,12 @@ import subprocess
from string import ascii_lowercase as letters
import time
import dns.resolver
import pytest
pytest.importorskip('dns')
import dns.exception
import dns.resolver
def do_work(named_proc, resolver, rndc_cmd, kill_method, n_workers, n_queries):
"""Creates a number of A queries to run in parallel
@ -126,7 +129,6 @@ def do_work(named_proc, resolver, rndc_cmd, kill_method, n_workers, n_queries):
assert ret_code == 0
@pytest.mark.dnspython
def test_named_shutdown(named_port, control_port):
# pylint: disable-msg=too-many-locals
cfg_dir = os.path.join(os.getcwd(), "resolver")

View file

@ -13,95 +13,13 @@ import os
import pytest
def pytest_configure(config):
config.addinivalue_line(
"markers", "requests: mark tests that need requests to function"
)
config.addinivalue_line(
"markers", "json: mark tests that need json to function"
)
config.addinivalue_line(
"markers", "xml: mark tests that need xml.etree to function"
)
config.addinivalue_line(
"markers", "dnspython: mark tests that need dnspython to function"
)
def pytest_collection_modifyitems(config, items):
# pylint: disable=unused-argument,unused-import,too-many-branches
# pylint: disable=import-outside-toplevel
# Test for requests module
skip_requests = pytest.mark.skip(
reason="need requests module to run")
try:
import requests # noqa: F401
except ModuleNotFoundError:
for item in items:
if "requests" in item.keywords:
item.add_marker(skip_requests)
# Test for json module
skip_json = pytest.mark.skip(
reason="need json module to run")
try:
import json # noqa: F401
except ModuleNotFoundError:
for item in items:
if "json" in item.keywords:
item.add_marker(skip_json)
# Test for xml module
skip_xml = pytest.mark.skip(
reason="need xml module to run")
try:
import xml.etree.ElementTree # noqa: F401
except ModuleNotFoundError:
for item in items:
if "xml" in item.keywords:
item.add_marker(skip_xml)
# Test if JSON statistics channel was enabled
no_jsonstats = pytest.mark.skip(
reason="need JSON statistics to be enabled")
if os.getenv("HAVEJSONSTATS") is None:
for item in items:
if "json" in item.keywords:
item.add_marker(no_jsonstats)
# Test if XML statistics channel was enabled
no_xmlstats = pytest.mark.skip(
reason="need XML statistics to be enabled")
if os.getenv("HAVEXMLSTATS") is None:
for item in items:
if "xml" in item.keywords:
item.add_marker(no_xmlstats)
# Test for dnspython module
skip_dnspython = pytest.mark.skip(
reason="need dnspython module to run")
try:
import dns.query # noqa: F401
except ModuleNotFoundError:
for item in items:
if "dnspython" in item.keywords:
item.add_marker(skip_dnspython)
@pytest.fixture
def statsport(request):
# pylint: disable=unused-argument
env_port = os.getenv("EXTRAPORT1")
if port is None:
if env_port is None:
env_port = 5301
else:
env_port = int(env_port)
return env_port
@pytest.fixture
def port(request):
# pylint: disable=unused-argument
env_port = os.getenv("PORT")
if port is None:
env_port = 5300
else:
env_port = int(env_port)
return env_port

View file

@ -9,7 +9,64 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import helper
from datetime import datetime, timedelta
import os
# ISO datetime format without msec
fmt = '%Y-%m-%dT%H:%M:%SZ'
# The constants were taken from BIND 9 source code (lib/dns/zone.c)
max_refresh = timedelta(seconds=2419200) # 4 weeks
max_expires = timedelta(seconds=14515200) # 24 weeks
now = datetime.utcnow().replace(microsecond=0)
dayzero = datetime.utcfromtimestamp(0).replace(microsecond=0)
# Generic helper functions
def check_expires(expires, min_time, max_time):
assert expires >= min_time
assert expires <= max_time
def check_refresh(refresh, min_time, max_time):
assert refresh >= min_time
assert refresh <= max_time
def check_loaded(loaded, expected):
# Sanity check the zone timers values
assert loaded == expected
assert loaded < now
def check_zone_timers(loaded, expires, refresh, loaded_exp):
# Sanity checks the zone timers values
if expires is not None:
check_expires(expires, now, now + max_expires)
if refresh is not None:
check_refresh(refresh, now, now + max_refresh)
check_loaded(loaded, loaded_exp)
#
# The output is gibberish, but at least make sure it does not crash.
#
def check_manykeys(name, zone=None):
# pylint: disable=unused-argument
assert name == "manykeys"
def zone_mtime(zonedir, name):
try:
si = os.stat(os.path.join(zonedir, "{}.db".format(name)))
except FileNotFoundError:
return dayzero
mtime = datetime.utcfromtimestamp(si.st_mtime).replace(microsecond=0)
return mtime
def test_zone_timers_primary(fetch_zones, load_timers, **kwargs):
@ -22,8 +79,8 @@ def test_zone_timers_primary(fetch_zones, load_timers, **kwargs):
for zone in zones:
(name, loaded, expires, refresh) = load_timers(zone, True)
mtime = helper.zone_mtime(zonedir, name)
helper.check_zone_timers(loaded, expires, refresh, mtime)
mtime = zone_mtime(zonedir, name)
check_zone_timers(loaded, expires, refresh, mtime)
def test_zone_timers_secondary(fetch_zones, load_timers, **kwargs):
@ -36,8 +93,8 @@ def test_zone_timers_secondary(fetch_zones, load_timers, **kwargs):
for zone in zones:
(name, loaded, expires, refresh) = load_timers(zone, False)
mtime = helper.zone_mtime(zonedir, name)
helper.check_zone_timers(loaded, expires, refresh, mtime)
mtime = zone_mtime(zonedir, name)
check_zone_timers(loaded, expires, refresh, mtime)
def test_zone_with_many_keys(fetch_zones, load_zone, **kwargs):
@ -50,46 +107,4 @@ def test_zone_with_many_keys(fetch_zones, load_zone, **kwargs):
for zone in zones:
name = load_zone(zone)
if name == 'manykeys':
helper.check_manykeys(name)
def test_traffic(fetch_traffic, **kwargs):
statsip = kwargs['statsip']
statsport = kwargs['statsport']
port = kwargs['port']
data = fetch_traffic(statsip, statsport)
exp = helper.create_expected(data)
msg = helper.create_msg("short.example.", "TXT")
helper.update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
ans = helper.udp_query(statsip, port, msg)
helper.update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
helper.check_traffic(data, exp)
msg = helper.create_msg("long.example.", "TXT")
helper.update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
ans = helper.udp_query(statsip, port, msg)
helper.update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
helper.check_traffic(data, exp)
msg = helper.create_msg("short.example.", "TXT")
helper.update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
ans = helper.tcp_query(statsip, port, msg)
helper.update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
helper.check_traffic(data, exp)
msg = helper.create_msg("long.example.", "TXT")
helper.update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
ans = helper.tcp_query(statsip, port, msg)
helper.update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
helper.check_traffic(data, exp)
check_manykeys(name)

View file

@ -9,75 +9,16 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
import os.path
from collections import defaultdict
from datetime import datetime, timedelta
import dns.message
import dns.query
import dns.rcode
# ISO datetime format without msec
fmt = '%Y-%m-%dT%H:%M:%SZ'
# The constants were taken from BIND 9 source code (lib/dns/zone.c)
max_refresh = timedelta(seconds=2419200) # 4 weeks
max_expires = timedelta(seconds=14515200) # 24 weeks
now = datetime.utcnow().replace(microsecond=0)
dayzero = datetime.utcfromtimestamp(0).replace(microsecond=0)
TIMEOUT = 10
# Generic helper functions
def check_expires(expires, min_time, max_time):
assert expires >= min_time
assert expires <= max_time
def check_refresh(refresh, min_time, max_time):
assert refresh >= min_time
assert refresh <= max_time
def check_loaded(loaded, expected):
# Sanity check the zone timers values
assert loaded == expected
assert loaded < now
def check_zone_timers(loaded, expires, refresh, loaded_exp):
# Sanity checks the zone timers values
if expires is not None:
check_expires(expires, now, now + max_expires)
if refresh is not None:
check_refresh(refresh, now, now + max_refresh)
check_loaded(loaded, loaded_exp)
#
# The output is gibberish, but at least make sure it does not crash.
#
def check_manykeys(name, zone=None):
# pylint: disable=unused-argument
assert name == "manykeys"
def zone_mtime(zonedir, name):
try:
si = os.stat(os.path.join(zonedir, "{}.db".format(name)))
except FileNotFoundError:
return dayzero
mtime = datetime.utcfromtimestamp(si.st_mtime).replace(microsecond=0)
return mtime
def create_msg(qname, qtype):
msg = dns.message.make_query(qname, qtype, want_dnssec=True,
use_edns=0, payload=4096)
@ -144,3 +85,45 @@ def check_traffic(data, expected):
assert len(expected) == len(ordered_expected)
assert ordered_data == ordered_expected
def test_traffic(fetch_traffic, **kwargs):
statsip = kwargs['statsip']
statsport = kwargs['statsport']
port = kwargs['port']
data = fetch_traffic(statsip, statsport)
exp = create_expected(data)
msg = create_msg("short.example.", "TXT")
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
ans = udp_query(statsip, port, msg)
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)
msg = create_msg("long.example.", "TXT")
update_expected(exp, "dns-udp-requests-sizes-received-ipv4", msg)
ans = udp_query(statsip, port, msg)
update_expected(exp, "dns-udp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)
msg = create_msg("short.example.", "TXT")
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
ans = tcp_query(statsip, port, msg)
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)
msg = create_msg("long.example.", "TXT")
update_expected(exp, "dns-tcp-requests-sizes-received-ipv4", msg)
ans = tcp_query(statsip, port, msg)
update_expected(exp, "dns-tcp-responses-sizes-sent-ipv4", ans)
data = fetch_traffic(statsip, statsport)
check_traffic(data, exp)

View file

@ -16,10 +16,12 @@ from datetime import datetime
import os
import pytest
import requests
import generic
from helper import fmt
pytestmark = pytest.mark.skipif(not os.environ.get('HAVEJSONSTATS'),
reason='json-c support disabled in the build')
requests = pytest.importorskip('requests')
# JSON helper functions
@ -50,7 +52,7 @@ def load_timers_json(zone, primary=True):
# Check if the primary zone timer exists
assert 'loaded' in zone
loaded = datetime.strptime(zone['loaded'], fmt)
loaded = datetime.strptime(zone['loaded'], generic.fmt)
if primary:
# Check if the secondary zone timers does not exist
@ -61,8 +63,8 @@ def load_timers_json(zone, primary=True):
else:
assert 'expires' in zone
assert 'refresh' in zone
expires = datetime.strptime(zone['expires'], fmt)
refresh = datetime.strptime(zone['refresh'], fmt)
expires = datetime.strptime(zone['expires'], generic.fmt)
refresh = datetime.strptime(zone['refresh'], generic.fmt)
return (name, loaded, expires, refresh)
@ -73,41 +75,25 @@ def load_zone_json(zone):
return name
@pytest.mark.json
@pytest.mark.requests
@pytest.mark.skipif(os.getenv("HAVEJSONSTATS", "unset") != "1",
reason="JSON not configured")
def test_zone_timers_primary_json(statsport):
generic.test_zone_timers_primary(fetch_zones_json, load_timers_json,
statsip="10.53.0.1", statsport=statsport,
zonedir="ns1")
@pytest.mark.json
@pytest.mark.requests
@pytest.mark.skipif(os.getenv("HAVEJSONSTATS", "unset") != "1",
reason="JSON not configured")
def test_zone_timers_secondary_json(statsport):
generic.test_zone_timers_secondary(fetch_zones_json, load_timers_json,
statsip="10.53.0.3", statsport=statsport,
zonedir="ns3")
@pytest.mark.json
@pytest.mark.requests
@pytest.mark.skipif(os.getenv("HAVEJSONSTATS", "unset") != "1",
reason="JSON not configured")
def test_zone_with_many_keys_json(statsport):
generic.test_zone_with_many_keys(fetch_zones_json, load_zone_json,
statsip="10.53.0.2", statsport=statsport)
@pytest.mark.json
@pytest.mark.requests
@pytest.mark.dnspython
@pytest.mark.skipif(os.getenv("HAVEJSONSTATS", "unset") != "1",
reason="JSON not configured")
def test_traffic_json(port, statsport):
generic.test_traffic(fetch_traffic_json,
statsip="10.53.0.2", statsport=statsport,
port=port)
def test_traffic_json(named_port, statsport):
generic_dnspython = pytest.importorskip('generic_dnspython')
generic_dnspython.test_traffic(fetch_traffic_json,
statsip="10.53.0.2", statsport=statsport,
port=named_port)

View file

@ -17,10 +17,12 @@ from datetime import datetime
import os
import pytest
import requests
import generic
from helper import fmt
pytestmark = pytest.mark.skipif(not os.environ.get('HAVEXMLSTATS'),
reason='libxml2 support disabled in the build')
requests = pytest.importorskip('requests')
# XML helper functions
@ -79,7 +81,7 @@ def load_timers_xml(zone, primary=True):
loaded_el = zone.find('loaded')
assert loaded_el is not None
loaded = datetime.strptime(loaded_el.text, fmt)
loaded = datetime.strptime(loaded_el.text, generic.fmt)
expires_el = zone.find('expires')
refresh_el = zone.find('refresh')
@ -91,8 +93,8 @@ def load_timers_xml(zone, primary=True):
else:
assert expires_el is not None
assert refresh_el is not None
expires = datetime.strptime(expires_el.text, fmt)
refresh = datetime.strptime(refresh_el.text, fmt)
expires = datetime.strptime(expires_el.text, generic.fmt)
refresh = datetime.strptime(refresh_el.text, generic.fmt)
return (name, loaded, expires, refresh)
@ -103,41 +105,25 @@ def load_zone_xml(zone):
return name
@pytest.mark.xml
@pytest.mark.requests
@pytest.mark.skipif(os.getenv("HAVEXMLSTATS", "unset") != "1",
reason="XML not configured")
def test_zone_timers_primary_xml(statsport):
generic.test_zone_timers_primary(fetch_zones_xml, load_timers_xml,
statsip="10.53.0.1", statsport=statsport,
zonedir="ns1")
@pytest.mark.xml
@pytest.mark.requests
@pytest.mark.skipif(os.getenv("HAVEXMLSTATS", "unset") != "1",
reason="XML not configured")
def test_zone_timers_secondary_xml(statsport):
generic.test_zone_timers_secondary(fetch_zones_xml, load_timers_xml,
statsip="10.53.0.3", statsport=statsport,
zonedir="ns3")
@pytest.mark.xml
@pytest.mark.requests
@pytest.mark.skipif(os.getenv("HAVEXMLSTATS", "unset") != "1",
reason="XML not configured")
def test_zone_with_many_keys_xml(statsport):
generic.test_zone_with_many_keys(fetch_zones_xml, load_zone_xml,
statsip="10.53.0.2", statsport=statsport)
@pytest.mark.xml
@pytest.mark.requests
@pytest.mark.dnspython
@pytest.mark.skipif(os.getenv("HAVEXMLSTATS", "unset") != "1",
reason="XML not configured")
def test_traffic_xml(port, statsport):
generic.test_traffic(fetch_traffic_xml,
statsip="10.53.0.2", statsport=statsport,
port=port)
def test_traffic_xml(named_port, statsport):
generic_dnspython = pytest.importorskip('generic_dnspython')
generic_dnspython.test_traffic(fetch_traffic_xml,
statsip="10.53.0.2", statsport=statsport,
port=named_port)

View file

@ -1,59 +0,0 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
import pytest
def pytest_configure(config):
config.addinivalue_line(
"markers", "dnspython: mark tests that need dnspython to function"
)
config.addinivalue_line(
"markers", "dnspython2: mark tests that need dnspython >= 2.0.0"
)
def pytest_collection_modifyitems(config, items):
# pylint: disable=unused-argument,unused-import,too-many-branches
# pylint: disable=import-outside-toplevel
# Test for dnspython module
skip_dnspython = pytest.mark.skip(
reason="need dnspython module to run")
try:
import dns.query # noqa: F401
except ModuleNotFoundError:
for item in items:
if "dnspython" in item.keywords:
item.add_marker(skip_dnspython)
# Test for dnspython >= 2.0.0 module
skip_dnspython2 = pytest.mark.skip(
reason="need dnspython >= 2.0.0 module to run")
try:
from dns.query import send_tcp # noqa: F401
except ImportError:
for item in items:
if "dnspython2" in item.keywords:
item.add_marker(skip_dnspython2)
@pytest.fixture
def port(request):
# pylint: disable=unused-argument
env_port = os.getenv("PORT")
if port is None:
env_port = 5300
else:
env_port = int(env_port)
return env_port

View file

@ -19,11 +19,15 @@ import time
import pytest
pytest.importorskip('dns', minversion='2.0.0')
import dns.message
import dns.query
TIMEOUT = 10
def create_msg(qname, qtype):
import dns.message
msg = dns.message.make_query(qname, qtype, want_dnssec=True,
use_edns=0, payload=4096)
return msg
@ -39,12 +43,8 @@ def create_socket(host, port):
return sock
@pytest.mark.dnspython
@pytest.mark.dnspython2
def test_tcp_garbage(port):
import dns.query
with create_socket("10.53.0.7", port) as sock:
def test_tcp_garbage(named_port):
with create_socket("10.53.0.7", named_port) as sock:
msg = create_msg("a.example.", "A")
(sbytes, stime) = dns.query.send_tcp(sock, msg, timeout())
@ -66,13 +66,8 @@ def test_tcp_garbage(port):
raise EOFError from e
@pytest.mark.dnspython
@pytest.mark.dnspython2
def test_tcp_garbage_response(port):
import dns.query
import dns.message
with create_socket("10.53.0.7", port) as sock:
def test_tcp_garbage_response(named_port):
with create_socket("10.53.0.7", named_port) as sock:
msg = create_msg("a.example.", "A")
(sbytes, stime) = dns.query.send_tcp(sock, msg, timeout())

View file

@ -1,69 +0,0 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
import os
import pytest
def pytest_configure(config):
config.addinivalue_line(
"markers", "dnspython: mark tests that need dnspython to function"
)
config.addinivalue_line(
"markers", "dnspython2: mark tests that need dnspython >= 2.0.0"
)
config.addinivalue_line(
"markers", "long: mark tests that take a long time to run"
)
def pytest_collection_modifyitems(config, items):
# pylint: disable=unused-argument,unused-import,too-many-branches
# pylint: disable=import-outside-toplevel
# Test for dnspython module
skip_dnspython = pytest.mark.skip(
reason="need dnspython module to run")
try:
import dns.query # noqa: F401
except ModuleNotFoundError:
for item in items:
if "dnspython" in item.keywords:
item.add_marker(skip_dnspython)
# Test for dnspython >= 2.0.0 module
skip_dnspython2 = pytest.mark.skip(
reason="need dnspython >= 2.0.0 module to run")
try:
from dns.query import send_tcp # noqa: F401
except ImportError:
for item in items:
if "dnspython2" in item.keywords:
item.add_marker(skip_dnspython2)
skip_long_tests = pytest.mark.skip(
reason="need CI_ENABLE_ALL_TESTS environment variable")
if not os.environ.get("CI_ENABLE_ALL_TESTS"):
for item in items:
if "long" in item.keywords:
item.add_marker(skip_long_tests)
@pytest.fixture
def port(request):
# pylint: disable=unused-argument
env_port = os.getenv("PORT")
if port is None:
env_port = 5300
else:
env_port = int(env_port)
return env_port

View file

@ -18,11 +18,21 @@ import time
import pytest
pytest.importorskip('dns', minversion='2.0.0')
import dns.edns
import dns.message
import dns.name
import dns.query
import dns.rdataclass
import dns.rdatatype
import pytest_custom_markers # pylint: disable=import-error
TIMEOUT = 10
def create_msg(qname, qtype):
import dns.message
msg = dns.message.make_query(qname, qtype, want_dnssec=True,
use_edns=0, payload=4096)
return msg
@ -32,16 +42,12 @@ def timeout():
return time.time() + TIMEOUT
@pytest.mark.dnspython
@pytest.mark.dnspython2
def test_initial_timeout(port):
def test_initial_timeout(named_port):
#
# The initial timeout is 2.5 seconds, so this should timeout
#
import dns.query
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", port))
sock.connect(("10.53.0.1", named_port))
time.sleep(3)
@ -55,17 +61,13 @@ def test_initial_timeout(port):
raise EOFError from e
@pytest.mark.dnspython
@pytest.mark.dnspython2
def test_idle_timeout(port):
def test_idle_timeout(named_port):
#
# The idle timeout is 5 seconds, so the third message should fail
#
import dns.rcode
msg = create_msg("example.", "A")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", port))
sock.connect(("10.53.0.1", named_port))
time.sleep(1)
@ -87,20 +89,16 @@ def test_idle_timeout(port):
raise EOFError from e
@pytest.mark.dnspython
@pytest.mark.dnspython2
def test_keepalive_timeout(port):
def test_keepalive_timeout(named_port):
#
# Keepalive is 7 seconds, so the third message should succeed.
#
import dns.rcode
msg = create_msg("example.", "A")
kopt = dns.edns.GenericOption(11, b'\x00')
msg.use_edns(edns=True, payload=4096, options=[kopt])
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", port))
sock.connect(("10.53.0.1", named_port))
time.sleep(1)
@ -118,17 +116,13 @@ def test_keepalive_timeout(port):
(response, rtime) = dns.query.receive_tcp(sock, timeout())
@pytest.mark.dnspython
@pytest.mark.dnspython2
def test_pipelining_timeout(port):
def test_pipelining_timeout(named_port):
#
# The pipelining should only timeout after the last message is received
#
import dns.query
msg = create_msg("example.", "A")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", port))
sock.connect(("10.53.0.1", named_port))
time.sleep(1)
@ -156,19 +150,13 @@ def test_pipelining_timeout(port):
raise EOFError from e
@pytest.mark.dnspython
@pytest.mark.dnspython2
def test_long_axfr(port):
def test_long_axfr(named_port):
#
# The timers should not fire during AXFR, thus the connection should not
# close abruptly
#
import dns.query
import dns.rdataclass
import dns.rdatatype
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", port))
sock.connect(("10.53.0.1", named_port))
name = dns.name.from_text("example.")
msg = create_msg("example.", "AXFR")
@ -192,13 +180,9 @@ def test_long_axfr(port):
assert soa is not None
@pytest.mark.dnspython
@pytest.mark.dnspython2
def test_send_timeout(port):
import dns.query
def test_send_timeout(named_port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", port))
sock.connect(("10.53.0.1", named_port))
# Send and receive single large RDATA over TCP
msg = create_msg("large.example.", "TXT")
@ -222,16 +206,10 @@ def test_send_timeout(port):
raise EOFError from e
@pytest.mark.dnspython
@pytest.mark.dnspython2
@pytest.mark.long
def test_max_transfer_idle_out(port):
import dns.query
import dns.rdataclass
import dns.rdatatype
@pytest_custom_markers.long_test
def test_max_transfer_idle_out(named_port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", port))
sock.connect(("10.53.0.1", named_port))
name = dns.name.from_text("example.")
msg = create_msg("example.", "AXFR")
@ -259,16 +237,10 @@ def test_max_transfer_idle_out(port):
assert soa is None
@pytest.mark.dnspython
@pytest.mark.dnspython2
@pytest.mark.long
def test_max_transfer_time_out(port):
import dns.query
import dns.rdataclass
import dns.rdatatype
@pytest_custom_markers.long_test
def test_max_transfer_time_out(named_port):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect(("10.53.0.1", port))
sock.connect(("10.53.0.1", named_port))
name = dns.name.from_text("example.")
msg = create_msg("example.", "AXFR")

View file

@ -35,7 +35,9 @@ import dns.message
import dns.name
import dns.query
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rrset
pytest.importorskip("hypothesis")
from hypothesis import given