From 0dbf8667ccf46d37f012318dfa45dcea49a62bf0 Mon Sep 17 00:00:00 2001 From: Jonathan Yaniv Date: Tue, 3 Feb 2026 10:54:58 +0000 Subject: [PATCH] Split from concurrent-node-syncs a separate flag for node status updates --- api/api-rules/violation_exceptions.list | 1 + pkg/generated/openapi/zz_generated.openapi.go | 10 +++++++++- staging/src/k8s.io/cloud-provider/app/core.go | 1 + .../controllers/node/config/types.go | 4 ++++ .../controllers/node/config/v1alpha1/types.go | 4 ++++ .../config/v1alpha1/zz_generated.conversion.go | 2 ++ .../controllers/node/node_controller.go | 7 +++++-- .../controllers/node/node_controller_test.go | 1 + .../cloud-provider/options/nodecontroller.go | 12 ++++++++++++ .../options/nodecontroller_test.go | 17 +++++++++++++---- .../k8s.io/cloud-provider/options/options.go | 4 +++- .../cloud-provider/options/options_test.go | 12 ++++++++++-- 12 files changed, 65 insertions(+), 10 deletions(-) diff --git a/api/api-rules/violation_exceptions.list b/api/api-rules/violation_exceptions.list index 3cbd85ab338..b5e5f6c75e3 100644 --- a/api/api-rules/violation_exceptions.list +++ b/api/api-rules/violation_exceptions.list @@ -146,6 +146,7 @@ API rule violation: names_match,k8s.io/cloud-provider/config/v1alpha1,KubeCloudS API rule violation: names_match,k8s.io/cloud-provider/config/v1alpha1,KubeCloudSharedConfiguration,RouteReconciliationPeriod API rule violation: names_match,k8s.io/cloud-provider/config/v1alpha1,KubeCloudSharedConfiguration,UseServiceAccountCredentials API rule violation: names_match,k8s.io/cloud-provider/config/v1alpha1,WebhookConfiguration,Webhooks +API rule violation: names_match,k8s.io/cloud-provider/controllers/node/config/v1alpha1,NodeControllerConfiguration,ConcurrentNodeStatusUpdates API rule violation: names_match,k8s.io/cloud-provider/controllers/node/config/v1alpha1,NodeControllerConfiguration,ConcurrentNodeSyncs API rule violation: names_match,k8s.io/cloud-provider/controllers/service/config/v1alpha1,ServiceControllerConfiguration,ConcurrentServiceSyncs API rule violation: names_match,k8s.io/controller-manager/config/v1alpha1,GenericControllerManagerConfiguration,Address diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index 805409848bf..4e4fbf94da5 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -64235,8 +64235,16 @@ func schema_controllers_node_config_v1alpha1_NodeControllerConfiguration(ref com Format: "int32", }, }, + "ConcurrentNodeStatusUpdates": { + SchemaProps: spec.SchemaProps{ + Description: "ConcurrentNodeStatusUpdates is the number of workers concurrently updating node statuses. If unspecified or 0, ConcurrentNodeSyncs is used instead", + Default: 0, + Type: []string{"integer"}, + Format: "int32", + }, + }, }, - Required: []string{"ConcurrentNodeSyncs"}, + Required: []string{"ConcurrentNodeSyncs", "ConcurrentNodeStatusUpdates"}, }, }, } diff --git a/staging/src/k8s.io/cloud-provider/app/core.go b/staging/src/k8s.io/cloud-provider/app/core.go index f1a324e3ff0..3acbb4e8c19 100644 --- a/staging/src/k8s.io/cloud-provider/app/core.go +++ b/staging/src/k8s.io/cloud-provider/app/core.go @@ -48,6 +48,7 @@ func startCloudNodeController(ctx context.Context, initContext ControllerInitCon cloud, completedConfig.ComponentConfig.NodeStatusUpdateFrequency.Duration, completedConfig.ComponentConfig.NodeController.ConcurrentNodeSyncs, + completedConfig.ComponentConfig.NodeController.ConcurrentNodeStatusUpdates, ) if err != nil { klog.Warningf("failed to start cloud node controller: %s", err) diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/config/types.go b/staging/src/k8s.io/cloud-provider/controllers/node/config/types.go index af7c7880351..1879de834d6 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/config/types.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/config/types.go @@ -21,4 +21,8 @@ type NodeControllerConfiguration struct { // ConcurrentNodeSyncs is the number of workers // concurrently synchronizing nodes ConcurrentNodeSyncs int32 + // ConcurrentNodeStatusUpdates is the number of workers + // concurrently updating node statuses. + // If unspecified or 0, ConcurrentNodeSyncs is used instead + ConcurrentNodeStatusUpdates int32 } diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/config/v1alpha1/types.go b/staging/src/k8s.io/cloud-provider/controllers/node/config/v1alpha1/types.go index ca85a557cbf..0c9234fbc02 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/config/v1alpha1/types.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/config/v1alpha1/types.go @@ -21,4 +21,8 @@ type NodeControllerConfiguration struct { // ConcurrentNodeSyncs is the number of workers // concurrently synchronizing nodes ConcurrentNodeSyncs int32 + // ConcurrentNodeStatusUpdates is the number of workers + // concurrently updating node statuses. + // If unspecified or 0, ConcurrentNodeSyncs is used instead + ConcurrentNodeStatusUpdates int32 } diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/config/v1alpha1/zz_generated.conversion.go b/staging/src/k8s.io/cloud-provider/controllers/node/config/v1alpha1/zz_generated.conversion.go index cf3852db294..89ccc8833aa 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/config/v1alpha1/zz_generated.conversion.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/config/v1alpha1/zz_generated.conversion.go @@ -49,10 +49,12 @@ func RegisterConversions(s *runtime.Scheme) error { func autoConvert_v1alpha1_NodeControllerConfiguration_To_config_NodeControllerConfiguration(in *NodeControllerConfiguration, out *config.NodeControllerConfiguration, s conversion.Scope) error { out.ConcurrentNodeSyncs = in.ConcurrentNodeSyncs + out.ConcurrentNodeStatusUpdates = in.ConcurrentNodeStatusUpdates return nil } func autoConvert_config_NodeControllerConfiguration_To_v1alpha1_NodeControllerConfiguration(in *config.NodeControllerConfiguration, out *NodeControllerConfiguration, s conversion.Scope) error { out.ConcurrentNodeSyncs = in.ConcurrentNodeSyncs + out.ConcurrentNodeStatusUpdates = in.ConcurrentNodeStatusUpdates return nil } diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go index a1fa02b7d8d..66f7ed17a0d 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go @@ -106,6 +106,7 @@ type CloudNodeController struct { nodeStatusUpdateFrequency time.Duration workerCount int32 + statusUpdateWorkerCount int32 nodesLister corelisters.NodeLister nodesSynced cache.InformerSynced @@ -118,7 +119,8 @@ func NewCloudNodeController( kubeClient clientset.Interface, cloud cloudprovider.Interface, nodeStatusUpdateFrequency time.Duration, - workerCount int32) (*CloudNodeController, error) { + workerCount, + statusUpdateWorkerCount int32) (*CloudNodeController, error) { _, instancesSupported := cloud.Instances() _, instancesV2Supported := cloud.InstancesV2() @@ -132,6 +134,7 @@ func NewCloudNodeController( cloud: cloud, nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, workerCount: workerCount, + statusUpdateWorkerCount: statusUpdateWorkerCount, nodesLister: nodeInformer.Lister(), nodesSynced: nodeInformer.Informer().HasSynced, workqueue: workqueue.NewTypedRateLimitingQueueWithConfig( @@ -289,7 +292,7 @@ func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error { cnc.updateNodeAddress(ctx, node, instanceMetadata) } - workqueue.ParallelizeUntil(ctx, int(cnc.workerCount), len(nodes), updateNodeFunc) + workqueue.ParallelizeUntil(ctx, int(cnc.statusUpdateWorkerCount), len(nodes), updateNodeFunc) return nil } diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go index 03b211a06ac..db1947a2b0e 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go @@ -2788,6 +2788,7 @@ func TestUpdateNodeStatus(t *testing.T) { recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}), nodeStatusUpdateFrequency: 1 * time.Second, workerCount: test.workers, + statusUpdateWorkerCount: test.workers, } for _, n := range generateNodes(test.nodes) { diff --git a/staging/src/k8s.io/cloud-provider/options/nodecontroller.go b/staging/src/k8s.io/cloud-provider/options/nodecontroller.go index 4823c437b16..9573276db03 100644 --- a/staging/src/k8s.io/cloud-provider/options/nodecontroller.go +++ b/staging/src/k8s.io/cloud-provider/options/nodecontroller.go @@ -36,6 +36,8 @@ func (o *NodeControllerOptions) AddFlags(fs *pflag.FlagSet) { } fs.Int32Var(&o.ConcurrentNodeSyncs, "concurrent-node-syncs", o.ConcurrentNodeSyncs, "Number of workers concurrently synchronizing nodes.") + // If concurrent-node-status-updates is not specified it will be derived from concurrent-node-syncs for backwards compatibility + fs.Int32Var(&o.ConcurrentNodeStatusUpdates, "concurrent-node-status-updates", 0, "Number of workers concurrently updating node statuses. If unspecified or 0, --concurrent-node-syncs is used instead") } // ApplyTo fills up ServiceController config with options. @@ -45,6 +47,13 @@ func (o *NodeControllerOptions) ApplyTo(cfg *nodeconfig.NodeControllerConfigurat } cfg.ConcurrentNodeSyncs = o.ConcurrentNodeSyncs + // Concurrent node status updates used to be derived from the concurrent-node-syncs flag, + // so for backwards compatibility it will default to concurrent-node-syncs if concurrent-node-status-updates is not specified + if o.ConcurrentNodeStatusUpdates > 0 { + cfg.ConcurrentNodeStatusUpdates = o.ConcurrentNodeStatusUpdates + } else { + cfg.ConcurrentNodeStatusUpdates = o.ConcurrentNodeSyncs + } return nil } @@ -58,5 +67,8 @@ func (o *NodeControllerOptions) Validate() []error { if o.ConcurrentNodeSyncs <= 0 { errors = append(errors, fmt.Errorf("concurrent-node-syncs must be a positive number")) } + if o.ConcurrentNodeStatusUpdates < 0 { + errors = append(errors, fmt.Errorf("concurrent-node-status-updates cannot be a negative number")) + } return errors } diff --git a/staging/src/k8s.io/cloud-provider/options/nodecontroller_test.go b/staging/src/k8s.io/cloud-provider/options/nodecontroller_test.go index 6c024a9dabe..73ba895573c 100644 --- a/staging/src/k8s.io/cloud-provider/options/nodecontroller_test.go +++ b/staging/src/k8s.io/cloud-provider/options/nodecontroller_test.go @@ -35,7 +35,7 @@ func errSliceEq(a []error, b []error) bool { return true } -func TestNodeControllerConcurrentNodeSyncsValidation(t *testing.T) { +func TestNodeControllerValidation(t *testing.T) { testCases := []struct { desc string input *NodeControllerOptions @@ -45,19 +45,28 @@ func TestNodeControllerConcurrentNodeSyncsValidation(t *testing.T) { desc: "empty options", }, { - desc: "negative value", + desc: "concurrent-node-syncs negative value", input: &NodeControllerOptions{NodeControllerConfiguration: &nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: -5}}, expect: []error{fmt.Errorf("concurrent-node-syncs must be a positive number")}, }, { - desc: "zero value", + desc: "concurrent-node-syncs zero value", input: &NodeControllerOptions{NodeControllerConfiguration: &nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 0}}, expect: []error{fmt.Errorf("concurrent-node-syncs must be a positive number")}, }, { - desc: "positive value", + desc: "concurrent-node-syncs positive value, concurrent-node-status-updates default 0", input: &NodeControllerOptions{NodeControllerConfiguration: &nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 5}}, }, + { + desc: "concurrent-node-syncs positive value, concurrent-node-status-updates positive value", + input: &NodeControllerOptions{NodeControllerConfiguration: &nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 5, ConcurrentNodeStatusUpdates: 5}}, + }, + { + desc: "concurrent-node-syncs positive value, concurrent-node-status-updates negative value", + input: &NodeControllerOptions{NodeControllerConfiguration: &nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 5, ConcurrentNodeStatusUpdates: -1}}, + expect: []error{fmt.Errorf("concurrent-node-status-updates cannot be a negative number")}, + }, } for _, tc := range testCases { got := tc.input.Validate() diff --git a/staging/src/k8s.io/cloud-provider/options/options.go b/staging/src/k8s.io/cloud-provider/options/options.go index 0534f9f614b..2f24aab31d1 100644 --- a/staging/src/k8s.io/cloud-provider/options/options.go +++ b/staging/src/k8s.io/cloud-provider/options/options.go @@ -242,7 +242,9 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, allControllers // sync back to component config // TODO: find more elegant way than syncing back the values. c.ComponentConfig.NodeStatusUpdateFrequency = o.NodeStatusUpdateFrequency - c.ComponentConfig.NodeController.ConcurrentNodeSyncs = o.NodeController.ConcurrentNodeSyncs + if err = o.NodeController.ApplyTo(&c.ComponentConfig.NodeController); err != nil { + return err + } return nil } diff --git a/staging/src/k8s.io/cloud-provider/options/options_test.go b/staging/src/k8s.io/cloud-provider/options/options_test.go index 8cafb4518ab..90d30dbf47a 100644 --- a/staging/src/k8s.io/cloud-provider/options/options_test.go +++ b/staging/src/k8s.io/cloud-provider/options/options_test.go @@ -423,7 +423,11 @@ func TestCreateConfig(t *testing.T) { ServiceController: serviceconfig.ServiceControllerConfiguration{ ConcurrentServiceSyncs: 1, }, - NodeController: nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 1}, + NodeController: nodeconfig.NodeControllerConfiguration{ + ConcurrentNodeSyncs: 1, + // ConcurrentNodeStatusUpdates should default to the value of ConcurrentNodeSyncs only at the stage of config creation + ConcurrentNodeStatusUpdates: 1, + }, NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Minute}, Webhook: cpconfig.WebhookConfiguration{ Webhooks: []string{"foo", "bar", "-baz"}, @@ -491,6 +495,7 @@ func TestCreateConfigWithoutWebHooks(t *testing.T) { "--controller-start-interval=2m", "--controllers=foo,bar", "--concurrent-node-syncs=1", + "--concurrent-node-status-updates=2", "--http2-max-streams-per-connection=47", "--kube-api-burst=101", "--kube-api-content-type=application/vnd.kubernetes.protobuf", @@ -564,7 +569,10 @@ func TestCreateConfigWithoutWebHooks(t *testing.T) { ServiceController: serviceconfig.ServiceControllerConfiguration{ ConcurrentServiceSyncs: 1, }, - NodeController: nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 1}, + NodeController: nodeconfig.NodeControllerConfiguration{ + ConcurrentNodeSyncs: 1, + ConcurrentNodeStatusUpdates: 2, + }, NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Minute}, Webhook: cpconfig.WebhookConfiguration{}, },