controller/endpointslice: Improve goroutine mgmt

Make sure all threads are terminated when Run returns.
This commit is contained in:
Ondra Kupka 2025-10-27 14:08:34 +01:00
parent ccd35f7c5e
commit cd73e8777b

View file

@ -19,6 +19,7 @@ package endpointslice
import (
"context"
"fmt"
"sync"
"time"
"golang.org/x/time/rate"
@ -278,13 +279,17 @@ func (c *Controller) Run(ctx context.Context, workers int) {
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
defer c.eventBroadcaster.Shutdown()
defer c.serviceQueue.ShutDown()
defer c.podQueue.ShutDown()
defer c.topologyQueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting endpoint slice controller")
defer logger.Info("Shutting down endpoint slice controller")
var wg sync.WaitGroup
defer func() {
logger.Info("Shutting down endpoint slice controller")
c.serviceQueue.ShutDown()
c.podQueue.ShutDown()
c.topologyQueue.ShutDown()
wg.Wait()
}()
if !cache.WaitForNamedCacheSyncWithContext(ctx, c.podsSynced, c.servicesSynced, c.endpointSlicesSynced, c.nodesSynced) {
return
@ -292,12 +297,18 @@ func (c *Controller) Run(ctx context.Context, workers int) {
logger.V(2).Info("Starting service queue worker threads", "total", workers)
for i := 0; i < workers; i++ {
go wait.Until(func() { c.serviceQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
go wait.Until(func() { c.podQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
wg.Go(func() {
wait.Until(func() { c.serviceQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
})
wg.Go(func() {
wait.Until(func() { c.podQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
})
}
logger.V(2).Info("Starting topology queue worker threads", "total", 1)
go wait.Until(func() { c.topologyQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
logger.V(2).Info("Starting topology queue worker threads", "total", 1)
wg.Go(func() {
wait.Until(func() { c.topologyQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
})
<-ctx.Done()
}