From 8eab454e38e5415caab70f9be0719e8667efd50f Mon Sep 17 00:00:00 2001 From: Ondra Kupka Date: Tue, 28 Oct 2025 20:06:01 +0100 Subject: [PATCH] controller/volume/expand: Improve goroutine mgmt Make sure all threads are terminated when Run returns. --- .../volume/expand/expand_controller.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/controller/volume/expand/expand_controller.go b/pkg/controller/volume/expand/expand_controller.go index b7044d1af2b..3d8ca84a7a9 100644 --- a/pkg/controller/volume/expand/expand_controller.go +++ b/pkg/controller/volume/expand/expand_controller.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "k8s.io/klog/v2" @@ -322,19 +323,26 @@ func (expc *expandController) expand(logger klog.Logger, pvc *v1.PersistentVolum // TODO make concurrency configurable (workers argument). previously, nestedpendingoperations spawned unlimited goroutines func (expc *expandController) Run(ctx context.Context) { defer runtime.HandleCrash() - defer expc.queue.ShutDown() + logger := klog.FromContext(ctx) logger.Info("Starting expand controller") - defer logger.Info("Shutting down expand controller") + + var wg sync.WaitGroup + defer func() { + logger.Info("Shutting down expand controller") + expc.queue.ShutDown() + wg.Wait() + }() if !cache.WaitForNamedCacheSyncWithContext(ctx, expc.pvcsSynced) { return } for i := 0; i < defaultWorkerCount; i++ { - go wait.UntilWithContext(ctx, expc.runWorker, time.Second) + wg.Go(func() { + wait.UntilWithContext(ctx, expc.runWorker, time.Second) + }) } - <-ctx.Done() }