mirror of
https://github.com/isc-projects/bind9.git
synced 2026-06-13 18:00:00 -04:00
[9.18] new: test: add helper functions to isctest
added some helper functions in isctest to reduce code repetition
in dnssec-related tests:
- isctest.check.adflag() - checks that a response contains AD=1
- isctest.check.noadflag() - checks that a response contains AD=0
- isctest.check.rdflag() - checks that a response contains RD=1
- isctest.check.nordflag() - checks that a response contains RD=0
- isctest.check.raflag() - checks that a response contains RA=1
- isctest.check.noraflag() - checks that a response contains RA=0
- isctest.check.rr_count_eq() - checks the number of RRsset in a section
- isctest.check.same_data() - checks that two message have the
same rcode and data
- isctest.check.same_answer() - checks that two message have the same
rcode and answer
- isctest.query.create() - a wrapper for dns.message.make_query() that
creates a query message similar to dig +dnssec
Backport of MR !10760
Merge branch 'backport-each-isctest-helpers-9.18' into 'bind-9.18'
See merge request isc-projects/bind9!10794
This commit is contained in:
commit
a47f46612e
22 changed files with 140 additions and 63 deletions
|
|
@ -79,7 +79,7 @@ def has_signed_apex_nsec(zone, response):
|
|||
|
||||
|
||||
def do_query(server, qname, qtype, tcp=False):
|
||||
msg = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True)
|
||||
msg = isctest.query.create(qname, qtype)
|
||||
query_func = isctest.query.tcp if tcp else isctest.query.udp
|
||||
response = query_func(msg, server.ip, expected_rcode=dns.rcode.NOERROR)
|
||||
return response
|
||||
|
|
|
|||
|
|
@ -9,13 +9,13 @@
|
|||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
import isctest
|
||||
import dns
|
||||
|
||||
import dns.message
|
||||
import isctest
|
||||
|
||||
|
||||
def test_database(servers, templates):
|
||||
msg = dns.message.make_query("database.", "SOA")
|
||||
msg = isctest.query.create("database.", "SOA")
|
||||
|
||||
# checking pre reload zone
|
||||
res = isctest.query.tcp(msg, "10.53.0.1")
|
||||
|
|
|
|||
|
|
@ -18,9 +18,8 @@ import subprocess
|
|||
import isctest
|
||||
import pytest
|
||||
|
||||
import dns.message
|
||||
|
||||
pytest.importorskip("dns", minversion="2.0.0")
|
||||
import dns.rrset
|
||||
|
||||
pytestmark = pytest.mark.extra_artifacts(
|
||||
[
|
||||
|
|
@ -46,7 +45,7 @@ def run_rndc(server, rndc_command):
|
|||
|
||||
def test_dnstap_dispatch_socket_addresses():
|
||||
# Send some query to ns3 so that it records something in its dnstap file.
|
||||
msg = dns.message.make_query("mail.example.", "A")
|
||||
msg = isctest.query.create("mail.example.", "A")
|
||||
res = isctest.query.tcp(msg, "10.53.0.2", expected_rcode=dns.rcode.NOERROR)
|
||||
assert res.answer == [
|
||||
dns.rrset.from_text("mail.example.", 300, "IN", "A", "10.0.0.2")
|
||||
|
|
|
|||
|
|
@ -20,11 +20,12 @@ import pytest
|
|||
|
||||
pytest.importorskip("dns")
|
||||
import dns.exception
|
||||
import dns.message
|
||||
import dns.name
|
||||
import dns.rdataclass
|
||||
import dns.rdatatype
|
||||
|
||||
import isctest
|
||||
|
||||
pytestmark = pytest.mark.extra_artifacts(
|
||||
[
|
||||
"gnutls-cli.*",
|
||||
|
|
@ -35,7 +36,7 @@ pytestmark = pytest.mark.extra_artifacts(
|
|||
|
||||
def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport):
|
||||
# Prepare the example/SOA query which will be sent over TLS.
|
||||
query = dns.message.make_query("example.", dns.rdatatype.SOA)
|
||||
query = isctest.query.create("example.", dns.rdatatype.SOA)
|
||||
query_wire = query.to_wire()
|
||||
query_with_length = struct.pack(">H", len(query_wire)) + query_wire
|
||||
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@
|
|||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
import dns.message
|
||||
import dns.flags
|
||||
import pytest
|
||||
|
||||
import isctest
|
||||
|
|
@ -29,7 +29,7 @@ pytestmark = pytest.mark.extra_artifacts(
|
|||
|
||||
def test_dsdigest_good():
|
||||
"""Check that validation with enabled digest types works"""
|
||||
msg = dns.message.make_query("a.good.", "A", want_dnssec=True)
|
||||
msg = isctest.query.create("a.good.", "A")
|
||||
res = isctest.query.tcp(
|
||||
msg,
|
||||
"10.53.0.3",
|
||||
|
|
@ -51,7 +51,7 @@ def test_dsdigest_bad():
|
|||
|
||||
def test_dsdigest_insecure():
|
||||
"""Check that validation with not supported digest algorithms is insecure"""
|
||||
msg_ds = dns.message.make_query("bad.", "DS", want_dnssec=True)
|
||||
msg_ds = isctest.query.create("bad.", "DS")
|
||||
res_ds = isctest.query.tcp(
|
||||
msg_ds,
|
||||
"10.53.0.4",
|
||||
|
|
@ -59,7 +59,7 @@ def test_dsdigest_insecure():
|
|||
isctest.check.noerror(res_ds)
|
||||
assert res_ds.flags & dns.flags.AD
|
||||
|
||||
msg_a = dns.message.make_query("a.bad.", "A", want_dnssec=True)
|
||||
msg_a = isctest.query.create("a.bad.", "A")
|
||||
res_a = isctest.query.tcp(
|
||||
msg_a,
|
||||
"10.53.0.4",
|
||||
|
|
|
|||
|
|
@ -9,8 +9,6 @@
|
|||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
import dns.message
|
||||
|
||||
import isctest
|
||||
|
||||
|
||||
|
|
@ -20,11 +18,11 @@ def test_emptyzones(servers, templates):
|
|||
ns1.rndc("reload")
|
||||
templates.render("ns1/named.conf", {"automatic_empty_zones": True})
|
||||
ns1.rndc("reload")
|
||||
msg = dns.message.make_query("version.bind", "TXT", "CH")
|
||||
msg = isctest.query.create("version.bind", "TXT", "CH")
|
||||
res = isctest.query.tcp(msg, "10.53.0.1")
|
||||
isctest.check.noerror(res)
|
||||
|
||||
# check that allow-transfer { none; } works
|
||||
msg = dns.message.make_query("10.in-addr.arpa", "AXFR")
|
||||
msg = isctest.query.create("10.in-addr.arpa", "AXFR")
|
||||
res = isctest.query.tcp(msg, "10.53.0.1")
|
||||
isctest.check.refused(res)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@
|
|||
# information regarding copyright ownership.
|
||||
|
||||
|
||||
import dns.flags
|
||||
import dns.message
|
||||
import pytest
|
||||
|
||||
|
|
@ -20,7 +21,7 @@ pytest.importorskip("dns", minversion="2.0.0")
|
|||
|
||||
def test_glue_full_glue_set():
|
||||
"""test that a ccTLD referral gets a full glue set from the root zone"""
|
||||
msg = dns.message.make_query("foo.bar.fi", "A")
|
||||
msg = isctest.query.create("foo.bar.fi", "A")
|
||||
msg.flags &= ~dns.flags.RD
|
||||
res = isctest.query.udp(msg, "10.53.0.1")
|
||||
|
||||
|
|
@ -51,7 +52,7 @@ NS.UU.NET. 172800 IN A 137.39.1.3
|
|||
|
||||
def test_glue_no_glue_set():
|
||||
"""test that out-of-zone glue is not found"""
|
||||
msg = dns.message.make_query("example.net.", "A")
|
||||
msg = isctest.query.create("example.net.", "A")
|
||||
msg.flags &= ~dns.flags.RD
|
||||
res = isctest.query.udp(msg, "10.53.0.1")
|
||||
|
||||
|
|
|
|||
|
|
@ -9,15 +9,11 @@
|
|||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
import pytest
|
||||
import isctest
|
||||
|
||||
pytest.importorskip("dns")
|
||||
import dns.message
|
||||
|
||||
|
||||
def test_async_hook():
|
||||
msg = dns.message.make_query("example.com.", "A")
|
||||
msg = isctest.query.create("example.com.", "A")
|
||||
res = isctest.query.udp(msg, "10.53.0.1")
|
||||
# the test-async plugin changes the status of any positive answer to NOTIMP
|
||||
isctest.check.notimp(res)
|
||||
|
|
|
|||
|
|
@ -11,10 +11,10 @@
|
|||
|
||||
import os
|
||||
|
||||
import isctest
|
||||
import dns.rrset
|
||||
import pytest
|
||||
|
||||
import dns.message
|
||||
import isctest
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
|
@ -26,7 +26,7 @@ import dns.message
|
|||
],
|
||||
)
|
||||
def test_include_multiplecfg(qname):
|
||||
msg = dns.message.make_query(qname, "A")
|
||||
msg = isctest.query.create(qname, "A")
|
||||
res = isctest.query.tcp(msg, "10.53.0.2")
|
||||
|
||||
isctest.check.noerror(res)
|
||||
|
|
|
|||
|
|
@ -12,8 +12,9 @@
|
|||
import shutil
|
||||
from typing import Optional
|
||||
|
||||
import dns.rcode
|
||||
import dns.flags
|
||||
import dns.message
|
||||
import dns.rcode
|
||||
import dns.zone
|
||||
|
||||
import isctest.log
|
||||
|
|
@ -40,6 +41,55 @@ def servfail(message: dns.message.Message) -> None:
|
|||
rcode(message, dns_rcode.SERVFAIL)
|
||||
|
||||
|
||||
def adflag(message: dns.message.Message) -> None:
|
||||
assert (message.flags & dns.flags.AD) != 0, str(message)
|
||||
|
||||
|
||||
def noadflag(message: dns.message.Message) -> None:
|
||||
assert (message.flags & dns.flags.AD) == 0, str(message)
|
||||
|
||||
|
||||
def rdflag(message: dns.message.Message) -> None:
|
||||
assert (message.flags & dns.flags.RD) != 0, str(message)
|
||||
|
||||
|
||||
def nordflag(message: dns.message.Message) -> None:
|
||||
assert (message.flags & dns.flags.RD) == 0, str(message)
|
||||
|
||||
|
||||
def raflag(message: dns.message.Message) -> None:
|
||||
assert (message.flags & dns.flags.RA) != 0, str(message)
|
||||
|
||||
|
||||
def noraflag(message: dns.message.Message) -> None:
|
||||
assert (message.flags & dns.flags.RA) == 0, str(message)
|
||||
|
||||
|
||||
def section_equal(first_section: list, second_section: list) -> None:
|
||||
for rrset in first_section:
|
||||
assert (
|
||||
rrset in second_section
|
||||
), f"No corresponding RRset found in second section: {rrset}"
|
||||
for rrset in second_section:
|
||||
assert (
|
||||
rrset in first_section
|
||||
), f"No corresponding RRset found in first section: {rrset}"
|
||||
|
||||
|
||||
def same_data(res1: dns.message.Message, res2: dns.message.Message):
|
||||
section_equal(res1.question, res2.question)
|
||||
section_equal(res1.answer, res2.answer)
|
||||
section_equal(res1.authority, res2.authority)
|
||||
section_equal(res1.additional, res2.additional)
|
||||
assert res1.rcode() == res2.rcode()
|
||||
|
||||
|
||||
def same_answer(res1: dns.message.Message, res2: dns.message.Message):
|
||||
section_equal(res1.question, res2.question)
|
||||
section_equal(res1.answer, res2.answer)
|
||||
assert res1.rcode() == res2.rcode()
|
||||
|
||||
|
||||
def rrsets_equal(
|
||||
first_rrset: dns.rrset.RRset,
|
||||
second_rrset: dns.rrset.RRset,
|
||||
|
|
@ -102,6 +152,16 @@ def is_executable(cmd: str, errmsg: str) -> None:
|
|||
assert executable is not None, errmsg
|
||||
|
||||
|
||||
def named_alive(named_proc, resolver_ip):
|
||||
assert named_proc.poll() is None, "named isn't running"
|
||||
msg = isctest.query.create("version.bind", "TXT", "CH")
|
||||
isctest.query.tcp(msg, resolver_ip, expected_rcode=dns_rcode.NOERROR)
|
||||
|
||||
|
||||
def notauth(message: dns.message.Message) -> None:
|
||||
rcode(message, dns.rcode.NOTAUTH)
|
||||
|
||||
|
||||
def nxdomain(message: dns.message.Message) -> None:
|
||||
rcode(message, dns.rcode.NXDOMAIN)
|
||||
|
||||
|
|
@ -114,6 +174,12 @@ def empty_answer(message: dns.message.Message) -> None:
|
|||
assert not message.answer, str(message)
|
||||
|
||||
|
||||
def rr_count_eq(section: list, expected: int):
|
||||
# NOTE: OPT and TSIG records aren't included in the count for ADDITIONAL section
|
||||
count = sum(len(rrset) for rrset in section)
|
||||
assert count == expected, str(section)
|
||||
|
||||
|
||||
def is_response_to(response: dns.message.Message, query: dns.message.Message) -> None:
|
||||
single_question(response)
|
||||
single_question(query)
|
||||
|
|
|
|||
|
|
@ -137,13 +137,13 @@ class NamedInstance:
|
|||
"""
|
||||
return WatchLogFromHere(self.log.path, timeout)
|
||||
|
||||
def reconfigure(self) -> None:
|
||||
def reconfigure(self, **kwargs) -> None:
|
||||
"""
|
||||
Reconfigure this named `instance` and wait until reconfiguration is
|
||||
finished. Raise an `RNDCException` if reconfiguration fails.
|
||||
"""
|
||||
with self.watch_log_from_here() as watcher:
|
||||
self.rndc("reconfig")
|
||||
self.rndc("reconfig", **kwargs)
|
||||
watcher.wait_for_line("any newly configured zones are now loaded")
|
||||
|
||||
def _rndc_log(self, command: str, response: str) -> None:
|
||||
|
|
|
|||
|
|
@ -74,3 +74,23 @@ def udp(*args, **kwargs) -> Any:
|
|||
|
||||
def tcp(*args, **kwargs) -> Any:
|
||||
return generic_query(dns.query.tcp, *args, **kwargs)
|
||||
|
||||
|
||||
def create(
|
||||
qname,
|
||||
qtype,
|
||||
qclass=dns.rdataclass.IN,
|
||||
dnssec: bool = True,
|
||||
cd: bool = False,
|
||||
ad: bool = True,
|
||||
) -> dns.message.Message:
|
||||
"""Create DNS query with defaults suitable for our tests."""
|
||||
msg = dns.message.make_query(
|
||||
qname, qtype, qclass, use_edns=True, want_dnssec=dnssec
|
||||
)
|
||||
msg.flags = dns.flags.RD
|
||||
if ad:
|
||||
msg.flags |= dns.flags.AD
|
||||
if cd:
|
||||
msg.flags |= dns.flags.CD
|
||||
return msg
|
||||
|
|
|
|||
|
|
@ -14,11 +14,10 @@ import itertools
|
|||
import isctest
|
||||
import pytest
|
||||
|
||||
import dns.message
|
||||
|
||||
# Everything from getting a big answer to creating an RR set with thousands
|
||||
# of records takes minutes of CPU and real time with dnspython < 2.0.0.
|
||||
pytest.importorskip("dns", minversion="2.0.0")
|
||||
import dns.rrset
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
|
@ -32,7 +31,7 @@ pytest.importorskip("dns", minversion="2.0.0")
|
|||
],
|
||||
)
|
||||
def test_limits(name, limit):
|
||||
msg_query = dns.message.make_query(f"{name}.example.", "A")
|
||||
msg_query = isctest.query.create(f"{name}.example.", "A")
|
||||
res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False)
|
||||
|
||||
iplist = [
|
||||
|
|
@ -46,7 +45,7 @@ def test_limits(name, limit):
|
|||
|
||||
|
||||
def test_limit_exceeded():
|
||||
msg_query = dns.message.make_query("5000.example.", "A")
|
||||
msg_query = isctest.query.create("5000.example.", "A")
|
||||
res = isctest.query.tcp(msg_query, "10.53.0.1", log_response=False)
|
||||
|
||||
assert res.flags & dns.flags.TC, "TC flag was not set"
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import isctest
|
|||
|
||||
def test_masterfile_include_semantics():
|
||||
"""Test master file $INCLUDE semantics"""
|
||||
msg_axfr = dns.message.make_query("include.", "AXFR")
|
||||
msg_axfr = isctest.query.create("include.", "AXFR")
|
||||
res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
|
||||
axfr_include_semantics = """;ANSWER
|
||||
include. 300 IN SOA ns.include. hostmaster.include. 1 3600 1800 1814400 3600
|
||||
|
|
@ -40,7 +40,7 @@ ns.include. 300 IN A 127.0.0.1
|
|||
|
||||
def test_masterfile_bind_8_compat_semantics():
|
||||
"""Test master file BIND 8 TTL and $TTL semantics compatibility"""
|
||||
msg_axfr = dns.message.make_query("ttl1.", "AXFR")
|
||||
msg_axfr = isctest.query.create("ttl1.", "AXFR")
|
||||
res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
|
||||
axfr_ttl_semantics = """;ANSWER
|
||||
ttl1. 3 IN SOA ns.ttl1. hostmaster.ttl1. 1 3600 1800 1814400 3
|
||||
|
|
@ -59,7 +59,7 @@ ns.ttl1. 3 IN A 10.53.0.1
|
|||
|
||||
def test_masterfile_rfc_1035_semantics():
|
||||
"""Test master file RFC1035 TTL and $TTL semantics"""
|
||||
msg_axfr = dns.message.make_query("ttl2.", "AXFR")
|
||||
msg_axfr = isctest.query.create("ttl2.", "AXFR")
|
||||
res_axfr = isctest.query.tcp(msg_axfr, "10.53.0.1")
|
||||
axfr_ttl_semantics = """;ANSWER
|
||||
ttl2. 1 IN SOA ns.ttl2. hostmaster.ttl2. 1 3600 1800 1814400 3
|
||||
|
|
@ -78,7 +78,7 @@ ns.ttl2. 1 IN A 10.53.0.1
|
|||
|
||||
def test_masterfile_missing_master_file():
|
||||
"""Test nameserver running with a missing master file"""
|
||||
msg_soa = dns.message.make_query("example.", "SOA")
|
||||
msg_soa = isctest.query.create("example.", "SOA")
|
||||
res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
|
||||
expected_soa_rr = """;ANSWER
|
||||
example. 300 IN SOA mname1. . 2010042407 20 20 1814400 3600
|
||||
|
|
@ -89,7 +89,7 @@ example. 300 IN SOA mname1. . 2010042407 20 20 1814400 3600
|
|||
|
||||
def test_masterfile_missing_master_file_servfail():
|
||||
"""Test nameserver returning SERVFAIL for a missing master file"""
|
||||
msg_soa = dns.message.make_query("missing.", "SOA")
|
||||
msg_soa = isctest.query.create("missing.", "SOA")
|
||||
res_soa = isctest.query.tcp(msg_soa, "10.53.0.2")
|
||||
isctest.check.servfail(res_soa)
|
||||
|
||||
|
|
|
|||
|
|
@ -13,8 +13,6 @@ import pytest
|
|||
|
||||
pytest.importorskip("dns", minversion="2.7.0")
|
||||
|
||||
|
||||
import dns.message
|
||||
import isctest
|
||||
|
||||
|
||||
|
|
@ -22,7 +20,7 @@ import isctest
|
|||
# about twice as large as the answer with compression enabled, while
|
||||
# maintaining identical content.
|
||||
def test_names():
|
||||
msg = dns.message.make_query("example.", "MX")
|
||||
msg = isctest.query.create("example.", "MX")
|
||||
# Getting message size with compression enabled
|
||||
res_enabled = isctest.query.tcp(msg, ip="10.53.0.1", source="10.53.0.1")
|
||||
# Getting message size with compression disabled
|
||||
|
|
|
|||
|
|
@ -15,10 +15,9 @@ import socket
|
|||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
import isctest
|
||||
|
||||
pytest.importorskip("dns")
|
||||
import dns.message
|
||||
|
||||
pytestmark = pytest.mark.extra_artifacts(
|
||||
[
|
||||
|
|
@ -66,6 +65,6 @@ def test_cve_2023_3341(control_port):
|
|||
# Wait for named to (possibly) crash
|
||||
time.sleep(10)
|
||||
|
||||
msg = dns.message.make_query("version.bind", "TXT", "CH")
|
||||
msg = isctest.query.create("version.bind", "TXT", "CH")
|
||||
res = isctest.query.udp(msg, "10.53.0.2")
|
||||
isctest.check.noerror(res)
|
||||
|
|
|
|||
|
|
@ -12,13 +12,16 @@
|
|||
# information regarding copyright ownership.
|
||||
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
pytest.importorskip("dns", minversion="2.0.0")
|
||||
import dns.rcode
|
||||
import dns.rrset
|
||||
|
||||
import isctest
|
||||
from isctest.compat import dns_rcode
|
||||
|
||||
import dns.message
|
||||
|
||||
pytestmark = pytest.mark.extra_artifacts(
|
||||
[
|
||||
|
|
@ -70,7 +73,7 @@ pytestmark = pytest.mark.extra_artifacts(
|
|||
)
|
||||
def test_rpz_multiple_views(qname, source, rcode):
|
||||
# Wait for the rpz-external.local zone transfer
|
||||
msg = dns.message.make_query("rpz-external.local", "SOA")
|
||||
msg = isctest.query.create("rpz-external.local", "SOA")
|
||||
isctest.query.tcp(
|
||||
msg,
|
||||
ip="10.53.0.3",
|
||||
|
|
@ -84,7 +87,7 @@ def test_rpz_multiple_views(qname, source, rcode):
|
|||
expected_rcode=dns_rcode.NOERROR,
|
||||
)
|
||||
|
||||
msg = dns.message.make_query(qname, "A")
|
||||
msg = isctest.query.create(qname, "A")
|
||||
res = isctest.query.udp(msg, "10.53.0.3", source=source, expected_rcode=rcode)
|
||||
if rcode == dns.rcode.NOERROR:
|
||||
assert res.answer == [dns.rrset.from_text(qname, 300, "IN", "A", "10.53.0.2")]
|
||||
|
|
@ -94,7 +97,7 @@ def test_rpz_passthru_logging():
|
|||
resolver_ip = "10.53.0.3"
|
||||
|
||||
# Should generate a log entry into rpz_passthru.txt
|
||||
msg_allowed = dns.message.make_query("allowed.", "A")
|
||||
msg_allowed = isctest.query.create("allowed.", "A")
|
||||
res_allowed = isctest.query.udp(
|
||||
msg_allowed, resolver_ip, source="10.53.0.1", expected_rcode=dns.rcode.NOERROR
|
||||
)
|
||||
|
|
@ -104,7 +107,7 @@ def test_rpz_passthru_logging():
|
|||
|
||||
# baddomain.com isn't allowed (CNAME .), should return NXDOMAIN
|
||||
# Should generate a log entry into rpz.txt
|
||||
msg_not_allowed = dns.message.make_query("baddomain.", "A")
|
||||
msg_not_allowed = isctest.query.create("baddomain.", "A")
|
||||
res_not_allowed = isctest.query.udp(
|
||||
msg_not_allowed,
|
||||
resolver_ip,
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ def do_work(named_proc, resolver_ip, instance, kill_method, n_workers, n_queries
|
|||
)
|
||||
|
||||
qname = relname + ".test"
|
||||
msg = dns.message.make_query(qname, "A")
|
||||
msg = isctest.query.create(qname, "A")
|
||||
futures[
|
||||
executor.submit(
|
||||
isctest.query.udp, msg, resolver_ip, timeout=1, attempts=1
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ class CraftedTKEYQuery:
|
|||
rrset = dns.rrset.from_rdata(dns.name.root, dns.rdatatype.TKEY, rdata)
|
||||
|
||||
# Prepare complete TKEY query to send
|
||||
self.msg = dns.message.make_query(
|
||||
self.msg = isctest.query.create(
|
||||
dns.name.root, dns.rdatatype.TKEY, dns.rdataclass.ANY
|
||||
)
|
||||
self.msg.additional.append(rrset)
|
||||
|
|
|
|||
|
|
@ -13,9 +13,6 @@ import pytest
|
|||
|
||||
import isctest
|
||||
|
||||
pytest.importorskip("dns")
|
||||
import dns.message
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"qname,rdtype,expected_ttl",
|
||||
|
|
@ -27,7 +24,7 @@ import dns.message
|
|||
],
|
||||
)
|
||||
def test_cache_ttl(qname, rdtype, expected_ttl):
|
||||
msg = dns.message.make_query(qname, rdtype)
|
||||
msg = isctest.query.create(qname, rdtype)
|
||||
response = isctest.query.udp(msg, "10.53.0.2")
|
||||
for rr in response.answer + response.authority:
|
||||
assert rr.ttl == expected_ttl
|
||||
|
|
|
|||
|
|
@ -94,7 +94,7 @@ def test_wildcard_rdtype_mismatch(
|
|||
# See RFC 4592 section 2.2.1.
|
||||
assume(name == SUFFIX or name.labels[-len(SUFFIX) - 1] != b"*")
|
||||
|
||||
query_msg = dns.message.make_query(name, rdtype)
|
||||
query_msg = isctest.query.create(name, rdtype)
|
||||
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
|
||||
|
||||
isctest.check.is_response_to(response_msg, query_msg)
|
||||
|
|
@ -111,7 +111,7 @@ def test_wildcard_match(name: dns.name.Name, named_port: int) -> None:
|
|||
# See RFC 4592 section 2.2.1.
|
||||
assume(name.labels[-len(SUFFIX) - 1] != b"*")
|
||||
|
||||
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
|
||||
query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
|
||||
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
|
||||
|
||||
isctest.check.is_response_to(response_msg, query_msg)
|
||||
|
|
@ -140,7 +140,7 @@ def test_wildcard_with_star_not_synthesized(
|
|||
name: dns.name.Name, named_port: int
|
||||
) -> None:
|
||||
"""RFC 4592 section 2.2.1 ghost.*.example."""
|
||||
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
|
||||
query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
|
||||
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
|
||||
|
||||
isctest.check.is_response_to(response_msg, query_msg)
|
||||
|
|
@ -170,7 +170,7 @@ def test_name_in_between_wildcards(name: dns.name.Name, named_port: int) -> None
|
|||
or name.labels[-len(NESTED_SUFFIX) - 1] != b"*"
|
||||
)
|
||||
|
||||
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
|
||||
query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
|
||||
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
|
||||
|
||||
isctest.check.is_response_to(response_msg, query_msg)
|
||||
|
|
@ -201,7 +201,7 @@ def test_name_nested_wildcard_subdomains_not_synthesized(
|
|||
|
||||
`foo.*.*.*.nestedwild.test. A` must not be synthesized.
|
||||
"""
|
||||
query_msg = dns.message.make_query(name, WILDCARD_RDTYPE)
|
||||
query_msg = isctest.query.create(name, WILDCARD_RDTYPE)
|
||||
response_msg = isctest.query.tcp(query_msg, IP_ADDR, named_port, timeout=TIMEOUT)
|
||||
|
||||
isctest.check.is_response_to(response_msg, query_msg)
|
||||
|
|
|
|||
|
|
@ -60,8 +60,8 @@ def test_xferquota(named_port, servers):
|
|||
|
||||
isctest.run.retry_with_timeout(check_line_count, timeout=360)
|
||||
|
||||
axfr_msg = dns.message.make_query("zone000099.example.", "AXFR")
|
||||
a_msg = dns.message.make_query("a.changing.", "A")
|
||||
axfr_msg = isctest.query.create("zone000099.example.", "AXFR")
|
||||
a_msg = isctest.query.create("a.changing.", "A")
|
||||
|
||||
def query_and_compare(msg):
|
||||
ns1response = isctest.query.tcp(msg, "10.53.0.1")
|
||||
|
|
|
|||
Loading…
Reference in a new issue