diff --git a/bin/tests/system/cookie/cookie_ans.py b/bin/tests/system/cookie/cookie_ans.py index 3b6f0406f2..59f3d1fdf5 100644 --- a/bin/tests/system/cookie/cookie_ans.py +++ b/bin/tests/system/cookie/cookie_ans.py @@ -44,12 +44,8 @@ def _add_cookie(qctx: QueryContext) -> None: for o in qctx.query.options: if o.otype == dns.edns.OptionType.COOKIE: cookie = o - try: - if len(cookie.server) == 0: - cookie.server = cookie.client - except AttributeError: # dnspython<2.7.0 compat - if len(o.data) == 8: - cookie.data *= 2 + if len(cookie.server) == 0: + cookie.server = cookie.client qctx.response.use_edns(options=[cookie]) return diff --git a/bin/tests/system/resend_loop/ans3/ans.py b/bin/tests/system/resend_loop/ans3/ans.py index d0cb6d2935..17ff396f11 100644 --- a/bin/tests/system/resend_loop/ans3/ans.py +++ b/bin/tests/system/resend_loop/ans3/ans.py @@ -21,29 +21,13 @@ import dns.rrset from isctest.asyncserver import ( AsyncDnsServer, DnsResponseSend, - QnameHandler, + DomainHandler, + QnameQtypeHandler, QueryContext, - ResponseHandler, StaticResponseHandler, ) -def _get_cookie(qctx: QueryContext): - for o in qctx.query.options: - if o.otype == dns.edns.OptionType.COOKIE: - cookie = o - try: - if len(cookie.server) == 0: - cookie.server = b"\x11\x22\x33\x44\x55\x66\x77\x88" - except AttributeError: # dnspython<2.7.0 compat - if len(o.data) == 8: - cookie.data *= 2 - - return cookie - - return None - - def rrset( qname: dns.name.Name | str, rtype: dns.rdatatype.RdataType, @@ -53,79 +37,55 @@ def rrset( return dns.rrset.from_text(qname, ttl, dns.rdataclass.IN, rtype, rdata) -class RootNSHandler(QnameHandler, StaticResponseHandler): +class RootNsHandler(QnameQtypeHandler, StaticResponseHandler): qnames = ["."] - answer = [ - rrset(".", dns.rdatatype.NS, "a.root-servers.nil."), - ] - additional = [ - rrset("a.root-servers.nil.", dns.rdatatype.A, "10.53.0.3"), - ] + qtypes = [dns.rdatatype.NS] + answer = [rrset(".", dns.rdatatype.NS, "a.root-servers.nil.")] + additional = [rrset("a.root-servers.nil.", dns.rdatatype.A, "10.53.0.3")] -class ExampleNSHandler(QnameHandler, StaticResponseHandler): +class ExampleNsHandler(QnameQtypeHandler, StaticResponseHandler): qnames = ["example."] - answer = [ - rrset("example.", dns.rdatatype.NS, "ns.example."), - ] - additional = [ - rrset("ns.example.", dns.rdatatype.A, "10.53.0.3"), - ] + qtypes = [dns.rdatatype.NS] + answer = [rrset("example.", dns.rdatatype.NS, "ns.example.")] + additional = [rrset("ns.example.", dns.rdatatype.A, "10.53.0.3")] -class CookieHandler(ResponseHandler): - def match(self, qctx: QueryContext) -> bool: - example = dns.name.from_text("example") - return qctx.qname.is_subdomain(example) +class ExampleCookieHandler(DomainHandler): + domains = ["example."] + + def _get_cookie(self, qctx: QueryContext) -> dns.edns.CookieOption | None: + for o in qctx.query.options: + if o.otype == dns.edns.OptionType.COOKIE: + cookie = o + cookie.server = b"\x11\x22\x33\x44\x55\x66\x77\x88" + return cookie + + return None async def get_responses( self, qctx: QueryContext ) -> AsyncGenerator[DnsResponseSend, None]: - - qctx.prepare_new_response() - - # Check for client cookie - cookie = _get_cookie(qctx) - - # If missing cookie entirely, just return SERVFAIL - if cookie is None: + if cookie := self._get_cookie(qctx): + # If there is a client cookie, mock BADCOOKIE to trigger + # the resend loop logic. + qctx.response.use_edns(options=[cookie]) + qctx.response.set_rcode(dns.rcode.BADCOOKIE) + yield DnsResponseSend(qctx.response) + else: + # If missing cookie entirely, just return SERVFAIL qctx.response.set_rcode(dns.rcode.SERVFAIL) - yield DnsResponseSend(qctx.response, authoritative=True) - - # If there is a client cookie, mock BADCOOKIE to trigger - # the resend loop logic. - qctx.response.use_edns(options=[cookie]) - qctx.response.set_rcode(dns.rcode.BADCOOKIE) - yield DnsResponseSend(qctx.response, authoritative=True) - - -class NoErrorHandler(ResponseHandler): - """ - If the query is NOT a subdomain of example, respond with standard NOERROR empty answer - """ - - async def get_responses( - self, qctx: QueryContext - ) -> AsyncGenerator[DnsResponseSend, None]: - - qctx.prepare_new_response() - qctx.response.set_rcode(dns.rcode.NOERROR) - yield DnsResponseSend(qctx.response, authoritative=True) - - -def resend_server() -> AsyncDnsServer: - server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR) - server.install_response_handlers( - RootNSHandler(), - ExampleNSHandler(), - CookieHandler(), - NoErrorHandler(), - ) - return server + yield DnsResponseSend(qctx.response) def main() -> None: - resend_server().run() + server = AsyncDnsServer(default_aa=True, default_rcode=dns.rcode.NOERROR) + server.install_response_handlers( + RootNsHandler(), + ExampleNsHandler(), + ExampleCookieHandler(), + ) + server.run() if __name__ == "__main__":