mirror of
https://github.com/isc-projects/bind9.git
synced 2026-06-11 09:09:59 -04:00
chg: test: Clean up custom server code in the "resend_loop" system test
Apply assorted cleanups to `bin/tests/system/resend_loop/ans3/ans.py`. Merge branch 'michal/resend_loop-test-ans3-cleanup' into 'main' See merge request isc-projects/bind9!12063
This commit is contained in:
commit
be827c68a6
2 changed files with 39 additions and 83 deletions
|
|
@ -44,12 +44,8 @@ def _add_cookie(qctx: QueryContext) -> None:
|
|||
for o in qctx.query.options:
|
||||
if o.otype == dns.edns.OptionType.COOKIE:
|
||||
cookie = o
|
||||
try:
|
||||
if len(cookie.server) == 0:
|
||||
cookie.server = cookie.client
|
||||
except AttributeError: # dnspython<2.7.0 compat
|
||||
if len(o.data) == 8:
|
||||
cookie.data *= 2
|
||||
if len(cookie.server) == 0:
|
||||
cookie.server = cookie.client
|
||||
|
||||
qctx.response.use_edns(options=[cookie])
|
||||
return
|
||||
|
|
|
|||
|
|
@ -21,29 +21,13 @@ import dns.rrset
|
|||
from isctest.asyncserver import (
|
||||
AsyncDnsServer,
|
||||
DnsResponseSend,
|
||||
QnameHandler,
|
||||
DomainHandler,
|
||||
QnameQtypeHandler,
|
||||
QueryContext,
|
||||
ResponseHandler,
|
||||
StaticResponseHandler,
|
||||
)
|
||||
|
||||
|
||||
def _get_cookie(qctx: QueryContext):
|
||||
for o in qctx.query.options:
|
||||
if o.otype == dns.edns.OptionType.COOKIE:
|
||||
cookie = o
|
||||
try:
|
||||
if len(cookie.server) == 0:
|
||||
cookie.server = b"\x11\x22\x33\x44\x55\x66\x77\x88"
|
||||
except AttributeError: # dnspython<2.7.0 compat
|
||||
if len(o.data) == 8:
|
||||
cookie.data *= 2
|
||||
|
||||
return cookie
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def rrset(
|
||||
qname: dns.name.Name | str,
|
||||
rtype: dns.rdatatype.RdataType,
|
||||
|
|
@ -53,79 +37,55 @@ def rrset(
|
|||
return dns.rrset.from_text(qname, ttl, dns.rdataclass.IN, rtype, rdata)
|
||||
|
||||
|
||||
class RootNSHandler(QnameHandler, StaticResponseHandler):
|
||||
class RootNsHandler(QnameQtypeHandler, StaticResponseHandler):
|
||||
qnames = ["."]
|
||||
answer = [
|
||||
rrset(".", dns.rdatatype.NS, "a.root-servers.nil."),
|
||||
]
|
||||
additional = [
|
||||
rrset("a.root-servers.nil.", dns.rdatatype.A, "10.53.0.3"),
|
||||
]
|
||||
qtypes = [dns.rdatatype.NS]
|
||||
answer = [rrset(".", dns.rdatatype.NS, "a.root-servers.nil.")]
|
||||
additional = [rrset("a.root-servers.nil.", dns.rdatatype.A, "10.53.0.3")]
|
||||
|
||||
|
||||
class ExampleNSHandler(QnameHandler, StaticResponseHandler):
|
||||
class ExampleNsHandler(QnameQtypeHandler, StaticResponseHandler):
|
||||
qnames = ["example."]
|
||||
answer = [
|
||||
rrset("example.", dns.rdatatype.NS, "ns.example."),
|
||||
]
|
||||
additional = [
|
||||
rrset("ns.example.", dns.rdatatype.A, "10.53.0.3"),
|
||||
]
|
||||
qtypes = [dns.rdatatype.NS]
|
||||
answer = [rrset("example.", dns.rdatatype.NS, "ns.example.")]
|
||||
additional = [rrset("ns.example.", dns.rdatatype.A, "10.53.0.3")]
|
||||
|
||||
|
||||
class CookieHandler(ResponseHandler):
|
||||
def match(self, qctx: QueryContext) -> bool:
|
||||
example = dns.name.from_text("example")
|
||||
return qctx.qname.is_subdomain(example)
|
||||
class ExampleCookieHandler(DomainHandler):
|
||||
domains = ["example."]
|
||||
|
||||
def _get_cookie(self, qctx: QueryContext) -> dns.edns.CookieOption | None:
|
||||
for o in qctx.query.options:
|
||||
if o.otype == dns.edns.OptionType.COOKIE:
|
||||
cookie = o
|
||||
cookie.server = b"\x11\x22\x33\x44\x55\x66\x77\x88"
|
||||
return cookie
|
||||
|
||||
return None
|
||||
|
||||
async def get_responses(
|
||||
self, qctx: QueryContext
|
||||
) -> AsyncGenerator[DnsResponseSend, None]:
|
||||
|
||||
qctx.prepare_new_response()
|
||||
|
||||
# Check for client cookie
|
||||
cookie = _get_cookie(qctx)
|
||||
|
||||
# If missing cookie entirely, just return SERVFAIL
|
||||
if cookie is None:
|
||||
if cookie := self._get_cookie(qctx):
|
||||
# If there is a client cookie, mock BADCOOKIE to trigger
|
||||
# the resend loop logic.
|
||||
qctx.response.use_edns(options=[cookie])
|
||||
qctx.response.set_rcode(dns.rcode.BADCOOKIE)
|
||||
yield DnsResponseSend(qctx.response)
|
||||
else:
|
||||
# If missing cookie entirely, just return SERVFAIL
|
||||
qctx.response.set_rcode(dns.rcode.SERVFAIL)
|
||||
yield DnsResponseSend(qctx.response, authoritative=True)
|
||||
|
||||
# If there is a client cookie, mock BADCOOKIE to trigger
|
||||
# the resend loop logic.
|
||||
qctx.response.use_edns(options=[cookie])
|
||||
qctx.response.set_rcode(dns.rcode.BADCOOKIE)
|
||||
yield DnsResponseSend(qctx.response, authoritative=True)
|
||||
|
||||
|
||||
class NoErrorHandler(ResponseHandler):
|
||||
"""
|
||||
If the query is NOT a subdomain of example, respond with standard NOERROR empty answer
|
||||
"""
|
||||
|
||||
async def get_responses(
|
||||
self, qctx: QueryContext
|
||||
) -> AsyncGenerator[DnsResponseSend, None]:
|
||||
|
||||
qctx.prepare_new_response()
|
||||
qctx.response.set_rcode(dns.rcode.NOERROR)
|
||||
yield DnsResponseSend(qctx.response, authoritative=True)
|
||||
|
||||
|
||||
def resend_server() -> AsyncDnsServer:
|
||||
server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)
|
||||
server.install_response_handlers(
|
||||
RootNSHandler(),
|
||||
ExampleNSHandler(),
|
||||
CookieHandler(),
|
||||
NoErrorHandler(),
|
||||
)
|
||||
return server
|
||||
yield DnsResponseSend(qctx.response)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
resend_server().run()
|
||||
server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR)
|
||||
server.install_response_handlers(
|
||||
RootNsHandler(),
|
||||
ExampleNsHandler(),
|
||||
ExampleCookieHandler(),
|
||||
)
|
||||
server.run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
Loading…
Reference in a new issue