bind9/bin/tests/system/resolver/ans2/ans.py
Štěpán Balážik ced002c4ab Replace deprecated typing imports
More specific modules (like collections.abc) can now be used.

Generated with: ruff check --extend-select UP035 --fix
2026-02-20 15:17:32 +01:00

229 lines
6 KiB
Python

"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
SPDX-License-Identifier: MPL-2.0
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.
See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from collections.abc import AsyncGenerator
import dns.edns
import dns.name
import dns.rcode
import dns.rdatatype
import dns.rrset
from isctest.asyncserver import (
AsyncDnsServer,
DnsResponseSend,
DomainHandler,
IgnoreAllQueries,
QnameHandler,
QnameQtypeHandler,
QueryContext,
ResponseHandler,
StaticResponseHandler,
)
from ..resolver_ans import (
DelegationHandler,
Gl6412AHandler,
Gl6412Handler,
Gl6412Ns2Handler,
Gl6412Ns3Handler,
rrset,
setup_delegation,
soa_rrset,
)
class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
    """Static NS answer declared for the "baddname"/"gooddname" names.

    The answer section holds one NS RRset owned by "example.org." that
    names "a.root-servers.nil."; the response is flagged authoritative.
    """

    # Query type and query names this handler is declared for.
    qtypes = [dns.rdatatype.NS]
    qnames = [
        "baddname.example.org.",
        "gooddname.example.org.",
    ]

    authoritative = True
    answer = [
        rrset("example.org.", dns.rdatatype.NS, "a.root-servers.nil."),
    ]
def _cname_rrsets(
    qname: dns.name.Name | str,
) -> tuple[dns.rrset.RRset, dns.rrset.RRset]:
    """Build a CNAME RRset and an A RRset that share the owner *qname*.

    The CNAME target is the text form of *qname* itself and the A record
    is 1.2.3.4.  The result is ordered (CNAME, A).
    """
    alias = rrset(qname, dns.rdatatype.CNAME, f"{qname}")
    address = rrset(qname, dns.rdatatype.A, "1.2.3.4")
    return alias, address
class Cname1Handler(QnameHandler, StaticResponseHandler):
    """Data for the "cname + other data / 1" test.

    The answer is the (CNAME, A) pair built by _cname_rrsets() for the
    single declared query name; the response is not authoritative.
    """

    qnames = ["cname1.example.com."]
    authoritative = False
    answer = _cname_rrsets(qnames[0])
class Cname2Handler(QnameHandler, StaticResponseHandler):
    """Data for the "cname + other data / 2" test.

    Uses the same two RRsets as _cname_rrsets() builds, in opposite order.
    """

    qnames = ["cname2.example.com."]
    authoritative = False
    # Reverse the tuple returned by the helper; slicing keeps it a tuple.
    answer = _cname_rrsets(qnames[0])[::-1]
class ExampleOrgHandler(QnameHandler):
    """Address data for the address/alias filtering tests.

    A queries get 192.0.2.1 and AAAA queries get 2001:db8:beef::1, both
    owned by the query name.  Any other query type gets the response
    unchanged.  The response is always sent as authoritative.
    """

    # NOTE(review): unlike the other handlers in this file, these names
    # carry no trailing dot - confirm QnameHandler treats them as absolute.
    qnames = [
        "www.example.org",
        "badcname.example.org",
        "goodcname.example.org",
        "foo.baddname.example.org",
        "foo.gooddname.example.org",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one authoritative response, with an address RRset if asked."""
        # Data for address/alias filtering.
        address_by_type = (
            (dns.rdatatype.A, "192.0.2.1"),
            (dns.rdatatype.AAAA, "2001:db8:beef::1"),
        )
        for rdtype, address in address_by_type:
            if qctx.qtype == rdtype:
                qctx.response.answer.append(rrset(qctx.qname, rdtype, address))
                break

        yield DnsResponseSend(qctx.response, authoritative=True)
class NoResponseExampleUdpHandler(QnameHandler, IgnoreAllQueries):
    """Combine IgnoreAllQueries with a single declared query name."""

    qnames = ["noresponse.exampleudp.net."]
class RootNsHandler(QnameQtypeHandler):
    """Answer NS queries for the listed names with "a.root-servers.nil."."""

    qtypes = [dns.rdatatype.NS]
    qnames = [
        "example.com.",
        "com.",
        "example.org.",
        "org.",
        "net.",
    ]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one authoritative response with an NS RRset at the qname."""
        response = qctx.response
        response.answer.append(
            rrset(qctx.qname, dns.rdatatype.NS, "a.root-servers.nil.")
        )
        yield DnsResponseSend(response, authoritative=True)
class ZoneVersionHandler(QnameHandler):
    """Answer "zoneversion." with an authority RRset and a raw EDNS option.

    The authority section gets whatever soa_rrset(".") returns, and EDNS
    version 0 is enabled with one generic option (code 19, fixed 6-byte
    payload).  The response is sent as non-authoritative.
    """

    qnames = ["zoneversion."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield the single response described in the class docstring."""
        response = qctx.response
        response.authority.append(soa_rrset("."))

        # Presumably the ZONEVERSION option, judging by the names used
        # here; the payload layout is not interpreted by this code.
        code = 19
        payload = bytes.fromhex("000101022304")
        opt = dns.edns.GenericOption(code, payload)  # type: ignore
        response.use_edns(edns=0, options=[opt])

        yield DnsResponseSend(response, authoritative=False)
class Ns2Delegation(DelegationHandler):
    """Declare "exampleudp.net." for DelegationHandler with server number 2."""

    server_number = 2
    domains = ["exampleudp.net."]
class Ns3Delegation(DelegationHandler):
    """Declare the domains handled by DelegationHandler with server number 3."""

    server_number = 3
    domains = [
        "example.net.",
        "lame.example.org.",
        "sub.example.org.",
    ]
class Ns3GlueInAnswerDelegation(DelegationHandler):
    """Delegation variant that also places the additional data in the answer.

    Every response produced by DelegationHandler.get_responses() has the
    contents of its additional section appended to its answer section
    before being passed on.
    """

    server_number = 3
    domains = ["glue-in-answer.example.org."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield the parent's responses with additional RRsets in the answer."""
        async for outgoing in super().get_responses(qctx):
            message = outgoing.response
            message.answer += message.additional
            yield outgoing
class Ns4Delegation(DelegationHandler):
    """Declare "broken." for DelegationHandler with server number 4."""

    server_number = 4
    domains = ["broken."]
class Ns6Delegation(DelegationHandler):
    """Declare the domains handled by DelegationHandler with server number 6."""

    server_number = 6
    domains = [
        "redirect.com.",
        "tld1.",
    ]
class Ns7Delegation(DelegationHandler):
    """Declare "tld2." for DelegationHandler with server number 7."""

    server_number = 7
    domains = ["tld2."]
class PartialFormerrHandler(DomainHandler, StaticResponseHandler):
    """Static non-authoritative FORMERR response declared for "partial-formerr."."""

    domains = ["partial-formerr."]
    rcode = dns.rcode.FORMERR
    authoritative = False
class FallbackHandler(ResponseHandler):
    """Handler installed last in main(); declares no query-name filter.

    It calls setup_delegation() for "below.www.example.com." with the
    value 3 and sends the resulting response as non-authoritative.
    """

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one non-authoritative response prepared by setup_delegation()."""
        # NOTE(review): 3 is presumably a server number, as in the
        # DelegationHandler subclasses - confirm against resolver_ans.
        setup_delegation(qctx, "below.www.example.com.", 3)
        reply = DnsResponseSend(qctx.response, authoritative=False)
        yield reply
def main() -> None:
    """Create the DNS server, register all handlers in order, and run it.

    Handlers are installed in three groups: the query-name handlers, then
    the domain handlers, then the fallback handler.
    """
    server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR)

    # Install QnameHandlers first
    qname_handlers = (
        BadGoodDnameNsHandler(),
        Cname1Handler(),
        Cname2Handler(),
        ExampleOrgHandler(),
        Gl6412AHandler(),
        Gl6412Handler(),
        Gl6412Ns2Handler(),
        Gl6412Ns3Handler(),
        NoResponseExampleUdpHandler(),
        RootNsHandler(),
        ZoneVersionHandler(),
    )
    server.install_response_handlers(*qname_handlers)

    # Then install DomainHandlers
    domain_handlers = (
        Ns2Delegation(),
        Ns3Delegation(),
        Ns3GlueInAnswerDelegation(),
        Ns4Delegation(),
        Ns6Delegation(),
        Ns7Delegation(),
        PartialFormerrHandler(),
    )
    server.install_response_handlers(*domain_handlers)

    # Finally, install the fallback handler
    fallback = FallbackHandler()
    server.install_response_handler(fallback)

    server.run()
if __name__ == "__main__":
    # Start the server only when this file is executed as a script.
    main()