From edf132e1755368085783df7f7ef37b919e6ff173 Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Wed, 20 Aug 2025 17:44:51 +0800 Subject: [PATCH 1/6] use indexer to acclerate volume limit plugin --- .../framework/plugins/nodevolumelimits/csi.go | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index 9bb7843eb8c..ab7129504a1 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -23,11 +23,11 @@ import ( v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/rand" corelisters "k8s.io/client-go/listers/core/v1" storagelisters "k8s.io/client-go/listers/storage/v1" + "k8s.io/client-go/tools/cache" ephemeral "k8s.io/component-helpers/storage/ephemeral" storagehelpers "k8s.io/component-helpers/storage/volume" csitrans "k8s.io/csi-translation-lib" @@ -63,6 +63,7 @@ type CSILimits struct { scLister storagelisters.StorageClassLister vaLister storagelisters.VolumeAttachmentLister csiDriverLister storagelisters.CSIDriverLister + vaindexer cache.Indexer randomVolumeIDPrefix string enableVolumeLimitScaling bool @@ -590,6 +591,14 @@ func NewCSI(_ context.Context, _ runtime.Object, handle fwk.Handle, fts feature. scLister := informerFactory.Storage().V1().StorageClasses().Lister() vaLister := informerFactory.Storage().V1().VolumeAttachments().Lister() csiDriverLister := informerFactory.Storage().V1().CSIDrivers().Lister() + vaindexer := informerFactory.Storage().V1().VolumeAttachments().Informer().GetIndexer() + informerFactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{"nodename": func(obj interface{}) ([]string, error) { + va, ok := obj.(*storagev1.VolumeAttachment) + if !ok { + return []string{}, nil + } + return []string{va.Spec.NodeName}, nil + }}) csiTranslator := csitrans.New() return &CSILimits{ @@ -602,6 +611,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle fwk.Handle, fts feature. enableVolumeLimitScaling: fts.EnableVolumeLimitScaling, randomVolumeIDPrefix: rand.String(32), translator: csiTranslator, + vaindexer: vaindexer, }, nil } @@ -624,11 +634,15 @@ func getVolumeLimits(csiNode *storagev1.CSINode) map[string]int64 { // getNodeVolumeAttachmentInfo returns a map of volumeID to driver name for the given node. func (pl *CSILimits) getNodeVolumeAttachmentInfo(logger klog.Logger, nodeName string) (map[string]string, error) { volumeAttachments := make(map[string]string) - vas, err := pl.vaLister.List(labels.Everything()) + vas, err := pl.vaindexer.ByIndex("nodename", nodeName) if err != nil { return nil, err } - for _, va := range vas { + for _, vao := range vas { + va, ok := vao.(*storagev1.VolumeAttachment) + if !ok { + continue + } if va.Spec.NodeName == nodeName { if va.Spec.Attacher == "" { logger.V(5).Info("VolumeAttachment has no attacher", "VolumeAttachment", klog.KObj(va)) From dd767cdcd6752c4f6a4fac85cf0b5ef17877ac6e Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Thu, 21 Aug 2025 09:27:11 +0800 Subject: [PATCH 2/6] add err check when plugin init --- pkg/scheduler/framework/plugins/nodevolumelimits/csi.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index ab7129504a1..055bece343c 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -592,13 +592,15 @@ func NewCSI(_ context.Context, _ runtime.Object, handle fwk.Handle, fts feature. vaLister := informerFactory.Storage().V1().VolumeAttachments().Lister() csiDriverLister := informerFactory.Storage().V1().CSIDrivers().Lister() vaindexer := informerFactory.Storage().V1().VolumeAttachments().Informer().GetIndexer() - informerFactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{"nodename": func(obj interface{}) ([]string, error) { + if err := informerFactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{"nodename": func(obj interface{}) ([]string, error) { va, ok := obj.(*storagev1.VolumeAttachment) if !ok { return []string{}, nil } return []string{va.Spec.NodeName}, nil - }}) + }}); err != nil { + return nil, fmt.Errorf("failed to add index to VA informer: %w", err) + } csiTranslator := csitrans.New() return &CSILimits{ From 6560d3b1a8d78a5398270a9672d2694cd7962031 Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Fri, 22 Aug 2025 16:46:19 +0800 Subject: [PATCH 3/6] fix unit test Signed-off-by: KunWuLuan --- .../framework/plugins/nodevolumelimits/csi.go | 11 +++-- .../plugins/nodevolumelimits/csi_test.go | 44 ++++++++++++++++--- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index 055bece343c..eb76176515b 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -19,6 +19,7 @@ package nodevolumelimits import ( "context" "fmt" + "strings" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" @@ -592,14 +593,16 @@ func NewCSI(_ context.Context, _ runtime.Object, handle fwk.Handle, fts feature. vaLister := informerFactory.Storage().V1().VolumeAttachments().Lister() csiDriverLister := informerFactory.Storage().V1().CSIDrivers().Lister() vaindexer := informerFactory.Storage().V1().VolumeAttachments().Informer().GetIndexer() - if err := informerFactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{"nodename": func(obj interface{}) ([]string, error) { + if err := informerFactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: func(obj interface{}) ([]string, error) { va, ok := obj.(*storagev1.VolumeAttachment) if !ok { return []string{}, nil } return []string{va.Spec.NodeName}, nil }}); err != nil { - return nil, fmt.Errorf("failed to add index to VA informer: %w", err) + if !strings.HasPrefix(err.Error(), "indexer conflict") { + return nil, fmt.Errorf("failed to add index to VA informer: %w", err) + } } csiTranslator := csitrans.New() @@ -633,10 +636,12 @@ func getVolumeLimits(csiNode *storagev1.CSINode) map[string]int64 { return nodeVolumeLimits } +const vaIndexKey = "va.spec.nodename" + // getNodeVolumeAttachmentInfo returns a map of volumeID to driver name for the given node. func (pl *CSILimits) getNodeVolumeAttachmentInfo(logger klog.Logger, nodeName string) (map[string]string, error) { volumeAttachments := make(map[string]string) - vas, err := pl.vaindexer.ByIndex("nodename", nodeName) + vas, err := pl.vaindexer.ByIndex(vaIndexKey, nodeName) if err != nil { return nil, err } diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go index dd0a9eb6c21..c4cb91869ca 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go @@ -17,6 +17,7 @@ limitations under the License. package nodevolumelimits import ( + "context" "errors" "fmt" "strings" @@ -29,9 +30,13 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" csitrans "k8s.io/csi-translation-lib" csilibplugins "k8s.io/csi-translation-lib/plugins" fwk "k8s.io/kube-scheduler/framework" @@ -641,12 +646,26 @@ func TestCSILimits(t *testing.T) { enableMigrationOnNode(csiNode, csilibplugins.AWSEBSInTreePluginName) } csiTranslator := csitrans.New() + vas := getFakeVolumeAttachmentLister(test.vaCount, test.driverNames...) + fakecli := fake.NewClientset(vas...) + informerfactory := informers.NewSharedInformerFactory(fakecli, 0) + if err := informerfactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: func(obj interface{}) ([]string, error) { + va, ok := obj.(*storagev1.VolumeAttachment) + if !ok { + return []string{}, nil + } + return []string{va.Spec.NodeName}, nil + }}); err != nil { + t.Error(err) + } + informerfactory.Start(context.TODO().Done()) + informerfactory.WaitForCacheSync(context.TODO().Done()) p := &CSILimits{ csiManager: NewCSIManager(getFakeCSINodeLister(csiNode)), pvLister: getFakeCSIPVLister(test.filterName, test.driverNames...), pvcLister: append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...), scLister: getFakeCSIStorageClassLister(scName, test.driverNames[0]), - vaLister: getFakeVolumeAttachmentLister(test.vaCount, test.driverNames...), + vaindexer: informerfactory.Storage().V1().VolumeAttachments().Informer().GetIndexer(), randomVolumeIDPrefix: rand.String(32), translator: csiTranslator, } @@ -1074,8 +1093,8 @@ func TestCSILimitsAfterCSINodeUpdatedQHint(t *testing.T) { } } -func getFakeVolumeAttachmentLister(count int, driverNames ...string) tf.VolumeAttachmentLister { - vaLister := tf.VolumeAttachmentLister{} +func getFakeVolumeAttachmentLister(count int, driverNames ...string) []runtime.Object { + vaLister := []runtime.Object{} for _, driver := range driverNames { for j := 0; j < count; j++ { pvName := fmt.Sprintf("csi-%s-%d", driver, j) @@ -1091,7 +1110,7 @@ func getFakeVolumeAttachmentLister(count int, driverNames ...string) tf.VolumeAt }, }, } - vaLister = append(vaLister, va) + vaLister = append(vaLister, &va) } } return vaLister @@ -1324,6 +1343,20 @@ func TestVolumeLimitScalingGate(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { node, csiNode := getNodeWithPodAndVolumeLimits(tt.limitSource, []*v1.Pod{}, tt.limit, ebsCSIDriverName) + vas := getFakeVolumeAttachmentLister(0, ebsCSIDriverName) + fakecli := fake.NewClientset(vas...) + informerfactory := informers.NewSharedInformerFactory(fakecli, 0) + if err := informerfactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: func(obj interface{}) ([]string, error) { + va, ok := obj.(*storagev1.VolumeAttachment) + if !ok { + return []string{}, nil + } + return []string{va.Spec.NodeName}, nil + }}); err != nil { + t.Error(err) + } + informerfactory.Start(context.TODO().Done()) + informerfactory.WaitForCacheSync(context.TODO().Done()) csiTranslator := csitrans.New() p := &CSILimits{ @@ -1331,7 +1364,8 @@ func TestVolumeLimitScalingGate(t *testing.T) { pvLister: getFakeCSIPVLister("csi", ebsCSIDriverName), pvcLister: getFakeCSIPVCLister("csi", scName, ebsCSIDriverName), scLister: getFakeCSIStorageClassLister(scName, ebsCSIDriverName), - vaLister: getFakeVolumeAttachmentLister(0, ebsCSIDriverName), + vaLister: informerfactory.Storage().V1().VolumeAttachments().Lister(), + vaindexer: informerfactory.Storage().V1().VolumeAttachments().Informer().GetIndexer(), csiDriverLister: func() fakeCSIDriverLister { if tt.csiDriverPresent { return getFakeCSIDriverLister(ebsCSIDriverName) From 4b4fa73c1f28ad0657b0b34e78a72b1e8ba2039f Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Thu, 18 Dec 2025 20:17:30 +0800 Subject: [PATCH 4/6] refactor: rename vaindexer to vaIndexer and improve error handling in CSI volume limits plugin This commit includes the following changes: 1. Rename 'vaindexer' variable to 'vaIndexer' for better readability and consistency 2. Add proper error handling with utilruntime.HandleError when encountering unexpected object types in volume attachment indexer 3. Update test files to use ctx from ktesting.NewTestContext instead of context.TODO() 4. Remove unused 'strings' import from csi.go 5. Improve indexer conflict checking logic in CSI limits plugin initialization These changes enhance code clarity and robustness in the node volume limits plugin for CSI. Signed-off-by: KunWuLuan --- .../framework/plugins/nodevolumelimits/csi.go | 14 ++++++++------ .../plugins/nodevolumelimits/csi_test.go | 17 ++++++++--------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index eb76176515b..c4b3d8c6a08 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -19,13 +19,13 @@ package nodevolumelimits import ( "context" "fmt" - "strings" v1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/rand" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" corelisters "k8s.io/client-go/listers/core/v1" storagelisters "k8s.io/client-go/listers/storage/v1" "k8s.io/client-go/tools/cache" @@ -64,7 +64,7 @@ type CSILimits struct { scLister storagelisters.StorageClassLister vaLister storagelisters.VolumeAttachmentLister csiDriverLister storagelisters.CSIDriverLister - vaindexer cache.Indexer + vaIndexer cache.Indexer randomVolumeIDPrefix string enableVolumeLimitScaling bool @@ -592,7 +592,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle fwk.Handle, fts feature. scLister := informerFactory.Storage().V1().StorageClasses().Lister() vaLister := informerFactory.Storage().V1().VolumeAttachments().Lister() csiDriverLister := informerFactory.Storage().V1().CSIDrivers().Lister() - vaindexer := informerFactory.Storage().V1().VolumeAttachments().Informer().GetIndexer() + vaIndexer := informerFactory.Storage().V1().VolumeAttachments().Informer().GetIndexer() if err := informerFactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: func(obj interface{}) ([]string, error) { va, ok := obj.(*storagev1.VolumeAttachment) if !ok { @@ -600,7 +600,8 @@ func NewCSI(_ context.Context, _ runtime.Object, handle fwk.Handle, fts feature. } return []string{va.Spec.NodeName}, nil }}); err != nil { - if !strings.HasPrefix(err.Error(), "indexer conflict") { + vaInformer := informerFactory.Storage().V1().VolumeAttachments().Informer() + if vaInformer.GetIndexer().GetIndexers()[vaIndexKey] == nil { return nil, fmt.Errorf("failed to add index to VA informer: %w", err) } } @@ -616,7 +617,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle fwk.Handle, fts feature. enableVolumeLimitScaling: fts.EnableVolumeLimitScaling, randomVolumeIDPrefix: rand.String(32), translator: csiTranslator, - vaindexer: vaindexer, + vaIndexer: vaIndexer, }, nil } @@ -641,13 +642,14 @@ const vaIndexKey = "va.spec.nodename" // getNodeVolumeAttachmentInfo returns a map of volumeID to driver name for the given node. func (pl *CSILimits) getNodeVolumeAttachmentInfo(logger klog.Logger, nodeName string) (map[string]string, error) { volumeAttachments := make(map[string]string) - vas, err := pl.vaindexer.ByIndex(vaIndexKey, nodeName) + vas, err := pl.vaIndexer.ByIndex(vaIndexKey, nodeName) if err != nil { return nil, err } for _, vao := range vas { va, ok := vao.(*storagev1.VolumeAttachment) if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type in volume attachment indexer: %v", vao)) continue } if va.Spec.NodeName == nodeName { diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go index c4cb91869ca..12f69b96c9b 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go @@ -17,7 +17,6 @@ limitations under the License. package nodevolumelimits import ( - "context" "errors" "fmt" "strings" @@ -658,18 +657,18 @@ func TestCSILimits(t *testing.T) { }}); err != nil { t.Error(err) } - informerfactory.Start(context.TODO().Done()) - informerfactory.WaitForCacheSync(context.TODO().Done()) + _, ctx := ktesting.NewTestContext(t) + informerfactory.Start(ctx.Done()) + informerfactory.WaitForCacheSync(ctx.Done()) p := &CSILimits{ csiManager: NewCSIManager(getFakeCSINodeLister(csiNode)), pvLister: getFakeCSIPVLister(test.filterName, test.driverNames...), pvcLister: append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...), scLister: getFakeCSIStorageClassLister(scName, test.driverNames[0]), - vaindexer: informerfactory.Storage().V1().VolumeAttachments().Informer().GetIndexer(), + vaIndexer: informerfactory.Storage().V1().VolumeAttachments().Informer().GetIndexer(), randomVolumeIDPrefix: rand.String(32), translator: csiTranslator, } - _, ctx := ktesting.NewTestContext(t) _, gotPreFilterStatus := p.PreFilter(ctx, nil, test.newPod, nil) if diff := cmp.Diff(test.wantPreFilterStatus, gotPreFilterStatus, statusCmpOpts...); diff != "" { t.Errorf("PreFilter status does not match (-want, +got):\n%s", diff) @@ -1355,8 +1354,9 @@ func TestVolumeLimitScalingGate(t *testing.T) { }}); err != nil { t.Error(err) } - informerfactory.Start(context.TODO().Done()) - informerfactory.WaitForCacheSync(context.TODO().Done()) + _, ctx := ktesting.NewTestContext(t) + informerfactory.Start(ctx.Done()) + informerfactory.WaitForCacheSync(ctx.Done()) csiTranslator := csitrans.New() p := &CSILimits{ @@ -1365,7 +1365,7 @@ func TestVolumeLimitScalingGate(t *testing.T) { pvcLister: getFakeCSIPVCLister("csi", scName, ebsCSIDriverName), scLister: getFakeCSIStorageClassLister(scName, ebsCSIDriverName), vaLister: informerfactory.Storage().V1().VolumeAttachments().Lister(), - vaindexer: informerfactory.Storage().V1().VolumeAttachments().Informer().GetIndexer(), + vaIndexer: informerfactory.Storage().V1().VolumeAttachments().Informer().GetIndexer(), csiDriverLister: func() fakeCSIDriverLister { if tt.csiDriverPresent { return getFakeCSIDriverLister(ebsCSIDriverName) @@ -1377,7 +1377,6 @@ func TestVolumeLimitScalingGate(t *testing.T) { translator: csiTranslator, } - _, ctx := ktesting.NewTestContext(t) // Ensure PreFilter doesn't skip _, preStatus := p.PreFilter(ctx, nil, newPod, nil) if preStatus.Code() == fwk.Skip { From f006a945623017f353fba6b97d2318ad1b98472e Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Mon, 5 Jan 2026 17:37:48 +0800 Subject: [PATCH 5/6] refactor: extract volumeAttachmentIndexer function for better code reuse - Create a dedicated volumeAttachmentIndexer function to handle VolumeAttachment indexing logic This change extracts the inline indexer function into a named function to eliminate code duplication and improve maintainability. Signed-off-by: KunWuLuan --- .../framework/plugins/nodevolumelimits/csi.go | 16 +++++++++------- .../plugins/nodevolumelimits/csi_test.go | 16 ++-------------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index c4b3d8c6a08..ba2ef338437 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -584,6 +584,14 @@ func (pl *CSILimits) getCSIDriverInfoFromSC(logger klog.Logger, csiNode *storage return provisioner, volumeHandle } +func volumeAttachmentIndexer(obj interface{}) ([]string, error) { + va, ok := obj.(*storagev1.VolumeAttachment) + if !ok { + return []string{}, nil + } + return []string{va.Spec.NodeName}, nil +} + // NewCSI initializes a new plugin and returns it. func NewCSI(_ context.Context, _ runtime.Object, handle fwk.Handle, fts feature.Features) (fwk.Plugin, error) { informerFactory := handle.SharedInformerFactory() @@ -593,13 +601,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle fwk.Handle, fts feature. vaLister := informerFactory.Storage().V1().VolumeAttachments().Lister() csiDriverLister := informerFactory.Storage().V1().CSIDrivers().Lister() vaIndexer := informerFactory.Storage().V1().VolumeAttachments().Informer().GetIndexer() - if err := informerFactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: func(obj interface{}) ([]string, error) { - va, ok := obj.(*storagev1.VolumeAttachment) - if !ok { - return []string{}, nil - } - return []string{va.Spec.NodeName}, nil - }}); err != nil { + if err := informerFactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: volumeAttachmentIndexer}); err != nil { vaInformer := informerFactory.Storage().V1().VolumeAttachments().Informer() if vaInformer.GetIndexer().GetIndexers()[vaIndexKey] == nil { return nil, fmt.Errorf("failed to add index to VA informer: %w", err) diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go index 12f69b96c9b..d7bca9e89ff 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go @@ -648,13 +648,7 @@ func TestCSILimits(t *testing.T) { vas := getFakeVolumeAttachmentLister(test.vaCount, test.driverNames...) fakecli := fake.NewClientset(vas...) informerfactory := informers.NewSharedInformerFactory(fakecli, 0) - if err := informerfactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: func(obj interface{}) ([]string, error) { - va, ok := obj.(*storagev1.VolumeAttachment) - if !ok { - return []string{}, nil - } - return []string{va.Spec.NodeName}, nil - }}); err != nil { + if err := informerfactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: volumeAttachmentIndexer}); err != nil { t.Error(err) } _, ctx := ktesting.NewTestContext(t) @@ -1345,13 +1339,7 @@ func TestVolumeLimitScalingGate(t *testing.T) { vas := getFakeVolumeAttachmentLister(0, ebsCSIDriverName) fakecli := fake.NewClientset(vas...) informerfactory := informers.NewSharedInformerFactory(fakecli, 0) - if err := informerfactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: func(obj interface{}) ([]string, error) { - va, ok := obj.(*storagev1.VolumeAttachment) - if !ok { - return []string{}, nil - } - return []string{va.Spec.NodeName}, nil - }}); err != nil { + if err := informerfactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: volumeAttachmentIndexer}); err != nil { t.Error(err) } _, ctx := ktesting.NewTestContext(t) From c8c5a1653f785f1360d2cb038b3ef8fc58b96fc2 Mon Sep 17 00:00:00 2001 From: KunWuLuan Date: Tue, 20 Jan 2026 15:36:45 +0800 Subject: [PATCH 6/6] refactor: improve CSI plugin code clarity and error handling Signed-off-by: KunWuLuan --- .../framework/plugins/nodevolumelimits/csi.go | 10 ++--- .../plugins/nodevolumelimits/csi_test.go | 40 +++++++++---------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go index ba2ef338437..33149a14c22 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go @@ -600,9 +600,8 @@ func NewCSI(_ context.Context, _ runtime.Object, handle fwk.Handle, fts feature. scLister := informerFactory.Storage().V1().StorageClasses().Lister() vaLister := informerFactory.Storage().V1().VolumeAttachments().Lister() csiDriverLister := informerFactory.Storage().V1().CSIDrivers().Lister() - vaIndexer := informerFactory.Storage().V1().VolumeAttachments().Informer().GetIndexer() - if err := informerFactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: volumeAttachmentIndexer}); err != nil { - vaInformer := informerFactory.Storage().V1().VolumeAttachments().Informer() + vaInformer := informerFactory.Storage().V1().VolumeAttachments().Informer() + if err := vaInformer.AddIndexers(cache.Indexers{vaIndexKey: volumeAttachmentIndexer}); err != nil { if vaInformer.GetIndexer().GetIndexers()[vaIndexKey] == nil { return nil, fmt.Errorf("failed to add index to VA informer: %w", err) } @@ -619,7 +618,7 @@ func NewCSI(_ context.Context, _ runtime.Object, handle fwk.Handle, fts feature. enableVolumeLimitScaling: fts.EnableVolumeLimitScaling, randomVolumeIDPrefix: rand.String(32), translator: csiTranslator, - vaIndexer: vaIndexer, + vaIndexer: vaInformer.GetIndexer(), }, nil } @@ -651,7 +650,8 @@ func (pl *CSILimits) getNodeVolumeAttachmentInfo(logger klog.Logger, nodeName st for _, vao := range vas { va, ok := vao.(*storagev1.VolumeAttachment) if !ok { - utilruntime.HandleError(fmt.Errorf("unexpected object type in volume attachment indexer: %v", vao)) + utilruntime.HandleErrorWithLogger(logger, fmt.Errorf("unexpected object type in volume attachment indexer: %v", vao), + "volume indexer not available") continue } if va.Spec.NodeName == nodeName { diff --git a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go index d7bca9e89ff..ab3ede5b787 100644 --- a/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go +++ b/pkg/scheduler/framework/plugins/nodevolumelimits/csi_test.go @@ -645,21 +645,20 @@ func TestCSILimits(t *testing.T) { enableMigrationOnNode(csiNode, csilibplugins.AWSEBSInTreePluginName) } csiTranslator := csitrans.New() - vas := getFakeVolumeAttachmentLister(test.vaCount, test.driverNames...) - fakecli := fake.NewClientset(vas...) - informerfactory := informers.NewSharedInformerFactory(fakecli, 0) - if err := informerfactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: volumeAttachmentIndexer}); err != nil { + fakecli := buildFakeClientWithVALister(test.vaCount, test.driverNames...) + informerFactory := informers.NewSharedInformerFactory(fakecli, 0) + if err := informerFactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: volumeAttachmentIndexer}); err != nil { t.Error(err) } _, ctx := ktesting.NewTestContext(t) - informerfactory.Start(ctx.Done()) - informerfactory.WaitForCacheSync(ctx.Done()) + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) p := &CSILimits{ csiManager: NewCSIManager(getFakeCSINodeLister(csiNode)), pvLister: getFakeCSIPVLister(test.filterName, test.driverNames...), pvcLister: append(getFakeCSIPVCLister(test.filterName, scName, test.driverNames...), test.extraClaims...), scLister: getFakeCSIStorageClassLister(scName, test.driverNames[0]), - vaIndexer: informerfactory.Storage().V1().VolumeAttachments().Informer().GetIndexer(), + vaIndexer: informerFactory.Storage().V1().VolumeAttachments().Informer().GetIndexer(), randomVolumeIDPrefix: rand.String(32), translator: csiTranslator, } @@ -1086,12 +1085,12 @@ func TestCSILimitsAfterCSINodeUpdatedQHint(t *testing.T) { } } -func getFakeVolumeAttachmentLister(count int, driverNames ...string) []runtime.Object { - vaLister := []runtime.Object{} +func buildFakeClientWithVALister(count int, driverNames ...string) *fake.Clientset { + vas := []runtime.Object{} for _, driver := range driverNames { for j := 0; j < count; j++ { pvName := fmt.Sprintf("csi-%s-%d", driver, j) - va := storagev1.VolumeAttachment{ + va := &storagev1.VolumeAttachment{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("va-%s-%d", driver, j), }, @@ -1103,11 +1102,13 @@ func getFakeVolumeAttachmentLister(count int, driverNames ...string) []runtime.O }, }, } - vaLister = append(vaLister, &va) + vas = append(vas, va) } } - return vaLister + fakeCli := fake.NewClientset(vas...) + return fakeCli } + func getFakeCSIPVLister(volumeName string, driverNames ...string) tf.PersistentVolumeLister { pvLister := tf.PersistentVolumeLister{} for _, driver := range driverNames { @@ -1336,15 +1337,14 @@ func TestVolumeLimitScalingGate(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { node, csiNode := getNodeWithPodAndVolumeLimits(tt.limitSource, []*v1.Pod{}, tt.limit, ebsCSIDriverName) - vas := getFakeVolumeAttachmentLister(0, ebsCSIDriverName) - fakecli := fake.NewClientset(vas...) - informerfactory := informers.NewSharedInformerFactory(fakecli, 0) - if err := informerfactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: volumeAttachmentIndexer}); err != nil { + fakecli := buildFakeClientWithVALister(0, ebsCSIDriverName) + informerFactory := informers.NewSharedInformerFactory(fakecli, 0) + if err := informerFactory.Storage().V1().VolumeAttachments().Informer().AddIndexers(cache.Indexers{vaIndexKey: volumeAttachmentIndexer}); err != nil { t.Error(err) } _, ctx := ktesting.NewTestContext(t) - informerfactory.Start(ctx.Done()) - informerfactory.WaitForCacheSync(ctx.Done()) + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) csiTranslator := csitrans.New() p := &CSILimits{ @@ -1352,8 +1352,8 @@ func TestVolumeLimitScalingGate(t *testing.T) { pvLister: getFakeCSIPVLister("csi", ebsCSIDriverName), pvcLister: getFakeCSIPVCLister("csi", scName, ebsCSIDriverName), scLister: getFakeCSIStorageClassLister(scName, ebsCSIDriverName), - vaLister: informerfactory.Storage().V1().VolumeAttachments().Lister(), - vaIndexer: informerfactory.Storage().V1().VolumeAttachments().Informer().GetIndexer(), + vaLister: informerFactory.Storage().V1().VolumeAttachments().Lister(), + vaIndexer: informerFactory.Storage().V1().VolumeAttachments().Informer().GetIndexer(), csiDriverLister: func() fakeCSIDriverLister { if tt.csiDriverPresent { return getFakeCSIDriverLister(ebsCSIDriverName)