diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 51754c07f29..2e85bf813c7 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -208,7 +208,8 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfig } // NodeManager makes an informer that selects for the node where this kube-proxy is running - s.NodeManager, err = proxy.NewNodeManager(ctx, s.Client, s.Config.ConfigSyncPeriod.Duration, s.NodeName) + s.NodeManager, err = proxy.NewNodeManager(ctx, s.Client, s.Config.ConfigSyncPeriod.Duration, + s.NodeName, s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR) if err != nil { return nil, err } @@ -218,6 +219,7 @@ func newProxyServer(ctx context.Context, config *kubeproxyconfig.KubeProxyConfig logger.Info("Successfully retrieved NodeIPs", "NodeIPs", rawNodeIPs) } s.PrimaryIPFamily, s.NodeIPs = detectNodeIPs(ctx, rawNodeIPs, config.BindAddress) + s.podCIDRs = s.NodeManager.PodCIDRs() if len(config.NodePortAddresses) == 1 && config.NodePortAddresses[0] == kubeproxyconfig.NodePortAddressesPrimary { var nodePortAddresses []string @@ -606,10 +608,6 @@ func (s *ProxyServer) Run(ctx context.Context) error { // hollow-proxy doesn't need node config, and we don't create nodeManager for hollow-proxy. 
if s.NodeManager != nil { nodeConfig := config.NewNodeConfig(ctx, s.NodeManager.NodeInformer(), s.Config.ConfigSyncPeriod.Duration) - // https://issues.k8s.io/111321 - if s.Config.DetectLocalMode == kubeproxyconfig.LocalModeNodeCIDR { - nodeConfig.RegisterEventHandler(proxy.NewNodePodCIDRHandler(ctx, s.podCIDRs)) - } nodeConfig.RegisterEventHandler(&proxy.NodeEligibleHandler{ HealthServer: s.HealthzServer, }) diff --git a/cmd/kube-proxy/app/server_linux.go b/cmd/kube-proxy/app/server_linux.go index 7b967419705..37c4da7bbbc 100644 --- a/cmd/kube-proxy/app/server_linux.go +++ b/cmd/kube-proxy/app/server_linux.go @@ -33,13 +33,6 @@ import ( "github.com/google/cadvisor/utils/sysfs" v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - toolswatch "k8s.io/client-go/tools/watch" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy" @@ -53,10 +46,6 @@ import ( utiliptables "k8s.io/kubernetes/pkg/util/iptables" ) -// timeoutForNodePodCIDR is the time to wait for allocators to assign a PodCIDR to the -// node after it is registered. -var timeoutForNodePodCIDR = 5 * time.Minute - // platformApplyDefaults is called after parsing command-line flags and/or reading the // config file, to apply platform-specific default values to config. func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfiguration) { @@ -80,17 +69,6 @@ func (o *Options) platformApplyDefaults(config *proxyconfigapi.KubeProxyConfigur // Proxier. It should fill in any platform-specific fields and perform other // platform-specific setup. 
func (s *ProxyServer) platformSetup(ctx context.Context) error { - logger := klog.FromContext(ctx) - if s.Config.DetectLocalMode == proxyconfigapi.LocalModeNodeCIDR { - logger.Info("Watching for node, awaiting podCIDR allocation", "node", s.NodeName) - node, err := waitForPodCIDR(ctx, s.Client, s.NodeName) - if err != nil { - return err - } - s.podCIDRs = node.Spec.PodCIDRs - logger.Info("NodeInfo", "podCIDRs", node.Spec.PodCIDRs) - } - ct := &realConntracker{} err := s.setupConntrack(ctx, ct) if err != nil { @@ -391,50 +369,6 @@ func getConntrackMax(ctx context.Context, config proxyconfigapi.KubeProxyConntra return 0, nil } -func waitForPodCIDR(ctx context.Context, client clientset.Interface, nodeName string) (*v1.Node, error) { - // since allocators can assign the podCIDR after the node registers, we do a watch here to wait - // for podCIDR to be assigned, instead of assuming that the Get() on startup will have it. - ctx, cancelFunc := context.WithTimeout(ctx, timeoutForNodePodCIDR) - defer cancelFunc() - - fieldSelector := fields.OneTermEqualSelector("metadata.name", nodeName).String() - lw := &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) { - options.FieldSelector = fieldSelector - return client.CoreV1().Nodes().List(ctx, options) - }, - WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) { - options.FieldSelector = fieldSelector - return client.CoreV1().Nodes().Watch(ctx, options) - }, - } - condition := func(event watch.Event) (bool, error) { - // don't process delete events - if event.Type != watch.Modified && event.Type != watch.Added { - return false, nil - } - - n, ok := event.Object.(*v1.Node) - if !ok { - return false, fmt.Errorf("event object not of type Node") - } - // don't consider the node if is going to be deleted and keep waiting - if !n.DeletionTimestamp.IsZero() { - return false, nil - } - return n.Spec.PodCIDR != "" && len(n.Spec.PodCIDRs) > 0, nil - } - - evt, err := 
toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition) - if err != nil { - return nil, fmt.Errorf("timeout waiting for PodCIDR allocation to configure detect-local-mode %v: %v", proxyconfigapi.LocalModeNodeCIDR, err) - } - if n, ok := evt.Object.(*v1.Node); ok { - return n, nil - } - return nil, fmt.Errorf("event object not of type node") -} - func detectNumCPU() int { // try get numCPU from /sys firstly due to a known issue (https://github.com/kubernetes/kubernetes/issues/99225) _, numCPU, err := machine.GetTopology(sysfs.NewRealSysFs()) diff --git a/cmd/kube-proxy/app/server_linux_test.go b/cmd/kube-proxy/app/server_linux_test.go index 3918a152793..04d33569eca 100644 --- a/cmd/kube-proxy/app/server_linux_test.go +++ b/cmd/kube-proxy/app/server_linux_test.go @@ -23,7 +23,6 @@ import ( "context" "errors" "fmt" - "net" "os" "path/filepath" "reflect" @@ -36,14 +35,9 @@ import ( "github.com/spf13/pflag" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/watch" - clientsetfake "k8s.io/client-go/kubernetes/fake" - clientgotesting "k8s.io/client-go/testing" proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config" proxyutil "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/test/utils/ktesting" - netutils "k8s.io/utils/net" "k8s.io/utils/ptr" ) @@ -445,18 +439,6 @@ func Test_getLocalDetectors(t *testing.T) { } } -func makeNodeWithPodCIDRs(cidrs ...string) *v1.Node { - if len(cidrs) == 0 { - return &v1.Node{} - } - return &v1.Node{ - Spec: v1.NodeSpec{ - PodCIDR: cidrs[0], - PodCIDRs: cidrs, - }, - } -} - func TestConfigChange(t *testing.T) { setUp := func() (*os.File, string, error) { tempDir, err := os.MkdirTemp("", "kubeproxy-config-change") @@ -574,56 +556,6 @@ detectLocalMode: "BridgeInterface"`) } } -func Test_waitForPodCIDR(t *testing.T) { - _, ctx := ktesting.NewTestContext(t) - expected := []string{"192.168.0.0/24", "fd00:1:2::/64"} - nodeName := "test-node" - oldNode := 
&v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: nodeName, - ResourceVersion: "1000", - }, - Spec: v1.NodeSpec{ - PodCIDR: "10.0.0.0/24", - PodCIDRs: []string{"10.0.0.0/24", "2001:db2:1/64"}, - }, - } - node := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: nodeName, - ResourceVersion: "1", - }, - } - updatedNode := node.DeepCopy() - updatedNode.Spec.PodCIDRs = expected - updatedNode.Spec.PodCIDR = expected[0] - - // start with the new node - client := clientsetfake.NewSimpleClientset() - client.AddReactor("list", "nodes", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { - obj := &v1.NodeList{} - return true, obj, nil - }) - fakeWatch := watch.NewFake() - client.PrependWatchReactor("nodes", clientgotesting.DefaultWatchReactor(fakeWatch, nil)) - - go func() { - fakeWatch.Add(node) - // receive a delete event for the old node - fakeWatch.Delete(oldNode) - // set the PodCIDRs on the new node - fakeWatch.Modify(updatedNode) - }() - got, err := waitForPodCIDR(ctx, client, node.Name) - if err != nil { - t.Errorf("waitForPodCIDR() unexpected error %v", err) - return - } - if !reflect.DeepEqual(got.Spec.PodCIDRs, expected) { - t.Errorf("waitForPodCIDR() got %v expected to be %v ", got.Spec.PodCIDRs, expected) - } -} - func TestGetConntrackMax(t *testing.T) { ncores := goruntime.NumCPU() testCases := []struct { @@ -671,57 +603,6 @@ func TestGetConntrackMax(t *testing.T) { } } -func TestProxyServer_platformSetup(t *testing.T) { - tests := []struct { - name string - node *v1.Node - config *proxyconfigapi.KubeProxyConfiguration - wantPodCIDRs []string - }{ - { - name: "LocalModeNodeCIDR store the node PodCIDRs obtained", - node: makeNodeWithPodCIDRs("10.0.0.0/24"), - config: &proxyconfigapi.KubeProxyConfiguration{DetectLocalMode: proxyconfigapi.LocalModeNodeCIDR}, - wantPodCIDRs: []string{"10.0.0.0/24"}, - }, - { - name: "LocalModeNodeCIDR store the node PodCIDRs obtained dual stack", - node: makeNodeWithPodCIDRs("10.0.0.0/24", 
"2001:db2:1/64"), - config: &proxyconfigapi.KubeProxyConfiguration{DetectLocalMode: proxyconfigapi.LocalModeNodeCIDR}, - wantPodCIDRs: []string{"10.0.0.0/24", "2001:db2:1/64"}, - }, - { - name: "LocalModeClusterCIDR does not get the node PodCIDRs", - node: makeNodeWithPodCIDRs("10.0.0.0/24", "2001:db2:1/64"), - config: &proxyconfigapi.KubeProxyConfiguration{DetectLocalMode: proxyconfigapi.LocalModeClusterCIDR}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - _, ctx := ktesting.NewTestContext(t) - client := clientsetfake.NewSimpleClientset(tt.node) - s := &ProxyServer{ - Config: tt.config, - Client: client, - NodeName: "nodename", - NodeIPs: map[v1.IPFamily]net.IP{ - v1.IPv4Protocol: netutils.ParseIPSloppy("127.0.0.1"), - v1.IPv6Protocol: net.IPv6zero, - }, - } - err := s.platformSetup(ctx) - if err != nil { - t.Errorf("ProxyServer.createProxier() error = %v", err) - return - } - if !reflect.DeepEqual(s.podCIDRs, tt.wantPodCIDRs) { - t.Errorf("Expected PodCIDRs %v got %v", tt.wantPodCIDRs, s.podCIDRs) - } - - }) - } -} - type fakeConntracker struct { called []string err error diff --git a/pkg/proxy/node.go b/pkg/proxy/node.go index 53555c27ef4..aabc0e057cc 100644 --- a/pkg/proxy/node.go +++ b/pkg/proxy/node.go @@ -22,7 +22,6 @@ import ( "net" "os" "reflect" - "sync" "time" v1 "k8s.io/api/core/v1" @@ -40,69 +39,6 @@ import ( utilnode "k8s.io/kubernetes/pkg/util/node" ) -// NodePodCIDRHandler handles the life cycle of kube-proxy based on the node PodCIDR assigned -// Implements the config.NodeHandler interface -// https://issues.k8s.io/111321 -type NodePodCIDRHandler struct { - mu sync.Mutex - podCIDRs []string - logger klog.Logger -} - -func NewNodePodCIDRHandler(ctx context.Context, podCIDRs []string) *NodePodCIDRHandler { - return &NodePodCIDRHandler{ - podCIDRs: podCIDRs, - logger: klog.FromContext(ctx), - } -} - -var _ config.NodeHandler = &NodePodCIDRHandler{} - -// OnNodeAdd is a handler for Node creates. 
-func (n *NodePodCIDRHandler) OnNodeAdd(node *v1.Node) { - n.mu.Lock() - defer n.mu.Unlock() - - podCIDRs := node.Spec.PodCIDRs - // initialize podCIDRs - if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 { - n.logger.Info("Setting current PodCIDRs", "podCIDRs", podCIDRs) - n.podCIDRs = podCIDRs - return - } - if !reflect.DeepEqual(n.podCIDRs, podCIDRs) { - n.logger.Error(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting", - "node", klog.KObj(node), "newPodCIDRs", podCIDRs, "oldPodCIDRs", n.podCIDRs) - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } -} - -// OnNodeUpdate is a handler for Node updates. -func (n *NodePodCIDRHandler) OnNodeUpdate(_, node *v1.Node) { - n.mu.Lock() - defer n.mu.Unlock() - podCIDRs := node.Spec.PodCIDRs - // initialize podCIDRs - if len(n.podCIDRs) == 0 && len(podCIDRs) > 0 { - n.logger.Info("Setting current PodCIDRs", "podCIDRs", podCIDRs) - n.podCIDRs = podCIDRs - return - } - if !reflect.DeepEqual(n.podCIDRs, podCIDRs) { - n.logger.Error(nil, "Using NodeCIDR LocalDetector mode, current PodCIDRs are different than previous PodCIDRs, restarting", - "node", klog.KObj(node), "newPodCIDRs", podCIDRs, "oldPODCIDRs", n.podCIDRs) - klog.FlushAndExit(klog.ExitFlushTimeout, 1) - } -} - -// OnNodeDelete is a handler for Node deletes. -func (n *NodePodCIDRHandler) OnNodeDelete(node *v1.Node) { - n.logger.Error(nil, "Current Node is being deleted", "node", klog.KObj(node)) -} - -// OnNodeSynced is a handler for Node syncs. -func (n *NodePodCIDRHandler) OnNodeSynced() {} - // NodeEligibleHandler handles the life cycle of the Node's eligibility, as // determined by the health server for directing load balancer traffic. type NodeEligibleHandler struct { @@ -123,25 +59,35 @@ func (n *NodeEligibleHandler) OnNodeDelete(node *v1.Node) { n.HealthServer.SyncN // OnNodeSynced is a handler for Node syncs. 
func (n *NodeEligibleHandler) OnNodeSynced() {} -// NodeManager handles the life cycle of kube-proxy based on the NodeIPs, handles node watch events -// and crashes kube-proxy if there are any changes in NodeIPs. +// NodeManager handles the life cycle of kube-proxy based on the NodeIPs and PodCIDRs, handles +// node watch events and crashes kube-proxy if there are any changes in NodeIPs or PodCIDRs. +// Note: It only crashes on a change in PodCIDRs when watchPodCIDRs is set to true. type NodeManager struct { - nodeInformer v1informers.NodeInformer - nodeLister corelisters.NodeLister - exitFunc func(exitCode int) + nodeInformer v1informers.NodeInformer + nodeLister corelisters.NodeLister + exitFunc func(exitCode int) + watchPodCIDRs bool - nodeIPs []net.IP + nodeIPs []net.IP + podCIDRs []string } // NewNodeManager initializes node informer that selects for the given node, waits for cache sync // and returns NodeManager after waiting some amount of time for the node object to exist -// and have NodeIPs. Note: NewNodeManager doesn't return any error if failed to retrieve NodeIPs. -func NewNodeManager(ctx context.Context, client clientset.Interface, resyncInterval time.Duration, nodeName string) (*NodeManager, error) { - return newNodeManager(ctx, client, resyncInterval, nodeName, os.Exit, time.Second, 30*time.Second) +// and have NodeIPs (and PodCIDRs if watchPodCIDRs is true). Note: for backward compatibility, +// NewNodeManager doesn't return any error if it failed to retrieve NodeIPs; it only returns an +// error if watchPodCIDRs is true and no PodCIDRs could be retrieved. +func NewNodeManager(ctx context.Context, client clientset.Interface, + resyncInterval time.Duration, nodeName string, watchPodCIDRs bool, +) (*NodeManager, error) { + return newNodeManager(ctx, client, resyncInterval, nodeName, watchPodCIDRs, os.Exit, time.Second, 30*time.Second, 5*time.Minute) } -// newNodeManager implements NewNodeManager with configurable exit function, poll interval and timeout. 
-func newNodeManager(ctx context.Context, client clientset.Interface, resyncInterval time.Duration, nodeName string, exitFunc func(int), pollInterval, pollTimeout time.Duration) (*NodeManager, error) { +// newNodeManager implements NewNodeManager with configurable exit function, poll interval and timeouts. +func newNodeManager(ctx context.Context, client clientset.Interface, resyncInterval time.Duration, + nodeName string, watchPodCIDRs bool, exitFunc func(int), + pollInterval, nodeIPsTimeout, podCIDRsTimeout time.Duration, +) (*NodeManager, error) { // make an informer that selects for the given node thisNodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncInterval, informers.WithTweakListOptions(func(options *metav1.ListOptions) { @@ -156,20 +102,38 @@ func newNodeManager(ctx context.Context, client clientset.Interface, resyncInter return nil, fmt.Errorf("can not sync node informer") } - node, nodeIPs := getNodeInfo(nodeLister, nodeName) + node, nodeIPs, podCIDRs := getNodeInfo(nodeLister, nodeName) if len(nodeIPs) == 0 { - // wait for the node object to (hopefully) exist and have NodeIPs. - ctx, cancel := context.WithTimeout(ctx, pollTimeout) + // wait for the node object to exist and have NodeIPs. + ctx, cancel := context.WithTimeout(ctx, nodeIPsTimeout) defer cancel() - _ = wait.PollUntilContextCancel(ctx, pollInterval, true, func(context.Context) (bool, error) { - node, nodeIPs = getNodeInfo(nodeLister, nodeName) + _ = wait.PollUntilContextCancel(ctx, pollInterval, false, func(context.Context) (bool, error) { + node, nodeIPs, podCIDRs = getNodeInfo(nodeLister, nodeName) return len(nodeIPs) != 0, nil }) } - // For backward-compatibility, we keep going even if we didn't find a node or it - // didn't have IPs. + if watchPodCIDRs && len(podCIDRs) == 0 { + // wait some additional time for the PodCIDRs. 
+ ctx, cancel := context.WithTimeout(ctx, podCIDRsTimeout) + defer cancel() + _ = wait.PollUntilContextCancel(ctx, pollInterval, false, func(context.Context) (bool, error) { + node, nodeIPs, podCIDRs = getNodeInfo(nodeLister, nodeName) + return len(podCIDRs) != 0, nil + }) + + if len(podCIDRs) == 0 { + if node == nil { + return nil, fmt.Errorf("timeout waiting for node %q to exist", nodeName) + } else { + return nil, fmt.Errorf("timeout waiting for PodCIDR allocation on node %q", nodeName) + } + } + } + + // For backward-compatibility, we keep going even if we didn't find a node (in + // non-watchPodCIDRs mode) or it didn't have IPs. if node == nil { klog.FromContext(ctx).Error(nil, "Timed out waiting for node %q to exist", nodeName) } else if len(nodeIPs) == 0 { @@ -177,21 +141,23 @@ func newNodeManager(ctx context.Context, client clientset.Interface, resyncInter } return &NodeManager{ - nodeInformer: nodeInformer, - nodeLister: nodeLister, - exitFunc: exitFunc, + nodeInformer: nodeInformer, + nodeLister: nodeLister, + exitFunc: exitFunc, + watchPodCIDRs: watchPodCIDRs, - nodeIPs: nodeIPs, + nodeIPs: nodeIPs, + podCIDRs: podCIDRs, }, nil } -func getNodeInfo(nodeLister corelisters.NodeLister, nodeName string) (*v1.Node, []net.IP) { +func getNodeInfo(nodeLister corelisters.NodeLister, nodeName string) (*v1.Node, []net.IP, []string) { node, _ := nodeLister.Get(nodeName) if node == nil { - return nil, nil + return nil, nil, nil } nodeIPs, _ := utilnode.GetNodeHostIPs(node) - return node, nodeIPs + return node, nodeIPs, node.Spec.PodCIDRs } // NodeIPs returns the NodeIPs polled in NewNodeManager(). (This may be empty if @@ -200,6 +166,11 @@ func (n *NodeManager) NodeIPs() []net.IP { return n.nodeIPs } +// PodCIDRs returns the PodCIDRs polled in NewNodeManager(). +func (n *NodeManager) PodCIDRs() []string { + return n.podCIDRs +} + // NodeInformer returns the NodeInformer. 
func (n *NodeManager) NodeInformer() v1informers.NodeInformer { return n.nodeInformer @@ -217,6 +188,18 @@ func (n *NodeManager) OnNodeUpdate(_, node *v1.Node) { // onNodeChange functions helps to implement OnNodeAdd and OnNodeUpdate. func (n *NodeManager) onNodeChange(node *v1.Node) { + // We exit whenever there is a change in PodCIDRs detected initially, and PodCIDRs received + // on node watch event if the node manager is configured with watchPodCIDRs. + if n.watchPodCIDRs { + podCIDRs := node.Spec.PodCIDRs + if !reflect.DeepEqual(n.podCIDRs, podCIDRs) { + klog.InfoS("PodCIDRs changed for the node", + "node", klog.KObj(node), "newPodCIDRs", podCIDRs, "oldPodCIDRs", n.podCIDRs) + klog.Flush() + n.exitFunc(1) + } + } + nodeIPs, _ := utilnode.GetNodeHostIPs(node) // We exit whenever there is a change in NodeIPs detected initially, and NodeIPs received diff --git a/pkg/proxy/node_test.go b/pkg/proxy/node_test.go index db9c68486f3..248efe1e9d6 100644 --- a/pkg/proxy/node_test.go +++ b/pkg/proxy/node_test.go @@ -19,7 +19,6 @@ package proxy import ( "context" "net" - "strconv" "testing" "time" @@ -29,7 +28,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" clientsetfake "k8s.io/client-go/kubernetes/fake" - "k8s.io/klog/v2" "k8s.io/kubernetes/test/utils/ktesting" netutils "k8s.io/utils/net" "k8s.io/utils/ptr" @@ -39,131 +37,6 @@ const ( testNodeName = "test-node" ) -func TestNodePodCIDRHandlerAdd(t *testing.T) { - oldKlogOsExit := klog.OsExit - defer func() { - klog.OsExit = oldKlogOsExit - }() - klog.OsExit = customExit - - tests := []struct { - name string - oldNodePodCIDRs []string - newNodePodCIDRs []string - expectPanic bool - }{ - { - name: "both empty", - }, - { - name: "initialized correctly", - newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, - }, - { - name: "already initialized and same node", - oldNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"}, - newNodePodCIDRs: 
[]string{"10.0.0.0/24", "fd00:3:2:1::/64"}, - }, - { - name: "already initialized and different node", - oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, - newNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"}, - expectPanic: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - n := &NodePodCIDRHandler{ - podCIDRs: tt.oldNodePodCIDRs, - } - node := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - ResourceVersion: "1", - }, - Spec: v1.NodeSpec{ - PodCIDRs: tt.newNodePodCIDRs, - }, - } - defer func() { - r := recover() - if r == nil && tt.expectPanic { - t.Errorf("The code did not panic") - } else if r != nil && !tt.expectPanic { - t.Errorf("The code did panic") - } - }() - - n.OnNodeAdd(node) - }) - } -} - -func TestNodePodCIDRHandlerUpdate(t *testing.T) { - oldKlogOsExit := klog.OsExit - defer func() { - klog.OsExit = oldKlogOsExit - }() - klog.OsExit = customExit - - tests := []struct { - name string - oldNodePodCIDRs []string - newNodePodCIDRs []string - expectPanic bool - }{ - { - name: "both empty", - }, - { - name: "initialize", - newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, - }, - { - name: "same node", - oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, - newNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, - }, - { - name: "different nodes", - oldNodePodCIDRs: []string{"192.168.1.0/24", "fd00:1:2:3::/64"}, - newNodePodCIDRs: []string{"10.0.0.0/24", "fd00:3:2:1::/64"}, - expectPanic: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - n := &NodePodCIDRHandler{ - podCIDRs: tt.oldNodePodCIDRs, - } - oldNode := &v1.Node{} - node := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - ResourceVersion: "1", - }, - Spec: v1.NodeSpec{ - PodCIDRs: tt.newNodePodCIDRs, - }, - } - defer func() { - r := recover() - if r == nil && tt.expectPanic { - t.Errorf("The code did not panic") - } else if r != nil && 
!tt.expectPanic { - t.Errorf("The code did panic") - } - }() - - n.OnNodeUpdate(oldNode, node) - }) - } -} - -func customExit(exitCode int) { - panic(strconv.Itoa(exitCode)) -} - type nodeTweak func(n *v1.Node) func makeNode(tweaks ...nodeTweak) *v1.Node { @@ -186,18 +59,32 @@ func tweakNodeIPs(nodeIPs ...string) nodeTweak { } } +func tweakPodCIDRs(podCIDRs ...string) nodeTweak { + return func(n *v1.Node) { + n.Spec.PodCIDRs = append(n.Spec.PodCIDRs, podCIDRs...) + } +} + func TestNewNodeManager(t *testing.T) { testCases := []struct { - name string - nodeUpdates []func(context.Context, clientset.Interface) - expectedNodeIPs []net.IP - expectedError string + name string + watchPodCIDRs bool + nodeUpdates []func(context.Context, clientset.Interface) + expectedNodeIPs []net.IP + expectedPodCIDRs []string + expectedError string }{ { name: "node object doesn't exist", // times out and ignores the error expectedNodeIPs: nil, }, + { + name: "node object doesn't exist, with watchPodCIDRs", + watchPodCIDRs: true, + // assert on error thrown by newNodeManager() + expectedError: "timeout waiting for node \"test-node\" to exist", + }, { name: "node object exist without NodeIP", nodeUpdates: []func(ctx context.Context, client clientset.Interface){ @@ -234,6 +121,81 @@ func TestNewNodeManager(t *testing.T) { }, expectedNodeIPs: []net.IP{netutils.ParseIPSloppy("192.168.1.10")}, }, + { + name: "watchPodCIDRs and node object exist without PodCIDRs", + watchPodCIDRs: true, + nodeUpdates: []func(ctx context.Context, client clientset.Interface){ + func(ctx context.Context, client clientset.Interface) { + // node object doesn't exist initially + }, + + func(ctx context.Context, client clientset.Interface) { + // node object now exists but without NodeIP + _, _ = client.CoreV1().Nodes().Create(ctx, makeNode(), metav1.CreateOptions{}) + }, + func(ctx context.Context, client clientset.Interface) { + // node object got updated with NodeIPs + _, _ = client.CoreV1().Nodes().Update(ctx, 
makeNode( + tweakNodeIPs("192.168.1.10"), + ), metav1.UpdateOptions{}) + }, + }, + // assert on error thrown by newNodeManager() + expectedError: "timeout waiting for PodCIDR allocation on node \"test-node\"", + }, + { + name: "watchPodCIDRs and node object exist with NodeIP and PodCIDR", + watchPodCIDRs: true, + nodeUpdates: []func(ctx context.Context, client clientset.Interface){ + func(ctx context.Context, client clientset.Interface) { + // node object doesn't exist initially + }, + + func(ctx context.Context, client clientset.Interface) { + // node object now exists but without NodeIP + _, _ = client.CoreV1().Nodes().Create(ctx, makeNode(), metav1.CreateOptions{}) + }, + + func(ctx context.Context, client clientset.Interface) { + // node object got updated with NodeIPs + _, _ = client.CoreV1().Nodes().Update(ctx, makeNode( + tweakNodeIPs("192.168.1.10"), + ), metav1.UpdateOptions{}) + }, + func(ctx context.Context, client clientset.Interface) { + // node updated with PodCIDRs + _, _ = client.CoreV1().Nodes().Update(ctx, makeNode( + tweakNodeIPs("192.168.1.1"), + tweakPodCIDRs("10.0.0.0/24"), + ), metav1.UpdateOptions{}) + }, + }, + expectedNodeIPs: []net.IP{netutils.ParseIPSloppy("192.168.1.1")}, + expectedPodCIDRs: []string{"10.0.0.0/24"}, + }, + { + name: "watchPodCIDRs and node object exist without NodeIP and with PodCIDR", + watchPodCIDRs: true, + nodeUpdates: []func(ctx context.Context, client clientset.Interface){ + func(ctx context.Context, client clientset.Interface) { + // node object doesn't exist initially + }, + + func(ctx context.Context, client clientset.Interface) { + // node object now exists but without NodeIP + _, _ = client.CoreV1().Nodes().Create(ctx, makeNode(), metav1.CreateOptions{}) + }, + func(ctx context.Context, client clientset.Interface) { + // node updated with PodCIDRs + _, _ = client.CoreV1().Nodes().Update(ctx, makeNode( + tweakPodCIDRs("10.0.0.0/24"), + ), metav1.UpdateOptions{}) + }, + }, + // times out and ignores the error + 
expectedNodeIPs: nil, + expectedPodCIDRs: []string{"10.0.0.0/24"}, + }, } for _, tc := range testCases { @@ -254,13 +216,14 @@ func TestNewNodeManager(t *testing.T) { } }() // initialize the node manager with 10ms poll interval and 1s poll timeout - nodeManager, err := newNodeManager(ctx, client, time.Second, testNodeName, func(i int) {}, 10*time.Millisecond, time.Second) + nodeManager, err := newNodeManager(ctx, client, time.Second, testNodeName, tc.watchPodCIDRs, func(i int) {}, 10*time.Millisecond, time.Second, time.Second) if len(tc.expectedError) > 0 { require.Nil(t, nodeManager) require.ErrorContains(t, err, tc.expectedError) } else { require.NoError(t, err) require.Equal(t, tc.expectedNodeIPs, nodeManager.NodeIPs()) + require.Equal(t, tc.expectedPodCIDRs, nodeManager.PodCIDRs()) } }) } @@ -270,7 +233,10 @@ func TestNodeManagerOnNodeChange(t *testing.T) { tests := []struct { name string initialNodeIPs []string + initialPodCIDRs []string updatedNodeIPs []string + updatedPodCIDRs []string + watchPodCIDRs bool expectedExitCode *int }{ { @@ -285,6 +251,24 @@ func TestNodeManagerOnNodeChange(t *testing.T) { updatedNodeIPs: []string{"10.0.1.1", "fd00:3:2:1::2"}, expectedExitCode: ptr.To(1), }, + { + name: "watchPodCIDR and node updated with same PodCIDRs", + initialNodeIPs: []string{"192.168.1.1", "fd00:1:2:3::1"}, + initialPodCIDRs: []string{"10.0.0.0/8", "fd01:2345::/64"}, + updatedNodeIPs: []string{"192.168.1.1", "fd00:1:2:3::1"}, + updatedPodCIDRs: []string{"10.0.0.0/8", "fd01:2345::/64"}, + watchPodCIDRs: true, + expectedExitCode: nil, + }, + { + name: "watchPodCIDR and node updated with different PodCIDRs", + initialNodeIPs: []string{"192.168.1.1", "fd00:1:2:3::1"}, + initialPodCIDRs: []string{"10.0.0.0/8", "fd01:2345::/64"}, + updatedNodeIPs: []string{"192.168.1.1", "fd00:1:2:3::1"}, + updatedPodCIDRs: []string{"172.16.10.0/24", "fd01:5422::/64"}, + watchPodCIDRs: true, + expectedExitCode: ptr.To(1), + }, } for _, tc := range tests { t.Run(tc.name, func(t 
*testing.T) { @@ -295,13 +279,16 @@ func TestNodeManagerOnNodeChange(t *testing.T) { } client := clientsetfake.NewClientset() - _, err := client.CoreV1().Nodes().Create(ctx, makeNode(tweakNodeIPs(tc.initialNodeIPs...)), metav1.CreateOptions{}) + _, err := client.CoreV1().Nodes().Create(ctx, makeNode( + tweakNodeIPs(tc.initialNodeIPs...), + tweakPodCIDRs(tc.initialPodCIDRs...), + ), metav1.CreateOptions{}) require.NoError(t, err) - nodeManager, err := newNodeManager(ctx, client, 30*time.Second, testNodeName, exitFunc, 10*time.Millisecond, time.Second) + nodeManager, err := newNodeManager(ctx, client, 30*time.Second, testNodeName, tc.watchPodCIDRs, exitFunc, 10*time.Millisecond, time.Second, time.Second) require.NoError(t, err) - nodeManager.onNodeChange(makeNode(tweakNodeIPs(tc.updatedNodeIPs...))) + nodeManager.onNodeChange(makeNode(tweakNodeIPs(tc.updatedNodeIPs...), tweakPodCIDRs(tc.updatedPodCIDRs...))) require.Equal(t, tc.expectedExitCode, exitCode) }) } @@ -315,7 +302,7 @@ func TestNodeManagerOnNodeDelete(t *testing.T) { } client := clientsetfake.NewClientset() _, _ = client.CoreV1().Nodes().Create(ctx, makeNode(tweakNodeIPs("192.168.1.1")), metav1.CreateOptions{}) - nodeManager, err := newNodeManager(ctx, client, 30*time.Second, testNodeName, exitFunc, 10*time.Millisecond, time.Second) + nodeManager, err := newNodeManager(ctx, client, 30*time.Second, testNodeName, false, exitFunc, 10*time.Millisecond, time.Second, time.Second) require.NoError(t, err) nodeManager.OnNodeDelete(makeNode())