kube-proxy: merge NodePodCIDRHandler with NodeManager

NodeManager, if configured with to watch for PodCIDR watch, watches
for changes in PodCIDRs and crashes kube-proxy if a change is
detected in PodCIDRs.

Signed-off-by: Daman Arora <aroradaman@gmail.com>
Co-authored-by: Dan Winship <danwinship@redhat.com>
This commit is contained in:
Daman Arora 2025-05-11 20:49:38 +05:30 committed by Dan Winship
parent 373fb487f6
commit d4892fef76
5 changed files with 198 additions and 415 deletions

View file

@ -208,7 +208,8 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfig
}
// NodeManager makes an informer that selects for the node where this kube-proxy is running
s.NodeManager, err = proxy.NewNodeManager(ctx, s.Client, s.Config.ConfigSyncPeriod.Duration, s.NodeName)
s.NodeManager, err = proxy.NewNodeManager(ctx, s.Client, s.Config.ConfigSyncPeriod.Duration,
s.NodeName, s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR)
if err != nil {
return nil, err
}
@ -218,6 +219,7 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfig
logger.Info("Successfully retrieved NodeIPs", "NodeIPs", rawNodeIPs)
}
s.PrimaryIPFamily, s.NodeIPs = detectNodeIPs(ctx, rawNodeIPs, config.BindAddress)
s.podCIDRs = s.NodeManager.PodCIDRs()
if len(config.NodePortAddresses) == 1 && config.NodePortAddresses[0] == kubeproxyconfig.NodePortAddressesPrimary {
var nodePortAddresses []string
@ -606,10 +608,6 @@ func (s *ProxyServer) Run(ctx context.Context) error {
// hollow-proxy doesn't need node config, and we don't create nodeManager for hollow-proxy.
if s.NodeManager != nil {
nodeConfig := config.NewNodeConfig(ctx, s.NodeManager.NodeInformer(), s.Config.ConfigSyncPeriod.Duration)
// https://issues.k8s.io/111321
if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR {
nodeConfig.RegisterEventHandler(proxy.NewNodePodCIDRHandler(ctx, s.podCIDRs))
}
nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{
HealthServer: s.HealthzServer,
})

View file

@ -33,13 +33,6 @@ import (
"github.com/google/cadvisor/utils/sysfs"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
toolswatch "k8s.io/client-go/tools/watch"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/proxy"
@ -53,10 +46,6 @@ import (
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
)
// timeoutForNodePodCIDR is the time to wait for allocators to assign a PodCIDR to the
// node after it is registered.
var timeoutForNodePodCIDR = 5 * time.Minute
// platformApplyDefaults is called after parsing command-line flags and/or reading the
// config file, to apply platform-specific default values to config.
func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfiguration) {
@ -80,17 +69,6 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur
// Proxier. It should fill in any platform-specific fields and perform other
// platform-specific setup.
func (s *ProxyServer) platformSetup(ctx context.Context) error {
logger := klog.FromContext(ctx)
if s.Config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR {
logger.Info("Watching for node, awaiting podCIDR allocation", "node", s.NodeName)
node, err := waitForPodCIDR(ctx, s.Client, s.NodeName)
if err != nil {
return err
}
s.podCIDRs = node.Spec.PodCIDRs
logger.Info("NodeInfo", "podCIDRs", node.Spec.PodCIDRs)
}
ct := &realConntracker{}
err := s.setupConntrack(ctx, ct)
if err != nil {
@ -391,50 +369,6 @@ func getConntrackMax(ctx context.Context, config proxyconfigapi.KubeProxyConntra
return 0, nil
}
func waitForPodCIDR(ctx context.Context, client clientset.Interface, nodeName string) (*v1.Node, error) {
// since allocators can assign the podCIDR after the node registers, we do a watch here to wait
// for podCIDR to be assigned, instead of assuming that the Get() on startup will have it.
ctx, cancelFunc := context.WithTimeout(ctx, timeoutForNodePodCIDR)
defer cancelFunc()
fieldSelector := fields.OneTermEqualSelector("metadata.name", nodeName).String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
options.FieldSelector = fieldSelector
return client.CoreV1().Nodes().List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return client.CoreV1().Nodes().Watch(ctx, options)
},
}
condition := func(event watch.Event) (bool, error) {
// don't process delete events
if event.Type != watch.Modified && event.Type != watch.Added {
return false, nil
}
n, ok := event.Object.(*v1.Node)
if !ok {
return false, fmt.Errorf("event object not of type Node")
}
// don't consider the node if is going to be deleted and keep waiting
if !n.DeletionTimestamp.IsZero() {
return false, nil
}
return n.Spec.PodCIDR != "" && len(n.Spec.PodCIDRs) > 0, nil
}
evt, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition)
if err != nil {
return nil, fmt.Errorf("timeout waiting for PodCIDR allocation to configure detect-local-mode %v: %v", proxyconfigapi.LocalModeNodeCIDR, err)
}
if n, ok := evt.Object.(*v1.Node); ok {
return n, nil
}
return nil, fmt.Errorf("event object not of type node")
}
func detectNumCPU() int {
// try get numCPU from /sys firstly due to a known issue (https://github.com/kubernetes/kubernetes/issues/99225)
_, numCPU, err := machine.GetTopology(sysfs.NewRealSysFs())

View file

@ -23,7 +23,6 @@ import (
"context"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"reflect"
@ -36,14 +35,9 @@ import (
"github.com/spf13/pflag"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientsetfake "k8s.io/client-go/kubernetes/fake"
clientgotesting "k8s.io/client-go/testing"
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
proxyutil "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/test/utils/ktesting"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
)
@ -445,18 +439,6 @@ func Test_getLocalDetectors(t *testing.T) {
}
}
func makeNodeWithPodCIDRs(cidrs ...string) *v1.Node {
if len(cidrs) == 0 {
return &v1.Node{}
}
return &v1.Node{
Spec: v1.NodeSpec{
PodCIDR: cidrs[0],
PodCIDRs: cidrs,
},
}
}
func TestConfigChange(t *testing.T) {
setUp := func() (*os.File, string, error) {
tempDir, err := os.MkdirTemp("", "kubeproxy-config-change")
@ -574,56 +556,6 @@ detectLocalMode: "BridgeInterface"`)
}
}
func Test_waitForPodCIDR(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
expected := []string{"192.168.0.0/24", "fd00:1:2::/64"}
nodeName := "test-node"
oldNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
ResourceVersion: "1000",
},
Spec: v1.NodeSpec{
PodCIDR: "10.0.0.0/24",
PodCIDRs: []string{"10.0.0.0/24", "2001:db2:1/64"},
},
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
ResourceVersion: "1",
},
}
updatedNode := node.DeepCopy()
updatedNode.Spec.PodCIDRs = expected
updatedNode.Spec.PodCIDR = expected[0]
// start with the new node
client := clientsetfake.NewSimpleClientset()
client.AddReactor("list", "nodes", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
obj := &v1.NodeList{}
return true, obj, nil
})
fakeWatch := watch.NewFake()
client.PrependWatchReactor("nodes", clientgotesting.DefaultWatchReactor(fakeWatch, nil))
go func() {
fakeWatch.Add(node)
// receive a delete event for the old node
fakeWatch.Delete(oldNode)
// set the PodCIDRs on the new node
fakeWatch.Modify(updatedNode)
}()
got, err := waitForPodCIDR(ctx, client, node.Name)
if err != nil {
t.Errorf("waitForPodCIDR() unexpected error %v", err)
return
}
if !reflect.DeepEqual(got.Spec.PodCIDRs, expected) {
t.Errorf("waitForPodCIDR() got %v expected to be %v ", got.Spec.PodCIDRs, expected)
}
}
func TestGetConntrackMax(t *testing.T) {
ncores := goruntime.NumCPU()
testCases := []struct {
@ -671,57 +603,6 @@ func TestGetConntrackMax(t *testing.T) {
}
}
func TestProxyServer_platformSetup(t *testing.T) {
tests := []struct {
name string
node *v1.Node
config *proxyconfigapi.KubeProxyConfiguration
wantPodCIDRs []string
}{
{
name: "LocalModeNodeCIDR store the node PodCIDRs obtained",
node: makeNodeWithPodCIDRs("10.0.0.0/24"),
config: &proxyconfigapi.KubeProxyConfiguration{DetectLocalMode: proxyconfigapi.LocalModeNodeCIDR},
wantPodCIDRs: []string{"10.0.0.0/24"},
},
{
name: "LocalModeNodeCIDR store the node PodCIDRs obtained dual stack",
node: makeNodeWithPodCIDRs("10.0.0.0/24", "2001:db2:1/64"),
config: &proxyconfigapi.KubeProxyConfiguration{DetectLocalMode: proxyconfigapi.LocalModeNodeCIDR},
wantPodCIDRs: []string{"10.0.0.0/24", "2001:db2:1/64"},
},
{
name: "LocalModeClusterCIDR does not get the node PodCIDRs",
node: makeNodeWithPodCIDRs("10.0.0.0/24", "2001:db2:1/64"),
config: &proxyconfigapi.KubeProxyConfiguration{DetectLocalMode: proxyconfigapi.LocalModeClusterCIDR},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
client := clientsetfake.NewSimpleClientset(tt.node)
s := &ProxyServer{
Config: tt.config,
Client: client,
NodeName: "nodename",
NodeIPs: map[v1.IPFamily]net.IP{
v1.IPv4Protocol: netutils.ParseIPSloppy("127.0.0.1"),
v1.IPv6Protocol: net.IPv6zero,
},
}
err := s.platformSetup(ctx)
if err != nil {
t.Errorf("ProxyServer.createProxier() error = %v", err)
return
}
if !reflect.DeepEqual(s.podCIDRs, tt.wantPodCIDRs) {
t.Errorf("Expected PodCIDRs %v got %v", tt.wantPodCIDRs, s.podCIDRs)
}
})
}
}
type fakeConntracker struct {
called []string
err error

View file

@ -22,7 +22,6 @@ import (
"net"
"os"
"reflect"
"sync"
"time"
v1 "k8s.io/api/core/v1"
@ -40,69 +39,6 @@ import (
utilnode "k8s.io/kubernetes/pkg/util/node"
)
// NodePodCIDRHandler handles the life cycle of kube-proxy based on the node PodCIDR assigned
// Implements the config.NodeHandler interface
// https://issues.k8s.io/111321
type NodePodCIDRHandler struct {
mu sync.Mutex
podCIDRs []string
logger klog.Logger
}
func NewNodePodCIDRHandler(ctx context.Context, podCIDRs []string) *NodePodCIDRHandler {
return &NodePodCIDRHandler{
podCIDRs: podCIDRs,
logger: klog.FromContext(ctx),
}
}
var _ config.NodeHandler = &NodePodCIDRHandler{}
// OnNodeAdd is a handler for Node creates.
func (n *NodePodCIDRHandler) OnNodeAdd(node *v1.Node) {
n.mu.Lock()
defer n.mu.Unlock()
podCIDRs := node.Spec.PodCIDRs
// initialize podCIDRs
if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 {
n.logger.Info("Setting current PodCIDRs", "podCIDRs", podCIDRs)
n.podCIDRs = podCIDRs
return
}
if !reflect.DeepEqual(n.podCIDRs, podCIDRs) {
n.logger.Error(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting",
"node", klog.KObj(node), "newPodCIDRs", podCIDRs, "oldPodCIDRs", n.podCIDRs)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}
// OnNodeUpdate is a handler for Node updates.
func (n *NodePodCIDRHandler) OnNodeUpdate(_, node *v1.Node) {
n.mu.Lock()
defer n.mu.Unlock()
podCIDRs := node.Spec.PodCIDRs
// initialize podCIDRs
if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 {
n.logger.Info("Setting current PodCIDRs", "podCIDRs", podCIDRs)
n.podCIDRs = podCIDRs
return
}
if !reflect.DeepEqual(n.podCIDRs, podCIDRs) {
n.logger.Error(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting",
"node", klog.KObj(node), "newPodCIDRs", podCIDRs, "oldPODCIDRs", n.podCIDRs)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}
// OnNodeDelete is a handler for Node deletes.
func (n *NodePodCIDRHandler) OnNodeDelete(node *v1.Node) {
n.logger.Error(nil, "Current Node is being deleted", "node", klog.KObj(node))
}
// OnNodeSynced is a handler for Node syncs.
func (n *NodePodCIDRHandler) OnNodeSynced() {}
// NodeEligibleHandler handles the life cycle of the Node's eligibility, as
// determined by the health server for directing load balancer traffic.
type NodeEligibleHandler struct {
@ -123,25 +59,35 @@ func (n *NodeEligibleHandler) OnNodeDelete(node *v1.Node) { n.HealthServer.SyncN
// OnNodeSynced is a handler for Node syncs.
func (n *NodeEligibleHandler) OnNodeSynced() {}
// NodeManager handles the life cycle of kube-proxy based on the NodeIPs, handles node watch events
// and crashes kube-proxy if there are any changes in NodeIPs.
// NodeManager handles the life cycle of kube-proxy based on the NodeIPs and PodCIDRs handles
// node watch events and crashes kube-proxy if there are any changes in NodeIPs or PodCIDRs.
// Note: It only crashes on change on PodCIDR when watchPodCIDRs is set to true.
type NodeManager struct {
nodeInformer v1informers.NodeInformer
nodeLister corelisters.NodeLister
exitFunc func(exitCode int)
nodeInformer v1informers.NodeInformer
nodeLister corelisters.NodeLister
exitFunc func(exitCode int)
watchPodCIDRs bool
nodeIPs []net.IP
nodeIPs []net.IP
podCIDRs []string
}
// NewNodeManager initializes node informer that selects for the given node, waits for cache sync
// and returns NodeManager after waiting some amount of time for the node object to exist
// and have NodeIPs. Note: NewNodeManager doesn't return any error if failed to retrieve NodeIPs.
func NewNodeManager(ctx context.Context, client clientset.Interface, resyncInterval time.Duration, nodeName string) (*NodeManager, error) {
return newNodeManager(ctx, client, resyncInterval, nodeName, os.Exit, time.Second, 30*time.Second)
// and have NodeIPs (and PodCIDRs if watchPodCIDRs is true). Note: for backward compatibility,
// NewNodeManager doesn't return any error if it failed to retrieve NodeIPs and watchPodCIDRs
// is false.
func NewNodeManager(ctx context.Context, client clientset.Interface,
resyncInterval time.Duration, nodeName string, watchPodCIDRs bool,
) (*NodeManager, error) {
return newNodeManager(ctx, client, resyncInterval, nodeName, watchPodCIDRs, os.Exit, time.Second, 30*time.Second, 5*time.Minute)
}
// newNodeManager implements NewNodeManager with configurable exit function, poll interval and timeout.
func newNodeManager(ctx context.Context, client clientset.Interface, resyncInterval time.Duration, nodeName string, exitFunc func(int), pollInterval, pollTimeout time.Duration) (*NodeManager, error) {
// newNodeManager implements NewNodeManager with configurable exit function, poll interval and timeouts.
func newNodeManager(ctx context.Context, client clientset.Interface, resyncInterval time.Duration,
nodeName string, watchPodCIDRs bool, exitFunc func(int),
pollInterval, nodeIPsTimeout, podCIDRsTimeout time.Duration,
) (*NodeManager, error) {
// make an informer that selects for the given node
thisNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncInterval,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
@ -156,20 +102,38 @@ func newNodeManager(ctx context.Context, client clientset.Interface, resyncInter
return nil, fmt.Errorf("can not sync node informer")
}
node, nodeIPs := getNodeInfo(nodeLister, nodeName)
node, nodeIPs, podCIDRs := getNodeInfo(nodeLister, nodeName)
if len(nodeIPs) == 0 {
// wait for the node object to (hopefully) exist and have NodeIPs.
ctx, cancel := context.WithTimeout(ctx, pollTimeout)
// wait for the node object to exist and have NodeIPs.
ctx, cancel := context.WithTimeout(ctx, nodeIPsTimeout)
defer cancel()
_ = wait.PollUntilContextCancel(ctx, pollInterval, true, func(context.Context) (bool, error) {
node, nodeIPs = getNodeInfo(nodeLister, nodeName)
_ = wait.PollUntilContextCancel(ctx, pollInterval, false, func(context.Context) (bool, error) {
node, nodeIPs, podCIDRs = getNodeInfo(nodeLister, nodeName)
return len(nodeIPs) != 0, nil
})
}
// For backward-compatibility, we keep going even if we didn't find a node or it
// didn't have IPs.
if watchPodCIDRs && len(podCIDRs) == 0 {
// wait some additional time for the PodCIDRs.
ctx, cancel := context.WithTimeout(ctx, podCIDRsTimeout)
defer cancel()
_ = wait.PollUntilContextCancel(ctx, pollInterval, false, func(context.Context) (bool, error) {
node, nodeIPs, podCIDRs = getNodeInfo(nodeLister, nodeName)
return len(podCIDRs) != 0, nil
})
if len(podCIDRs) == 0 {
if node == nil {
return nil, fmt.Errorf("timeout waiting for node %q to exist", nodeName)
} else {
return nil, fmt.Errorf("timeout waiting for PodCIDR allocation on node %q", nodeName)
}
}
}
// For backward-compatibility, we keep going even if we didn't find a node (in
// non-watchPodCIDRs mode) or it didn't have IPs.
if node == nil {
klog.FromContext(ctx).Error(nil, "Timed out waiting for node %q to exist", nodeName)
} else if len(nodeIPs) == 0 {
@ -177,21 +141,23 @@ func newNodeManager(ctx context.Context, client clientset.Interface, resyncInter
}
return &NodeManager{
nodeInformer: nodeInformer,
nodeLister: nodeLister,
exitFunc: exitFunc,
nodeInformer: nodeInformer,
nodeLister: nodeLister,
exitFunc: exitFunc,
watchPodCIDRs: watchPodCIDRs,
nodeIPs: nodeIPs,
nodeIPs: nodeIPs,
podCIDRs: podCIDRs,
}, nil
}
func getNodeInfo(nodeLister corelisters.NodeLister, nodeName string) (*v1.Node, []net.IP) {
func getNodeInfo(nodeLister corelisters.NodeLister, nodeName string) (*v1.Node, []net.IP, []string) {
node, _ := nodeLister.Get(nodeName)
if node == nil {
return nil, nil
return nil, nil, nil
}
nodeIPs, _ := utilnode.GetNodeHostIPs(node)
return node, nodeIPs
return node, nodeIPs, node.Spec.PodCIDRs
}
// NodeIPs returns the NodeIPs polled in NewNodeManager(). (This may be empty if
@ -200,6 +166,11 @@ func (n *NodeManager) NodeIPs() []net.IP {
return n.nodeIPs
}
// PodCIDRs returns the PodCIDRs polled in NewNodeManager().
func (n *NodeManager) PodCIDRs() []string {
return n.podCIDRs
}
// NodeInformer returns the NodeInformer.
func (n *NodeManager) NodeInformer() v1informers.NodeInformer {
return n.nodeInformer
@ -217,6 +188,18 @@ func (n *NodeManager) OnNodeUpdate(_, node *v1.Node) {
// onNodeChange functions helps to implement OnNodeAdd and OnNodeUpdate.
func (n *NodeManager) onNodeChange(node *v1.Node) {
// We exit whenever there is a change in PodCIDRs detected initially, and PodCIDRs received
// on node watch event if the node manager is configured with watchPodCIDRs.
if n.watchPodCIDRs {
podCIDRs := node.Spec.PodCIDRs
if !reflect.DeepEqual(n.podCIDRs, podCIDRs) {
klog.InfoS("PodCIDRs changed for the node",
"node", klog.KObj(node), "newPodCIDRs", podCIDRs, "oldPodCIDRs", n.podCIDRs)
klog.Flush()
n.exitFunc(1)
}
}
nodeIPs, _ := utilnode.GetNodeHostIPs(node)
// We exit whenever there is a change in NodeIPs detected initially, and NodeIPs received

View file

@ -19,7 +19,6 @@ package proxy
import (
"context"
"net"
"strconv"
"testing"
"time"
@ -29,7 +28,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
clientsetfake "k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2"
"k8s.io/kubernetes/test/utils/ktesting"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"
@ -39,131 +37,6 @@ const (
testNodeName = "test-node"
)
func TestNodePodCIDRHandlerAdd(t *testing.T) {
oldKlogOsExit := klog.OsExit
defer func() {
klog.OsExit = oldKlogOsExit
}()
klog.OsExit = customExit
tests := []struct {
name string
oldNodePodCIDRs []string
newNodePodCIDRs []string
expectPanic bool
}{
{
name: "both empty",
},
{
name: "initialized correctly",
newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
},
{
name: "already initialized and same node",
oldNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"},
newNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"},
},
{
name: "already initialized and different node",
oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
newNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"},
expectPanic: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
n := &NodePodCIDRHandler{
podCIDRs: tt.oldNodePodCIDRs,
}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
ResourceVersion: "1",
},
Spec: v1.NodeSpec{
PodCIDRs: tt.newNodePodCIDRs,
},
}
defer func() {
r := recover()
if r == nil && tt.expectPanic {
t.Errorf("The code did not panic")
} else if r != nil && !tt.expectPanic {
t.Errorf("The code did panic")
}
}()
n.OnNodeAdd(node)
})
}
}
func TestNodePodCIDRHandlerUpdate(t *testing.T) {
oldKlogOsExit := klog.OsExit
defer func() {
klog.OsExit = oldKlogOsExit
}()
klog.OsExit = customExit
tests := []struct {
name string
oldNodePodCIDRs []string
newNodePodCIDRs []string
expectPanic bool
}{
{
name: "both empty",
},
{
name: "initialize",
newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
},
{
name: "same node",
oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
},
{
name: "different nodes",
oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"},
newNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"},
expectPanic: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
n := &NodePodCIDRHandler{
podCIDRs: tt.oldNodePodCIDRs,
}
oldNode := &v1.Node{}
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
ResourceVersion: "1",
},
Spec: v1.NodeSpec{
PodCIDRs: tt.newNodePodCIDRs,
},
}
defer func() {
r := recover()
if r == nil && tt.expectPanic {
t.Errorf("The code did not panic")
} else if r != nil && !tt.expectPanic {
t.Errorf("The code did panic")
}
}()
n.OnNodeUpdate(oldNode, node)
})
}
}
func customExit(exitCode int) {
panic(strconv.Itoa(exitCode))
}
type nodeTweak func(n *v1.Node)
func makeNode(tweaks ...nodeTweak) *v1.Node {
@ -186,18 +59,32 @@ func tweakNodeIPs(nodeIPs ...string) nodeTweak {
}
}
func tweakPodCIDRs(podCIDRs ...string) nodeTweak {
return func(n *v1.Node) {
n.Spec.PodCIDRs = append(n.Spec.PodCIDRs, podCIDRs...)
}
}
func TestNewNodeManager(t *testing.T) {
testCases := []struct {
name string
nodeUpdates []func(context.Context, clientset.Interface)
expectedNodeIPs []net.IP
expectedError string
name string
watchPodCIDRs bool
nodeUpdates []func(context.Context, clientset.Interface)
expectedNodeIPs []net.IP
expectedPodCIDRs []string
expectedError string
}{
{
name: "node object doesn't exist",
// times out and ignores the error
expectedNodeIPs: nil,
},
{
name: "node object doesn't exist, with watchPodCIDRs",
watchPodCIDRs: true,
// assert on error thrown by newNodeManager()
expectedError: "timeout waiting for node \"test-node\" to exist",
},
{
name: "node object exist without NodeIP",
nodeUpdates: []func(ctx context.Context, client clientset.Interface){
@ -234,6 +121,81 @@ func TestNewNodeManager(t *testing.T) {
},
expectedNodeIPs: []net.IP{netutils.ParseIPSloppy("192.168.1.10")},
},
{
name: "watchPodCIDRs and node object exist without PodCIDRs",
watchPodCIDRs: true,
nodeUpdates: []func(ctx context.Context, client clientset.Interface){
func(ctx context.Context, client clientset.Interface) {
// node object doesn't exist initially
},
func(ctx context.Context, client clientset.Interface) {
// node object now exists but without NodeIP
_, _ = client.CoreV1().Nodes().Create(ctx, makeNode(), metav1.CreateOptions{})
},
func(ctx context.Context, client clientset.Interface) {
// node object got updated with NodeIPs
_, _ = client.CoreV1().Nodes().Update(ctx, makeNode(
tweakNodeIPs("192.168.1.10"),
), metav1.UpdateOptions{})
},
},
// assert on error thrown by newNodeManager()
expectedError: "timeout waiting for PodCIDR allocation on node \"test-node\"",
},
{
name: "watchPodCIDRs and node object exist with NodeIP and PodCIDR",
watchPodCIDRs: true,
nodeUpdates: []func(ctx context.Context, client clientset.Interface){
func(ctx context.Context, client clientset.Interface) {
// node object doesn't exist initially
},
func(ctx context.Context, client clientset.Interface) {
// node object now exists but without NodeIP
_, _ = client.CoreV1().Nodes().Create(ctx, makeNode(), metav1.CreateOptions{})
},
func(ctx context.Context, client clientset.Interface) {
// node object got updated with NodeIPs
_, _ = client.CoreV1().Nodes().Update(ctx, makeNode(
tweakNodeIPs("192.168.1.10"),
), metav1.UpdateOptions{})
},
func(ctx context.Context, client clientset.Interface) {
// node updated with PodCIDRs
_, _ = client.CoreV1().Nodes().Update(ctx, makeNode(
tweakNodeIPs("192.168.1.1"),
tweakPodCIDRs("10.0.0.0/24"),
), metav1.UpdateOptions{})
},
},
expectedNodeIPs: []net.IP{netutils.ParseIPSloppy("192.168.1.1")},
expectedPodCIDRs: []string{"10.0.0.0/24"},
},
{
name: "watchPodCIDRs and node object exist without NodeIP and with PodCIDR",
watchPodCIDRs: true,
nodeUpdates: []func(ctx context.Context, client clientset.Interface){
func(ctx context.Context, client clientset.Interface) {
// node object doesn't exist initially
},
func(ctx context.Context, client clientset.Interface) {
// node object now exists but without NodeIP
_, _ = client.CoreV1().Nodes().Create(ctx, makeNode(), metav1.CreateOptions{})
},
func(ctx context.Context, client clientset.Interface) {
// node updated with PodCIDRs
_, _ = client.CoreV1().Nodes().Update(ctx, makeNode(
tweakPodCIDRs("10.0.0.0/24"),
), metav1.UpdateOptions{})
},
},
// times out and ignores the error
expectedNodeIPs: nil,
expectedPodCIDRs: []string{"10.0.0.0/24"},
},
}
for _, tc := range testCases {
@ -254,13 +216,14 @@ func TestNewNodeManager(t *testing.T) {
}
}()
// initialize the node manager with 10ms poll interval and 1s poll timeout
nodeManager, err := newNodeManager(ctx, client, time.Second, testNodeName, func(i int) {}, 10*time.Millisecond, time.Second)
nodeManager, err := newNodeManager(ctx, client, time.Second, testNodeName, tc.watchPodCIDRs, func(i int) {}, 10*time.Millisecond, time.Second, time.Second)
if len(tc.expectedError) > 0 {
require.Nil(t, nodeManager)
require.ErrorContains(t, err, tc.expectedError)
} else {
require.NoError(t, err)
require.Equal(t, tc.expectedNodeIPs, nodeManager.NodeIPs())
require.Equal(t, tc.expectedPodCIDRs, nodeManager.PodCIDRs())
}
})
}
@ -270,7 +233,10 @@ func TestNodeManagerOnNodeChange(t *testing.T) {
tests := []struct {
name string
initialNodeIPs []string
initialPodCIDRs []string
updatedNodeIPs []string
updatedPodCIDRs []string
watchPodCIDRs bool
expectedExitCode *int
}{
{
@ -285,6 +251,24 @@ func TestNodeManagerOnNodeChange(t *testing.T) {
updatedNodeIPs: []string{"10.0.1.1", "fd00:3:2:1::2"},
expectedExitCode: ptr.To(1),
},
{
name: "watchPodCIDR and node updated with same PodCIDRs",
initialNodeIPs: []string{"192.168.1.1", "fd00:1:2:3::1"},
initialPodCIDRs: []string{"10.0.0.0/8", "fd01:2345::/64"},
updatedNodeIPs: []string{"192.168.1.1", "fd00:1:2:3::1"},
updatedPodCIDRs: []string{"10.0.0.0/8", "fd01:2345::/64"},
watchPodCIDRs: true,
expectedExitCode: nil,
},
{
name: "watchPodCIDR and node updated with different PodCIDRs",
initialNodeIPs: []string{"192.168.1.1", "fd00:1:2:3::1"},
initialPodCIDRs: []string{"10.0.0.0/8", "fd01:2345::/64"},
updatedNodeIPs: []string{"192.168.1.1", "fd00:1:2:3::1"},
updatedPodCIDRs: []string{"172.16.10.0/24", "fd01:5422::/64"},
watchPodCIDRs: true,
expectedExitCode: ptr.To(1),
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
@ -295,13 +279,16 @@ func TestNodeManagerOnNodeChange(t *testing.T) {
}
client := clientsetfake.NewClientset()
_, err := client.CoreV1().Nodes().Create(ctx, makeNode(tweakNodeIPs(tc.initialNodeIPs...)), metav1.CreateOptions{})
_, err := client.CoreV1().Nodes().Create(ctx, makeNode(
tweakNodeIPs(tc.initialNodeIPs...),
tweakPodCIDRs(tc.initialPodCIDRs...),
), metav1.CreateOptions{})
require.NoError(t, err)
nodeManager, err := newNodeManager(ctx, client, 30*time.Second, testNodeName, exitFunc, 10*time.Millisecond, time.Second)
nodeManager, err := newNodeManager(ctx, client, 30*time.Second, testNodeName, tc.watchPodCIDRs, exitFunc, 10*time.Millisecond, time.Second, time.Second)
require.NoError(t, err)
nodeManager.onNodeChange(makeNode(tweakNodeIPs(tc.updatedNodeIPs...)))
nodeManager.onNodeChange(makeNode(tweakNodeIPs(tc.updatedNodeIPs...), tweakPodCIDRs(tc.updatedPodCIDRs...)))
require.Equal(t, tc.expectedExitCode, exitCode)
})
}
@ -315,7 +302,7 @@ func TestNodeManagerOnNodeDelete(t *testing.T) {
}
client := clientsetfake.NewClientset()
_, _ = client.CoreV1().Nodes().Create(ctx, makeNode(tweakNodeIPs("192.168.1.1")), metav1.CreateOptions{})
nodeManager, err := newNodeManager(ctx, client, 30*time.Second, testNodeName, exitFunc, 10*time.Millisecond, time.Second)
nodeManager, err := newNodeManager(ctx, client, 30*time.Second, testNodeName, false, exitFunc, 10*time.Millisecond, time.Second, time.Second)
require.NoError(t, err)
nodeManager.OnNodeDelete(makeNode())