Merge pull request #136716 from yonizxz/concurrent-node-syncs-split

Split from concurrent-node-syncs a separate flag for node status updates
This commit is contained in:
Kubernetes Prow Robot 2026-02-11 03:00:10 +05:30 committed by GitHub
commit 870e2928bc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 65 additions and 10 deletions

View file

@ -146,6 +146,7 @@ API rule violation: names_match,k8s.io/cloud-provider/config/v1alpha1,KubeCloudS
API rule violation: names_match,k8s.io/cloud-provider/config/v1alpha1,KubeCloudSharedConfiguration,RouteReconciliationPeriod
API rule violation: names_match,k8s.io/cloud-provider/config/v1alpha1,KubeCloudSharedConfiguration,UseServiceAccountCredentials
API rule violation: names_match,k8s.io/cloud-provider/config/v1alpha1,WebhookConfiguration,Webhooks
API rule violation: names_match,k8s.io/cloud-provider/controllers/node/config/v1alpha1,NodeControllerConfiguration,ConcurrentNodeStatusUpdates
API rule violation: names_match,k8s.io/cloud-provider/controllers/node/config/v1alpha1,NodeControllerConfiguration,ConcurrentNodeSyncs
API rule violation: names_match,k8s.io/cloud-provider/controllers/service/config/v1alpha1,ServiceControllerConfiguration,ConcurrentServiceSyncs
API rule violation: names_match,k8s.io/controller-manager/config/v1alpha1,GenericControllerManagerConfiguration,Address

View file

@ -64233,8 +64233,16 @@ func schema_controllers_node_config_v1alpha1_NodeControllerConfiguration(ref com
Format: "int32",
},
},
"ConcurrentNodeStatusUpdates": {
SchemaProps: spec.SchemaProps{
Description: "ConcurrentNodeStatusUpdates is the number of workers concurrently updating node statuses. If unspecified or 0, ConcurrentNodeSyncs is used instead",
Default: 0,
Type: []string{"integer"},
Format: "int32",
},
},
},
Required: []string{"ConcurrentNodeSyncs"},
Required: []string{"ConcurrentNodeSyncs", "ConcurrentNodeStatusUpdates"},
},
},
}

View file

@ -48,6 +48,7 @@ func startCloudNodeController(ctx context.Context, initContext ControllerInitCon
cloud,
completedConfig.ComponentConfig.NodeStatusUpdateFrequency.Duration,
completedConfig.ComponentConfig.NodeController.ConcurrentNodeSyncs,
completedConfig.ComponentConfig.NodeController.ConcurrentNodeStatusUpdates,
)
if err != nil {
klog.Warningf("failed to start cloud node controller: %s", err)

View file

@ -21,4 +21,8 @@ type NodeControllerConfiguration struct {
// ConcurrentNodeSyncs is the number of workers
// concurrently synchronizing nodes
ConcurrentNodeSyncs int32
// ConcurrentNodeStatusUpdates is the number of workers
// concurrently updating node statuses.
// If unspecified or 0, ConcurrentNodeSyncs is used instead
ConcurrentNodeStatusUpdates int32
}

View file

@ -21,4 +21,8 @@ type NodeControllerConfiguration struct {
// ConcurrentNodeSyncs is the number of workers
// concurrently synchronizing nodes
ConcurrentNodeSyncs int32
// ConcurrentNodeStatusUpdates is the number of workers
// concurrently updating node statuses.
// If unspecified or 0, ConcurrentNodeSyncs is used instead
ConcurrentNodeStatusUpdates int32
}

View file

@ -49,10 +49,12 @@ func RegisterConversions(s *runtime.Scheme) error {
func autoConvert_v1alpha1_NodeControllerConfiguration_To_config_NodeControllerConfiguration(in *NodeControllerConfiguration, out *config.NodeControllerConfiguration, s conversion.Scope) error {
out.ConcurrentNodeSyncs = in.ConcurrentNodeSyncs
out.ConcurrentNodeStatusUpdates = in.ConcurrentNodeStatusUpdates
return nil
}
func autoConvert_config_NodeControllerConfiguration_To_v1alpha1_NodeControllerConfiguration(in *config.NodeControllerConfiguration, out *NodeControllerConfiguration, s conversion.Scope) error {
out.ConcurrentNodeSyncs = in.ConcurrentNodeSyncs
out.ConcurrentNodeStatusUpdates = in.ConcurrentNodeStatusUpdates
return nil
}

View file

@ -106,6 +106,7 @@ type CloudNodeController struct {
nodeStatusUpdateFrequency time.Duration
workerCount int32
statusUpdateWorkerCount int32
nodesLister corelisters.NodeLister
nodesSynced cache.InformerSynced
@ -118,7 +119,8 @@ func NewCloudNodeController(
kubeClient clientset.Interface,
cloud cloudprovider.Interface,
nodeStatusUpdateFrequency time.Duration,
workerCount int32) (*CloudNodeController, error) {
workerCount,
statusUpdateWorkerCount int32) (*CloudNodeController, error) {
_, instancesSupported := cloud.Instances()
_, instancesV2Supported := cloud.InstancesV2()
@ -132,6 +134,7 @@ func NewCloudNodeController(
cloud: cloud,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
workerCount: workerCount,
statusUpdateWorkerCount: statusUpdateWorkerCount,
nodesLister: nodeInformer.Lister(),
nodesSynced: nodeInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(
@ -289,7 +292,7 @@ func (cnc *CloudNodeController) UpdateNodeStatus(ctx context.Context) error {
cnc.updateNodeAddress(ctx, node, instanceMetadata)
}
workqueue.ParallelizeUntil(ctx, int(cnc.workerCount), len(nodes), updateNodeFunc)
workqueue.ParallelizeUntil(ctx, int(cnc.statusUpdateWorkerCount), len(nodes), updateNodeFunc)
return nil
}

View file

@ -2788,6 +2788,7 @@ func TestUpdateNodeStatus(t *testing.T) {
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}),
nodeStatusUpdateFrequency: 1 * time.Second,
workerCount: test.workers,
statusUpdateWorkerCount: test.workers,
}
for _, n := range generateNodes(test.nodes) {

View file

@ -36,6 +36,8 @@ func (o *NodeControllerOptions) AddFlags(fs *pflag.FlagSet) {
}
fs.Int32Var(&o.ConcurrentNodeSyncs, "concurrent-node-syncs", o.ConcurrentNodeSyncs, "Number of workers concurrently synchronizing nodes.")
// If concurrent-node-status-updates is not specified it will be derived from concurrent-node-syncs for backwards compatibility
fs.Int32Var(&o.ConcurrentNodeStatusUpdates, "concurrent-node-status-updates", 0, "Number of workers concurrently updating node statuses. If unspecified or 0, --concurrent-node-syncs is used instead")
}
// ApplyTo fills up ServiceController config with options.
@ -45,6 +47,13 @@ func (o *NodeControllerOptions) ApplyTo(cfg *nodeconfig.NodeControllerConfigurat
}
cfg.ConcurrentNodeSyncs = o.ConcurrentNodeSyncs
// Concurrent node status updates used to be derived from the concurrent-node-syncs flag,
// so for backwards compatibility it will default to concurrent-node-syncs if concurrent-node-status-updates is not specified
if o.ConcurrentNodeStatusUpdates > 0 {
cfg.ConcurrentNodeStatusUpdates = o.ConcurrentNodeStatusUpdates
} else {
cfg.ConcurrentNodeStatusUpdates = o.ConcurrentNodeSyncs
}
return nil
}
@ -58,5 +67,8 @@ func (o *NodeControllerOptions) Validate() []error {
if o.ConcurrentNodeSyncs <= 0 {
errors = append(errors, fmt.Errorf("concurrent-node-syncs must be a positive number"))
}
if o.ConcurrentNodeStatusUpdates < 0 {
errors = append(errors, fmt.Errorf("concurrent-node-status-updates cannot be a negative number"))
}
return errors
}

View file

@ -35,7 +35,7 @@ func errSliceEq(a []error, b []error) bool {
return true
}
func TestNodeControllerConcurrentNodeSyncsValidation(t *testing.T) {
func TestNodeControllerValidation(t *testing.T) {
testCases := []struct {
desc string
input *NodeControllerOptions
@ -45,19 +45,28 @@ func TestNodeControllerConcurrentNodeSyncsValidation(t *testing.T) {
desc: "empty options",
},
{
desc: "negative value",
desc: "concurrent-node-syncs negative value",
input: &NodeControllerOptions{NodeControllerConfiguration: &nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: -5}},
expect: []error{fmt.Errorf("concurrent-node-syncs must be a positive number")},
},
{
desc: "zero value",
desc: "concurrent-node-syncs zero value",
input: &NodeControllerOptions{NodeControllerConfiguration: &nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 0}},
expect: []error{fmt.Errorf("concurrent-node-syncs must be a positive number")},
},
{
desc: "positive value",
desc: "concurrent-node-syncs positive value, concurrent-node-status-updates default 0",
input: &NodeControllerOptions{NodeControllerConfiguration: &nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 5}},
},
{
desc: "concurrent-node-syncs positive value, concurrent-node-status-updates positive value",
input: &NodeControllerOptions{NodeControllerConfiguration: &nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 5, ConcurrentNodeStatusUpdates: 5}},
},
{
desc: "concurrent-node-syncs positive value, concurrent-node-status-updates negative value",
input: &NodeControllerOptions{NodeControllerConfiguration: &nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 5, ConcurrentNodeStatusUpdates: -1}},
expect: []error{fmt.Errorf("concurrent-node-status-updates cannot be a negative number")},
},
}
for _, tc := range testCases {
got := tc.input.Validate()

View file

@ -242,7 +242,9 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, allControllers
// sync back to component config
// TODO: find more elegant way than syncing back the values.
c.ComponentConfig.NodeStatusUpdateFrequency = o.NodeStatusUpdateFrequency
c.ComponentConfig.NodeController.ConcurrentNodeSyncs = o.NodeController.ConcurrentNodeSyncs
if err = o.NodeController.ApplyTo(&c.ComponentConfig.NodeController); err != nil {
return err
}
return nil
}

View file

@ -423,7 +423,11 @@ func TestCreateConfig(t *testing.T) {
ServiceController: serviceconfig.ServiceControllerConfiguration{
ConcurrentServiceSyncs: 1,
},
NodeController: nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 1},
NodeController: nodeconfig.NodeControllerConfiguration{
ConcurrentNodeSyncs: 1,
// ConcurrentNodeStatusUpdates should default to the value of ConcurrentNodeSyncs only at the stage of config creation
ConcurrentNodeStatusUpdates: 1,
},
NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Minute},
Webhook: cpconfig.WebhookConfiguration{
Webhooks: []string{"foo", "bar", "-baz"},
@ -491,6 +495,7 @@ func TestCreateConfigWithoutWebHooks(t *testing.T) {
"--controller-start-interval=2m",
"--controllers=foo,bar",
"--concurrent-node-syncs=1",
"--concurrent-node-status-updates=2",
"--http2-max-streams-per-connection=47",
"--kube-api-burst=101",
"--kube-api-content-type=application/vnd.kubernetes.protobuf",
@ -564,7 +569,10 @@ func TestCreateConfigWithoutWebHooks(t *testing.T) {
ServiceController: serviceconfig.ServiceControllerConfiguration{
ConcurrentServiceSyncs: 1,
},
NodeController: nodeconfig.NodeControllerConfiguration{ConcurrentNodeSyncs: 1},
NodeController: nodeconfig.NodeControllerConfiguration{
ConcurrentNodeSyncs: 1,
ConcurrentNodeStatusUpdates: 2,
},
NodeStatusUpdateFrequency: metav1.Duration{Duration: 10 * time.Minute},
Webhook: cpconfig.WebhookConfiguration{},
},