diff --git a/bin/tests/system/checkds/tests_checkds.py b/bin/tests/system/checkds/tests_checkds.py index 6360b2ed45..86b430e5d4 100755 --- a/bin/tests/system/checkds/tests_checkds.py +++ b/bin/tests/system/checkds/tests_checkds.py @@ -11,6 +11,8 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. +from typing import NamedTuple, Tuple + import os import subprocess import sys @@ -41,8 +43,8 @@ def has_signed_apex_nsec(zone, response): nextname = "a." labelcount = zone.count(".") # zone is specified as FQDN types = "NS SOA RRSIG NSEC DNSKEY" - match = "{0} {1} IN NSEC {2}{0} {3}".format(zone, ttl, nextname, types) - sig = "{0} {1} IN RRSIG NSEC 13 {2} 300".format(zone, ttl, labelcount) + match = f"{zone} {ttl} IN NSEC {nextname}{zone} {types}" + sig = f"{zone} {ttl} IN RRSIG NSEC 13 {labelcount} 300" for rr in response.answer: if match in rr.to_text(): @@ -76,7 +78,7 @@ def verify_zone(zone, transfer): verify = os.getenv("VERIFY") assert verify is not None - filename = "{}out".format(zone) + filename = f"{zone}out" with open(filename, "w", encoding="utf-8") as file: for rr in transfer.answer: file.write(rr.to_text()) @@ -88,7 +90,7 @@ def verify_zone(zone, transfer): verifier = subprocess.run(verify_cmd, capture_output=True, check=True) if verifier.returncode != 0: - print("error: dnssec-verify {} failed".format(zone)) + print(f"error: dnssec-verify {zone} failed") sys.stderr.buffer.write(verifier.stderr) return verifier.returncode == 0 @@ -102,7 +104,7 @@ def read_statefile(server, zone): response = do_query(server, zone, "DS", tcp=True) if not isinstance(response, dns.message.Message): - print("error: no response for {} DS from {}".format(zone, addr)) + print(f"error: no response for {zone} DS from {addr}") return {} if response.rcode() == dns.rcode.NOERROR: @@ -120,20 +122,16 @@ def read_statefile(server, zone): if count != 1: print( - "error: expected a single DS in response for {} from {}," - "got {}".format(zone, addr, count) + f"error: expected a single DS in response for {zone} from {addr}, got {count}" ) return {} else: - print( - "error: {} response for {} DNSKEY from {}".format( - dns.rcode.to_text(response.rcode()), zone, addr - ) - ) + rcode = dns.rcode.to_text(response.rcode()) + print(f"error: {rcode} response for {zone} DNSKEY from {addr}") return {} - filename = "ns9/K{}+013+{:05d}.state".format(zone, keyid) - print("read state file {}".format(filename)) + filename = f"ns9/K{zone}+013+{keyid:05d}.state" + print(f"read state file {filename}") try: with open(filename, "r", encoding="utf-8") as file: @@ -152,22 +150,19 @@ def read_statefile(server, zone): def zone_check(server, zone): addr = server.ip - fqdn = "{}.".format(zone) + fqdn = f"{zone}." # wait until zone is fully signed. signed = False for _ in range(10): response = do_query(server, fqdn, "NSEC") if not isinstance(response, dns.message.Message): - print("error: no response for {} NSEC from {}".format(fqdn, addr)) + print(f"error: no response for {fqdn} NSEC from {addr}") elif response.rcode() == dns.rcode.NOERROR: signed = has_signed_apex_nsec(fqdn, response) else: - print( - "error: {} response for {} NSEC from {}".format( - dns.rcode.to_text(response.rcode()), fqdn, addr - ) - ) + rcode = dns.rcode.to_text(response.rcode()) + print(f"error: {rcode} response for {fqdn} NSEC from {addr}") if signed: break @@ -180,21 +175,18 @@ def zone_check(server, zone): verified = False transfer = do_query(server, fqdn, "AXFR", tcp=True) if not isinstance(transfer, dns.message.Message): - print("error: no response for {} AXFR from {}".format(fqdn, addr)) + print(f"error: no response for {fqdn} AXFR from {addr}") elif transfer.rcode() == dns.rcode.NOERROR: verified = verify_zone(fqdn, transfer) else: - print( - "error: {} response for {} AXFR from {}".format( - dns.rcode.to_text(transfer.rcode()), fqdn, addr - ) - ) + rcode = dns.rcode.to_text(transfer.rcode()) + print(f"error: {rcode} response for {fqdn} AXFR from {addr}") assert verified def keystate_check(server, zone, key): - fqdn = "{}.".format(zone) + fqdn = f"{zone}." val = 0 deny = False @@ -245,278 +237,273 @@ def rekey(zone): controller = subprocess.run(rndc_cmd, capture_output=True, check=True) if controller.returncode != 0: - print("error: rndc loadkeys {} failed".format(zone)) + print(f"error: rndc loadkeys {zone} failed") sys.stderr.buffer.write(controller.stderr) assert controller.returncode == 0 -def checkds_dspublished(named_port, servers, checkds, addr): - # - # 1.1.1: DS is correctly published in parent. - # parental-agents: ns2 - # - - # The simple case. - zone = "good.{}.dspublish.ns2".format(checkds) - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from {addr}" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "DSPublish") - - # - # 1.1.2: DS is not published in parent. - # parental-agents: ns5 - # - zone = "not-yet.{}.dspublish.ns5".format(checkds) - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.5" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "!DSPublish") - - # - # 1.1.3: The parental agent is badly configured. - # parental-agents: ns6 - # - zone = "bad.{}.dspublish.ns6".format(checkds) - zone_check(servers["ns9"], zone) - if checkds == "explicit": - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: bad DS response from 10.53.0.6" - watcher.wait_for_line(line) - elif checkds == "yes": - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: error during parental-agents processing" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "!DSPublish") - - # - # 1.1.4: DS is published, but has bogus signature. - # - # TBD - - # - # 1.2.1: DS is correctly published in all parents. - # parental-agents: ns2, ns4 - # - zone = "good.{}.dspublish.ns2-4".format(checkds) - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from {addr}" - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.4" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "DSPublish") - - # - # 1.2.2: DS is not published in some parents. - # parental-agents: ns2, ns4, ns5 - # - zone = "incomplete.{}.dspublish.ns2-4-5".format(checkds) - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from {addr}" - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.4" - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.5" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "!DSPublish") - - # - # 1.2.3: One parental agent is badly configured. - # parental-agents: ns2, ns4, ns6 - # - zone = "bad.{}.dspublish.ns2-4-6".format(checkds) - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from {addr}" - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.4" - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: bad DS response from 10.53.0.6" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "!DSPublish") - - # - # 1.2.4: DS is completely published, bogus signature. - # - # TBD - - # TBD: Check with TSIG - # TBD: Check with TLS +class CheckDSTest(NamedTuple): + zone: str + logs_to_wait_for: Tuple[str] + expected_parent_state: str -def checkds_dswithdrawn(named_port, servers, checkds, addr): - # - # 2.1.1: DS correctly withdrawn from the parent. - # parental-agents: ns5 - # - - # The simple case. - zone = "good.{}.dsremoved.ns5".format(checkds) - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: empty DS response from {addr}" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "DSRemoved") - - # - # 2.1.2: DS is published in the parent. - # parental-agents: ns2 - # - zone = "still-there.{}.dsremoved.ns2".format(checkds) - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.2" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "!DSRemoved") - - # - # 2.1.3: The parental agent is badly configured. - # parental-agents: ns6 - # - zone = "bad.{}.dsremoved.ns6".format(checkds) - zone_check(servers["ns9"], zone) - if checkds == "explicit": - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: bad DS response from 10.53.0.6" - watcher.wait_for_line(line) - elif checkds == "yes": - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: error during parental-agents processing" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "!DSRemoved") - - # - # 2.1.4: DS is withdrawn, but has bogus signature. - # - # TBD - - # - # 2.2.1: DS is correctly withdrawn from all parents. - # parental-agents: ns5, ns7 - # - zone = "good.{}.dsremoved.ns5-7".format(checkds) - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: empty DS response from {addr}" - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.7" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "DSRemoved") - - # - # 2.2.2: DS is not withdrawn from some parents. - # parental-agents: ns2, ns5, ns7 - # - zone = "incomplete.{}.dsremoved.ns2-5-7".format(checkds) - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.2" - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: empty DS response from {addr}" - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.7" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "!DSRemoved") - - # - # 2.2.3: One parental agent is badly configured. - # parental-agents: ns5, ns6, ns7 - # - zone = "bad.{}.dsremoved.ns5-6-7".format(checkds) - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: empty DS response from {addr}" - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.7" - watcher.wait_for_line(line) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: bad DS response from 10.53.0.6" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "!DSRemoved") - - # - # 2.2.4:: DS is removed completely, bogus signature. - # - # TBD - - -def test_checkds_reference(named_port, servers): +parental_agents_tests = [ # Using a reference to parental-agents. - zone = "reference.explicit.dspublish.ns2" - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.8" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "DSPublish") - - -def test_checkds_resolver(named_port, servers): + CheckDSTest( + zone="reference.explicit.dspublish.ns2", + logs_to_wait_for=("DS response from 10.53.0.8",), + expected_parent_state="DSPublish", + ), # Using a resolver as parental-agent (ns3). - zone = "resolver.explicit.dspublish.ns2" - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.3" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "DSPublish") - + CheckDSTest( + zone="resolver.explicit.dspublish.ns2", + logs_to_wait_for=("DS response from 10.53.0.3",), + expected_parent_state="DSPublish", + ), # Using a resolver as parental-agent (ns3). - zone = "resolver.explicit.dsremoved.ns5" - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: empty DS response from 10.53.0.3" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "DSRemoved") + CheckDSTest( + zone="resolver.explicit.dsremoved.ns5", + logs_to_wait_for=("empty DS response from 10.53.0.3",), + expected_parent_state="DSRemoved", + ), +] + +no_ent_tests = [ + CheckDSTest( + zone="no-ent.ns2", + logs_to_wait_for=("DS response from 10.53.0.2",), + expected_parent_state="DSPublish", + ), + CheckDSTest( + zone="no-ent.ns5", + logs_to_wait_for=("DS response from 10.53.0.5",), + expected_parent_state="DSRemoved", + ), +] -def test_checkds_no_ent(named_port, servers): - zone = "no-ent.ns2" - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.2" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "DSPublish") - - zone = "no-ent.ns5" - zone_check(servers["ns9"], zone) - with servers["ns9"].watch_log_from_start() as watcher: - line = f"zone {zone}/IN (signed): checkds: DS response from 10.53.0.5" - watcher.wait_for_line(line) - keystate_check(servers["ns2"], zone, "DSRemoved") +def dspublished_tests(checkds, addr): + return [ + # + # 1.1.1: DS is correctly published in parent. + # parental-agents: ns2 + # + # The simple case. + CheckDSTest( + zone=f"good.{checkds}.dspublish.ns2", + logs_to_wait_for=(f"DS response from {addr}",), + expected_parent_state="DSPublish", + ), + # + # 1.1.2: DS is not published in parent. + # parental-agents: ns5 + # + CheckDSTest( + zone=f"not-yet.{checkds}.dspublish.ns5", + logs_to_wait_for=("empty DS response from 10.53.0.5",), + expected_parent_state="!DSPublish", + ), + # + # 1.1.3: The parental agent is badly configured. + # parental-agents: ns6 + # + CheckDSTest( + zone=f"bad.{checkds}.dspublish.ns6", + logs_to_wait_for=( + "bad DS response from 10.53.0.6" + if checkds == "explicit" + else "error during parental-agents processing", + ), + expected_parent_state="!DSPublish", + ), + # + # 1.1.4: DS is published, but has bogus signature. + # + # TBD + # + # 1.2.1: DS is correctly published in all parents. + # parental-agents: ns2, ns4 + # + CheckDSTest( + zone=f"good.{checkds}.dspublish.ns2-4", + logs_to_wait_for=(f"DS response from {addr}", "DS response from 10.53.0.4"), + expected_parent_state="DSPublish", + ), + # + # 1.2.2: DS is not published in some parents. + # parental-agents: ns2, ns4, ns5 + # + CheckDSTest( + zone=f"incomplete.{checkds}.dspublish.ns2-4-5", + logs_to_wait_for=( + f"DS response from {addr}", + "DS response from 10.53.0.4", + "empty DS response from 10.53.0.5", + ), + expected_parent_state="!DSPublish", + ), + # + # 1.2.3: One parental agent is badly configured. + # parental-agents: ns2, ns4, ns6 + # + CheckDSTest( + zone=f"bad.{checkds}.dspublish.ns2-4-6", + logs_to_wait_for=( + f"DS response from {addr}", + "DS response from 10.53.0.4", + "bad DS response from 10.53.0.6", + ), + expected_parent_state="!DSPublish", + ), + # + # 1.2.4: DS is completely published, bogus signature. + # + # TBD + # TBD: Check with TSIG + # TBD: Check with TLS + ] -def test_checkds_dspublished(named_port, servers): - checkds_dspublished(named_port, servers, "explicit", "10.53.0.8") - checkds_dspublished(named_port, servers, "yes", "10.53.0.2") +def dswithdrawn_tests(checkds, addr): + return [ + # + # 2.1.1: DS correctly withdrawn from the parent. + # parental-agents: ns5 + # + # The simple case. + CheckDSTest( + zone=f"good.{checkds}.dsremoved.ns5", + logs_to_wait_for=(f"empty DS response from {addr}",), + expected_parent_state="DSRemoved", + ), + # + # 2.1.2: DS is published in the parent. + # parental-agents: ns2 + # + CheckDSTest( + zone=f"still-there.{checkds}.dsremoved.ns2", + logs_to_wait_for=("DS response from 10.53.0.2",), + expected_parent_state="!DSRemoved", + ), + # + # 2.1.3: The parental agent is badly configured. + # parental-agents: ns6 + # + CheckDSTest( + zone=f"bad.{checkds}.dsremoved.ns6", + logs_to_wait_for=( + "bad DS response from 10.53.0.6" + if checkds == "explicit" + else "error during parental-agents processing", + ), + expected_parent_state="!DSRemoved", + ), + # + # 2.1.4: DS is withdrawn, but has bogus signature. + # + # TBD + # + # 2.2.1: DS is correctly withdrawn from all parents. + # parental-agents: ns5, ns7 + # + CheckDSTest( + zone=f"good.{checkds}.dsremoved.ns5-7", + logs_to_wait_for=( + f"empty DS response from {addr}", + "empty DS response from 10.53.0.7", + ), + expected_parent_state="DSRemoved", + ), + # + # 2.2.2: DS is not withdrawn from some parents. + # parental-agents: ns2, ns5, ns7 + # + CheckDSTest( + zone=f"incomplete.{checkds}.dsremoved.ns2-5-7", + logs_to_wait_for=( + "DS response from 10.53.0.2", + f"empty DS response from {addr}", + "empty DS response from 10.53.0.7", + ), + expected_parent_state="!DSRemoved", + ), + # + # 2.2.3: One parental agent is badly configured. + # parental-agents: ns5, ns6, ns7 + # + CheckDSTest( + zone=f"bad.{checkds}.dsremoved.ns5-6-7", + logs_to_wait_for=( + f"empty DS response from {addr}", + "empty DS response from 10.53.0.7", + "bad DS response from 10.53.0.6", + ), + expected_parent_state="!DSRemoved", + ), + # + # 2.2.4:: DS is removed completely, bogus signature. + # + # TBD + ] -def test_checkds_dswithdrawn(named_port, servers): - checkds_dswithdrawn(named_port, servers, "explicit", "10.53.0.10") - checkds_dswithdrawn(named_port, servers, "yes", "10.53.0.5") +checkds_no_tests = [ + CheckDSTest( + zone="good.no.dspublish.ns2", + logs_to_wait_for=(), + expected_parent_state="!DSPublish", + ), + CheckDSTest( + zone="good.no.dspublish.ns2-4", + logs_to_wait_for=(), + expected_parent_state="!DSPublish", + ), + CheckDSTest( + zone="good.no.dsremoved.ns5", + logs_to_wait_for=(), + expected_parent_state="!DSRemoved", + ), + CheckDSTest( + zone="good.no.dsremoved.ns5-7", + logs_to_wait_for=(), + expected_parent_state="!DSRemoved", + ), +] -def test_checkds_no(named_port, servers): - zone_check(servers["ns9"], "good.no.dspublish.ns2") - keystate_check(servers["ns2"], "good.no.dspublish.ns2", "!DSPublish") +checkds_tests = ( + parental_agents_tests + + no_ent_tests + + dspublished_tests("explicit", "10.53.0.8") + + dspublished_tests("yes", "10.53.0.2") + + dswithdrawn_tests("explicit", "10.53.0.10") + + dswithdrawn_tests("yes", "10.53.0.5") + + checkds_no_tests +) - zone_check(servers["ns9"], "good.no.dspublish.ns2-4") - keystate_check(servers["ns2"], "good.no.dspublish.ns2-4", "!DSPublish") - zone_check(servers["ns9"], "good.no.dsremoved.ns5") - keystate_check(servers["ns2"], "good.no.dsremoved.ns5", "!DSRemoved") +@pytest.mark.parametrize("params", checkds_tests, ids=lambda t: t.zone) +def test_checkds(servers, params): + # Wait until the provided zone is signed and then verify its DNSSEC data. + zone_check(servers["ns9"], params.zone) - zone_check(servers["ns9"], "good.no.dsremoved.ns5-7") - keystate_check(servers["ns2"], "good.no.dsremoved.ns5-7", "!DSRemoved") + # Wait until all the expected log lines are found in the log file for the + # provided server. + for log_string in params.logs_to_wait_for: + for _ in range(10): + with servers["ns9"].watch_log_from_start() as watcher: + line = f"zone {params.zone}/IN (signed): checkds: {log_string}" + try: + watcher.wait_for_line(line, timeout=1) + except TimeoutError: + rekey(params.zone) + else: + break + else: + raise TimeoutError + + # Check whether key states on the parent server provided match + # expectations. + keystate_check(servers["ns2"], params.zone, params.expected_parent_state)