[9.20] chg: test: Clean up custom server code in the "resend_loop" system test

Apply assorted cleanups to `bin/tests/system/resend_loop/ans3/ans.py`.

Backport of MR !12063

Merge branch 'backport-michal/resend_loop-test-ans3-cleanup-9.20' into 'bind-9.20'

See merge request isc-projects/bind9!12071
This commit is contained in:
Michał Kępień 2026-05-21 12:42:36 +02:00
commit ebe67bf017
2 changed files with 45 additions and 91 deletions

View file

@ -44,12 +44,8 @@ def _add_cookie(qctx: QueryContext) -> None:
for o in qctx.query.options:
if o.otype == dns.edns.OptionType.COOKIE:
cookie = o
try:
if len(cookie.server) == 0:
cookie.server = cookie.client
except AttributeError: # dnspython<2.7.0 compat
if len(o.data) == 8:
cookie.data *= 2
if len(cookie.server) == 0:
cookie.server = cookie.client
qctx.response.use_edns(options=[cookie])
return

View file

@ -14,112 +14,70 @@ from collections.abc import AsyncGenerator
import dns.edns
import dns.name
import dns.rcode
import dns.rdataclass
import dns.rdatatype
import dns.rrset
from isctest.asyncserver import (
AsyncDnsServer,
DnsResponseSend,
DomainHandler,
QnameQtypeHandler,
QueryContext,
ResponseHandler,
StaticResponseHandler,
)
def _get_cookie(qctx: QueryContext):
for o in qctx.query.options:
if o.otype == dns.edns.OptionType.COOKIE:
cookie = o
try:
if len(cookie.server) == 0:
cookie.server = b"\x11\x22\x33\x44\x55\x66\x77\x88"
except AttributeError: # dnspython<2.7.0 compat
if len(o.data) == 8:
cookie.data *= 2
return cookie
return None
def rrset(
qname: dns.name.Name | str,
rtype: dns.rdatatype.RdataType,
rdata: str,
ttl: int = 300,
) -> dns.rrset.RRset:
return dns.rrset.from_text(qname, ttl, dns.rdataclass.IN, rtype, rdata)
class PrimeHandler(ResponseHandler):
"""
Specifically handle priming query for "." NS (type 2)
"""
class RootNsHandler(QnameQtypeHandler, StaticResponseHandler):
qnames = ["."]
qtypes = [dns.rdatatype.NS]
answer = [rrset(".", dns.rdatatype.NS, "a.root-servers.nil.")]
additional = [rrset("a.root-servers.nil.", dns.rdatatype.A, "10.53.0.3")]
def match(self, qctx: QueryContext) -> bool:
return len(qctx.qname.labels) == 0 and qctx.qtype == dns.rdatatype.NS
class ExampleCookieHandler(DomainHandler):
domains = ["example."]
def _get_cookie(self, qctx: QueryContext) -> dns.edns.CookieOption | None:
for o in qctx.query.options:
if o.otype == dns.edns.OptionType.COOKIE:
cookie = o
cookie.server = b"\x11\x22\x33\x44\x55\x66\x77\x88"
return cookie
return None
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
ns_rrset = dns.rrset.from_text(
".", dns.rdatatype.NS, qctx.qclass, "a.root-servers.nil."
)
a_rrset = dns.rrset.from_text(
"a.root-servers.nil.", dns.rdatatype.A, qctx.qclass, "10.53.0.3"
)
response = qctx.prepare_new_response(with_zone_data=False)
response.set_rcode(dns.rcode.NOERROR)
response.answer.append(ns_rrset)
response.additional.append(a_rrset)
yield DnsResponseSend(response, authoritative=True)
class CookieHandler(ResponseHandler):
def match(self, qctx: QueryContext) -> bool:
example = dns.name.from_text("example")
return qctx.qname.is_subdomain(example)
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
qctx.prepare_new_response()
# Check for client cookie
cookie = _get_cookie(qctx)
# If missing cookie entirely, just return SERVFAIL
if cookie is None:
if cookie := self._get_cookie(qctx):
# If there is a client cookie, mock BADCOOKIE to trigger
# the resend loop logic.
qctx.response.use_edns(options=[cookie])
qctx.response.set_rcode(dns.rcode.BADCOOKIE)
yield DnsResponseSend(qctx.response)
else:
# If missing cookie entirely, just return SERVFAIL
qctx.response.set_rcode(dns.rcode.SERVFAIL)
yield DnsResponseSend(qctx.response, authoritative=True)
# If there is a client cookie, mock BADCOOKIE to trigger
# the resend loop logic.
qctx.response.use_edns(options=[cookie])
qctx.response.set_rcode(dns.rcode.BADCOOKIE)
yield DnsResponseSend(qctx.response, authoritative=True)
class NoErrorHandler(ResponseHandler):
"""
If the query is NOT a subdomain of example, respond with standard NOERROR empty answer
"""
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
qctx.prepare_new_response()
qctx.response.set_rcode(dns.rcode.NOERROR)
yield DnsResponseSend(qctx.response, authoritative=True)
def resend_server() -> AsyncDnsServer:
server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)
server.install_response_handlers(
PrimeHandler(),
CookieHandler(),
NoErrorHandler(),
)
return server
yield DnsResponseSend(qctx.response)
def main() -> None:
resend_server().run()
server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)
server.install_response_handlers(
RootNsHandler(),
ExampleCookieHandler(),
)
server.run()
if __name__ == "__main__":