Add common parts of resolver test custom servers

These will be shared by all the ans*/ans.py files.

(cherry picked from commit 23d9055617)
This commit is contained in:
Štěpán Balážik 2025-12-26 01:06:28 +01:00
parent e466fedcc4
commit c11a7877b9

View file

@ -0,0 +1,146 @@
"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
SPDX-License-Identifier: MPL-2.0
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.
See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator, List, NamedTuple, Union
import abc
import dns.name
import dns.rdataclass
import dns.rdatatype
import dns.rrset
from isctest.asyncserver import (
DnsResponseSend,
DomainHandler,
QnameHandler,
QueryContext,
)
def rrset(
qname: Union[dns.name.Name, str],
rtype: dns.rdatatype.RdataType,
rdata: str,
ttl: int = 300,
) -> dns.rrset.RRset:
return dns.rrset.from_text(qname, ttl, dns.rdataclass.IN, rtype, rdata)
def rrset_from_list(
qname: Union[dns.name.Name, str],
rtype: dns.rdatatype.RdataType,
rdata_list: List[str],
ttl: int = 300,
) -> dns.rrset.RRset:
return dns.rrset.from_text_list(qname, ttl, dns.rdataclass.IN, rtype, rdata_list)
def soa_rrset(qname: Union[dns.name.Name, str]) -> dns.rrset.RRset:
return rrset(qname, dns.rdatatype.SOA, ". . 0 0 0 0 0")
class DelegationRRsets(NamedTuple):
ns_rrset: dns.rrset.RRset
a_rrset: dns.rrset.RRset
def delegation_rrsets(
owner: Union[dns.name.Name, str],
server_number: int,
ns_name: Union[dns.name.Name, str, None] = None,
) -> DelegationRRsets:
if ns_name is None:
ns_name = f"ns.{owner}"
ns_rrset = rrset(owner, dns.rdatatype.NS, f"{ns_name}")
a_rrset = rrset(ns_name, dns.rdatatype.A, f"10.53.0.{server_number}")
return DelegationRRsets(ns_rrset, a_rrset)
def setup_delegation(
qctx: QueryContext, owner: Union[dns.name.Name, str], server_number: int
) -> None:
delegation = delegation_rrsets(owner, server_number)
qctx.response.authority.append(delegation.ns_rrset)
qctx.response.additional.append(delegation.a_rrset)
class DelegationHandler(DomainHandler):
@property
@abc.abstractmethod
def server_number(self) -> int:
raise NotImplementedError
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
setup_delegation(qctx, self.matched_domain, self.server_number)
yield DnsResponseSend(qctx.response, authoritative=False)
class Gl6412AHandler(QnameHandler):
qnames = ["a.gl6412.", "a.a.gl6412."]
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
qctx.response.authority.append(soa_rrset(qctx.qname))
yield DnsResponseSend(qctx.response)
class Gl6412Handler(QnameHandler):
qnames = ["gl6412."]
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
if qctx.qtype == dns.rdatatype.SOA:
qctx.response.answer.append(soa_rrset(qctx.qname))
elif qctx.qtype == dns.rdatatype.NS:
# XXX: The delegation is broken here; dot is missing from NS target names.
# I don't know if this is intentional, but for now we are chasing behavior parity.
ns2_rrset = rrset(qctx.qname, dns.rdatatype.NS, f"ns2{qctx.qname}")
ns3_rrset = rrset(qctx.qname, dns.rdatatype.NS, f"ns3{qctx.qname}")
qctx.response.answer.append(ns2_rrset)
qctx.response.answer.append(ns3_rrset)
else:
qctx.response.authority.append(soa_rrset(qctx.qname))
yield DnsResponseSend(qctx.response)
class Gl6412Ns2Handler(QnameHandler):
qnames = ["ns2.gl6412."]
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
if qctx.qtype == dns.rdatatype.A:
a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.2")
qctx.response.answer.append(a_rrset)
else:
qctx.response.authority.append(soa_rrset(qctx.qname))
yield DnsResponseSend(qctx.response)
class Gl6412Ns3Handler(QnameHandler):
qnames = ["ns3.gl6412."]
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
if qctx.qtype == dns.rdatatype.A:
a_rrset = rrset(qctx.qname, dns.rdatatype.A, "10.53.0.3")
qctx.response.answer.append(a_rrset)
else:
qctx.response.authority.append(soa_rrset(qctx.qname))
yield DnsResponseSend(qctx.response)