Update multisigner system test to set primary

When testing multi-signer as bump-in-the-wire (upcoming test), we want
to be able to do dynamically updates to a hidden primary. Update the
test functions such that we can set a specific primary server.
This commit is contained in:
Matthijs Mekking 2025-10-10 18:02:58 +02:00
parent 9ae449afd1
commit fdf8a171c5

View file

@ -111,11 +111,14 @@ def check_no_dnssec_in_journal(server, zone):
assert not match, f"{match.group(1)} record found in journal"
def check_add_zsk(server, zone, keys, expected, extra_keys, extra):
def check_add_zsk(server, zone, keys, expected, extra_keys, extra, primary=None):
if primary is None:
primary = server
isctest.log.info("add dnskey record:")
isctest.log.info(
f"- zone {zone} {server.identifier}: update zone with ZSK from other providers"
f"- zone {zone} {primary.identifier}: update zone with ZSK from other providers"
)
update_msg = dns.update.UpdateMessage(zone)
@ -123,7 +126,7 @@ def check_add_zsk(server, zone, keys, expected, extra_keys, extra):
dnskey = zsk.dnskey().split()
rdata = " ".join(dnskey[4:])
update_msg.add(f"{zone}.", TTL, "DNSKEY", rdata)
server.nsupdate(update_msg)
primary.nsupdate(update_msg)
# Check the new DNSKEY RRset.
isctest.log.info(
@ -148,11 +151,14 @@ def check_add_zsk(server, zone, keys, expected, extra_keys, extra):
server.log.prohibit(f"dns_zone_findkeys: error reading ./K{zone}")
def check_remove_zsk(server, zone, keys, expected, extra_keys, extra):
isctest.log.info("remove dnskey record:")
def _check_remove_zsk_fail(
server, zone, keys, expected, extra_keys, extra, primary=None
):
if primary is None:
primary = server
isctest.log.info(
f"- zone {zone} {server.identifier}: try to remove own ZSK (should fail)"
f"- zone {zone} {primary.identifier}: try to remove own ZSK (should fail)"
)
zsks = [k for k in keys if not k.is_ksk()]
@ -160,15 +166,15 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra):
rdata = " ".join(dnskey[4:])
update_msg = dns.update.UpdateMessage(zone)
update_msg.delete(f"{zone}.", "DNSKEY", rdata)
with server.watch_log_from_here() as watcher:
server.nsupdate(update_msg)
with primary.watch_log_from_here() as watcher:
primary.nsupdate(update_msg)
watcher.wait_for_line(
f"updating zone '{zone}/IN': attempt to delete in use DNSKEY ignored"
)
# Both ZSKs should still be published.
isctest.log.info(
f"- zone {zone} {server.identifier}: check DNSKEY RRset after update remove"
f"- zone {zone} {server.identifier}: check DNSKEY RRset after ignored remove"
)
check_dnssec(server, zone, keys + extra_keys, expected + extra)
@ -181,9 +187,23 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra):
isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run")
check_dnssec(server, zone, keys + extra_keys, expected + extra)
def check_remove_zsk(
server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False
):
isctest.log.info("remove dnskey record:")
if primary is None:
primary = server
if check_fail:
_check_remove_zsk_fail(
server, zone, keys, expected, extra_keys, extra, primary=primary
)
# Remove actual ZSK.
isctest.log.info(
f"- zone {zone} {server.identifier}: remove ZSK from other providers"
f"- zone {zone} {primary.identifier}: remove ZSK from other providers"
)
update_msg = dns.update.UpdateMessage(zone)
@ -191,7 +211,7 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra):
dnskey = zsk.dnskey().split()
rdata = " ".join(dnskey[4:])
update_msg.delete(f"{zone}.", "DNSKEY", rdata)
server.nsupdate(update_msg)
primary.nsupdate(update_msg)
# We should have only the KSK and ZSK from server.
isctest.log.info(
@ -209,11 +229,14 @@ def check_remove_zsk(server, zone, keys, expected, extra_keys, extra):
check_dnssec(server, zone, keys, expected)
def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra):
def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra, primary=None):
if primary is None:
primary = server
isctest.log.info("add cdnskey record:")
isctest.log.info(
f"- zone {zone} {server.identifier}: update zone with CDNSKEY from other providers"
f"- zone {zone} {primary.identifier}: update zone with CDNSKEY from other providers"
)
# Retrieve CDNSKEY records from the other providers.
@ -222,7 +245,7 @@ def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra):
dnskey = ksk.dnskey().split()
rdata = " ".join(dnskey[4:])
update_msg.add(f"{zone}.", TTL, "CDNSKEY", rdata)
server.nsupdate(update_msg)
primary.nsupdate(update_msg)
# Now there should be two CDNSKEY records.
isctest.log.info(
@ -240,11 +263,14 @@ def check_add_cdnskey(server, zone, keys, expected, extra_keys, extra):
check_dnssec(server, zone, keys + extra_keys, expected + extra)
def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra):
isctest.log.info("remove cdnskey record:")
def _check_remove_cdnskey_fail(
server, zone, keys, expected, extra_keys, extra, primary=None
):
if primary is None:
primary = server
isctest.log.info(
f"- zone {zone} {server.identifier}: try to remove own CDNSKEY (should fail)"
f"- zone {zone} {primary.identifier}: try to remove own CDNSKEY (should fail)"
)
ksks = [k for k in keys if not k.is_ksk()]
@ -252,15 +278,15 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra):
rdata = " ".join(dnskey[4:])
update_msg = dns.update.UpdateMessage(zone)
update_msg.delete(f"{zone}.", "CDNSKEY", rdata)
with server.watch_log_from_here() as watcher:
server.nsupdate(update_msg)
with primary.watch_log_from_here() as watcher:
primary.nsupdate(update_msg)
watcher.wait_for_line(
f"updating zone '{zone}/IN': attempt to delete in use CDNSKEY ignored"
)
# Both CDNSKEY records should still be published.
isctest.log.info(
f"- zone {zone} {server.identifier}: check CDNSKEY RRset after update remove"
f"- zone {zone} {server.identifier}: check CDNSKEY RRset after ignored remove"
)
check_dnssec(server, zone, keys + extra_keys, expected + extra)
@ -273,9 +299,23 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra):
isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run")
check_dnssec(server, zone, keys + extra_keys, expected + extra)
def check_remove_cdnskey(
server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False
):
isctest.log.info("remove cdnskey record:")
if primary is None:
primary = server
if check_fail:
_check_remove_cdnskey_fail(
server, zone, keys, expected, extra_keys, extra, primary=primary
)
# Remove actual CDNSKEY.
isctest.log.info(
f"- zone {zone} {server.identifier}: remove CDNSKEY from other providers"
f"- zone {zone} {primary.identifier}: remove CDNSKEY from other providers"
)
update_msg = dns.update.UpdateMessage(zone)
@ -283,7 +323,7 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra):
dnskey = ksk.dnskey().split()
rdata = " ".join(dnskey[4:])
update_msg.delete(f"{zone}.", "CDNSKEY", rdata)
server.nsupdate(update_msg)
primary.nsupdate(update_msg)
# Now there should be one CDNSKEY record again.
isctest.log.info(
@ -301,11 +341,14 @@ def check_remove_cdnskey(server, zone, keys, expected, extra_keys, extra):
check_dnssec(server, zone, keys, expected)
def check_add_cds(server, zone, keys, expected, extra_keys, extra):
def check_add_cds(server, zone, keys, expected, extra_keys, extra, primary=None):
isctest.log.info("add cds record:")
if primary is None:
primary = server
isctest.log.info(
f"- zone {zone} {server.identifier}: update zone with CDS from other providers"
f"- zone {zone} {primary.identifier}: update zone with CDS from other providers"
)
# Retrieve CDS records from the other providers.
@ -314,7 +357,7 @@ def check_add_cds(server, zone, keys, expected, extra_keys, extra):
ds = dsfromkey(ksk)
rdata = " ".join(ds[4:])
update_msg.add(f"{zone}.", TTL, "CDS", rdata)
server.nsupdate(update_msg)
primary.nsupdate(update_msg)
# Now there should be two CDS records.
isctest.log.info(
@ -332,11 +375,14 @@ def check_add_cds(server, zone, keys, expected, extra_keys, extra):
check_dnssec(server, zone, keys + extra_keys, expected + extra)
def check_remove_cds(server, zone, keys, expected, extra_keys, extra):
isctest.log.info("remove cds record:")
def _check_remove_cds_fail(
server, zone, keys, expected, extra_keys, extra, primary=None
):
if primary is None:
primary = server
isctest.log.info(
f"- zone {zone} {server.identifier}: try to remove own CDS (should fail)"
f"- zone {zone} {primary.identifier}: try to remove own CDS (should fail)"
)
ksks = [k for k in keys if not k.is_ksk()]
@ -344,15 +390,15 @@ def check_remove_cds(server, zone, keys, expected, extra_keys, extra):
rdata = " ".join(ds[4:])
update_msg = dns.update.UpdateMessage(zone)
update_msg.delete(f"{zone}.", "CDS", rdata)
with server.watch_log_from_here() as watcher:
server.nsupdate(update_msg)
with primary.watch_log_from_here() as watcher:
primary.nsupdate(update_msg)
watcher.wait_for_line(
f"updating zone '{zone}/IN': attempt to delete in use CDS ignored"
)
# Both CDS records should still be published.
isctest.log.info(
f"- zone {zone} {server.identifier}: check CDS RRset after update remove"
f"- zone {zone} {server.identifier}: check CDS RRset after ignored remove"
)
check_dnssec(server, zone, keys + extra_keys, expected + extra)
@ -365,9 +411,23 @@ def check_remove_cds(server, zone, keys, expected, extra_keys, extra):
isctest.log.info(f"- zone {zone} {server.identifier}: check again after keymgr run")
check_dnssec(server, zone, keys + extra_keys, expected + extra)
def check_remove_cds(
server, zone, keys, expected, extra_keys, extra, primary=None, check_fail=False
):
isctest.log.info("remove cds record:")
if primary is None:
primary = server
if check_fail:
_check_remove_cds_fail(
server, zone, keys, expected, extra_keys, extra, primary=primary
)
# Remove actual CDS.
isctest.log.info(
f"- zone {zone} {server.identifier}: remove CDS from other providers"
f"- zone {zone} {primary.identifier}: remove CDS from other providers"
)
update_msg = dns.update.UpdateMessage(zone)
@ -375,7 +435,7 @@ def check_remove_cds(server, zone, keys, expected, extra_keys, extra):
ds = dsfromkey(ksk)
rdata = " ".join(ds[4:])
update_msg.delete(f"{zone}.", "CDS", rdata)
server.nsupdate(update_msg)
primary.nsupdate(update_msg)
# Now there should be one CDS record again.
isctest.log.info(
@ -430,8 +490,8 @@ def test_multisigner(ns3, ns4):
check_no_dnssec_in_journal(ns4, zone)
# Remove DNSKEY from RRset.
check_remove_zsk(ns3, zone, keys3, expected3, [zsks4[0]], extra)
check_remove_zsk(ns4, zone, keys4, expected4, [zsks3[0]], extra)
check_remove_zsk(ns3, zone, keys3, expected3, [zsks4[0]], extra, check_fail=True)
check_remove_zsk(ns4, zone, keys4, expected4, [zsks3[0]], extra, check_fail=True)
check_no_dnssec_in_journal(ns4, zone)
# Add CDNSKEY RRset.
@ -445,8 +505,12 @@ def test_multisigner(ns3, ns4):
check_no_dnssec_in_journal(ns4, zone)
# Remove CDNSKEY RRset.
check_remove_cdnskey(ns3, zone, keys3, expected3, [ksks4[0]], extra)
check_remove_cdnskey(ns4, zone, keys4, expected4, [ksks3[0]], extra)
check_remove_cdnskey(
ns3, zone, keys3, expected3, [ksks4[0]], extra, check_fail=True
)
check_remove_cdnskey(
ns4, zone, keys4, expected4, [ksks3[0]], extra, check_fail=True
)
check_no_dnssec_in_journal(ns4, zone)
# Update CDS RRset.
@ -455,6 +519,6 @@ def test_multisigner(ns3, ns4):
check_no_dnssec_in_journal(ns4, zone)
# Remove CDS RRset.
check_remove_cds(ns3, zone, keys3, expected3, [ksks4[0]], extra)
check_remove_cds(ns4, zone, keys4, expected4, [ksks3[0]], extra)
check_remove_cds(ns3, zone, keys3, expected3, [ksks4[0]], extra, check_fail=True)
check_remove_cds(ns4, zone, keys4, expected4, [ksks3[0]], extra, check_fail=True)
check_no_dnssec_in_journal(ns4, zone)