netinet tests: ensure we send IGMPv2 messages if net.inet.igmp.default_version=2

Sponsored by:	Rubicon Communications, LLC ("Netgate")
Differential Revision:	https://reviews.freebsd.org/D50072
This commit is contained in:
Kristof Provost 2025-04-29 15:56:14 +02:00
parent d2c2d6d6b0
commit 720fabb36f

View file

@ -62,6 +62,25 @@ def check_igmpv3(args, pkt):
return True
def check_igmpv2(args, pkt):
pkt.show()
igmp = pkt.getlayer(sc.igmp.IGMP)
if igmp is None:
return False
if igmp.gaddr != args["group"]:
return False
if args["type"] == "join":
if igmp.type != 0x16:
return False
if args["type"] == "leave":
if igmp.type != 0x17:
return False
return True
class TestIGMP(VnetTestTemplate):
REQUIRED_MODULES = []
TOPOLOGY = {
@ -82,7 +101,7 @@ class TestIGMP(VnetTestTemplate):
@pytest.mark.require_progs(["scapy"])
def test_igmp3_join_leave(self):
"Test that we send the expected join/leave IGMPv2 messages"
"Test that we send the expected join/leave IGMPv3 messages"
if1 = self.vnet.iface_alias_map["if1"]
@ -107,3 +126,32 @@ class TestIGMP(VnetTestTemplate):
s.close()
sniffer.join()
assert(sniffer.correctPackets > 0)
@pytest.mark.require_progs(["scapy"])
def test_igmp2_join_leave(self):
"Test that we send the expected join/leave IGMPv2 messages"
ToolsHelper.print_output("/sbin/sysctl net.inet.igmp.default_version=2")
if1 = self.vnet.iface_alias_map["if1"]
# Start a background sniff
from sniffer import Sniffer
expected_pkt = { "type": "join", "group": "230.0.0.1" }
sniffer = Sniffer(expected_pkt, check_igmpv2, if1.name, timeout=10)
# Now join a multicast group, and see if we're getting the igmp packet we expect
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
mreq = struct.pack("4sl", socket.inet_aton('230.0.0.1'), socket.INADDR_ANY)
s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
# Wait for the sniffer to see the join packet
sniffer.join()
assert(sniffer.correctPackets > 0)
# Now leave, check for the packet
expected_pkt = { "type": "leave", "group": "230.0.0.1" }
sniffer = Sniffer(expected_pkt, check_igmpv2, if1.name)
s.close()
sniffer.join()
assert(sniffer.correctPackets > 0)