Use single worker for ActiveGVTracker and Refliter workqueues

This commit is contained in:
Richa Banker 2026-01-27 13:23:00 -08:00
parent 4df467d42d
commit d03e9c8708
5 changed files with 21 additions and 30 deletions

View file

@ -227,9 +227,9 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
// Run peer-discovery workers
s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-workers", func(context genericapiserver.PostStartHookContext) error {
go c.Extra.PeerProxy.RunPeerDiscoveryCacheSync(context, 1)
go c.Extra.PeerProxy.RunPeerDiscoveryActiveGVTracker(context, 1)
go c.Extra.PeerProxy.RunPeerDiscoveryActiveGVTracker(context)
go c.Extra.PeerProxy.RunPeerDiscoveryReaper(context)
go c.Extra.PeerProxy.RunPeerDiscoveryRefilter(context, 1)
go c.Extra.PeerProxy.RunPeerDiscoveryRefilter(context)
return nil
})
}

View file

@ -193,16 +193,15 @@ func (m *GVExclusionManager) handleGVUpdate() {
m.activeGVQueue.Add("sync")
}
// RunPeerDiscoveryActiveGVTracker runs Worker 1: Active GV Tracker
// RunPeerDiscoveryActiveGVTracker runs the Active GV Tracker worker.
// This worker is triggered by CRD/APIService events and
// rebuilds the active GV set and reaps expired GVs if indicated so.
func (m *GVExclusionManager) RunPeerDiscoveryActiveGVTracker(ctx context.Context, workers int) {
// rebuilds the active GV set and reaps expired GVs.
// Only a single worker is used to avoid race conditions on atomic store operations.
func (m *GVExclusionManager) RunPeerDiscoveryActiveGVTracker(ctx context.Context) {
defer m.activeGVQueue.ShutDown()
klog.Infof("Starting %d Active GV Tracker worker(s)", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, m.runActiveGVTrackerWorker, time.Second)
}
klog.Info("Starting Active GV Tracker worker")
go wait.UntilWithContext(ctx, m.runActiveGVTrackerWorker, time.Second)
<-ctx.Done()
klog.Info("Active GV Tracker workers stopped")
@ -344,7 +343,7 @@ func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) {
select {
case <-ticker.C:
// Trigger reconciliation which will also reap expired GVs
m.activeGVQueue.Add("reap")
m.activeGVQueue.Add("sync")
case <-ctx.Done():
klog.Info("GV Reaper stopped")
return
@ -352,15 +351,13 @@ func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) {
}
}
// RunPeerDiscoveryRefilter runs Worker 2: Peer Discovery Re-filter
// RunPeerDiscoveryRefilter runs the Peer Discovery Re-filter worker.
// This worker filters the peer discovery cache using the exclusion set.
func (m *GVExclusionManager) RunPeerDiscoveryRefilter(ctx context.Context, workers int) {
func (m *GVExclusionManager) RunPeerDiscoveryRefilter(ctx context.Context) {
defer m.refilterQueue.ShutDown()
klog.Infof("Starting %d Peer Discovery Re-filter worker(s)", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, m.runRefilterWorker, time.Second)
}
klog.Info("Starting Peer Discovery Re-filter worker")
go wait.UntilWithContext(ctx, m.runRefilterWorker, time.Second)
<-ctx.Done()
klog.Info("Peer Discovery Re-filter workers stopped")

View file

@ -117,15 +117,9 @@ func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error {
// Store unfiltered data to raw cache and trigger refilter.
// The refilter worker (single writer to filtered cache) will apply exclusions.
h.storeRawPeerDiscoveryCacheAndTriggerRefilter(newCache)
return fetchDiscoveryErr
}
// storeRawPeerDiscoveryCacheAndTriggerRefilter stores the raw (unfiltered) peer discovery cache
// and immediately refilters to update the filtered cache.
func (h *peerProxyHandler) storeRawPeerDiscoveryCacheAndTriggerRefilter(newCache map[string]PeerDiscoveryCacheEntry) {
h.rawPeerDiscoveryCache.Store(newCache)
h.gvExclusionManager.TriggerRefilter()
return fetchDiscoveryErr
}
func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID string) (PeerDiscoveryCacheEntry, error) {

View file

@ -192,7 +192,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
}
go h.RunPeerDiscoveryCacheSync(ctx, 1)
go h.RunPeerDiscoveryRefilter(ctx, 1)
go h.RunPeerDiscoveryRefilter(ctx)
// Wait for initial cache update.
initialCache := map[string]PeerDiscoveryCacheEntry{}

View file

@ -101,7 +101,7 @@ type Interface interface {
// events to rebuild the set of actively served GroupVersions. This worker is triggered
// whenever a CRD or APIService is added or updated and updates the exclusion
// set accordingly.
RunPeerDiscoveryActiveGVTracker(ctx context.Context, workers int)
RunPeerDiscoveryActiveGVTracker(ctx context.Context)
// RunPeerDiscoveryReaper starts a background worker that periodically removes expired
// GroupVersions from the exclusion set. When a CRD/APIService is deleted, its GV remains
@ -115,7 +115,7 @@ type Interface interface {
// already-cached peer discovery responses are immediately updated to exclude newly added
// or updated local GVs, rather than waiting for the next peer lease event to trigger a
// cache refresh of peer discovery data.
RunPeerDiscoveryRefilter(ctx context.Context, workers int)
RunPeerDiscoveryRefilter(ctx context.Context)
}
// New creates a new instance to implement unknown version proxy
@ -229,9 +229,9 @@ func (h *peerProxyHandler) RegisterAPIServiceInformerHandlers(apiServiceInformer
}
// RunPeerDiscoveryActiveGVTracker starts the worker that tracks active GVs from CRDs/APIServices.
func (h *peerProxyHandler) RunPeerDiscoveryActiveGVTracker(ctx context.Context, workers int) {
func (h *peerProxyHandler) RunPeerDiscoveryActiveGVTracker(ctx context.Context) {
if h.gvExclusionManager != nil {
h.gvExclusionManager.RunPeerDiscoveryActiveGVTracker(ctx, workers)
h.gvExclusionManager.RunPeerDiscoveryActiveGVTracker(ctx)
}
}
@ -243,8 +243,8 @@ func (h *peerProxyHandler) RunPeerDiscoveryReaper(ctx context.Context) {
}
// RunPeerDiscoveryRefilter starts the worker that refilters peer discovery cache.
func (h *peerProxyHandler) RunPeerDiscoveryRefilter(ctx context.Context, workers int) {
func (h *peerProxyHandler) RunPeerDiscoveryRefilter(ctx context.Context) {
if h.gvExclusionManager != nil {
h.gvExclusionManager.RunPeerDiscoveryRefilter(ctx, workers)
h.gvExclusionManager.RunPeerDiscoveryRefilter(ctx)
}
}