[CVE-2025-40778] sec: test: Add various bailiwick-related tests

Closes #5414

Merge branch '5414-add-various-bailiwick-related-tests' into 'main'

See merge request isc-projects/bind9!11406
This commit is contained in:
Michał Kępień 2025-12-22 12:44:38 +01:00
commit 4430632915
12 changed files with 559 additions and 1 deletions

View file

@ -679,7 +679,7 @@ vulture:
<<: *python_triggering_rules
needs: []
script:
- vulture --exclude "*ans.py,conftest.py,re_compile_checker.py,isctest" --ignore-names "after_servers_start,bootstrap,pytestmark" bin/tests/system/
- vulture --exclude "*ans.py,conftest.py,re_compile_checker.py,isctest" --ignore-names "after_servers_start,bootstrap,pytestmark,autouse_*" bin/tests/system/
ci-variables:
<<: *precheck_job

48
bin/tests/system/ans.py Normal file
View file

@ -0,0 +1,48 @@
"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
SPDX-License-Identifier: MPL-2.0
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.
See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
"""
This is a bare-bones DNS server that only serves data from zone files. It is
meant to be used as a replacement for full-blown named instances in system
tests when a given server is only required to return zone-based data.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
BEWARE! THIS SERVER DOES NOT NECESSARILY RETURN PROTOCOL-COMPLIANT ANSWERS!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
See AsyncDnsServer._abort_if_*() methods in isctests/asyncserver.py for more
details. Use a regular named instance for anything non-trivial.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
DO NOT ADD CUSTOM LOGIC TO THIS FILE. IT IS ONLY MEANT TO BE SYMLINKED INTO
ansX/ SUBDIRECTORIES IN SYSTEM TESTS TO REDUCE THE AMOUNT OF BOILERPLATE CODE.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
If you need to customize server behavior, implement it in a dedicated ans.py
server in the system test at hand. If an extension you are working on can be
useful in other system tests, please consider opening a merge request extending
isctest/asyncserver.py.
"""
from isctest.asyncserver import (
AsyncDnsServer,
)
def main() -> None:
server = AsyncDnsServer()
server.run()
if __name__ == "__main__":
main()

View file

@ -0,0 +1,68 @@
"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
SPDX-License-Identifier: MPL-2.0
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.
See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
import dns.rdatatype
import dns.rrset
from isctest.asyncserver import (
DnsResponseSend,
QueryContext,
ResponseAction,
)
from bailiwick_ans import ResponseSpoofer, spoofing_server
ATTACKER_IP = "10.53.0.3"
TTL = 3600
class SiblingNsSpoofer(ResponseSpoofer, mode="sibling-ns"):
qname = "trigger."
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[ResponseAction, None]:
response = qctx.prepare_new_response(with_zone_data=False)
txt_rrset = dns.rrset.from_text(
qctx.qname,
TTL,
qctx.qclass,
dns.rdatatype.TXT,
'"spoofed answer with extra NS"',
)
response.answer.append(txt_rrset)
ns_rrset = dns.rrset.from_text(
"victim.", TTL, qctx.qclass, dns.rdatatype.NS, "ns.attacker."
)
response.authority.append(ns_rrset)
a_rrset = dns.rrset.from_text(
"ns.attacker.", TTL, qctx.qclass, dns.rdatatype.A, ATTACKER_IP
)
response.additional.append(a_rrset)
yield DnsResponseSend(response, authoritative=True)
def main() -> None:
spoofing_server().run()
if __name__ == "__main__":
main()

View file

@ -0,0 +1,24 @@
; Copyright (C) Internet Systems Consortium, Inc. ("ISC")
;
; SPDX-License-Identifier: MPL-2.0
;
; This Source Code Form is subject to the terms of the Mozilla Public
; License, v. 2.0. If a copy of the MPL was not distributed with this
; file, you can obtain one at https://mozilla.org/MPL/2.0/.
;
; See the COPYRIGHT file distributed with this work for additional
; information regarding copyright ownership.
$ORIGIN .
$TTL 3600
. SOA . . 0 0 0 0 3600
. NS a.root-servers.nil.
a.root-servers.nil. A 10.53.0.1
; queries should go here ... unless the attack succeeded
victim. NS ns.victim.
ns.victim. A 10.53.0.2
; no query should go here
attacker. NS ns.attacker.
ns.attacker. A 10.53.0.3

View file

@ -0,0 +1,110 @@
"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
SPDX-License-Identifier: MPL-2.0
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.
See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import AsyncGenerator
import dns.rdatatype
import dns.rrset
from isctest.asyncserver import (
DnsResponseSend,
QueryContext,
ResponseAction,
)
from bailiwick_ans import ResponseSpoofer, spoofing_server
ATTACKER_IP = "10.53.0.3"
TTL = 3600
class UnsolicitedNsSpoofer(ResponseSpoofer, mode="unsolicited-ns"):
qname = "trigger.victim."
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[ResponseAction, None]:
response = qctx.prepare_new_response(with_zone_data=False)
txt_rrset = dns.rrset.from_text(
qctx.qname,
TTL,
qctx.qclass,
dns.rdatatype.TXT,
'"spoofed answer with extra NS"',
)
response.answer.append(txt_rrset)
ns_rrset = dns.rrset.from_text(
"victim.", TTL, qctx.qclass, dns.rdatatype.NS, "ns.attacker."
)
response.authority.append(ns_rrset)
yield DnsResponseSend(response, authoritative=True)
class ParentGlueSpoofer(ResponseSpoofer, mode="parent-glue"):
qname = "trigger.victim."
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[ResponseAction, None]:
response = qctx.prepare_new_response(with_zone_data=False)
ns_rrset = dns.rrset.from_text(
"trigger.victim.", TTL, qctx.qclass, dns.rdatatype.NS, "ns.victim."
)
response.authority.append(ns_rrset)
glue_rrset = dns.rrset.from_text(
"ns.victim.", TTL, qctx.qclass, dns.rdatatype.A, ATTACKER_IP
)
response.additional.append(glue_rrset)
yield DnsResponseSend(response, authoritative=False)
class DnameSpoofer(ResponseSpoofer, mode="dname"):
qname = "trigger.victim."
async def get_responses(
self, qctx: QueryContext
) -> AsyncGenerator[ResponseAction, None]:
response = qctx.prepare_new_response(with_zone_data=False)
cname_rrset = dns.rrset.from_text(
qctx.qname,
TTL,
qctx.qclass,
dns.rdatatype.CNAME,
"trigger.attacker.",
)
dname_rrset = dns.rrset.from_text(
"victim.", TTL, qctx.qclass, dns.rdatatype.DNAME, "attacker."
)
response.answer.append(cname_rrset)
response.answer.append(dname_rrset)
yield DnsResponseSend(response, authoritative=True)
def main() -> None:
spoofing_server().run()
if __name__ == "__main__":
main()

View file

@ -0,0 +1,17 @@
; Copyright (C) Internet Systems Consortium, Inc. ("ISC")
;
; SPDX-License-Identifier: MPL-2.0
;
; This Source Code Form is subject to the terms of the Mozilla Public
; License, v. 2.0. If a copy of the MPL was not distributed with this
; file, you can obtain one at https://mozilla.org/MPL/2.0/.
;
; See the COPYRIGHT file distributed with this work for additional
; information regarding copyright ownership.
$TTL 3600
@ SOA ns.victim. . 0 0 0 0 3600
@ NS ns
ns A 10.53.0.2
prime TXT "this record is used for priming the cache of the targeted resolver"
canary TXT "correct answer from the domain under attack"

View file

@ -0,0 +1 @@
../../ans.py

View file

@ -0,0 +1,17 @@
; Copyright (C) Internet Systems Consortium, Inc. ("ISC")
;
; SPDX-License-Identifier: MPL-2.0
;
; This Source Code Form is subject to the terms of the Mozilla Public
; License, v. 2.0. If a copy of the MPL was not distributed with this
; file, you can obtain one at https://mozilla.org/MPL/2.0/.
;
; See the COPYRIGHT file distributed with this work for additional
; information regarding copyright ownership.
$TTL 3600
@ SOA ns.attacker. . 0 0 0 0 3600
@ NS ns
ns A 10.53.0.3
only-if-hijacked TXT "this record only exists in the hijacked version of the zone"
canary TXT "fake answer from attacker"

View file

@ -0,0 +1,16 @@
; Copyright (C) Internet Systems Consortium, Inc. ("ISC")
;
; SPDX-License-Identifier: MPL-2.0
;
; This Source Code Form is subject to the terms of the Mozilla Public
; License, v. 2.0. If a copy of the MPL was not distributed with this
; file, you can obtain one at https://mozilla.org/MPL/2.0/.
;
; See the COPYRIGHT file distributed with this work for additional
; information regarding copyright ownership.
$TTL 3600
@ SOA ns.attacker. . 0 0 0 0 3600
@ NS ns.attacker.
only-if-hijacked TXT "this record only exists in the hijacked version of the zone"
canary TXT "fake answer from attacker's auth"

View file

@ -0,0 +1,102 @@
"""
Copyright (C) Internet Systems Consortium, Inc. ("ISC")
SPDX-License-Identifier: MPL-2.0
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
file, you can obtain one at https://mozilla.org/MPL/2.0/.
See the COPYRIGHT file distributed with this work for additional
information regarding copyright ownership.
"""
from typing import Dict, List, Optional, Type
import abc
import dns.name
import dns.rcode
import dns.rdatatype
from isctest.asyncserver import (
ControlCommand,
ControllableAsyncDnsServer,
DnsProtocol,
QueryContext,
ResponseHandler,
)
class ResponseSpoofer(ResponseHandler, abc.ABC):
spoofers: Dict[str, Type["ResponseSpoofer"]] = {}
def __init_subclass__(cls, mode: str) -> None:
assert mode not in cls.spoofers
cls.spoofers[mode] = cls
@classmethod
def get_spoofer(cls, mode: str) -> Optional["ResponseSpoofer"]:
try:
return cls.spoofers[mode]()
except KeyError:
return None
@property
@abc.abstractmethod
def qname(self) -> str:
raise NotImplementedError
def match(self, qctx: QueryContext) -> bool:
return (
qctx.qname == dns.name.from_text(self.qname)
and qctx.qtype == dns.rdatatype.TXT
and qctx.protocol == DnsProtocol.UDP
)
class SetSpoofingModeCommand(ControlCommand):
"""
Select the ResponseSpoofer to use while handling queries from the resolver
under test (ns4). This control command is used at the start of each test
function in tests_bailiwick.py.
"""
control_subdomain = "set-spoofing-mode"
def __init__(self) -> None:
self._current_handler: Optional[ResponseSpoofer] = None
def handle(
self, args: List[str], server: ControllableAsyncDnsServer, qctx: QueryContext
) -> Optional[str]:
if len(args) != 1:
qctx.response.set_rcode(dns.rcode.SERVFAIL)
return "invalid control command"
mode = args[0]
if mode == "none":
if self._current_handler:
server.uninstall_response_handler(self._current_handler)
self._current_handler = None
return "response spoofing disabled"
spoofer = ResponseSpoofer.get_spoofer(mode)
if not spoofer:
qctx.response.set_rcode(dns.rcode.SERVFAIL)
return f"unknown spoofing mode {mode}"
if self._current_handler:
server.uninstall_response_handler(self._current_handler)
server.install_response_handler(spoofer)
self._current_handler = spoofer
return f"response spoofing enabled (mode: {mode})"
def spoofing_server() -> ControllableAsyncDnsServer:
server = ControllableAsyncDnsServer(default_rcode=dns.rcode.NOERROR)
server.install_control_command(SetSpoofingModeCommand())
return server

View file

@ -0,0 +1,36 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* SPDX-License-Identifier: MPL-2.0
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
options {
query-source address 10.53.0.4;
notify-source 10.53.0.4;
transfer-source 10.53.0.4;
port @PORT@;
pid-file "named.pid";
listen-on { 10.53.0.4; };
listen-on-v6 { none; };
recursion yes;
dnssec-validation no;
qname-minimization off;
};
controls {
inet 10.53.0.4 port @CONTROLPORT@ allow { any; } keys { rndc_key; };
};
include "../../_common/rndc.key";
zone "." {
type hint;
file "../../_common/root.hint";
};

View file

@ -0,0 +1,119 @@
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Dict
import time
import dns.message
import pytest
# isctest.asyncserver requires dnspython >= 2.0.0
pytest.importorskip("dns", minversion="2.0.0")
import isctest
from isctest.instance import NamedInstance
@pytest.fixture(autouse=True)
def autouse_flush_resolver_cache(servers: Dict[str, NamedInstance]) -> None:
servers["ns4"].rndc("flush")
def set_spoofing_mode(ans1: str, ans2: str) -> None:
for ip, mode in (("10.53.0.1", ans1), ("10.53.0.2", ans2)):
msg = dns.message.make_query(f"{mode}.set-spoofing-mode._control.", "TXT")
res = isctest.query.tcp(msg, ip)
isctest.check.noerror(res)
def prime_cache(ns4: NamedInstance) -> None:
msg = dns.message.make_query("prime.victim.", "TXT")
res = isctest.query.tcp(msg, ns4.ip)
isctest.check.noerror(res)
assert res.answer[0] == dns.rrset.from_text(
"prime.victim.",
0,
"IN",
"TXT",
'"this record is used for priming the cache of the targeted resolver"',
)
def send_trigger_query(ns4: NamedInstance, qname: str) -> None:
msg = dns.message.make_query(qname, "TXT")
isctest.query.tcp(msg, ns4.ip)
# The contents of the resolver's response to the trigger query do not
# matter, so they are not checked in any way; what matters is whether the
# spoofed response succeeded in hijacking the "victim." domain, which is
# checked below.
def check_domain_hijack(ns4: NamedInstance) -> None:
# Not necessary for triggering bugs, but useful for troubleshooting test
# behavior.
ns4.rndc("dumpdb -cache")
msg = dns.message.make_query("only-if-hijacked.victim.", "TXT")
res = isctest.query.tcp(msg, ns4.ip)
isctest.check.nxdomain(res)
msg = dns.message.make_query("canary.victim.", "TXT")
res = isctest.query.tcp(msg, ns4.ip)
isctest.check.noerror(res)
assert res.answer[0] == dns.rrset.from_text(
"canary.victim.",
0,
"IN",
"TXT",
'"correct answer from the domain under attack"',
)
def test_bailiwick_sibling_ns_referral(servers: Dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="sibling-ns", ans2="none")
ns4 = servers["ns4"]
send_trigger_query(ns4, "trigger.")
check_domain_hijack(ns4)
def test_bailiwick_unsolicited_authority(servers: Dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="unsolicited-ns")
ns4 = servers["ns4"]
prime_cache(ns4)
send_trigger_query(ns4, "trigger.victim.")
check_domain_hijack(ns4)
def test_bailiwick_parent_glue(servers: Dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="parent-glue")
ns4 = servers["ns4"]
prime_cache(ns4)
send_trigger_query(ns4, "trigger.victim.")
isctest.log.info("Waiting 61 seconds for the ns.victim. ADB entry to expire")
time.sleep(61)
check_domain_hijack(ns4)
def test_bailiwick_spoofed_dname(servers: Dict[str, NamedInstance]) -> None:
set_spoofing_mode(ans1="none", ans2="dname")
ns4 = servers["ns4"]
send_trigger_query(ns4, "trigger.victim.")
check_domain_hijack(ns4)