mirror of
https://github.com/isc-projects/bind9.git
synced 2026-05-25 02:47:54 -04:00
new: test: Add isctest.check.ede() helper for pytest
Add a utility function to check for EDE codes present in the DNS message. The primary benefit of this helper function is that it handles the compatibility issues with different dnspython versions and the actual test code doesn't have to deal with that any more. Merge branch 'nicki/isctest-check-ede-helper' into 'main' See merge request isc-projects/bind9!11182
This commit is contained in:
commit
fbc9262c39
6 changed files with 102 additions and 47 deletions
|
|
@ -301,7 +301,7 @@ stages:
|
|||
.rule_mr_system_tests: &rule_mr_system_tests
|
||||
- if: '$CI_MERGE_REQUEST_DIFF_BASE_SHA != null'
|
||||
changes:
|
||||
- 'bin/tests/system/**'
|
||||
- 'bin/tests/system/**/*'
|
||||
|
||||
.rule_mr_manual: &rule_mr_manual
|
||||
- if: '$CI_MERGE_REQUEST_DIFF_BASE_SHA != null'
|
||||
|
|
|
|||
|
|
@ -14,11 +14,12 @@ import re
|
|||
import shutil
|
||||
import time
|
||||
|
||||
from dns import edns, flags, name, rdataclass, rdatatype
|
||||
from dns import flags, name, rdataclass, rdatatype
|
||||
|
||||
import pytest
|
||||
|
||||
import isctest
|
||||
from isctest.compat import EDECode
|
||||
import isctest.mark
|
||||
from isctest.util import param
|
||||
|
||||
|
|
@ -1131,8 +1132,7 @@ def test_expired_signatures(ns4):
|
|||
res = isctest.query.tcp(msg, "10.53.0.4")
|
||||
isctest.check.servfail(res)
|
||||
isctest.check.noadflag(res)
|
||||
if hasattr(res, "extended_errors"):
|
||||
assert res.extended_errors()[0].code == edns.EDECode.SIGNATURE_EXPIRED
|
||||
isctest.check.ede(res, EDECode.SIGNATURE_EXPIRED)
|
||||
assert grep_q("expired.example/.*: RRSIG has expired", "ns4/named.run")
|
||||
|
||||
# check future signatures do not validate
|
||||
|
|
@ -1140,8 +1140,7 @@ def test_expired_signatures(ns4):
|
|||
res = isctest.query.tcp(msg, "10.53.0.4")
|
||||
isctest.check.servfail(res)
|
||||
isctest.check.noadflag(res)
|
||||
if hasattr(res, "extended_errors"):
|
||||
assert res.extended_errors()[0].code == edns.EDECode.SIGNATURE_NOT_YET_VALID
|
||||
isctest.check.ede(res, EDECode.SIGNATURE_NOT_YET_VALID)
|
||||
assert grep_q(
|
||||
"future.example/.*: RRSIG validity period has not begun", "ns4/named.run"
|
||||
)
|
||||
|
|
@ -1301,10 +1300,7 @@ def test_unknown_algorithms():
|
|||
res = isctest.query.tcp(msg, "10.53.0.4")
|
||||
isctest.check.noerror(res)
|
||||
isctest.check.noadflag(res)
|
||||
if hasattr(res, "extended_errors"):
|
||||
assert (
|
||||
res.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM
|
||||
)
|
||||
isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM)
|
||||
|
||||
# check that DNSKEY with an unsupported reserve key validates
|
||||
msg = isctest.query.create("dnskey-unsupported-2.example", "DNSKEY")
|
||||
|
|
@ -1315,18 +1311,14 @@ def test_unknown_algorithms():
|
|||
# check EDE code 2 for unsupported DS digest algorithm
|
||||
msg = isctest.query.create("a.ds-unsupported.example", "A")
|
||||
res = isctest.query.tcp(msg, "10.53.0.4")
|
||||
if hasattr(res, "extended_errors"):
|
||||
assert res.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DS_DIGEST_TYPE
|
||||
isctest.check.ede(res, EDECode.UNSUPPORTED_DS_DIGEST_TYPE)
|
||||
|
||||
# check EDE code 1 for bad algorithm mnemonic
|
||||
msg = isctest.query.create("badalg.secure.example", "A")
|
||||
res = isctest.query.tcp(msg, "10.53.0.4")
|
||||
isctest.check.noerror(res)
|
||||
isctest.check.noadflag(res)
|
||||
if hasattr(res, "extended_errors"):
|
||||
assert (
|
||||
res.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM
|
||||
)
|
||||
isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM)
|
||||
|
||||
# check that zone contents are still secure despite disable-algorithms
|
||||
# on query name (name below zone name).
|
||||
|
|
@ -1342,10 +1334,7 @@ def test_unknown_algorithms():
|
|||
isctest.check.rr_count_eq(res.answer, 2)
|
||||
isctest.check.noerror(res)
|
||||
isctest.check.noadflag(res)
|
||||
if hasattr(res, "extended_errors"):
|
||||
assert (
|
||||
res.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM
|
||||
)
|
||||
isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM)
|
||||
|
||||
# check that DS records are still treated as secure at the
|
||||
# disable-algorithm name
|
||||
|
|
@ -1360,10 +1349,8 @@ def test_unknown_algorithms():
|
|||
msg = isctest.query.create("a.digest-alg-unsupported.example", "A")
|
||||
res = isctest.query.tcp(msg, "10.53.0.4")
|
||||
isctest.check.noadflag(res)
|
||||
if hasattr(res, "extended_errors"):
|
||||
codes = {ede.code for ede in res.extended_errors()}
|
||||
assert edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM in codes
|
||||
assert edns.EDECode.UNSUPPORTED_DS_DIGEST_TYPE in codes
|
||||
isctest.check.ede(res, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM)
|
||||
isctest.check.ede(res, EDECode.UNSUPPORTED_DS_DIGEST_TYPE)
|
||||
|
||||
# check that unknown DNSKEY algorithm + unknown NSEC3 hash algorithm
|
||||
# validates as insecure
|
||||
|
|
|
|||
|
|
@ -9,11 +9,11 @@
|
|||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
from dns import edns
|
||||
|
||||
import pytest
|
||||
|
||||
import isctest
|
||||
from isctest.compat import EDECode
|
||||
from isctest.util import param
|
||||
|
||||
# isctest.asyncserver requires dnspython >= 2.0.0
|
||||
|
|
@ -60,8 +60,7 @@ def test_trust_anchors():
|
|||
isctest.check.noerror(res1)
|
||||
isctest.check.noerror(res2)
|
||||
isctest.check.adflag(res2)
|
||||
if hasattr(res2, "extended_errors"):
|
||||
assert not res2.extended_errors()
|
||||
isctest.check.noede(res2)
|
||||
|
||||
msg = isctest.query.create("a.secure.managed", "A")
|
||||
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
||||
|
|
@ -69,18 +68,14 @@ def test_trust_anchors():
|
|||
isctest.check.noerror(res1)
|
||||
isctest.check.noerror(res2)
|
||||
isctest.check.adflag(res2)
|
||||
if hasattr(res2, "extended_errors"):
|
||||
assert not res2.extended_errors()
|
||||
isctest.check.noede(res2)
|
||||
|
||||
# check that an unsupported signing algorithm yields insecure
|
||||
msg = isctest.query.create("a.unsupported.trusted", "A")
|
||||
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
||||
res2 = isctest.query.tcp(msg, "10.53.0.5")
|
||||
isctest.check.noerror(res1)
|
||||
if hasattr(res2, "extended_errors"):
|
||||
assert (
|
||||
res2.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM
|
||||
)
|
||||
isctest.check.ede(res2, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM)
|
||||
isctest.check.noerror(res2)
|
||||
isctest.check.noadflag(res2)
|
||||
|
||||
|
|
@ -88,10 +83,7 @@ def test_trust_anchors():
|
|||
res1 = isctest.query.tcp(msg, "10.53.0.3")
|
||||
res2 = isctest.query.tcp(msg, "10.53.0.5")
|
||||
isctest.check.noerror(res1)
|
||||
if hasattr(res2, "extended_errors"):
|
||||
assert (
|
||||
res2.extended_errors()[0].code == edns.EDECode.UNSUPPORTED_DNSKEY_ALGORITHM
|
||||
)
|
||||
isctest.check.ede(res2, EDECode.UNSUPPORTED_DNSKEY_ALGORITHM)
|
||||
isctest.check.noerror(res2)
|
||||
isctest.check.noadflag(res2)
|
||||
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@
|
|||
# information regarding copyright ownership.
|
||||
|
||||
import isctest
|
||||
from isctest.compat import EDECode
|
||||
|
||||
|
||||
def check_soa_noerror():
|
||||
|
|
@ -22,13 +23,7 @@ def check_soa_servfail_ede24(edemsg):
|
|||
msg = isctest.query.create("foo.fr", "SOA")
|
||||
res = isctest.query.udp(msg, "10.53.0.2")
|
||||
isctest.check.servfail(res)
|
||||
|
||||
# Few CI machines uses old version of dnspython which doesn't supports
|
||||
# EDNS, so we effectively bypass the check for those one. (It's fine, a
|
||||
# bunch of other CI machines _does_ have recent version of dnspython).
|
||||
if hasattr(res, "extended_errors"):
|
||||
assert len(res.extended_errors()) == 1
|
||||
assert res.extended_errors()[0].to_text() == f"EDE 24 (Invalid Data): {edemsg}"
|
||||
isctest.check.ede(res, EDECode.INVALID_DATA, edemsg)
|
||||
|
||||
|
||||
def check_ns2_ready(ns2):
|
||||
|
|
|
|||
|
|
@ -12,15 +12,16 @@
|
|||
import difflib
|
||||
import shutil
|
||||
import os
|
||||
from typing import Optional
|
||||
from typing import cast, List, Optional
|
||||
|
||||
import dns.edns
|
||||
import dns.flags
|
||||
import dns.message
|
||||
import dns.rcode
|
||||
import dns.zone
|
||||
|
||||
import isctest.log
|
||||
from isctest.compat import dns_rcode
|
||||
from isctest.compat import dns_rcode, EDECode, EDEOption
|
||||
|
||||
|
||||
def rcode(message: dns.message.Message, expected_rcode) -> None:
|
||||
|
|
@ -67,6 +68,54 @@ def noraflag(message: dns.message.Message) -> None:
|
|||
assert (message.flags & dns.flags.RA) == 0, str(message)
|
||||
|
||||
|
||||
def _extract_ede_options(
|
||||
message: dns.message.Message,
|
||||
) -> List[EDEOption]:
|
||||
"""Extract EDE options from the DNS message."""
|
||||
return cast(
|
||||
List[EDEOption],
|
||||
[
|
||||
option
|
||||
for option in message.options
|
||||
if option.otype == dns.edns.OptionType.EDE
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
def noede(message: dns.message.Message) -> None:
|
||||
"""Check that message contains no EDE option."""
|
||||
if not hasattr(dns.edns, "EDECode"):
|
||||
# dnspython<2.2.0 doesn't support EDE, skip check
|
||||
return
|
||||
|
||||
ede_options = _extract_ede_options(message)
|
||||
assert not ede_options, f"unexpected EDE options {ede_options} in {message}"
|
||||
|
||||
|
||||
def ede(
|
||||
message: dns.message.Message, code: EDECode, text: Optional[str] = None
|
||||
) -> None:
|
||||
"""Check if message contains expected EDE code (and its text)."""
|
||||
if not hasattr(dns.edns, "EDECode"):
|
||||
# dnspython<2.2.0 doesn't support EDE, skip check
|
||||
return
|
||||
|
||||
msg_opts = _extract_ede_options(message)
|
||||
matching_opts = [opt for opt in msg_opts if opt.code == code]
|
||||
|
||||
assert matching_opts, f"missing EDE code {code} in {message}"
|
||||
|
||||
if text is None:
|
||||
return
|
||||
|
||||
# check at least one matching EDE option has the required text
|
||||
for opt in matching_opts:
|
||||
if opt.text == text:
|
||||
return
|
||||
opt_str = ", ".join([opt.to_text() for opt in matching_opts])
|
||||
assert False, f'EDE text "{text}" not found in [{opt_str}]'
|
||||
|
||||
|
||||
def section_equal(first_section: list, second_section: list) -> None:
|
||||
for rrset in first_section:
|
||||
assert (
|
||||
|
|
|
|||
|
|
@ -9,8 +9,9 @@
|
|||
# See the COPYRIGHT file distributed with this work for additional
|
||||
# information regarding copyright ownership.
|
||||
|
||||
from typing import Any
|
||||
from typing import Any, TYPE_CHECKING
|
||||
|
||||
import dns.edns
|
||||
import dns.rcode
|
||||
|
||||
# compatiblity with dnspython<2.0.0
|
||||
|
|
@ -22,3 +23,34 @@ except AttributeError:
|
|||
# In dnspython<2.0.0, selected rcodes are available as integers directly
|
||||
# from dns.rcode
|
||||
dns_rcode = dns.rcode
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
EDECode = dns.edns.EDECode
|
||||
EDEOption = dns.edns.EDEOption
|
||||
else:
|
||||
try: # compatiblity with dnspython<2.2.0
|
||||
EDECode = dns.edns.EDECode
|
||||
except AttributeError:
|
||||
# In dnspython<2.2.0, the dns.edns.EDECode doesn't exist.
|
||||
#
|
||||
# The primary use-case is for us to use existing EDECode objects from the
|
||||
# class, e.g. EDECode.FILTERED. To mimick this behavior, use a string
|
||||
# factory that just turns the attribute name into a string.
|
||||
#
|
||||
# The used compatibility hack doesn't really matter (as long as EDECode.xxx
|
||||
# doesn't raise exception), as with dnspython versions prior to 2.2.0, any
|
||||
# EDE checking will be skipped anyway.
|
||||
class _CompatEDECode:
|
||||
def __getattr__(self, name: str) -> str:
|
||||
return name
|
||||
|
||||
EDECode = _CompatEDECode()
|
||||
try:
|
||||
EDEOption = dns.edns.EDEOption
|
||||
except AttributeError:
|
||||
# In dnspython<2.2.0, the dns.edns.EDEOption doesn't exist, so we stub it to be
|
||||
# able to use it in type annotations.
|
||||
class EDEOption:
|
||||
def __new__(cls, *args, **kwargs):
|
||||
raise RuntimeError("Using EDEOption requires dnspython>=2.2.0")
|
||||
|
|
|
|||
Loading…
Reference in a new issue