mirror of
https://github.com/isc-projects/bind9.git
synced 2026-06-10 16:30:00 -04:00
After SIG and NXT lost their special handling, KEY remained the only RFC 2535-era type still receiving coexistence allowances: KEY alongside CNAME at the same owner, KEY answered from the parent side of a zone cut, KEY kept across CNAME eviction in the cache. RFC 3755 retains type 25 only for SIG(0) and TKEY transaction signatures, and neither relies on those allowances in practice. The in-tree comment that flagged the RFC 3007 parent-side carve-out as "unclear" predicted this cleanup. Zones that publish CNAME and KEY at the same owner — already invalid under RFC 2181 — now fail to load. System test fixtures are updated accordingly, and a new test asserts that SIG, NXT, and KEY records pick up covering RRSIGs when their zone is signed.
1379 lines
50 KiB
Python
1379 lines
50 KiB
Python
# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
|
#
|
|
# SPDX-License-Identifier: MPL-2.0
|
|
#
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
|
#
|
|
# See the COPYRIGHT file distributed with this work for additional
|
|
# information regarding copyright ownership.
|
|
|
|
from re import compile as Re
|
|
|
|
import shutil
|
|
import time
|
|
|
|
from dns import flags, name, rdataclass, rdatatype
|
|
from dns.edns import EDECode
|
|
|
|
import pytest
|
|
|
|
from isctest.util import param
|
|
|
|
import isctest
|
|
import isctest.mark
|
|
|
|
pytestmark = pytest.mark.extra_artifacts(
|
|
[
|
|
"*/K*",
|
|
"*/dsset-*",
|
|
"*/*.bk",
|
|
"*/*.conf",
|
|
"*/*.db",
|
|
"*/*.id",
|
|
"*/*.jnl",
|
|
"*/*.jbk",
|
|
"*/*.key",
|
|
"*/*.signed",
|
|
"*/settime.out.*",
|
|
"ans*/ans.run",
|
|
"*/trusted.keys",
|
|
"*/*.bad",
|
|
"*/*.next",
|
|
"*/*.stripped",
|
|
"*/*.tmp",
|
|
"*/*.stage?",
|
|
"*/*.patched",
|
|
"*/*.lower",
|
|
"*/*.upper",
|
|
"*/*.unsplit",
|
|
]
|
|
)
|
|
|
|
|
|
def getfrom(file):
|
|
with open(file, encoding="utf-8") as f:
|
|
return f.read().strip()
|
|
|
|
|
|
@pytest.mark.requires_zones_loaded("ns2", "ns3")
|
|
@pytest.mark.parametrize(
|
|
"qname, qtype",
|
|
[
|
|
param("a.example.", "A"),
|
|
param("rfc2535.example.", "SOA"),
|
|
],
|
|
)
|
|
def test_load_transfer(qname, qtype):
|
|
# check that we can load and transfer zone
|
|
msg = isctest.query.create(qname, qtype)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.3")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res1)
|
|
|
|
|
|
def test_insecure_rrsig():
|
|
# check that for a rrsig query against a validating resolver where the
|
|
# authoritative zone is unsigned (insecure delegation), noerror is
|
|
# returned.
|
|
msg = isctest.query.create("a.insecure.example", "RRSIG")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.rr_count_eq(res.answer, 0)
|
|
isctest.check.rr_count_eq(res.authority, 1)
|
|
isctest.check.rr_count_eq(res.additional, 0)
|
|
assert str(res.authority[0].name) == "insecure.example."
|
|
assert res.authority[0].rdtype == rdatatype.SOA
|
|
|
|
|
|
def test_adflag():
|
|
# compare auth and recursive answers
|
|
msg = isctest.query.create("a.example", "A", dnssec=False)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
|
|
# check no AD flag in authoritative response
|
|
isctest.check.noadflag(res1)
|
|
|
|
# check validating resolver sends AD=1 if the client sent AD=1
|
|
isctest.check.adflag(res2)
|
|
|
|
# check that AD=0 unless the client sent AD=1
|
|
msg = isctest.query.create("a.example", "A", dnssec=False, ad=False)
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noadflag(res2)
|
|
|
|
|
|
def test_secure_root(ns4, default_algorithm):
|
|
# check that a query for a secure root validates
|
|
msg = isctest.query.create(".", "KEY")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.adflag(res)
|
|
|
|
# check that "rndc secroots" dumps the trusted keys
|
|
key = int(getfrom("ns1/managed.key.id"))
|
|
response = ns4.rndc("secroots -")
|
|
assert f"./{default_algorithm.name}/{key} ; static" in response.out
|
|
assert len(response.out.splitlines()) == 10
|
|
|
|
|
|
def test_positive_validation_nsec():
|
|
# positive answer
|
|
msg = isctest.query.create("a.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# wildcard
|
|
msg = isctest.query.create("a.wild.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
assert str(res2.authority[0].name) == "*.wild.example."
|
|
assert res2.authority[0].rdtype == rdatatype.NSEC
|
|
nsecs = [str(a).split(" ", maxsplit=1)[0] for a in res2.authority[0]]
|
|
assert "z.example." in nsecs
|
|
assert res2.authority[1].rdtype == rdatatype.RRSIG
|
|
assert res2.authority[1].covers == rdatatype.NSEC
|
|
|
|
# mixed case
|
|
for rrtype in ["a", "txt", "aaaa", "loc"]:
|
|
msg = isctest.query.create("mixedcase.secure.example", rrtype)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
|
|
def test_positive_validation_nsec3():
|
|
# positive answer
|
|
msg = isctest.query.create("a.nsec3.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# wildcard
|
|
msg = isctest.query.create("a.wild.nsec3.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.adflag(res2)
|
|
isctest.check.rr_count_eq(res2.authority, 2)
|
|
|
|
# unknown NSEC3 hash algorithm
|
|
msg = isctest.query.create("nsec3-unknown.example", "SOA", dnssec=False)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res1)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.adflag(res2)
|
|
isctest.check.rr_count_eq(res2.answer, 1)
|
|
|
|
|
|
def test_positive_validation_optout():
|
|
# positive answer
|
|
msg = isctest.query.create("a.optout.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# wildcard
|
|
msg = isctest.query.create("a.wild.optout.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# unknown NSEC3 hash algorithm
|
|
msg = isctest.query.create("optout-unknown.example", "SOA", dnssec=False)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res1)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.adflag(res2)
|
|
isctest.check.rr_count_eq(res2.answer, 1)
|
|
|
|
|
|
def answer_has(r, rdtype):
|
|
return bool([r for r in r.answer if r.rdtype == rdtype])
|
|
|
|
|
|
def test_chain_validation():
|
|
# check validation of ANY response
|
|
msg = isctest.query.create("foo.example", "ANY")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.rr_count_eq(res2.answer, 6) # 2 records, 1 NSEC, 3 RRSIGs
|
|
|
|
# check validation of CNAME response
|
|
msg = isctest.query.create("cname1.example", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.rr_count_eq(res2.answer, 4) # CNAME, TXT, 2 RRSIGs
|
|
|
|
# check validation of DNAME response
|
|
msg = isctest.query.create("foo.dname1.example", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.rr_count_eq(res2.answer, 5) # DNAME, TXT, 2 RRSIGs, synth CNAME
|
|
|
|
# check validation of CNAME response to ANY query
|
|
msg = isctest.query.create("cname2.example", "ANY")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.rr_count_eq(res2.answer, 4) # CNAME, NSEC, 2 RRSIGs
|
|
|
|
# check validation of DNAME response to ANY query
|
|
msg = isctest.query.create("foo.dname2.example", "ANY")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.rr_count_eq(res2.answer, 3) # DNAME, RSRIG, synth CNAME
|
|
|
|
# check bad CNAME signature is caught after +CD query
|
|
msg = isctest.query.create("bad-cname.example", "A", dnssec=False, cd=True)
|
|
# query once with CD to prime the cache
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
# query again with CD, bogus pending data should be returned
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
assert "a.example." in str(res.answer[0])
|
|
assert "10.0.0.1" in str(res.answer[1])
|
|
# query again without CD, bogus data should be rejected
|
|
msg = isctest.query.create("bad-cname.example", "A", dnssec=False)
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.servfail(res)
|
|
|
|
# check bad DNAME signature is caught after +CD query
|
|
msg = isctest.query.create("a.bad-dname.example", "A", dnssec=False, cd=True)
|
|
# query once with CD to prime the cache
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
# query again with CD, bogus pending data should be returned
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
assert "example." in str(res.answer[0])
|
|
assert "a.example." in str(res.answer[1])
|
|
assert "10.0.0.1" in str(res.answer[2])
|
|
# query again without CD, bogus data should be rejected
|
|
msg = isctest.query.create("a.bad-dname.example", "A", dnssec=False)
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.servfail(res)
|
|
|
|
# check DNSKEY lookup via CNAME
|
|
msg = isctest.query.create("cnameandkey.secure.example", "DNSKEY")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
assert answer_has(res2, rdatatype.CNAME)
|
|
|
|
# check DNSKEY lookup via DNAME
|
|
msg = isctest.query.create("a.dnameandkey.secure.example", "DNSKEY")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
assert answer_has(res2, rdatatype.DNAME)
|
|
|
|
# check KEY lookup via DNAME
|
|
msg = isctest.query.create("a.dnameandkey.secure.example", "KEY")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
assert answer_has(res2, rdatatype.DNAME)
|
|
|
|
|
|
@isctest.mark.rsasha1
|
|
def test_signing_algorithms_rsasha1():
|
|
# rsasha1 (should work with FIPS mode we're as only validating)
|
|
msg = isctest.query.create("a.rsasha1.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# rsasha1 (1024 bits) NSEC
|
|
msg = isctest.query.create("a.rsasha1-1024.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
|
|
def test_signing_algorithms():
|
|
# rsasha256
|
|
msg = isctest.query.create("a.rsasha256.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# rsasha512
|
|
msg = isctest.query.create("a.rsasha512.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# KSK-only DNSKEY
|
|
msg = isctest.query.create("a.kskonly.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
|
|
def test_private_algorithms(ns4):
|
|
# positive answer, private algorithm
|
|
msg = isctest.query.create("a.rsasha256oid.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# positive answer, unknown private algorithm
|
|
msg = isctest.query.create("a.unknownoid.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# positive answer, extra ds for private algorithm
|
|
msg = isctest.query.create("a.extradsoid.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# positive anwer, extra ds for unknown private algorithm
|
|
with ns4.watch_log_from_here() as watcher:
|
|
msg = isctest.query.create("a.extradsunknownoid.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res1)
|
|
isctest.check.servfail(res2)
|
|
watcher.wait_for_line("no DNSKEY matching DS")
|
|
|
|
|
|
@isctest.mark.extended_ds_digest
|
|
def test_private_algorithms_extended_ds():
|
|
# check positive validation with extra ds using extended digest
|
|
# type for unknown private algorithm
|
|
msg = isctest.query.create("a.extended-ds-unknown-oid.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
|
|
def test_negative_validation_nsec():
|
|
# nxdomain
|
|
msg = isctest.query.create("q.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.nxdomain(res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# nodata
|
|
msg = isctest.query.create("a.example", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.empty_answer(res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# negative wildcard
|
|
msg = isctest.query.create("b.wild.example", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
|
|
def test_negative_validation_nsec3():
|
|
# nxdomain
|
|
msg = isctest.query.create("q.nsec3.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.nxdomain(res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# nodata
|
|
msg = isctest.query.create("a.nsec3.example", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.empty_answer(res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# negative wildcard
|
|
msg = isctest.query.create("b.wild.nsec3.example", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# check NSEC3 zone with mismatched NSEC3PARAM / NSEC parameters
|
|
msg = isctest.query.create("non-exist.badparam", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.2")
|
|
isctest.check.nxdomain(res)
|
|
|
|
# check negative unknown NSEC3 hash algorithm does not validate
|
|
msg = isctest.query.create("nsec3-unknown.example", "A", dnssec=False)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res1)
|
|
isctest.check.servfail(res2)
|
|
|
|
|
|
def test_excessive_nsec3_iterations(ns2, ns3):
|
|
msg = "zone too-many-iterations/IN: excessive NSEC3PARAM iterations"
|
|
assert msg in ns2.log
|
|
assert msg in ns3.log
|
|
|
|
# check fallback to insecure with NSEC3 iterations is too high
|
|
msg = isctest.query.create("does-not-exist.too-many-iterations", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_data(res1, res2)
|
|
isctest.check.noadflag(res2)
|
|
isctest.check.rr_count_eq(res2.answer, 0)
|
|
isctest.check.rr_count_eq(res2.authority, 8)
|
|
|
|
# check fallback to insecure with NSEC3 iterations is too high (nodata)
|
|
msg = isctest.query.create("a.too-many-iterations", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_data(res1, res2)
|
|
isctest.check.noadflag(res2)
|
|
isctest.check.rr_count_eq(res2.answer, 0)
|
|
isctest.check.rr_count_eq(res2.authority, 4)
|
|
|
|
# check fallback to insecure with NSEC3 iterations is too high (wildcard)
|
|
msg = isctest.query.create("wild.a.too-many-iterations", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.section_equal(res1.answer, res2.answer)
|
|
isctest.check.noadflag(res2)
|
|
isctest.check.rr_count_eq(res2.answer, 2)
|
|
isctest.check.rr_count_eq(res2.authority, 2)
|
|
a, _ = res2.answer
|
|
assert str(a.name) == "wild.a.too-many-iterations."
|
|
assert str(a[0]) == "10.0.0.3"
|
|
|
|
# check fallback to insecure with high NSEC3 iterations (wildcard nodata)
|
|
msg = isctest.query.create("wild.a.too-many-iterations", 100)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_data(res1, res2)
|
|
isctest.check.noadflag(res2)
|
|
isctest.check.rr_count_eq(res2.authority, 8)
|
|
|
|
|
|
def test_auth_nsec3():
|
|
# nxdomain response, closest encloser with 0 empty non-terminals
|
|
msg = isctest.query.create("b.b.b.b.b.a.nsec3.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.nxdomain(res)
|
|
# closest encloser (a.nsec3.example):
|
|
rrset = res.get_rrset(
|
|
res.authority,
|
|
name.from_text("6OVDUHTN094ML2PV8AN90U0DPU823GH2.nsec3.example."),
|
|
rdataclass.IN,
|
|
rdatatype.NSEC3,
|
|
)
|
|
assert rrset, "NSEC3 missing from AUTHORITY: " + str(res)
|
|
assert "7AT0S0RIDCJRFF2M5H5AAV22CSFJBUL4" in str(rrset[0]).upper()
|
|
# no QNAME (b.a.nsec3.example/DSPF4R9UKOEPJ9O34E1H4539LSOTL14E)
|
|
rrset = res.get_rrset(
|
|
res.authority,
|
|
name.from_text("BEJ5GMQA872JF4DAGQ0R3O5Q7A2O5S9L.nsec3.example."),
|
|
rdataclass.IN,
|
|
rdatatype.NSEC3,
|
|
)
|
|
assert rrset, "expected NSEC3 missing from AUTHORITY: " + str(res)
|
|
assert "EF2S05SGK1IR2K5SKMFIRERGQCLMR18M" in str(rrset[0]).upper()
|
|
# no WILDCARD (*.a.nsec3.example/TFGQ60S97BS31IT1EBEDO63ETM0T5JFA)
|
|
rrset = res.get_rrset(
|
|
res.authority,
|
|
name.from_text("R8EVDMNIGNOKME4LH2H90OSP2PRSNJ1Q.nsec3.example."),
|
|
rdataclass.IN,
|
|
rdatatype.NSEC3,
|
|
)
|
|
assert rrset, "expected NSEC3 missing from AUTHORITY: " + str(res)
|
|
assert "VH656EQUD4J02OFVSO4GKOK5D02MS1TL" in str(rrset[0]).upper()
|
|
|
|
# nxdomain response, closest encloser with 1 ENT
|
|
msg = isctest.query.create("b.b.b.b.b.a.a.nsec3.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.nxdomain(res)
|
|
# closest encloser (a.a.nsec3.example):
|
|
rrset = res.get_rrset(
|
|
res.authority,
|
|
name.from_text("NGCJFSOLJUUE27PFNQNJIME4TQ0OU2DH.nsec3.example."),
|
|
rdataclass.IN,
|
|
rdatatype.NSEC3,
|
|
)
|
|
assert rrset, "expected NSEC3 missing from AUTHORITY: " + str(res)
|
|
assert "R8EVDMNIGNOKME4LH2H90OSP2PRSNJ1Q" in str(rrset[0]).upper()
|
|
# noqname (b.a.a.nsec3.example):
|
|
rrset = res.get_rrset(
|
|
res.authority,
|
|
name.from_text("R8EVDMNIGNOKME4LH2H90OSP2PRSNJ1Q.nsec3.example."),
|
|
rdataclass.IN,
|
|
rdatatype.NSEC3,
|
|
)
|
|
assert rrset, "expected NSEC3 missing from AUTHORITY: " + str(res)
|
|
assert "VH656EQUD4J02OFVSO4GKOK5D02MS1TL" in str(rrset[0]).upper()
|
|
# no wildcard (*.a.a.nsec3.example/V7JNNDJ4NLRIU195FRB7DLUCSLU4LLFM)
|
|
# is covered by the noqname proof in this case
|
|
|
|
# nxdomain response, closest encloser with 2 ENTs
|
|
msg = isctest.query.create("b.b.b.b.b.a.a.a.nsec3.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.nxdomain(res)
|
|
# closest encloser (a.a.a.nsec3.example):
|
|
rrset = res.get_rrset(
|
|
res.authority,
|
|
name.from_text("H7RHPDCHSVVRAND332F878C8AB6IBJQV.nsec3.example."),
|
|
rdataclass.IN,
|
|
rdatatype.NSEC3,
|
|
)
|
|
assert rrset, "expected NSEC3 missing from AUTHORITY: " + str(res)
|
|
assert "K8IG76R2UPQ13IKFO49L7IB9JRVB6QJI" in str(rrset[0]).upper()
|
|
# noqname (b.a.a.a.nsec3.example/18Q8D89RM8GGRSSOPFRB05QS6VEGB1P4)
|
|
rrset = res.get_rrset(
|
|
res.authority,
|
|
name.from_text("0T7VH688AEK0612T69V8692OCMJD50M4.nsec3.example."),
|
|
rdataclass.IN,
|
|
rdatatype.NSEC3,
|
|
)
|
|
assert rrset, "expected NSEC3 missing from AUTHORITY: " + str(res)
|
|
assert "1HARMGSKJH0EBU2EI2OJIKTDPIQA6KBI" in str(rrset[0]).upper()
|
|
# no WILDCARD (*.a.a.a.nsec3.example/8113LDMSEFPUAG4VGFF1C8KLOUT4Q6PH)
|
|
rrset = res.get_rrset(
|
|
res.authority,
|
|
name.from_text("7AT0S0RIDCJRFF2M5H5AAV22CSFJBUL4.nsec3.example."),
|
|
rdataclass.IN,
|
|
rdatatype.NSEC3,
|
|
)
|
|
assert rrset, "expected NSEC3 missing from AUTHORITY: " + str(res)
|
|
assert "BEJ5GMQA872JF4DAGQ0R3O5Q7A2O5S9L" in str(rrset[0]).upper()
|
|
|
|
|
|
def test_negative_validation_optout():
|
|
# nxdomain
|
|
msg = isctest.query.create("q.optout.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.nxdomain(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# nodata
|
|
msg = isctest.query.create("a.optout.example", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.empty_answer(res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# negative wildcard
|
|
msg = isctest.query.create("b.wild.optout.example", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# empty NODATA
|
|
msg = isctest.query.create("empty.optout.example", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# (rt22007 regression tests:)
|
|
# check optout NSEC3 referral with only insecure delegatons
|
|
msg = isctest.query.create("delegation.single-nsec3", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.2")
|
|
isctest.check.noerror(res)
|
|
for rrset in res.authority:
|
|
if (
|
|
rrset.rdtype != rdatatype.NSEC3
|
|
or "3KL3NK1HKQ4IUEEHBEF12VGFKUETNBAN" not in rrset.name
|
|
):
|
|
continue
|
|
assert "1 1 1 - 3KL3NK1HKQ4IUEEHBEF12VGFKUETNBAN" in str(rrset[0])
|
|
|
|
# check optout NSEC3 NXDOMAIN with only insecure delegatons
|
|
msg = isctest.query.create("nonexist.single-nsec3", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.2")
|
|
isctest.check.nxdomain(res)
|
|
for rrset in res.authority:
|
|
if (
|
|
rrset.rdtype != rdatatype.NSEC3
|
|
or "3KL3NK1HKQ4IUEEHBEF12VGFKUETNBAN" not in rrset.name
|
|
):
|
|
continue
|
|
assert "1 1 1 - 3KL3NK1HKQ4IUEEHBEF12VGFKUETNBAN" in str(rrset[0])
|
|
|
|
# check optout NSEC3 NODATA with only insecure delegatons
|
|
msg = isctest.query.create("single-nsec3", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.2")
|
|
isctest.check.noerror(res)
|
|
for rrset in res.authority:
|
|
if (
|
|
rrset.rdtype != rdatatype.NSEC3
|
|
or "3KL3NK1HKQ4IUEEHBEF12VGFKUETNBAN" not in rrset.name
|
|
):
|
|
continue
|
|
assert "1 1 1 - 3KL3NK1HKQ4IUEEHBEF12VGFKUETNBAN" in str(rrset[0])
|
|
|
|
# check negative unknown NSEC3-OPTOUT hash algorithm does not validate
|
|
msg = isctest.query.create("optout-unknown.example", "A", dnssec=False)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res1)
|
|
isctest.check.servfail(res2)
|
|
|
|
|
|
def test_cache(ns4):
|
|
ns4.rndc("dumpdb -cache")
|
|
with isctest.log.WatchLogFromStart("ns4/named_dump.db") as watcher:
|
|
watcher.wait_for_all(
|
|
[
|
|
"; key id = ", # check that key id's are logged when dumping the cache
|
|
"; example. RRSIG NSEC ", # check for RRSIG covered type in negative cache
|
|
]
|
|
)
|
|
|
|
# check validated data are not cached longer than originalttl
|
|
msg = isctest.query.create("a.ttlpatch.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.rr_count_eq(res1.answer, 2)
|
|
isctest.check.rr_count_eq(res2.answer, 2)
|
|
for rrset in res1.answer:
|
|
assert 3000 <= rrset.ttl <= 3600
|
|
for rrset in res2.answer:
|
|
assert rrset.ttl <= 300
|
|
|
|
# first query for a NS record, to cache NSEC and RRSIG(NSEC)
|
|
msg = isctest.query.create("normalthenrrsig.secure.example", "NS")
|
|
isctest.query.tcp(msg, "10.53.0.4")
|
|
# query for a record, then follow it with a query for the
|
|
# corresponding RRSIG, check that it's answered from the cache
|
|
msg = isctest.query.create("normalthenrrsig.secure.example", "A")
|
|
isctest.query.tcp(msg, "10.53.0.4")
|
|
|
|
msg = isctest.query.create("normalthenrrsig.secure.example", "RRSIG")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.raflag(res2)
|
|
|
|
# check direct query for RRSIG: if it's not cached with other records,
|
|
# it should result in an empty response.
|
|
msg = isctest.query.create("rrsigonly.secure.example", "RRSIG")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.empty_answer(res1)
|
|
isctest.check.noraflag(res1)
|
|
|
|
# check that a DNSKEY query with no data still gets cached
|
|
msg = isctest.query.create("insecure.example", "DNSKEY")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.4")
|
|
time.sleep(1) # give the TTL time to change
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
if res1.authority[0].ttl == res2.authority[0].ttl:
|
|
time.sleep(1)
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
assert res1.authority[0].ttl != res2.authority[0].ttl
|
|
|
|
|
|
def test_insecure_proof_nsec(ns4):
|
|
# 1-server positive
|
|
msg = isctest.query.create("a.insecure.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# 1-server negative
|
|
msg = isctest.query.create("q.insecure.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.nxdomain(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# 1-server negative with SOA hack
|
|
msg = isctest.query.create("r.insecure.example", "SOA")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.nxdomain(res2)
|
|
isctest.check.noadflag(res2)
|
|
assert res2.authority[0].rdtype == rdatatype.SOA
|
|
assert res2.authority[0].ttl == 0
|
|
|
|
# 2-server positive
|
|
msg = isctest.query.create("a.insecure.secure.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# 2-server negative
|
|
msg = isctest.query.create("q.insecure.secure.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.nxdomain(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# 2-server negative with SOA hack
|
|
msg = isctest.query.create("r.insecure.secure.example", "SOA")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.nxdomain(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# insecurity proof using negative cache
|
|
ns4.rndc("flush")
|
|
msg = isctest.query.create("insecure.example", "DS", cd=True)
|
|
isctest.query.tcp(msg, "10.53.0.4")
|
|
|
|
def query_and_check_nxdomain():
|
|
msg = isctest.query.create("nonexistent.insecure.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.nxdomain(res)
|
|
return True
|
|
|
|
isctest.run.retry_with_timeout(query_and_check_nxdomain, 20)
|
|
|
|
# check insecure negative response with an unsigned NSEC
|
|
# first try the auth server...
|
|
msg = isctest.query.create("nsec-rrsigs-stripped", "TXT")
|
|
res1 = isctest.query.udp(msg, "10.53.0.10")
|
|
isctest.check.noerror(res1)
|
|
isctest.check.empty_answer(res1)
|
|
isctest.check.rr_count_eq(res1.authority, 2)
|
|
isctest.check.rr_count_eq(res1.additional, 0)
|
|
# make sure there's no RRSIG(NSEC)
|
|
for rrset in res1.authority:
|
|
assert rrset.rdtype != rdatatype.RRSIG or rrset.covers != rdatatype.NSEC
|
|
# now try the resolver
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_data(res1, res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
|
|
def test_insecure_proof_nsec3():
|
|
# 1-server
|
|
msg = isctest.query.create("a.insecure.nsec3.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_data(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# 1-server negative
|
|
msg = isctest.query.create("q.insecure.nsec3.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_data(res1, res2)
|
|
isctest.check.nxdomain(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# 1-server negative with SOA hack
|
|
msg = isctest.query.create("r.insecure.nsec3.example", "SOA")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_data(res1, res2)
|
|
isctest.check.nxdomain(res2)
|
|
isctest.check.noadflag(res2)
|
|
assert res2.authority[0].rdtype == rdatatype.SOA
|
|
assert res2.authority[0].ttl == 0
|
|
|
|
|
|
def test_insecure_proof_optout():
|
|
# 1-server
|
|
msg = isctest.query.create("a.insecure.optout.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_data(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# 1-server negative
|
|
msg = isctest.query.create("q.insecure.optout.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_data(res1, res2)
|
|
isctest.check.nxdomain(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# 1-server negative with SOA hack
|
|
msg = isctest.query.create("r.insecure.optout.example", "SOA")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_data(res1, res2)
|
|
isctest.check.nxdomain(res2)
|
|
isctest.check.noadflag(res2)
|
|
assert res2.authority[0].rdtype == rdatatype.SOA
|
|
assert res2.authority[0].ttl == 0
|
|
|
|
|
|
def test_below_cname():
|
|
# check insecure zone below a cname resolves
|
|
msg = isctest.query.create("insecure.below-cname.example", "SOA")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.noadflag(res)
|
|
isctest.check.rr_count_eq(res.answer, 1)
|
|
|
|
# check secure zone below a cname resolves and validates
|
|
msg = isctest.query.create("secure.below-cname.example", "SOA")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.adflag(res)
|
|
isctest.check.rr_count_eq(res.answer, 2)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"qname",
|
|
[
|
|
"a.secure.example", # NSEC/NSEC
|
|
"a.nsec3.example", # NSEC/NSEC3
|
|
"a.optout.example", # NSEC/OPTOUT
|
|
"a.secure.nsec3.example", # NSEC3/NSEC
|
|
"a.nsec3.nsec3.example", # NSEC3/NSEC3
|
|
"a.optout.nsec3.example", # NSEC3/OPTOUT
|
|
"a.secure.optout.example", # OPTOUT/NSEC
|
|
"a.nsec3.optout.example", # OPTOUT/NSEC3
|
|
"a.optout.optout.example", # OPTOUT/OPTOUT
|
|
],
|
|
)
|
|
def test_positive_validation_multistage(qname):
|
|
msg = isctest.query.create(qname, "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
|
|
def test_validation_recovery(ns2, ns4):
|
|
# check recovery from spoofed server address.
|
|
# prime cache with spoofed address records...
|
|
msg = isctest.query.create("target.peer-ns-spoof", "A", cd=True)
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.servfail(res)
|
|
ns4.rndc("dumpdb")
|
|
with isctest.log.WatchLogFromStart("ns4/named_dump.db") as watcher:
|
|
watcher.wait_for_line("10.53.0.100")
|
|
|
|
# then reload server with properly signed zone
|
|
shutil.copyfile(
|
|
"ns2/peer.peer-ns-spoof.db.next", "ns2/peer.peer-ns-spoof.db.signed"
|
|
)
|
|
with ns2.watch_log_from_here() as watcher:
|
|
ns2.rndc("reload peer.peer-ns-spoof")
|
|
watcher.wait_for_line("zone peer.peer-ns-spoof/IN: loaded serial 2000042408")
|
|
|
|
# and check we can resolve with the correct server address
|
|
msg = isctest.query.create("test.target.peer-ns-spoof", "TXT")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.nxdomain(res)
|
|
isctest.check.adflag(res)
|
|
|
|
# check recovery from stripped DNSKEY RRSIG.
|
|
# prime cache with spoofed address records...
|
|
msg = isctest.query.create("dnskey-rrsigs-stripped", "DNSKEY", cd=True)
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.rr_count_eq(res.answer, 2)
|
|
|
|
# then reload server with properly signed zone
|
|
shutil.copyfile(
|
|
"ns2/dnskey-rrsigs-stripped.db.next", "ns2/dnskey-rrsigs-stripped.db.signed"
|
|
)
|
|
with ns2.watch_log_from_here() as watcher:
|
|
ns2.rndc("reload dnskey-rrsigs-stripped")
|
|
watcher.wait_for_line(
|
|
"zone dnskey-rrsigs-stripped/IN: loaded serial 2000042408"
|
|
)
|
|
|
|
# and check we can now resolve with the correct server address
|
|
msg = isctest.query.create("b.dnskey-rrsigs-stripped", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# check recovery from stripped DS RRSIG.
|
|
# prime cache with spoofed address records...
|
|
msg = isctest.query.create("child.ds-rrsigs-stripped", "DS", cd=True)
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.rr_count_eq(res.answer, 1)
|
|
|
|
# then reload server with properly signed zone
|
|
shutil.copyfile(
|
|
"ns2/ds-rrsigs-stripped.db.next", "ns2/ds-rrsigs-stripped.db.signed"
|
|
)
|
|
with ns2.watch_log_from_here() as watcher:
|
|
ns2.rndc("reload ds-rrsigs-stripped")
|
|
watcher.wait_for_line("zone ds-rrsigs-stripped/IN: loaded serial 2000042408")
|
|
|
|
# and check we can now resolve with the correct server address
|
|
msg = isctest.query.create("b.child.ds-rrsigs-stripped", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.2")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|
|
|
|
# check recovery with mismatching NS
|
|
ns4.rndc("flush")
|
|
msg = isctest.query.create("inconsistent", "NS", dnssec=False, cd=True)
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noadflag(res)
|
|
isctest.check.rr_count_eq(res.answer, 2)
|
|
isctest.check.rr_count_eq(res.additional, 0)
|
|
|
|
msg = isctest.query.create("inconsistent", "NS", cd=True)
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noadflag(res)
|
|
isctest.check.rr_count_eq(res.answer, 3)
|
|
isctest.check.rr_count_eq(res.additional, 0)
|
|
|
|
msg = isctest.query.create("inconsistent", "NS")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.adflag(res)
|
|
isctest.check.rr_count_eq(res.answer, 3)
|
|
isctest.check.rr_count_eq(res.additional, 0)
|
|
|
|
|
|
def test_failed_validation():
|
|
# bogus zone
|
|
msg = isctest.query.create("a.bogus.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.servfail(res)
|
|
|
|
# missing key record
|
|
msg = isctest.query.create("a.b.keyless.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.servfail(res)
|
|
|
|
|
|
def test_revoked_key():
|
|
# validation should succeed if a revoked key is encountered
|
|
msg = isctest.query.create("revkey.example", "SOA")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.adflag(res)
|
|
|
|
|
|
def test_standby_key():
|
|
# check that a secure chain with one active and one inactive KSK
|
|
# validates as secure
|
|
msg = isctest.query.create("a.lazy-ksk", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.adflag(res)
|
|
|
|
|
|
def test_transitions():
|
|
# check that a zone finishing transitioning from one algorithm
|
|
# to another validates secure
|
|
msg = isctest.query.create("algroll", "NS")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.adflag(res)
|
|
|
|
# check that validation yields insecure during transition to signed
|
|
msg = isctest.query.create("inprogress", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.noadflag(res)
|
|
a, _ = res.answer
|
|
assert str(a[0]) == "10.53.0.10"
|
|
|
|
|
|
def test_validating_forwarder(ns4, ns9):
|
|
# check validating forwarder behavior with mismatching NS
|
|
ns4.rndc("flush")
|
|
msg = isctest.query.create("inconsistent", "NS", dnssec=False, cd=True)
|
|
res = isctest.query.tcp(msg, "10.53.0.9")
|
|
isctest.check.noerror(res)
|
|
isctest.check.rr_count_eq(res.answer, 2)
|
|
isctest.check.rr_count_eq(res.additional, 0)
|
|
isctest.check.noadflag(res)
|
|
|
|
msg = isctest.query.create("inconsistent", "NS", cd=True)
|
|
res = isctest.query.tcp(msg, "10.53.0.9")
|
|
isctest.check.rr_count_eq(res.additional, 0)
|
|
isctest.check.noadflag(res)
|
|
isctest.check.rr_count_eq(res.answer, 3)
|
|
isctest.check.rr_count_eq(res.authority, 0)
|
|
isctest.check.rr_count_eq(res.additional, 0)
|
|
|
|
msg.flags &= ~flags.CD
|
|
res = isctest.query.tcp(msg, "10.53.0.9")
|
|
isctest.check.rr_count_eq(res.answer, 3)
|
|
isctest.check.rr_count_eq(res.authority, 0)
|
|
isctest.check.rr_count_eq(res.additional, 0)
|
|
isctest.check.adflag(res)
|
|
|
|
# check validating forwarder sends CD to validate with a local trust anchor
|
|
ns4.rndc("flush")
|
|
msg = isctest.query.create("localkey.example", "SOA")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.servfail(res)
|
|
|
|
with ns9.watch_log_from_here() as watcher:
|
|
res = isctest.query.tcp(msg, "10.53.0.9")
|
|
isctest.check.noerror(res)
|
|
isctest.check.adflag(res)
|
|
watcher.wait_for_line("status: SERVFAIL")
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"zone",
|
|
[
|
|
"missing-dnskey",
|
|
"wrong-dnskey",
|
|
"missing-ksk",
|
|
"a.extradsunknownoid.example",
|
|
],
|
|
)
|
|
def test_missing_dnskey(zone, ns4):
|
|
msg = isctest.query.create(f"a.{zone}", "A")
|
|
res = isctest.query.tcp(msg, ns4.ip)
|
|
isctest.check.servfail(res)
|
|
isctest.check.ede(res, EDECode.DNSKEY_MISSING)
|
|
|
|
|
|
def test_expired_signatures(ns4):
|
|
# check expired signatures do not validate
|
|
msg = isctest.query.create("expired.example", "SOA")
|
|
res = isctest.query.tcp(msg, "10.53.0.3")
|
|
rrsig = res.get_rrset(
|
|
res.answer,
|
|
name.from_text("expired.example."),
|
|
rdataclass.IN,
|
|
rdatatype.RRSIG,
|
|
rdatatype.SOA,
|
|
)
|
|
assert rrsig, "expected RRSIG(SOA) missing from AUTHORITY: " + str(rrsig)
|
|
isctest.check.rr_count_eq(res.answer, 2)
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.servfail(res)
|
|
isctest.check.noadflag(res)
|
|
isctest.check.ede(res, EDECode.SIGNATURE_EXPIRED)
|
|
assert Re("expired.example/.*: RRSIG has expired") in ns4.log
|
|
|
|
# check future signatures do not validate
|
|
msg = isctest.query.create("future.example", "SOA")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.servfail(res)
|
|
isctest.check.noadflag(res)
|
|
isctest.check.ede(res, EDECode.SIGNATURE_NOT_YET_VALID)
|
|
assert Re("future.example/.*: RRSIG validity period has not begun") in ns4.log
|
|
|
|
# check that a dynamic zone with future signatures is re-signed on load
|
|
msg = isctest.query.create("managed-future.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.adflag(res)
|
|
isctest.check.noerror(res)
|
|
|
|
# test TTL is capped at RRSIG expiry time
|
|
ns4.rndc("flush")
|
|
msg = isctest.query.create("expiring.example", "SOA", cd=True)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.4")
|
|
msg = isctest.query.create("expiring.example", "SOA")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
for rrset in res1.answer:
|
|
assert 240 <= rrset.ttl <= 300
|
|
for rrset in res2.answer:
|
|
assert rrset.ttl <= 60
|
|
|
|
# test TTL is capped at RRSIG expiry time in the additional section (NS)
|
|
ns4.rndc("flush")
|
|
msg = isctest.query.create("expiring.example", "NS", cd=True)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.4")
|
|
msg = isctest.query.create("expiring.example", "NS")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
for rrset in res1.additional:
|
|
assert 240 <= rrset.ttl <= 300
|
|
for rrset in res2.additional:
|
|
assert rrset.ttl <= 60
|
|
|
|
# test TTL is capped at RRSIG expiry time in the additional section (MX)
|
|
ns4.rndc("flush")
|
|
msg = isctest.query.create("expiring.example", "MX", cd=True)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.4")
|
|
msg = isctest.query.create("expiring.example", "MX")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
for rrset in res1.additional:
|
|
assert 240 <= rrset.ttl <= 300
|
|
for rrset in res2.additional:
|
|
assert rrset.ttl <= 60
|
|
|
|
|
|
def test_casing():
|
|
# test legacy upper-case signer name validation
|
|
msg = isctest.query.create("upper.example", "SOA")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.adflag(res)
|
|
_, sig = res.answer
|
|
assert sig.rdtype == rdatatype.RRSIG
|
|
assert sig.covers == rdatatype.SOA
|
|
assert "UPPER.EXAMPLE." in str(sig[0])
|
|
|
|
# test that we lower-case signer name
|
|
msg = isctest.query.create("LOWER.EXAMPLE", "SOA")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
_, sig = res.answer
|
|
assert sig.rdtype == rdatatype.RRSIG
|
|
assert sig.covers == rdatatype.SOA
|
|
assert "lower.example." in str(sig[0])
|
|
|
|
|
|
def test_broken_servers():
|
|
# check that a non-cacheable NODATA works
|
|
msg = isctest.query.create("a.nosoa.secure.example", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.6")
|
|
isctest.check.rr_count_eq(res1.authority, 0)
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res2)
|
|
|
|
# check that a non-cacheable NXDOMAIN works
|
|
msg = isctest.query.create("b.nosoa.secure.example", "TXT")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.6")
|
|
isctest.check.rr_count_eq(res1.authority, 0)
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.nxdomain(res2)
|
|
|
|
# check that split RRSIGs are handled
|
|
msg = isctest.query.create("split-rrsig", "SOA")
|
|
res = isctest.query.tcp(msg, "10.53.0.6")
|
|
soa, _ = res.answer
|
|
assert soa[0].serial > 1
|
|
|
|
# check that not-at-zone-apex RRSIG(SOA) rrsets are removed
|
|
msg = isctest.query.create("split-rrsig", "AXFR")
|
|
res = isctest.query.tcp(msg, "10.53.0.6")
|
|
|
|
nza = [
|
|
r
|
|
for r in res.answer
|
|
if str(r.name) == "not-at-zone-apex.split-rrsig."
|
|
and r.rdtype == rdatatype.RRSIG
|
|
and r.covers == rdatatype.SOA
|
|
]
|
|
assert not nza
|
|
|
|
# check validation with missing nearest encloser proof
|
|
msg = isctest.query.create("b.c.d.optout-tld", "DS")
|
|
res = isctest.query.tcp(msg, "10.53.0.6")
|
|
nsec3s = [a for a in res.authority if a.rdtype == rdatatype.NSEC3]
|
|
assert len(nsec3s) == 2
|
|
|
|
msg = isctest.query.create("b.c.d.optout-tld", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.6")
|
|
nsec3s = [a for a in res.authority if a.rdtype == rdatatype.NSEC3]
|
|
assert len(nsec3s) == 1
|
|
|
|
res = isctest.query.tcp(msg, "10.53.0.6")
|
|
isctest.check.noerror(res)
|
|
isctest.check.noadflag(res)
|
|
msg = isctest.query.create("optout-tld", "SOA")
|
|
res = isctest.query.tcp(msg, "10.53.0.6")
|
|
isctest.check.noadflag(res)
|
|
|
|
|
|
def test_pending_ds(ns4):
|
|
# check that a query against a validating resolver succeeds when there is
|
|
# a negative cache entry with trust level "pending" for the DS. prime
|
|
# with a +cd DS query to produce the negative cache entry, then send a
|
|
# query that uses that entry as part of the validation process.
|
|
ns4.rndc("flush")
|
|
msg = isctest.query.create("insecure.example", "DS", cd=True)
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.rr_count_eq(res.answer, 0)
|
|
isctest.check.rr_count_eq(res.authority, 4)
|
|
msg = isctest.query.create("a.insecure.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.rr_count_eq(res.answer, 1)
|
|
isctest.check.rr_count_eq(res.authority, 0)
|
|
isctest.check.noadflag(res)
|
|
|
|
|
|
def test_unknown_algorithms():
|
|
# check that unknown DNSKEY algorithm validates as insecure
|
|
msg = isctest.query.create("dnskey-unknown.example", "A", dnssec=False)
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res1)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
# check that unsupported DNSKEY algorithms are in the DNSKEY RRsets
|
|
msg = isctest.query.create("dnskey-unsupported.example", "DNSKEY")
|
|
res = isctest.query.tcp(msg, "10.53.0.3")
|
|
isctest.check.noerror(res)
|
|
|
|
msg = isctest.query.create("dnskey-unsupported-2.example", "DNSKEY")
|
|
res = isctest.query.tcp(msg, "10.53.0.3")
|
|
isctest.check.noerror(res)
|
|
rrsets = [str(r) for r in res.answer]
|
|
assert any("257 3 255" in r for r in rrsets)
|
|
|
|
# check that unsupported DNSKEY algorithm validates as insecure
|
|
msg = isctest.query.create("dnskey-unsupported.example", "DNSKEY")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.noadflag(res)
|
|
isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM)
|
|
|
|
# check that DNSKEY with an unsupported reserve key validates
|
|
msg = isctest.query.create("dnskey-unsupported-2.example", "DNSKEY")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.adflag(res)
|
|
|
|
# check EDE code 2 for unsupported DS digest algorithm
|
|
msg = isctest.query.create("a.ds-unsupported.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.ede(res, EDECode.UNSUPPORTED_DS_DIGEST_TYPE)
|
|
|
|
# check EDE code 1 for bad algorithm mnemonic
|
|
msg = isctest.query.create("badalg.secure.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.noadflag(res)
|
|
isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM)
|
|
|
|
# check that zone contents are still secure despite disable-algorithms
|
|
# on query name (name below zone name).
|
|
msg = isctest.query.create("z.secure.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.rr_count_eq(res.answer, 2)
|
|
isctest.check.noerror(res)
|
|
isctest.check.adflag(res)
|
|
|
|
# check that zone contents are trated insecure (name above zone name).
|
|
msg = isctest.query.create("zonecut.ent.secure.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.rr_count_eq(res.answer, 2)
|
|
isctest.check.noerror(res)
|
|
isctest.check.noadflag(res)
|
|
isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM)
|
|
|
|
# check that DS records are still treated as secure at the
|
|
# disable-algorithm name
|
|
msg = isctest.query.create("badalg.secure.example", "DS")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.rr_count_eq(res.answer, 2)
|
|
isctest.check.noerror(res)
|
|
isctest.check.adflag(res)
|
|
|
|
# check both EDE code 1 and 2 for unsupported digest on one DNSKEY
|
|
# and unsupported algorithm on the other
|
|
msg = isctest.query.create("a.digest-alg-unsupported.example", "A")
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noadflag(res)
|
|
isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM)
|
|
isctest.check.ede(res, EDECode.UNSUPPORTED_DS_DIGEST_TYPE)
|
|
|
|
# check that unknown DNSKEY algorithm + unknown NSEC3 hash algorithm
|
|
# validates as insecure
|
|
msg = isctest.query.create("dnskey-nsec3-unknown.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res1)
|
|
isctest.check.noerror(res2)
|
|
isctest.check.noadflag(res2)
|
|
|
|
|
|
def test_legacy_dnssec_types_are_signed():
|
|
"""SIG (24), NXT (30) and KEY (25) records carry a covering RRSIG.
|
|
|
|
Per RFC 3755 SIG and NXT are obsolete and treated as opaque zone
|
|
data; KEY remains valid for SIG(0)/TKEY use. All three are
|
|
ordinary zone data and must be signed like any other RRset.
|
|
"""
|
|
for owner, rrtype in [
|
|
("sigrr.secure.example", "SIG"),
|
|
("nxtrr.secure.example", "NXT"),
|
|
("keyrr.secure.example", "KEY"),
|
|
]:
|
|
expected = rdatatype.from_text(rrtype)
|
|
msg = isctest.query.create(owner, rrtype, cd=True)
|
|
res = isctest.query.tcp(msg, "10.53.0.3")
|
|
isctest.check.noerror(res)
|
|
assert any(
|
|
rr.rdtype == expected for rr in res.answer
|
|
), f"{rrtype} record missing in answer for {owner}: {res.answer}"
|
|
assert any(
|
|
rr.rdtype == rdatatype.RRSIG and rr.covers == expected for rr in res.answer
|
|
), f"RRSIG({rrtype}) missing in answer for {owner}: {res.answer}"
|
|
|
|
|
|
def test_rrsigs_for_glue():
|
|
msg = isctest.query.create("ns3.secure.example", "A", cd=True)
|
|
res = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.noerror(res)
|
|
isctest.check.rr_count_eq(res.answer, 2)
|
|
assert any(record.rdtype == rdatatype.A for record in res.answer)
|
|
assert any(
|
|
record.rdtype == rdatatype.RRSIG and record.covers == rdatatype.A
|
|
for record in res.answer
|
|
)
|
|
|
|
|
|
def test_extra_bad_algorithm():
|
|
msg = isctest.query.create("a.extrabadkey.example", "A")
|
|
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
|
res2 = isctest.query.tcp(msg, "10.53.0.4")
|
|
isctest.check.same_answer(res1, res2)
|
|
isctest.check.adflag(res2)
|