mirror of
https://github.com/isc-projects/bind9.git
synced 2026-06-11 09:09:59 -04:00
More specific modules (like collections.abc) can now be used. Generated with: ruff check --extend-select UP035 --fix
228 lines
6.5 KiB
Python
228 lines
6.5 KiB
Python
"""
|
|
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
|
|
|
SPDX-License-Identifier: MPL-2.0
|
|
|
|
This Source Code Form is subject to the terms of the Mozilla Public
|
|
License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
See the COPYRIGHT file distributed with this work for additional
|
|
information regarding copyright ownership.
|
|
"""
|
|
|
|
from collections.abc import AsyncGenerator
|
|
|
|
import dns.name
|
|
import dns.rcode
|
|
import dns.rdatatype
|
|
|
|
from isctest.asyncserver import (
|
|
AsyncDnsServer,
|
|
DnsResponseSend,
|
|
DomainHandler,
|
|
IgnoreAllQueries,
|
|
QnameHandler,
|
|
QnameQtypeHandler,
|
|
QueryContext,
|
|
ResponseHandler,
|
|
StaticResponseHandler,
|
|
)
|
|
|
|
from ..resolver_ans import (
|
|
DelegationHandler,
|
|
Gl6412AHandler,
|
|
Gl6412Handler,
|
|
Gl6412Ns2Handler,
|
|
Gl6412Ns3Handler,
|
|
rrset,
|
|
rrset_from_list,
|
|
soa_rrset,
|
|
)
|
|
|
|
|
|
class ApexNSHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["example.net."]
|
|
qtypes = [dns.rdatatype.NS]
|
|
answer = [rrset(qnames[0], dns.rdatatype.NS, f"ns.{qnames[0]}")]
|
|
additional = [rrset(f"ns.{qnames[0]}", dns.rdatatype.A, "10.53.0.3")]
|
|
|
|
|
|
class BadCnameHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["badcname.example.net."]
|
|
answer = [rrset(qnames[0], dns.rdatatype.CNAME, "badcname.example.org.")]
|
|
|
|
|
|
class BadGoodDnameNsHandler(QnameQtypeHandler, StaticResponseHandler):
|
|
qnames = ["baddname.example.net.", "gooddname.example.net."]
|
|
qtypes = [dns.rdatatype.NS]
|
|
authority = [soa_rrset("example.net.")]
|
|
|
|
|
|
class CnameSubHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["cname.sub.example.org."]
|
|
answer = [rrset(qnames[0], dns.rdatatype.CNAME, "ok.sub.example.org.")]
|
|
|
|
|
|
class FooBadDnameHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["foo.baddname.example.net."]
|
|
answer = [
|
|
rrset("baddname.example.net.", dns.rdatatype.DNAME, "baddname.example.org.")
|
|
]
|
|
|
|
|
|
class FooBarSubTld1Handler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["foo.bar.sub.tld1."]
|
|
answer = [rrset(qnames[0], dns.rdatatype.TXT, "baz")]
|
|
|
|
|
|
class FooGlueInAnswerHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["foo.glue-in-answer.example.org."]
|
|
answer = [rrset(qnames[0], dns.rdatatype.A, "192.0.2.1")]
|
|
|
|
|
|
class FooGoodDnameHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["foo.gooddname.example.net."]
|
|
answer = [
|
|
rrset("gooddname.example.net.", dns.rdatatype.DNAME, "gooddname.example.org.")
|
|
]
|
|
|
|
|
|
class GoodCnameHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["goodcname.example.net."]
|
|
answer = [rrset(qnames[0], dns.rdatatype.CNAME, "goodcname.example.org.")]
|
|
|
|
|
|
class LameExampleOrgDelegation(DelegationHandler):
|
|
domains = ["lame.example.org."]
|
|
server_number = 3
|
|
|
|
|
|
class LargeReferralHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["large-referral.example.net."]
|
|
qtypes = [dns.rdatatype.NS]
|
|
authority = [
|
|
rrset_from_list(
|
|
qnames[0],
|
|
dns.rdatatype.NS,
|
|
[f"ns{i}.fake.redirect.com." for i in range(1, 1000)],
|
|
)
|
|
]
|
|
|
|
|
|
class LongCnameHandler(ResponseHandler):
|
|
def match(self, qctx: QueryContext) -> bool:
|
|
return qctx.qname.labels[0].startswith(b"longcname")
|
|
|
|
async def get_responses(
|
|
self, qctx: QueryContext
|
|
) -> AsyncGenerator[DnsResponseSend, None]:
|
|
first_label = qctx.qname.labels[0].replace(b"longcname", b"longcnamex")
|
|
cname_target = f"{dns.name.Name((first_label,) + qctx.qname.labels[1:])}"
|
|
qctx.response.answer.append(
|
|
rrset(qctx.qname, dns.rdatatype.CNAME, cname_target)
|
|
)
|
|
yield DnsResponseSend(qctx.response)
|
|
|
|
|
|
class NodataHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["nodata.example.net."]
|
|
|
|
|
|
class NoresponseHandler(QnameHandler, IgnoreAllQueries):
|
|
qnames = ["noresponse.example.net."]
|
|
|
|
|
|
class NsHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["ns.example.net."]
|
|
answer = [rrset(qnames[0], dns.rdatatype.A, "10.53.0.3")]
|
|
|
|
|
|
class NxdomainHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["nxdomain.example.net."]
|
|
rcode = dns.rcode.NXDOMAIN
|
|
|
|
|
|
class OkSubHandler(QnameHandler):
|
|
qnames = ["ok.sub.example.org.", "www.ok.sub.example.org."]
|
|
|
|
async def get_responses(
|
|
self, qctx: QueryContext
|
|
) -> AsyncGenerator[DnsResponseSend, None]:
|
|
qctx.response.answer.append(rrset(qctx.qname, dns.rdatatype.A, "192.0.2.1"))
|
|
yield DnsResponseSend(qctx.response)
|
|
|
|
|
|
class PartialFormerrHandler(DomainHandler):
|
|
domains = ["partial-formerr."]
|
|
|
|
async def get_responses(
|
|
self, qctx: QueryContext
|
|
) -> AsyncGenerator[DnsResponseSend, None]:
|
|
qctx.response.answer.append(
|
|
rrset(qctx.qname, dns.rdatatype.A, "10.53.0.3", ttl=1)
|
|
)
|
|
yield DnsResponseSend(qctx.response)
|
|
|
|
|
|
class WwwDnameSubHandler(QnameHandler, StaticResponseHandler):
|
|
qnames = ["www.dname.sub.example.org."]
|
|
answer = [
|
|
rrset("dname.sub.example.org.", dns.rdatatype.DNAME, "ok.sub.example.org.")
|
|
]
|
|
|
|
|
|
class WwwHandler(QnameHandler):
|
|
qnames = ["www.example.net."]
|
|
|
|
async def get_responses(
|
|
self, qctx: QueryContext
|
|
) -> AsyncGenerator[DnsResponseSend, None]:
|
|
if qctx.qtype == dns.rdatatype.A:
|
|
qctx.response.answer.append(rrset(qctx.qname, dns.rdatatype.A, "192.0.2.1"))
|
|
elif qctx.qtype == dns.rdatatype.AAAA:
|
|
qctx.response.answer.append(
|
|
rrset(qctx.qname, dns.rdatatype.AAAA, "2001:db8:beef::1")
|
|
)
|
|
yield DnsResponseSend(qctx.response)
|
|
|
|
|
|
class FallbackHandler(StaticResponseHandler):
|
|
answer = [rrset("www.example.com.", dns.rdatatype.A, "1.2.3.4")]
|
|
|
|
|
|
def main() -> None:
|
|
server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)
|
|
server.install_response_handlers(
|
|
ApexNSHandler(),
|
|
BadCnameHandler(),
|
|
BadGoodDnameNsHandler(),
|
|
CnameSubHandler(),
|
|
FooBadDnameHandler(),
|
|
FooBarSubTld1Handler(),
|
|
FooGoodDnameHandler(),
|
|
FooGlueInAnswerHandler(),
|
|
Gl6412AHandler(),
|
|
Gl6412Handler(),
|
|
Gl6412Ns2Handler(),
|
|
Gl6412Ns3Handler(),
|
|
GoodCnameHandler(),
|
|
LameExampleOrgDelegation(),
|
|
LargeReferralHandler(),
|
|
LongCnameHandler(),
|
|
NodataHandler(),
|
|
NoresponseHandler(),
|
|
NsHandler(),
|
|
NxdomainHandler(),
|
|
OkSubHandler(),
|
|
PartialFormerrHandler(),
|
|
WwwDnameSubHandler(),
|
|
WwwHandler(),
|
|
)
|
|
|
|
server.install_response_handler(FallbackHandler())
|
|
server.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|