diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index af9da14ff3..adeb82d10b 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -301,7 +301,7 @@ stages: .rule_mr_system_tests: &rule_mr_system_tests - if: '$CI_MERGE_REQUEST_DIFF_BASE_SHA != null' changes: - - 'bin/tests/system/**' + - 'bin/tests/system/**/*' .rule_mr_manual: &rule_mr_manual - if: '$CI_MERGE_REQUEST_DIFF_BASE_SHA != null' diff --git a/bin/tests/system/dnssec/tests_validation.py b/bin/tests/system/dnssec/tests_validation.py index a27a899987..ad298b39e8 100644 --- a/bin/tests/system/dnssec/tests_validation.py +++ b/bin/tests/system/dnssec/tests_validation.py @@ -14,11 +14,12 @@ import re import shutil import time -from dns import edns, flags, name, rdataclass, rdatatype +from dns import flags, name, rdataclass, rdatatype import pytest import isctest +from isctest.compat import EDECode import isctest.mark from isctest.util import param @@ -1131,8 +1132,7 @@ def test_expired_signatures(ns4): res = isctest.query.tcp(msg, "10.53.0.4") isctest.check.servfail(res) isctest.check.noadflag(res) - if hasattr(res, "extended_errors"): - assert res.extended_errors()[0].code == edns.EDECode.SIGNATURE_EXPIRED + isctest.check.ede(res, EDECode.SIGNATURE_EXPIRED) assert grep_q("expired.example/.*: RRSIG has expired", "ns4/named.run") # check future signatures do not validate @@ -1140,8 +1140,7 @@ def test_expired_signatures(ns4): res = isctest.query.tcp(msg, "10.53.0.4") isctest.check.servfail(res) isctest.check.noadflag(res) - if hasattr(res, "extended_errors"): - assert res.extended_errors()[0].code == edns.EDECode.SIGNATURE_NOT_YET_VALID + isctest.check.ede(res, EDECode.SIGNATURE_NOT_YET_VALID) assert grep_q( "future.example/.*: RRSIG validity period has not begun", "ns4/named.run" ) @@ -1301,10 +1300,7 @@ def test_unknown_algorithms(): res = isctest.query.tcp(msg, "10.53.0.4") isctest.check.noerror(res) isctest.check.noadflag(res) - if hasattr(res, "extended_errors"): - assert ( - res.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM - ) + isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM) # check that DNSKEY with an unsupported reserve key validates msg = isctest.query.create("dnskey-unsupported-2.example", "DNSKEY") @@ -1315,18 +1311,14 @@ def test_unknown_algorithms(): # check EDE code 2 for unsupported DS digest algorithm msg = isctest.query.create("a.ds-unsupported.example", "A") res = isctest.query.tcp(msg, "10.53.0.4") - if hasattr(res, "extended_errors"): - assert res.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DS_DIGEST_TYPE + isctest.check.ede(res, EDECode.UNSUPPORTED_DS_DIGEST_TYPE) # check EDE code 1 for bad algorithm mnemonic msg = isctest.query.create("badalg.secure.example", "A") res = isctest.query.tcp(msg, "10.53.0.4") isctest.check.noerror(res) isctest.check.noadflag(res) - if hasattr(res, "extended_errors"): - assert ( - res.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM - ) + isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM) # check that zone contents are still secure despite disable-algorithms # on query name (name below zone name). @@ -1342,10 +1334,7 @@ def test_unknown_algorithms(): isctest.check.rr_count_eq(res.answer, 2) isctest.check.noerror(res) isctest.check.noadflag(res) - if hasattr(res, "extended_errors"): - assert ( - res.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM - ) + isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM) # check that DS records are still treated as secure at the # disable-algorithm name @@ -1360,10 +1349,8 @@ def test_unknown_algorithms(): msg = isctest.query.create("a.digest-alg-unsupported.example", "A") res = isctest.query.tcp(msg, "10.53.0.4") isctest.check.noadflag(res) - if hasattr(res, "extended_errors"): - codes = {ede.code for ede in res.extended_errors()} - assert edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM in codes - assert edns.EDECode.UNSUPPORTED_DS_DIGEST_TYPE in codes + isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM) + isctest.check.ede(res, EDECode.UNSUPPORTED_DS_DIGEST_TYPE) # check that unknown DNSKEY algorithm + unknown NSEC3 hash algorithm # validates as insecure diff --git a/bin/tests/system/dnssec/tests_validation_many_anchors.py b/bin/tests/system/dnssec/tests_validation_many_anchors.py index 90b071ec58..a9865db2dc 100644 --- a/bin/tests/system/dnssec/tests_validation_many_anchors.py +++ b/bin/tests/system/dnssec/tests_validation_many_anchors.py @@ -9,11 +9,11 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from dns import edns import pytest import isctest +from isctest.compat import EDECode from isctest.util import param # isctest.asyncserver requires dnspython >= 2.0.0 @@ -60,8 +60,7 @@ def test_trust_anchors(): isctest.check.noerror(res1) isctest.check.noerror(res2) isctest.check.adflag(res2) - if hasattr(res2, "extended_errors"): - assert not res2.extended_errors() + isctest.check.noede(res2) msg = isctest.query.create("a.secure.managed", "A") res1 = isctest.query.tcp(msg, "10.53.0.3") @@ -69,18 +68,14 @@ def test_trust_anchors(): isctest.check.noerror(res1) isctest.check.noerror(res2) isctest.check.adflag(res2) - if hasattr(res2, "extended_errors"): - assert not res2.extended_errors() + isctest.check.noede(res2) # check that an unsupported signing algorithm yields insecure msg = isctest.query.create("a.unsupported.trusted", "A") res1 = isctest.query.tcp(msg, "10.53.0.3") res2 = isctest.query.tcp(msg, "10.53.0.5") isctest.check.noerror(res1) - if hasattr(res2, "extended_errors"): - assert ( - res2.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM - ) + isctest.check.ede(res2, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM) isctest.check.noerror(res2) isctest.check.noadflag(res2) @@ -88,10 +83,7 @@ def test_trust_anchors(): res1 = isctest.query.tcp(msg, "10.53.0.3") res2 = isctest.query.tcp(msg, "10.53.0.5") isctest.check.noerror(res1) - if hasattr(res2, "extended_errors"): - assert ( - res2.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM - ) + isctest.check.ede(res2, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM) isctest.check.noerror(res2) isctest.check.noadflag(res2) diff --git a/bin/tests/system/ede24/common.py b/bin/tests/system/ede24/common.py index 89b37b2993..069ddd2ef5 100644 --- a/bin/tests/system/ede24/common.py +++ b/bin/tests/system/ede24/common.py @@ -10,6 +10,7 @@ # information regarding copyright ownership. import isctest +from isctest.compat import EDECode def check_soa_noerror(): @@ -22,13 +23,7 @@ def check_soa_servfail_ede24(edemsg): msg = isctest.query.create("foo.fr", "SOA") res = isctest.query.udp(msg, "10.53.0.2") isctest.check.servfail(res) - - # Few CI machines uses old version of dnspython which doesn't supports - # EDNS, so we effectively bypass the check for those one. (It's fine, a - # bunch of other CI machines _does_ have recent version of dnspython). - if hasattr(res, "extended_errors"): - assert len(res.extended_errors()) == 1 - assert res.extended_errors()[0].to_text() == f"EDE 24 (Invalid Data): {edemsg}" + isctest.check.ede(res, EDECode.INVALID_DATA, edemsg) def check_ns2_ready(ns2): diff --git a/bin/tests/system/isctest/check.py b/bin/tests/system/isctest/check.py index b705bfee21..7ca7779a3d 100644 --- a/bin/tests/system/isctest/check.py +++ b/bin/tests/system/isctest/check.py @@ -12,15 +12,16 @@ import difflib import shutil import os -from typing import Optional +from typing import cast, List, Optional +import dns.edns import dns.flags import dns.message import dns.rcode import dns.zone import isctest.log -from isctest.compat import dns_rcode +from isctest.compat import dns_rcode, EDECode, EDEOption def rcode(message: dns.message.Message, expected_rcode) -> None: @@ -67,6 +68,54 @@ def noraflag(message: dns.message.Message) -> None: assert (message.flags & dns.flags.RA) == 0, str(message) +def _extract_ede_options( + message: dns.message.Message, +) -> List[EDEOption]: + """Extract EDE options from the DNS message.""" + return cast( + List[EDEOption], + [ + option + for option in message.options + if option.otype == dns.edns.OptionType.EDE + ], + ) + + +def noede(message: dns.message.Message) -> None: + """Check that message contains no EDE option.""" + if not hasattr(dns.edns, "EDECode"): + # dnspython<2.2.0 doesn't support EDE, skip check + return + + ede_options = _extract_ede_options(message) + assert not ede_options, f"unexpected EDE options {ede_options} in {message}" + + +def ede( + message: dns.message.Message, code: EDECode, text: Optional[str] = None +) -> None: + """Check if message contains expected EDE code (and its text).""" + if not hasattr(dns.edns, "EDECode"): + # dnspython<2.2.0 doesn't support EDE, skip check + return + + msg_opts = _extract_ede_options(message) + matching_opts = [opt for opt in msg_opts if opt.code == code] + + assert matching_opts, f"missing EDE code {code} in {message}" + + if text is None: + return + + # check at least one matching EDE option has the required text + for opt in matching_opts: + if opt.text == text: + return + opt_str = ", ".join([opt.to_text() for opt in matching_opts]) + assert False, f'EDE text "{text}" not found in [{opt_str}]' + + def section_equal(first_section: list, second_section: list) -> None: for rrset in first_section: assert ( diff --git a/bin/tests/system/isctest/compat.py b/bin/tests/system/isctest/compat.py index 5580f1f4c5..3dc5810745 100644 --- a/bin/tests/system/isctest/compat.py +++ b/bin/tests/system/isctest/compat.py @@ -9,8 +9,9 @@ # See the COPYRIGHT file distributed with this work for additional # information regarding copyright ownership. -from typing import Any +from typing import Any, TYPE_CHECKING +import dns.edns import dns.rcode # compatiblity with dnspython<2.0.0 @@ -22,3 +23,34 @@ except AttributeError: # In dnspython<2.0.0, selected rcodes are available as integers directly # from dns.rcode dns_rcode = dns.rcode + + +if TYPE_CHECKING: + EDECode = dns.edns.EDECode + EDEOption = dns.edns.EDEOption +else: + try: # compatiblity with dnspython<2.2.0 + EDECode = dns.edns.EDECode + except AttributeError: + # In dnspython<2.2.0, the dns.edns.EDECode doesn't exist. + # + # The primary use-case is for us to use existing EDECode objects from the + # class, e.g. EDECode.FILTERED. To mimick this behavior, use a string + # factory that just turns the attribute name into a string. + # + # The used compatibility hack doesn't really matter (as long as EDECode.xxx + # doesn't raise exception), as with dnspython versions prior to 2.2.0, any + # EDE checking will be skipped anyway. + class _CompatEDECode: + def __getattr__(self, name: str) -> str: + return name + + EDECode = _CompatEDECode() + try: + EDEOption = dns.edns.EDEOption + except AttributeError: + # In dnspython<2.2.0, the dns.edns.EDEOption doesn't exist, so we stub it to be + # able to use it in type annotations. + class EDEOption: + def __new__(cls, *args, **kwargs): + raise RuntimeError("Using EDEOption requires dnspython>=2.2.0")