From ee8e9f1dedd2ba2bc6ef17fc9a1a687305806f51 Mon Sep 17 00:00:00 2001 From: Matthijs Mekking Date: Thu, 13 Mar 2025 15:46:39 +0100 Subject: [PATCH 1/7] Move test code that can be reused to isctest This is the first step of converting the kasp system test to pytest. Well, perhaps not the first, because earlier the ksr system test was already converted to pytest and then the `isctest/kasp.py` library was already introduced. Lots of this code can be reused for the kasp pytest code. First of all, 'check_file_contents_equal' is moved out of the ksr test and into the 'check' library. This feels the most appropriate place for this function to be reused in other tests. Then, 'keystr_to_keylist' is moved to the 'kasp' library. Introduce two new methods that are unused in this point of time, but we are going to need them for the kasp system test. 'zone_contains' will be used to check if a signature exists in the zonefile. This way we can tell whether the signature has been reused or refreshed. 'file_contents_contain' will be used to check if the comment and public DNSKEY record in the keyfile is correct. --- bin/tests/system/isctest/check.py | 22 ++++++++ bin/tests/system/isctest/kasp.py | 6 ++- bin/tests/system/isctest/util.py | 42 +++++++++++++++ bin/tests/system/ksr/tests_ksr.py | 88 ++++++++++++------------------- 4 files changed, 102 insertions(+), 56 deletions(-) create mode 100644 bin/tests/system/isctest/util.py diff --git a/bin/tests/system/isctest/check.py b/bin/tests/system/isctest/check.py index 0906ad92ff..b35dfe848e 100644 --- a/bin/tests/system/isctest/check.py +++ b/bin/tests/system/isctest/check.py @@ -9,6 +9,7 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. +import difflib import shutil from typing import Optional @@ -128,3 +129,24 @@ def is_response_to(response: dns.message.Message, query: dns.message.Message) -> single_question(response) single_question(query) assert query.is_response(response), str(response) + + +def file_contents_equal(file1, file2): + def normalize_line(line): + # remove trailing&leading whitespace and replace multiple whitespaces + return " ".join(line.split()) + + def read_lines(file_path): + with open(file_path, "r", encoding="utf-8") as file: + return [normalize_line(line) for line in file.readlines()] + + lines1 = read_lines(file1) + lines2 = read_lines(file2) + + differ = difflib.Differ() + diff = differ.compare(lines1, lines2) + + for line in diff: + assert not line.startswith("+ ") and not line.startswith( + "- " + ), f'file contents of "{file1}" and "{file2}" differ' diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 1fbb489319..84102a0198 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -15,7 +15,7 @@ from pathlib import Path import re import subprocess import time -from typing import Optional, Union +from typing import List, Optional, Union from datetime import datetime, timedelta, timezone @@ -577,3 +577,7 @@ def check_subdomain(server, zone, ksks, zsks): assert len(rrsigs) > 0 check_signatures(rrsigs, qtype, fqdn, ksks, zsks) + + +def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]: + return [Key(name, keydir) for name in keystr.split()] diff --git a/bin/tests/system/isctest/util.py b/bin/tests/system/isctest/util.py new file mode 100644 index 0000000000..904e088ff6 --- /dev/null +++ b/bin/tests/system/isctest/util.py @@ -0,0 +1,42 @@ +# Copyright (C) Internet Systems Consortium, Inc. ("ISC") +# +# SPDX-License-Identifier: MPL-2.0 +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, you can obtain one at https://mozilla.org/MPL/2.0/. +# +# See the COPYRIGHT file distributed with this work for additional +# information regarding copyright ownership. + +import dns.zone + + +def zone_contains( + zone: dns.zone.Zone, rrset: dns.rrset.RRset, compare_ttl=False +) -> bool: + """Check if a zone contains RRset""" + + def compare_rrs(rr1, rrset): + rr2 = next((other_rr for other_rr in rrset if rr1 == other_rr), None) + if rr2 is None: + return False + if compare_ttl: + return rr1.ttl == rr2.ttl + return True + + for _, node in zone.nodes.items(): + for rdataset in node: + for rr in rdataset: + if compare_rrs(rr, rrset): + return True + + return False + + +def file_contents_contain(file, substr): + with open(file, "r", encoding="utf-8") as fp: + for line in fp: + if f"{substr}" in line: + return True + return False diff --git a/bin/tests/system/ksr/tests_ksr.py b/bin/tests/system/ksr/tests_ksr.py index 78724d137b..5512f34fa2 100644 --- a/bin/tests/system/ksr/tests_ksr.py +++ b/bin/tests/system/ksr/tests_ksr.py @@ -10,19 +10,14 @@ # information regarding copyright ownership. from datetime import timedelta -import difflib import os import shutil import time -from typing import List, Optional import pytest import isctest -from isctest.kasp import ( - Key, - KeyTimingMetadata, -) +from isctest.kasp import KeyTimingMetadata pytestmark = pytest.mark.extra_artifacts( [ @@ -89,31 +84,6 @@ def between(value, start, end): return start < value < end -def check_file_contents_equal(file1, file2): - def normalize_line(line): - # remove trailing&leading whitespace and replace multiple whitespaces - return " ".join(line.split()) - - def read_lines(file_path): - with open(file_path, "r", encoding="utf-8") as file: - return [normalize_line(line) for line in file.readlines()] - - lines1 = read_lines(file1) - lines2 = read_lines(file2) - - differ = difflib.Differ() - diff = differ.compare(lines1, lines2) - - for line in diff: - assert not line.startswith("+ ") and not line.startswith( - "- " - ), f'file contents of "{file1}" and "{file2}" differ' - - -def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]: - return [Key(name, keydir) for name in keystr.split()] - - def ksr(zone, policy, action, options="", raise_on_exception=True): ksr_command = [ os.environ.get("KSR"), @@ -515,14 +485,14 @@ def test_ksr_common(servers): # create ksk kskdir = "ns1/offline" out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 1 check_keys(ksks, None) # check that 'dnssec-ksr keygen' pregenerates right amount of keys out, _ = ksr(zone, policy, "keygen", options="-i now -e +1y") - zsks = keystr_to_keylist(out) + zsks = isctest.kasp.keystr_to_keylist(out) assert len(zsks) == 2 lifetime = timedelta(days=31 * 6) @@ -532,7 +502,7 @@ def test_ksr_common(servers): # in the given key directory zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 2 lifetime = timedelta(days=31 * 6) @@ -575,18 +545,22 @@ def test_ksr_common(servers): # check that 'dnssec-ksr keygen' selects pregenerated keys for # the same time bundle out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y") - selected_zsks = keystr_to_keylist(out, zskdir) + selected_zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(selected_zsks) == 2 for index, key in enumerate(selected_zsks): assert zsks[index] == key - check_file_contents_equal(f"{key.path}.private", f"{key.path}.private.backup") - check_file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup") - check_file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup") + isctest.check.file_contents_equal( + f"{key.path}.private", f"{key.path}.private.backup" + ) + isctest.check.file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup") + isctest.check.file_contents_equal( + f"{key.path}.state", f"{key.path}.state.backup" + ) # check that 'dnssec-ksr keygen' generates only necessary keys for # overlapping time bundle out, err = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1") - overlapping_zsks = keystr_to_keylist(out, zskdir) + overlapping_zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(overlapping_zsks) == 4 verbose = err.split() @@ -606,15 +580,19 @@ def test_ksr_common(servers): for index, key in enumerate(overlapping_zsks): if index < 2: assert zsks[index] == key - check_file_contents_equal( + isctest.check.file_contents_equal( f"{key.path}.private", f"{key.path}.private.backup" ) - check_file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup") - check_file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup") + isctest.check.file_contents_equal( + f"{key.path}.key", f"{key.path}.key.backup" + ) + isctest.check.file_contents_equal( + f"{key.path}.state", f"{key.path}.state.backup" + ) # run 'dnssec-ksr keygen' again with verbosity 0 out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y") - overlapping_zsks2 = keystr_to_keylist(out, zskdir) + overlapping_zsks2 = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(overlapping_zsks2) == 4 check_keys(overlapping_zsks2, lifetime) for index, key in enumerate(overlapping_zsks2): @@ -709,7 +687,7 @@ def test_ksr_lastbundle(servers): kskdir = "ns1/offline" offset = -timedelta(days=365) out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1d -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 1 check_keys(ksks, None, offset=offset) @@ -717,7 +695,7 @@ def test_ksr_lastbundle(servers): # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 2 lifetime = timedelta(days=31 * 6) @@ -788,7 +766,7 @@ def test_ksr_inthemiddle(servers): kskdir = "ns1/offline" offset = -timedelta(days=365) out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i -1y -e +1y -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 1 check_keys(ksks, None, offset=offset) @@ -796,7 +774,7 @@ def test_ksr_inthemiddle(servers): # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 4 lifetime = timedelta(days=31 * 6) @@ -868,13 +846,13 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end): then = now + offset until = now + end out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i {then} -e {until} -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 1 # key generation zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 2 # create request @@ -941,7 +919,7 @@ def test_ksr_unlimited(servers): # create ksk kskdir = "ns1/offline" out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +2y -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 1 check_keys(ksks, None) @@ -949,7 +927,7 @@ def test_ksr_unlimited(servers): # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 1 lifetime = None @@ -1058,7 +1036,7 @@ def test_ksr_twotone(servers): # create ksk kskdir = "ns1/offline" out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 2 ksks_defalg = [] @@ -1082,7 +1060,7 @@ def test_ksr_twotone(servers): # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) # First algorithm keys have a lifetime of 3 months, so there should # be 4 created keys. Second algorithm keys have a lifetime of 5 # months, so there should be 3 created keys. While only two time @@ -1176,7 +1154,7 @@ def test_ksr_kskroll(servers): # create ksk kskdir = "ns1/offline" out, _ = ksr(zone, policy, "keygen", options=f"-K {kskdir} -i now -e +1y -o") - ksks = keystr_to_keylist(out, kskdir) + ksks = isctest.kasp.keystr_to_keylist(out, kskdir) assert len(ksks) == 2 lifetime = timedelta(days=31 * 6) @@ -1185,7 +1163,7 @@ def test_ksr_kskroll(servers): # check that 'dnssec-ksr keygen' pregenerates right amount of keys zskdir = "ns1" out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y") - zsks = keystr_to_keylist(out, zskdir) + zsks = isctest.kasp.keystr_to_keylist(out, zskdir) assert len(zsks) == 1 check_keys(zsks, None) From 0b9fbca18e1bcc0e5b613fc2c49908a7550a976d Mon Sep 17 00:00:00 2001 From: Matthijs Mekking Date: Fri, 14 Mar 2025 10:23:37 +0100 Subject: [PATCH 2/7] Introduce class KeyProperties In isctest.kasp, introduce a new class 'KeyProperties' that can be used to check if a Key matches expected properties. Properties are for the time being divided in three parts: 'properties' that contain some attributes of the expected properties (such as are we dealing with a legacy key, is the private key available, and other things that do not fit the metadata exactly), 'metadata' that contains expected metadata (such as 'Algorithm', 'Lifetime', 'Length'), and 'timing', which is metadata of the class KeyTimingMetadata. The 'default()' method fills in the expected properties for the default DNSSEC policy. The 'set_expected_times()' sets the expected timing metadata, derived from when the key was created. This method can take an offset to push the expected timing metadata a duration in the future or back into the past. If 'pregenerated=True', derive the expected timing metadata from the 'Publish' metadata derived from the keyfile, rather than from the 'Created' metadata. The calculations in the 'Ipub', 'IpubC' and 'Iret' methods are derived from RFC 7583 DNSSEC Key Rollover Timing Considerations. --- bin/tests/system/isctest/kasp.py | 203 ++++++++++++++++++++++++++++++- 1 file changed, 202 insertions(+), 1 deletion(-) diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 84102a0198..6ff95be99d 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -15,7 +15,7 @@ from pathlib import Path import re import subprocess import time -from typing import List, Optional, Union +from typing import Dict, List, Optional, Union from datetime import datetime, timedelta, timezone @@ -100,6 +100,165 @@ class KeyTimingMetadata: return result +class KeyProperties: + """ + Represent the (expected) properties a key should have. + """ + + def __init__( + self, + name: str, + properties: dict, + metadata: dict, + timing: Dict[str, KeyTimingMetadata], + ): + self.name = name + self.key = None + self.properties = properties + self.metadata = metadata + self.timing = timing + + def __repr__(self): + return self.name + + def __str__(self) -> str: + return self.name + + @staticmethod + def default(with_state=True) -> "KeyProperties": + properties = { + "expect": True, + "private": True, + "legacy": False, + "role": "csk", + "role_full": "key-signing", + "dnskey_ttl": 3600, + "flags": 257, + } + metadata = { + "Algorithm": isctest.vars.algorithms.ECDSAP256SHA256.number, + "Length": 256, + "Lifetime": 0, + "KSK": "yes", + "ZSK": "yes", + } + timing: Dict[str, KeyTimingMetadata] = {} + + result = KeyProperties( + name="DEFAULT", properties=properties, metadata=metadata, timing=timing + ) + result.name = "DEFAULT" + result.key = None + if with_state: + result.metadata["GoalState"] = "omnipresent" + result.metadata["DNSKEYState"] = "rumoured" + result.metadata["KRRSIGState"] = "rumoured" + result.metadata["ZRRSIGState"] = "rumoured" + result.metadata["DSState"] = "hidden" + + return result + + def Ipub(self, config): + ipub = timedelta(0) + + if self.key.get_metadata("Predecessor", must_exist=False) != "undefined": + # Ipub = Dprp + TTLkey + ipub = ( + config["dnskey-ttl"] + + config["zone-propagation-delay"] + + config["publish-safety"] + ) + + self.timing["Active"] = self.timing["Published"] + ipub + + def IpubC(self, config): + if not self.key.is_ksk(): + return + + ttl1 = config["dnskey-ttl"] + config["publish-safety"] + ttl2 = timedelta(0) + + if self.key.get_metadata("Predecessor", must_exist=False) == "undefined": + # If this is the first key, we also need to wait until the zone + # signatures are omnipresent. Use max-zone-ttl instead of + # dnskey-ttl, and no publish-safety (because we are looking at + # signatures here, not the public key). + ttl2 = config["max-zone-ttl"] + + # IpubC = DprpC + TTLkey + ipubc = config["zone-propagation-delay"] + max(ttl1, ttl2) + + self.timing["PublishCDS"] = self.timing["Published"] + ipubc + + if self.metadata["Lifetime"] != 0: + self.timing["DeleteCDS"] = ( + self.timing["PublishCDS"] + self.metadata["Lifetime"] + ) + + def Iret(self, config): + if self.metadata["Lifetime"] == 0: + return + + sign_delay = config["signatures-validity"] - config["signatures-refresh"] + safety_interval = config["retire-safety"] + + iretKSK = timedelta(0) + iretZSK = timedelta(0) + if self.key.is_ksk(): + # Iret = DprpP + TTLds + iretKSK = ( + config["parent-propagation-delay"] + config["ds-ttl"] + safety_interval + ) + if self.key.is_zsk(): + # Iret = Dsgn + Dprp + TTLsig + iretZSK = ( + sign_delay + + config["zone-propagation-delay"] + + config["max-zone-ttl"] + + safety_interval + ) + + self.timing["Removed"] = self.timing["Retired"] + max(iretKSK, iretZSK) + + def set_expected_keytimes(self, config, offset=None, pregenerated=False): + if self.key is None: + raise ValueError("KeyProperties must be attached to a Key") + + if self.properties["legacy"]: + return + + if offset is None: + offset = self.properties["offset"] + + self.timing["Generated"] = self.key.get_timing("Created") + + self.timing["Published"] = self.timing["Generated"] + if pregenerated: + self.timing["Published"] = self.key.get_timing("Publish") + self.timing["Published"] = self.timing["Published"] + offset + self.Ipub(config) + + # Set Retired timing metadata if key has lifetime. + if self.metadata["Lifetime"] != 0: + self.timing["Retired"] = self.timing["Active"] + self.metadata["Lifetime"] + + self.IpubC(config) + self.Iret(config) + + # Key state change times must exist, but since we cannot reliably tell + # when named made the actual state change, we don't care what the + # value is. Set it to None will verify that the metadata exists, but + # without actual checking the value. + self.timing["DNSKEYChange"] = None + + if self.key.is_ksk(): + self.timing["DSChange"] = None + self.timing["KRRSIGChange"] = None + + if self.key.is_zsk(): + self.timing["ZRRSIGChange"] = None + + @total_ordering class Key: """ @@ -579,5 +738,47 @@ def check_subdomain(server, zone, ksks, zsks): check_signatures(rrsigs, qtype, fqdn, ksks, zsks) +def keydir_to_keylist( + zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False +) -> List[Key]: + """ + Retrieve all keys from the key files in a directory. If 'zone' is None, + retrieve all keys in the directory, otherwise only those matching the + zone name. If 'keydir' is None, search the current directory. + """ + if zone is None: + zone = "" + + all_keys = [] + if keydir is None: + regex = rf"(K{zone}\.\+.*\+.*)\.key" + for filename in glob.glob(f"K{zone}.+*+*.key"): + match = re.match(regex, filename) + if match is not None: + all_keys.append(Key(match.group(1))) + else: + regex = rf"{keydir}/(K{zone}\.\+.*\+.*)\.key" + for filename in glob.glob(f"{keydir}/K{zone}.+*+*.key"): + match = re.match(regex, filename) + if match is not None: + all_keys.append(Key(match.group(1), keydir)) + + states = ["GoalState", "DNSKEYState", "KRRSIGState", "ZRRSIGState", "DSState"] + + def used(kk): + if not in_use: + return True + + for state in states: + val = kk.get_metadata(state, must_exist=False) + if val not in ["undefined", "hidden"]: + isctest.log.debug(f"key {kk} in use") + return True + + return False + + return [k for k in all_keys if used(k)] + + def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]: return [Key(name, keydir) for name in keystr.split()] From 97f6b7ad11c3c1d6c1ceeaada5776759649afa49 Mon Sep 17 00:00:00 2001 From: Matthijs Mekking Date: Fri, 14 Mar 2025 10:38:43 +0100 Subject: [PATCH 3/7] Update class Key Because we want to check the metadata in all three files, a new value in the Key class is added: 'privatefile'. The 'get_metadata' function is adapted so that we can also check metadata in other files. Introduce methods to easily retrieve the TTL and public DNSKEY record from the keyfile. When checking if the CDS is equal to the expected value, use the DNSKEY TTL instead of hardcoded 3600. --- bin/tests/system/isctest/kasp.py | 35 ++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 6ff95be99d..fe31c991c8 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -276,6 +276,7 @@ class Key: else: self.keydir = Path(keydir) self.path = str(self.keydir / name) + self.privatefile = f"{self.path}.private" self.keyfile = f"{self.path}.key" self.statefile = f"{self.path}.state" self.tag = int(self.name[-5:]) @@ -298,21 +299,43 @@ class Key: ) return None - def get_metadata(self, metadata: str, must_exist=True) -> str: + def get_metadata( + self, metadata: str, file=None, comment=False, must_exist=True + ) -> str: + if file is None: + file = self.statefile value = "undefined" - regex = rf"{metadata}:\s+(.*)" - with open(self.statefile, "r", encoding="utf-8") as file: - for line in file: + regex = rf"{metadata}:\s+(\S+).*" + if comment: + # The expected metadata is prefixed with a ';'. + regex = rf";\s+{metadata}:\s+(\S+).*" + with open(file, "r", encoding="utf-8") as fp: + for line in fp: match = re.match(regex, line) if match is not None: value = match.group(1) break if must_exist and value == "undefined": raise ValueError( - 'state metadata "{metadata}" for key "{self.name}" undefined' + f'metadata "{metadata}" for key "{self.name}" in file "{file}" undefined' ) return value + def ttl(self) -> int: + with open(self.keyfile, "r", encoding="utf-8") as file: + for line in file: + if line.startswith(";"): + continue + return int(line.split()[1]) + return 0 + + def dnskey(self): + with open(self.keyfile, "r", encoding="utf-8") as file: + for line in file: + if "DNSKEY" in line: + return line.strip() + return "undefined" + def is_ksk(self) -> bool: return self.get_metadata("KSK") == "yes" @@ -346,7 +369,7 @@ class Key: dsfromkey_command = [ os.environ.get("DSFROMKEY"), "-T", - "3600", + str(self.ttl()), "-a", alg, "-C", From 44ff63a50d660fd5e836b9bb0365d1ae6bdbf60a Mon Sep 17 00:00:00 2001 From: Matthijs Mekking Date: Fri, 14 Mar 2025 10:51:36 +0100 Subject: [PATCH 4/7] Introduce pytest verify_keys and check_keytimes This commit introduces replacements for the 'check_keys' and 'check_keytimes' from the shell test library. 'check_keys' is renamed to 'verify_keys' because it does not assert. For that, we introduce more functions for the class Key. The 'match_properties' function is used in 'verify_keys' to see if a set of KeyProperties match the Key. This speficially ignores timing metadata. The function resembles what is in 'kasp.sh:check_key()'. The 'match_timingmetadata' function is used in 'check_keytimes' to see if the timing metadata of a set of KeyProperties match the Key. The values are checked in all three key files (except if the private key is not available (set with properties["private"]), or if it is a legacy key (set with properties["legacy"]). An additional check function is added, to check if the key relationships are set correctly. It follows a similar pattern as 'check_keytimes'. If "Predecessor" and/or "Successor" are expected to be set in the state file, this function checks so, and also verifies that they are not set if they should not be. --- bin/tests/system/isctest/kasp.py | 249 +++++++++++++++++++++++++++++++ 1 file changed, 249 insertions(+) diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index fe31c991c8..1cab813896 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -398,6 +398,149 @@ class Key: return digest_fromfile == digest_fromwire + def is_metadata_consistent(self, key, metadata, checkval=True): + """ + If 'key' exists in 'metadata' then it must also exist in the state + meta data. Otherwise, it must not exist in the state meta data. + If 'checkval' is True, the meta data values must also match. + """ + if key in metadata: + if checkval: + value = self.get_metadata(key) + if value != f"{metadata[key]}": + isctest.log.debug( + f"{self.name} {key} METADATA MISMATCH: {value} - {metadata[key]}" + ) + return value == f"{metadata[key]}" + + return self.get_metadata(key) != "undefined" + + value = self.get_metadata(key, must_exist=False) + if value != "undefined": + isctest.log.debug(f"{self.name} {key} METADATA UNEXPECTED: {value}") + return value == "undefined" + + def is_timing_consistent(self, key, timing, file, comment=False): + """ + If 'key' exists in 'timing' then it must match the value in the state + timing data. Otherwise, it must also not exist in the state timing data. + """ + if key in timing: + value = self.get_metadata(key, file=file, comment=comment) + if value != str(timing[key]): + isctest.log.debug( + f"{self.name} {key} TIMING MISMATCH: {value} - {timing[key]}" + ) + return value == str(timing[key]) + + value = self.get_metadata(key, file=file, comment=comment, must_exist=False) + if value != "undefined": + isctest.log.debug(f"{self.name} {key} TIMING UNEXPECTED: {value}") + return value == "undefined" + + def match_properties(self, zone, properties): + """ + Check the key with given properties. + """ + if not properties.properties["expect"]: + return False + + # Check file existence. + # Noop. If file is missing then the get_metadata calls will fail. + + # Check the public key file. + role = properties.properties["role_full"] + comment = f"This is a {role} key, keyid {self.tag}, for {zone}." + if not isctest.util.file_contents_contain(self.keyfile, comment): + isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'") + return False + + ttl = properties.properties["dnskey_ttl"] + flags = properties.properties["flags"] + alg = properties.metadata["Algorithm"] + dnskey = f"{zone}. {ttl} IN DNSKEY {flags} 3 {alg}" + if not isctest.util.file_contents_contain(self.keyfile, dnskey): + isctest.log.debug(f"{self.name} DNSKEY MISMATCH: expected '{dnskey}'") + return False + + # Now check the private key file. + if properties.properties["private"]: + # Retrieve creation date. + created = self.get_metadata("Generated") + + pval = self.get_metadata("Created", file=self.privatefile) + if pval != created: + isctest.log.debug( + f"{self.name} Created METADATA MISMATCH: {pval} - {created}" + ) + return False + pval = self.get_metadata("Private-key-format", file=self.privatefile) + if pval != "v1.3": + isctest.log.debug( + f"{self.name} Private-key-format METADATA MISMATCH: {pval} - v1.3" + ) + return False + pval = self.get_metadata("Algorithm", file=self.privatefile) + if pval != f"{alg}": + isctest.log.debug( + f"{self.name} Algorithm METADATA MISMATCH: {pval} - {alg}" + ) + return False + + # Now check the key state file. + if properties.properties["legacy"]: + return True + + comment = f"This is the state of key {self.tag}, for {zone}." + if not isctest.util.file_contents_contain(self.statefile, comment): + isctest.log.debug(f"{self.name} COMMENT MISMATCH: expected '{comment}'") + return False + + attributes = [ + "Lifetime", + "Algorithm", + "Length", + "KSK", + "ZSK", + "GoalState", + "DNSKEYState", + "KRRSIGState", + "ZRRSIGState", + "DSState", + ] + for key in attributes: + if not self.is_metadata_consistent(key, properties.metadata): + return False + + # A match is found. + return True + + def match_timingmetadata(self, timings, file=None, comment=False): + if file is None: + file = self.statefile + + attributes = [ + "Generated", + "Created", + "Published", + "Publish", + "PublishCDS", + "SyncPublish", + "Active", + "Activate", + "Retired", + "Inactive", + "Revoked", + "Removed", + "Delete", + ] + for key in attributes: + if not self.is_timing_consistent(key, timings, file, comment=comment): + isctest.log.debug(f"{self.name} TIMING METADATA MISMATCH: {key}") + return False + + return True + def __lt__(self, other: "Key"): return self.name < other.name @@ -459,6 +602,112 @@ def check_zone_is_signed(server, zone): assert signed +def verify_keys(zone, keys, expected): + """ + Checks keys for a configured zone. This verifies: + 1. The expected number of keys exist in 'keys'. + 2. The keys match the expected properties. + """ + + def _verify_keys(): + # check number of keys matches expected. + if len(keys) != len(expected): + return False + + if len(keys) == 0: + return True + + for expect in expected: + expect.key = None + + for key in keys: + found = False + i = 0 + while not found and i < len(expected): + if expected[i].key is None: + found = key.match_properties(zone, expected[i]) + if found: + key.external = expected[i].properties["legacy"] + expected[i].key = key + i += 1 + if not found: + return False + + return True + + isctest.run.retry_with_timeout(_verify_keys, timeout=10) + + +def check_keytimes(keys, expected): + """ + Check the key timing metadata for all keys in 'keys'. + """ + assert len(keys) == len(expected) + + if len(keys) == 0: + return + + for key in keys: + for expect in expected: + if expect.properties["legacy"]: + continue + + if not key is expect.key: + continue + + synonyms = {} + if "Generated" in expect.timing: + synonyms["Created"] = expect.timing["Generated"] + if "Published" in expect.timing: + synonyms["Publish"] = expect.timing["Published"] + if "PublishCDS" in expect.timing: + synonyms["SyncPublish"] = expect.timing["PublishCDS"] + if "Active" in expect.timing: + synonyms["Activate"] = expect.timing["Active"] + if "Retired" in expect.timing: + synonyms["Inactive"] = expect.timing["Retired"] + if "DeleteCDS" in expect.timing: + synonyms["SyncDelete"] = expect.timing["DeleteCDS"] + if "Revoked" in expect.timing: + synonyms["Revoked"] = expect.timing["Revoked"] + if "Removed" in expect.timing: + synonyms["Delete"] = expect.timing["Removed"] + + assert key.match_timingmetadata(synonyms, file=key.keyfile, comment=True) + if expect.properties["private"]: + assert key.match_timingmetadata(synonyms, file=key.privatefile) + if not expect.properties["legacy"]: + assert key.match_timingmetadata(expect.timing) + + state_changes = [ + "DNSKEYChange", + "KRRSIGChange", + "ZRRSIGChange", + "DSChange", + ] + for change in state_changes: + assert key.is_metadata_consistent( + change, expect.timing, checkval=False + ) + + +def check_keyrelationships(keys, expected): + """ + Check the key relationships (Successor and Predecessor metadata). + """ + for key in keys: + for expect in expected: + if expect.properties["legacy"]: + continue + + if not key is expect.key: + continue + + relationship_status = ["Predecessor", "Successor"] + for status in relationship_status: + assert key.is_metadata_consistent(status, expect.metadata) + + def check_dnssec_verify(server, zone): # Check if zone if DNSSEC valid with dnssec-verify. fqdn = f"{zone}." From 12e57eb222c3e4e721d5978d41c84efe0caadd77 Mon Sep 17 00:00:00 2001 From: Matthijs Mekking Date: Fri, 14 Mar 2025 11:10:22 +0100 Subject: [PATCH 5/7] Introduce pytest check_next_key_event, get_keyids For the kasp tests we need a new utility that can retrieve a list of Keys from a given directory, belonging to a specific zone. This is 'keydir_to_keylist' and is the replacement of 'kasp.sh:get_keyids()'. 'next_key_event_eqauls' is a method to check when the next key event is scheduled, needed for the rollover tests, and is the equivalent of shell script 'check_next_key_event'. --- bin/tests/system/isctest/kasp.py | 40 ++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 1cab813896..fa3c297f78 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -10,6 +10,7 @@ # information regarding copyright ownership. from functools import total_ordering +import glob import os from pathlib import Path import re @@ -25,6 +26,8 @@ import isctest.query DEFAULT_TTL = 300 +NEXT_KEY_EVENT_THRESHOLD = 100 + def _query(server, qname, qtype): query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) @@ -1010,6 +1013,43 @@ def check_subdomain(server, zone, ksks, zsks): check_signatures(rrsigs, qtype, fqdn, ksks, zsks) +def next_key_event_equals(server, zone, next_event): + if next_event is None: + # No next key event check. + return True + + val = int(next_event.total_seconds()) + if val == 3600: + waitfor = rf".*zone {zone}.*: next key event in (.*) seconds" + else: + # Don't want default loadkeys interval. + waitfor = rf".*zone {zone}.*: next key event in (?!3600$)(.*) seconds" + + with server.watch_log_from_start() as watcher: + watcher.wait_for_line(re.compile(waitfor)) + + # WMM: The with code below is extracting the line the watcher was + # waiting for. If WatchLog.wait_for_line()` returned the matched string, + # we can use it directly on `re.match`. + next_found = False + minval = val - NEXT_KEY_EVENT_THRESHOLD + maxval = val + NEXT_KEY_EVENT_THRESHOLD + with open(f"{server.identifier}/named.run", "r", encoding="utf-8") as fp: + for line in fp: + match = re.match(waitfor, line) + if match is not None: + nextval = int(match.group(1)) + if minval <= nextval <= maxval: + next_found = True + break + + isctest.log.debug( + f"check next key event: expected {val} in: {line.strip()}" + ) + + return next_found + + def keydir_to_keylist( zone: Optional[str], keydir: Optional[str] = None, in_use: bool = False ) -> List[Key]: From 9cb287afa0d428820d41577601cb2a763206a48b Mon Sep 17 00:00:00 2001 From: Matthijs Mekking Date: Fri, 14 Mar 2025 11:19:40 +0100 Subject: [PATCH 6/7] Add support for TSIG in isctest.kasp For some kasp test we are going to need TSIG based queries to differentiate between views. --- bin/tests/system/isctest/kasp.py | 35 +++++++++++++++++++------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index fa3c297f78..7c05206d0f 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -21,6 +21,7 @@ from typing import Dict, List, Optional, Union from datetime import datetime, timedelta, timezone import dns +import dns.tsig import isctest.log import isctest.query @@ -29,8 +30,14 @@ DEFAULT_TTL = 300 NEXT_KEY_EVENT_THRESHOLD = 100 -def _query(server, qname, qtype): +def _query(server, qname, qtype, tsig=None): query = dns.message.make_query(qname, qtype, use_edns=True, want_dnssec=True) + + if tsig is not None: + tsigkey = tsig.split(":") + keyring = dns.tsig.Key(tsigkey[1], tsigkey[2], tsigkey[0]) + query.use_tsig(keyring) + try: response = isctest.query.tcp(query, server.ip, server.ports.dns, timeout=3) except dns.exception.Timeout: @@ -554,14 +561,14 @@ class Key: return self.path -def check_zone_is_signed(server, zone): +def check_zone_is_signed(server, zone, tsig=None): addr = server.ip fqdn = f"{zone}." # wait until zone is fully signed signed = False for _ in range(10): - response = _query(server, fqdn, dns.rdatatype.NSEC) + response = _query(server, fqdn, dns.rdatatype.NSEC, tsig=tsig) if not isinstance(response, dns.message.Message): isctest.log.debug(f"no response for {fqdn} NSEC from {addr}") elif response.rcode() != dns.rcode.NOERROR: @@ -711,13 +718,13 @@ def check_keyrelationships(keys, expected): assert key.is_metadata_consistent(status, expect.metadata) -def check_dnssec_verify(server, zone): +def check_dnssec_verify(server, zone, tsig=None): # Check if zone if DNSSEC valid with dnssec-verify. fqdn = f"{zone}." verified = False for _ in range(10): - transfer = _query(server, fqdn, dns.rdatatype.AXFR) + transfer = _query(server, fqdn, dns.rdatatype.AXFR, tsig=tsig) if not isinstance(transfer, dns.message.Message): isctest.log.debug(f"no response for {fqdn} AXFR from {server.ip}") elif transfer.rcode() != dns.rcode.NOERROR: @@ -936,8 +943,8 @@ def check_cds(rrset, keys): assert numcds == len(cdss) -def _query_rrset(server, fqdn, qtype): - response = _query(server, fqdn, qtype) +def _query_rrset(server, fqdn, qtype, tsig=None): + response = _query(server, fqdn, qtype, tsig=tsig) assert response.rcode() == dns.rcode.NOERROR rrs = [] @@ -957,46 +964,46 @@ def _query_rrset(server, fqdn, qtype): return rrs, rrsigs -def check_apex(server, zone, ksks, zsks): +def check_apex(server, zone, ksks, zsks, tsig=None): # Test the apex of a zone. This checks that the SOA and DNSKEY RRsets # are signed correctly and with the appropriate keys. fqdn = f"{zone}." # test dnskey query - dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY) + dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY, tsig=tsig) assert len(dnskeys) > 0 check_dnskeys(dnskeys, ksks, zsks) assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks) # test soa query - soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA) + soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA, tsig=tsig) assert len(soa) == 1 assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text() assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks) # test cdnskey query - cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY) + cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY, tsig=tsig) check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True) if len(cdnskeys) > 0: assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks) # test cds query - cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS) + cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS, tsig=tsig) check_cds(cds, ksks) if len(cds) > 0: assert len(rrsigs) > 0 check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks) -def check_subdomain(server, zone, ksks, zsks): +def check_subdomain(server, zone, ksks, zsks, tsig=None): # Test an RRset below the apex and verify it is signed correctly. fqdn = f"{zone}." qname = f"a.{zone}." qtype = dns.rdatatype.A - response = _query(server, qname, qtype) + response = _query(server, qname, qtype, tsig=tsig) assert response.rcode() == dns.rcode.NOERROR match = f"{qname} {DEFAULT_TTL} IN A 10.0.0.1" From 0a6cc42914964cd83fe4973456fc54f8bff90c30 Mon Sep 17 00:00:00 2001 From: Matthijs Mekking Date: Mon, 24 Feb 2025 11:35:37 +0100 Subject: [PATCH 7/7] Update _check_dnskeys function In the kasp system test there are cases that the SyncPublish is not set, nor it is required to do so. Update the _check_dnskeys function accordingly. --- bin/tests/system/isctest/kasp.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 7c05206d0f..1b82baf416 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -856,9 +856,9 @@ def _check_dnskeys(dnskeys, keys, cdnskey=False): delete_md = f"Sync{delete_md}" for key in keys: - publish = key.get_timing(publish_md) + publish = key.get_timing(publish_md, must_exist=False) delete = key.get_timing(delete_md, must_exist=False) - published = now >= publish + published = publish is not None and now >= publish removed = delete is not None and delete <= now if not published or removed: