From cd73e8777b3014c2da7422cf060696aec30009c3 Mon Sep 17 00:00:00 2001 From: Ondra Kupka Date: Mon, 27 Oct 2025 14:08:34 +0100 Subject: [PATCH] controller/endpointslice: Improve goroutine mgmt Make sure all threads are terminated when Run returns. --- .../endpointslice/endpointslice_controller.go | 29 +++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index 2c22989ec98..2f424437b83 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -19,6 +19,7 @@ package endpointslice import ( "context" "fmt" + "sync" "time" "golang.org/x/time/rate" @@ -278,13 +279,17 @@ func (c *Controller) Run(ctx context.Context, workers int) { c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")}) defer c.eventBroadcaster.Shutdown() - defer c.serviceQueue.ShutDown() - defer c.podQueue.ShutDown() - defer c.topologyQueue.ShutDown() - logger := klog.FromContext(ctx) logger.Info("Starting endpoint slice controller") - defer logger.Info("Shutting down endpoint slice controller") + + var wg sync.WaitGroup + defer func() { + logger.Info("Shutting down endpoint slice controller") + c.serviceQueue.ShutDown() + c.podQueue.ShutDown() + c.topologyQueue.ShutDown() + wg.Wait() + }() if !cache.WaitForNamedCacheSyncWithContext(ctx, c.podsSynced, c.servicesSynced, c.endpointSlicesSynced, c.nodesSynced) { return @@ -292,12 +297,18 @@ func (c *Controller) Run(ctx context.Context, workers int) { logger.V(2).Info("Starting service queue worker threads", "total", workers) for i := 0; i < workers; i++ { - go wait.Until(func() { c.serviceQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done()) - go wait.Until(func() { c.podQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done()) + wg.Go(func() { + wait.Until(func() { c.serviceQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done()) + }) + wg.Go(func() { + wait.Until(func() { c.podQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done()) + }) } - logger.V(2).Info("Starting topology queue worker threads", "total", 1) - go wait.Until(func() { c.topologyQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done()) + logger.V(2).Info("Starting topology queue worker threads", "total", 1) + wg.Go(func() { + wait.Until(func() { c.topologyQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done()) + }) <-ctx.Done() }