diff --git a/bin/tests/system/filter-aaaa/tests_filter_aaaa.py b/bin/tests/system/filter-aaaa/tests_filter_aaaa.py index 920a8876f3..b11addc263 100644 --- a/bin/tests/system/filter-aaaa/tests_filter_aaaa.py +++ b/bin/tests/system/filter-aaaa/tests_filter_aaaa.py @@ -37,13 +37,14 @@ pytestmark = pytest.mark.extra_artifacts( # helper functions def reset_server(server, family, ftype, servers, templates): - templates.render(f"{server}/named.conf", - {"family": family, "filtertype": ftype}) + templates.render(f"{server}/named.conf", {"family": family, "filtertype": ftype}) servers[server].reconfigure(log=False) filter_family = "v4" filter_type = "aaaa" + + def reset_servers(family, ftype, servers, templates): reset_server("ns1", family, ftype, servers, templates) reset_server("ns2", family, ftype, servers, templates) @@ -52,6 +53,63 @@ def reset_servers(family, ftype, servers, templates): filter_family = family +def check_aaaa_only(dest, source, qname, expected, adflag): + msg = isctest.query.create(qname, "aaaa") + res = isctest.query.tcp(msg, dest, source=source) + isctest.check.noerror(res) + if adflag: + isctest.check.adflag(res) + else: + isctest.check.noadflag(res) + assert not [a for a in res.answer if a.rdtype == rdatatype.A] + aaaa = res.answer[0] + assert aaaa.rdtype == rdatatype.AAAA + assert expected in str(aaaa[0]) + + +def check_any(dest, source, qname, expected4, expected6, do): + if do: + msg = isctest.query.create(qname, "any") # sends DO=1 + else: + msg = message.make_query(qname, "any") # sends DO=0 + res = isctest.query.tcp(msg, dest, source=source) + isctest.check.noerror(res) + records = sum([str(a).splitlines() for a in res.answer], []) + if expected4: + assert any(expected4 in r for r in records), str(res) + else: + assert not any(a.rdtype == rdatatype.A for a in res.answer), str(res) + if expected6: + assert any(expected6 in r for r in records), str(res) + else: + assert not any(a.rdtype == rdatatype.AAAA for a in res.answer), str(res) + + +def check_nodata(dest, source, qname, qtype, do, adflag): + if do: + msg = isctest.query.create(qname, qtype) # sends DO=1 + else: + msg = message.make_query(qname, qtype) # sends DO=0 + res = isctest.query.tcp(msg, dest, source=source) + isctest.check.noerror(res) + isctest.check.empty_answer(res) + if adflag: + isctest.check.adflag(res) + else: + isctest.check.noadflag(res) + + +def check_additional(dest, source, qname, qtype, expect_aaaa, adcount): + msg = isctest.query.create(qname, qtype) + res = isctest.query.tcp(msg, dest, source=source) + isctest.check.noerror(res) + isctest.check.rr_count_eq(res.additional, adcount) + if expect_aaaa: + assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] + else: + assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] + + # run the checkconf tests def test_checkconf(): for filename in glob.glob("conf/good*.conf"): @@ -61,1341 +119,161 @@ def test_checkconf(): isctest.run.cmd([os.environ["CHECKCONF"], filename]) -# These tests are against an authoritative server configured with: -## filter-aaaa-on-v4 yes; -## filter-aaaa { 10.53.0.1; }; -def test_auth_filter_aaaa_on_v4(servers, templates): - if filter_family != "v4" or filter_type != "aaaa": - reset_servers("v4", "aaaa", servers, templates) +def check_filter(addr, altaddr, break_dnssec, recursive): + if recursive: + # (when testing recursive, we need to prime the cache first with + # the MX addresses, since additional section data isn't included + # unless it's been validated.) + for name in ["mx", "ns"]: + for zone in ["signed", "unsigned"]: + for qtype in ["a", "aaaa"]: + isctest.query.tcp(isctest.query.create(f"{name}.{zone}", qtype), addr) # check that AAAA is returned when only AAAA record exists, signed - msg = isctest.query.create("aaaa-only.signed", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - aaaa, _ = res.answer - assert "2001:db8::2" in str(aaaa[0]) + check_aaaa_only(addr, addr, "aaaa-only.signed", "2001:db8::2", recursive) # check that AAAA is returned when only AAAA record exists, unsigned - msg = isctest.query.create("aaaa-only.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::5" in str(aaaa[0]) + check_aaaa_only(addr, addr, "aaaa-only.unsigned", "2001:db8::5", False) # check that NODATA/NOERROR is returned when both AAAA and A exist, # signed, DO=0 - msg = message.make_query("dual.signed", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - isctest.check.empty_answer(res) + check_nodata(addr, addr, "dual.signed", "aaaa", False, False) # check that NODATA/NOERROR is returned when both AAAA and A exist, # unsigned, DO=0 - msg = message.make_query("dual.unsigned", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - isctest.check.empty_answer(res) + check_nodata(addr, addr, "dual.unsigned", "aaaa", False, False) - # check that AAAA is returned when both AAAA and A exist, signed, DO=1 - msg = isctest.query.create("dual.signed", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - aaaa = res.answer[0] - assert "2001:db8::3" in str(aaaa[0]) + # check that AAAA is returned when both AAAA and A exist, signed, + # DO=1, unless break-dnssec is enabled + if break_dnssec: + check_nodata(addr, addr, "dual.signed", "aaaa", False, False) + else: + check_aaaa_only(addr, addr, "dual.signed", "2001:db8::3", recursive) # check that NODATA/NOERROR is returned when both AAAA and A exist, # unsigned, DO=1 - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - isctest.check.empty_answer(res) + check_nodata(addr, addr, "dual.unsigned", "aaaa", recursive, False) # check that AAAA is returned if both AAAA and A exist and the query # source doesn't match the ACL - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) + check_aaaa_only(addr, altaddr, "dual.unsigned", "2001:db8::6", False) # check that A (and not AAAA) is returned if both AAAA and A exist, # signed, qtype=ANY, DO=0 - msg = message.make_query("dual.signed", "any") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) + check_any(addr, addr, "dual.signed", "1.0.0.3", None, False) # check that both A and AAAA are returned if both AAAA and A exist, - # signed, qtype=ANY, DO=1 - msg = isctest.query.create("dual.signed", "any") - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert any("2001:db8::3" in r for r in records) + # signed, qtype=ANY, DO=1, unless break-dnssec is enabled + if break_dnssec: + check_any(addr, addr, "dual.signed", "1.0.0.3", None, True) + else: + check_any(addr, addr, "dual.signed", "1.0.0.3", "2001:db8::3", True) # check that A (and not AAAA) is returned if both AAAA and A exist, # unsigned, qtype=ANY, DO=0 - msg = message.make_query("dual.unsigned", "any") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) + check_any(addr, addr, "dual.unsigned", "1.0.0.6", None, False) # check that A (and not AAAA) is returned if both AAAA and A exist, # unsigned, qtype=ANY, DO=1 - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) + check_any(addr, addr, "dual.unsigned", "1.0.0.6", None, True) # check that both A and AAAA are returned if both AAAA and A exist, # signed, qtype=ANY, query source does not match ACL - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.2") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert any("2001:db8::6" in r for r in records) + check_any(addr, altaddr, "dual.unsigned", "1.0.0.6", "2001:db8::6", True) # check that AAAA is omitted from additional section, qtype=NS, unsigned - msg = isctest.query.create("unsigned", "ns") - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 1) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] + check_additional(addr, addr, "unsigned", "ns", False, 1) # check that AAAA is omitted from additional section, qtype=MX, unsigned - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is included in additional section, qtype=MX, signed - msg = isctest.query.create("signed", "mx") - res = isctest.query.tcp(msg, "10.53.0.1", source="10.53.0.1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -@isctest.mark.with_ipv6 -def test_auth_filter_aaaa_on_v4_via_v6(servers, templates): - if filter_family != "v4" or filter_type != "aaaa": - reset_servers("v4", "aaaa", servers, templates) - - # check that AAAA is returned when both AAAA and A record exists, - # unsigned, over IPv6 - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that AAAA is included in additional section, qtype=MX, - # unsigned, over IPv6 - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -# These tests are against an authoritative server configured with: -## filter-aaaa-on-v4 break-dnssec; -## filter-aaaa { 10.53.0.4; }; -def test_auth_break_dnssec_filter_aaaa_on_v4(servers, templates): - if filter_family != "v4" or filter_type != "aaaa": - reset_servers("v4", "aaaa", servers, templates) - - # check that AAAA is returned when only AAAA record exists, signed, - # with break-dnssec - msg = isctest.query.create("aaaa-only.signed", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - aaaa, _ = res.answer - assert "2001:db8::2" in str(aaaa[0]) - - # check that AAAA is returned when only AAAA record exists, unsigned, - # with break-dnssec - msg = isctest.query.create("aaaa-only.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::5" in str(aaaa[0]) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # signed, DO=0, with break-dnssec - msg = message.make_query("dual.signed", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=0, with break-dnssec - msg = message.make_query("dual.unsigned", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned when both AAAA and A exist, signed, DO=1, - # with break-dnssec - msg = isctest.query.create("dual.signed", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=1, with break-dnssec - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned if both AAAA and A exist and the query - # source doesn't match the ACL, with break-dnssec - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # signed, qtype=ANY, DO=0, with break-dnssec - msg = message.make_query("dual.signed", "any") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # signed, qtype=ANY, DO=1, with break-dnssec - msg = isctest.query.create("dual.signed", "any") - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=0, with break-dnssec - msg = message.make_query("dual.unsigned", "any") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=1, with break-dnssec - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that both A and AAAA are returned if both AAAA and A exist, - # signed, qtype=ANY, query source does not match ACL, with break-dnssec - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.2") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert any("2001:db8::6" in r for r in records) - - # check that AAAA is omitted from additional section, qtype=NS, - # unsigned, with break-dnssec - msg = isctest.query.create("unsigned", "ns") - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 1) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is omitted from additional section, qtype=MX, - # unsigned, with break-dnssec - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is omitted from additional section, qtype=MX, - # signed, with break-dnssec - msg = isctest.query.create("signed", "mx") - res = isctest.query.tcp(msg, "10.53.0.4", source="10.53.0.4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -@isctest.mark.with_ipv6 -def test_auth_break_dnssec_filter_aaaa_on_v4_via_v6(servers, templates): - if filter_family != "v4" or filter_type != "aaaa": - reset_servers("v4", "aaaa", servers, templates) - - # check that AAAA is returned when both AAAA and A record exists, - # unsigned, over IPv6, with break-dnssec - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that AAAA is included in additional section, qtype=MX, - # unsigned, over IPv6, with break-dnssec - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -# These tests are against a recursive server configured with: -## filter-aaaa-on-v4 yes; -## filter-aaaa { 10.53.0.2; }; -def test_recursive_filter_aaaa_on_v4(servers, templates): - if filter_family != "v4" or filter_type != "aaaa": - reset_servers("v4", "aaaa", servers, templates) - - # check that AAAA is returned when only AAAA record exists, signed, - # recursive - msg = isctest.query.create("aaaa-only.signed", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.adflag(res) - isctest.check.rr_count_eq(res.authority, 2) - aaaa, _ = res.answer - assert "2001:db8::2" in str(aaaa[0]) - - # check that AAAA is returned when only AAAA record exists, unsigned, - # recursive - msg = isctest.query.create("aaaa-only.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - aaaa = res.answer[0] - assert "2001:db8::5" in str(aaaa[0]) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # signed, DO=0, recursive - msg = message.make_query("dual.signed", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=0, recursive - msg = message.make_query("dual.unsigned", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned when both AAAA and A exist, signed, DO=1, - # recursive - msg = isctest.query.create("dual.signed", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.adflag(res) - isctest.check.rr_count_eq(res.authority, 2) - aaaa = res.answer[0] - assert "2001:db8::3" in str(aaaa[0]) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=1, recursive - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noadflag(res) - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned if both AAAA and A exist and the query - # source doesn't match the ACL - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.1") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # signed, qtype=ANY, DO=0, recursive - msg = message.make_query("dual.signed", "any") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.noadflag(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) - - # check that both A and AAAA are returned if both AAAA and A exist, - # signed, qtype=ANY, DO=1, recursive - msg = isctest.query.create("dual.signed", "any") - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.adflag(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert any("2001:db8::3" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=0, recursive - msg = message.make_query("dual.unsigned", "any") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=1, recursive - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that both A and AAAA are returned if both AAAA and A exist, - # signed, qtype=ANY, query source does not match ACL, recursive - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert any("2001:db8::6" in r for r in records) - - # check that AAAA is omitted from additional section, qtype=NS, - # unsigned, recursive - msg = isctest.query.create("unsigned", "ns") - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 1) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is omitted from additional section, qtype=MX, - # unsigned, recursive - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # (we need to prime the cache first with the MX addresses, since - # additional section data isn't included unless it's already validated.) - msg = isctest.query.create("mx.signed", "a") - isctest.query.tcp(msg, "10.53.0.2") - msg = isctest.query.create("mx.signed", "aaaa") - isctest.query.tcp(msg, "10.53.0.2") + check_additional(addr, addr, "unsigned", "mx", False, 2) # check that AAAA is included in additional section, qtype=MX, signed, - # recursive - msg = isctest.query.create("signed", "mx") - res = isctest.query.tcp(msg, "10.53.0.2", source="10.53.0.2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] + # unless break-dnssec is enabled + if break_dnssec: + check_additional(addr, addr, "signed", "mx", False, 4) + else: + check_additional(addr, addr, "signed", "mx", True, 8) + + +def check_filter_other_family(addr): + # check that AAAA is returned when both AAAA and A record exists, + # unsigned, over IPv6 + check_aaaa_only(addr, addr, "dual.unsigned", "2001:db8::6", False) + + # check that AAAA is included in additional section, qtype=MX, + # unsigned, over IPv6 + check_additional(addr, addr, "unsigned", "mx", True, 4) + + +def test_filter_aaaa_on_v4(servers, templates): + if filter_family != "v4" or filter_type != "aaaa": + reset_servers("v4", "aaaa", servers, templates) + + # ns1: auth, configured with: + ## filter-aaaa-on-v4 yes; + ## filter-aaaa { 10.53.0.1; }; + check_filter("10.53.0.1", "10.53.0.2", False, False) + + # ns4: auth, configured with: + ## filter-aaaa-on-v4 break-dnssec; + ## filter-aaaa { 10.53.0.4; }; + check_filter("10.53.0.4", "10.53.0.2", True, False) + + # ns2: recursive, configured with: + ## filter-aaaa-on-v4 yes; + ## filter-aaaa { 10.53.0.2; }; + check_filter("10.53.0.2", "10.53.0.1", False, True) + + # ns3: recursive, configured with: + ## filter-aaaa-on-v4 break-dnssec; + ## filter-aaaa { 10.53.0.3; }; + check_filter("10.53.0.3", "10.53.0.1", True, True) @isctest.mark.with_ipv6 -def test_recursive_filter_aaaa_on_v4_via_v6(servers, templates): +def test_filter_aaaa_on_v4_via_v6(servers, templates): if filter_family != "v4" or filter_type != "aaaa": reset_servers("v4", "aaaa", servers, templates) - # check that AAAA is returned when both AAAA and A record exists, - # unsigned, over IPv6, recursive - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that AAAA is included in additional section, qtype=MX, - # unsigned, over IPv6, recursive - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -# These tests are against a recursive server configured with: -## filter-aaaa-on-v4 break-dnssec; -## filter-aaaa { 10.53.0.3; }; -def test_recursive_break_dnssec_filter_aaaa_on_v4(servers, templates): - if filter_family != "v4" or filter_type != "aaaa": - reset_servers("v4", "aaaa", servers, templates) - - # check that AAAA is returned when only AAAA record exists, signed, - # recursive, with break-dnssec - msg = isctest.query.create("aaaa-only.signed", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - isctest.check.adflag(res) - aaaa, _ = res.answer - assert "2001:db8::2" in str(aaaa[0]) - - # check that AAAA is returned when only AAAA record exists, unsigned, - # recursive, with break-dnssec - msg = isctest.query.create("aaaa-only.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - aaaa = res.answer[0] - assert "2001:db8::5" in str(aaaa[0]) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # signed, DO=0, recursive, with break-dnssec - msg = message.make_query("dual.signed", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=0, recursive, with break-dnssec - msg = message.make_query("dual.unsigned", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=1, recursive, with break-dnssec - msg = isctest.query.create("dual.signed", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=1, recursive, with break-dnssec - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noadflag(res) - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned if both AAAA and A exist and the query - # source doesn't match the ACL, with break-dnssec - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.1") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # signed, qtype=ANY, DO=0, recursive, with break-dnssec - msg = message.make_query("dual.signed", "any") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - isctest.check.noadflag(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # signed, qtype=ANY, DO=0, recursive, with break-dnssec - msg = isctest.query.create("dual.signed", "any") - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - isctest.check.noadflag(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=0, recursive, with break-dnssec - msg = message.make_query("dual.unsigned", "any") # sends DO=0 - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=1, recursive, with break-dnssec - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that both A and AAAA are returned if both AAAA and A exist, - # signed, qtype=ANY, query source does not match ACL, recursive, with - # break-dnssec - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert any("2001:db8::6" in r for r in records) - - # check that AAAA is omitted from additional section, qtype=NS, - # unsigned, recursive, with break-dnssec - msg = isctest.query.create("unsigned", "ns") - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 1) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is omitted from additional section, qtype=MX, - # unsigned, recursive, with break-dnssec - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is omitted from additional section, qtype=MX, signed, - # recursive, with break-dnssec - msg = isctest.query.create("signed", "mx") - res = isctest.query.tcp(msg, "10.53.0.3", source="10.53.0.3") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -@isctest.mark.with_ipv6 -def test_recursive_break_dnssec_filter_aaaa_on_v4_via_v6(servers, templates): - if filter_family != "v4" or filter_type != "aaaa": - reset_servers("v4", "aaaa", servers, templates) - - # check that AAAA is returned when both AAAA and A record exists, - # unsigned, over IPv6, recursive - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that AAAA is included in additional section, qtype=MX, - # unsigned, over IPv6, recursive - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] + check_filter_other_family("fd92:7065:b8e:ffff::1") + check_filter_other_family("fd92:7065:b8e:ffff::2") + check_filter_other_family("fd92:7065:b8e:ffff::3") + check_filter_other_family("fd92:7065:b8e:ffff::4") # These tests are against an authoritative server configured with: ## filter-aaaa-on-v6 yes; -## filter-aaaa { fd92:7065:b8e:ffff::1; }; @isctest.mark.with_ipv6 -def test_auth_filter_aaaa_on_v6(servers, templates): +def test_filter_aaaa_on_v6(servers, templates): if filter_family != "v6" or filter_type != "aaaa": reset_servers("v6", "aaaa", servers, templates) - # check that AAAA is returned when only AAAA record exists, signed - msg = isctest.query.create("aaaa-only.signed", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - aaaa, _ = res.answer - assert "2001:db8::2" in str(aaaa[0]) + # ns1: auth, configured with: + ## filter-aaaa-on-v6 yes; + ## filter-aaaa { fd92:7065:b8e:ffff::1; }; + check_filter("fd92:7065:b8e:ffff::1", "fd92:7065:b8e:ffff::2", False, False) - # check that AAAA is returned when only AAAA record exists, unsigned - msg = isctest.query.create("aaaa-only.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::5" in str(aaaa[0]) + # ns4: auth, configured with: + ## filter-aaaa-on-v6 break-dnssec; + ## filter-aaaa { fd92:7065:b8e:ffff::4; }; + check_filter("fd92:7065:b8e:ffff::4", "fd92:7065:b8e:ffff::2", True, False) - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # signed, DO=0 - msg = message.make_query("dual.signed", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.empty_answer(res) + # ns2: recursive, configured with: + ## filter-aaaa-on-v6 yes; + ## filter-aaaa { fd92:7065:b8e:ffff::2; }; + check_filter("fd92:7065:b8e:ffff::2", "fd92:7065:b8e:ffff::1", False, True) - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=0 - msg = message.make_query("dual.unsigned", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned when both AAAA and A exist, signed, DO=1 - msg = isctest.query.create("dual.signed", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - aaaa = res.answer[0] - assert "2001:db8::3" in str(aaaa[0]) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=1 - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned if both AAAA and A exist and the query - # source doesn't match the ACL - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # signed, qtype=ANY, DO=0 - msg = message.make_query("dual.signed", "any") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) - - # check that both A and AAAA are returned if both AAAA and A exist, - # signed, qtype=ANY, DO=1 - msg = isctest.query.create("dual.signed", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert any("2001:db8::3" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=0 - msg = message.make_query("dual.unsigned", "any") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=1 - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that both A and AAAA are returned if both AAAA and A exist, - # signed, qtype=ANY, query source does not match ACL - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert any("2001:db8::6" in r for r in records) - - # check that AAAA is omitted from additional section, qtype=NS, unsigned - msg = isctest.query.create("unsigned", "ns") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 1) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is omitted from additional section, qtype=MX, unsigned - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is included in additional section, qtype=MX, signed - msg = isctest.query.create("signed", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::1", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] + # ns3: recursive, configured with: + ## filter-aaaa-on-v6 break-dnssec; + ## filter-aaaa { fd92:7065:b8e:ffff::3; }; + check_filter("fd92:7065:b8e:ffff::3", "fd92:7065:b8e:ffff::1", True, True) -def test_auth_filter_aaaa_on_v6_via_v4(servers, templates): +def test_filter_aaaa_on_v6_via_v4(servers, templates): if filter_family != "v6" or filter_type != "aaaa": reset_servers("v6", "aaaa", servers, templates) - # check that AAAA is returned when both AAAA and A record exists, - # unsigned, over IPv6 - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that AAAA is included in additional section, qtype=MX, - # unsigned, over IPv6 - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "10.53.0.1") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -# These tests are against an authoritative server configured with: -## filter-aaaa-on-v6 break-dnssec; -## filter-aaaa { fd92:7065:b8e:ffff::4; }; -@isctest.mark.with_ipv6 -def test_auth_break_dnssec_filter_aaaa_on_v6(servers, templates): - if filter_family != "v6" or filter_type != "aaaa": - reset_servers("v6", "aaaa", servers, templates) - - # check that AAAA is returned when only AAAA record exists, signed, - # with break-dnssec - msg = isctest.query.create("aaaa-only.signed", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - aaaa, _ = res.answer - assert "2001:db8::2" in str(aaaa[0]) - - # check that AAAA is returned when only AAAA record exists, unsigned, - # with break-dnssec - msg = isctest.query.create("aaaa-only.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::5" in str(aaaa[0]) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # signed, DO=0, with break-dnssec - msg = message.make_query("dual.signed", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=0, with break-dnssec - msg = message.make_query("dual.unsigned", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned when both AAAA and A exist, signed, DO=1, - # with break-dnssec - msg = isctest.query.create("dual.signed", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=1, with break-dnssec - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned if both AAAA and A exist and the query - # source doesn't match the ACL, with break-dnssec - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # signed, qtype=ANY, DO=0, with break-dnssec - msg = message.make_query("dual.signed", "any") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # signed, qtype=ANY, DO=1, with break-dnssec - msg = isctest.query.create("dual.signed", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=0, with break-dnssec - msg = message.make_query("dual.unsigned", "any") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=1, with break-dnssec - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that both A and AAAA are returned if both AAAA and A exist, - # signed, qtype=ANY, query source does not match ACL, with break-dnssec - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert any("2001:db8::6" in r for r in records) - - # check that AAAA is omitted from additional section, qtype=NS, - # unsigned, with break-dnssec - msg = isctest.query.create("unsigned", "ns") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 1) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is omitted from additional section, qtype=MX, - # unsigned, with break-dnssec - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is omitted from additional section, qtype=MX, - # signed, with break-dnssec - msg = isctest.query.create("signed", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::4", - source="fd92:7065:b8e:ffff::4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -def test_auth_break_dnssec_filter_aaaa_on_v6_via_v4(servers, templates): - if filter_family != "v6" or filter_type != "aaaa": - reset_servers("v6", "aaaa", servers, templates) - - # check that AAAA is returned when both AAAA and A record exists, - # unsigned, over IPv6, with break-dnssec - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that AAAA is included in additional section, qtype=MX, - # unsigned, over IPv6, with break-dnssec - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "10.53.0.4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -# These tests are against a recursive server configured with: -## filter-aaaa-on-v6 yes; -## filter-aaaa { fd92:7065:b8e:ffff::2; }; -@isctest.mark.with_ipv6 -def test_recursive_filter_aaaa_on_v6(servers, templates): - if filter_family != "v6" or filter_type != "aaaa": - reset_servers("v6", "aaaa", servers, templates) - - # check that AAAA is returned when only AAAA record exists, signed, - # recursive - msg = isctest.query.create("aaaa-only.signed", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.adflag(res) - isctest.check.rr_count_eq(res.authority, 2) - aaaa, _ = res.answer - assert "2001:db8::2" in str(aaaa[0]) - - # check that AAAA is returned when only AAAA record exists, unsigned, - # recursive - msg = isctest.query.create("aaaa-only.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::5" in str(aaaa[0]) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # signed, DO=0, recursive - msg = message.make_query("dual.signed", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=0, recursive - msg = message.make_query("dual.unsigned", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned when both AAAA and A exist, signed, DO=1, - # recursive - msg = isctest.query.create("dual.signed", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.adflag(res) - isctest.check.rr_count_eq(res.authority, 2) - aaaa = res.answer[0] - assert "2001:db8::3" in str(aaaa[0]) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=1, recursive - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noadflag(res) - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned if both AAAA and A exist and the query - # source doesn't match the ACL - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # signed, qtype=ANY, DO=0, recursive - msg = message.make_query("dual.signed", "any") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.noadflag(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) - - # check that both A and AAAA are returned if both AAAA and A exist, - # signed, qtype=ANY, DO=1, recursive - msg = isctest.query.create("dual.signed", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.adflag(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert any("2001:db8::3" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=0, recursive - msg = message.make_query("dual.unsigned", "any") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=1, recursive - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that both A and AAAA are returned if both AAAA and A exist, - # signed, qtype=ANY, query source does not match ACL, recursive - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert any("2001:db8::6" in r for r in records) - - # check that AAAA is omitted from additional section, qtype=NS, - # unsigned, recursive - msg = isctest.query.create("unsigned", "ns") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 1) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is omitted from additional section, qtype=MX, - # unsigned, recursive - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # (we need to prime the cache first with the MX addresses, since - # additional section data isn't included unless it's already validated.) - msg = isctest.query.create("mx.signed", "a") - isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2") - msg = isctest.query.create("mx.signed", "aaaa") - isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2") - - # check that AAAA is included in additional section, qtype=MX, signed, - # recursive - msg = isctest.query.create("signed", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::2", - source="fd92:7065:b8e:ffff::2") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -def test_recursive_filter_aaaa_on_v6_via_v4(servers, templates): - if filter_family != "v6" or filter_type != "aaaa": - reset_servers("v6", "aaaa", servers, templates) - - # check that AAAA is returned when both AAAA and A record exists, - # unsigned, over IPv6, recursive - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that AAAA is included in additional section, qtype=MX, - # unsigned, over IPv6, recursive - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "10.53.0.4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -# These tests are against a recursive server configured with: -## filter-aaaa-on-v6 break-dnssec; -## filter-aaaa { fd92:7065:b8e:ffff::3; }; -@isctest.mark.with_ipv6 -def test_recursive_break_dnssec_filter_aaaa_on_v6(servers, templates): - if filter_family != "v6" or filter_type != "aaaa": - reset_servers("v6", "aaaa", servers, templates) - - # check that AAAA is returned when only AAAA record exists, signed, - # recursive, with break-dnssec - msg = isctest.query.create("aaaa-only.signed", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - isctest.check.adflag(res) - aaaa, _ = res.answer - assert "2001:db8::2" in str(aaaa[0]) - - # check that AAAA is returned when only AAAA record exists, unsigned, - # recursive, with break-dnssec - msg = isctest.query.create("aaaa-only.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - aaaa = res.answer[0] - assert "2001:db8::5" in str(aaaa[0]) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # signed, DO=0, recursive, with break-dnssec - msg = message.make_query("dual.signed", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=0, recursive, with break-dnssec - msg = message.make_query("dual.unsigned", "aaaa") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=1, recursive, with break-dnssec - msg = isctest.query.create("dual.signed", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.empty_answer(res) - - # check that NODATA/NOERROR is returned when both AAAA and A exist, - # unsigned, DO=1, recursive, with break-dnssec - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noadflag(res) - isctest.check.noerror(res) - isctest.check.empty_answer(res) - - # check that AAAA is returned if both AAAA and A exist and the query - # source doesn't match the ACL, with break-dnssec - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - isctest.check.noadflag(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # signed, qtype=ANY, DO=0, recursive, with break-dnssec - msg = message.make_query("dual.signed", "any") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - isctest.check.noadflag(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # signed, qtype=ANY, DO=0, recursive, with break-dnssec - msg = isctest.query.create("dual.signed", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - isctest.check.noadflag(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.3" in r for r in records) - assert not any("2001:db8::3" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=0, recursive, with break-dnssec - msg = message.make_query("dual.unsigned", "any") # sends DO=0 - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that A (and not AAAA) is returned if both AAAA and A exist, - # unsigned, qtype=ANY, DO=1, recursive, with break-dnssec - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert not any("2001:db8::6" in r for r in records) - - # check that both A and AAAA are returned if both AAAA and A exist, - # signed, qtype=ANY, query source does not match ACL, recursive, with - # break-dnssec - msg = isctest.query.create("dual.unsigned", "any") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::1") - isctest.check.noerror(res) - records = sum([str(a).splitlines() for a in res.answer], []) - assert any("1.0.0.6" in r for r in records) - assert any("2001:db8::6" in r for r in records) - - # check that AAAA is omitted from additional section, qtype=NS, - # unsigned, recursive, with break-dnssec - msg = isctest.query.create("unsigned", "ns") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 1) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is omitted from additional section, qtype=MX, - # unsigned, recursive, with break-dnssec - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.additional, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - # check that AAAA is omitted from additional section, qtype=MX, signed, - # recursive, with break-dnssec - msg = isctest.query.create("signed", "mx") - res = isctest.query.tcp(msg, "fd92:7065:b8e:ffff::3", - source="fd92:7065:b8e:ffff::3") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 2) - assert not [a for a in res.additional if a.rdtype == rdatatype.AAAA] - - -def test_recursive_break_dnssec_filter_aaaa_on_v6_via_v4(servers, templates): - if filter_family != "v6" or filter_type != "aaaa": - reset_servers("v6", "aaaa", servers, templates) - - # check that AAAA is returned when both AAAA and A record exists, - # unsigned, over IPv6, recursive - msg = isctest.query.create("dual.unsigned", "aaaa") - res = isctest.query.tcp(msg, "10.53.0.4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - aaaa = res.answer[0] - assert "2001:db8::6" in str(aaaa[0]) - - # check that AAAA is included in additional section, qtype=MX, - # unsigned, over IPv6, recursive - msg = isctest.query.create("unsigned", "mx") - res = isctest.query.tcp(msg, "10.53.0.4") - isctest.check.noerror(res) - isctest.check.rr_count_eq(res.authority, 1) - assert [a for a in res.additional if a.rdtype == rdatatype.AAAA] + check_filter_other_family("10.53.0.1") + check_filter_other_family("10.53.0.2") + check_filter_other_family("10.53.0.3") + check_filter_other_family("10.53.0.4")