From aaed7525ddcf04b00c6d1a96ed13e7663b3a22ad Mon Sep 17 00:00:00 2001 From: Richa Banker Date: Wed, 21 Jan 2026 16:41:01 -0800 Subject: [PATCH] Consolidate delete handler, reaper logic into same sync loop using the same worker - activeGVQueue --- pkg/controlplane/apiserver/server.go | 23 +-- .../util/peerproxy/gv_exclusion_manager.go | 189 ++++++++---------- .../peerproxy/gv_exclusion_manager_test.go | 8 +- 3 files changed, 85 insertions(+), 135 deletions(-) diff --git a/pkg/controlplane/apiserver/server.go b/pkg/controlplane/apiserver/server.go index b12891fdfe7..d536a1dd59c 100644 --- a/pkg/controlplane/apiserver/server.go +++ b/pkg/controlplane/apiserver/server.go @@ -224,30 +224,11 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele return err }) - // Run peer-discovery sync loop - s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-cache-sync", func(context genericapiserver.PostStartHookContext) error { + // Run peer-discovery workers + s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-workers", func(context genericapiserver.PostStartHookContext) error { go c.Extra.PeerProxy.RunPeerDiscoveryCacheSync(context, 1) - return nil - }) - - // RunActiveGVTracker monitors CRDs/APIServices and maintains the active GV list for exclusion from peer-discovery. - s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-gv-active-tracker", func(context genericapiserver.PostStartHookContext) error { go c.Extra.PeerProxy.RunPeerDiscoveryActiveGVTracker(context, 1) - return nil - }) - - // RunReaper removes expired GVs from the exclusion set for peer-discovery after their grace period. - // This ensures we don't indefinitely exclude GVs that are no longer present. - s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-gv-exclusion-reaper", func(context genericapiserver.PostStartHookContext) error { go c.Extra.PeerProxy.RunPeerDiscoveryReaper(context) - return nil - }) - - // RunPeerDiscoveryRefilter re-applies the exclusion filter to the existing peer discovery cache - // whenever the exclusion set changes (e.g., CRD or aggregated API added/deleted). This is different from the - // initial filtering that happens when peer discovery is first fetched - this worker ensures - // the already-cached data stays consistent with the current exclusion set. - s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-refilter", func(context genericapiserver.PostStartHookContext) error { go c.Extra.PeerProxy.RunPeerDiscoveryRefilter(context, 1) return nil }) diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go index b1f9e4cac63..950a05f2635 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go @@ -42,10 +42,11 @@ type GVExtractor func(obj interface{}) []schema.GroupVersion // - recentlyDeletedGVs: GVs belonging to CRDs and aggregated APIServices that were recently deleted, // tracked with deletion timestamp for grace period // -// It runs three workers: -// 1. Active GV Tracker: Triggered on CRD/APIService events, rebuilds active GVs -// 2. Reaper: Periodically removes expired GVs from recentlyDeletedGVs -// 3. Peer Discovery Re-filter: Rate-limited worker that filters peer cache +// It runs two workers and a periodic ticker: +// 1. Active GV Tracker: Triggered on CRD/APIService events or reaper ticks, +// rebuilds active GVs and reaps expired deleted GVs +// 2. Peer Discovery Re-filter: Rate-limited worker that filters peer cache +// 3. Reaper Ticker: Periodically triggers the Active GV Tracker to reap expired GVs type GVExclusionManager struct { // Atomic maps for lock-free access currentlyActiveGVs atomic.Value // map[schema.GroupVersion]struct{} @@ -57,12 +58,12 @@ type GVExclusionManager struct { apiServiceInformer cache.SharedIndexInformer apiServiceExtractor GVExtractor - // Worker 1: triggered by CRD/APIService events + // Worker 1: triggered by CRD/APIService events or reaper ticks activeGVQueue workqueue.TypedRateLimitingInterface[string] - // Worker 2: periodic reaper's configuration + // Reaper ticker configuration exclusionGracePeriod time.Duration reaperCheckInterval time.Duration - // Worker 3: triggered by Active/Deleted GV changes + // Worker 2: triggered by Active/Deleted GV changes refilterQueue workqueue.TypedRateLimitingInterface[string] peerDiscoveryCache *atomic.Value // peerProxyHandler.peerDiscoveryInfoCache @@ -112,15 +113,7 @@ func (m *GVExclusionManager) RegisterCRDInformerHandlers(crdInformer cache.Share m.handleGVUpdate() }, DeleteFunc: func(obj interface{}) { - // Handle tombstone objects - if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { - obj = tombstone.Obj - } - gvs := extractor(obj) - if gvs == nil { - return - } - m.onDeleteEvent(gvs) + m.handleGVUpdate() }, }) return err @@ -140,16 +133,7 @@ func (m *GVExclusionManager) RegisterAPIServiceInformerHandlers(apiServiceInform m.handleGVUpdate() }, DeleteFunc: func(obj interface{}) { - // Handle tombstone objects - if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { - obj = tombstone.Obj - } - - gvs := extractor(obj) - if gvs == nil { - return - } - m.onDeleteEvent(gvs) + m.handleGVUpdate() }, }) return err @@ -187,49 +171,15 @@ func (m *GVExclusionManager) getExclusionSet() map[schema.GroupVersion]struct{} } // handleGVUpdate is called when a CRD or APIService event occurs. -// This triggers Worker 1 to rebuild the active GV set. +// This triggers reconciliation which rebuilds the active GV set +// and also reaps expired GVs if indicated so. func (m *GVExclusionManager) handleGVUpdate() { m.activeGVQueue.Add("sync") } -// onDeleteEvent is called when a CRD or APIService is deleted. -// This removes the GVs from currentlyActiveGVs, adds them to recentlyDeletedGVs -// with current timestamp, and queues Worker 3 to refilter the peer discovery cache. -func (m *GVExclusionManager) onDeleteEvent(gvs []schema.GroupVersion) { - if len(gvs) == 0 { - return - } - - // Remove from currentlyActiveGVs - activeMap := m.loadCurrentlyActiveGVs() - newActiveMap := make(map[schema.GroupVersion]struct{}, len(activeMap)) - for k, v := range activeMap { - newActiveMap[k] = v - } - for _, gv := range gvs { - delete(newActiveMap, gv) - } - m.currentlyActiveGVs.Store(newActiveMap) - - // Add to recentlyDeletedGVs - deletedMap := m.loadRecentlyDeletedGVs() - newDeletedMap := make(map[schema.GroupVersion]time.Time, len(deletedMap)+len(gvs)) - for k, v := range deletedMap { - newDeletedMap[k] = v - } - now := time.Now() - for _, gv := range gvs { - newDeletedMap[gv] = now - klog.V(4).Infof("GV %s deleted: moved to recentlyDeletedGVs", gv.String()) - } - m.recentlyDeletedGVs.Store(newDeletedMap) - - // Queue Worker 3 - m.refilterQueue.Add("refilter") -} - // RunPeerDiscoveryActiveGVTracker runs Worker 1: Active GV Tracker -// This worker is triggered by CRD/APIService events and rebuilds the active GV set. +// This worker is triggered by CRD/APIService events and +// rebuilds the active GV set and reaps expired GVs if indicated so. func (m *GVExclusionManager) RunPeerDiscoveryActiveGVTracker(ctx context.Context, workers int) { defer m.activeGVQueue.ShutDown() @@ -264,8 +214,11 @@ func (m *GVExclusionManager) processNextActiveGV(ctx context.Context) bool { return true } -// reconcileActiveGVs fetches all GVs from CRD and APIService informers, -// detects diffs with the previous state, and queues Worker 3 if changes detected. +// reconcileActiveGVs does the following +// 1. fetches all GVs from CRD and APIService informers +// 2. detects diffs with the previous state +// 3. adds deleted GVs to recentlyDeletedGVs +// 4. reaps expired GVs, and queues Worker 2 if changes detected. func (m *GVExclusionManager) reconcileActiveGVs() { freshGVs := make(map[schema.GroupVersion]struct{}) @@ -293,42 +246,80 @@ func (m *GVExclusionManager) reconcileActiveGVs() { // Load previous active GVs and detect diff previousGVs := m.loadCurrentlyActiveGVs() - diffDetected := m.diffGVs(previousGVs, freshGVs) + deletedGVs, activeGVsChanged := diffGVs(previousGVs, freshGVs) - if diffDetected { - m.currentlyActiveGVs.Store(freshGVs) - klog.V(4).Infof("Active GVs updated: %d GVs now active", len(freshGVs)) + // Update recentlyDeletedGVs: add newly deleted GVs and reap expired ones + recentlyDeletedChanged := m.updateRecentlyDeletedGVs(deletedGVs) - // Queue Worker 3 for re-filtering + if activeGVsChanged || recentlyDeletedChanged { + if activeGVsChanged { + m.currentlyActiveGVs.Store(freshGVs) + klog.V(4).Infof("Active GVs updated: %d GVs now active", len(freshGVs)) + } + // Queue re-filtering of peer discovery cache m.refilterQueue.Add("refilter") } else { - klog.V(4).Infof("No diff detected in active GVs") + klog.V(4).Infof("No changes detected in active or recently deleted GVs") } } -// diffGVs checks if there's a presence/absence difference between two GV maps -func (m *GVExclusionManager) diffGVs(old, new map[schema.GroupVersion]struct{}) bool { - if len(old) != len(new) { - return true +// updateRecentlyDeletedGVs adds newly deleted GVs to recentlyDeletedGVs +// and reaps expired ones. Returns true if any changes were made. +func (m *GVExclusionManager) updateRecentlyDeletedGVs(deletedGVs []schema.GroupVersion) bool { + deletedMap := m.loadRecentlyDeletedGVs() + // Early return if nothing to do + if len(deletedGVs) == 0 && len(deletedMap) == 0 { + return false } + now := time.Now() + newDeletedMap := make(map[schema.GroupVersion]time.Time, len(deletedMap)+len(deletedGVs)) + changed := false + + // Copy existing entries, reaping expired ones + for gv, deletionTime := range deletedMap { + if now.Sub(deletionTime) > m.exclusionGracePeriod { + klog.V(4).Infof("Reaping GV %s (grace period expired)", gv.String()) + changed = true + // Don't add to new map (effectively removing it) + } else { + newDeletedMap[gv] = deletionTime + } + } + + // Add newly deleted GVs + for _, gv := range deletedGVs { + newDeletedMap[gv] = now + klog.V(4).Infof("GV %s deleted: moved to recentlyDeletedGVs", gv.String()) + changed = true + } + + if changed { + m.recentlyDeletedGVs.Store(newDeletedMap) + } + + return changed +} + +// diffGVs compares old and new GV maps, returns GVs that were deleted (in old but not new) +// and a boolean indicating if there were any changes (additions or deletions). +func diffGVs(old, new map[schema.GroupVersion]struct{}) ([]schema.GroupVersion, bool) { + var deletedGVs []schema.GroupVersion + hasChanges := len(old) != len(new) + + // Find deleted GVs (in old but not in new) for gv := range old { if _, exists := new[gv]; !exists { - return true + deletedGVs = append(deletedGVs, gv) + hasChanges = true } } - for gv := range new { - if _, exists := old[gv]; !exists { - return true - } - } - - return false + return deletedGVs, hasChanges } // RunPeerDiscoveryReaper runs Worker 2: Reaper -// This worker periodically removes expired GVs from recentlyDeletedGVs. +// This worker periodically triggers reconciliation which also reaps expired GVs. func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) { klog.Infof("Starting GV Reaper with %s interval", m.reaperCheckInterval) ticker := time.NewTicker(m.reaperCheckInterval) @@ -337,7 +328,8 @@ func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) { for { select { case <-ticker.C: - m.reapExpiredGVs() + // Trigger reconciliation which will also reap expired GVs + m.activeGVQueue.Add("reap") case <-ctx.Done(): klog.Info("GV Reaper stopped") return @@ -345,30 +337,7 @@ func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) { } } -// reapExpiredGVs removes GVs from recentlyDeletedGVs that have exceeded the grace period. -func (m *GVExclusionManager) reapExpiredGVs() { - deletedMap := m.loadRecentlyDeletedGVs() - now := time.Now() - newDeletedMap := make(map[schema.GroupVersion]time.Time) - anyReaped := false - - for gv, deletionTime := range deletedMap { - if now.Sub(deletionTime) > m.exclusionGracePeriod { - klog.V(4).Infof("Reaping GV %s (grace period expired)", gv.String()) - anyReaped = true - // Don't add to new map (effectively removing it) - } else { - newDeletedMap[gv] = deletionTime - } - } - - if anyReaped { - // Atomic swap - m.recentlyDeletedGVs.Store(newDeletedMap) - } -} - -// RunPeerDiscoveryRefilter runs Worker 3: Peer Discovery Re-filter +// RunPeerDiscoveryRefilter runs Worker 2: Peer Discovery Re-filter // This worker filters the peer discovery cache using the exclusion set. func (m *GVExclusionManager) RunPeerDiscoveryRefilter(ctx context.Context, workers int) { defer m.refilterQueue.ShutDown() diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go index a7324b86fe1..1e3ea220e32 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go @@ -168,7 +168,7 @@ func TestReapExpiredGVs(t *testing.T) { t.Run(tt.name, func(t *testing.T) { mgr := NewGVExclusionManager(tt.gracePeriod, 50*time.Millisecond, &atomic.Value{}, &atomic.Pointer[func()]{}) mgr.recentlyDeletedGVs.Store(tt.deletedGVs) - mgr.reapExpiredGVs() + mgr.updateRecentlyDeletedGVs(nil) result := mgr.loadRecentlyDeletedGVs() for _, gv := range tt.wantReaped { @@ -181,7 +181,7 @@ func TestReapExpiredGVs(t *testing.T) { } func TestDetectDiff(t *testing.T) { - mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) + _ = NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) tests := []struct { name string @@ -241,9 +241,9 @@ func TestDetectDiff(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := mgr.diffGVs(tt.old, tt.new) + _, result := diffGVs(tt.old, tt.new) if result != tt.want { - t.Errorf("detectDiff() = %v, want %v", result, tt.want) + t.Errorf("diffGVs() = %v, want %v", result, tt.want) } }) }