From 2b0a8fcfb5084b23477f1c66b9f32445422a4461 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nicki=20K=C5=99=C3=AD=C5=BEek?= Date: Mon, 7 Oct 2024 18:08:02 +0200 Subject: [PATCH] Use convenience wrappers for kasp key operations --- bin/tests/system/isctest/kasp.py | 404 +++++++++++++------------ bin/tests/system/ksr/tests_ksr.py | 474 ++++++++++++++---------------- 2 files changed, 431 insertions(+), 447 deletions(-) diff --git a/bin/tests/system/isctest/kasp.py b/bin/tests/system/isctest/kasp.py index 685250f291..6fbc57b3b5 100644 --- a/bin/tests/system/isctest/kasp.py +++ b/bin/tests/system/isctest/kasp.py @@ -9,8 +9,12 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. +from functools import total_ordering import os +from pathlib import Path +import re import time +from typing import Optional, Union from datetime import datetime from datetime import timedelta @@ -41,152 +45,191 @@ def _query(server, qname, qtype, outfile=None): return response -def addtime(value, plus): - # Get timing metadata from a value plus additional time. - # Convert "%Y%m%d%H%M%S" format to epoch seconds. - # Then, add the additional time (can be negative). - now = datetime.strptime(value, "%Y%m%d%H%M%S") - delta = timedelta(seconds=plus) - then = now + delta - return then.strftime("%Y%m%d%H%M%S") +@total_ordering +class KeyTimingMetadata: + """ + Represent a single timing information for a key. + + These objects can be easily compared, support addition and subtraction of + timedelta objects or integers(value in seconds). A lack of timing metadata + in the key (value 0) should be represented with None rather than an + instance of this object. + """ + + FORMAT = "%Y%m%d%H%M%S" + + def __init__(self, timestamp: str): + if int(timestamp) <= 0: + raise ValueError(f'invalid timing metadata value: "{timestamp}"') + self.value = datetime.strptime(timestamp, self.FORMAT) + + def __repr__(self): + return self.value.strftime(self.FORMAT) + + def __str__(self) -> str: + return self.value.strftime(self.FORMAT) + + def __add__(self, other: Union[timedelta, int]): + if isinstance(other, int): + other = timedelta(seconds=other) + result = KeyTimingMetadata.__new__(KeyTimingMetadata) + result.value = self.value + other + return result + + def __sub__(self, other: Union[timedelta, int]): + if isinstance(other, int): + other = timedelta(seconds=other) + result = KeyTimingMetadata.__new__(KeyTimingMetadata) + result.value = self.value - other + return result + + def __iadd__(self, other: Union[timedelta, int]): + if isinstance(other, int): + other = timedelta(seconds=other) + self.value += other + + def __isub__(self, other: Union[timedelta, int]): + if isinstance(other, int): + other = timedelta(seconds=other) + self.value -= other + + def __lt__(self, other: "KeyTimingMetadata"): + return self.value < other.value + + def __eq__(self, other: object): + return isinstance(other, KeyTimingMetadata) and self.value == other.value + + @staticmethod + def now() -> "KeyTimingMetadata": + result = KeyTimingMetadata.__new__(KeyTimingMetadata) + result.value = datetime.now() + return result -def get_timing_metadata(key, metadata, keydir=None, offset=0, must_exist=True): - value = "0" +@total_ordering +class Key: + """ + Represent a key from a keyfile. - if keydir is not None: - keyfile = "{}/{}.key".format(keydir, key) - else: - keyfile = "{}.key".format(key) + This object keeps track of its origin (keydir + name), can be used to + retrieve metadata from the underlying files and supports convenience + operations for KASP tests. + """ - with open(keyfile, "r", encoding="utf-8") as file: - for line in file: - if "; {}".format(metadata) in line: - value = line.split()[2] - break + def __init__(self, name: str, keydir: Optional[Union[str, Path]] = None): + self.name = name + if keydir is None: + self.keydir = Path() + else: + self.keydir = Path(keydir) + self.path = str(self.keydir / name) + self.keyfile = f"{self.path}.key" + self.statefile = f"{self.path}.state" + self.tag = int(self.name[-5:]) - if must_exist: - assert int(value) > 0 + def get_timing( + self, metadata: str, must_exist: bool = True + ) -> Optional[KeyTimingMetadata]: + regex = rf";\s+{metadata}:\s+(\d+).*" + with open(self.keyfile, "r", encoding="utf-8") as file: + for line in file: + match = re.match(regex, line) + if match is not None: + try: + return KeyTimingMetadata(match.group(1)) + except ValueError: + break + if must_exist: + raise ValueError( + f'timing metadata "{metadata}" for key "{self.name}" invalid' + ) + return None - if int(value) > 0: - return addtime(value, offset) + def get_metadata(self, metadata: str, must_exist=True) -> str: + value = "undefined" + regex = rf"{metadata}:\s+(.*)" + with open(self.statefile, "r", encoding="utf-8") as file: + for line in file: + match = re.match(regex, line) + if match is not None: + value = match.group(1) + break + if must_exist and value == "undefined": + raise ValueError( + 'state metadata "{metadata}" for key "{self.name}" undefined' + ) + return value - return "0" + def is_ksk(self) -> bool: + return self.get_metadata("KSK") == "yes" + def is_zsk(self) -> bool: + return self.get_metadata("ZSK") == "yes" -def get_metadata(key, metadata, keydir=None, must_exist=True): - if keydir is not None: - statefile = "{}/{}.state".format(keydir, key) - else: - statefile = "{}.state".format(key) + def dnskey_equals(self, value, cdnskey=False): + dnskey = value.split() - value = "undefined" - with open(statefile, "r", encoding="utf-8") as file: - for line in file: - if f"{metadata}: " in line: - value = line.split()[1] - break + if cdnskey: + # fourth element is the rrtype + assert dnskey[3] == "CDNSKEY" + dnskey[3] = "DNSKEY" - if must_exist: - assert value != "undefined" + dnskey_fromfile = [] + rdata = " ".join(dnskey[:7]) - return value + with open(self.keyfile, "r", encoding="utf-8") as file: + for line in file: + if f"{rdata}" in line: + dnskey_fromfile = line.split() + pubkey_fromfile = "".join(dnskey_fromfile[7:]) + pubkey_fromwire = "".join(dnskey[7:]) -def get_keystate(key, metadata, keydir=None, must_exist=True): + return pubkey_fromfile == pubkey_fromwire - return get_metadata(key, metadata, keydir, must_exist) + def cds_equals(self, value, alg): + cds = value.split() + dsfromkey_command = [ + os.environ.get("DSFROMKEY"), + "-T", + "3600", + "-a", + alg, + "-C", + "-w", + str(self.keyfile), + ] -def get_keytag(key): - return int(key[-5:]) + out = isctest.run.cmd(dsfromkey_command, log_stdout=True) + dsfromkey = out.stdout.decode("utf-8").split() + rdata_fromfile = " ".join(dsfromkey[:7]) + rdata_fromwire = " ".join(cds[:7]) + if rdata_fromfile != rdata_fromwire: + isctest.log.debug( + f"CDS RDATA MISMATCH: {rdata_fromfile} - {rdata_fromwire}" + ) + return False -def get_keyrole(key, keydir=None): - ksk = "no" - zsk = "no" + digest_fromfile = "".join(dsfromkey[7:]).lower() + digest_fromwire = "".join(cds[7:]).lower() + if digest_fromfile != digest_fromwire: + isctest.log.debug( + f"CDS DIGEST MISMATCH: {digest_fromfile} - {digest_fromwire}" + ) + return False - if keydir is not None: - statefile = "{}/{}.state".format(keydir, key) - else: - statefile = "{}.state".format(key) + return digest_fromfile == digest_fromwire - with open(statefile, "r", encoding="utf-8") as file: - for line in file: - if "KSK: " in line: - ksk = line.split()[1] - if "ZSK: " in line: - zsk = line.split()[1] + def __lt__(self, other: "Key"): + return self.name < other.name - return ksk == "yes", zsk == "yes" + def __eq__(self, other: object): + return isinstance(other, Key) and self.path == other.path - -def dnskey_equals(key, value, keydir=None, cdnskey=False): - if keydir is not None: - keyfile = f"{keydir}/{key}.key" - else: - keyfile = f"{key}.key" - - dnskey = value.split() - - if cdnskey: - # fourth element is the rrtype - assert dnskey[3] == "CDNSKEY" - dnskey[3] = "DNSKEY" - - dnskey_fromfile = [] - rdata = " ".join(dnskey[:7]) - - with open(keyfile, "r", encoding="utf-8") as file: - for line in file: - if f"{rdata}" in line: - dnskey_fromfile = line.split() - - pubkey_fromfile = "".join(dnskey_fromfile[7:]) - pubkey_fromwire = "".join(dnskey[7:]) - - return pubkey_fromfile == pubkey_fromwire - - -def cds_equals(key, value, alg, keydir=None): - if keydir is not None: - keyfile = f"{keydir}/{key}.key" - else: - keyfile = f"{key}.key" - - cds = value.split() - - dsfromkey_command = [ - *os.environ.get("DSFROMKEY").split(), - "-T", - "3600", - "-a", - alg, - "-C", - "-w", - keyfile, - ] - - out = isctest.run.cmd(dsfromkey_command, log_stdout=True) - dsfromkey = out.stdout.decode("utf-8").split() - index = 6 - while index < len(cds): - dsfromkey[index] = dsfromkey[index].lower() - index += 1 - - rdata_fromfile = " ".join(dsfromkey[:7]) - rdata_fromwire = " ".join(cds[:7]) - if rdata_fromfile != rdata_fromwire: - isctest.log.debug(f"CDS RDATA MISMATCH: {rdata_fromfile} - {rdata_fromwire}") - return False - - digest_fromfile = "".join(cds[7:]) - digest_fromwire = "".join(cds[7:]) - if digest_fromfile != digest_fromwire: - isctest.log.debug(f"CDS DIGEST MISMATCH: {digest_fromfile} - {digest_fromwire}") - return False - - return digest_fromfile == digest_fromwire + def __repr__(self): + return self.path def zone_is_signed(server, zone): @@ -278,13 +321,11 @@ def check_dnssecstatus(server, zone, keys, policy=None, view=None): assert "dnssec-policy: {}".format(policy) in response for key in keys: - keytag = get_keytag(key) - assert "key: {}".format(keytag) in response + assert "key: {}".format(key.tag) in response -# pylint: disable=too-many-locals,too-many-branches -def _check_signatures(signatures, covers, fqdn, keys, keydir=None): - now = datetime.now().strftime("%Y%m%d%H%M%S") +def _check_signatures(signatures, covers, fqdn, keys): + now = KeyTimingMetadata.now() numsigs = 0 zrrsig = True if covers in [dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY, dns.rdatatype.CDS]: @@ -292,51 +333,48 @@ def _check_signatures(signatures, covers, fqdn, keys, keydir=None): krrsig = not zrrsig for key in keys: - keytag = get_keytag(key) - ksk, zsk = get_keyrole(key, keydir=keydir) - activate = get_timing_metadata(key, "Activate", keydir=keydir) - inactive = get_timing_metadata(key, "Inactive", keydir=keydir, must_exist=False) + activate = key.get_timing("Activate") + inactive = key.get_timing("Inactive", must_exist=False) - active = int(now) >= int(activate) - retired = int(inactive) != 0 and int(inactive) <= int(now) + active = now >= activate + retired = inactive is not None and inactive <= now signing = active and not retired if not signing: for rrsig in signatures: - assert f"{keytag} {fqdn}" not in rrsig + assert f"{key.tag} {fqdn}" not in rrsig continue - if zrrsig and zsk: + if zrrsig and key.is_zsk(): has_rrsig = False for rrsig in signatures: - if f"{keytag} {fqdn}" in rrsig: + if f"{key.tag} {fqdn}" in rrsig: has_rrsig = True break assert has_rrsig numsigs += 1 - if zrrsig and not zsk: + if zrrsig and not key.is_zsk(): for rrsig in signatures: - assert f"{keytag} {fqdn}" not in rrsig + assert f"{key.tag} {fqdn}" not in rrsig - if krrsig and ksk: + if krrsig and key.is_ksk(): has_rrsig = False for rrsig in signatures: - if f"{keytag} {fqdn}" in rrsig: + if f"{key.tag} {fqdn}" in rrsig: has_rrsig = True break assert has_rrsig numsigs += 1 - if krrsig and not ksk: + if krrsig and not key.is_ksk(): for rrsig in signatures: - assert f"{keytag} {fqdn}" not in rrsig + assert f"{key.tag} {fqdn}" not in rrsig return numsigs -# pylint: disable=too-many-arguments -def check_signatures(rrset, covers, fqdn, ksks, zsks, kskdir=None, zskdir=None): +def check_signatures(rrset, covers, fqdn, ksks, zsks): # Check if signatures with covering type are signed with the right keys. # The right keys are the ones that expect a signature and have the # correct role. @@ -350,14 +388,14 @@ def check_signatures(rrset, covers, fqdn, ksks, zsks, kskdir=None, zskdir=None): rrsig = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}" signatures.append(rrsig) - numsigs += _check_signatures(signatures, covers, fqdn, ksks, keydir=kskdir) - numsigs += _check_signatures(signatures, covers, fqdn, zsks, keydir=zskdir) + numsigs += _check_signatures(signatures, covers, fqdn, ksks) + numsigs += _check_signatures(signatures, covers, fqdn, zsks) assert numsigs == len(signatures) -def _check_dnskeys(dnskeys, keys, keydir=None, cdnskey=False): - now = datetime.now().strftime("%Y%m%d%H%M%S") +def _check_dnskeys(dnskeys, keys, cdnskey=False): + now = KeyTimingMetadata.now() numkeys = 0 publish_md = "Publish" @@ -367,19 +405,19 @@ def _check_dnskeys(dnskeys, keys, keydir=None, cdnskey=False): delete_md = f"Sync{delete_md}" for key in keys: - publish = get_timing_metadata(key, publish_md, keydir=keydir) - delete = get_timing_metadata(key, delete_md, keydir=keydir, must_exist=False) - published = int(now) >= int(publish) - removed = int(delete) != 0 and int(delete) <= int(now) + publish = key.get_timing(publish_md) + delete = key.get_timing(delete_md, must_exist=False) + published = now >= publish + removed = delete is not None and delete <= now if not published or removed: for dnskey in dnskeys: - assert not dnskey_equals(key, dnskey, keydir=keydir, cdnskey=cdnskey) + assert not key.dnskey_equals(dnskey, cdnskey=cdnskey) continue has_dnskey = False for dnskey in dnskeys: - if dnskey_equals(key, dnskey, keydir=keydir, cdnskey=cdnskey): + if key.dnskey_equals(dnskey, cdnskey=cdnskey): has_dnskey = True break @@ -389,8 +427,7 @@ def _check_dnskeys(dnskeys, keys, keydir=None, cdnskey=False): return numkeys -# pylint: disable=too-many-arguments -def check_dnskeys(rrset, ksks, zsks, kskdir=None, zskdir=None, cdnskey=False): +def check_dnskeys(rrset, ksks, zsks, cdnskey=False): # Check if the correct DNSKEY records are published. If the current time # is between the timing metadata 'publish' and 'delete', the key must have # a DNSKEY record published. If 'cdnskey' is True, check against CDNSKEY @@ -405,20 +442,20 @@ def check_dnskeys(rrset, ksks, zsks, kskdir=None, zskdir=None, cdnskey=False): dnskey = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}" dnskeys.append(dnskey) - numkeys += _check_dnskeys(dnskeys, ksks, keydir=kskdir, cdnskey=cdnskey) + numkeys += _check_dnskeys(dnskeys, ksks, cdnskey=cdnskey) if not cdnskey: - numkeys += _check_dnskeys(dnskeys, zsks, keydir=zskdir) + numkeys += _check_dnskeys(dnskeys, zsks) assert numkeys == len(dnskeys) # pylint: disable=too-many-locals -def check_cds(rrset, keys, keydir=None): +def check_cds(rrset, keys): # Check if the correct CDS records are published. If the current time # is between the timing metadata 'publish' and 'delete', the key must have # a DNSKEY record published. If 'cdnskey' is True, check against CDNSKEY # records instead. - now = datetime.now().strftime("%Y%m%d%H%M%S") + now = KeyTimingMetadata.now() numcds = 0 cdss = [] @@ -430,21 +467,20 @@ def check_cds(rrset, keys, keydir=None): cdss.append(cds) for key in keys: - ksk, _ = get_keyrole(key, keydir=keydir) - assert ksk + assert key.is_ksk() - publish = get_timing_metadata(key, "SyncPublish", keydir=keydir) - delete = get_timing_metadata(key, "SyncDelete", keydir=keydir, must_exist=False) - published = int(now) >= int(publish) - removed = int(delete) != 0 and int(delete) <= int(now) + publish = key.get_timing("SyncPublish") + delete = key.get_timing("SyncDelete", must_exist=False) + published = now >= publish + removed = delete is not None and delete <= now if not published or removed: for cds in cdss: - assert not cds_equals(key, cds, "SHA-256", keydir=keydir) + assert not key.cds_equals(cds, "SHA-256") continue has_cds = False for cds in cdss: - if cds_equals(key, cds, "SHA-256", keydir=keydir): + if key.cds_equals(cds, "SHA-256"): has_cds = True break @@ -475,8 +511,7 @@ def _query_rrset(server, fqdn, qtype): return rrs, rrsigs -# pylint: disable=too-many-arguments -def check_apex(server, zone, ksks, zsks, kskdir=None, zskdir=None): +def check_apex(server, zone, ksks, zsks): # Test the apex of a zone. This checks that the SOA and DNSKEY RRsets # are signed correctly and with the appropriate keys. fqdn = f"{zone}." @@ -484,42 +519,33 @@ def check_apex(server, zone, ksks, zsks, kskdir=None, zskdir=None): # test dnskey query dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY) assert len(dnskeys) > 0 - check_dnskeys(dnskeys, ksks, zsks, kskdir=kskdir, zskdir=zskdir) + check_dnskeys(dnskeys, ksks, zsks) assert len(rrsigs) > 0 - check_signatures( - rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, kskdir=kskdir, zskdir=zskdir - ) + check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks) # test soa query soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA) assert len(soa) == 1 assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text() assert len(rrsigs) > 0 - check_signatures( - rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks, kskdir=kskdir, zskdir=zskdir - ) + check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks) # test cdnskey query cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY) assert len(cdnskeys) > 0 - check_dnskeys(cdnskeys, ksks, zsks, kskdir=kskdir, zskdir=zskdir, cdnskey=True) + check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True) assert len(rrsigs) > 0 - check_signatures( - rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks, kskdir=kskdir, zskdir=zskdir - ) + check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks) # test cds query cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS) assert len(cds) > 0 - check_cds(cds, ksks, keydir=kskdir) + check_cds(cds, ksks) assert len(rrsigs) > 0 - check_signatures( - rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks, kskdir=kskdir, zskdir=zskdir - ) + check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks) -# pylint: disable=too-many-arguments -def check_subdomain(server, zone, ksks, zsks, kskdir=None, zskdir=None): +def check_subdomain(server, zone, ksks, zsks): # Test an RRset below the apex and verify it is signed correctly. fqdn = f"{zone}." qname = f"a.{zone}." @@ -538,4 +564,4 @@ def check_subdomain(server, zone, ksks, zsks, kskdir=None, zskdir=None): assert match in rrset.to_text() assert len(rrsigs) > 0 - check_signatures(rrsigs, qtype, fqdn, ksks, zsks, kskdir=kskdir, zskdir=zskdir) + check_signatures(rrsigs, qtype, fqdn, ksks, zsks) diff --git a/bin/tests/system/ksr/tests_ksr.py b/bin/tests/system/ksr/tests_ksr.py index efe688d983..73a0fa0fb3 100644 --- a/bin/tests/system/ksr/tests_ksr.py +++ b/bin/tests/system/ksr/tests_ksr.py @@ -11,29 +11,26 @@ # pylint: disable=too-many-lines +from datetime import timedelta import os import shutil import time +from typing import List, Optional from datetime import datetime import isctest - from isctest.kasp import ( - addtime, - cds_equals, - dnskey_equals, - get_keytag, - get_metadata, - get_timing_metadata, + Key, + KeyTimingMetadata, ) def between(value, start, end): - if int(value) == 0: + if value is None or start is None or end is None: return False - return int(value) > int(start) and int(value) < int(end) + return start < value < end def file_contents_equal(file1, file2): @@ -46,6 +43,10 @@ def file_contents_equal(file1, file2): isctest.run.cmd(diff_command) +def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]: + return [Key(name, keydir) for name in keystr.split()] + + def keygen(zone, policy, keydir, when="now"): keygen_command = [ *os.environ.get("KEYGEN").split(), @@ -65,9 +66,7 @@ def keygen(zone, policy, keydir, when="now"): when, zone, ] - output = isctest.run.cmd(keygen_command, log_stdout=True).stdout.decode("utf-8") - keys = output.split() - return keys + return isctest.run.cmd(keygen_command, log_stdout=True).stdout.decode("utf-8") def ksr(zone, policy, action, options="", raise_on_exception=True): @@ -89,21 +88,15 @@ def ksr(zone, policy, action, options="", raise_on_exception=True): # pylint: disable=too-many-arguments,too-many-branches,too-many-locals,too-many-statements -def check_keys(keys, lifetime, alg, size, keydir=None, offset=0, with_state=False): +def check_keys(keys, lifetime, alg, size, offset=0, with_state=False): # Check keys that were created. - inception = 0 num = 0 - now = datetime.now().strftime("%Y%m%d%H%M%S") + now = KeyTimingMetadata.now() for key in keys: - if keydir is not None: - statefile = f"{keydir}/{key}.state" - else: - statefile = f"{key}.state" - # created: from keyfile plus offset - created = get_timing_metadata(key, "Created", keydir=keydir, offset=offset) + created = key.get_timing("Created") + offset # active: retired previous key if num == 0: @@ -111,23 +104,22 @@ def check_keys(keys, lifetime, alg, size, keydir=None, offset=0, with_state=Fals else: active = retired - # published: 2h5m (dnskey-ttl + publish-safety + propagation) - published = addtime(active, -7500) + # published: dnskey-ttl + publish-safety + propagation + published = active - timedelta(hours=2, minutes=5) # retired: zsk-lifetime - if lifetime > 0: - retired = addtime(active, lifetime) - # removed: 10d1h5m - # (ttlsig + retire-safety + sign-delay + propagation) - removed = addtime(retired, 867900) + if lifetime is not None: + retired = active + lifetime + # removed: ttlsig + retire-safety + sign-delay + propagation + removed = retired + timedelta(days=10, hours=1, minutes=5) else: - retired = 0 - removed = 0 + retired = None + removed = None - if between(now, published, retired) or int(retired) == 0: + if retired is None or between(now, published, retired): goal = "omnipresent" - pubdelay = addtime(published, 7500) - signdelay = addtime(active, 867900) + pubdelay = published + timedelta(hours=2, minutes=5) + signdelay = active + timedelta(days=10, hours=1, minutes=5) if between(now, published, pubdelay): state_dnskey = "rumoured" @@ -143,20 +135,21 @@ def check_keys(keys, lifetime, alg, size, keydir=None, offset=0, with_state=Fals state_dnskey = "hidden" state_zrrsig = "hidden" - with open(statefile, "r", encoding="utf-8") as file: + with open(key.statefile, "r", encoding="utf-8") as file: metadata = file.read() assert f"Algorithm: {alg}" in metadata assert f"Length: {size}" in metadata - assert f"Lifetime: {lifetime}" in metadata assert "KSK: no" in metadata assert "ZSK: yes" in metadata assert f"Published: {published}" in metadata assert f"Active: {active}" in metadata - if lifetime > 0: + if lifetime is not None: assert f"Retired: {retired}" in metadata assert f"Removed: {removed}" in metadata + assert f"Lifetime: {int(lifetime.total_seconds())}" in metadata else: + assert "Lifetime: 0" in metadata assert "Retired:" not in metadata assert "Removed:" not in metadata @@ -167,39 +160,36 @@ def check_keys(keys, lifetime, alg, size, keydir=None, offset=0, with_state=Fals assert "KRRSIGState:" not in metadata assert "DSState:" not in metadata - inception += lifetime num += 1 -def check_keysigningrequest(out, zsks, start, end, keydir=None): +def check_keysigningrequest(out, zsks, start, end): lines = out.split("\n") line_no = 0 inception = start - while int(inception) < int(end): - next_bundle = addtime(end, 1) + while inception < end: + next_bundle = end + 1 # expect bundle header assert f";; KeySigningRequest 1.0 {inception}" in lines[line_no] line_no += 1 # expect zsks for key in sorted(zsks): - published = get_timing_metadata(key, "Publish", keydir=keydir) + published = key.get_timing("Publish") if between(published, inception, next_bundle): next_bundle = published - removed = get_timing_metadata( - key, "Delete", keydir=keydir, must_exist=False - ) + removed = key.get_timing("Delete", must_exist=False) if between(removed, inception, next_bundle): next_bundle = removed - if int(published) > int(inception): + if published > inception: continue - if int(removed) != 0 and int(inception) >= int(removed): + if removed is not None and inception >= removed: continue # this zsk must be in the ksr - assert dnskey_equals(key, lines[line_no], keydir=keydir) + assert key.dnskey_equals(lines[line_no]) line_no += 1 inception = next_bundle @@ -225,17 +215,15 @@ def check_signedkeyresponse( start, end, refresh, - kskdir=None, - zskdir=None, cdnskey=True, cds="SHA-256", ): lines = out.split("\n") line_no = 0 - next_bundle = addtime(end, 1) + next_bundle = end + 1 inception = start - while int(inception) < int(end): + while inception < end: # A single signed key response may consist of: # ;; SignedKeyResponse (header) # ;; DNSKEY 257 (one per published key in ksks) @@ -246,9 +234,9 @@ def check_signedkeyresponse( # ;; CDS (one per published key in ksks) # ;; RRSIG(CDS) (one per active key in ksks) - sigstart = addtime(inception, -3600) # clockskew: 1 hour - sigend = addtime(inception, 1209600) # sig-validity: 14 days - next_bundle = addtime(sigend, refresh) + sigstart = inception - timedelta(hours=1) # clockskew + sigend = inception + timedelta(days=14) # sig-validity + next_bundle = sigend + refresh # ignore empty lines while line_no < len(lines): @@ -263,56 +251,49 @@ def check_signedkeyresponse( # expect ksks for key in sorted(ksks): - published = get_timing_metadata(key, "Publish", keydir=kskdir) - removed = get_timing_metadata( - key, "Delete", keydir=kskdir, must_exist=False - ) + published = key.get_timing("Publish") + removed = key.get_timing("Delete", must_exist=False) - if int(published) > int(inception): + if published > inception: continue - if int(removed) != 0 and int(inception) >= int(removed): + if removed is not None and inception >= removed: continue # this ksk must be in the ksr - assert dnskey_equals(key, lines[line_no], keydir=kskdir) + assert key.dnskey_equals(lines[line_no]) line_no += 1 # expect zsks for key in sorted(zsks): - published = get_timing_metadata(key, "Publish", keydir=zskdir) + published = key.get_timing("Publish") if between(published, inception, next_bundle): next_bundle = published - removed = get_timing_metadata( - key, "Delete", keydir=zskdir, must_exist=False - ) + removed = key.get_timing("Delete", must_exist=False) if between(removed, inception, next_bundle): next_bundle = removed - if int(published) > int(inception): + if published > inception: continue - if int(removed) != 0 and int(inception) >= int(removed): + if removed is not None and inception >= removed: continue # this zsk must be in the ksr - assert dnskey_equals(key, lines[line_no], keydir=zskdir) + assert key.dnskey_equals(lines[line_no]) line_no += 1 # expect rrsig(dnskey) for key in sorted(ksks): - active = get_timing_metadata(key, "Activate", keydir=kskdir) - inactive = get_timing_metadata( - key, "Inactive", keydir=kskdir, must_exist=False - ) - if int(active) > int(inception): + active = key.get_timing("Activate") + inactive = key.get_timing("Inactive", must_exist=False) + if active > inception: continue - if int(inactive) != 0 and int(inception) >= int(inactive): + if inactive is not None and inception >= inactive: continue # there must be a signature of this ksk - keytag = get_keytag(key) - alg = get_metadata(key, "Algorithm", keydir=kskdir) - expect = f"{zone}. 3600 IN RRSIG DNSKEY {alg} 2 3600 {sigend} {sigstart} {keytag} {zone}." + alg = key.get_metadata("Algorithm") + expect = f"{zone}. 3600 IN RRSIG DNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}." rrsig = " ".join(lines[line_no].split()) assert expect in rrsig line_no += 1 @@ -320,34 +301,29 @@ def check_signedkeyresponse( # expect cdnskey if cdnskey: for key in sorted(ksks): - published = get_timing_metadata(key, "Publish", keydir=kskdir) - removed = get_timing_metadata( - key, "Delete", keydir=kskdir, must_exist=False - ) - if int(published) > int(inception): + published = key.get_timing("Publish") + removed = key.get_timing("Delete", must_exist=False) + if published > inception: continue - if int(removed) != 0 and int(inception) >= int(removed): + if removed is not None and inception >= removed: continue # the cdnskey of this ksk must be in the ksr - assert dnskey_equals(key, lines[line_no], keydir=kskdir, cdnskey=True) + assert key.dnskey_equals(lines[line_no], cdnskey=True) line_no += 1 # expect rrsig(cdnskey) for key in sorted(ksks): - active = get_timing_metadata(key, "Activate", keydir=kskdir) - inactive = get_timing_metadata( - key, "Inactive", keydir=kskdir, must_exist=False - ) - if int(active) > int(inception): + active = key.get_timing("Activate") + inactive = key.get_timing("Inactive", must_exist=False) + if active > inception: continue - if int(inactive) != 0 and int(inception) >= int(inactive): + if inactive is not None and inception >= inactive: continue # there must be a signature of this ksk - keytag = get_keytag(key) - alg = get_metadata(key, "Algorithm", keydir=kskdir) - expect = f"{zone}. 3600 IN RRSIG CDNSKEY {alg} 2 3600 {sigend} {sigstart} {keytag} {zone}." + alg = key.get_metadata("Algorithm") + expect = f"{zone}. 3600 IN RRSIG CDNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}." rrsig = " ".join(lines[line_no].split()) assert expect in rrsig line_no += 1 @@ -355,36 +331,31 @@ def check_signedkeyresponse( # expect cds if cds != "": for key in sorted(ksks): - published = get_timing_metadata(key, "Publish", keydir=kskdir) - removed = get_timing_metadata( - key, "Delete", keydir=kskdir, must_exist=False - ) - if int(published) > int(inception): + published = key.get_timing("Publish") + removed = key.get_timing("Delete", must_exist=False) + if published > inception: continue - if int(removed) != 0 and int(inception) >= int(removed): + if removed is not None and inception >= removed: continue # the cds of this ksk must be in the ksr expected_cds = cds.split(",") for alg in expected_cds: - assert cds_equals(key, lines[line_no], alg.strip(), keydir=kskdir) + assert key.cds_equals(lines[line_no], alg.strip()) line_no += 1 # expect rrsig(cds) for key in sorted(ksks): - active = get_timing_metadata(key, "Activate", keydir=kskdir) - inactive = get_timing_metadata( - key, "Inactive", keydir=kskdir, must_exist=False - ) - if int(active) > int(inception): + active = key.get_timing("Activate") + inactive = key.get_timing("Inactive", must_exist=False) + if active > inception: continue - if int(inactive) != 0 and int(inception) >= int(inactive): + if inactive is not None and inception >= inactive: continue # there must be a signature of this ksk - keytag = get_keytag(key) - alg = get_metadata(key, "Algorithm", keydir=kskdir) - expect = f"{zone}. 3600 IN RRSIG CDS {alg} 2 3600 {sigend} {sigstart} {keytag} {zone}." + alg = key.get_metadata("Algorithm") + expect = f"{zone}. 3600 IN RRSIG CDS {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}." rrsig = " ".join(lines[line_no].split()) assert expect in rrsig line_no += 1 @@ -441,52 +412,55 @@ def test_ksr_common(servers): n = 1 # create ksk - ksks = keygen(zone, policy, "ns1/offline") + kskdir = "ns1/offline" + out = keygen(zone, policy, kskdir) + ksks = keystr_to_keylist(out, kskdir) assert len(ksks) == 1 # check that 'dnssec-ksr keygen' pregenerates right amount of keys out, _ = ksr(zone, policy, "keygen", options="-i now -e +1y") - zsks = out.split() + zsks = keystr_to_keylist(out) assert len(zsks) == 2 alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER") size = os.environ.get("DEFAULT_BITS") - lifetime = 16070400 + lifetime = timedelta(days=31 * 6) check_keys(zsks, lifetime, alg, size) # check that 'dnssec-ksr keygen' pregenerates right amount of keys # in the given key directory - out, _ = ksr(zone, policy, "keygen", options="-K ns1 -i now -e +1y") - zsks = out.split() + zskdir = "ns1" + out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y") + zsks = keystr_to_keylist(out, zskdir) assert len(zsks) == 2 alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER") size = os.environ.get("DEFAULT_BITS") - lifetime = 16070400 - check_keys(zsks, lifetime, alg, size, keydir="ns1") + lifetime = timedelta(days=31 * 6) + check_keys(zsks, lifetime, alg, size) for key in zsks: - privatefile = f"ns1/{key}.private" - keyfile = f"ns1/{key}.key" - statefile = f"ns1/{key}.state" + privatefile = f"{key.path}.private" + keyfile = f"{key.path}.key" + statefile = f"{key.path}.state" shutil.copyfile(privatefile, f"{privatefile}.backup") shutil.copyfile(keyfile, f"{keyfile}.backup") shutil.copyfile(statefile, f"{statefile}.backup") # check that 'dnssec-ksr request' creates correct ksr - now = get_timing_metadata(zsks[0], "Created", keydir="ns1") - until = addtime(now, 31536000) # 1 year - out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +1y") + now = zsks[0].get_timing("Created") + until = now + timedelta(days=365) + out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y") fname = f"{zone}.ksr.{n}" with open(fname, "w", encoding="utf-8") as file: file.write(out) - check_keysigningrequest(out, zsks, now, until, keydir="ns1") + check_keysigningrequest(out, zsks, now, until) # check that 'dnssec-ksr sign' creates correct skr out, _ = ksr( - zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +1y" + zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y" ) fname = f"{zone}.skr.{n}" @@ -494,28 +468,26 @@ def test_ksr_common(servers): file.write(out) refresh = -432000 # 5 days - check_signedkeyresponse( - out, zone, ksks, zsks, now, until, refresh, kskdir="ns1/offline", zskdir="ns1" - ) + check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh) # common test cases (2) n = 2 # check that 'dnssec-ksr keygen' selects pregenerated keys for # the same time bundle - out, _ = ksr(zone, policy, "keygen", options=f"-K ns1 -i {now} -e +1y") - selected_zsks = out.split() + out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y") + selected_zsks = keystr_to_keylist(out, zskdir) assert len(selected_zsks) == 2 for index, key in enumerate(selected_zsks): assert zsks[index] == key - file_contents_equal(f"ns1/{key}.private", f"ns1/{key}.private.backup") - file_contents_equal(f"ns1/{key}.key", f"ns1/{key}.key.backup") - file_contents_equal(f"ns1/{key}.state", f"ns1/{key}.state.backup") + file_contents_equal(f"{key.path}.private", f"{key.path}.private.backup") + file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup") + file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup") # check that 'dnssec-ksr keygen' generates only necessary keys for # overlapping time bundle - out, err = ksr(zone, policy, "keygen", options=f"-K ns1 -i {now} -e +2y -v 1") - overlapping_zsks = out.split() + out, err = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1") + overlapping_zsks = keystr_to_keylist(out, zskdir) assert len(overlapping_zsks) == 4 verbose = err.split() @@ -531,15 +503,15 @@ def test_ksr_common(servers): for index, key in enumerate(overlapping_zsks): if index < 2: assert zsks[index] == key - file_contents_equal(f"ns1/{key}.private", f"ns1/{key}.private.backup") - file_contents_equal(f"ns1/{key}.key", f"ns1/{key}.key.backup") - file_contents_equal(f"ns1/{key}.state", f"ns1/{key}.state.backup") + file_contents_equal(f"{key.path}.private", f"{key.path}.private.backup") + file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup") + file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup") # run 'dnssec-ksr keygen' again with verbosity 0 - out, _ = ksr(zone, policy, "keygen", options=f"-K ns1 -i {now} -e +2y") - overlapping_zsks2 = out.split() + out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y") + overlapping_zsks2 = keystr_to_keylist(out, zskdir) assert len(overlapping_zsks2) == 4 - check_keys(overlapping_zsks2, lifetime, alg, size, keydir="ns1") + check_keys(overlapping_zsks2, lifetime, alg, size) for index, key in enumerate(overlapping_zsks2): assert overlapping_zsks[index] == key @@ -551,7 +523,7 @@ def test_ksr_common(servers): with open(fname, "w", encoding="utf-8") as file: file.write(out) - check_keysigningrequest(out, zsks, now, until, keydir="ns1") + check_keysigningrequest(out, zsks, now, until) # check that 'dnssec-ksr request' creates correct ksr with new interval out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +2y") @@ -560,8 +532,8 @@ def test_ksr_common(servers): with open(fname, "w", encoding="utf-8") as file: file.write(out) - until = addtime(now, 63072000) # 2 years - check_keysigningrequest(out, overlapping_zsks, now, until, keydir="ns1") + until = now + timedelta(days=365 * 2) + check_keysigningrequest(out, overlapping_zsks, now, until) # check that 'dnssec-ksr request' errors if there are not enough keys _, err = ksr( @@ -592,8 +564,6 @@ def test_ksr_common(servers): now, until, refresh, - kskdir="ns1/offline", - zskdir="ns1", ) # add zone @@ -618,15 +588,11 @@ def test_ksr_common(servers): # - dnssec_verify isctest.kasp.dnssec_verify(ns1, zone) # - check keys - check_keys(overlapping_zsks, lifetime, alg, size, keydir="ns1", with_state=True) + check_keys(overlapping_zsks, lifetime, alg, size, with_state=True) # - check apex - isctest.kasp.check_apex( - ns1, zone, ksks, overlapping_zsks, kskdir="ns1/offline", zskdir="ns1" - ) + isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks) # - check subdomain - isctest.kasp.check_subdomain( - ns1, zone, ksks, overlapping_zsks, kskdir="ns1/offline", zskdir="ns1" - ) + isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks) # pylint: disable=too-many-locals @@ -636,38 +602,39 @@ def test_ksr_lastbundle(servers): n = 1 # create ksk - now = datetime.now().strftime("%Y%m%d%H%M%S") - offset = -31536000 - when = addtime(now, offset) - when = addtime(when, -86400) - ksks = keygen(zone, policy, "ns1/offline", when=when) + kskdir = "ns1/offline" + now = KeyTimingMetadata.now() + offset = -timedelta(days=365) + when = now + offset - timedelta(days=1) + out = keygen(zone, policy, kskdir, when=str(when)) + ksks = keystr_to_keylist(out, kskdir) assert len(ksks) == 1 # check that 'dnssec-ksr keygen' pregenerates right amount of keys - out, _ = ksr(zone, policy, "keygen", options="-K ns1 -i -1y -e +1d") - zsks = out.split() + zskdir = "ns1" + out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d") + zsks = keystr_to_keylist(out, zskdir) assert len(zsks) == 2 alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER") size = os.environ.get("DEFAULT_BITS") - lifetime = 16070400 - check_keys(zsks, lifetime, alg, size, keydir="ns1", offset=offset) + lifetime = timedelta(days=31 * 6) + check_keys(zsks, lifetime, alg, size, offset=offset) # check that 'dnssec-ksr request' creates correct ksr - then = get_timing_metadata(zsks[0], "Created", keydir="ns1") - then = addtime(then, offset) - until = addtime(then, 31622400) # 1 year, 1 day - out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {then} -e +1d") + then = zsks[0].get_timing("Created") + offset + until = then + timedelta(days=366) + out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1d") fname = f"{zone}.ksr.{n}" with open(fname, "w", encoding="utf-8") as file: file.write(out) - check_keysigningrequest(out, zsks, then, until, keydir="ns1") + check_keysigningrequest(out, zsks, then, until) # check that 'dnssec-ksr sign' creates correct skr out, _ = ksr( - zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {then} -e +1d" + zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1d" ) fname = f"{zone}.skr.{n}" @@ -675,9 +642,7 @@ def test_ksr_lastbundle(servers): file.write(out) refresh = -432000 # 5 days - check_signedkeyresponse( - out, zone, ksks, zsks, then, until, refresh, kskdir="ns1/offline", zskdir="ns1" - ) + check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh) # add zone ns1 = servers["ns1"] @@ -701,13 +666,11 @@ def test_ksr_lastbundle(servers): # - dnssec_verify isctest.kasp.dnssec_verify(ns1, zone) # - check keys - check_keys(zsks, lifetime, alg, size, keydir="ns1", offset=offset, with_state=True) + check_keys(zsks, lifetime, alg, size, offset=offset, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1") + isctest.kasp.check_apex(ns1, zone, ksks, zsks) # - check subdomain - isctest.kasp.check_subdomain( - ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1" - ) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) # check that last bundle warning is logged warning = "last bundle in skr, please import new skr file" @@ -721,38 +684,40 @@ def test_ksr_inthemiddle(servers): n = 1 # create ksk - now = datetime.now().strftime("%Y%m%d%H%M%S") - offset = -31536000 - when = addtime(now, offset) - when = addtime(when, -86400) - ksks = keygen(zone, policy, "ns1/offline", when=when) + kskdir = "ns1/offline" + now = KeyTimingMetadata.now() + offset = -timedelta(days=365) + when = now + offset - timedelta(days=1) + out = keygen(zone, policy, kskdir, when=str(when)) + ksks = keystr_to_keylist(out, kskdir) assert len(ksks) == 1 # check that 'dnssec-ksr keygen' pregenerates right amount of keys - out, _ = ksr(zone, policy, "keygen", options="-K ns1 -i -1y -e +1y") - zsks = out.split() + zskdir = "ns1" + out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y") + zsks = keystr_to_keylist(out, zskdir) assert len(zsks) == 4 alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER") size = os.environ.get("DEFAULT_BITS") - lifetime = 16070400 - check_keys(zsks, lifetime, alg, size, keydir="ns1", offset=offset) + lifetime = timedelta(days=31 * 6) + check_keys(zsks, lifetime, alg, size, offset=offset) # check that 'dnssec-ksr request' creates correct ksr - then = get_timing_metadata(zsks[0], "Created", keydir="ns1") - then = addtime(then, offset) - until = addtime(then, 63072000) # 2 years - out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {then} -e +1y") + then = zsks[0].get_timing("Created") + then = then + offset + until = then + timedelta(days=365 * 2) + out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1y") fname = f"{zone}.ksr.{n}" with open(fname, "w", encoding="utf-8") as file: file.write(out) - check_keysigningrequest(out, zsks, then, until, keydir="ns1") + check_keysigningrequest(out, zsks, then, until) # check that 'dnssec-ksr sign' creates correct skr out, _ = ksr( - zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {then} -e +1y" + zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1y" ) fname = f"{zone}.skr.{n}" @@ -760,9 +725,7 @@ def test_ksr_inthemiddle(servers): file.write(out) refresh = -432000 # 5 days - check_signedkeyresponse( - out, zone, ksks, zsks, then, until, refresh, kskdir="ns1/offline", zskdir="ns1" - ) + check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh) # add zone ns1 = servers["ns1"] @@ -786,13 +749,11 @@ def test_ksr_inthemiddle(servers): # - dnssec_verify isctest.kasp.dnssec_verify(ns1, zone) # - check keys - check_keys(zsks, lifetime, alg, size, keydir="ns1", offset=offset, with_state=True) + check_keys(zsks, lifetime, alg, size, offset=offset, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1") + isctest.kasp.check_apex(ns1, zone, ksks, zsks) # - check subdomain - isctest.kasp.check_subdomain( - ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1" - ) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) # check that no last bundle warning is logged warning = "last bundle in skr, please import new skr file" @@ -804,22 +765,25 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end): n = 1 # create ksk - now = datetime.now().strftime("%Y%m%d%H%M%S") - then = addtime(now, offset) - until = addtime(now, end) - ksks = keygen(zone, policy, "ns1/offline", when=then) + kskdir = "ns1/offline" + now = KeyTimingMetadata.now() + then = now + offset + until = now + end + out = keygen(zone, policy, kskdir, when=str(then)) + ksks = keystr_to_keylist(out, kskdir) assert len(ksks) == 1 # key generation - out, _ = ksr(zone, policy, "keygen", options=f"-K ns1 -i {then} -e {until}") - zsks = out.split() + zskdir = "ns1" + out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}") + zsks = keystr_to_keylist(out, zskdir) assert len(zsks) == 2 # create request - now = get_timing_metadata(zsks[0], "Created", keydir="ns1") - then = addtime(now, offset) - until = addtime(now, end) - out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {then} -e {until}") + now = zsks[0].get_timing("Created") + then = now + offset + until = now + end + out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e {until}") fname = f"{zone}.ksr.{n}" with open(fname, "w", encoding="utf-8") as file: @@ -827,7 +791,7 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end): # sign request out, _ = ksr( - zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {then} -e {until}" + zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e {until}" ) fname = f"{zone}.skr.{n}" @@ -878,33 +842,36 @@ def test_ksr_unlimited(servers): n = 1 # create ksk - ksks = keygen(zone, policy, "ns1/offline") + kskdir = "ns1/offline" + out = keygen(zone, policy, kskdir) + ksks = keystr_to_keylist(out, kskdir) assert len(ksks) == 1 # check that 'dnssec-ksr keygen' pregenerates right amount of keys - out, _ = ksr(zone, policy, "keygen", options="-K ns1 -i now -e +2y") - zsks = out.split() + zskdir = "ns1" + out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y") + zsks = keystr_to_keylist(out, zskdir) assert len(zsks) == 1 alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER") size = os.environ.get("DEFAULT_BITS") - lifetime = 0 - check_keys(zsks, lifetime, alg, size, keydir="ns1") + lifetime = None + check_keys(zsks, lifetime, alg, size) # check that 'dnssec-ksr request' creates correct ksr - now = get_timing_metadata(zsks[0], "Created", keydir="ns1") - until = addtime(now, 4 * 31536000) # 4 years - out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +4y") + now = zsks[0].get_timing("Created") + until = now + timedelta(days=365 * 4) + out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +4y") fname = f"{zone}.ksr.{n}" with open(fname, "w", encoding="utf-8") as file: file.write(out) - check_keysigningrequest(out, zsks, now, until, keydir="ns1") + check_keysigningrequest(out, zsks, now, until) # check that 'dnssec-ksr sign' creates correct skr without cdnskey out, _ = ksr( - zone, "no-cdnskey", "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +4y" + zone, "no-cdnskey", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y" ) skrfile = f"{zone}.no-cdnskey.skr.{n}" @@ -920,15 +887,13 @@ def test_ksr_unlimited(servers): now, until, refresh, - kskdir="ns1/offline", - zskdir="ns1", cdnskey=False, cds="SHA-1, SHA-256, SHA-384", ) # check that 'dnssec-ksr sign' creates correct skr without cds out, _ = ksr( - zone, "no-cds", "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +4y" + zone, "no-cds", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y" ) skrfile = f"{zone}.no-cds.skr.{n}" @@ -944,14 +909,12 @@ def test_ksr_unlimited(servers): now, until, refresh, - kskdir="ns1/offline", - zskdir="ns1", cds="", ) # check that 'dnssec-ksr sign' creates correct skr out, _ = ksr( - zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +4y" + zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y" ) skrfile = f"{zone}.{policy}.skr.{n}" @@ -959,9 +922,7 @@ def test_ksr_unlimited(servers): file.write(out) refresh = -432000 # 5 days - check_signedkeyresponse( - out, zone, ksks, zsks, now, until, refresh, kskdir="ns1/offline", zskdir="ns1" - ) + check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh) # add zone ns1 = servers["ns1"] @@ -985,13 +946,11 @@ def test_ksr_unlimited(servers): # - dnssec_verify isctest.kasp.dnssec_verify(ns1, zone) # - check keys - check_keys(zsks, lifetime, alg, size, keydir="ns1", with_state=True) + check_keys(zsks, lifetime, alg, size, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1") + isctest.kasp.check_apex(ns1, zone, ksks, zsks) # - check subdomain - isctest.kasp.check_subdomain( - ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1" - ) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks) # pylint: disable=too-many-locals @@ -1001,12 +960,15 @@ def test_ksr_twotone(servers): n = 1 # create ksk - ksks = keygen(zone, policy, "ns1/offline") + kskdir = "ns1/offline" + out = keygen(zone, policy, kskdir) + ksks = keystr_to_keylist(out, kskdir) assert len(ksks) == 2 # check that 'dnssec-ksr keygen' pregenerates right amount of keys - out, _ = ksr(zone, policy, "keygen", options="-K ns1 -i now -e +1y") - zsks = out.split() + zskdir = "ns1" + out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y") + zsks = keystr_to_keylist(out, zskdir) # First algorithm keys have a lifetime of 3 months, so there should # be 4 created keys. Second algorithm keys have a lifetime of 5 # months, so there should be 3 created keys. While only two time @@ -1017,7 +979,7 @@ def test_ksr_twotone(servers): zsks_defalg = [] zsks_altalg = [] for zsk in zsks: - alg = get_metadata(zsk, "Algorithm", keydir="ns1") + alg = zsk.get_metadata("Algorithm") if alg == os.environ.get("DEFAULT_ALGORITHM_NUMBER"): zsks_defalg.append(zsk) elif alg == os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER"): @@ -1028,38 +990,36 @@ def test_ksr_twotone(servers): alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER") size = os.environ.get("DEFAULT_BITS") - lifetime = 8035200 # 3 months - check_keys(zsks_defalg, lifetime, alg, size, keydir="ns1") + lifetime = timedelta(days=31 * 3) + check_keys(zsks_defalg, lifetime, alg, size) alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER") size = os.environ.get("ALTERNATIVE_BITS") - lifetime = 13392000 # 5 months - check_keys(zsks_altalg, lifetime, alg, size, keydir="ns1") + lifetime = timedelta(days=31 * 5) + check_keys(zsks_altalg, lifetime, alg, size) # check that 'dnssec-ksr request' creates correct ksr - now = get_timing_metadata(zsks[0], "Created", keydir="ns1") - until = addtime(now, 31536000) # 1 year - out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +1y") + now = zsks[0].get_timing("Created") + until = now + timedelta(days=365) + out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y") fname = f"{zone}.ksr.{n}" with open(fname, "w", encoding="utf-8") as file: file.write(out) - check_keysigningrequest(out, zsks, now, until, keydir="ns1") + check_keysigningrequest(out, zsks, now, until) # check that 'dnssec-ksr sign' creates correct skr out, _ = ksr( - zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +1y" + zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y" ) skrfile = f"{zone}.skr.{n}" with open(skrfile, "w", encoding="utf-8") as file: file.write(out) - refresh = -432000 # 5 days - check_signedkeyresponse( - out, zone, ksks, zsks, now, until, refresh, kskdir="ns1/offline", zskdir="ns1" - ) + refresh = -timedelta(days=5) + check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh) # add zone ns1 = servers["ns1"] @@ -1085,16 +1045,14 @@ def test_ksr_twotone(servers): # - check keys alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER") size = os.environ.get("DEFAULT_BITS") - lifetime = 8035200 # 3 months - check_keys(zsks_defalg, lifetime, alg, size, keydir="ns1", with_state=True) + lifetime = timedelta(days=31 * 3) + check_keys(zsks_defalg, lifetime, alg, size, with_state=True) alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER") size = os.environ.get("ALTERNATIVE_BITS") - lifetime = 13392000 # 5 months - check_keys(zsks_altalg, lifetime, alg, size, keydir="ns1", with_state=True) + lifetime = timedelta(days=31 * 5) + check_keys(zsks_altalg, lifetime, alg, size, with_state=True) # - check apex - isctest.kasp.check_apex(ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1") + isctest.kasp.check_apex(ns1, zone, ksks, zsks) # - check subdomain - isctest.kasp.check_subdomain( - ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1" - ) + isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)