mirror of
https://github.com/opnsense/src.git
synced 2026-06-08 16:22:46 -04:00
testing: add ability to specify multi-vnet topologies in the pytest framework.
Notable amount of tests related to the packet IO require two VNET jails
for proper testing and avoiding side effects for the host system.
Additionally, it is often required to run actions in the jails seme-sequentially
- waiting for the listener initialisation can be an example of such
dependency.
This change extends pytest vnet framework to allow defining multi-vnet
multi-epair topologies in declarative style, without any need to bother
about jail or repair names. All jail creation/teardown, interface
creation/teardown and address assignments are handled automatically.
Example:
TOPOLOGY = {
"vnet1": {"ifaces": ["if1", "if2", "if3"]},
"vnet2": {"ifaces": ["if1", "if2", "if3"]},
"if1": {"prefixes6": [("2001:db8:a::1/64", "2001:db8:a::2/64")]},
"if2": {"prefixes6": [("2001:db8:b::1/64", "2001:db8:b::2/64")]},
"if3": {"prefixes6": [("2001:db8:c::1/64", "2001:db8:c::2/64")]},
}
def vnet2_handler(self, vnet, obj_map, pipe):
ss = VerboseSocketServer("::", self.DEFAULT_PORT)
pipe.send("READY")
def test_output6_base(self):
self.wait_object(second_vnet.pipe)
The definitions above will create 2 vnets ("jail_test_output6_base",
"jail_test_output6_base_2"), 3 epairs, attached to both first and
second jails, set up the IP addresses for each epair, spawn another
process for vnet2_handler and pass control to vnet2_handler and
test_output6_base. Both processes can pass objects between each
other using pre-created pipes.
Differential Revision: https://reviews.freebsd.org/D35708
This commit is contained in:
parent
92f692fd24
commit
cfc9cf9baf
2 changed files with 404 additions and 107 deletions
|
|
@ -12,6 +12,7 @@ from typing import Optional
|
|||
|
||||
class ToolsHelper(object):
|
||||
NETSTAT_PATH = "/usr/bin/netstat"
|
||||
IFCONFIG_PATH = "/sbin/ifconfig"
|
||||
|
||||
@classmethod
|
||||
def get_output(cls, cmd: str, verbose=False) -> str:
|
||||
|
|
@ -19,6 +20,23 @@ class ToolsHelper(object):
|
|||
print("run: '{}'".format(cmd))
|
||||
return os.popen(cmd).read()
|
||||
|
||||
@classmethod
|
||||
def print_output(cls, cmd: str, verbose=True):
|
||||
if verbose:
|
||||
print("======= {} =====".format(cmd))
|
||||
print(cls.get_output(cmd))
|
||||
if verbose:
|
||||
print()
|
||||
|
||||
@classmethod
|
||||
def print_net_debug(cls):
|
||||
cls.print_output("ifconfig")
|
||||
cls.print_output("netstat -rnW")
|
||||
|
||||
@classmethod
|
||||
def set_sysctl(cls, oid, val):
|
||||
cls.get_output("sysctl {}={}".format(oid, val))
|
||||
|
||||
@classmethod
|
||||
def get_routes(cls, family: str, fibnum: int = 0):
|
||||
family_key = {"inet": "-4", "inet6": "-6"}.get(family)
|
||||
|
|
@ -31,3 +49,25 @@ class ToolsHelper(object):
|
|||
return js[0]["rt-entry"]
|
||||
else:
|
||||
return []
|
||||
|
||||
@classmethod
|
||||
def get_linklocals(cls):
|
||||
ret = {}
|
||||
ifname = None
|
||||
ips = []
|
||||
for line in cls.get_output(cls.IFCONFIG_PATH).splitlines():
|
||||
if line[0].isalnum():
|
||||
if ifname:
|
||||
ret[ifname] = ips
|
||||
ips = []
|
||||
ifname = line.split(":")[0]
|
||||
else:
|
||||
words = line.split()
|
||||
if words[0] == "inet6" and words[1].startswith("fe80"):
|
||||
# inet6 fe80::1%lo0 prefixlen 64 scopeid 0x2
|
||||
ip = words[1].split("%")[0]
|
||||
scopeid = int(words[words.index("scopeid") + 1], 16)
|
||||
ips.append((ip, scopeid))
|
||||
if ifname:
|
||||
ret[ifname] = ips
|
||||
return ret
|
||||
|
|
|
|||
|
|
@ -1,30 +1,52 @@
|
|||
#!/usr/local/bin/python3
|
||||
import copy
|
||||
import ipaddress
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
from ctypes import cdll
|
||||
from ctypes import get_errno
|
||||
from ctypes.util import find_library
|
||||
from multiprocessing import Pipe
|
||||
from multiprocessing import Process
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import NamedTuple
|
||||
from typing import Optional
|
||||
|
||||
from atf_python.sys.net.tools import ToolsHelper
|
||||
|
||||
def run_cmd(cmd: str) -> str:
|
||||
|
||||
def run_cmd(cmd: str, verbose=True) -> str:
|
||||
print("run: '{}'".format(cmd))
|
||||
return os.popen(cmd).read()
|
||||
|
||||
|
||||
class VnetInterface(object):
|
||||
INTERFACES_FNAME = "created_interfaces.lst"
|
||||
def convert_test_name(test_name: str) -> str:
|
||||
"""Convert test name to a string that can be used in the file/jail names"""
|
||||
ret = ""
|
||||
for char in test_name:
|
||||
if char.isalnum() or char in ("_", "-"):
|
||||
ret += char
|
||||
elif char in ("["):
|
||||
ret += "_"
|
||||
return ret
|
||||
|
||||
|
||||
class VnetInterface(object):
|
||||
# defines from net/if_types.h
|
||||
IFT_LOOP = 0x18
|
||||
IFT_ETHER = 0x06
|
||||
|
||||
def __init__(self, iface_name: str):
|
||||
def __init__(self, iface_alias: str, iface_name: str):
|
||||
self.name = iface_name
|
||||
self.alias = iface_alias
|
||||
self.vnet_name = ""
|
||||
self.jailed = False
|
||||
self.addr_map: Dict[str, Dict] = {"inet6": {}, "inet": {}}
|
||||
self.prefixes4: List[List[str]] = []
|
||||
self.prefixes6: List[List[str]] = []
|
||||
if iface_name.startswith("lo"):
|
||||
self.iftype = self.IFT_LOOP
|
||||
else:
|
||||
|
|
@ -34,56 +56,67 @@ class VnetInterface(object):
|
|||
def ifindex(self):
|
||||
return socket.if_nametoindex(self.name)
|
||||
|
||||
@property
|
||||
def first_ipv6(self):
|
||||
d = self.addr_map["inet6"]
|
||||
return d[next(iter(d))]
|
||||
|
||||
@property
|
||||
def first_ipv4(self):
|
||||
d = self.addr_map["inet"]
|
||||
return d[next(iter(d))]
|
||||
|
||||
def set_vnet(self, vnet_name: str):
|
||||
self.vnet_name = vnet_name
|
||||
|
||||
def set_jailed(self, jailed: bool):
|
||||
self.jailed = jailed
|
||||
|
||||
def run_cmd(self, cmd):
|
||||
def run_cmd(
|
||||
self,
|
||||
cmd,
|
||||
verbose=False,
|
||||
):
|
||||
if self.vnet_name and not self.jailed:
|
||||
cmd = "jexec {} {}".format(self.vnet_name, cmd)
|
||||
run_cmd(cmd)
|
||||
|
||||
@staticmethod
|
||||
def file_append_line(line):
|
||||
with open(VnetInterface.INTERFACES_FNAME, "a") as f:
|
||||
f.write(line + "\n")
|
||||
return run_cmd(cmd, verbose)
|
||||
|
||||
@classmethod
|
||||
def create_iface(cls, iface_name: str):
|
||||
def setup_loopback(cls, vnet_name: str):
|
||||
lo = VnetInterface("", "lo0")
|
||||
lo.set_vnet(vnet_name)
|
||||
lo.turn_up()
|
||||
|
||||
@classmethod
|
||||
def create_iface(cls, alias_name: str, iface_name: str) -> List["VnetInterface"]:
|
||||
name = run_cmd("/sbin/ifconfig {} create".format(iface_name)).rstrip()
|
||||
if not name:
|
||||
raise Exception("Unable to create iface {}".format(iface_name))
|
||||
cls.file_append_line(name)
|
||||
ret = [cls(alias_name, name)]
|
||||
if name.startswith("epair"):
|
||||
cls.file_append_line(name[:-1] + "b")
|
||||
return cls(name)
|
||||
ret.append(cls(alias_name, name[:-1] + "b"))
|
||||
return ret
|
||||
|
||||
@staticmethod
|
||||
def cleanup_ifaces():
|
||||
try:
|
||||
with open(VnetInterface.INTERFACES_FNAME, "r") as f:
|
||||
for line in f:
|
||||
run_cmd("/sbin/ifconfig {} destroy".format(line.strip()))
|
||||
os.unlink(VnetInterface.INTERFACES_FNAME)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def setup_addr(self, addr: str):
|
||||
if ":" in addr:
|
||||
def setup_addr(self, _addr: str):
|
||||
addr = ipaddress.ip_interface(_addr)
|
||||
if addr.version == 6:
|
||||
family = "inet6"
|
||||
else:
|
||||
family = "inet"
|
||||
cmd = "/sbin/ifconfig {} {} {}".format(self.name, family, addr)
|
||||
self.run_cmd(cmd)
|
||||
self.addr_map[family][str(addr)] = addr
|
||||
|
||||
def delete_addr(self, addr: str):
|
||||
if ":" in addr:
|
||||
def delete_addr(self, _addr: str):
|
||||
addr = ipaddress.ip_address(_addr)
|
||||
if addr.version == 6:
|
||||
family = "inet6"
|
||||
cmd = "/sbin/ifconfig {} inet6 {} delete".format(self.name, addr)
|
||||
else:
|
||||
family = "inet"
|
||||
cmd = "/sbin/ifconfig {} -alias {}".format(self.name, addr)
|
||||
self.run_cmd(cmd)
|
||||
del self.addr_map[family][str(addr)]
|
||||
|
||||
def turn_up(self):
|
||||
cmd = "/sbin/ifconfig {} up".format(self.name)
|
||||
|
|
@ -93,67 +126,77 @@ class VnetInterface(object):
|
|||
cmd = "/usr/sbin/ndp -i {} -disabled".format(self.name)
|
||||
self.run_cmd(cmd)
|
||||
|
||||
|
||||
class VnetInstance(object):
|
||||
JAILS_FNAME = "created_jails.lst"
|
||||
|
||||
def __init__(self, vnet_name: str, jid: int, ifaces: List[VnetInterface]):
|
||||
self.name = vnet_name
|
||||
self.jid = jid
|
||||
self.ifaces = ifaces
|
||||
for iface in ifaces:
|
||||
iface.set_vnet(vnet_name)
|
||||
iface.set_jailed(True)
|
||||
|
||||
def run_vnet_cmd(self, cmd):
|
||||
if self.vnet_name:
|
||||
cmd = "jexec {} {}".format(self.vnet_name, cmd)
|
||||
return run_cmd(cmd)
|
||||
|
||||
@staticmethod
|
||||
def wait_interface(vnet_name: str, iface_name: str):
|
||||
cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name)
|
||||
for i in range(50):
|
||||
ifaces = run_cmd(cmd).strip().split(" ")
|
||||
if iface_name in ifaces:
|
||||
def has_tentative(self) -> bool:
|
||||
"""True if an interface has some addresses in tenative state"""
|
||||
cmd = "/sbin/ifconfig {} inet6".format(self.name)
|
||||
out = self.run_cmd(cmd, verbose=False)
|
||||
for line in out.splitlines():
|
||||
if "tentative" in line:
|
||||
return True
|
||||
time.sleep(0.1)
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def file_append_line(line):
|
||||
with open(VnetInstance.JAILS_FNAME, "a") as f:
|
||||
f.write(line + "\n")
|
||||
|
||||
@staticmethod
|
||||
def cleanup_vnets():
|
||||
class IfaceFactory(object):
|
||||
INTERFACES_FNAME = "created_ifaces.lst"
|
||||
|
||||
def __init__(self, test_name: str):
|
||||
self.test_name = test_name
|
||||
test_id = convert_test_name(test_name)
|
||||
self.file_name = self.INTERFACES_FNAME
|
||||
|
||||
def _register_iface(self, iface_name: str):
|
||||
with open(self.file_name, "a") as f:
|
||||
f.write(iface_name + "\n")
|
||||
|
||||
def create_iface(self, alias_name: str, iface_name: str) -> List[VnetInterface]:
|
||||
ifaces = VnetInterface.create_iface(alias_name, iface_name)
|
||||
for iface in ifaces:
|
||||
self._register_iface(iface.name)
|
||||
return ifaces
|
||||
|
||||
def cleanup(self):
|
||||
try:
|
||||
with open(VnetInstance.JAILS_FNAME) as f:
|
||||
with open(self.file_name, "r") as f:
|
||||
for line in f:
|
||||
run_cmd("/usr/sbin/jail -r {}".format(line.strip()))
|
||||
os.unlink(VnetInstance.JAILS_FNAME)
|
||||
run_cmd("/sbin/ifconfig {} destroy".format(line.strip()))
|
||||
os.unlink(self.INTERFACES_FNAME)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def create_with_interfaces(cls, vnet_name: str, ifaces: List[VnetInterface]):
|
||||
iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces])
|
||||
cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format(
|
||||
vnet_name, iface_cmds
|
||||
)
|
||||
jid_str = run_cmd(cmd)
|
||||
jid = int(jid_str)
|
||||
if jid <= 0:
|
||||
raise Exception("Jail creation failed, output: {}".format(jid))
|
||||
cls.file_append_line(vnet_name)
|
||||
|
||||
class VnetInstance(object):
|
||||
def __init__(
|
||||
self, vnet_alias: str, vnet_name: str, jid: int, ifaces: List[VnetInterface]
|
||||
):
|
||||
self.name = vnet_name
|
||||
self.alias = vnet_alias # reference in the test topology
|
||||
self.jid = jid
|
||||
self.ifaces = ifaces
|
||||
self.iface_alias_map = {} # iface.alias: iface
|
||||
self.iface_map = {} # iface.name: iface
|
||||
for iface in ifaces:
|
||||
if cls.wait_interface(vnet_name, iface.name):
|
||||
continue
|
||||
raise Exception(
|
||||
"Interface {} has not appeared in vnet {}".format(iface.name, vnet_name)
|
||||
)
|
||||
return cls(vnet_name, jid, ifaces)
|
||||
iface.set_vnet(vnet_name)
|
||||
iface.set_jailed(True)
|
||||
self.iface_alias_map[iface.alias] = iface
|
||||
self.iface_map[iface.name] = iface
|
||||
self.need_dad = False # Disable duplicate address detection by default
|
||||
self.attached = False
|
||||
self.pipe = None
|
||||
self.subprocess = None
|
||||
|
||||
def run_vnet_cmd(self, cmd):
|
||||
if not self.attached:
|
||||
cmd = "jexec {} {}".format(self.name, cmd)
|
||||
return run_cmd(cmd)
|
||||
|
||||
def disable_dad(self):
|
||||
self.run_vnet_cmd("/sbin/sysctl net.inet6.ip6.dad_count=0")
|
||||
|
||||
def set_pipe(self, pipe):
|
||||
self.pipe = pipe
|
||||
|
||||
def set_subprocess(self, p):
|
||||
self.subprocess = p
|
||||
|
||||
@staticmethod
|
||||
def attach_jid(jid: int):
|
||||
|
|
@ -167,37 +210,251 @@ class VnetInstance(object):
|
|||
|
||||
def attach(self):
|
||||
self.attach_jid(self.jid)
|
||||
self.attached = True
|
||||
|
||||
|
||||
class SingleVnetTestTemplate(object):
|
||||
num_epairs = 1
|
||||
class VnetFactory(object):
|
||||
JAILS_FNAME = "created_jails.lst"
|
||||
|
||||
def __init__(self, test_name: str):
|
||||
self.test_name = test_name
|
||||
self.test_id = convert_test_name(test_name)
|
||||
self.file_name = self.JAILS_FNAME
|
||||
self._vnets: List[str] = []
|
||||
|
||||
def _register_vnet(self, vnet_name: str):
|
||||
self._vnets.append(vnet_name)
|
||||
with open(self.file_name, "a") as f:
|
||||
f.write(vnet_name + "\n")
|
||||
|
||||
@staticmethod
|
||||
def _wait_interfaces(vnet_name: str, ifaces: List[str]) -> List[str]:
|
||||
cmd = "jexec {} /sbin/ifconfig -l".format(vnet_name)
|
||||
not_matched: List[str] = []
|
||||
for i in range(50):
|
||||
vnet_ifaces = run_cmd(cmd).strip().split(" ")
|
||||
not_matched = []
|
||||
for iface_name in ifaces:
|
||||
if iface_name not in vnet_ifaces:
|
||||
not_matched.append(iface_name)
|
||||
if len(not_matched) == 0:
|
||||
return []
|
||||
time.sleep(0.1)
|
||||
return not_matched
|
||||
|
||||
def create_vnet(self, vnet_alias: str, ifaces: List[VnetInterface]):
|
||||
vnet_name = "jail_{}".format(self.test_id)
|
||||
if self._vnets:
|
||||
# add number to distinguish jails
|
||||
vnet_name = "{}_{}".format(vnet_name, len(self._vnets) + 1)
|
||||
iface_cmds = " ".join(["vnet.interface={}".format(i.name) for i in ifaces])
|
||||
cmd = "/usr/sbin/jail -i -c name={} persist vnet {}".format(
|
||||
vnet_name, iface_cmds
|
||||
)
|
||||
jid_str = run_cmd(cmd)
|
||||
jid = int(jid_str)
|
||||
if jid <= 0:
|
||||
raise Exception("Jail creation failed, output: {}".format(jid))
|
||||
self._register_vnet(vnet_name)
|
||||
|
||||
# Run expedited version of routing
|
||||
VnetInterface.setup_loopback(vnet_name)
|
||||
|
||||
not_found = self._wait_interfaces(vnet_name, [i.name for i in ifaces])
|
||||
if not_found:
|
||||
raise Exception(
|
||||
"Interfaces {} has not appeared in vnet {}".format(not_found, vnet_name)
|
||||
)
|
||||
return VnetInstance(vnet_alias, vnet_name, jid, ifaces)
|
||||
|
||||
def cleanup(self):
|
||||
try:
|
||||
with open(self.file_name) as f:
|
||||
for line in f:
|
||||
jail_name = line.strip()
|
||||
ToolsHelper.print_output(
|
||||
"/usr/sbin/jexec {} ifconfig -l".format(jail_name)
|
||||
)
|
||||
run_cmd("/usr/sbin/jail -r {}".format(line.strip()))
|
||||
os.unlink(self.JAILS_FNAME)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
class SingleInterfaceMap(NamedTuple):
|
||||
ifaces: List[VnetInterface]
|
||||
vnet_aliases: List[str]
|
||||
|
||||
|
||||
class VnetTestTemplate(object):
|
||||
TOPOLOGY = {}
|
||||
|
||||
def _get_vnet_handler(self, vnet_alias: str):
|
||||
handler_name = "{}_handler".format(vnet_alias)
|
||||
return getattr(self, handler_name, None)
|
||||
|
||||
def _setup_vnet(self, vnet: VnetInstance, obj_map: Dict, pipe):
|
||||
"""Base Handler to setup given VNET.
|
||||
Can be run in a subprocess. If so, passes control to the special
|
||||
vnetX_handler() after setting up interface addresses
|
||||
"""
|
||||
vnet.attach()
|
||||
print("# setup_vnet({})".format(vnet.name))
|
||||
|
||||
topo = obj_map["topo_map"]
|
||||
ipv6_ifaces = []
|
||||
# Disable DAD
|
||||
if not vnet.need_dad:
|
||||
vnet.disable_dad()
|
||||
for iface in vnet.ifaces:
|
||||
# check index of vnet within an interface
|
||||
# as we have prefixes for both ends of the interface
|
||||
iface_map = obj_map["iface_map"][iface.alias]
|
||||
idx = iface_map.vnet_aliases.index(vnet.alias)
|
||||
prefixes6 = topo[iface.alias].get("prefixes6", [])
|
||||
prefixes4 = topo[iface.alias].get("prefixes4", [])
|
||||
if prefixes6 or prefixes4:
|
||||
ipv6_ifaces.append(iface)
|
||||
iface.turn_up()
|
||||
if prefixes6:
|
||||
iface.enable_ipv6()
|
||||
for prefix in prefixes6 + prefixes4:
|
||||
iface.setup_addr(prefix[idx])
|
||||
for iface in ipv6_ifaces:
|
||||
while iface.has_tentative():
|
||||
time.sleep(0.1)
|
||||
|
||||
# Run actual handler
|
||||
handler = self._get_vnet_handler(vnet.alias)
|
||||
if handler:
|
||||
# Do unbuffered stdout for children
|
||||
# so the logs are present if the child hangs
|
||||
sys.stdout.reconfigure(line_buffering=True)
|
||||
handler(vnet, obj_map, pipe)
|
||||
|
||||
def setup_topology(self, topo: Dict, test_name: str):
|
||||
"""Creates jails & interfaces for the provided topology"""
|
||||
iface_map: Dict[str, SingleInterfaceMap] = {}
|
||||
vnet_map = {}
|
||||
iface_factory = IfaceFactory(test_name)
|
||||
vnet_factory = VnetFactory(test_name)
|
||||
for obj_name, obj_data in topo.items():
|
||||
if obj_name.startswith("if"):
|
||||
epair_ifaces = iface_factory.create_iface(obj_name, "epair")
|
||||
smap = SingleInterfaceMap(epair_ifaces, [])
|
||||
iface_map[obj_name] = smap
|
||||
for obj_name, obj_data in topo.items():
|
||||
if obj_name.startswith("vnet"):
|
||||
vnet_ifaces = []
|
||||
for iface_alias in obj_data["ifaces"]:
|
||||
# epair creates 2 interfaces, grab first _available_
|
||||
# and map it to the VNET being created
|
||||
idx = len(iface_map[iface_alias].vnet_aliases)
|
||||
iface_map[iface_alias].vnet_aliases.append(obj_name)
|
||||
vnet_ifaces.append(iface_map[iface_alias].ifaces[idx])
|
||||
vnet = vnet_factory.create_vnet(obj_name, vnet_ifaces)
|
||||
vnet_map[obj_name] = vnet
|
||||
# Debug output
|
||||
print("============= TEST TOPOLOGY =============")
|
||||
for vnet_alias, vnet in vnet_map.items():
|
||||
print("# vnet {} -> {}".format(vnet.alias, vnet.name), end="")
|
||||
handler = self._get_vnet_handler(vnet.alias)
|
||||
if handler:
|
||||
print(" handler: {}".format(handler.__name__), end="")
|
||||
print()
|
||||
for iface_alias, iface_data in iface_map.items():
|
||||
vnets = iface_data.vnet_aliases
|
||||
ifaces: List[VnetInterface] = iface_data.ifaces
|
||||
if len(vnets) == 1 and len(ifaces) == 2:
|
||||
print(
|
||||
"# iface {}: {}::{} -> main::{}".format(
|
||||
iface_alias, vnets[0], ifaces[0].name, ifaces[1].name
|
||||
)
|
||||
)
|
||||
elif len(vnets) == 2 and len(ifaces) == 2:
|
||||
print(
|
||||
"# iface {}: {}::{} -> {}::{}".format(
|
||||
iface_alias, vnets[0], ifaces[0].name, vnets[1], ifaces[1].name
|
||||
)
|
||||
)
|
||||
else:
|
||||
print(
|
||||
"# iface {}: ifaces: {} vnets: {}".format(
|
||||
iface_alias, vnets, [i.name for i in ifaces]
|
||||
)
|
||||
)
|
||||
print()
|
||||
return {"iface_map": iface_map, "vnet_map": vnet_map, "topo_map": topo}
|
||||
|
||||
def setup_method(self, method):
|
||||
"""Sets up all the required topology and handlers for the given test"""
|
||||
# 'test_ip6_output.py::TestIP6Output::test_output6_pktinfo[ipandif] (setup)'
|
||||
test_id = os.environ.get("PYTEST_CURRENT_TEST").split(" ")[0]
|
||||
test_name = test_id.split("::")[-1]
|
||||
topology = self.TOPOLOGY
|
||||
# First, setup kernel objects - interfaces & vnets
|
||||
obj_map = self.setup_topology(topology, test_name)
|
||||
main_vnet = None # one without subprocess handler
|
||||
for vnet_alias, vnet in obj_map["vnet_map"].items():
|
||||
if self._get_vnet_handler(vnet_alias):
|
||||
# Need subprocess to run
|
||||
parent_pipe, child_pipe = Pipe()
|
||||
p = Process(
|
||||
target=self._setup_vnet,
|
||||
args=(
|
||||
vnet,
|
||||
obj_map,
|
||||
child_pipe,
|
||||
),
|
||||
)
|
||||
vnet.set_pipe(parent_pipe)
|
||||
vnet.set_subprocess(p)
|
||||
p.start()
|
||||
else:
|
||||
if main_vnet is not None:
|
||||
raise Exception("there can be only 1 VNET w/o handler")
|
||||
main_vnet = vnet
|
||||
# Main vnet needs to be the last, so all the other subprocesses
|
||||
# are started & their pipe handles collected
|
||||
self.vnet = main_vnet
|
||||
self._setup_vnet(main_vnet, obj_map, None)
|
||||
# Save state for the main handler
|
||||
self.iface_map = obj_map["iface_map"]
|
||||
self.vnet_map = obj_map["vnet_map"]
|
||||
|
||||
def cleanup(self, test_id: str):
|
||||
# pytest test id: file::class::test_name
|
||||
test_name = test_id.split("::")[-1]
|
||||
|
||||
print("==== vnet cleanup ===")
|
||||
print("# test_name: '{}'".format(test_name))
|
||||
VnetFactory(test_name).cleanup()
|
||||
IfaceFactory(test_name).cleanup()
|
||||
|
||||
def wait_object(self, pipe, timeout=5):
|
||||
if pipe.poll(timeout):
|
||||
return pipe.recv()
|
||||
raise TimeoutError
|
||||
|
||||
@property
|
||||
def curvnet(self):
|
||||
pass
|
||||
|
||||
|
||||
class SingleVnetTestTemplate(VnetTestTemplate):
|
||||
IPV6_PREFIXES: List[str] = []
|
||||
IPV4_PREFIXES: List[str] = []
|
||||
|
||||
def setup_method(self, method):
|
||||
test_name = method.__name__
|
||||
vnet_name = "jail_{}".format(test_name)
|
||||
ifaces = []
|
||||
for i in range(self.num_epairs):
|
||||
ifaces.append(VnetInterface.create_iface("epair"))
|
||||
self.vnet = VnetInstance.create_with_interfaces(vnet_name, ifaces)
|
||||
self.vnet.attach()
|
||||
for i, addr in enumerate(self.IPV6_PREFIXES):
|
||||
if addr:
|
||||
iface = self.vnet.ifaces[i]
|
||||
iface.turn_up()
|
||||
iface.enable_ipv6()
|
||||
iface.setup_addr(addr)
|
||||
for i, addr in enumerate(self.IPV4_PREFIXES):
|
||||
if addr:
|
||||
iface = self.vnet.ifaces[i]
|
||||
iface.turn_up()
|
||||
iface.setup_addr(addr)
|
||||
|
||||
def cleanup(self, nodeid: str):
|
||||
print("==== vnet cleanup ===")
|
||||
VnetInstance.cleanup_vnets()
|
||||
VnetInterface.cleanup_ifaces()
|
||||
|
||||
def run_cmd(self, cmd: str) -> str:
|
||||
return os.popen(cmd).read()
|
||||
topology = copy.deepcopy(
|
||||
{
|
||||
"vnet1": {"ifaces": ["if1"]},
|
||||
"if1": {"prefixes4": [], "prefixes6": []},
|
||||
}
|
||||
)
|
||||
for prefix in self.IPV6_PREFIXES:
|
||||
topology["if1"]["prefixes6"].append((prefix,))
|
||||
for prefix in self.IPV4_PREFIXES:
|
||||
topology["if1"]["prefixes4"].append((prefix,))
|
||||
self.TOPOLOGY = topology
|
||||
super().setup_method(method)
|
||||
|
|
|
|||
Loading…
Reference in a new issue