Add a system test for CNAME answers to DNSSEC meta-type queries

Two authoritative zones drive the cases. 'example.' answers DNSKEY,
NSEC, NSEC3 and RRSIG queries with a CNAME: a direct recursive query for
one of these must not crash the resolver, and the validator's own DNSKEY
fetch for a signed name must fail as a broken trust chain and return
SERVFAIL promptly.

'secure.' is served faithfully but answers DS queries with an unsigned
CNAME -- the input that drove the validator's insecurity proof into a
self-join.  The resolver must return SERVFAIL within a couple of seconds
instead of stalling for twelve.

Assisted-by: Claude:claude-opus-4-8
This commit is contained in:
Ondřej Surý 2026-05-29 11:32:52 +02:00 committed by Ondřej Surý
parent 938b58a809
commit 358c55ffa2
No known key found for this signature in database
GPG key ID: 2820F37E873DEA41
6 changed files with 394 additions and 0 deletions

View file

@ -0,0 +1,173 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from collections.abc import AsyncGenerator
import dns.flags
import dns.name
import dns.rcode
import dns.rdatatype
import dns.rrset
import dns.zone
from isctest.asyncserver import (
AsyncDnsServer,
DnsResponseSend,
DomainHandler,
QueryContext,
)
# Zone data for 'example.'.  The handler built on it in main() answers
# DNSKEY, NSEC, NSEC3 and RRSIG queries with a CNAME -- the meta-types whose
# CNAME answer the resolver and validator must cope with.
EXAMPLE = dns.zone.from_file(
    "example.signed.db",
    origin="example.",
    relativize=False,
)

# Zone data for 'secure.'.  It is served faithfully, so the resolver can
# validate down to the zone and reach the DS query, but DS queries are
# answered with an unsigned CNAME: the input that drove the validator's
# insecurity proof into a self-join deadlock (GL#5878).
SECURE = dns.zone.from_file(
    "secure.signed.db",
    origin="secure.",
    relativize=False,
)
def _append_rrset_with_rrsig(
    zone: dns.zone.Zone,
    section: list,
    name: dns.name.Name,
    qclass: int,
    rdtype: int,
    rds,
) -> None:
    """Append 'rds' to 'section' as an RRset owned by 'name', followed by
    the RRSIG RRset covering 'rdtype' when 'zone' holds one at 'name'.

    Only the data RRset is appended if 'zone' has no node for 'name', or
    if that node has no RRSIG rdataset covering 'rdtype'.
    """
    data = dns.rrset.RRset(name, qclass, rdtype)
    data.update(rds)
    section.append(data)

    # Look the signatures up in the zone rather than taking them from the
    # caller, so every caller gets the matching RRSIG for free.
    owner_node = zone.get_node(name)
    signatures = (
        None
        if owner_node is None
        else owner_node.get_rdataset(qclass, dns.rdatatype.RRSIG, covers=rdtype)
    )
    if signatures is not None:
        signed = dns.rrset.RRset(name, qclass, dns.rdatatype.RRSIG, covers=rdtype)
        signed.update(signatures)
        section.append(signed)
class CnameZoneHandler(DomainHandler):
    """Authoritative handler for one signed zone that substitutes a CNAME
    for the real records whenever the query type is one of the configured
    rdata types; every other query is answered from the zone data."""

    def __init__(self, zone: dns.zone.Zone, cname_qtypes) -> None:
        # The attributes are assigned before the base initializer runs,
        # presumably because it consults 'domains', which reads 'self.zone'
        # -- TODO confirm against DomainHandler.
        self.zone = zone
        self.cname_qtypes = frozenset(cname_qtypes)
        super().__init__()

    @property
    def domains(self) -> list:
        """The zone origin, in text form, as the only handled domain."""
        origin_text = self.zone.origin.to_text()
        return [origin_text]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield a single authoritative response for 'qctx'.

        A query type listed in 'cname_qtypes' gets a lone CNAME (no RRSIG)
        pointing at 'cname-target.<qname>'.  Otherwise the answer comes
        from the zone: the requested RRset with its RRSIG when present, or
        the zone SOA with its RRSIG in the authority section -- with rcode
        NXDOMAIN when the name has no node, and with the rcode left as
        prepared when only the type is missing.
        """
        qctx.prepare_new_response(with_zone_data=False)
        qctx.response.flags |= dns.flags.AA

        if qctx.qtype in self.cname_qtypes:
            alias = dns.rrset.from_text(
                qctx.qname,
                300,
                qctx.qclass,
                dns.rdatatype.CNAME,
                f"cname-target.{qctx.qname.to_text()}",
            )
            qctx.response.answer.append(alias)
        else:
            node = self.zone.get_node(qctx.qname)
            soa_rds = self.zone.get_rdataset(self.zone.origin, dns.rdatatype.SOA)
            rds = None if node is None else node.get_rdataset(qctx.qclass, qctx.qtype)

            if rds is not None:
                # Positive answer: the requested RRset goes in the answer
                # section.
                section = qctx.response.answer
                owner, rdtype, data = qctx.qname, qctx.qtype, rds
            else:
                # Negative answer: the SOA goes in the authority section;
                # only a missing name changes the rcode.
                if node is None:
                    qctx.response.set_rcode(dns.rcode.NXDOMAIN)
                section = qctx.response.authority
                owner, rdtype, data = self.zone.origin, dns.rdatatype.SOA, soa_rds

            _append_rrset_with_rrsig(
                self.zone, section, owner, qctx.qclass, rdtype, data
            )

        yield DnsResponseSend(qctx.response)
class LoneRecordHandler(DomainHandler):
    """Reply to every query for 'lone-a.example.' with one unrelated A
    record, carrying neither an RRSIG nor an alias.

    The resolver handles an RRSIG query as a subset of ANY, and such an
    answer used to be dropped entirely, leaving the fetch waiting for a
    validator that was never started.
    """

    domains = ["lone-a.example."]

    async def get_responses(
        self, qctx: QueryContext
    ) -> AsyncGenerator[DnsResponseSend, None]:
        """Yield one authoritative response holding only the A record,
        owned by the query name, whatever type was asked for."""
        qctx.prepare_new_response(with_zone_data=False)
        qctx.response.flags |= dns.flags.AA
        qctx.response.answer.append(
            dns.rrset.from_text(
                qctx.qname, 300, qctx.qclass, dns.rdatatype.A, "192.0.2.1"
            )
        )
        yield DnsResponseSend(qctx.response)
def main() -> None:
    """Build the server, install the three response handlers and run it."""
    server = AsyncDnsServer(default_rcode=dns.rcode.NOERROR, default_aa=True)

    # Query types that 'example.' answers with a CNAME.
    example_cname_qtypes = {
        dns.rdatatype.DNSKEY,
        dns.rdatatype.NSEC,
        dns.rdatatype.NSEC3,
        dns.rdatatype.RRSIG,
    }

    # 'lone-a.example.' lies inside 'example.', so its handler is listed
    # first -- presumably handlers are tried in installation order (TODO
    # confirm against AsyncDnsServer).
    handlers = (
        LoneRecordHandler(),
        CnameZoneHandler(EXAMPLE, example_cname_qtypes),
        CnameZoneHandler(SECURE, {dns.rdatatype.DS}),
    )
    server.install_response_handlers(*handlers)
    server.run()


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,11 @@
$TTL 300
@ IN SOA ns.example. root.example. (
1 ; serial
3600 ; refresh
1800 ; retry
1814400 ; expire
300 ; minimum
)
IN NS ns.example.
ns IN A 10.53.0.2
www IN A 10.53.0.3

View file

@ -0,0 +1,10 @@
$TTL 300
@ IN SOA ns.secure. root.secure. (
1 ; serial
3600 ; refresh
1800 ; retry
1814400 ; expire
300 ; minimum
)
IN NS ns.secure.
ns IN A 10.53.0.2

View file

@ -0,0 +1,24 @@
options {
query-source address 10.53.0.3;
notify-source 10.53.0.3;
transfer-source 10.53.0.3;
port @PORT@;
pid-file "named.pid";
listen-on { 10.53.0.3; };
listen-on-v6 { none; };
allow-transfer { any; };
dnssec-validation yes;
recursion yes;
};
zone "example." {
type static-stub;
server-addresses { 10.53.0.2; };
};
zone "secure." {
type static-stub;
server-addresses { 10.53.0.2; };
};
include "trusted.conf";

View file

@ -0,0 +1,4 @@
trust-anchors {
example. static-key 257 3 14 "@ksk_public_key@";
secure. static-key 257 3 14 "@secure_ksk_public_key@";
};

View file

@ -0,0 +1,172 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from re import compile as Re
import base64
import time
from cryptography.hazmat.primitives.asymmetric import ec
from dns.rdtypes.dnskeybase import Flag
import dns.dnssec
import dns.rdataclass
import dns.zone
import pytest
import isctest
def _sign_zone(db_in, signed_out, origin):
    """Sign the zone file 'db_in' (origin 'origin') with a newly generated
    key and write the signed zone to 'signed_out'.

    Returns the public key, base64-encoded as text, for use as a static
    trust anchor.
    """
    # ECDSAP384SHA384 with flags ZONE|SEP matches the "257 3 14" fields
    # hard-coded in the trust-anchors template.
    private_key = ec.generate_private_key(ec.SECP384R1())
    dnskey = dns.dnssec.make_dnskey(
        public_key=private_key.public_key(),
        algorithm=dns.dnssec.Algorithm.ECDSAP384SHA384,
        flags=Flag.ZONE | Flag.SEP,
    )
    signing_keys = [(private_key, dnskey)]

    zone = dns.zone.from_file(db_in, origin=origin)
    with zone.writer() as txn:
        dns.dnssec.sign_zone(
            zone=zone,
            txn=txn,
            keys=signing_keys,
            lifetime=300,
            add_dnskey=True,
            deterministic=False,  # for OpenSSL<3.2.0 compat
        )

    # Written only after the writer has been closed, so the file holds the
    # signed contents.
    zone.to_file(signed_out)

    encoded_key = base64.b64encode(dnskey.key)
    return encoded_key.decode()
def bootstrap():
    """Sign both test zones and return their public keys, keyed by the
    placeholder names used in the trust-anchors template
    ('ksk_public_key' for 'example.', 'secure_ksk_public_key' for
    'secure.').

    The test is skipped when signing raises ImportError.
    """
    try:
        example_key = _sign_zone(
            "ans2/example.db.in", "ans2/example.signed.db", "example."
        )
        secure_key = _sign_zone(
            "ans2/secure.db.in", "ans2/secure.signed.db", "secure."
        )
    except ImportError as exc:
        pytest.skip(f"{exc}")
    return {
        "ksk_public_key": example_key,
        "secure_ksk_public_key": secure_key,
    }
def _assert_ns3_alive():
    """Send ns3 a non-recursive CHAOS-class TXT query for 'version.bind.'
    over TCP and fail the test if no response comes back (e.g. because
    named hit an assertion)."""
    probe = isctest.query.create(
        "version.bind.",
        "TXT",
        dns.rdataclass.CH,
        rd=False,
    )
    reply = isctest.query.tcp(probe, "10.53.0.3", timeout=5)
    assert (
        reply is not None
    ), "ns3 did not answer a liveness query -- it may have crashed"
@pytest.mark.parametrize("qtype", ["DNSKEY", "NSEC", "NSEC3", "RRSIG"])
def test_direct_metatype_query_does_not_crash_resolver(qtype):
    """
    A direct recursive client query for a DNSSEC meta-type, answered by a
    malicious authoritative server with a CNAME, must not crash the
    resolver.  This probes the client-facing consumers of the resolver
    fetch (ns_query/query_cname), not the validator's internal fetch.

    A resolver fetch that completes with DNS_R_CNAME goes through the
    normal answer path, which binds the answer name and rdataset.  An
    earlier resolver-side shortcut returned DNS_R_CNAME without binding
    them, so query_cname() handed an empty (non-absolute) name to
    dns_message_addname() and named aborted on REQUIRE(dns_name_isabsolute).
    """
    msg = isctest.query.create("sub.example.", qtype)

    # Use the monotonic clock for the elapsed-time bound: the wall clock
    # can be stepped while the query is in flight, which would distort
    # the measurement.
    start_time = time.monotonic()
    res = isctest.query.tcp(msg, "10.53.0.3", timeout=8)
    elapsed_time = time.monotonic() - start_time

    # The resolver must answer promptly.  An RRSIG query is handled as a
    # subset of ANY, and a CNAME answer to it used to be dropped without
    # caching or validation, leaving the fetch waiting ~12s for a
    # validator that was never started.
    assert elapsed_time < 5.0, f"{qtype} query took too long: {elapsed_time}s"

    # We do not assert a particular rcode here -- SERVFAIL or a chased
    # answer are both acceptable.  The point is that named survives.
    assert res is not None, f"no response to direct {qtype} query"
    _assert_ns3_alive()
def test_rrsig_lone_record_does_not_stall_resolver():
    """
    A direct recursive RRSIG query answered with an unrelated record
    (here a lone A, with no RRSIG and no alias) must not stall the
    resolver.  An RRSIG query is handled as a subset of ANY; every record
    of the wrong type is filtered out, and when nothing is left the
    answer used to be accepted as success with no answer bound, leaving
    the fetch waiting ~12s for a validator that was never started.
    """
    msg = isctest.query.create("lone-a.example.", "RRSIG")

    # Use the monotonic clock for the elapsed-time bound: the wall clock
    # can be stepped while the query is in flight, which would distort
    # the measurement.
    start_time = time.monotonic()
    res = isctest.query.tcp(msg, "10.53.0.3", timeout=8)
    elapsed_time = time.monotonic() - start_time

    assert elapsed_time < 5.0, f"RRSIG query took too long: {elapsed_time}s"
    assert res is not None, "no response to lone-record RRSIG query"
    _assert_ns3_alive()
def test_cname_for_validator_dnskey_fetch(ns3):
    """
    A malicious authoritative server returning a CNAME for the
    validator's DNSKEY fetch must not stall validation.  The DNSKEY
    fetch completes with DNS_R_CNAME, which the validator treats as a
    broken trust chain, so the client query terminates with SERVFAIL
    rather than hanging.  No resolver-side special case is needed: the
    validator already rejects a CNAME answer to its meta-fetch.
    """
    log_brokenchain = Re(r"broken trust chain resolving 'www\.example/A/IN'")
    msg = isctest.query.create("www.example.", "A")

    # Use the monotonic clock for the elapsed-time bound: the wall clock
    # can be stepped while the query is in flight, which would distort
    # the measurement.
    start_time = time.monotonic()
    with ns3.watch_log_from_here(timeout=5) as watcher:
        res = isctest.query.tcp(msg, "10.53.0.3")
        watcher.wait_for_line(log_brokenchain)
    elapsed_time = time.monotonic() - start_time

    assert elapsed_time < 5.0, f"Query took too long: {elapsed_time}s"
    isctest.check.servfail(res)
def test_ds_cname_does_not_deadlock():
    """
    A DS query answered with an unsigned CNAME must not send the validator
    into a self-join deadlock (GL#5878).  While proving the CNAME insecure
    the validator would fetch the DS for the same name, re-entering the
    in-flight DS fetch it is blocked on and stalling for ~12 seconds until a
    backstop timer fires.  The validator now detects that such a fetch cannot
    advance the alias chain and aborts, so the client gets SERVFAIL promptly.

    'secure.' is a properly signed zone (so validation reaches the DS query),
    but its authoritative server answers DS queries with an unsigned CNAME.
    """
    msg = isctest.query.create("insecure.secure.", "DS")

    # Use the monotonic clock for the elapsed-time bound: the wall clock
    # can be stepped while the query is in flight, which would distort
    # the measurement.
    start_time = time.monotonic()
    res = isctest.query.tcp(msg, "10.53.0.3", timeout=8)
    elapsed_time = time.monotonic() - start_time

    assert (
        elapsed_time < 5.0
    ), f"DS query took too long: {elapsed_time}s (possible deadlock)"
    isctest.check.servfail(res)
    _assert_ns3_alive()