From 575e9eb64c716bc0ca24dc5bf5ccbe86b06ea5a0 Mon Sep 17 00:00:00 2001 From: Ondra Kupka Date: Mon, 27 Oct 2025 11:21:51 +0100 Subject: [PATCH] controller/job: Improve goroutine mgmt Make sure all threads are terminated when Run returns. --- pkg/controller/job/job_controller.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index e415fe0d50c..cd908de599c 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -244,28 +244,35 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn // Run the main goroutine responsible for watching and syncing jobs. func (jm *Controller) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() - logger := klog.FromContext(ctx) // Start events processing pipeline. jm.broadcaster.StartStructuredLogging(3) jm.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: jm.kubeClient.CoreV1().Events("")}) defer jm.broadcaster.Shutdown() - defer jm.queue.ShutDown() - defer jm.orphanQueue.ShutDown() - + logger := klog.FromContext(ctx) logger.Info("Starting job controller") - defer logger.Info("Shutting down job controller") + + var wg sync.WaitGroup + defer func() { + logger.Info("Shutting down job controller") + jm.queue.ShutDown() + jm.orphanQueue.ShutDown() + wg.Wait() + }() if !cache.WaitForNamedCacheSyncWithContext(ctx, jm.podStoreSynced, jm.jobStoreSynced) { return } for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, jm.worker, time.Second) - go wait.UntilWithContext(ctx, jm.orphanWorker, time.Second) + wg.Go(func() { + wait.UntilWithContext(ctx, jm.worker, time.Second) + }) + wg.Go(func() { + wait.UntilWithContext(ctx, jm.orphanWorker, time.Second) + }) } - <-ctx.Done() }