[9.20] fix: test: Add wait_for_keymgr_done() util function to tests

The kasp test cases assume that keymgr operations on the zone under test
have been completed before the test is executed. These are typically
quite fast, but the logs need to be explicitly checked for the messages,
otherwise there's a possibility of race conditions causing the
kasp/rollover tests to become unstable.
    
Call the wait function in all the kasp/rollover tests where it is
expected (which is generally in each test, unless we're dealing with
unsigned zones).

Closes #5371

Backport of MR !10717

Merge branch 'backport-5371-wait-keymgr-done-rollover-kasp-tests-9.20' into 'bind-9.20'

See merge request isc-projects/bind9!10752
This commit is contained in:
Nicki Křížek 2025-07-18 16:54:58 +02:00
commit d4fb3a060b
35 changed files with 682 additions and 409 deletions

View file

@ -59,13 +59,12 @@ def check_if_server_is_responsive(ns3):
return False
def test_rndc_deadlock(servers):
def test_rndc_deadlock(ns3):
"""
Test whether running "rndc addzone", "rndc modzone", and "rndc delzone"
commands concurrently does not trigger a deadlock
"""
test_state = {"finished": False}
ns3 = servers["ns3"]
# Create 4 worker threads running "rndc" commands in a loop.
with concurrent.futures.ThreadPoolExecutor() as executor:

View file

@ -462,16 +462,16 @@ checkds_tests = (
@pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone)
def test_checkds(servers, params):
def test_checkds(ns2, ns9, params):
# Wait until the provided zone is signed and then verify its DNSSEC data.
zone_check(servers["ns9"], params.zone)
zone_check(ns9, params.zone)
# Wait up to 10 seconds until all the expected log lines are found in the
# log file for the provided server. Rekey every second if necessary.
time_remaining = 10
for log_string in params.logs_to_wait_for:
line = f"zone {params.zone}/IN (signed): checkds: {log_string}"
while line not in servers["ns9"].log:
while line not in ns9.log:
rekey(params.zone)
time_remaining -= 1
assert time_remaining, f'Timed out waiting for "{log_string}" to be logged'
@ -479,4 +479,4 @@ def test_checkds(servers, params):
# Check whether key states on the parent server provided match
# expectations.
keystate_check(servers["ns2"], params.zone, params.expected_parent_state)
keystate_check(ns2, params.zone, params.expected_parent_state)

View file

@ -634,3 +634,53 @@ def servers(system_test_dir):
except ValueError:
continue
return instances
@pytest.fixture(scope="module")
def ns1(servers):
return servers["ns1"]
@pytest.fixture(scope="module")
def ns2(servers):
return servers["ns2"]
@pytest.fixture(scope="module")
def ns3(servers):
return servers["ns3"]
@pytest.fixture(scope="module")
def ns4(servers):
return servers["ns4"]
@pytest.fixture(scope="module")
def ns5(servers):
return servers["ns5"]
@pytest.fixture(scope="module")
def ns6(servers):
return servers["ns6"]
@pytest.fixture(scope="module")
def ns7(servers):
return servers["ns7"]
@pytest.fixture(scope="module")
def ns8(servers):
return servers["ns8"]
@pytest.fixture(scope="module")
def ns9(servers):
return servers["ns9"]
@pytest.fixture(scope="module")
def ns10(servers):
return servers["ns10"]

View file

@ -14,7 +14,7 @@ import isctest
import dns.message
def test_database(servers, templates):
def test_database(ns1, templates):
msg = dns.message.make_query("database.", "SOA")
# checking pre reload zone
@ -28,8 +28,8 @@ def test_database(servers, templates):
)
templates.render("ns1/named.conf", {"rname": "marka.isc.org."})
with servers["ns1"].watch_log_from_here() as watcher:
servers["ns1"].rndc("reload")
with ns1.watch_log_from_here() as watcher:
ns1.rndc("reload")
watcher.wait_for_line("all zones loaded")
# checking post reload zone

View file

@ -14,10 +14,11 @@ import dns.message
import isctest
def test_emptyzones(servers, templates):
def test_emptyzones(ns1, templates):
# check that switching to automatic empty zones works
ns1 = servers["ns1"]
ns1.rndc("reload")
with ns1.watch_log_from_here() as watcher:
ns1.rndc("reload")
watcher.wait_for_line("all zones loaded")
templates.render("ns1/named.conf", {"automatic_empty_zones": True})
ns1.rndc("reload")
msg = dns.message.make_query("version.bind", "TXT", "CH")

View file

@ -9,6 +9,7 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from datetime import datetime, timedelta, timezone
from functools import total_ordering
import glob
import os
@ -18,10 +19,10 @@ import subprocess
import time
from typing import Dict, List, Optional, Tuple, Union
from datetime import datetime, timedelta, timezone
import dns
import dns.tsig
from isctest.instance import NamedInstance
import isctest.log
import isctest.query
import isctest.util
@ -1520,3 +1521,15 @@ def policy_to_properties(ttl, keys: List[str]) -> List[KeyProperties]:
proplist.append(keyprop)
return proplist
def wait_keymgr_done(server: NamedInstance, zone: str, reconfig: bool = False) -> None:
"""
Block and wait until the keymgr is done processing zone.
"""
messages = []
if reconfig:
messages.append("received control channel command 'reconfig'")
messages.append(f"keymgr: {zone} done")
with server.watch_log_from_start() as watcher:
watcher.wait_for_sequence(messages)

View file

@ -633,16 +633,17 @@ def cb_remove_keyfiles(params, ksks=None, zsks=None):
),
],
)
def test_kasp_case(servers, params):
def test_kasp_case(servers, ns3, params):
# Test many different configurations and expected keys and states after
# initial startup.
server = servers["ns3"]
keydir = server.identifier
keydir = ns3.identifier
# Get test parameters.
zone = params["zone"]
policy = params["policy"]
isctest.kasp.wait_keymgr_done(ns3, zone)
params["config"]["key-directory"] = params["config"]["key-directory"].replace(
"{keydir}", keydir
)
@ -660,7 +661,7 @@ def test_kasp_case(servers, params):
isctest.log.info(f"check test case zone {zone} policy {policy}")
# First make sure the zone is signed.
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
# Key properties.
expected = isctest.kasp.policy_to_properties(ttl=ttl, keys=params["key-properties"])
@ -678,7 +679,7 @@ def test_kasp_case(servers, params):
ksks = [k for k in keys if k.is_ksk()]
zsks = [k for k in keys if not k.is_ksk()]
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
offset = params["offset"] if "offset" in params else None
@ -691,7 +692,7 @@ def test_kasp_case(servers, params):
if "rumoured" not in params:
isctest.kasp.check_keytimes(keys, expected)
check_all(server, zone, policy, ksks, zsks, zsk_missing=zsk_missing)
check_all(ns3, zone, policy, ksks, zsks, zsk_missing=zsk_missing)
if "additional-tests" in params:
params["servers"] = servers
@ -754,6 +755,8 @@ def test_kasp_inherit_signed(zone, policy, server_id, alg, tsig_kind, servers):
else None
)
isctest.kasp.wait_keymgr_done(server, zone)
key1 = KeyProperties.default()
key1.metadata["Algorithm"] = alg.number
key1.metadata["Length"] = alg.bits
@ -774,26 +777,27 @@ def test_kasp_inherit_signed(zone, policy, server_id, alg, tsig_kind, servers):
param("3", "no", "yes", "view2"),
],
)
def test_kasp_inherit_view(number, dynamic, inline_signing, txt_rdata, servers):
def test_kasp_inherit_view(number, dynamic, inline_signing, txt_rdata, ns4):
zone = "example.net"
policy = "test"
server = servers["ns4"]
view = f"example{number}"
tsig = f"{os.environ['DEFAULT_HMAC']}:keyforview{number}:{KASP_INHERIT_TSIG_SECRET[f'view{number}']}"
isctest.kasp.wait_keymgr_done(ns4, zone)
key1 = KeyProperties.default()
key1.metadata["Algorithm"] = ECDSAP384SHA384.number
key1.metadata["Length"] = ECDSAP384SHA384.bits
keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
keys = isctest.kasp.keydir_to_keylist(zone, ns4.identifier)
isctest.kasp.check_dnssec_verify(server, zone, tsig=tsig)
isctest.kasp.check_dnssec_verify(ns4, zone, tsig=tsig)
isctest.kasp.check_keys(zone, keys, [key1])
set_keytimes_default_policy(key1)
isctest.kasp.check_keytimes(keys, [key1])
isctest.kasp.check_dnssecstatus(server, zone, keys, policy=policy, view=view)
isctest.kasp.check_apex(server, zone, keys, [], tsig=tsig)
isctest.kasp.check_dnssecstatus(ns4, zone, keys, policy=policy, view=view)
isctest.kasp.check_apex(ns4, zone, keys, [], tsig=tsig)
# check zonestatus
response = server.rndc(f"zonestatus {zone} in {view}", log=False)
response = ns4.rndc(f"zonestatus {zone} in {view}", log=False)
assert f"dynamic: {dynamic}" in response
assert f"inline signing: {inline_signing}" in response
# check subdomain
@ -806,9 +810,9 @@ def test_kasp_inherit_view(number, dynamic, inline_signing, txt_rdata, servers):
keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0])
query.use_tsig(keyring)
try:
response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
response = isctest.query.tcp(query, ns4.ip, ns4.ports.dns, timeout=3)
except dns.exception.Timeout:
isctest.log.debug(f"query timeout for query {qname} {qtype} to {server.ip}")
isctest.log.debug(f"query timeout for query {qname} {qtype} to {ns4.ip}")
response = None
assert response.rcode() == dns.rcode.NOERROR
match = f'{qname} 300 IN TXT "{rdata}"'
@ -824,14 +828,14 @@ def test_kasp_inherit_view(number, dynamic, inline_signing, txt_rdata, servers):
isctest.kasp.check_signatures(rrsigs, qtype, fqdn, keys, [])
def test_kasp_default(servers):
server = servers["ns3"]
def test_kasp_default(ns3):
# check the zone with default kasp policy has loaded and is signed.
isctest.log.info("check a zone with the default policy is signed")
zone = "default.kasp"
policy = "default"
isctest.kasp.wait_keymgr_done(ns3, zone)
# Key properties.
# DNSKEY, RRSIG (ksk), RRSIG (zsk) are published. DS needs to wait.
keyprops = [
@ -839,11 +843,11 @@ def test_kasp_default(servers):
]
expected = isctest.kasp.policy_to_properties(ttl=3600, keys=keyprops)
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
set_keytimes_default_policy(expected[0])
isctest.kasp.check_keytimes(keys, expected)
check_all(server, zone, policy, keys, [])
check_all(ns3, zone, policy, keys, [])
# Trigger a keymgr run. Make sure the key files are not touched if there
# are no modifications to the key metadata.
@ -855,8 +859,8 @@ def test_kasp_default(servers):
pubkey_stat = os.stat(key.keyfile)
state_stat = os.stat(key.statefile)
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"loadkeys {zone}", log=False)
watcher.wait_for_line(f"keymgr: {zone} done")
assert privkey_stat.st_mtime == os.stat(key.privatefile).st_mtime
@ -864,8 +868,8 @@ def test_kasp_default(servers):
assert state_stat.st_mtime == os.stat(key.statefile).st_mtime
# again
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"loadkeys {zone}", log=False)
watcher.wait_for_line(f"keymgr: {zone} done")
assert privkey_stat.st_mtime == os.stat(key.privatefile).st_mtime
@ -875,7 +879,7 @@ def test_kasp_default(servers):
# modify unsigned zone file and check that new record is signed.
isctest.log.info("check that an updated zone signs the new record")
shutil.copyfile("ns3/template2.db.in", f"ns3/{zone}.db")
server.rndc(f"reload {zone}", log=False)
ns3.rndc(f"reload {zone}", log=False)
def update_is_signed():
parts = update.split()
@ -883,7 +887,7 @@ def test_kasp_default(servers):
qtype = dns.rdatatype.from_text(parts[1])
rdata = parts[2]
return isctest.kasp.verify_update_is_signed(
server, zone, qname, qtype, rdata, keys, []
ns3, zone, qname, qtype, rdata, keys, []
)
expected_updates = [f"a.{zone}. A 10.0.0.11", f"d.{zone}. A 10.0.0.44"]
@ -895,48 +899,51 @@ def test_kasp_default(servers):
isctest.log.info("check that missing private key doesn't trigger rollover")
shutil.move(f"{key.privatefile}", f"{key.path}.offline")
expectmsg = "zone_rekey:zone_verifykeys failed: some key files are missing"
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"loadkeys {zone}", log=False)
watcher.wait_for_line(f"zone {zone}/IN (signed): {expectmsg}")
# Nothing has changed.
expected[0].properties["private"] = False
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
isctest.kasp.check_keytimes(keys, expected)
check_all(server, zone, policy, keys, [])
check_all(ns3, zone, policy, keys, [])
# A zone that uses inline-signing.
isctest.log.info("check an inline-signed zone with the default policy is signed")
zone = "inline-signing.kasp"
isctest.kasp.wait_keymgr_done(ns3, zone)
# Key properties.
key1 = KeyProperties.default()
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
expected = [key1]
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
set_keytimes_default_policy(key1)
isctest.kasp.check_keytimes(keys, expected)
check_all(server, zone, policy, keys, [])
check_all(ns3, zone, policy, keys, [])
def test_kasp_dynamic(servers):
# Dynamic update test cases.
server = servers["ns3"]
def test_kasp_dynamic(ns3):
# Standard dynamic zone.
isctest.log.info("check dynamic zone is updated and signed after update")
zone = "dynamic.kasp"
policy = "default"
isctest.kasp.wait_keymgr_done(ns3, zone)
# Key properties.
key1 = KeyProperties.default()
expected = [key1]
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
set_keytimes_default_policy(key1)
expected = [key1]
isctest.kasp.check_keytimes(keys, expected)
check_all(server, zone, policy, keys, [])
check_all(ns3, zone, policy, keys, [])
def update_is_signed():
parts = update.split()
@ -944,14 +951,14 @@ def test_kasp_dynamic(servers):
qtype = dns.rdatatype.from_text(parts[1])
rdata = parts[2]
return isctest.kasp.verify_update_is_signed(
server, zone, qname, qtype, rdata, keys, []
ns3, zone, qname, qtype, rdata, keys, []
)
update_msg = dns.update.UpdateMessage(zone)
update_msg.delete(f"a.{zone}.", "A", "10.0.0.1")
update_msg.add(f"a.{zone}.", 300, "A", "10.0.0.101")
update_msg.add(f"d.{zone}.", 300, "A", "10.0.0.4")
server.nsupdate(update_msg)
ns3.nsupdate(update_msg)
expected_updates = [f"a.{zone}. A 10.0.0.101", f"d.{zone}. A 10.0.0.4"]
for update in expected_updates:
@ -962,15 +969,15 @@ def test_kasp_dynamic(servers):
update_msg.add(f"a.{zone}.", 300, "A", "10.0.0.1")
update_msg.delete(f"a.{zone}.", "A", "10.0.0.101")
update_msg.delete(f"d.{zone}.", "A", "10.0.0.4")
server.nsupdate(update_msg)
ns3.nsupdate(update_msg)
update = f"a.{zone}. A 10.0.0.1"
isctest.run.retry_with_timeout(update_is_signed, timeout=5)
# Update zone with freeze/thaw.
isctest.log.info("check dynamic zone is updated and signed after freeze and thaw")
with server.watch_log_from_here() as watcher:
server.rndc(f"freeze {zone}", log=False)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"freeze {zone}", log=False)
watcher.wait_for_line(f"freezing zone '{zone}/IN': success")
time.sleep(1)
@ -978,8 +985,8 @@ def test_kasp_dynamic(servers):
zonefile.write(f"d.{zone}. 300 A 10.0.0.44\n")
time.sleep(1)
with server.watch_log_from_here() as watcher:
server.rndc(f"thaw {zone}", log=False)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"thaw {zone}", log=False)
watcher.wait_for_line(f"thawing zone '{zone}/IN': success")
expected_updates = [f"a.{zone}. A 10.0.0.1", f"d.{zone}. A 10.0.0.44"]
@ -989,31 +996,34 @@ def test_kasp_dynamic(servers):
# Dynamic, and inline-signing.
zone = "dynamic-inline-signing.kasp"
isctest.kasp.wait_keymgr_done(ns3, zone)
# Key properties.
key1 = KeyProperties.default()
expected = [key1]
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
set_keytimes_default_policy(key1)
expected = [key1]
isctest.kasp.check_keytimes(keys, expected)
check_all(server, zone, policy, keys, [])
check_all(ns3, zone, policy, keys, [])
# Update zone with freeze/thaw.
isctest.log.info(
"check dynamic inline-signed zone is updated and signed after freeze and thaw"
)
with server.watch_log_from_here() as watcher:
server.rndc(f"freeze {zone}", log=False)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"freeze {zone}", log=False)
watcher.wait_for_line(f"freezing zone '{zone}/IN': success")
time.sleep(1)
shutil.copyfile("ns3/template2.db.in", f"ns3/{zone}.db")
time.sleep(1)
with server.watch_log_from_here() as watcher:
server.rndc(f"thaw {zone}", log=False)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"thaw {zone}", log=False)
watcher.wait_for_line(f"thawing zone '{zone}/IN': success")
expected_updates = [f"a.{zone}. A 10.0.0.11", f"d.{zone}. A 10.0.0.44"]
@ -1023,6 +1033,9 @@ def test_kasp_dynamic(servers):
# Dynamic, signed, and inline-signing.
isctest.log.info("check dynamic signed, and inline-signed zone")
zone = "dynamic-signed-inline-signing.kasp"
isctest.kasp.wait_keymgr_done(ns3, zone)
# Key properties.
key1 = KeyProperties.default()
# The ns3/setup.sh script sets all states to omnipresent.
@ -1032,16 +1045,14 @@ def test_kasp_dynamic(servers):
key1.metadata["DSState"] = "omnipresent"
expected = [key1]
keys = isctest.kasp.keydir_to_keylist(zone, "ns3/keys")
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
check_all(server, zone, policy, keys, [])
check_all(ns3, zone, policy, keys, [])
# Ensure no zone_resigninc for the unsigned version of the zone is triggered.
assert f"zone_resigninc: zone {zone}/IN (unsigned): enter" not in "ns3/named.run"
def test_kasp_checkds(servers):
server = servers["ns3"]
def test_kasp_checkds(ns3):
def wait_for_metadata():
return isctest.util.file_contents_contain(ksk.statefile, metadata)
@ -1054,19 +1065,22 @@ def test_kasp_checkds(servers):
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
]
isctest.kasp.wait_keymgr_done(ns3, zone)
expected = isctest.kasp.policy_to_properties(ttl=303, keys=policy_keys)
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
ksks = [k for k in keys if k.is_ksk()]
zsks = [k for k in keys if k.is_zsk()]
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
check_all(server, zone, policy, ksks, zsks)
check_all(ns3, zone, policy, ksks, zsks)
now = KeyTimingMetadata.now()
ksk = ksks[0]
isctest.log.info("check if checkds -publish correctly sets DSPublish")
server.rndc(f"dnssec -checkds -when {now} published {zone}", log=False)
ns3.rndc(f"dnssec -checkds -when {now} published {zone}", log=False)
metadata = f"DSPublish: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "rumoured"
@ -1074,7 +1088,7 @@ def test_kasp_checkds(servers):
isctest.kasp.check_keys(zone, keys, expected)
isctest.log.info("check if checkds -withdrawn correctly sets DSRemoved")
server.rndc(f"dnssec -checkds -when {now} withdrawn {zone}", log=False)
ns3.rndc(f"dnssec -checkds -when {now} withdrawn {zone}", log=False)
metadata = f"DSRemoved: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "unretentive"
@ -1082,9 +1096,7 @@ def test_kasp_checkds(servers):
isctest.kasp.check_keys(zone, keys, expected)
def test_kasp_checkds_doubleksk(servers):
server = servers["ns3"]
def test_kasp_checkds_doubleksk(ns3):
def wait_for_metadata():
return isctest.util.file_contents_contain(ksk.statefile, metadata)
@ -1098,13 +1110,16 @@ def test_kasp_checkds_doubleksk(servers):
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden",
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:rumoured zrrsig:rumoured",
]
isctest.kasp.wait_keymgr_done(ns3, zone)
expected = isctest.kasp.policy_to_properties(ttl=303, keys=policy_keys)
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
ksks = [k for k in keys if k.is_ksk()]
zsks = [k for k in keys if k.is_zsk()]
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
check_all(server, zone, policy, ksks, zsks)
check_all(ns3, zone, policy, ksks, zsks)
now = KeyTimingMetadata.now()
ksk = ksks[0]
@ -1113,7 +1128,7 @@ def test_kasp_checkds_doubleksk(servers):
isctest.log.info("check invalid checkds commands")
def check_error():
response = server.rndc(test["command"], log=False)
response = ns3.rndc(test["command"], log=False)
assert test["error"] in response
test_cases = [
@ -1138,9 +1153,7 @@ def test_kasp_checkds_doubleksk(servers):
check_error()
isctest.log.info("check if checkds -publish -key correctly sets DSPublish")
server.rndc(
f"dnssec -checkds -when {now} -key {ksk.tag} published {zone}", log=False
)
ns3.rndc(f"dnssec -checkds -when {now} -key {ksk.tag} published {zone}", log=False)
metadata = f"DSPublish: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "rumoured"
@ -1149,9 +1162,7 @@ def test_kasp_checkds_doubleksk(servers):
isctest.log.info("check if checkds -withdrawn -key correctly sets DSRemoved")
ksk = ksks[1]
server.rndc(
f"dnssec -checkds -when {now} -key {ksk.tag} withdrawn {zone}", log=False
)
ns3.rndc(f"dnssec -checkds -when {now} -key {ksk.tag} withdrawn {zone}", log=False)
metadata = f"DSRemoved: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[1].metadata["DSState"] = "unretentive"
@ -1159,9 +1170,7 @@ def test_kasp_checkds_doubleksk(servers):
isctest.kasp.check_keys(zone, keys, expected)
def test_kasp_checkds_csk(servers):
server = servers["ns3"]
def test_kasp_checkds_csk(ns3):
def wait_for_metadata():
return isctest.util.file_contents_contain(ksk.statefile, metadata)
@ -1173,17 +1182,20 @@ def test_kasp_checkds_csk(servers):
policy_keys = [
f"csk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
]
isctest.kasp.wait_keymgr_done(ns3, zone)
expected = isctest.kasp.policy_to_properties(ttl=303, keys=policy_keys)
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
check_all(server, zone, policy, keys, [])
check_all(ns3, zone, policy, keys, [])
now = KeyTimingMetadata.now()
ksk = keys[0]
isctest.log.info("check if checkds -publish csk correctly sets DSPublish")
server.rndc(f"dnssec -checkds -when {now} published {zone}", log=False)
ns3.rndc(f"dnssec -checkds -when {now} published {zone}", log=False)
metadata = f"DSPublish: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "rumoured"
@ -1191,7 +1203,7 @@ def test_kasp_checkds_csk(servers):
isctest.kasp.check_keys(zone, keys, expected)
isctest.log.info("check if checkds -withdrawn csk correctly sets DSRemoved")
server.rndc(f"dnssec -checkds -when {now} withdrawn {zone}", log=False)
ns3.rndc(f"dnssec -checkds -when {now} withdrawn {zone}", log=False)
metadata = f"DSRemoved: {now}"
isctest.run.retry_with_timeout(wait_for_metadata, timeout=3)
expected[0].metadata["DSState"] = "unretentive"
@ -1199,50 +1211,50 @@ def test_kasp_checkds_csk(servers):
isctest.kasp.check_keys(zone, keys, expected)
def test_kasp_special_characters(servers):
server = servers["ns3"]
def test_kasp_special_characters(ns3):
# A zone with special characters.
isctest.log.info("check special characters")
zone = r'i-am.":\;?&[]\@!\$*+,|=\.\(\)special.kasp'
zone = r"i-am.\":\;?&[]\@!\$*+,|=\.\(\)special.kasp"
isctest.kasp.wait_keymgr_done(ns3, zone)
# It is non-trivial to adapt the tests to deal with all possible different
# escaping characters, so we will just try to verify the zone.
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
def test_kasp_insecure(servers):
server = servers["ns3"]
def test_kasp_insecure(ns3):
# Insecure zones.
isctest.log.info("check insecure zones")
zone = "insecure.kasp"
isctest.kasp.wait_keymgr_done(ns3, zone)
expected = []
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
isctest.kasp.check_keys(zone, keys, expected)
isctest.kasp.check_dnssecstatus(server, zone, keys, policy="insecure")
isctest.kasp.check_apex(server, zone, keys, [])
isctest.kasp.check_subdomain(server, zone, keys, [])
isctest.kasp.check_dnssecstatus(ns3, zone, keys, policy="insecure")
isctest.kasp.check_apex(ns3, zone, keys, [])
isctest.kasp.check_subdomain(ns3, zone, keys, [])
zone = "unsigned.kasp"
expected = []
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
isctest.kasp.check_keys(zone, keys, expected)
isctest.kasp.check_dnssecstatus(server, zone, keys, policy=None)
isctest.kasp.check_apex(server, zone, keys, [])
isctest.kasp.check_subdomain(server, zone, keys, [])
isctest.kasp.check_dnssecstatus(ns3, zone, keys, policy=None)
isctest.kasp.check_apex(ns3, zone, keys, [])
isctest.kasp.check_subdomain(ns3, zone, keys, [])
# Make sure the zone file is untouched.
isctest.check.file_contents_equal(f"ns3/{zone}.db.infile", f"ns3/{zone}.db")
def test_kasp_bad_maxzonettl(servers):
server = servers["ns3"]
def test_kasp_bad_maxzonettl(ns3):
# check that max-zone-ttl rejects zones with too high TTL.
isctest.log.info("check max-zone-ttl rejects zones with too high TTL")
zone = "max-zone-ttl.kasp"
assert f"loading from master file {zone}.db failed: out of range" in server.log
assert f"loading from master file {zone}.db failed: out of range" in ns3.log
def test_kasp_dnssec_keygen():
@ -1431,9 +1443,7 @@ def test_kasp_dnssec_keygen():
isctest.kasp.check_keytimes(keys, expected)
def test_kasp_zsk_retired(servers):
server = servers["ns3"]
def test_kasp_zsk_retired(ns3):
config = {
"dnskey-ttl": timedelta(seconds=300),
"ds-ttl": timedelta(days=1),
@ -1457,11 +1467,14 @@ def test_kasp_zsk_retired(servers):
# zsk successor
f"zsk 31536000 {alg} {size} goal:omnipresent dnskey:rumoured zrrsig:hidden",
]
isctest.kasp.wait_keymgr_done(ns3, zone)
expected = isctest.kasp.policy_to_properties(300, key_properties)
keys = isctest.kasp.keydir_to_keylist(zone, "ns3")
ksks = [k for k in keys if k.is_ksk()]
zsks = [k for k in keys if not k.is_ksk()]
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
offset = -timedelta(days=30 * 6)
@ -1529,7 +1542,7 @@ def test_kasp_zsk_retired(servers):
expected[2].timing["ZRRSIGChange"] = None
isctest.kasp.check_keytimes(keys, expected)
check_all(server, zone, policy, ksks, zsks)
check_all(ns3, zone, policy, ksks, zsks)
queries = [
f"{zone} DNSKEY",
@ -1551,25 +1564,23 @@ def test_kasp_zsk_retired(servers):
qname = parts[0]
qtype = dns.rdatatype.from_text(parts[1])
return isctest.kasp.verify_rrsig_is_refreshed(
server, zone, f"ns3/{zone}.db.signed", qname, qtype, ksks, zsks
ns3, zone, f"ns3/{zone}.db.signed", qname, qtype, ksks, zsks
)
for query in queries:
isctest.run.retry_with_timeout(rrsig_is_refreshed, timeout=5)
# Load again, make sure the purged key is not an issue when verifying keys.
with server.watch_log_from_here() as watcher:
server.rndc(f"loadkeys {zone}", log=False)
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"loadkeys {zone}", log=False)
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
server.log.prohibit(msg)
ns3.log.prohibit(msg)
def test_kasp_purge_keys(servers):
def test_kasp_purge_keys(ns4):
zone = "purgekeys.kasp"
server = servers["ns4"]
tsig1 = (
f"{os.environ['DEFAULT_HMAC']}:keyforview1:{KASP_INHERIT_TSIG_SECRET['view1']}"
)
@ -1577,24 +1588,25 @@ def test_kasp_purge_keys(servers):
f"{os.environ['DEFAULT_HMAC']}:keyforview2:{KASP_INHERIT_TSIG_SECRET['view2']}"
)
isctest.kasp.check_dnssec_verify(server, zone, tsig=tsig1)
isctest.kasp.check_dnssec_verify(server, zone, tsig=tsig2)
isctest.kasp.wait_keymgr_done(ns4, zone)
isctest.kasp.check_dnssec_verify(ns4, zone, tsig=tsig1)
isctest.kasp.check_dnssec_verify(ns4, zone, tsig=tsig2)
# Reconfig, make sure the purged key is not an issue when verifying keys.
shutil.copyfile("ns4/purgekeys2.conf", "ns4/purgekeys.conf")
with server.watch_log_from_here() as watcher:
server.rndc("reconfig", log=False)
with ns4.watch_log_from_here() as watcher:
ns4.rndc("reconfig", log=False)
watcher.wait_for_line(f"keymgr: {zone} done")
msg = f"zone {zone}/IN/example1 (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
server.log.prohibit(msg)
ns4.log.prohibit(msg)
msg = f"zone {zone}/IN/example2 (signed): zone_rekey:zone_verifykeys failed: some key files are missing"
server.log.prohibit(msg)
ns4.log.prohibit(msg)
def test_kasp_reload_restart(servers):
server = servers["ns6"]
def test_kasp_reload_restart(ns6):
zone = "example"
def query_soa(qname):
@ -1602,9 +1614,9 @@ def test_kasp_reload_restart(servers):
qtype = dns.rdatatype.SOA
query = dns.message.make_query(fqdn, qtype, use_edns=True, want_dnssec=True)
try:
response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3)
response = isctest.query.tcp(query, ns6.ip, ns6.ports.dns, timeout=3)
except dns.exception.Timeout:
isctest.log.debug(f"query timeout for query {qname} SOA to {server.ip}")
isctest.log.debug(f"query timeout for query {qname} SOA to {ns6.ip}")
return 0, 0
assert response.rcode() == dns.rcode.NOERROR
@ -1629,8 +1641,8 @@ def test_kasp_reload_restart(servers):
assert ttl1 == 300
shutil.copyfile(f"ns6/{zone}2.db.in", f"ns6/{zone}.db")
with server.watch_log_from_here() as watcher:
server.rndc("reload", log=False)
with ns6.watch_log_from_here() as watcher:
ns6.rndc("reload", log=False)
watcher.wait_for_line("all zones loaded")
newttl = 300
@ -1641,11 +1653,11 @@ def test_kasp_reload_restart(servers):
soa1, ttl1 = query_soa(zone)
assert ttl1 == 300
server.stop()
ns6.stop()
shutil.copyfile(f"ns6/{zone}3.db.in", f"ns6/{zone}.db")
os.unlink(f"ns6/{zone}.db.jnl")
with server.watch_log_from_here() as watcher:
server.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
with ns6.watch_log_from_here() as watcher:
ns6.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
watcher.wait_for_line("all zones loaded")
newttl = 400

View file

@ -18,9 +18,9 @@ pytestmark = pytest.mark.extra_artifacts(
)
def test_dig_tcp_keepalive_handling(named_port, servers):
def test_dig_tcp_keepalive_handling(named_port, ns2):
def get_keepalive_options_received():
servers["ns2"].rndc("stats", log=False)
ns2.rndc("stats", log=False)
options_received = 0
with open("ns2/named.stats", "r", encoding="utf-8") as ns2_stats_file:
for line in ns2_stats_file:
@ -53,7 +53,7 @@ def test_dig_tcp_keepalive_handling(named_port, servers):
)
isctest.log.info("check a re-configured keepalive value")
response = servers["ns2"].rndc("tcp-timeouts 300 300 300 200", log=False)
response = ns2.rndc("tcp-timeouts 300 300 300 200", log=False)
assert "tcp-initial-timeout=300" in response
assert "tcp-idle-timeout=300" in response
assert "tcp-keepalive-timeout=300" in response

View file

@ -564,7 +564,7 @@ def test_ksr_errors():
assert "dnssec-ksr: fatal: 'sign' requires a KSR file" in err
def test_ksr_common(servers):
def test_ksr_common(ns1):
# common test cases (1)
zone = "common.test"
policy = "common"
@ -738,7 +738,6 @@ def test_ksr_common(servers):
)
# add zone
ns1 = servers["ns1"]
ns1.rndc(
f"addzone {zone} "
+ "{ type primary; file "
@ -764,7 +763,7 @@ def test_ksr_common(servers):
isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks, offline_ksk=True)
def test_ksr_lastbundle(servers):
def test_ksr_lastbundle(ns1):
zone = "last-bundle.test"
policy = "common"
n = 1
@ -811,7 +810,6 @@ def test_ksr_lastbundle(servers):
check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
# add zone
ns1 = servers["ns1"]
ns1.rndc(
f"addzone {zone} "
+ "{ type primary; file "
@ -841,7 +839,7 @@ def test_ksr_lastbundle(servers):
assert f"zone {zone}/IN (signed): zone_rekey: {warning}" in ns1.log
def test_ksr_inthemiddle(servers):
def test_ksr_inthemiddle(ns1):
zone = "in-the-middle.test"
policy = "common"
n = 1
@ -889,7 +887,6 @@ def test_ksr_inthemiddle(servers):
check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
# add zone
ns1 = servers["ns1"]
ns1.rndc(
f"addzone {zone} "
+ "{ type primary; file "
@ -982,18 +979,14 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end):
assert line in server.log
def test_ksr_rekey_logs_error(servers):
def test_ksr_rekey_logs_error(ns1):
# check that an SKR that is too old logs error
check_ksr_rekey_logs_error(
servers["ns1"], "past.test", "common", -63072000, -31536000
)
check_ksr_rekey_logs_error(ns1, "past.test", "common", -63072000, -31536000)
# check that an SKR that is too new logs error
check_ksr_rekey_logs_error(
servers["ns1"], "future.test", "common", 2592000, 31536000
)
check_ksr_rekey_logs_error(ns1, "future.test", "common", 2592000, 31536000)
def test_ksr_unlimited(servers):
def test_ksr_unlimited(ns1):
zone = "unlimited.test"
policy = "unlimited"
n = 1
@ -1082,7 +1075,6 @@ def test_ksr_unlimited(servers):
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
# add zone
ns1 = servers["ns1"]
ns1.rndc(
f"addzone {zone} "
+ "{ type primary; file "
@ -1108,7 +1100,7 @@ def test_ksr_unlimited(servers):
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
def test_ksr_twotone(servers):
def test_ksr_twotone(ns1):
zone = "two-tone.test"
policy = "two-tone"
n = 1
@ -1192,7 +1184,6 @@ def test_ksr_twotone(servers):
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
# add zone
ns1 = servers["ns1"]
ns1.rndc(
f"addzone {zone} "
+ "{ type primary; file "
@ -1224,7 +1215,7 @@ def test_ksr_twotone(servers):
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks, offline_ksk=True)
def test_ksr_kskroll(servers):
def test_ksr_kskroll(ns1):
zone = "ksk-roll.test"
policy = "ksk-roll"
n = 1
@ -1270,7 +1261,6 @@ def test_ksr_kskroll(servers):
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
# add zone
ns1 = servers["ns1"]
ns1.rndc(
f"addzone {zone} "
+ "{ type primary; file "

View file

@ -9,19 +9,16 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from typing import Dict
from isctest.instance import NamedInstance
from isctest.mark import live_internet_test
@live_internet_test
def test_mirror_root_zone(servers: Dict[str, NamedInstance]):
def test_mirror_root_zone(ns1: NamedInstance):
"""
This test pulls the root zone from the Internet, so let's only run
it when CI_ENABLE_LIVE_INTERNET_TESTS is set.
"""
ns1 = servers["ns1"]
with ns1.watch_log_from_start() as watch_log:
# TimeoutError is raised if the line is not found and the test will fail.
watch_log.wait_for_line("Transfer status: success")

View file

@ -27,22 +27,22 @@ pytestmark = [
]
def test_nzd2nzf(servers):
def test_nzd2nzf(ns1):
zone_data = '"added.example" { type primary; file "added.db"; };'
msg = dns.message.make_query("a.added.example.", "A")
# query for non-existing zone data
res = isctest.query.tcp(msg, servers["ns1"].ip)
res = isctest.query.tcp(msg, ns1.ip)
isctest.check.refused(res)
# add new zone into the default NZD using "rndc addzone"
servers["ns1"].rndc(f"addzone {zone_data}", log=False)
ns1.rndc(f"addzone {zone_data}", log=False)
# query for existing zone data
res = isctest.query.tcp(msg, servers["ns1"].ip)
res = isctest.query.tcp(msg, ns1.ip)
isctest.check.noerror(res)
servers["ns1"].stop()
ns1.stop()
# dump "_default.nzd" to "_default.nzf" and check that it contains the expected content
cfg_dir = "ns1"
@ -59,8 +59,8 @@ def test_nzd2nzf(servers):
os.remove(nzd_filename)
# start ns1 again, it should migrate "_default.nzf" to "_default.nzd"
servers["ns1"].start(["--noclean", "--restart", "--port", os.environ["PORT"]])
ns1.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
# query for zone data from the migrated zone config
res = isctest.query.tcp(msg, servers["ns1"].ip)
res = isctest.query.tcp(msg, ns1.ip)
isctest.check.noerror(res)

View file

@ -20,13 +20,13 @@ pytestmark = pytest.mark.extra_artifacts(
@live_internet_test
def test_rfc5011_rootdnskeyrefresh(servers):
with servers["ns1"].watch_log_from_start() as watcher:
def test_rfc5011_rootdnskeyrefresh(ns1):
with ns1.watch_log_from_start() as watcher:
watcher.wait_for_line(
"managed-keys-zone: Initializing automatic trust anchor management for zone '.'; DNSKEY ID 20326 is now trusted, waiving the normal 30-day waiting period"
)
with servers["ns1"].watch_log_from_start() as watcher:
with ns1.watch_log_from_start() as watcher:
watcher.wait_for_line(
"managed-keys-zone: Initializing automatic trust anchor management for zone '.'; DNSKEY ID 38696 is now trusted, waiving the normal 30-day waiting period"
)

View file

@ -21,16 +21,19 @@ from common import (
)
def test_algoroll_csk_initial(servers):
def test_algoroll_csk_initial(ns6):
config = ALGOROLL_CONFIG
policy = "csk-algoroll"
zone = "step1.csk-algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone)
step = {
"zone": "step1.csk-algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"csk 0 8 2048 goal:omnipresent dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent offset:{-DURATION['P7D']}",
],
"nextev": TIMEDELTA["PT1H"],
}
isctest.kasp.check_rollover_step(servers["ns6"], config, policy, step)
isctest.kasp.check_rollover_step(ns6, config, policy, step)

View file

@ -37,20 +37,26 @@ TIME_PASSED = 0 # set in reconfigure() fixture
@pytest.fixture(scope="module", autouse=True)
def reconfigure(servers, templates):
def reconfigure(ns6, templates):
global TIME_PASSED # pylint: disable=global-statement
start_time = KeyTimingMetadata.now()
isctest.kasp.wait_keymgr_done(ns6, "step1.csk-algorithm-roll.kasp")
templates.render("ns6/named.conf", {"csk_roll": True})
servers["ns6"].reconfigure()
start_time = KeyTimingMetadata.now()
ns6.reconfigure()
# Calculate time passed to correctly check for next key events.
TIME_PASSED = KeyTimingMetadata.now().value - start_time.value
def test_algoroll_csk_reconfig_step1(servers, alg, size):
def test_algoroll_csk_reconfig_step1(ns6, alg, size):
zone = "step1.csk-algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step1.csk-algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The RSASHA keys are outroducing.
@ -61,12 +67,16 @@ def test_algoroll_csk_reconfig_step1(servers, alg, size):
# Next key event is when the ecdsa256 keys have been propagated.
"nextev": ALGOROLL_IPUB,
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)
def test_algoroll_csk_reconfig_step2(servers, alg, size):
def test_algoroll_csk_reconfig_step2(ns6, alg, size):
zone = "step2.csk-algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step2.csk-algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The RSASHA keys are outroducing, but need to stay present
@ -84,12 +94,16 @@ def test_algoroll_csk_reconfig_step2(servers, alg, size):
# the time passed between key creation and invoking 'rndc reconfig'.
"nextev": ALGOROLL_IPUBC - ALGOROLL_IPUB - TIME_PASSED,
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)
def test_algoroll_csk_reconfig_step3(servers, alg, size):
def test_algoroll_csk_reconfig_step3(ns6, alg, size):
zone = "step3.csk-algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step3.csk-algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The DS can be swapped.
@ -100,12 +114,16 @@ def test_algoroll_csk_reconfig_step3(servers, alg, size):
# after the publication interval of the parent side.
"nextev": ALGOROLL_IRETKSK - TIME_PASSED,
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)
def test_algoroll_csk_reconfig_step4(servers, alg, size):
def test_algoroll_csk_reconfig_step4(ns6, alg, size):
zone = "step4.csk-algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step4.csk-algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The old DS is HIDDEN, we can remove the old algorithm records.
@ -116,12 +134,16 @@ def test_algoroll_csk_reconfig_step4(servers, alg, size):
# This happens after the DNSKEY TTL plus zone propagation delay.
"nextev": ALGOROLL_KEYTTLPROP,
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)
def test_algoroll_csk_reconfig_step5(servers, alg, size):
def test_algoroll_csk_reconfig_step5(ns6, alg, size):
zone = "step5.csk-algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step5.csk-algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The DNSKEY becomes HIDDEN.
@ -136,12 +158,16 @@ def test_algoroll_csk_reconfig_step5(servers, alg, size):
# between key creation and invoking 'rndc reconfig'.
"nextev": ALGOROLL_IRET - ALGOROLL_IRETKSK - ALGOROLL_KEYTTLPROP - TIME_PASSED,
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)
def test_algoroll_csk_reconfig_step6(servers, alg, size):
def test_algoroll_csk_reconfig_step6(ns6, alg, size):
zone = "step6.csk-algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step6.csk-algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The zone signatures are now HIDDEN.
@ -153,4 +179,4 @@ def test_algoroll_csk_reconfig_step6(servers, alg, size):
# loadkeys interval.
"nextev": TIMEDELTA["PT1H"],
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)

View file

@ -21,12 +21,15 @@ from common import (
)
def test_algoroll_ksk_zsk_initial(servers):
def test_algoroll_ksk_zsk_initial(ns6):
config = ALGOROLL_CONFIG
policy = "rsasha256"
zone = "step1.algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone)
step = {
"zone": "step1.algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"ksk 0 8 2048 goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent offset:{-DURATION['P7D']}",
@ -34,4 +37,4 @@ def test_algoroll_ksk_zsk_initial(servers):
],
"nextev": TIMEDELTA["PT1H"],
}
isctest.kasp.check_rollover_step(servers["ns6"], config, policy, step)
isctest.kasp.check_rollover_step(ns6, config, policy, step)

View file

@ -37,20 +37,26 @@ TIME_PASSED = 0 # set in reconfigure() fixture
@pytest.fixture(scope="module", autouse=True)
def reconfigure(servers, templates):
def reconfigure(ns6, templates):
global TIME_PASSED # pylint: disable=global-statement
start_time = KeyTimingMetadata.now()
isctest.kasp.wait_keymgr_done(ns6, "step1.algorithm-roll.kasp")
templates.render("ns6/named.conf", {"alg_roll": True})
servers["ns6"].reconfigure()
start_time = KeyTimingMetadata.now()
ns6.reconfigure()
# Calculate time passed to correctly check for next key events.
TIME_PASSED = KeyTimingMetadata.now().value - start_time.value
def test_algoroll_ksk_zsk_reconfig_step1(servers, alg, size):
def test_algoroll_ksk_zsk_reconfig_step1(ns6, alg, size):
zone = "step1.algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step1.algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The RSASHA keys are outroducing.
@ -63,12 +69,16 @@ def test_algoroll_ksk_zsk_reconfig_step1(servers, alg, size):
# Next key event is when the ecdsa256 keys have been propagated.
"nextev": ALGOROLL_IPUB,
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)
def test_algoroll_ksk_zsk_reconfig_step2(servers, alg, size):
def test_algoroll_ksk_zsk_reconfig_step2(ns6, alg, size):
zone = "step2.algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step2.algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The RSASHA keys are outroducing, but need to stay present
@ -88,12 +98,16 @@ def test_algoroll_ksk_zsk_reconfig_step2(servers, alg, size):
# key creation and invoking 'rndc reconfig'.
"nextev": ALGOROLL_IPUBC - ALGOROLL_IPUB - TIME_PASSED,
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)
def test_algoroll_ksk_zsk_reconfig_step3(servers, alg, size):
def test_algoroll_ksk_zsk_reconfig_step3(ns6, alg, size):
zone = "step3.algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step3.algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The DS can be swapped.
@ -106,12 +120,16 @@ def test_algoroll_ksk_zsk_reconfig_step3(servers, alg, size):
# after the retire interval.
"nextev": ALGOROLL_IRETKSK - TIME_PASSED,
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)
def test_algoroll_ksk_zsk_reconfig_step4(servers, alg, size):
def test_algoroll_ksk_zsk_reconfig_step4(ns6, alg, size):
zone = "step4.algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step4.algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The old DS is HIDDEN, we can remove the old algorithm records.
@ -124,12 +142,16 @@ def test_algoroll_ksk_zsk_reconfig_step4(servers, alg, size):
# This happens after the DNSKEY TTL plus zone propagation delay.
"nextev": ALGOROLL_KEYTTLPROP,
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)
def test_algoroll_ksk_zsk_reconfig_step5(servers, alg, size):
def test_algoroll_ksk_zsk_reconfig_step5(ns6, alg, size):
zone = "step5.algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step5.algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The DNSKEY becomes HIDDEN.
@ -146,12 +168,16 @@ def test_algoroll_ksk_zsk_reconfig_step5(servers, alg, size):
# between key creation and invoking 'rndc reconfig'.
"nextev": ALGOROLL_IRET - ALGOROLL_IRETKSK - ALGOROLL_KEYTTLPROP - TIME_PASSED,
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)
def test_algoroll_ksk_zsk_reconfig_step6(servers, alg, size):
def test_algoroll_ksk_zsk_reconfig_step6(ns6, alg, size):
zone = "step6.algorithm-roll.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": "step6.algorithm-roll.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
# The zone signatures are now HIDDEN.
@ -165,4 +191,4 @@ def test_algoroll_ksk_zsk_reconfig_step6(servers, alg, size):
# loadkeys interval.
"nextev": TIMEDELTA["PT1H"],
}
isctest.kasp.check_rollover_step(servers["ns6"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns6, CONFIG, POLICY, step)

View file

@ -62,10 +62,14 @@ OFFSETS["step8-p"] = OFFSETS["step7-p"] - int(CONFIG["purge-keys"].total_seconds
OFFSETS["step8-s"] = OFFSETS["step7-s"] - int(CONFIG["purge-keys"].total_seconds())
def test_csk_roll1_step1(alg, size, servers):
def test_csk_roll1_step1(alg, size, ns3):
zone = "step1.csk-roll1.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# Introduce the first key. This will immediately be active.
"zone": "step1.csk-roll1.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"csk {LIFETIME_POLICY} {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent offset:{OFFSETS['step1-p']}",
@ -75,10 +79,14 @@ def test_csk_roll1_step1(alg, size, servers):
# registration delay).
"nextev": CSK_LIFETIME - IPUB - timedelta(days=7),
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll1_step2(alg, size, servers):
def test_csk_roll1_step2(alg, size, ns3):
zone = "step2.csk-roll1.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# Successor CSK is prepublished (signs DNSKEY RRset, but not yet
# other RRsets).
@ -86,7 +94,7 @@ def test_csk_roll1_step2(alg, size, servers):
# CSK2 goal: hidden -> omnipresent
# CSK2 dnskey: hidden -> rumoured
# CSK2 krrsig: hidden -> rumoured
"zone": "step2.csk-roll1.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"csk {LIFETIME_POLICY} {alg} {size} goal:hidden dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent offset:{OFFSETS['step2-p']}",
@ -96,15 +104,19 @@ def test_csk_roll1_step2(alg, size, servers):
# Next key event is when the successor CSK becomes OMNIPRESENT.
"nextev": IPUB,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll1_step3(alg, size, servers):
def test_csk_roll1_step3(alg, size, ns3):
zone = "step3.csk-roll1.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# Successor CSK becomes omnipresent, meaning we can start signing
# the remainder of the zone with the successor CSK, and we can
# submit the DS.
"zone": "step3.csk-roll1.autosign",
"zone": zone,
"cdss": CDSS,
# Predecessor CSK will be removed, so moving to UNRETENTIVE.
# CSK1 zrrsig: omnipresent -> unretentive
@ -130,12 +142,16 @@ def test_csk_roll1_step3(alg, size, servers):
# from the predecessor ZSK.
"smooth": True,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll1_step4(alg, size, servers):
def test_csk_roll1_step4(alg, size, ns3):
zone = "step4.csk-roll1.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step4.csk-roll1.autosign",
"zone": zone,
"cdss": CDSS,
# The predecessor CSK is no longer signing the DNSKEY RRset.
# CSK1 krrsig: omnipresent -> unretentive
@ -153,12 +169,16 @@ def test_csk_roll1_step4(alg, size, servers):
# We already swapped the DS in the previous step, so disable ds-swap.
"ds-swap": False,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll1_step5(alg, size, servers):
def test_csk_roll1_step5(alg, size, ns3):
zone = "step5.csk-roll1.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step5.csk-roll1.autosign",
"zone": zone,
"cdss": CDSS,
# The predecessor KRRSIG records are now all hidden.
# CSK1 krrsig: unretentive -> hidden
@ -172,12 +192,16 @@ def test_csk_roll1_step5(alg, size, servers):
# CSK.
"nextev": SIGNDELAY,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll1_step6(alg, size, servers):
def test_csk_roll1_step6(alg, size, ns3):
zone = "step6.csk-roll1.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step6.csk-roll1.autosign",
"zone": zone,
"cdss": CDSS,
# The predecessor ZRRSIG records are now all hidden (so the DNSKEY
# can be removed).
@ -193,12 +217,16 @@ def test_csk_roll1_step6(alg, size, servers):
# This is the DNSKEY TTL plus zone propagation delay.
"nextev": KEYTTLPROP,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll1_step7(alg, size, servers):
def test_csk_roll1_step7(alg, size, ns3):
zone = "step7.csk-roll1.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step7.csk-roll1.autosign",
"zone": zone,
"cdss": CDSS,
# The predecessor CSK is now completely HIDDEN.
"keyprops": [
@ -211,16 +239,20 @@ def test_csk_roll1_step7(alg, size, servers):
# minus the prepublication time.
"nextev": CSK_LIFETIME - IRETZSK - IPUB - KEYTTLPROP,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll1_step8(alg, size, servers):
def test_csk_roll1_step8(alg, size, ns3):
zone = "step8.csk-roll1.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step8.csk-roll1.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"csk {LIFETIME_POLICY} {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent offset:{OFFSETS['step8-s']}",
],
"nextev": None,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)

View file

@ -65,10 +65,14 @@ OFFSETS["step7-p"] = OFFSETS["step6-p"] - int(timedelta(days=90).total_seconds()
OFFSETS["step7-s"] = OFFSETS["step6-s"] - int(timedelta(days=90).total_seconds())
def test_csk_roll2_step1(alg, size, servers):
def test_csk_roll2_step1(alg, size, ns3):
zone = "step1.csk-roll2.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# Introduce the first key. This will immediately be active.
"zone": "step1.csk-roll2.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"csk {LIFETIME_POLICY} {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent offset:{OFFSETS['step1-p']}",
@ -78,10 +82,14 @@ def test_csk_roll2_step1(alg, size, servers):
# registration delay).
"nextev": CSK_LIFETIME - IPUB - TIMEDELTA["P7D"],
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll2_step2(alg, size, servers):
def test_csk_roll2_step2(alg, size, ns3):
zone = "step2.csk-roll2.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# Successor CSK is prepublished (signs DNSKEY RRset, but not yet
# other RRsets).
@ -89,7 +97,7 @@ def test_csk_roll2_step2(alg, size, servers):
# CSK2 goal: hidden -> omnipresent
# CSK2 dnskey: hidden -> rumoured
# CSK2 krrsig: hidden -> rumoured
"zone": "step2.csk-roll2.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"csk {LIFETIME_POLICY} {alg} {size} goal:hidden dnskey:omnipresent krrsig:omnipresent zrrsig:omnipresent ds:omnipresent offset:{OFFSETS['step2-p']}",
@ -99,15 +107,19 @@ def test_csk_roll2_step2(alg, size, servers):
# Next key event is when the successor CSK becomes OMNIPRESENT.
"nextev": IPUB,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll2_step3(alg, size, servers):
def test_csk_roll2_step3(alg, size, ns3):
zone = "step3.csk-roll2.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# Successor CSK becomes omnipresent, meaning we can start signing
# the remainder of the zone with the successor CSK, and we can
# submit the DS.
"zone": "step3.csk-roll2.autosign",
"zone": zone,
"cdss": CDSS,
# Predecessor CSK will be removed, so moving to UNRETENTIVE.
# CSK1 zrrsig: omnipresent -> unretentive
@ -133,12 +145,16 @@ def test_csk_roll2_step3(alg, size, servers):
# from the predecessor ZSK.
"smooth": True,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll2_step4(alg, size, servers):
def test_csk_roll2_step4(alg, size, ns3):
zone = "step4.csk-roll2.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step4.csk-roll2.autosign",
"zone": zone,
"cdss": CDSS,
# The predecessor ZRRSIG is HIDDEN. The successor ZRRSIG is
# OMNIPRESENT.
@ -158,12 +174,16 @@ def test_csk_roll2_step4(alg, size, servers):
# We already swapped the DS in the previous step, so disable ds-swap.
"ds-swap": False,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll2_step5(alg, size, servers):
def test_csk_roll2_step5(alg, size, ns3):
zone = "step5.csk-roll2.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step5.csk-roll2.autosign",
"zone": zone,
"cdss": CDSS,
# The predecessor DNSKEY can be removed.
# CSK1 dnskey: omnipresent -> unretentive
@ -180,12 +200,16 @@ def test_csk_roll2_step5(alg, size, servers):
# This is the DNSKEY TTL plus zone propagation delay.
"nextev": KEYTTLPROP,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll2_step6(alg, size, servers):
def test_csk_roll2_step6(alg, size, ns3):
zone = "step6.csk-roll2.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step6.csk-roll2.autosign",
"zone": zone,
"cdss": CDSS,
# The predecessor CSK is now completely HIDDEN.
# CSK1 dnskey: unretentive -> hidden
@ -199,12 +223,16 @@ def test_csk_roll2_step6(alg, size, servers):
# This is the Lcsk, minus time passed since the key was published.
"nextev": CSK_LIFETIME - IRET - IPUB - KEYTTLPROP,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_csk_roll2_step7(alg, size, servers):
def test_csk_roll2_step7(alg, size, ns3):
zone = "step7.csk-roll2.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step7.csk-roll2.autosign",
"zone": zone,
"cdss": CDSS,
# The predecessor CSK is now completely HIDDEN.
"keyprops": [
@ -214,4 +242,4 @@ def test_csk_roll2_step7(alg, size, servers):
"keyrelationships": [0, 1],
"nextev": None,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)

View file

@ -21,12 +21,15 @@ from common import (
)
def test_dynamic2inline(alg, size, servers, templates):
def test_dynamic2inline(alg, size, ns6, templates):
config = DEFAULT_CONFIG
policy = "default"
zone = "dynamic2inline.kasp"
isctest.kasp.wait_keymgr_done(ns6, zone)
step = {
"zone": "dynamic2inline.kasp",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"csk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden",
@ -34,9 +37,10 @@ def test_dynamic2inline(alg, size, servers, templates):
"nextev": None,
}
isctest.kasp.check_rollover_step(servers["ns6"], config, policy, step)
isctest.kasp.check_rollover_step(ns6, config, policy, step)
templates.render("ns6/named.conf", {"change_lifetime": True})
servers["ns6"].reconfigure()
ns6.reconfigure()
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
isctest.kasp.check_rollover_step(servers["ns6"], config, policy, step)
isctest.kasp.check_rollover_step(ns6, config, policy, step)

View file

@ -44,9 +44,13 @@ OFFSETS["step3"] = -int(IRETZSK.total_seconds())
OFFSETS["step4"] = -int(IPUBC.total_seconds() + IRETKSK.total_seconds())
def test_rollover_enable_dnssec_step1(alg, size, servers):
def test_rollover_enable_dnssec_step1(alg, size, ns3):
zone = "step1.enable-dnssec.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step1.enable-dnssec.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"csk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured zrrsig:rumoured ds:hidden offset:{OFFSETS['step1']}",
@ -55,12 +59,16 @@ def test_rollover_enable_dnssec_step1(alg, size, servers):
# after the publication interval.
"nextev": IPUB,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_rollover_enable_dnssec_step2(alg, size, servers):
def test_rollover_enable_dnssec_step2(alg, size, ns3):
zone = "step2.enable-dnssec.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step2.enable-dnssec.autosign",
"zone": zone,
"cdss": CDSS,
# The DNSKEY is omnipresent, but the zone signatures not yet.
# Thus, the DS remains hidden.
@ -73,12 +81,16 @@ def test_rollover_enable_dnssec_step2(alg, size, servers):
# Minus the time already elapsed.
"nextev": IRETZSK - IPUB,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_rollover_enable_dnssec_step3(alg, size, servers):
def test_rollover_enable_dnssec_step3(alg, size, ns3):
zone = "step3.enable-dnssec.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step3.enable-dnssec.autosign",
"zone": zone,
"cdss": CDSS,
# All signatures should be omnipresent, so the DS can be submitted.
# zrrsig: rumoured -> omnipresent
@ -90,12 +102,16 @@ def test_rollover_enable_dnssec_step3(alg, size, servers):
# This is after the retire interval.
"nextev": IRETKSK,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_rollover_enable_dnssec_step4(alg, size, servers):
def test_rollover_enable_dnssec_step4(alg, size, ns3):
zone = "step4.enable-dnssec.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": "step4.enable-dnssec.autosign",
"zone": zone,
"cdss": CDSS,
# DS has been published long enough.
# ds: rumoured -> omnipresent
@ -106,4 +122,4 @@ def test_rollover_enable_dnssec_step4(alg, size, servers):
# established. So we fall back to the default loadkeys interval.
"nextev": TIMEDELTA["PT1H"],
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)

View file

@ -31,11 +31,15 @@ from common import (
"going-insecure-dynamic.kasp",
],
)
def test_going_insecure_initial(zone, servers, alg, size):
def test_going_insecure_initial(zone, ns6, alg, size):
config = UNSIGNING_CONFIG
policy = "unsigning"
zone = f"step1.{zone}"
isctest.kasp.wait_keymgr_done(ns6, zone)
step = {
"zone": f"step1.{zone}",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"ksk 0 {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent offset:{-DURATION['P10D']}",
@ -43,4 +47,4 @@ def test_going_insecure_initial(zone, servers, alg, size):
],
"nextev": None,
}
isctest.kasp.check_rollover_step(servers["ns6"], config, policy, step)
isctest.kasp.check_rollover_step(ns6, config, policy, step)

View file

@ -26,9 +26,9 @@ from common import (
@pytest.fixture(scope="module", autouse=True)
def reconfigure_policy(servers, templates):
def reconfigure_policy(ns6, templates):
templates.render("ns6/named.conf", {"policy": "insecure"})
servers["ns6"].reconfigure()
ns6.reconfigure()
@pytest.mark.parametrize(
@ -38,14 +38,17 @@ def reconfigure_policy(servers, templates):
"going-insecure-dynamic.kasp",
],
)
def test_going_insecure_reconfig_step1(zone, alg, size, servers):
def test_going_insecure_reconfig_step1(zone, alg, size, ns6):
config = DEFAULT_CONFIG
policy = "insecure"
zone = f"step1.{zone}"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
# Key goal states should be HIDDEN.
# The DS may be removed if we are going insecure.
step = {
"zone": f"step1.{zone}",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"ksk 0 {alg} {size} goal:hidden dnskey:omnipresent krrsig:omnipresent ds:unretentive offset:{-DURATION['P10D']}",
@ -58,7 +61,7 @@ def test_going_insecure_reconfig_step1(zone, alg, size, servers):
"cds-delete": True,
"check-keytimes": False,
}
isctest.kasp.check_rollover_step(servers["ns6"], config, policy, step)
isctest.kasp.check_rollover_step(ns6, config, policy, step)
@pytest.mark.parametrize(
@ -68,15 +71,18 @@ def test_going_insecure_reconfig_step1(zone, alg, size, servers):
"going-insecure-dynamic.kasp",
],
)
def test_going_insecure_reconfig_step2(zone, alg, size, servers):
def test_going_insecure_reconfig_step2(zone, alg, size, ns6):
config = DEFAULT_CONFIG
policy = "insecure"
zone = f"step2.{zone}"
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
# The DS is long enough removed from the zone to be considered
# HIDDEN. This means the DNSKEY and the KSK signatures can be
# removed.
step = {
"zone": f"step2.{zone}",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"ksk 0 {alg} {size} goal:hidden dnskey:unretentive krrsig:unretentive ds:hidden offset:{-DURATION['P10D']}",
@ -90,4 +96,4 @@ def test_going_insecure_reconfig_step2(zone, alg, size, servers):
"zone-signed": False,
"check-keytimes": False,
}
isctest.kasp.check_rollover_step(servers["ns6"], config, policy, step)
isctest.kasp.check_rollover_step(ns6, config, policy, step)

View file

@ -33,11 +33,12 @@ OFFSET2 = -int(timedelta(hours=27).total_seconds())
TTL = int(KSK_CONFIG["dnskey-ttl"].total_seconds())
def test_rollover_ksk_three_is_a_crowd(alg, size, servers):
def test_rollover_ksk_three_is_a_crowd(alg, size, ns3):
"""Test #2375: Scheduled rollovers are happening faster than they can finish."""
server = servers["ns3"]
zone = "three-is-a-crowd.kasp"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
"zone": zone,
"cdss": CDSS,
@ -48,16 +49,16 @@ def test_rollover_ksk_three_is_a_crowd(alg, size, servers):
],
"keyrelationships": [0, 1],
}
isctest.kasp.check_rollover_step(servers["ns3"], KSK_CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, KSK_CONFIG, POLICY, step)
# Rollover successor KSK (with DS in rumoured state).
expected = isctest.kasp.policy_to_properties(TTL, step["keyprops"])
keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
keys = isctest.kasp.keydir_to_keylist(zone, ns3.identifier)
isctest.kasp.check_keys(zone, keys, expected)
key = expected[1].key
now = KeyTimingMetadata.now()
with server.watch_log_from_here() as watcher:
server.rndc(f"dnssec -rollover -key {key.tag} -when {now} {zone}")
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"dnssec -rollover -key {key.tag} -when {now} {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
# We now expect four keys (3x KSK, 1x ZSK).
@ -72,10 +73,10 @@ def test_rollover_ksk_three_is_a_crowd(alg, size, servers):
],
"check-keytimes": False, # checked manually with modified values
}
isctest.kasp.check_rollover_step(servers["ns3"], KSK_CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, KSK_CONFIG, POLICY, step)
expected = isctest.kasp.policy_to_properties(TTL, step["keyprops"])
keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
keys = isctest.kasp.keydir_to_keylist(zone, ns3.identifier)
isctest.kasp.check_keys(zone, keys, expected)
expected[0].metadata["Successor"] = expected[1].key.tag

View file

@ -45,10 +45,14 @@ OFFSETS["step6-p"] = OFFSETS["step5-p"] - int(KSK_CONFIG["purge-keys"].total_sec
OFFSETS["step6-s"] = OFFSETS["step5-s"] - int(KSK_CONFIG["purge-keys"].total_seconds())
def test_ksk_doubleksk_step1(alg, size, servers):
def test_ksk_doubleksk_step1(alg, size, ns3):
zone = "step1.ksk-doubleksk.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# Introduce the first key. This will immediately be active.
"zone": "step1.ksk-doubleksk.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent offset:{OFFSETS['step1-p']}",
@ -59,17 +63,21 @@ def test_ksk_doubleksk_step1(alg, size, servers):
# already passed).
"nextev": KSK_LIFETIME - KSK_IPUB - timedelta(days=7),
}
isctest.kasp.check_rollover_step(servers["ns3"], KSK_CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, KSK_CONFIG, POLICY, step)
def test_ksk_doubleksk_step2(alg, size, servers):
def test_ksk_doubleksk_step2(alg, size, ns3):
zone = "step2.ksk-doubleksk.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# Successor KSK is prepublished (and signs DNSKEY RRset).
# KSK1 goal: omnipresent -> hidden
# KSK2 goal: hidden -> omnipresent
# KSK2 dnskey: hidden -> rumoured
# KSK2 krrsig: hidden -> rumoured
"zone": "step2.ksk-doubleksk.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent offset:{OFFSETS['step2-p']}",
@ -80,10 +88,14 @@ def test_ksk_doubleksk_step2(alg, size, servers):
# Next key event is when the successor KSK becomes OMNIPRESENT.
"nextev": KSK_IPUB,
}
isctest.kasp.check_rollover_step(servers["ns3"], KSK_CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, KSK_CONFIG, POLICY, step)
def test_ksk_doubleksk_step3(alg, size, servers):
def test_ksk_doubleksk_step3(alg, size, ns3):
zone = "step3.ksk-doubleksk.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# The successor DNSKEY RRset has become omnipresent. The
# predecessor DS can be withdrawn and the successor DS can be
@ -92,7 +104,7 @@ def test_ksk_doubleksk_step3(alg, size, servers):
# KSK2 dnskey: rumoured -> omnipresent
# KSK2 krrsig: rumoured -> omnipresent
# KSK2 ds: hidden -> rumoured
"zone": "step3.ksk-doubleksk.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent offset:{OFFSETS['step3-p']}",
@ -106,10 +118,14 @@ def test_ksk_doubleksk_step3(alg, size, servers):
# successor DS. This is the the retire interval.
"nextev": KSK_IRET,
}
isctest.kasp.check_rollover_step(servers["ns3"], KSK_CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, KSK_CONFIG, POLICY, step)
def test_ksk_doubleksk_step4(alg, size, servers):
def test_ksk_doubleksk_step4(alg, size, ns3):
zone = "step4.ksk-doubleksk.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# The predecessor DNSKEY may be removed, the successor DS is
# omnipresent.
@ -117,7 +133,7 @@ def test_ksk_doubleksk_step4(alg, size, servers):
# KSK1 krrsig: omnipresent -> unretentive
# KSK1 ds: unretentive -> hidden
# KSK2 ds: rumoured -> omnipresent
"zone": "step4.ksk-doubleksk.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent offset:{OFFSETS['step4-p']}",
@ -129,16 +145,20 @@ def test_ksk_doubleksk_step4(alg, size, servers):
# This is the DNSKEY TTL plus zone propagation delay.
"nextev": KSK_KEYTTLPROP,
}
isctest.kasp.check_rollover_step(servers["ns3"], KSK_CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, KSK_CONFIG, POLICY, step)
def test_ksk_doubleksk_step5(alg, size, servers):
def test_ksk_doubleksk_step5(alg, size, ns3):
zone = "step5.ksk-doubleksk.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# The predecessor DNSKEY is long enough removed from the zone it
# has become hidden.
# KSK1 dnskey: unretentive -> hidden
# KSK1 krrsig: unretentive -> hidden
"zone": "step5.ksk-doubleksk.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent offset:{OFFSETS['step5-p']}",
@ -150,13 +170,17 @@ def test_ksk_doubleksk_step5(alg, size, servers):
# This is the KSK lifetime minus Ipub minus Iret minus time elapsed.
"nextev": KSK_LIFETIME - KSK_IPUB - KSK_IRET - KSK_KEYTTLPROP,
}
isctest.kasp.check_rollover_step(servers["ns3"], KSK_CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, KSK_CONFIG, POLICY, step)
def test_ksk_doubleksk_step6(alg, size, servers):
def test_ksk_doubleksk_step6(alg, size, ns3):
zone = "step6.ksk-doubleksk.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# Predecessor KSK is now purged.
"zone": "step6.ksk-doubleksk.autosign",
"zone": zone,
"cdss": CDSS,
"keyprops": [
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent offset:{OFFSETS['step6-p']}",
@ -164,4 +188,4 @@ def test_ksk_doubleksk_step6(alg, size, servers):
],
"nextev": None,
}
isctest.kasp.check_rollover_step(servers["ns3"], KSK_CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, KSK_CONFIG, POLICY, step)

View file

@ -34,9 +34,11 @@ from common import (
param("unlimit-lifetime", "short-lifetime", "P6M"),
],
)
def test_lifetime_initial(zone, policy, lifetime, alg, size, servers):
def test_lifetime_initial(zone, policy, lifetime, alg, size, ns6):
config = DEFAULT_CONFIG
isctest.kasp.wait_keymgr_done(ns6, zone)
step = {
"zone": zone,
"cdss": CDSS,
@ -45,4 +47,4 @@ def test_lifetime_initial(zone, policy, lifetime, alg, size, servers):
],
"nextev": None,
}
isctest.kasp.check_rollover_step(servers["ns6"], config, policy, step)
isctest.kasp.check_rollover_step(ns6, config, policy, step)

View file

@ -26,9 +26,14 @@ from common import (
@pytest.fixture(scope="module", autouse=True)
def reconfigure_policy(servers, templates):
def reconfigure_policy(ns6, templates):
isctest.kasp.wait_keymgr_done(ns6, "shorter-lifetime")
isctest.kasp.wait_keymgr_done(ns6, "longer-lifetime")
isctest.kasp.wait_keymgr_done(ns6, "limit-lifetime")
isctest.kasp.wait_keymgr_done(ns6, "unlimit-lifetime")
templates.render("ns6/named.conf", {"change_lifetime": True})
servers["ns6"].reconfigure()
ns6.reconfigure()
@pytest.mark.parametrize(
@ -44,9 +49,11 @@ def reconfigure_policy(servers, templates):
param("unlimit-lifetime", "unlimited-lifetime", 0),
],
)
def test_lifetime_reconfig(zone, policy, lifetime, alg, size, servers):
def test_lifetime_reconfig(zone, policy, lifetime, alg, size, ns6):
config = DEFAULT_CONFIG
isctest.kasp.wait_keymgr_done(ns6, zone, reconfig=True)
step = {
"zone": zone,
"cdss": CDSS,
@ -55,4 +62,4 @@ def test_lifetime_reconfig(zone, policy, lifetime, alg, size, servers):
],
"nextev": None,
}
isctest.kasp.check_rollover_step(servers["ns6"], config, policy, step)
isctest.kasp.check_rollover_step(ns6, config, policy, step)

View file

@ -28,8 +28,7 @@ from common import (
)
def test_rollover_multisigner(servers, alg, size):
server = servers["ns3"]
def test_rollover_multisigner(ns3, alg, size):
policy = "multisigner-model2"
config = {
"dnskey-ttl": timedelta(hours=1),
@ -63,7 +62,9 @@ def test_rollover_multisigner(servers, alg, size):
zone = "multisigner-model2.kasp"
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.wait_keymgr_done(ns3, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
key_properties = [
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden tag-range:32768-65535",
@ -77,7 +78,7 @@ def test_rollover_multisigner(servers, alg, size):
expected2[0].properties["legacy"] = True
expected = expected + expected2
ownkeys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
ownkeys = isctest.kasp.keydir_to_keylist(zone, ns3.identifier)
extkeys = isctest.kasp.keydir_to_keylist(zone)
keys = ownkeys + extkeys
ksks = [k for k in ownkeys if k.is_ksk()]
@ -88,9 +89,9 @@ def test_rollover_multisigner(servers, alg, size):
for kp in expected:
kp.set_expected_keytimes(config)
isctest.kasp.check_keytimes(keys, expected)
isctest.kasp.check_dnssecstatus(server, zone, keys, policy=policy)
isctest.kasp.check_apex(server, zone, ksks, zsks)
isctest.kasp.check_subdomain(server, zone, ksks, zsks)
isctest.kasp.check_dnssecstatus(ns3, zone, keys, policy=policy)
isctest.kasp.check_apex(ns3, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns3, zone, ksks, zsks)
# Update zone with ZSK from another provider for zone.
out = keygen(zone)
@ -106,15 +107,15 @@ def test_rollover_multisigner(servers, alg, size):
update_msg = dns.update.UpdateMessage(zone)
update_msg.add(f"{dnskey[0]}", 3600, "DNSKEY", rdata)
server.nsupdate(update_msg)
ns3.nsupdate(update_msg)
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
keys = keys + newkeys
zsks = zsks + newkeys
isctest.kasp.check_keys(zone, keys, expected)
isctest.kasp.check_apex(server, zone, ksks, zsks)
isctest.kasp.check_subdomain(server, zone, ksks, zsks)
isctest.kasp.check_apex(ns3, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns3, zone, ksks, zsks)
# Remove ZSKs from the other providers for zone.
dnskey2 = extkeys[0].dnskey().split()
@ -122,24 +123,26 @@ def test_rollover_multisigner(servers, alg, size):
update_msg = dns.update.UpdateMessage(zone)
update_msg.delete(f"{dnskey[0]}", "DNSKEY", rdata)
update_msg.delete(f"{dnskey2[0]}", "DNSKEY", rdata2)
server.nsupdate(update_msg)
ns3.nsupdate(update_msg)
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
expected = isctest.kasp.policy_to_properties(ttl, key_properties)
keys = ownkeys
ksks = [k for k in ownkeys if k.is_ksk()]
zsks = [k for k in ownkeys if not k.is_ksk()]
isctest.kasp.check_keys(zone, keys, expected)
isctest.kasp.check_apex(server, zone, ksks, zsks)
isctest.kasp.check_subdomain(server, zone, ksks, zsks)
isctest.kasp.check_apex(ns3, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns3, zone, ksks, zsks)
# A zone transitioning from single-signed to multi-signed. We should have
# the old omnipresent keys outside of the desired key range and the new
# keys in the desired key range.
zone = "single-to-multisigner.kasp"
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.wait_keymgr_done(ns3, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
key_properties = [
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:rumoured krrsig:rumoured ds:hidden tag-range:32768-65535",
@ -148,7 +151,7 @@ def test_rollover_multisigner(servers, alg, size):
f"zsk unlimited {alg} {size} goal:hidden dnskey:omnipresent zrrsig:omnipresent tag-range:0-32767 offset:{offval}",
]
expected = isctest.kasp.policy_to_properties(ttl, key_properties)
keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
keys = isctest.kasp.keydir_to_keylist(zone, ns3.identifier)
ksks = [k for k in keys if k.is_ksk()]
zsks = [k for k in keys if not k.is_ksk()]
@ -168,6 +171,6 @@ def test_rollover_multisigner(servers, alg, size):
)
isctest.kasp.check_keytimes(keys, expected)
isctest.kasp.check_dnssecstatus(server, zone, keys, policy=policy)
isctest.kasp.check_apex(server, zone, ksks, zsks)
isctest.kasp.check_subdomain(server, zone, ksks, zsks)
isctest.kasp.check_dnssecstatus(ns3, zone, keys, policy=policy)
isctest.kasp.check_apex(ns3, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns3, zone, ksks, zsks)

View file

@ -31,10 +31,12 @@ from common import (
"going-straight-to-none-dynamic.kasp",
],
)
def test_straight2none_initial(zone, servers, alg, size):
def test_straight2none_initial(zone, ns6, alg, size):
config = DEFAULT_CONFIG
policy = "default"
isctest.kasp.wait_keymgr_done(ns6, zone)
step = {
"zone": zone,
"cdss": CDSS,
@ -43,4 +45,4 @@ def test_straight2none_initial(zone, servers, alg, size):
],
"nextev": None,
}
isctest.kasp.check_rollover_step(servers["ns6"], config, policy, step)
isctest.kasp.check_rollover_step(ns6, config, policy, step)

View file

@ -25,9 +25,12 @@ from common import (
@pytest.fixture(scope="module", autouse=True)
def reconfigure_policy(servers, templates):
def reconfigure_policy(ns6, templates):
isctest.kasp.wait_keymgr_done(ns6, "going-straight-to-none.kasp")
isctest.kasp.wait_keymgr_done(ns6, "going-straight-to-none-dynamic.kasp")
templates.render("ns6/named.conf", {"policy": "none"})
servers["ns6"].reconfigure()
ns6.reconfigure()
@pytest.mark.parametrize(
@ -37,7 +40,7 @@ def reconfigure_policy(servers, templates):
"going-straight-to-none-dynamic.kasp",
],
)
def test_straight2none_reconfig(zone, servers, alg, size):
def test_straight2none_reconfig(zone, ns6, alg, size):
config = DEFAULT_CONFIG
policy = None
@ -51,4 +54,4 @@ def test_straight2none_reconfig(zone, servers, alg, size):
],
"nextev": None,
}
isctest.kasp.check_rollover_step(servers["ns6"], config, policy, step)
isctest.kasp.check_rollover_step(ns6, config, policy, step)

View file

@ -54,10 +54,14 @@ OFFSETS["step6-p"] = OFFSETS["step5-p"] - int(CONFIG["purge-keys"].total_seconds
OFFSETS["step6-s"] = OFFSETS["step5-s"] - int(CONFIG["purge-keys"].total_seconds())
def test_zsk_prepub_step1(alg, size, servers):
def test_zsk_prepub_step1(alg, size, ns3):
zone = "step1.zsk-prepub.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# Introduce the first key. This will immediately be active.
"zone": "step1.zsk-prepub.autosign",
"zone": zone,
"keyprops": [
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent offset:{OFFSETS['step1-p']}",
f"zsk {LIFETIME_POLICY} {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent offset:{OFFSETS['step1-p']}",
@ -67,16 +71,20 @@ def test_zsk_prepub_step1(alg, size, servers):
# already passed).
"nextev": ZSK_LIFETIME - IPUB - timedelta(days=7),
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_zsk_prepub_step2(alg, size, servers):
def test_zsk_prepub_step2(alg, size, ns3):
zone = "step2.zsk-prepub.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# it is time to pre-publish the successor zsk.
# zsk1 goal: omnipresent -> hidden
# zsk2 goal: hidden -> omnipresent
# zsk2 dnskey: hidden -> rumoured
"zone": "step2.zsk-prepub.autosign",
"zone": zone,
"keyprops": [
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent offset:{OFFSETS['step2-p']}",
f"zsk {LIFETIME_POLICY} {alg} {size} goal:hidden dnskey:omnipresent zrrsig:omnipresent offset:{OFFSETS['step2-p']}",
@ -87,17 +95,21 @@ def test_zsk_prepub_step2(alg, size, servers):
# that is the dnskey ttl plus the zone propagation delay
"nextev": IPUB,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_zsk_prepub_step3(alg, size, servers):
def test_zsk_prepub_step3(alg, size, ns3):
zone = "step3.zsk-prepub.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# predecessor zsk is no longer actively signing. successor zsk is
# now actively signing.
# zsk1 zrrsig: omnipresent -> unretentive
# zsk2 dnskey: rumoured -> omnipresent
# zsk2 zrrsig: hidden -> rumoured
"zone": "step3.zsk-prepub.autosign",
"zone": zone,
"keyprops": [
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent offset:{OFFSETS['step3-p']}",
f"zsk {LIFETIME_POLICY} {alg} {size} goal:hidden dnskey:omnipresent zrrsig:unretentive offset:{OFFSETS['step3-p']}",
@ -112,17 +124,21 @@ def test_zsk_prepub_step3(alg, size, servers):
# from the predecessor zsk.
"smooth": True,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_zsk_prepub_step4(alg, size, servers):
def test_zsk_prepub_step4(alg, size, ns3):
zone = "step4.zsk-prepub.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# predecessor zsk is no longer needed. all rrsets are signed with
# the successor zsk.
# zsk1 dnskey: omnipresent -> unretentive
# zsk1 zrrsig: unretentive -> hidden
# zsk2 zrrsig: rumoured -> omnipresent
"zone": "step4.zsk-prepub.autosign",
"zone": zone,
"keyprops": [
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent offset:{OFFSETS['step4-p']}",
f"zsk {LIFETIME_POLICY} {alg} {size} goal:hidden dnskey:unretentive zrrsig:hidden offset:{OFFSETS['step4-p']}",
@ -133,14 +149,18 @@ def test_zsk_prepub_step4(alg, size, servers):
# this is the dnskey ttl plus zone propagation delay.
"nextev": KEYTTLPROP,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_zsk_prepub_step5(alg, size, servers):
def test_zsk_prepub_step5(alg, size, ns3):
zone = "step5.zsk-prepub.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# predecessor zsk is now removed.
# zsk1 dnskey: unretentive -> hidden
"zone": "step5.zsk-prepub.autosign",
"zone": zone,
"keyprops": [
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent offset:{OFFSETS['step5-p']}",
f"zsk {LIFETIME_POLICY} {alg} {size} goal:hidden dnskey:hidden zrrsig:hidden offset:{OFFSETS['step5-p']}",
@ -152,17 +172,21 @@ def test_zsk_prepub_step5(alg, size, servers):
# elapsed.
"nextev": ZSK_LIFETIME - IRET - IPUB - KEYTTLPROP,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)
def test_zsk_prepub_step6(alg, size, servers):
def test_zsk_prepub_step6(alg, size, ns3):
zone = "step6.zsk-prepub.autosign"
isctest.kasp.wait_keymgr_done(ns3, zone)
step = {
# predecessor zsk is now purged.
"zone": "step6.zsk-prepub.autosign",
"zone": zone,
"keyprops": [
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent offset:{OFFSETS['step6-p']}",
f"zsk {LIFETIME_POLICY} {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent offset:{OFFSETS['step6-s']}",
],
"nextev": None,
}
isctest.kasp.check_rollover_step(servers["ns3"], CONFIG, POLICY, step)
isctest.kasp.check_rollover_step(ns3, CONFIG, POLICY, step)

View file

@ -18,8 +18,7 @@ from isctest.kasp import KeyTimingMetadata, Ipub, Iret
from common import pytestmark # pylint: disable=unused-import
def test_rollover_manual(servers):
server = servers["ns3"]
def test_rollover_manual(ns3):
policy = "manual-rollover"
config = {
"dnskey-ttl": timedelta(hours=1),
@ -37,17 +36,16 @@ def test_rollover_manual(servers):
size = os.environ["DEFAULT_BITS"]
zone = "manual-rollover.kasp"
with server.watch_log_from_start() as watcher:
watcher.wait_for_line(f"keymgr: {zone} done")
isctest.kasp.wait_keymgr_done(ns3, zone)
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
key_properties = [
f"ksk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
]
expected = isctest.kasp.policy_to_properties(ttl, key_properties)
keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
keys = isctest.kasp.keydir_to_keylist(zone, ns3.identifier)
ksks = [k for k in keys if k.is_ksk()]
zsks = [k for k in keys if not k.is_ksk()]
@ -58,9 +56,9 @@ def test_rollover_manual(servers):
kp.set_expected_keytimes(config, offset=offset)
isctest.kasp.check_keytimes(keys, expected)
isctest.kasp.check_dnssecstatus(server, zone, keys, policy=policy)
isctest.kasp.check_apex(server, zone, ksks, zsks)
isctest.kasp.check_subdomain(server, zone, ksks, zsks)
isctest.kasp.check_dnssecstatus(ns3, zone, keys, policy=policy)
isctest.kasp.check_apex(ns3, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns3, zone, ksks, zsks)
# Schedule KSK rollover in six months.
assert len(ksks) == 1
@ -71,24 +69,24 @@ def test_rollover_manual(servers):
config, zsk=False, ksk=True
)
with server.watch_log_from_here() as watcher:
server.rndc(f"dnssec -rollover -key {ksk.tag} -when {startroll} {zone}")
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"dnssec -rollover -key {ksk.tag} -when {startroll} {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
isctest.kasp.check_keys(zone, keys, expected)
isctest.kasp.check_keytimes(keys, expected)
isctest.kasp.check_dnssecstatus(server, zone, keys, policy=policy)
isctest.kasp.check_apex(server, zone, ksks, zsks)
isctest.kasp.check_subdomain(server, zone, ksks, zsks)
isctest.kasp.check_dnssecstatus(ns3, zone, keys, policy=policy)
isctest.kasp.check_apex(ns3, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns3, zone, ksks, zsks)
# Schedule KSK rollover now.
now = KeyTimingMetadata.now()
with server.watch_log_from_here() as watcher:
server.rndc(f"dnssec -rollover -key {ksk.tag} -when {now} {zone}")
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"dnssec -rollover -key {ksk.tag} -when {now} {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
key_properties = [
f"ksk unlimited {alg} {size} goal:hidden dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
@ -96,7 +94,7 @@ def test_rollover_manual(servers):
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:omnipresent zrrsig:omnipresent",
]
expected = isctest.kasp.policy_to_properties(ttl, key_properties)
keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
keys = isctest.kasp.keydir_to_keylist(zone, ns3.identifier)
ksks = [k for k in keys if k.is_ksk()]
zsks = [k for k in keys if not k.is_ksk()]
@ -118,19 +116,19 @@ def test_rollover_manual(servers):
)
isctest.kasp.check_keytimes(keys, expected)
isctest.kasp.check_dnssecstatus(server, zone, keys, policy=policy)
isctest.kasp.check_apex(server, zone, ksks, zsks)
isctest.kasp.check_subdomain(server, zone, ksks, zsks)
isctest.kasp.check_dnssecstatus(ns3, zone, keys, policy=policy)
isctest.kasp.check_apex(ns3, zone, ksks, zsks)
isctest.kasp.check_subdomain(ns3, zone, ksks, zsks)
# Schedule ZSK rollover now.
assert len(zsks) == 1
zsk = zsks[0]
now = KeyTimingMetadata.now()
with server.watch_log_from_here() as watcher:
server.rndc(f"dnssec -rollover -key {zsk.tag} -when {now} {zone}")
with ns3.watch_log_from_here() as watcher:
ns3.rndc(f"dnssec -rollover -key {zsk.tag} -when {now} {zone}")
watcher.wait_for_line(f"keymgr: {zone} done")
isctest.kasp.check_dnssec_verify(server, zone)
isctest.kasp.check_dnssec_verify(ns3, zone)
key_properties = [
f"ksk unlimited {alg} {size} goal:hidden dnskey:omnipresent krrsig:omnipresent ds:omnipresent",
@ -139,7 +137,7 @@ def test_rollover_manual(servers):
f"zsk unlimited {alg} {size} goal:omnipresent dnskey:rumoured zrrsig:hidden",
]
expected = isctest.kasp.policy_to_properties(ttl, key_properties)
keys = isctest.kasp.keydir_to_keylist(zone, server.identifier)
keys = isctest.kasp.keydir_to_keylist(zone, ns3.identifier)
ksks = [k for k in keys if k.is_ksk()]
zsks = [k for k in keys if not k.is_ksk()]
@ -153,5 +151,5 @@ def test_rollover_manual(servers):
# Try to schedule a ZSK rollover for an inactive key (should fail).
zsk = expected[3].key
response = server.rndc(f"dnssec -rollover -key {zsk.tag} {zone}")
response = ns3.rndc(f"dnssec -rollover -key {zsk.tag} {zone}")
assert "key is not actively signing" in response

View file

@ -13,7 +13,7 @@ import pytest
@pytest.mark.requires_zones_loaded("ns1")
def test_spf_log(servers):
def test_spf_log(ns1):
for msg in (
"zone spf/IN: 'y.spf' found type SPF record but no SPF TXT record found",
"zone warn/IN: 'y.warn' found type SPF record but no SPF TXT record found",
@ -21,7 +21,7 @@ def test_spf_log(servers):
"zone warn/IN: loaded serial 0",
"zone nowarn/IN: loaded serial 0",
):
servers["ns1"].log.expect(msg)
ns1.log.expect(msg)
for msg in (
"zone nowarn/IN: 'y.nowarn' found type SPF record but no SPF TXT record found",
@ -29,4 +29,4 @@ def test_spf_log(servers):
"zone warn/IN: 'warn' found type SPF record but no SPF TXT record found",
"zone nowarn/IN: 'nowarn' found type SPF record but no SPF TXT record found",
):
servers["ns1"].log.prohibit(msg)
ns1.log.prohibit(msg)

View file

@ -26,7 +26,7 @@ pytestmark = pytest.mark.extra_artifacts(
)
def test_stub_zones_availability(servers):
def test_stub_zones_availability(ns3):
# check that the stub zone has been saved to disk
assert os.path.exists("ns3/child.example.st")
@ -65,8 +65,8 @@ def test_stub_zones_availability(servers):
stub_zone_lookout_without_recursion()
stub_zone_lookout_with_recursion()
servers["ns3"].stop()
servers["ns3"].start(["--noclean", "--restart", "--port", os.environ["PORT"]])
ns3.stop()
ns3.start(["--noclean", "--restart", "--port", os.environ["PORT"]])
axfr_denied()
stub_zone_lookout_without_recursion()

View file

@ -101,11 +101,10 @@ def test_tsig_fuzz_rdata(
error,
mangle_orig_id,
other,
servers,
ns1,
named_port,
):
alg, mac = alg_and_mac
ns1 = servers["ns1"]
msg = dns.message.make_query("example.com.", "AXFR")
msg.keyring = False # don't validate received TSIG

View file

@ -33,14 +33,14 @@ pytestmark = pytest.mark.extra_artifacts(
)
def test_xferquota(named_port, servers):
def test_xferquota(named_port, ns1, ns2):
# Changing test zone ensuring that the time stamp changes
time.sleep(1)
shutil.copyfile("ns1/changing2.db", "ns1/changing.db")
with open("ns1/named.pid", "r", encoding="utf-8") as pidfile:
pid = int(pidfile.read())
os.kill(pid, signal.SIGHUP)
with servers["ns1"].watch_log_from_start() as watcher:
with ns1.watch_log_from_start() as watcher:
watcher.wait_for_line("received SIGHUP signal to reload zones")
def check_line_count():
@ -75,6 +75,6 @@ def test_xferquota(named_port, servers):
f"transfer of 'changing/IN' from 10.53.0.1#{named_port}: "
f"Transfer completed: .*\\(serial 2\\)"
)
with servers["ns2"].watch_log_from_start(timeout=30) as watcher:
with ns2.watch_log_from_start(timeout=30) as watcher:
watcher.wait_for_line(pattern)
query_and_compare(a_msg)