diff --git a/tests/atf_python/sys/net/tools.py b/tests/atf_python/sys/net/tools.py index 567d9d4b21a..44bd74d8578 100644 --- a/tests/atf_python/sys/net/tools.py +++ b/tests/atf_python/sys/net/tools.py @@ -1,6 +1,7 @@ #!/usr/local/bin/python3 import json import os +import subprocess class ToolsHelper(object): @@ -13,6 +14,26 @@ class ToolsHelper(object): print("run: '{}'".format(cmd)) return os.popen(cmd).read() + @classmethod + def pf_rules(cls, rules, verbose=True): + pf_conf = "" + for r in rules: + pf_conf = pf_conf + r + "\n" + + if verbose: + print("Set rules:") + print(pf_conf) + + ps = subprocess.Popen("/sbin/pfctl -g -f -", shell=True, + stdin=subprocess.PIPE) + ps.communicate(bytes(pf_conf, 'utf-8')) + ret = ps.wait() + if ret != 0: + raise Exception("Failed to set pf rules %d" % ret) + + if verbose: + cls.print_output("/sbin/pfctl -sr") + @classmethod def print_output(cls, cmd: str, verbose=True): if verbose: diff --git a/tests/sys/netpfil/pf/Makefile b/tests/sys/netpfil/pf/Makefile index 19c87486d8b..5c59584d2ec 100644 --- a/tests/sys/netpfil/pf/Makefile +++ b/tests/sys/netpfil/pf/Makefile @@ -40,6 +40,8 @@ ATF_TESTS_SH+= altq \ table \ tos +ATF_TESTS_PYTEST+= frag6.py + # Tests reuse jail names and so cannot run in parallel. TEST_METADATA+= is_exclusive=true diff --git a/tests/sys/netpfil/pf/frag6.py b/tests/sys/netpfil/pf/frag6.py new file mode 100644 index 00000000000..28b1829d418 --- /dev/null +++ b/tests/sys/netpfil/pf/frag6.py @@ -0,0 +1,60 @@ +import pytest +import logging +import threading +import time +logging.getLogger("scapy").setLevel(logging.CRITICAL) +from atf_python.sys.net.tools import ToolsHelper +from atf_python.sys.net.vnet import VnetTestTemplate + +class DelayedSend(threading.Thread): + def __init__(self, packet): + threading.Thread.__init__(self) + self._packet = packet + + self.start() + + def run(self): + import scapy.all as sp + time.sleep(1) + sp.send(self._packet) + +class TestFrag6(VnetTestTemplate): + REQUIRED_MODULES = ["pf"] + TOPOLOGY = { + "vnet1": {"ifaces": ["if1"]}, + "vnet2": {"ifaces": ["if1"]}, + "if1": {"prefixes6": [("2001:db8::1/64", "2001:db8::2/64")]}, + } + + def vnet2_handler(self, vnet): + ToolsHelper.print_output("/sbin/pfctl -e") + ToolsHelper.pf_rules([ + "scrub fragment reassemble", + "pass", + "block in inet6 proto icmp6 icmp6-type echoreq", + ]) + + def check_ping_reply(self, packet): + print(packet) + return False + + @pytest.mark.require_user("root") + def test_dup_frag_hdr(self): + "Test packets with duplicate fragment headers" + srv_vnet = self.vnet_map["vnet2"] + + # Import in the correct vnet, so at to not confuse Scapy + import scapy.all as sp + + packet = sp.IPv6(src="2001:db8::1", dst="2001:db8::2") \ + / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \ + / sp.IPv6ExtHdrFragment(offset = 0, m = 0) \ + / sp.ICMPv6EchoRequest(data=sp.raw(bytes.fromhex('f00f') * 128)) + + # Delay the send so the sniffer is running when we transmit. + s = DelayedSend(packet) + + packets = sp.sniff(iface=self.vnet.iface_alias_map["if1"].name, + timeout=3) + for p in packets: + assert not p.getlayer(sp.ICMPv6EchoReply)