From d03e9c87087debd9fe4940bb2a96100de88189cf Mon Sep 17 00:00:00 2001 From: Richa Banker Date: Tue, 27 Jan 2026 13:23:00 -0800 Subject: [PATCH] Use single worker for ActiveGVTracker and Refliter workqueues --- pkg/controlplane/apiserver/server.go | 4 +-- .../util/peerproxy/gv_exclusion_manager.go | 25 ++++++++----------- .../pkg/util/peerproxy/peer_discovery.go | 8 +----- .../pkg/util/peerproxy/peer_discovery_test.go | 2 +- .../apiserver/pkg/util/peerproxy/peerproxy.go | 12 ++++----- 5 files changed, 21 insertions(+), 30 deletions(-) diff --git a/pkg/controlplane/apiserver/server.go b/pkg/controlplane/apiserver/server.go index d536a1dd59c..3382ff69eb9 100644 --- a/pkg/controlplane/apiserver/server.go +++ b/pkg/controlplane/apiserver/server.go @@ -227,9 +227,9 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele // Run peer-discovery workers s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-workers", func(context genericapiserver.PostStartHookContext) error { go c.Extra.PeerProxy.RunPeerDiscoveryCacheSync(context, 1) - go c.Extra.PeerProxy.RunPeerDiscoveryActiveGVTracker(context, 1) + go c.Extra.PeerProxy.RunPeerDiscoveryActiveGVTracker(context) go c.Extra.PeerProxy.RunPeerDiscoveryReaper(context) - go c.Extra.PeerProxy.RunPeerDiscoveryRefilter(context, 1) + go c.Extra.PeerProxy.RunPeerDiscoveryRefilter(context) return nil }) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go index 9a1ede91767..0015345e0bb 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go @@ -193,16 +193,15 @@ func (m *GVExclusionManager) handleGVUpdate() { m.activeGVQueue.Add("sync") } -// RunPeerDiscoveryActiveGVTracker runs Worker 1: Active GV Tracker +// RunPeerDiscoveryActiveGVTracker runs the Active GV Tracker worker. // This worker is triggered by CRD/APIService events and -// rebuilds the active GV set and reaps expired GVs if indicated so. -func (m *GVExclusionManager) RunPeerDiscoveryActiveGVTracker(ctx context.Context, workers int) { +// rebuilds the active GV set and reaps expired GVs. +// Only a single worker is used to avoid race conditions on atomic store operations. +func (m *GVExclusionManager) RunPeerDiscoveryActiveGVTracker(ctx context.Context) { defer m.activeGVQueue.ShutDown() - klog.Infof("Starting %d Active GV Tracker worker(s)", workers) - for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, m.runActiveGVTrackerWorker, time.Second) - } + klog.Info("Starting Active GV Tracker worker") + go wait.UntilWithContext(ctx, m.runActiveGVTrackerWorker, time.Second) <-ctx.Done() klog.Info("Active GV Tracker workers stopped") @@ -344,7 +343,7 @@ func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) { select { case <-ticker.C: // Trigger reconciliation which will also reap expired GVs - m.activeGVQueue.Add("reap") + m.activeGVQueue.Add("sync") case <-ctx.Done(): klog.Info("GV Reaper stopped") return @@ -352,15 +351,13 @@ func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) { } } -// RunPeerDiscoveryRefilter runs Worker 2: Peer Discovery Re-filter +// RunPeerDiscoveryRefilter runs the Peer Discovery Re-filter worker. // This worker filters the peer discovery cache using the exclusion set. -func (m *GVExclusionManager) RunPeerDiscoveryRefilter(ctx context.Context, workers int) { +func (m *GVExclusionManager) RunPeerDiscoveryRefilter(ctx context.Context) { defer m.refilterQueue.ShutDown() - klog.Infof("Starting %d Peer Discovery Re-filter worker(s)", workers) - for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, m.runRefilterWorker, time.Second) - } + klog.Info("Starting Peer Discovery Re-filter worker") + go wait.UntilWithContext(ctx, m.runRefilterWorker, time.Second) <-ctx.Done() klog.Info("Peer Discovery Re-filter workers stopped") diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go index b7e9d362d69..041377d0817 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go @@ -117,15 +117,9 @@ func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error { // Store unfiltered data to raw cache and trigger refilter. // The refilter worker (single writer to filtered cache) will apply exclusions. - h.storeRawPeerDiscoveryCacheAndTriggerRefilter(newCache) - return fetchDiscoveryErr -} - -// storeRawPeerDiscoveryCacheAndTriggerRefilter stores the raw (unfiltered) peer discovery cache -// and immediately refilters to update the filtered cache. -func (h *peerProxyHandler) storeRawPeerDiscoveryCacheAndTriggerRefilter(newCache map[string]PeerDiscoveryCacheEntry) { h.rawPeerDiscoveryCache.Store(newCache) h.gvExclusionManager.TriggerRefilter() + return fetchDiscoveryErr } func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID string) (PeerDiscoveryCacheEntry, error) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go index c479fbcece3..a275e112ff6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go @@ -192,7 +192,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { } go h.RunPeerDiscoveryCacheSync(ctx, 1) - go h.RunPeerDiscoveryRefilter(ctx, 1) + go h.RunPeerDiscoveryRefilter(ctx) // Wait for initial cache update. initialCache := map[string]PeerDiscoveryCacheEntry{} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go index ca0a383c22d..bc3a44c954b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go @@ -101,7 +101,7 @@ type Interface interface { // events to rebuild the set of actively served GroupVersions. This worker is triggered // whenever a CRD or APIService is added or updated and updates the exclusion // set accordingly. - RunPeerDiscoveryActiveGVTracker(ctx context.Context, workers int) + RunPeerDiscoveryActiveGVTracker(ctx context.Context) // RunPeerDiscoveryReaper starts a background worker that periodically removes expired // GroupVersions from the exclusion set. When a CRD/APIService is deleted, its GV remains @@ -115,7 +115,7 @@ type Interface interface { // already-cached peer discovery responses are immediately updated to exclude newly added // or updated local GVs, rather than waiting for the next peer lease event to trigger a // cache refresh of peer discovery data. - RunPeerDiscoveryRefilter(ctx context.Context, workers int) + RunPeerDiscoveryRefilter(ctx context.Context) } // New creates a new instance to implement unknown version proxy @@ -229,9 +229,9 @@ func (h *peerProxyHandler) RegisterAPIServiceInformerHandlers(apiServiceInformer } // RunPeerDiscoveryActiveGVTracker starts the worker that tracks active GVs from CRDs/APIServices. -func (h *peerProxyHandler) RunPeerDiscoveryActiveGVTracker(ctx context.Context, workers int) { +func (h *peerProxyHandler) RunPeerDiscoveryActiveGVTracker(ctx context.Context) { if h.gvExclusionManager != nil { - h.gvExclusionManager.RunPeerDiscoveryActiveGVTracker(ctx, workers) + h.gvExclusionManager.RunPeerDiscoveryActiveGVTracker(ctx) } } @@ -243,8 +243,8 @@ func (h *peerProxyHandler) RunPeerDiscoveryReaper(ctx context.Context) { } // RunPeerDiscoveryRefilter starts the worker that refilters peer discovery cache. -func (h *peerProxyHandler) RunPeerDiscoveryRefilter(ctx context.Context, workers int) { +func (h *peerProxyHandler) RunPeerDiscoveryRefilter(ctx context.Context) { if h.gvExclusionManager != nil { - h.gvExclusionManager.RunPeerDiscoveryRefilter(ctx, workers) + h.gvExclusionManager.RunPeerDiscoveryRefilter(ctx) } }