bind9/bin/tests/system/resolver/ans8/ans.py
Štěpán Balážik ced002c4ab Replace deprecated typing imports
More specific modules (like collections.abc) can now be used.

Generated with: ruff check --extend-select UP035 --fix
2026-02-20 15:17:32 +01:00

144 lines
3.9 KiB
Python

"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
SPDX-License-Identifier: MPL-2.0
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.
See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from collections.abc import AsyncGenerator
import abc
import dns.flags
import dns.message
import dns.rcode
import dns.rdatatype
from isctest.asyncserver import (
AsyncDnsServer,
DnsProtocol,
DnsResponseSend,
DomainHandler,
QnameHandler,
QnameQtypeHandler,
QueryContext,
ResponseHandler,
StaticResponseHandler,
)
from ..resolver_ans import rrset
class HeaderOnlyHandler(ResponseHandler):
"""
Return an empty DNS message with only header flags set.
"""
@property
@abc.abstractmethod
def flags(self) -> dns.flags.Flag:
raise NotImplementedError
@property
def rcode(self) -> dns.rcode.Rcode:
return dns.rcode.NOERROR
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
message = dns.message.Message(id=qctx.query.id)
message.use_edns(False)
message.flags = self.flags
message.set_rcode(self.rcode)
yield DnsResponseSend(message, acknowledge_hand_rolled_response=True)
class RefusedOnTcpHandler(QnameHandler, HeaderOnlyHandler):
qnames = ["tcpalso.no-questions."]
flags = dns.flags.QR
rcode = dns.rcode.REFUSED
def match(self, qctx: QueryContext) -> bool:
return qctx.protocol == DnsProtocol.TCP and super().match(qctx)
class TcpFallbackHandler(ResponseHandler):
def match(self, qctx: QueryContext) -> bool:
return qctx.protocol == DnsProtocol.TCP
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
qctx.response.answer.append(rrset(qctx.qname, dns.rdatatype.A, "1.2.3.4"))
yield DnsResponseSend(qctx.response)
class FormerrToAllHandler(DomainHandler, StaticResponseHandler):
domains = ["formerr-to-all."]
rcode = dns.rcode.FORMERR
class NoQuestionsNSHandler(QnameQtypeHandler, StaticResponseHandler):
qnames = ["no-questions."]
qtypes = [dns.rdatatype.NS]
answer = [rrset(qnames[0], dns.rdatatype.NS, f"ns.{qnames[0]}")]
additional = [rrset(f"ns.{qnames[0]}", dns.rdatatype.A, "10.53.0.8")]
class NsNoQuestionsAHandler(QnameHandler):
qnames = ["ns.no-questions."]
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
if qctx.qtype == dns.rdatatype.A:
a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.8")
qctx.response.answer.append(a_rrset)
yield DnsResponseSend(qctx.response)
class TcpalsoNoQuestionsHandler(QnameHandler, HeaderOnlyHandler):
qnames = ["tcpalso.no-questions."]
flags = dns.flags.QR | dns.flags.TC
rcode = dns.rcode.REFUSED
class TruncatedNoQuestionsHandler(QnameHandler, HeaderOnlyHandler):
qnames = ["truncated.no-questions."]
flags = dns.flags.QR | dns.flags.AA | dns.flags.TC
class FallbackHandler(HeaderOnlyHandler):
flags = dns.flags.QR | dns.flags.AA
def main() -> None:
server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)
# Install TCP handlers first so they take precedence
server.install_response_handlers(
RefusedOnTcpHandler(),
TcpFallbackHandler(),
)
# Install UDP handlers
server.install_response_handlers(
FormerrToAllHandler(),
NoQuestionsNSHandler(),
NsNoQuestionsAHandler(),
TcpalsoNoQuestionsHandler(),
TruncatedNoQuestionsHandler(),
)
server.install_response_handler(FallbackHandler())
server.run()
if __name__ == "__main__":
main()