diff --git a/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go index e3b98618963..2b27994f8e7 100644 --- a/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go +++ b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sort" + "sync" "time" rbacv1ac "k8s.io/client-go/applyconfigurations/rbac/v1" @@ -188,20 +189,26 @@ func ruleExists(haystack []rbacv1.PolicyRule, needle rbacv1.PolicyRule) bool { // Run starts the controller and blocks until stopCh is closed. func (c *ClusterRoleAggregationController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() - defer c.queue.ShutDown() logger := klog.FromContext(ctx) logger.Info("Starting ClusterRoleAggregator controller") - defer logger.Info("Shutting down ClusterRoleAggregator controller") + + var wg sync.WaitGroup + defer func() { + logger.Info("Shutting down ClusterRoleAggregator controller") + c.queue.ShutDown() + wg.Wait() + }() if !cache.WaitForNamedCacheSyncWithContext(ctx, c.clusterRolesSynced) { return } for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, c.runWorker, time.Second) + wg.Go(func() { + wait.UntilWithContext(ctx, c.runWorker, time.Second) + }) } - <-ctx.Done() }