Rewrite xfer/ans11/ans.py to use AsyncDnsServer

Replace the hand-rolled threaded socket server with the standard
AsyncDnsServer framework used by other ans.py servers in the test suite.

The DNS wire-format message builders (IXFR diff, AXFR, SOA, SERVFAIL)
are retained unchanged since they produce carefully crafted messages
needed to trigger the IXFR->AXFR race condition. The server
infrastructure is replaced:

- Manual TCP/UDP socket management and threading replaced by
  AsyncDnsServer, which handles both protocols, pidfile lifecycle,
  and signal handling.
- Query parsing replaced by the framework's dns.message-based parser;
  query dispatch moved into IxfrRaceHandler.get_responses().
- The axfr_done_event threading.Event replaced by a boolean instance
  variable on IxfrRaceHandler, safe within the single asyncio event
  loop.
- For IXFR over TCP, the handler yields two BytesResponseSend actions
  (msg1 then msg2) so the framework sends both with TCP length prefixes,
  preserving the race-triggering sequence.
- For IXFR over UDP, the TC flag is set on the response to force TCP
  retry.
- Unused encode_name_compressed() and parse_dns_query() removed.

Also fix a timing issue that might result in the initial transfer not
being done by the time the test is executed -- since ns11 is started
after ns6. Ensure the initial transfer has happened before running the
ixfr_race test.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Nicki Křížek 2026-04-02 12:40:25 +00:00 committed by Aram Sargsyan
parent 141ff7bfa7
commit 187e571f4d
2 changed files with 65 additions and 235 deletions

View file

@ -11,14 +11,25 @@ See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
import os
import signal
import socket
import struct
import sys
import threading
from collections.abc import AsyncGenerator
# DNS constants
import struct
import dns.flags
import dns.rcode
import dns.rdatatype
from isctest.asyncserver import (
AsyncDnsServer,
BytesResponseSend,
DnsProtocol,
DnsResponseSend,
QueryContext,
ResponseAction,
ResponseHandler,
)
# DNS constants used by raw wire builder functions below
DNS_TYPE_SOA = 6
DNS_TYPE_A = 1
DNS_TYPE_NS = 2
@ -30,6 +41,9 @@ DNS_FLAG_AA = 0x0400
DNS_RCODE_NOERROR = 0
DNS_RCODE_SERVFAIL = 2
ZONE_NAME = "ixfr-race."
NUM_RECORDS = 400
def encode_name(name):
"""Encode a DNS name in wire format (no compression)."""
@ -42,11 +56,6 @@ def encode_name(name):
return result
def encode_name_compressed(offset):
"""Encode a DNS name using compression pointer."""
return struct.pack("!H", 0xC000 | offset)
def build_soa_rdata(
mname, rname, serial, refresh=3600, retry=900, expire=604800, minimum=86400
):
@ -76,36 +85,6 @@ def build_dns_header(qid, flags, qdcount, ancount, nscount=0, arcount=0):
return struct.pack("!HHHHHH", qid, flags, qdcount, ancount, nscount, arcount)
def parse_dns_query(data):
"""Parse incoming DNS query, return (qid, qname, qtype, qclass)."""
if len(data) < 12:
return None
qid, _, _ = struct.unpack("!HHH", data[:6])
# Parse question
offset = 12
labels = []
while offset < len(data):
length = data[offset]
offset += 1
if length == 0:
break
if length >= 0xC0:
# Compression pointer
offset += 1
break
labels.append(data[offset : offset + length].decode("ascii"))
offset += length
qname = ".".join(labels) + "."
if offset + 4 > len(data):
return None
qtype, qclass = struct.unpack("!HH", data[offset : offset + 4])
return qid, qname, qtype, qclass
def build_ixfr_message1(qid, zone_name, num_records):
"""
Build IXFR Message 1: A valid IXFR diff that triggers ixfr_commit().
@ -176,11 +155,6 @@ def build_ixfr_message1(qid, zone_name, num_records):
header = build_dns_header(qid, flags, 1, ancount)
msg = header + question + answer
print(
f"[*] Message 1: {len(msg)} bytes, {ancount} RRs "
f"(diff 1: {num_records} DEL + {num_records} ADD)"
)
return msg
@ -209,11 +183,6 @@ def build_bad_rcode_message2(qid, zone_name):
header = build_dns_header(qid, flags, 1, 0)
msg = header + question
print(
f"[*] Message 2 (bad-rcode): {len(msg)} bytes, "
"rcode=SERVFAIL -> triggers try_axfr -> xfrin_reset()"
)
return msg
@ -276,198 +245,54 @@ def build_axfr_response(qid, zone_name, serial, num_records):
header = build_dns_header(qid, flags, 1, ancount)
msg = header + question + answer
print(
f"[*] AXFR response: {len(msg)} bytes, {ancount} RRs "
f"(serial={serial}, {num_records} A records)"
)
return msg
def tcp_send_message(sock, msg):
"""Send a DNS message over TCP with 2-byte length prefix."""
length = struct.pack("!H", len(msg))
sock.sendall(length + msg)
class IxfrRaceHandler(ResponseHandler):
"""
Handle SOA, AXFR, and IXFR queries to trigger the IXFR->AXFR race condition.
Phase 1: Respond to SOA with serial=1 and serve an AXFR to load the zone.
Phase 2: After AXFR, respond to SOA with serial=3 to trigger IXFR.
On IXFR, send a valid large diff (msg1) followed immediately by a
SERVFAIL response (msg2) to race ixfr_commit() against xfrin_reset().
"""
def tcp_recv_message(sock):
"""Receive a DNS message over TCP with 2-byte length prefix."""
length_data = b""
while len(length_data) < 2:
chunk = sock.recv(2 - len(length_data))
if not chunk:
return None
length_data += chunk
length = struct.unpack("!H", length_data)[0]
def __init__(self) -> None:
self._axfr_done = False
data = b""
while len(data) < length:
chunk = sock.recv(length - len(data))
if not chunk:
return None
data += chunk
return data
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[ResponseAction, None]:
qid = qctx.query.id
if qctx.qtype == dns.rdatatype.SOA:
serial = 3 if self._axfr_done else 1
yield BytesResponseSend(build_soa_response(qid, ZONE_NAME, serial))
def handle_client(conn, addr, zone_name, num_records, axfr_done_event):
"""Handle a single TCP connection from a BIND secondary."""
print(f"[+] Connection from {addr}")
try:
while True:
data = tcp_recv_message(conn)
if data is None:
print(f"[-] Connection closed by {addr}")
break
parsed = parse_dns_query(data)
if parsed is None:
print(f"[-] Failed to parse query from {addr}")
break
qid, qname, qtype, qclass = parsed
print(f"[*] Query: {qname} type={qtype} class={qclass} id={qid}")
if qtype == DNS_TYPE_SOA:
# SOA query over TCP (initial or pre-transfer check)
# Respond with serial=1 if initial AXFR not done yet,
# serial=3 to trigger IXFR after initial load
if axfr_done_event.is_set():
serial = 3
else:
serial = 1
print(f"[*] Responding with SOA serial={serial}")
response = build_soa_response(qid, zone_name, serial)
tcp_send_message(conn, response)
elif qtype == DNS_TYPE_AXFR:
# Initial AXFR to load the zone with serial=1
print("[*] AXFR request - sending initial zone (serial=1)")
response = build_axfr_response(qid, zone_name, 1, num_records)
tcp_send_message(conn, response)
axfr_done_event.set()
print(
"[+] Initial AXFR complete. Zone loaded with "
"serial=1. Next SOA will return serial=3 to "
"trigger IXFR."
)
elif qtype == DNS_TYPE_IXFR:
print("[*] IXFR request received")
elif qctx.qtype == dns.rdatatype.AXFR:
yield BytesResponseSend(build_axfr_response(qid, ZONE_NAME, 1, NUM_RECORDS))
self._axfr_done = True
elif qctx.qtype == dns.rdatatype.IXFR:
if qctx.protocol == DnsProtocol.UDP:
# Force TCP retry by setting the TC bit
qctx.response.flags |= dns.flags.TC
yield DnsResponseSend(qctx.response)
else:
# Message 1: Valid IXFR diff -> triggers ixfr_commit()
msg1 = build_ixfr_message1(qid, zone_name, num_records)
print(f"[*] Sending Message 1 ({len(msg1)} bytes)...")
tcp_send_message(conn, msg1)
# Message 2: Trigger xfrin_reset() while worker is running
msg2 = build_bad_rcode_message2(qid, zone_name)
print(f"[*] Sending Message 2 ({len(msg2)} bytes) - triggers race!")
tcp_send_message(conn, msg2)
print(
"[+] IXFR response sent. If BIND9 is built with "
"TSAN, expect data race reports on "
"xfr->ixfr.journal and xfr->ver"
yield BytesResponseSend(
build_ixfr_message1(qid, ZONE_NAME, NUM_RECORDS)
)
else:
print(f"[*] Ignoring query type {qtype}")
except (ConnectionResetError, BrokenPipeError) as e:
print(f"[-] Connection error: {e}")
finally:
conn.close()
print(f"[-] Connection to {addr} closed")
# Message 2: SERVFAIL -> triggers xfrin_reset() while
# ixfr_apply worker from Message 1 is still running -> UAF
yield BytesResponseSend(build_bad_rcode_message2(qid, ZONE_NAME))
def udp_server(listen_addr, port, zone_name, axfr_done_event):
"""UDP server for SOA queries (BIND sends SOA queries over UDP first)."""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((listen_addr, port))
print(f"[+] UDP listening on {listen_addr}:{port} (for SOA queries)")
while True:
try:
data, addr = sock.recvfrom(4096)
parsed = parse_dns_query(data)
if parsed is None:
continue
qid, qname, qtype, qclass = parsed
print(f"[UDP] Query from {addr}: {qname} class={qclass} type={qtype}")
if qtype == DNS_TYPE_SOA:
# Return serial=1 initially (matches zone), then serial=3
# after AXFR to trigger IXFR
if axfr_done_event.is_set():
serial = 3
else:
serial = 1
print(f"[UDP] Responding with SOA serial={serial}")
response = build_soa_response(qid, zone_name, serial)
sock.sendto(response, addr)
elif qtype == DNS_TYPE_IXFR:
# IXFR over UDP gets truncated response to force TCP
print("[UDP] IXFR over UDP, sending TC=1 to force TCP")
flags = DNS_FLAG_QR | DNS_FLAG_AA | 0x0200 # TC bit
header = build_dns_header(qid, flags, 0, 0)
sock.sendto(header, addr)
else:
print(f"[UDP] Ignoring query type {qtype}")
except Exception as e: # pylint: disable=broad-except
print(f"[UDP] Error: {e}")
def sigterm(*_):
print("SIGTERM received, shutting down")
os.remove("ans.pid")
sys.exit(0)
def main():
signal.signal(signal.SIGTERM, sigterm)
signal.signal(signal.SIGINT, sigterm)
with open("ans.pid", "w", encoding="utf-8") as pidfile:
print(os.getpid(), file=pidfile)
listen = sys.argv[1]
port = int(sys.argv[2])
zone_name = "ixfr-race."
num_records = 400
# Shared event: set after initial AXFR, before IXFR
axfr_done_event = threading.Event()
# Start UDP server in background (for SOA queries)
udp_thread = threading.Thread(
target=udp_server, args=(listen, port, zone_name, axfr_done_event)
)
udp_thread.daemon = True
udp_thread.start()
# Set up TCP server (for AXFR initial load + IXFR attack)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((listen, port))
server.listen(5)
print(f"[+] TCP listening on {listen}:{port}")
print()
print("[*] Phase 1: Initial AXFR to load zone with serial=1")
print("[*] Phase 2: SOA refresh will return serial=3 -> IXFR -> race")
print()
while True:
conn, addr = server.accept()
t = threading.Thread(
target=handle_client,
args=(conn, addr, zone_name, num_records, axfr_done_event),
)
t.daemon = True
t.start()
server.close()
def main() -> None:
server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR, default_aa=True)
server.install_response_handler(IxfrRaceHandler())
server.run()
if __name__ == "__main__":

View file

@ -567,10 +567,15 @@ def test_ixfr_race(ns6):
isctest.log.info(
"Check that ixfr-race has been successfully transferred by the secondary"
)
with ns6.watch_log_from_start() as watcher_transfer_completed:
watcher_transfer_completed.wait_for_line(
"zone ixfr-race/IN: zone transfer finished: success"
)
if "zone ixfr-race/IN: zone transfer finished: success" not in ns6.log:
# ns11 is started after ns6, so the zone transfer might not have
# happened by the time this test is started: if not, use retransfer to
# do the initial fetch now
with ns6.watch_log_from_start() as watcher_transfer_completed:
ns6.rndc("retransfer ixfr-race.")
watcher_transfer_completed.wait_for_line(
"zone ixfr-race/IN: zone transfer finished: success"
)
isctest.log.info("Try to reload the zone from the primary")
with ns6.watch_log_from_here() as watcher_transfer_completed: