Use new AsyncDnsServer features in cookie system test

Take advantage of `default_aa`, `default_rcode` and `keyring` arguments.
This commit is contained in:
Štěpán Balážik 2025-11-12 16:35:40 +01:00
parent 5384998ccd
commit f5f84a649b

View file

@ -12,8 +12,8 @@
from typing import AsyncGenerator
import dns.edns
import dns.message
import dns.name
import dns.rcode
import dns.rdatatype
import dns.rrset
import dns.tsigkeyring
@ -37,16 +37,6 @@ KEYRING = dns.tsigkeyring.from_text(
)
def _reparse_with_keyring(qctx: QueryContext) -> None:
"""
`isctest.asyncserver` doesn't support TSIG signing and validation properly
and hacks around it. However, here we need to be able to sign responses with
TSIG, so we reparse the query and recreate the response stub here.
"""
qctx.query = dns.message.from_wire(qctx.query.to_wire(), keyring=KEYRING)
qctx.response = dns.message.make_response(qctx.query)
def _first_label(qctx: QueryContext) -> str:
return qctx.qname.labels[0].decode("ascii")
@ -112,14 +102,13 @@ class NsHandler(_SpoofableHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
_reparse_with_keyring(qctx)
_add_cookie(qctx)
qctx.response.answer.append(_ns(qctx))
if self.evil_server:
qctx.response.authority.append(_spoofed_a(qctx))
else:
qctx.response.authority.append(_legit_a(qctx))
yield DnsResponseSend(qctx.response, authoritative=True)
yield DnsResponseSend(qctx.response)
class GlueHandler(_SpoofableHandler):
@ -129,13 +118,12 @@ class GlueHandler(_SpoofableHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
_reparse_with_keyring(qctx)
_add_cookie(qctx)
if self.evil_server:
qctx.response.answer.append(_spoofed_a(qctx))
else:
qctx.response.answer.append(_legit_a(qctx))
yield DnsResponseSend(qctx.response, authoritative=True)
yield DnsResponseSend(qctx.response)
class TcpAHandler(ResponseHandler):
@ -145,11 +133,10 @@ class TcpAHandler(ResponseHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
_reparse_with_keyring(qctx)
if _first_label(qctx) != "nocookie":
_add_cookie(qctx)
qctx.response.answer.append(_legit_a(qctx))
yield DnsResponseSend(qctx.response, authoritative=True)
yield DnsResponseSend(qctx.response)
class WithtsigUdpAHandler(ResponseHandler):
@ -163,16 +150,15 @@ class WithtsigUdpAHandler(ResponseHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
_reparse_with_keyring(qctx)
qctx.response.answer.append(_legit_a(qctx))
qctx.response.answer.append(_spoofed_a(qctx))
qctx.response.use_tsig(keyring=KEYRING, keyname="fake")
yield DnsResponseSend(qctx.response, authoritative=True)
yield DnsResponseSend(qctx.response)
_reparse_with_keyring(qctx)
qctx.prepare_new_response()
_add_cookie(qctx)
qctx.response.answer.append(_legit_a(qctx))
yield DnsResponseSend(qctx.response, authoritative=True)
yield DnsResponseSend(qctx.response)
class UdpAHandler(ResponseHandler):
@ -182,31 +168,31 @@ class UdpAHandler(ResponseHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
_reparse_with_keyring(qctx)
qctx.response.answer.append(_legit_a(qctx))
if _first_label(qctx) not in ("nocookie", "tcponly"):
_add_cookie(qctx)
else:
qctx.response.answer.append(_spoofed_a(qctx))
yield DnsResponseSend(qctx.response, authoritative=True)
yield DnsResponseSend(qctx.response)
class FallbackHandler(ResponseHandler):
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[DnsResponseSend, None]:
_reparse_with_keyring(qctx)
_add_cookie(qctx)
if qctx.qtype == dns.rdatatype.SOA:
qctx.response.answer.append(_soa(qctx))
else:
qctx.response.authority.append(_soa(qctx))
yield DnsResponseSend(qctx.response, authoritative=True)
yield DnsResponseSend(qctx.response)
def cookie_server(evil: bool) -> AsyncDnsServer:
server = AsyncDnsServer(keyring=None)
server = AsyncDnsServer(
keyring=KEYRING, default_aa=True, default_rcode=dns.rcode.NOERROR
)
server.install_response_handlers(
[
NsHandler(evil),