mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-28 04:04:39 -04:00
controller/clusterroleaggregation: Improve goroutine mgmt
Make sure all threads are terminated when Run returns.
This commit is contained in:
parent
5f3f39edc1
commit
d1eccb2377
1 changed files with 11 additions and 4 deletions
|
|
@ -20,6 +20,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
rbacv1ac "k8s.io/client-go/applyconfigurations/rbac/v1"
|
||||
|
|
@ -188,20 +189,26 @@ func ruleExists(haystack []rbacv1.PolicyRule, needle rbacv1.PolicyRule) bool {
|
|||
// Run starts the controller and blocks until stopCh is closed.
|
||||
func (c *ClusterRoleAggregationController) Run(ctx context.Context, workers int) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.Info("Starting ClusterRoleAggregator controller")
|
||||
defer logger.Info("Shutting down ClusterRoleAggregator controller")
|
||||
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
logger.Info("Shutting down ClusterRoleAggregator controller")
|
||||
c.queue.ShutDown()
|
||||
wg.Wait()
|
||||
}()
|
||||
|
||||
if !cache.WaitForNamedCacheSyncWithContext(ctx, c.clusterRolesSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
|
||||
wg.Go(func() {
|
||||
wait.UntilWithContext(ctx, c.runWorker, time.Second)
|
||||
})
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue