mirror of
https://github.com/isc-projects/bind9.git
synced 2026-06-10 17:30:22 -04:00
More specific modules (like collections.abc) can now be used. Generated with: ruff check --extend-select UP035 --fix
229 lines
6 KiB
Python
229 lines
6 KiB
Python
"""
|
|
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
|
|
|
SPDX-License-Identifier: MPL-2.0
|
|
|
|
This Source Code Form is subject to the terms of the Mozilla Public
|
|
License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
See the COPYRIGHT file distributed with this work for additional
|
|
information regarding copyright ownership.
|
|
"""
|
|
|
|
from collections.abc import AsyncGenerator
|
|
|
|
import dns.edns
|
|
import dns.name
|
|
import dns.rcode
|
|
import dns.rdatatype
|
|
import dns.rrset
|
|
|
|
from isctest.asyncserver import (
|
|
AsyncDnsServer,
|
|
DnsResponseSend,
|
|
DomainHandler,
|
|
IgnoreAllQueries,
|
|
QnameHandler,
|
|
QnameQtypeHandler,
|
|
QueryContext,
|
|
ResponseHandler,
|
|
StaticResponseHandler,
|
|
)
|
|
|
|
from ..resolver_ans import (
|
|
DelegationHandler,
|
|
Gl6412AHandler,
|
|
Gl6412Handler,
|
|
Gl6412Ns2Handler,
|
|
Gl6412Ns3Handler,
|
|
rrset,
|
|
setup_delegation,
|
|
soa_rrset,
|
|
)
|
|
|
|
|
|
class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
|
|
qnames = [
|
|
"baddname.example.org.",
|
|
"gooddname.example.org.",
|
|
]
|
|
qtypes = [dns.rdatatype.NS]
|
|
answer = [rrset("example.org.", dns.rdatatype.NS, "a.root-servers.nil.")]
|
|
authoritative = True
|
|
|
|
|
|
def _cname_rrsets(
|
|
qname: dns.name.Name | str,
|
|
) -> tuple[dns.rrset.RRset, dns.rrset.RRset]:
|
|
return (
|
|
rrset(qname, dns.rdatatype.CNAME, f"{qname}"),
|
|
rrset(qname, dns.rdatatype.A, "1.2.3.4"),
|
|
)
|
|
|
|
|
|
class Cname1Handler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["cname1.example.com."]
|
|
# Data for the "cname + other data / 1" test
|
|
answer = _cname_rrsets(qnames[0])
|
|
authoritative = False
|
|
|
|
|
|
class Cname2Handler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["cname2.example.com."]
|
|
# Data for the "cname + other data / 2" test: same RRs in opposite order
|
|
answer = tuple(reversed(_cname_rrsets(qnames[0])))
|
|
authoritative = False
|
|
|
|
|
|
class ExampleOrgHandler(QnameHandler):
|
|
qnames = [
|
|
"www.example.org",
|
|
"badcname.example.org",
|
|
"goodcname.example.org",
|
|
"foo.baddname.example.org",
|
|
"foo.gooddname.example.org",
|
|
]
|
|
|
|
async def get_responses(
|
|
self, qctx: QueryContext
|
|
) -> AsyncGenerator[DnsResponseSend, None]:
|
|
# Data for address/alias filtering.
|
|
if qctx.qtype == dns.rdatatype.A:
|
|
a_rrset = rrset(qctx.qname, dns.rdatatype.A, "192.0.2.1")
|
|
qctx.response.answer.append(a_rrset)
|
|
elif qctx.qtype == dns.rdatatype.AAAA:
|
|
aaaa_rrset = rrset(qctx.qname, dns.rdatatype.AAAA, "2001:db8:beef::1")
|
|
qctx.response.answer.append(aaaa_rrset)
|
|
yield DnsResponseSend(qctx.response, authoritative=True)
|
|
|
|
|
|
class NoResponseExampleUdpHandler(QnameHandler, IgnoreAllQueries):
|
|
qnames = ["noresponse.exampleudp.net."]
|
|
|
|
|
|
class RootNsHandler(QnameQtypeHandler):
|
|
qnames = [
|
|
"example.com.",
|
|
"com.",
|
|
"example.org.",
|
|
"org.",
|
|
"net.",
|
|
]
|
|
|
|
qtypes = [dns.rdatatype.NS]
|
|
|
|
async def get_responses(
|
|
self, qctx: QueryContext
|
|
) -> AsyncGenerator[DnsResponseSend, None]:
|
|
root_ns = rrset(qctx.qname, dns.rdatatype.NS, "a.root-servers.nil.")
|
|
qctx.response.answer.append(root_ns)
|
|
yield DnsResponseSend(qctx.response, authoritative=True)
|
|
|
|
|
|
class ZoneVersionHandler(QnameHandler):
|
|
qnames = ["zoneversion."]
|
|
|
|
async def get_responses(
|
|
self, qctx: QueryContext
|
|
) -> AsyncGenerator[DnsResponseSend, None]:
|
|
qctx.response.authority.append(soa_rrset("."))
|
|
zoneversion_opt = dns.edns.GenericOption(19, bytes.fromhex("000101022304")) # type: ignore
|
|
qctx.response.use_edns(edns=0, options=[zoneversion_opt])
|
|
yield DnsResponseSend(qctx.response, authoritative=False)
|
|
|
|
|
|
class Ns2Delegation(DelegationHandler):
|
|
domains = ["exampleudp.net."]
|
|
server_number = 2
|
|
|
|
|
|
class Ns3Delegation(DelegationHandler):
|
|
domains = [
|
|
"example.net.",
|
|
"lame.example.org.",
|
|
"sub.example.org.",
|
|
]
|
|
server_number = 3
|
|
|
|
|
|
class Ns3GlueInAnswerDelegation(DelegationHandler):
|
|
domains = ["glue-in-answer.example.org."]
|
|
server_number = 3
|
|
|
|
async def get_responses(
|
|
self, qctx: QueryContext
|
|
) -> AsyncGenerator[DnsResponseSend, None]:
|
|
async for dns_response in super().get_responses(qctx):
|
|
dns_response.response.answer += dns_response.response.additional
|
|
yield dns_response
|
|
|
|
|
|
class Ns4Delegation(DelegationHandler):
|
|
domains = ["broken."]
|
|
server_number = 4
|
|
|
|
|
|
class Ns6Delegation(DelegationHandler):
|
|
domains = [
|
|
"redirect.com.",
|
|
"tld1.",
|
|
]
|
|
server_number = 6
|
|
|
|
|
|
class Ns7Delegation(DelegationHandler):
|
|
domains = ["tld2."]
|
|
server_number = 7
|
|
|
|
|
|
class PartialFormerrHandler(DomainHandler, StaticResponseHandler):
|
|
domains = ["partial-formerr."]
|
|
authoritative = False
|
|
rcode = dns.rcode.FORMERR
|
|
|
|
|
|
class FallbackHandler(ResponseHandler):
|
|
async def get_responses(
|
|
self, qctx: QueryContext
|
|
) -> AsyncGenerator[DnsResponseSend, None]:
|
|
setup_delegation(qctx, "below.www.example.com.", 3)
|
|
yield DnsResponseSend(qctx.response, authoritative=False)
|
|
|
|
|
|
def main() -> None:
|
|
server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR)
|
|
|
|
# Install QnameHandlers first
|
|
server.install_response_handlers(
|
|
BadGoodDnameNsHandler(),
|
|
Cname1Handler(),
|
|
Cname2Handler(),
|
|
ExampleOrgHandler(),
|
|
Gl6412AHandler(),
|
|
Gl6412Handler(),
|
|
Gl6412Ns2Handler(),
|
|
Gl6412Ns3Handler(),
|
|
NoResponseExampleUdpHandler(),
|
|
RootNsHandler(),
|
|
ZoneVersionHandler(),
|
|
)
|
|
|
|
# Then install DomainHandlers
|
|
server.install_response_handlers(
|
|
Ns2Delegation(),
|
|
Ns3Delegation(),
|
|
Ns3GlueInAnswerDelegation(),
|
|
Ns4Delegation(),
|
|
Ns6Delegation(),
|
|
Ns7Delegation(),
|
|
PartialFormerrHandler(),
|
|
)
|
|
|
|
# Finally, install the fallback handler
|
|
server.install_response_handler(FallbackHandler())
|
|
server.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|