diff --git a/bin/tests/system/cookie/cookie_ans.py b/bin/tests/system/cookie/cookie_ans.py index 3b6f0406f2..59f3d1fdf5 100644 --- a/bin/tests/system/cookie/cookie_ans.py +++ b/bin/tests/system/cookie/cookie_ans.py @@ -44,12 +44,8 @@ def _add_cookie(qctx: QueryContext) -> None: for o in qctx.query.options: if o.otype == dns.edns.OptionType.COOKIE: cookie = o - try: - if len(cookie.server) == 0: - cookie.server = cookie.client - except AttributeError: # dnspython<2.7.0 compat - if len(o.data) == 8: - cookie.data *= 2 + if len(cookie.server) == 0: + cookie.server = cookie.client qctx.response.use_edns(options=[cookie]) return diff --git a/bin/tests/system/resend_loop/ans3/ans.py b/bin/tests/system/resend_loop/ans3/ans.py index 217bae0301..2423fe1314 100644 --- a/bin/tests/system/resend_loop/ans3/ans.py +++ b/bin/tests/system/resend_loop/ans3/ans.py @@ -14,112 +14,70 @@ from collections.abc import AsyncGenerator import dns.edns import dns.name import dns.rcode +import dns.rdataclass import dns.rdatatype import dns.rrset from isctest.asyncserver import ( AsyncDnsServer, DnsResponseSend, + DomainHandler, + QnameQtypeHandler, QueryContext, - ResponseHandler, + StaticResponseHandler, ) -def _get_cookie(qctx: QueryContext): - for o in qctx.query.options: - if o.otype == dns.edns.OptionType.COOKIE: - cookie = o - try: - if len(cookie.server) == 0: - cookie.server = b"\x11\x22\x33\x44\x55\x66\x77\x88" - except AttributeError: # dnspython<2.7.0 compat - if len(o.data) == 8: - cookie.data *= 2 - - return cookie - - return None +def rrset( + qname: dns.name.Name | str, + rtype: dns.rdatatype.RdataType, + rdata: str, + ttl: int = 300, +) -> dns.rrset.RRset: + return dns.rrset.from_text(qname, ttl, dns.rdataclass.IN, rtype, rdata) -class PrimeHandler(ResponseHandler): - """ - Specifically handle priming query for "." NS (type 2) - """ +class RootNsHandler(QnameQtypeHandler, StaticResponseHandler): + qnames = ["."] + qtypes = [dns.rdatatype.NS] + answer = [rrset(".", dns.rdatatype.NS, "a.root-servers.nil.")] + additional = [rrset("a.root-servers.nil.", dns.rdatatype.A, "10.53.0.3")] - def match(self, qctx: QueryContext) -> bool: - return len(qctx.qname.labels) == 0 and qctx.qtype == dns.rdatatype.NS + +class ExampleCookieHandler(DomainHandler): + domains = ["example."] + + def _get_cookie(self, qctx: QueryContext) -> dns.edns.CookieOption | None: + for o in qctx.query.options: + if o.otype == dns.edns.OptionType.COOKIE: + cookie = o + cookie.server = b"\x11\x22\x33\x44\x55\x66\x77\x88" + return cookie + + return None async def get_responses( self, qctx: QueryContext ) -> AsyncGenerator[DnsResponseSend, None]: - - ns_rrset = dns.rrset.from_text( - ".", dns.rdatatype.NS, qctx.qclass, "a.root-servers.nil." - ) - a_rrset = dns.rrset.from_text( - "a.root-servers.nil.", dns.rdatatype.A, qctx.qclass, "10.53.0.3" - ) - - response = qctx.prepare_new_response(with_zone_data=False) - response.set_rcode(dns.rcode.NOERROR) - response.answer.append(ns_rrset) - response.additional.append(a_rrset) - - yield DnsResponseSend(response, authoritative=True) - - -class CookieHandler(ResponseHandler): - def match(self, qctx: QueryContext) -> bool: - example = dns.name.from_text("example") - return qctx.qname.is_subdomain(example) - - async def get_responses( - self, qctx: QueryContext - ) -> AsyncGenerator[DnsResponseSend, None]: - - qctx.prepare_new_response() - - # Check for client cookie - cookie = _get_cookie(qctx) - - # If missing cookie entirely, just return SERVFAIL - if cookie is None: + if cookie := self._get_cookie(qctx): + # If there is a client cookie, mock BADCOOKIE to trigger + # the resend loop logic. + qctx.response.use_edns(options=[cookie]) + qctx.response.set_rcode(dns.rcode.BADCOOKIE) + yield DnsResponseSend(qctx.response) + else: + # If missing cookie entirely, just return SERVFAIL qctx.response.set_rcode(dns.rcode.SERVFAIL) - yield DnsResponseSend(qctx.response, authoritative=True) - - # If there is a client cookie, mock BADCOOKIE to trigger - # the resend loop logic. - qctx.response.use_edns(options=[cookie]) - qctx.response.set_rcode(dns.rcode.BADCOOKIE) - yield DnsResponseSend(qctx.response, authoritative=True) - - -class NoErrorHandler(ResponseHandler): - """ - If the query is NOT a subdomain of example, respond with standard NOERROR empty answer - """ - - async def get_responses( - self, qctx: QueryContext - ) -> AsyncGenerator[DnsResponseSend, None]: - - qctx.prepare_new_response() - qctx.response.set_rcode(dns.rcode.NOERROR) - yield DnsResponseSend(qctx.response, authoritative=True) - - -def resend_server() -> AsyncDnsServer: - server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR) - server.install_response_handlers( - PrimeHandler(), - CookieHandler(), - NoErrorHandler(), - ) - return server + yield DnsResponseSend(qctx.response) def main() -> None: - resend_server().run() + server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR) + server.install_response_handlers( + RootNsHandler(), + ExampleCookieHandler(), + ) + server.run() if __name__ == "__main__":