Use convenience wrappers for kasp key operations

This commit is contained in:
Nicki Křížek 2024-10-07 18:08:02 +02:00 committed by Matthijs Mekking
parent a15bf6704b
commit 2b0a8fcfb5
2 changed files with 431 additions and 447 deletions

View file

@ -9,8 +9,12 @@
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.
from functools import total_ordering
import os
from pathlib import Path
import re
import time
from typing import Optional, Union
from datetime import datetime
from datetime import timedelta
@ -41,152 +45,191 @@ def _query(server, qname, qtype, outfile=None):
return response
def addtime(value, plus):
# Get timing metadata from a value plus additional time.
# Convert "%Y%m%d%H%M%S" format to epoch seconds.
# Then, add the additional time (can be negative).
now = datetime.strptime(value, "%Y%m%d%H%M%S")
delta = timedelta(seconds=plus)
then = now + delta
return then.strftime("%Y%m%d%H%M%S")
@total_ordering
class KeyTimingMetadata:
"""
Represent a single timing information for a key.
These objects can be easily compared, support addition and subtraction of
timedelta objects or integers(value in seconds). A lack of timing metadata
in the key (value 0) should be represented with None rather than an
instance of this object.
"""
FORMAT = "%Y%m%d%H%M%S"
def __init__(self, timestamp: str):
if int(timestamp) <= 0:
raise ValueError(f'invalid timing metadata value: "{timestamp}"')
self.value = datetime.strptime(timestamp, self.FORMAT)
def __repr__(self):
return self.value.strftime(self.FORMAT)
def __str__(self) -> str:
return self.value.strftime(self.FORMAT)
def __add__(self, other: Union[timedelta, int]):
if isinstance(other, int):
other = timedelta(seconds=other)
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = self.value + other
return result
def __sub__(self, other: Union[timedelta, int]):
if isinstance(other, int):
other = timedelta(seconds=other)
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = self.value - other
return result
def __iadd__(self, other: Union[timedelta, int]):
if isinstance(other, int):
other = timedelta(seconds=other)
self.value += other
def __isub__(self, other: Union[timedelta, int]):
if isinstance(other, int):
other = timedelta(seconds=other)
self.value -= other
def __lt__(self, other: "KeyTimingMetadata"):
return self.value < other.value
def __eq__(self, other: object):
return isinstance(other, KeyTimingMetadata) and self.value == other.value
@staticmethod
def now() -> "KeyTimingMetadata":
result = KeyTimingMetadata.__new__(KeyTimingMetadata)
result.value = datetime.now()
return result
def get_timing_metadata(key, metadata, keydir=None, offset=0, must_exist=True):
value = "0"
@total_ordering
class Key:
"""
Represent a key from a keyfile.
if keydir is not None:
keyfile = "{}/{}.key".format(keydir, key)
else:
keyfile = "{}.key".format(key)
This object keeps track of its origin (keydir + name), can be used to
retrieve metadata from the underlying files and supports convenience
operations for KASP tests.
"""
with open(keyfile, "r", encoding="utf-8") as file:
for line in file:
if "; {}".format(metadata) in line:
value = line.split()[2]
break
def __init__(self, name: str, keydir: Optional[Union[str, Path]] = None):
self.name = name
if keydir is None:
self.keydir = Path()
else:
self.keydir = Path(keydir)
self.path = str(self.keydir / name)
self.keyfile = f"{self.path}.key"
self.statefile = f"{self.path}.state"
self.tag = int(self.name[-5:])
if must_exist:
assert int(value) > 0
def get_timing(
self, metadata: str, must_exist: bool = True
) -> Optional[KeyTimingMetadata]:
regex = rf";\s+{metadata}:\s+(\d+).*"
with open(self.keyfile, "r", encoding="utf-8") as file:
for line in file:
match = re.match(regex, line)
if match is not None:
try:
return KeyTimingMetadata(match.group(1))
except ValueError:
break
if must_exist:
raise ValueError(
f'timing metadata "{metadata}" for key "{self.name}" invalid'
)
return None
if int(value) > 0:
return addtime(value, offset)
def get_metadata(self, metadata: str, must_exist=True) -> str:
value = "undefined"
regex = rf"{metadata}:\s+(.*)"
with open(self.statefile, "r", encoding="utf-8") as file:
for line in file:
match = re.match(regex, line)
if match is not None:
value = match.group(1)
break
if must_exist and value == "undefined":
raise ValueError(
'state metadata "{metadata}" for key "{self.name}" undefined'
)
return value
return "0"
def is_ksk(self) -> bool:
return self.get_metadata("KSK") == "yes"
def is_zsk(self) -> bool:
return self.get_metadata("ZSK") == "yes"
def get_metadata(key, metadata, keydir=None, must_exist=True):
if keydir is not None:
statefile = "{}/{}.state".format(keydir, key)
else:
statefile = "{}.state".format(key)
def dnskey_equals(self, value, cdnskey=False):
dnskey = value.split()
value = "undefined"
with open(statefile, "r", encoding="utf-8") as file:
for line in file:
if f"{metadata}: " in line:
value = line.split()[1]
break
if cdnskey:
# fourth element is the rrtype
assert dnskey[3] == "CDNSKEY"
dnskey[3] = "DNSKEY"
if must_exist:
assert value != "undefined"
dnskey_fromfile = []
rdata = " ".join(dnskey[:7])
return value
with open(self.keyfile, "r", encoding="utf-8") as file:
for line in file:
if f"{rdata}" in line:
dnskey_fromfile = line.split()
pubkey_fromfile = "".join(dnskey_fromfile[7:])
pubkey_fromwire = "".join(dnskey[7:])
def get_keystate(key, metadata, keydir=None, must_exist=True):
return pubkey_fromfile == pubkey_fromwire
return get_metadata(key, metadata, keydir, must_exist)
def cds_equals(self, value, alg):
cds = value.split()
dsfromkey_command = [
os.environ.get("DSFROMKEY"),
"-T",
"3600",
"-a",
alg,
"-C",
"-w",
str(self.keyfile),
]
def get_keytag(key):
return int(key[-5:])
out = isctest.run.cmd(dsfromkey_command, log_stdout=True)
dsfromkey = out.stdout.decode("utf-8").split()
rdata_fromfile = " ".join(dsfromkey[:7])
rdata_fromwire = " ".join(cds[:7])
if rdata_fromfile != rdata_fromwire:
isctest.log.debug(
f"CDS RDATA MISMATCH: {rdata_fromfile} - {rdata_fromwire}"
)
return False
def get_keyrole(key, keydir=None):
ksk = "no"
zsk = "no"
digest_fromfile = "".join(dsfromkey[7:]).lower()
digest_fromwire = "".join(cds[7:]).lower()
if digest_fromfile != digest_fromwire:
isctest.log.debug(
f"CDS DIGEST MISMATCH: {digest_fromfile} - {digest_fromwire}"
)
return False
if keydir is not None:
statefile = "{}/{}.state".format(keydir, key)
else:
statefile = "{}.state".format(key)
return digest_fromfile == digest_fromwire
with open(statefile, "r", encoding="utf-8") as file:
for line in file:
if "KSK: " in line:
ksk = line.split()[1]
if "ZSK: " in line:
zsk = line.split()[1]
def __lt__(self, other: "Key"):
return self.name < other.name
return ksk == "yes", zsk == "yes"
def __eq__(self, other: object):
return isinstance(other, Key) and self.path == other.path
def dnskey_equals(key, value, keydir=None, cdnskey=False):
if keydir is not None:
keyfile = f"{keydir}/{key}.key"
else:
keyfile = f"{key}.key"
dnskey = value.split()
if cdnskey:
# fourth element is the rrtype
assert dnskey[3] == "CDNSKEY"
dnskey[3] = "DNSKEY"
dnskey_fromfile = []
rdata = " ".join(dnskey[:7])
with open(keyfile, "r", encoding="utf-8") as file:
for line in file:
if f"{rdata}" in line:
dnskey_fromfile = line.split()
pubkey_fromfile = "".join(dnskey_fromfile[7:])
pubkey_fromwire = "".join(dnskey[7:])
return pubkey_fromfile == pubkey_fromwire
def cds_equals(key, value, alg, keydir=None):
if keydir is not None:
keyfile = f"{keydir}/{key}.key"
else:
keyfile = f"{key}.key"
cds = value.split()
dsfromkey_command = [
*os.environ.get("DSFROMKEY").split(),
"-T",
"3600",
"-a",
alg,
"-C",
"-w",
keyfile,
]
out = isctest.run.cmd(dsfromkey_command, log_stdout=True)
dsfromkey = out.stdout.decode("utf-8").split()
index = 6
while index < len(cds):
dsfromkey[index] = dsfromkey[index].lower()
index += 1
rdata_fromfile = " ".join(dsfromkey[:7])
rdata_fromwire = " ".join(cds[:7])
if rdata_fromfile != rdata_fromwire:
isctest.log.debug(f"CDS RDATA MISMATCH: {rdata_fromfile} - {rdata_fromwire}")
return False
digest_fromfile = "".join(cds[7:])
digest_fromwire = "".join(cds[7:])
if digest_fromfile != digest_fromwire:
isctest.log.debug(f"CDS DIGEST MISMATCH: {digest_fromfile} - {digest_fromwire}")
return False
return digest_fromfile == digest_fromwire
def __repr__(self):
return self.path
def zone_is_signed(server, zone):
@ -278,13 +321,11 @@ def check_dnssecstatus(server, zone, keys, policy=None, view=None):
assert "dnssec-policy: {}".format(policy) in response
for key in keys:
keytag = get_keytag(key)
assert "key: {}".format(keytag) in response
assert "key: {}".format(key.tag) in response
# pylint: disable=too-many-locals,too-many-branches
def _check_signatures(signatures, covers, fqdn, keys, keydir=None):
now = datetime.now().strftime("%Y%m%d%H%M%S")
def _check_signatures(signatures, covers, fqdn, keys):
now = KeyTimingMetadata.now()
numsigs = 0
zrrsig = True
if covers in [dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY, dns.rdatatype.CDS]:
@ -292,51 +333,48 @@ def _check_signatures(signatures, covers, fqdn, keys, keydir=None):
krrsig = not zrrsig
for key in keys:
keytag = get_keytag(key)
ksk, zsk = get_keyrole(key, keydir=keydir)
activate = get_timing_metadata(key, "Activate", keydir=keydir)
inactive = get_timing_metadata(key, "Inactive", keydir=keydir, must_exist=False)
activate = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False)
active = int(now) >= int(activate)
retired = int(inactive) != 0 and int(inactive) <= int(now)
active = now >= activate
retired = inactive is not None and inactive <= now
signing = active and not retired
if not signing:
for rrsig in signatures:
assert f"{keytag} {fqdn}" not in rrsig
assert f"{key.tag} {fqdn}" not in rrsig
continue
if zrrsig and zsk:
if zrrsig and key.is_zsk():
has_rrsig = False
for rrsig in signatures:
if f"{keytag} {fqdn}" in rrsig:
if f"{key.tag} {fqdn}" in rrsig:
has_rrsig = True
break
assert has_rrsig
numsigs += 1
if zrrsig and not zsk:
if zrrsig and not key.is_zsk():
for rrsig in signatures:
assert f"{keytag} {fqdn}" not in rrsig
assert f"{key.tag} {fqdn}" not in rrsig
if krrsig and ksk:
if krrsig and key.is_ksk():
has_rrsig = False
for rrsig in signatures:
if f"{keytag} {fqdn}" in rrsig:
if f"{key.tag} {fqdn}" in rrsig:
has_rrsig = True
break
assert has_rrsig
numsigs += 1
if krrsig and not ksk:
if krrsig and not key.is_ksk():
for rrsig in signatures:
assert f"{keytag} {fqdn}" not in rrsig
assert f"{key.tag} {fqdn}" not in rrsig
return numsigs
# pylint: disable=too-many-arguments
def check_signatures(rrset, covers, fqdn, ksks, zsks, kskdir=None, zskdir=None):
def check_signatures(rrset, covers, fqdn, ksks, zsks):
# Check if signatures with covering type are signed with the right keys.
# The right keys are the ones that expect a signature and have the
# correct role.
@ -350,14 +388,14 @@ def check_signatures(rrset, covers, fqdn, ksks, zsks, kskdir=None, zskdir=None):
rrsig = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}"
signatures.append(rrsig)
numsigs += _check_signatures(signatures, covers, fqdn, ksks, keydir=kskdir)
numsigs += _check_signatures(signatures, covers, fqdn, zsks, keydir=zskdir)
numsigs += _check_signatures(signatures, covers, fqdn, ksks)
numsigs += _check_signatures(signatures, covers, fqdn, zsks)
assert numsigs == len(signatures)
def _check_dnskeys(dnskeys, keys, keydir=None, cdnskey=False):
now = datetime.now().strftime("%Y%m%d%H%M%S")
def _check_dnskeys(dnskeys, keys, cdnskey=False):
now = KeyTimingMetadata.now()
numkeys = 0
publish_md = "Publish"
@ -367,19 +405,19 @@ def _check_dnskeys(dnskeys, keys, keydir=None, cdnskey=False):
delete_md = f"Sync{delete_md}"
for key in keys:
publish = get_timing_metadata(key, publish_md, keydir=keydir)
delete = get_timing_metadata(key, delete_md, keydir=keydir, must_exist=False)
published = int(now) >= int(publish)
removed = int(delete) != 0 and int(delete) <= int(now)
publish = key.get_timing(publish_md)
delete = key.get_timing(delete_md, must_exist=False)
published = now >= publish
removed = delete is not None and delete <= now
if not published or removed:
for dnskey in dnskeys:
assert not dnskey_equals(key, dnskey, keydir=keydir, cdnskey=cdnskey)
assert not key.dnskey_equals(dnskey, cdnskey=cdnskey)
continue
has_dnskey = False
for dnskey in dnskeys:
if dnskey_equals(key, dnskey, keydir=keydir, cdnskey=cdnskey):
if key.dnskey_equals(dnskey, cdnskey=cdnskey):
has_dnskey = True
break
@ -389,8 +427,7 @@ def _check_dnskeys(dnskeys, keys, keydir=None, cdnskey=False):
return numkeys
# pylint: disable=too-many-arguments
def check_dnskeys(rrset, ksks, zsks, kskdir=None, zskdir=None, cdnskey=False):
def check_dnskeys(rrset, ksks, zsks, cdnskey=False):
# Check if the correct DNSKEY records are published. If the current time
# is between the timing metadata 'publish' and 'delete', the key must have
# a DNSKEY record published. If 'cdnskey' is True, check against CDNSKEY
@ -405,20 +442,20 @@ def check_dnskeys(rrset, ksks, zsks, kskdir=None, zskdir=None, cdnskey=False):
dnskey = f"{rr.name} {rr.ttl} {rdclass} {rdtype} {rdata}"
dnskeys.append(dnskey)
numkeys += _check_dnskeys(dnskeys, ksks, keydir=kskdir, cdnskey=cdnskey)
numkeys += _check_dnskeys(dnskeys, ksks, cdnskey=cdnskey)
if not cdnskey:
numkeys += _check_dnskeys(dnskeys, zsks, keydir=zskdir)
numkeys += _check_dnskeys(dnskeys, zsks)
assert numkeys == len(dnskeys)
# pylint: disable=too-many-locals
def check_cds(rrset, keys, keydir=None):
def check_cds(rrset, keys):
# Check if the correct CDS records are published. If the current time
# is between the timing metadata 'publish' and 'delete', the key must have
# a DNSKEY record published. If 'cdnskey' is True, check against CDNSKEY
# records instead.
now = datetime.now().strftime("%Y%m%d%H%M%S")
now = KeyTimingMetadata.now()
numcds = 0
cdss = []
@ -430,21 +467,20 @@ def check_cds(rrset, keys, keydir=None):
cdss.append(cds)
for key in keys:
ksk, _ = get_keyrole(key, keydir=keydir)
assert ksk
assert key.is_ksk()
publish = get_timing_metadata(key, "SyncPublish", keydir=keydir)
delete = get_timing_metadata(key, "SyncDelete", keydir=keydir, must_exist=False)
published = int(now) >= int(publish)
removed = int(delete) != 0 and int(delete) <= int(now)
publish = key.get_timing("SyncPublish")
delete = key.get_timing("SyncDelete", must_exist=False)
published = now >= publish
removed = delete is not None and delete <= now
if not published or removed:
for cds in cdss:
assert not cds_equals(key, cds, "SHA-256", keydir=keydir)
assert not key.cds_equals(cds, "SHA-256")
continue
has_cds = False
for cds in cdss:
if cds_equals(key, cds, "SHA-256", keydir=keydir):
if key.cds_equals(cds, "SHA-256"):
has_cds = True
break
@ -475,8 +511,7 @@ def _query_rrset(server, fqdn, qtype):
return rrs, rrsigs
# pylint: disable=too-many-arguments
def check_apex(server, zone, ksks, zsks, kskdir=None, zskdir=None):
def check_apex(server, zone, ksks, zsks):
# Test the apex of a zone. This checks that the SOA and DNSKEY RRsets
# are signed correctly and with the appropriate keys.
fqdn = f"{zone}."
@ -484,42 +519,33 @@ def check_apex(server, zone, ksks, zsks, kskdir=None, zskdir=None):
# test dnskey query
dnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.DNSKEY)
assert len(dnskeys) > 0
check_dnskeys(dnskeys, ksks, zsks, kskdir=kskdir, zskdir=zskdir)
check_dnskeys(dnskeys, ksks, zsks)
assert len(rrsigs) > 0
check_signatures(
rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks, kskdir=kskdir, zskdir=zskdir
)
check_signatures(rrsigs, dns.rdatatype.DNSKEY, fqdn, ksks, zsks)
# test soa query
soa, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.SOA)
assert len(soa) == 1
assert f"{zone}. {DEFAULT_TTL} IN SOA" in soa[0].to_text()
assert len(rrsigs) > 0
check_signatures(
rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks, kskdir=kskdir, zskdir=zskdir
)
check_signatures(rrsigs, dns.rdatatype.SOA, fqdn, ksks, zsks)
# test cdnskey query
cdnskeys, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDNSKEY)
assert len(cdnskeys) > 0
check_dnskeys(cdnskeys, ksks, zsks, kskdir=kskdir, zskdir=zskdir, cdnskey=True)
check_dnskeys(cdnskeys, ksks, zsks, cdnskey=True)
assert len(rrsigs) > 0
check_signatures(
rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks, kskdir=kskdir, zskdir=zskdir
)
check_signatures(rrsigs, dns.rdatatype.CDNSKEY, fqdn, ksks, zsks)
# test cds query
cds, rrsigs = _query_rrset(server, fqdn, dns.rdatatype.CDS)
assert len(cds) > 0
check_cds(cds, ksks, keydir=kskdir)
check_cds(cds, ksks)
assert len(rrsigs) > 0
check_signatures(
rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks, kskdir=kskdir, zskdir=zskdir
)
check_signatures(rrsigs, dns.rdatatype.CDS, fqdn, ksks, zsks)
# pylint: disable=too-many-arguments
def check_subdomain(server, zone, ksks, zsks, kskdir=None, zskdir=None):
def check_subdomain(server, zone, ksks, zsks):
# Test an RRset below the apex and verify it is signed correctly.
fqdn = f"{zone}."
qname = f"a.{zone}."
@ -538,4 +564,4 @@ def check_subdomain(server, zone, ksks, zsks, kskdir=None, zskdir=None):
assert match in rrset.to_text()
assert len(rrsigs) > 0
check_signatures(rrsigs, qtype, fqdn, ksks, zsks, kskdir=kskdir, zskdir=zskdir)
check_signatures(rrsigs, qtype, fqdn, ksks, zsks)

View file

@ -11,29 +11,26 @@
# pylint: disable=too-many-lines
from datetime import timedelta
import os
import shutil
import time
from typing import List, Optional
from datetime import datetime
import isctest
from isctest.kasp import (
addtime,
cds_equals,
dnskey_equals,
get_keytag,
get_metadata,
get_timing_metadata,
Key,
KeyTimingMetadata,
)
def between(value, start, end):
if int(value) == 0:
if value is None or start is None or end is None:
return False
return int(value) > int(start) and int(value) < int(end)
return start < value < end
def file_contents_equal(file1, file2):
@ -46,6 +43,10 @@ def file_contents_equal(file1, file2):
isctest.run.cmd(diff_command)
def keystr_to_keylist(keystr: str, keydir: Optional[str] = None) -> List[Key]:
return [Key(name, keydir) for name in keystr.split()]
def keygen(zone, policy, keydir, when="now"):
keygen_command = [
*os.environ.get("KEYGEN").split(),
@ -65,9 +66,7 @@ def keygen(zone, policy, keydir, when="now"):
when,
zone,
]
output = isctest.run.cmd(keygen_command, log_stdout=True).stdout.decode("utf-8")
keys = output.split()
return keys
return isctest.run.cmd(keygen_command, log_stdout=True).stdout.decode("utf-8")
def ksr(zone, policy, action, options="", raise_on_exception=True):
@ -89,21 +88,15 @@ def ksr(zone, policy, action, options="", raise_on_exception=True):
# pylint: disable=too-many-arguments,too-many-branches,too-many-locals,too-many-statements
def check_keys(keys, lifetime, alg, size, keydir=None, offset=0, with_state=False):
def check_keys(keys, lifetime, alg, size, offset=0, with_state=False):
# Check keys that were created.
inception = 0
num = 0
now = datetime.now().strftime("%Y%m%d%H%M%S")
now = KeyTimingMetadata.now()
for key in keys:
if keydir is not None:
statefile = f"{keydir}/{key}.state"
else:
statefile = f"{key}.state"
# created: from keyfile plus offset
created = get_timing_metadata(key, "Created", keydir=keydir, offset=offset)
created = key.get_timing("Created") + offset
# active: retired previous key
if num == 0:
@ -111,23 +104,22 @@ def check_keys(keys, lifetime, alg, size, keydir=None, offset=0, with_state=Fals
else:
active = retired
# published: 2h5m (dnskey-ttl + publish-safety + propagation)
published = addtime(active, -7500)
# published: dnskey-ttl + publish-safety + propagation
published = active - timedelta(hours=2, minutes=5)
# retired: zsk-lifetime
if lifetime > 0:
retired = addtime(active, lifetime)
# removed: 10d1h5m
# (ttlsig + retire-safety + sign-delay + propagation)
removed = addtime(retired, 867900)
if lifetime is not None:
retired = active + lifetime
# removed: ttlsig + retire-safety + sign-delay + propagation
removed = retired + timedelta(days=10, hours=1, minutes=5)
else:
retired = 0
removed = 0
retired = None
removed = None
if between(now, published, retired) or int(retired) == 0:
if retired is None or between(now, published, retired):
goal = "omnipresent"
pubdelay = addtime(published, 7500)
signdelay = addtime(active, 867900)
pubdelay = published + timedelta(hours=2, minutes=5)
signdelay = active + timedelta(days=10, hours=1, minutes=5)
if between(now, published, pubdelay):
state_dnskey = "rumoured"
@ -143,20 +135,21 @@ def check_keys(keys, lifetime, alg, size, keydir=None, offset=0, with_state=Fals
state_dnskey = "hidden"
state_zrrsig = "hidden"
with open(statefile, "r", encoding="utf-8") as file:
with open(key.statefile, "r", encoding="utf-8") as file:
metadata = file.read()
assert f"Algorithm: {alg}" in metadata
assert f"Length: {size}" in metadata
assert f"Lifetime: {lifetime}" in metadata
assert "KSK: no" in metadata
assert "ZSK: yes" in metadata
assert f"Published: {published}" in metadata
assert f"Active: {active}" in metadata
if lifetime > 0:
if lifetime is not None:
assert f"Retired: {retired}" in metadata
assert f"Removed: {removed}" in metadata
assert f"Lifetime: {int(lifetime.total_seconds())}" in metadata
else:
assert "Lifetime: 0" in metadata
assert "Retired:" not in metadata
assert "Removed:" not in metadata
@ -167,39 +160,36 @@ def check_keys(keys, lifetime, alg, size, keydir=None, offset=0, with_state=Fals
assert "KRRSIGState:" not in metadata
assert "DSState:" not in metadata
inception += lifetime
num += 1
def check_keysigningrequest(out, zsks, start, end, keydir=None):
def check_keysigningrequest(out, zsks, start, end):
lines = out.split("\n")
line_no = 0
inception = start
while int(inception) < int(end):
next_bundle = addtime(end, 1)
while inception < end:
next_bundle = end + 1
# expect bundle header
assert f";; KeySigningRequest 1.0 {inception}" in lines[line_no]
line_no += 1
# expect zsks
for key in sorted(zsks):
published = get_timing_metadata(key, "Publish", keydir=keydir)
published = key.get_timing("Publish")
if between(published, inception, next_bundle):
next_bundle = published
removed = get_timing_metadata(
key, "Delete", keydir=keydir, must_exist=False
)
removed = key.get_timing("Delete", must_exist=False)
if between(removed, inception, next_bundle):
next_bundle = removed
if int(published) > int(inception):
if published > inception:
continue
if int(removed) != 0 and int(inception) >= int(removed):
if removed is not None and inception >= removed:
continue
# this zsk must be in the ksr
assert dnskey_equals(key, lines[line_no], keydir=keydir)
assert key.dnskey_equals(lines[line_no])
line_no += 1
inception = next_bundle
@ -225,17 +215,15 @@ def check_signedkeyresponse(
start,
end,
refresh,
kskdir=None,
zskdir=None,
cdnskey=True,
cds="SHA-256",
):
lines = out.split("\n")
line_no = 0
next_bundle = addtime(end, 1)
next_bundle = end + 1
inception = start
while int(inception) < int(end):
while inception < end:
# A single signed key response may consist of:
# ;; SignedKeyResponse (header)
# ;; DNSKEY 257 (one per published key in ksks)
@ -246,9 +234,9 @@ def check_signedkeyresponse(
# ;; CDS (one per published key in ksks)
# ;; RRSIG(CDS) (one per active key in ksks)
sigstart = addtime(inception, -3600) # clockskew: 1 hour
sigend = addtime(inception, 1209600) # sig-validity: 14 days
next_bundle = addtime(sigend, refresh)
sigstart = inception - timedelta(hours=1) # clockskew
sigend = inception + timedelta(days=14) # sig-validity
next_bundle = sigend + refresh
# ignore empty lines
while line_no < len(lines):
@ -263,56 +251,49 @@ def check_signedkeyresponse(
# expect ksks
for key in sorted(ksks):
published = get_timing_metadata(key, "Publish", keydir=kskdir)
removed = get_timing_metadata(
key, "Delete", keydir=kskdir, must_exist=False
)
published = key.get_timing("Publish")
removed = key.get_timing("Delete", must_exist=False)
if int(published) > int(inception):
if published > inception:
continue
if int(removed) != 0 and int(inception) >= int(removed):
if removed is not None and inception >= removed:
continue
# this ksk must be in the ksr
assert dnskey_equals(key, lines[line_no], keydir=kskdir)
assert key.dnskey_equals(lines[line_no])
line_no += 1
# expect zsks
for key in sorted(zsks):
published = get_timing_metadata(key, "Publish", keydir=zskdir)
published = key.get_timing("Publish")
if between(published, inception, next_bundle):
next_bundle = published
removed = get_timing_metadata(
key, "Delete", keydir=zskdir, must_exist=False
)
removed = key.get_timing("Delete", must_exist=False)
if between(removed, inception, next_bundle):
next_bundle = removed
if int(published) > int(inception):
if published > inception:
continue
if int(removed) != 0 and int(inception) >= int(removed):
if removed is not None and inception >= removed:
continue
# this zsk must be in the ksr
assert dnskey_equals(key, lines[line_no], keydir=zskdir)
assert key.dnskey_equals(lines[line_no])
line_no += 1
# expect rrsig(dnskey)
for key in sorted(ksks):
active = get_timing_metadata(key, "Activate", keydir=kskdir)
inactive = get_timing_metadata(
key, "Inactive", keydir=kskdir, must_exist=False
)
if int(active) > int(inception):
active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False)
if active > inception:
continue
if int(inactive) != 0 and int(inception) >= int(inactive):
if inactive is not None and inception >= inactive:
continue
# there must be a signature of this ksk
keytag = get_keytag(key)
alg = get_metadata(key, "Algorithm", keydir=kskdir)
expect = f"{zone}. 3600 IN RRSIG DNSKEY {alg} 2 3600 {sigend} {sigstart} {keytag} {zone}."
alg = key.get_metadata("Algorithm")
expect = f"{zone}. 3600 IN RRSIG DNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
rrsig = " ".join(lines[line_no].split())
assert expect in rrsig
line_no += 1
@ -320,34 +301,29 @@ def check_signedkeyresponse(
# expect cdnskey
if cdnskey:
for key in sorted(ksks):
published = get_timing_metadata(key, "Publish", keydir=kskdir)
removed = get_timing_metadata(
key, "Delete", keydir=kskdir, must_exist=False
)
if int(published) > int(inception):
published = key.get_timing("Publish")
removed = key.get_timing("Delete", must_exist=False)
if published > inception:
continue
if int(removed) != 0 and int(inception) >= int(removed):
if removed is not None and inception >= removed:
continue
# the cdnskey of this ksk must be in the ksr
assert dnskey_equals(key, lines[line_no], keydir=kskdir, cdnskey=True)
assert key.dnskey_equals(lines[line_no], cdnskey=True)
line_no += 1
# expect rrsig(cdnskey)
for key in sorted(ksks):
active = get_timing_metadata(key, "Activate", keydir=kskdir)
inactive = get_timing_metadata(
key, "Inactive", keydir=kskdir, must_exist=False
)
if int(active) > int(inception):
active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False)
if active > inception:
continue
if int(inactive) != 0 and int(inception) >= int(inactive):
if inactive is not None and inception >= inactive:
continue
# there must be a signature of this ksk
keytag = get_keytag(key)
alg = get_metadata(key, "Algorithm", keydir=kskdir)
expect = f"{zone}. 3600 IN RRSIG CDNSKEY {alg} 2 3600 {sigend} {sigstart} {keytag} {zone}."
alg = key.get_metadata("Algorithm")
expect = f"{zone}. 3600 IN RRSIG CDNSKEY {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
rrsig = " ".join(lines[line_no].split())
assert expect in rrsig
line_no += 1
@ -355,36 +331,31 @@ def check_signedkeyresponse(
# expect cds
if cds != "":
for key in sorted(ksks):
published = get_timing_metadata(key, "Publish", keydir=kskdir)
removed = get_timing_metadata(
key, "Delete", keydir=kskdir, must_exist=False
)
if int(published) > int(inception):
published = key.get_timing("Publish")
removed = key.get_timing("Delete", must_exist=False)
if published > inception:
continue
if int(removed) != 0 and int(inception) >= int(removed):
if removed is not None and inception >= removed:
continue
# the cds of this ksk must be in the ksr
expected_cds = cds.split(",")
for alg in expected_cds:
assert cds_equals(key, lines[line_no], alg.strip(), keydir=kskdir)
assert key.cds_equals(lines[line_no], alg.strip())
line_no += 1
# expect rrsig(cds)
for key in sorted(ksks):
active = get_timing_metadata(key, "Activate", keydir=kskdir)
inactive = get_timing_metadata(
key, "Inactive", keydir=kskdir, must_exist=False
)
if int(active) > int(inception):
active = key.get_timing("Activate")
inactive = key.get_timing("Inactive", must_exist=False)
if active > inception:
continue
if int(inactive) != 0 and int(inception) >= int(inactive):
if inactive is not None and inception >= inactive:
continue
# there must be a signature of this ksk
keytag = get_keytag(key)
alg = get_metadata(key, "Algorithm", keydir=kskdir)
expect = f"{zone}. 3600 IN RRSIG CDS {alg} 2 3600 {sigend} {sigstart} {keytag} {zone}."
alg = key.get_metadata("Algorithm")
expect = f"{zone}. 3600 IN RRSIG CDS {alg} 2 3600 {sigend} {sigstart} {key.tag} {zone}."
rrsig = " ".join(lines[line_no].split())
assert expect in rrsig
line_no += 1
@ -441,52 +412,55 @@ def test_ksr_common(servers):
n = 1
# create ksk
ksks = keygen(zone, policy, "ns1/offline")
kskdir = "ns1/offline"
out = keygen(zone, policy, kskdir)
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
out, _ = ksr(zone, policy, "keygen", options="-i now -e +1y")
zsks = out.split()
zsks = keystr_to_keylist(out)
assert len(zsks) == 2
alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER")
size = os.environ.get("DEFAULT_BITS")
lifetime = 16070400
lifetime = timedelta(days=31 * 6)
check_keys(zsks, lifetime, alg, size)
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
# in the given key directory
out, _ = ksr(zone, policy, "keygen", options="-K ns1 -i now -e +1y")
zsks = out.split()
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = keystr_to_keylist(out, zskdir)
assert len(zsks) == 2
alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER")
size = os.environ.get("DEFAULT_BITS")
lifetime = 16070400
check_keys(zsks, lifetime, alg, size, keydir="ns1")
lifetime = timedelta(days=31 * 6)
check_keys(zsks, lifetime, alg, size)
for key in zsks:
privatefile = f"ns1/{key}.private"
keyfile = f"ns1/{key}.key"
statefile = f"ns1/{key}.state"
privatefile = f"{key.path}.private"
keyfile = f"{key.path}.key"
statefile = f"{key.path}.state"
shutil.copyfile(privatefile, f"{privatefile}.backup")
shutil.copyfile(keyfile, f"{keyfile}.backup")
shutil.copyfile(statefile, f"{statefile}.backup")
# check that 'dnssec-ksr request' creates correct ksr
now = get_timing_metadata(zsks[0], "Created", keydir="ns1")
until = addtime(now, 31536000) # 1 year
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +1y")
now = zsks[0].get_timing("Created")
until = now + timedelta(days=365)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until, keydir="ns1")
check_keysigningrequest(out, zsks, now, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +1y"
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
)
fname = f"{zone}.skr.{n}"
@ -494,28 +468,26 @@ def test_ksr_common(servers):
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(
out, zone, ksks, zsks, now, until, refresh, kskdir="ns1/offline", zskdir="ns1"
)
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
# common test cases (2)
n = 2
# check that 'dnssec-ksr keygen' selects pregenerated keys for
# the same time bundle
out, _ = ksr(zone, policy, "keygen", options=f"-K ns1 -i {now} -e +1y")
selected_zsks = out.split()
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +1y")
selected_zsks = keystr_to_keylist(out, zskdir)
assert len(selected_zsks) == 2
for index, key in enumerate(selected_zsks):
assert zsks[index] == key
file_contents_equal(f"ns1/{key}.private", f"ns1/{key}.private.backup")
file_contents_equal(f"ns1/{key}.key", f"ns1/{key}.key.backup")
file_contents_equal(f"ns1/{key}.state", f"ns1/{key}.state.backup")
file_contents_equal(f"{key.path}.private", f"{key.path}.private.backup")
file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup")
file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup")
# check that 'dnssec-ksr keygen' generates only necessary keys for
# overlapping time bundle
out, err = ksr(zone, policy, "keygen", options=f"-K ns1 -i {now} -e +2y -v 1")
overlapping_zsks = out.split()
out, err = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y -v 1")
overlapping_zsks = keystr_to_keylist(out, zskdir)
assert len(overlapping_zsks) == 4
verbose = err.split()
@ -531,15 +503,15 @@ def test_ksr_common(servers):
for index, key in enumerate(overlapping_zsks):
if index < 2:
assert zsks[index] == key
file_contents_equal(f"ns1/{key}.private", f"ns1/{key}.private.backup")
file_contents_equal(f"ns1/{key}.key", f"ns1/{key}.key.backup")
file_contents_equal(f"ns1/{key}.state", f"ns1/{key}.state.backup")
file_contents_equal(f"{key.path}.private", f"{key.path}.private.backup")
file_contents_equal(f"{key.path}.key", f"{key.path}.key.backup")
file_contents_equal(f"{key.path}.state", f"{key.path}.state.backup")
# run 'dnssec-ksr keygen' again with verbosity 0
out, _ = ksr(zone, policy, "keygen", options=f"-K ns1 -i {now} -e +2y")
overlapping_zsks2 = out.split()
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {now} -e +2y")
overlapping_zsks2 = keystr_to_keylist(out, zskdir)
assert len(overlapping_zsks2) == 4
check_keys(overlapping_zsks2, lifetime, alg, size, keydir="ns1")
check_keys(overlapping_zsks2, lifetime, alg, size)
for index, key in enumerate(overlapping_zsks2):
assert overlapping_zsks[index] == key
@ -551,7 +523,7 @@ def test_ksr_common(servers):
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until, keydir="ns1")
check_keysigningrequest(out, zsks, now, until)
# check that 'dnssec-ksr request' creates correct ksr with new interval
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +2y")
@ -560,8 +532,8 @@ def test_ksr_common(servers):
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
until = addtime(now, 63072000) # 2 years
check_keysigningrequest(out, overlapping_zsks, now, until, keydir="ns1")
until = now + timedelta(days=365 * 2)
check_keysigningrequest(out, overlapping_zsks, now, until)
# check that 'dnssec-ksr request' errors if there are not enough keys
_, err = ksr(
@ -592,8 +564,6 @@ def test_ksr_common(servers):
now,
until,
refresh,
kskdir="ns1/offline",
zskdir="ns1",
)
# add zone
@ -618,15 +588,11 @@ def test_ksr_common(servers):
# - dnssec_verify
isctest.kasp.dnssec_verify(ns1, zone)
# - check keys
check_keys(overlapping_zsks, lifetime, alg, size, keydir="ns1", with_state=True)
check_keys(overlapping_zsks, lifetime, alg, size, with_state=True)
# - check apex
isctest.kasp.check_apex(
ns1, zone, ksks, overlapping_zsks, kskdir="ns1/offline", zskdir="ns1"
)
isctest.kasp.check_apex(ns1, zone, ksks, overlapping_zsks)
# - check subdomain
isctest.kasp.check_subdomain(
ns1, zone, ksks, overlapping_zsks, kskdir="ns1/offline", zskdir="ns1"
)
isctest.kasp.check_subdomain(ns1, zone, ksks, overlapping_zsks)
# pylint: disable=too-many-locals
@ -636,38 +602,39 @@ def test_ksr_lastbundle(servers):
n = 1
# create ksk
now = datetime.now().strftime("%Y%m%d%H%M%S")
offset = -31536000
when = addtime(now, offset)
when = addtime(when, -86400)
ksks = keygen(zone, policy, "ns1/offline", when=when)
kskdir = "ns1/offline"
now = KeyTimingMetadata.now()
offset = -timedelta(days=365)
when = now + offset - timedelta(days=1)
out = keygen(zone, policy, kskdir, when=str(when))
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
out, _ = ksr(zone, policy, "keygen", options="-K ns1 -i -1y -e +1d")
zsks = out.split()
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1d")
zsks = keystr_to_keylist(out, zskdir)
assert len(zsks) == 2
alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER")
size = os.environ.get("DEFAULT_BITS")
lifetime = 16070400
check_keys(zsks, lifetime, alg, size, keydir="ns1", offset=offset)
lifetime = timedelta(days=31 * 6)
check_keys(zsks, lifetime, alg, size, offset=offset)
# check that 'dnssec-ksr request' creates correct ksr
then = get_timing_metadata(zsks[0], "Created", keydir="ns1")
then = addtime(then, offset)
until = addtime(then, 31622400) # 1 year, 1 day
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {then} -e +1d")
then = zsks[0].get_timing("Created") + offset
until = then + timedelta(days=366)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1d")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, then, until, keydir="ns1")
check_keysigningrequest(out, zsks, then, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {then} -e +1d"
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1d"
)
fname = f"{zone}.skr.{n}"
@ -675,9 +642,7 @@ def test_ksr_lastbundle(servers):
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(
out, zone, ksks, zsks, then, until, refresh, kskdir="ns1/offline", zskdir="ns1"
)
check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
# add zone
ns1 = servers["ns1"]
@ -701,13 +666,11 @@ def test_ksr_lastbundle(servers):
# - dnssec_verify
isctest.kasp.dnssec_verify(ns1, zone)
# - check keys
check_keys(zsks, lifetime, alg, size, keydir="ns1", offset=offset, with_state=True)
check_keys(zsks, lifetime, alg, size, offset=offset, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1")
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
# - check subdomain
isctest.kasp.check_subdomain(
ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1"
)
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
# check that last bundle warning is logged
warning = "last bundle in skr, please import new skr file"
@ -721,38 +684,40 @@ def test_ksr_inthemiddle(servers):
n = 1
# create ksk
now = datetime.now().strftime("%Y%m%d%H%M%S")
offset = -31536000
when = addtime(now, offset)
when = addtime(when, -86400)
ksks = keygen(zone, policy, "ns1/offline", when=when)
kskdir = "ns1/offline"
now = KeyTimingMetadata.now()
offset = -timedelta(days=365)
when = now + offset - timedelta(days=1)
out = keygen(zone, policy, kskdir, when=str(when))
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
out, _ = ksr(zone, policy, "keygen", options="-K ns1 -i -1y -e +1y")
zsks = out.split()
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i -1y -e +1y")
zsks = keystr_to_keylist(out, zskdir)
assert len(zsks) == 4
alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER")
size = os.environ.get("DEFAULT_BITS")
lifetime = 16070400
check_keys(zsks, lifetime, alg, size, keydir="ns1", offset=offset)
lifetime = timedelta(days=31 * 6)
check_keys(zsks, lifetime, alg, size, offset=offset)
# check that 'dnssec-ksr request' creates correct ksr
then = get_timing_metadata(zsks[0], "Created", keydir="ns1")
then = addtime(then, offset)
until = addtime(then, 63072000) # 2 years
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {then} -e +1y")
then = zsks[0].get_timing("Created")
then = then + offset
until = then + timedelta(days=365 * 2)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e +1y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, then, until, keydir="ns1")
check_keysigningrequest(out, zsks, then, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {then} -e +1y"
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e +1y"
)
fname = f"{zone}.skr.{n}"
@ -760,9 +725,7 @@ def test_ksr_inthemiddle(servers):
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(
out, zone, ksks, zsks, then, until, refresh, kskdir="ns1/offline", zskdir="ns1"
)
check_signedkeyresponse(out, zone, ksks, zsks, then, until, refresh)
# add zone
ns1 = servers["ns1"]
@ -786,13 +749,11 @@ def test_ksr_inthemiddle(servers):
# - dnssec_verify
isctest.kasp.dnssec_verify(ns1, zone)
# - check keys
check_keys(zsks, lifetime, alg, size, keydir="ns1", offset=offset, with_state=True)
check_keys(zsks, lifetime, alg, size, offset=offset, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1")
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
# - check subdomain
isctest.kasp.check_subdomain(
ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1"
)
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
# check that no last bundle warning is logged
warning = "last bundle in skr, please import new skr file"
@ -804,22 +765,25 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end):
n = 1
# create ksk
now = datetime.now().strftime("%Y%m%d%H%M%S")
then = addtime(now, offset)
until = addtime(now, end)
ksks = keygen(zone, policy, "ns1/offline", when=then)
kskdir = "ns1/offline"
now = KeyTimingMetadata.now()
then = now + offset
until = now + end
out = keygen(zone, policy, kskdir, when=str(then))
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
# key generation
out, _ = ksr(zone, policy, "keygen", options=f"-K ns1 -i {then} -e {until}")
zsks = out.split()
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i {then} -e {until}")
zsks = keystr_to_keylist(out, zskdir)
assert len(zsks) == 2
# create request
now = get_timing_metadata(zsks[0], "Created", keydir="ns1")
then = addtime(now, offset)
until = addtime(now, end)
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {then} -e {until}")
now = zsks[0].get_timing("Created")
then = now + offset
until = now + end
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {then} -e {until}")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
@ -827,7 +791,7 @@ def check_ksr_rekey_logs_error(server, zone, policy, offset, end):
# sign request
out, _ = ksr(
zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {then} -e {until}"
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {then} -e {until}"
)
fname = f"{zone}.skr.{n}"
@ -878,33 +842,36 @@ def test_ksr_unlimited(servers):
n = 1
# create ksk
ksks = keygen(zone, policy, "ns1/offline")
kskdir = "ns1/offline"
out = keygen(zone, policy, kskdir)
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 1
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
out, _ = ksr(zone, policy, "keygen", options="-K ns1 -i now -e +2y")
zsks = out.split()
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +2y")
zsks = keystr_to_keylist(out, zskdir)
assert len(zsks) == 1
alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER")
size = os.environ.get("DEFAULT_BITS")
lifetime = 0
check_keys(zsks, lifetime, alg, size, keydir="ns1")
lifetime = None
check_keys(zsks, lifetime, alg, size)
# check that 'dnssec-ksr request' creates correct ksr
now = get_timing_metadata(zsks[0], "Created", keydir="ns1")
until = addtime(now, 4 * 31536000) # 4 years
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +4y")
now = zsks[0].get_timing("Created")
until = now + timedelta(days=365 * 4)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +4y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until, keydir="ns1")
check_keysigningrequest(out, zsks, now, until)
# check that 'dnssec-ksr sign' creates correct skr without cdnskey
out, _ = ksr(
zone, "no-cdnskey", "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +4y"
zone, "no-cdnskey", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
)
skrfile = f"{zone}.no-cdnskey.skr.{n}"
@ -920,15 +887,13 @@ def test_ksr_unlimited(servers):
now,
until,
refresh,
kskdir="ns1/offline",
zskdir="ns1",
cdnskey=False,
cds="SHA-1, SHA-256, SHA-384",
)
# check that 'dnssec-ksr sign' creates correct skr without cds
out, _ = ksr(
zone, "no-cds", "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +4y"
zone, "no-cds", "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
)
skrfile = f"{zone}.no-cds.skr.{n}"
@ -944,14 +909,12 @@ def test_ksr_unlimited(servers):
now,
until,
refresh,
kskdir="ns1/offline",
zskdir="ns1",
cds="",
)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +4y"
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +4y"
)
skrfile = f"{zone}.{policy}.skr.{n}"
@ -959,9 +922,7 @@ def test_ksr_unlimited(servers):
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(
out, zone, ksks, zsks, now, until, refresh, kskdir="ns1/offline", zskdir="ns1"
)
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
# add zone
ns1 = servers["ns1"]
@ -985,13 +946,11 @@ def test_ksr_unlimited(servers):
# - dnssec_verify
isctest.kasp.dnssec_verify(ns1, zone)
# - check keys
check_keys(zsks, lifetime, alg, size, keydir="ns1", with_state=True)
check_keys(zsks, lifetime, alg, size, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1")
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
# - check subdomain
isctest.kasp.check_subdomain(
ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1"
)
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)
# pylint: disable=too-many-locals
@ -1001,12 +960,15 @@ def test_ksr_twotone(servers):
n = 1
# create ksk
ksks = keygen(zone, policy, "ns1/offline")
kskdir = "ns1/offline"
out = keygen(zone, policy, kskdir)
ksks = keystr_to_keylist(out, kskdir)
assert len(ksks) == 2
# check that 'dnssec-ksr keygen' pregenerates right amount of keys
out, _ = ksr(zone, policy, "keygen", options="-K ns1 -i now -e +1y")
zsks = out.split()
zskdir = "ns1"
out, _ = ksr(zone, policy, "keygen", options=f"-K {zskdir} -i now -e +1y")
zsks = keystr_to_keylist(out, zskdir)
# First algorithm keys have a lifetime of 3 months, so there should
# be 4 created keys. Second algorithm keys have a lifetime of 5
# months, so there should be 3 created keys. While only two time
@ -1017,7 +979,7 @@ def test_ksr_twotone(servers):
zsks_defalg = []
zsks_altalg = []
for zsk in zsks:
alg = get_metadata(zsk, "Algorithm", keydir="ns1")
alg = zsk.get_metadata("Algorithm")
if alg == os.environ.get("DEFAULT_ALGORITHM_NUMBER"):
zsks_defalg.append(zsk)
elif alg == os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER"):
@ -1028,38 +990,36 @@ def test_ksr_twotone(servers):
alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER")
size = os.environ.get("DEFAULT_BITS")
lifetime = 8035200 # 3 months
check_keys(zsks_defalg, lifetime, alg, size, keydir="ns1")
lifetime = timedelta(days=31 * 3)
check_keys(zsks_defalg, lifetime, alg, size)
alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER")
size = os.environ.get("ALTERNATIVE_BITS")
lifetime = 13392000 # 5 months
check_keys(zsks_altalg, lifetime, alg, size, keydir="ns1")
lifetime = timedelta(days=31 * 5)
check_keys(zsks_altalg, lifetime, alg, size)
# check that 'dnssec-ksr request' creates correct ksr
now = get_timing_metadata(zsks[0], "Created", keydir="ns1")
until = addtime(now, 31536000) # 1 year
out, _ = ksr(zone, policy, "request", options=f"-K ns1 -i {now} -e +1y")
now = zsks[0].get_timing("Created")
until = now + timedelta(days=365)
out, _ = ksr(zone, policy, "request", options=f"-K {zskdir} -i {now} -e +1y")
fname = f"{zone}.ksr.{n}"
with open(fname, "w", encoding="utf-8") as file:
file.write(out)
check_keysigningrequest(out, zsks, now, until, keydir="ns1")
check_keysigningrequest(out, zsks, now, until)
# check that 'dnssec-ksr sign' creates correct skr
out, _ = ksr(
zone, policy, "sign", options=f"-K ns1/offline -f {fname} -i {now} -e +1y"
zone, policy, "sign", options=f"-K {kskdir} -f {fname} -i {now} -e +1y"
)
skrfile = f"{zone}.skr.{n}"
with open(skrfile, "w", encoding="utf-8") as file:
file.write(out)
refresh = -432000 # 5 days
check_signedkeyresponse(
out, zone, ksks, zsks, now, until, refresh, kskdir="ns1/offline", zskdir="ns1"
)
refresh = -timedelta(days=5)
check_signedkeyresponse(out, zone, ksks, zsks, now, until, refresh)
# add zone
ns1 = servers["ns1"]
@ -1085,16 +1045,14 @@ def test_ksr_twotone(servers):
# - check keys
alg = os.environ.get("DEFAULT_ALGORITHM_NUMBER")
size = os.environ.get("DEFAULT_BITS")
lifetime = 8035200 # 3 months
check_keys(zsks_defalg, lifetime, alg, size, keydir="ns1", with_state=True)
lifetime = timedelta(days=31 * 3)
check_keys(zsks_defalg, lifetime, alg, size, with_state=True)
alg = os.environ.get("ALTERNATIVE_ALGORITHM_NUMBER")
size = os.environ.get("ALTERNATIVE_BITS")
lifetime = 13392000 # 5 months
check_keys(zsks_altalg, lifetime, alg, size, keydir="ns1", with_state=True)
lifetime = timedelta(days=31 * 5)
check_keys(zsks_altalg, lifetime, alg, size, with_state=True)
# - check apex
isctest.kasp.check_apex(ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1")
isctest.kasp.check_apex(ns1, zone, ksks, zsks)
# - check subdomain
isctest.kasp.check_subdomain(
ns1, zone, ksks, zsks, kskdir="ns1/offline", zskdir="ns1"
)
isctest.kasp.check_subdomain(ns1, zone, ksks, zsks)