controller/job: Improve goroutine mgmt

Make sure all threads are terminated when Run returns.
This commit is contained in:
Ondra Kupka 2025-10-27 11:21:51 +01:00
parent 7bf52d74d0
commit 575e9eb64c

View file

@ -244,28 +244,35 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
// Run the main goroutine responsible for watching and syncing jobs.
func (jm *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
logger := klog.FromContext(ctx)
// Start events processing pipeline.
jm.broadcaster.StartStructuredLogging(3)
jm.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")})
defer jm.broadcaster.Shutdown()
defer jm.queue.ShutDown()
defer jm.orphanQueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting job controller")
defer logger.Info("Shutting down job controller")
var wg sync.WaitGroup
defer func() {
logger.Info("Shutting down job controller")
jm.queue.ShutDown()
jm.orphanQueue.ShutDown()
wg.Wait()
}()
if !cache.WaitForNamedCacheSyncWithContext(ctx, jm.podStoreSynced, jm.jobStoreSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, jm.worker, time.Second)
go wait.UntilWithContext(ctx, jm.orphanWorker, time.Second)
wg.Go(func() {
wait.UntilWithContext(ctx, jm.worker, time.Second)
})
wg.Go(func() {
wait.UntilWithContext(ctx, jm.orphanWorker, time.Second)
})
}
<-ctx.Done()
}