From f4882eeaa63b8df9e47710ca02ac228a593e13f2 Mon Sep 17 00:00:00 2001 From: Richa Banker Date: Sun, 8 Feb 2026 22:03:00 -0800 Subject: [PATCH] Use AddAfter for GV reaping instead of periodic ticker --- pkg/controlplane/apiserver/server.go | 1 - .../util/peerproxy/gv_exclusion_manager.go | 41 +++++-------------- .../peerproxy/gv_exclusion_manager_test.go | 24 ++++------- .../apiserver/pkg/util/peerproxy/peerproxy.go | 21 +--------- 4 files changed, 22 insertions(+), 65 deletions(-) diff --git a/pkg/controlplane/apiserver/server.go b/pkg/controlplane/apiserver/server.go index 3382ff69eb9..5e1bc1fc122 100644 --- a/pkg/controlplane/apiserver/server.go +++ b/pkg/controlplane/apiserver/server.go @@ -228,7 +228,6 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-workers", func(context genericapiserver.PostStartHookContext) error { go c.Extra.PeerProxy.RunPeerDiscoveryCacheSync(context, 1) go c.Extra.PeerProxy.RunPeerDiscoveryActiveGVTracker(context) - go c.Extra.PeerProxy.RunPeerDiscoveryReaper(context) go c.Extra.PeerProxy.RunPeerDiscoveryRefilter(context) return nil }) diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go index 0015345e0bb..e7fb0071031 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go @@ -42,11 +42,11 @@ type GVExtractor func(obj interface{}) []schema.GroupVersion // - recentlyDeletedGVs: GVs belonging to CRDs and aggregated APIServices that were recently deleted, // tracked with deletion timestamp for grace period // -// It runs two workers and a periodic ticker: -// 1. Active GV Tracker: Triggered on CRD/APIService events or reaper ticks, -// rebuilds active GVs and reaps expired deleted GVs +// It runs two workers: +// 1. Active GV Tracker: Triggered on CRD/APIService events, rebuilds active GVs +// and reaps expired deleted GVs. When a GV is deleted, a delayed sync is scheduled +// after the grace period to reap it. // 2. Peer Discovery Re-filter: Rate-limited worker that filters peer cache -// 3. Reaper Ticker: Periodically triggers the Active GV Tracker to reap expired GVs type GVExclusionManager struct { // Atomic maps for lock-free access currentlyActiveGVs atomic.Value // map[schema.GroupVersion]struct{} @@ -58,11 +58,10 @@ type GVExclusionManager struct { apiServiceInformer cache.SharedIndexInformer apiServiceExtractor GVExtractor - // Worker 1: triggered by CRD/APIService events or reaper ticks + // Worker 1: triggered by CRD/APIService events or delayed reap scheduling activeGVQueue workqueue.TypedRateLimitingInterface[string] - // Reaper ticker configuration + // Grace period before reaping deleted GVs from the exclusion set exclusionGracePeriod time.Duration - reaperCheckInterval time.Duration // Worker 2: triggered by Active/Deleted GV changes refilterQueue workqueue.TypedRateLimitingInterface[string] @@ -78,13 +77,11 @@ type GVExclusionManager struct { // NewGVExclusionManager creates a new GV exclusion manager. func NewGVExclusionManager( exclusionGracePeriod time.Duration, - reaperCheckInterval time.Duration, rawPeerDiscoveryCache *atomic.Value, invalidationCallback *atomic.Pointer[func()], ) *GVExclusionManager { mgr := &GVExclusionManager{ exclusionGracePeriod: exclusionGracePeriod, - reaperCheckInterval: reaperCheckInterval, rawPeerDiscoveryCache: rawPeerDiscoveryCache, invalidationCallback: invalidationCallback, activeGVQueue: workqueue.NewTypedRateLimitingQueueWithConfig( @@ -196,7 +193,6 @@ func (m *GVExclusionManager) handleGVUpdate() { // RunPeerDiscoveryActiveGVTracker runs the Active GV Tracker worker. // This worker is triggered by CRD/APIService events and // rebuilds the active GV set and reaps expired GVs. -// Only a single worker is used to avoid race conditions on atomic store operations. func (m *GVExclusionManager) RunPeerDiscoveryActiveGVTracker(ctx context.Context) { defer m.activeGVQueue.ShutDown() @@ -301,7 +297,11 @@ func (m *GVExclusionManager) updateRecentlyDeletedGVs(deletedGVs []schema.GroupV } } - // Add newly deleted GVs + // Schedule a delayed sync to reap expired GVs after the grace period. + if len(deletedGVs) > 0 { + m.activeGVQueue.AddAfter("sync", m.exclusionGracePeriod) + } + for _, gv := range deletedGVs { newDeletedMap[gv] = now klog.V(4).Infof("GV %s deleted: moved to recentlyDeletedGVs", gv.String()) @@ -332,25 +332,6 @@ func diffGVs(old, new map[schema.GroupVersion]struct{}) ([]schema.GroupVersion, return deletedGVs, hasChanges } -// RunPeerDiscoveryReaper runs Worker 2: Reaper -// This worker periodically triggers reconciliation which also reaps expired GVs. -func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) { - klog.Infof("Starting GV Reaper with %s interval", m.reaperCheckInterval) - ticker := time.NewTicker(m.reaperCheckInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - // Trigger reconciliation which will also reap expired GVs - m.activeGVQueue.Add("sync") - case <-ctx.Done(): - klog.Info("GV Reaper stopped") - return - } - } -} - // RunPeerDiscoveryRefilter runs the Peer Discovery Re-filter worker. // This worker filters the peer discovery cache using the exclusion set. func (m *GVExclusionManager) RunPeerDiscoveryRefilter(ctx context.Context) { diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go index 85e921ccc45..bc8125bf20e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package peerproxy import ( + "slices" "sync/atomic" "testing" "time" @@ -102,7 +103,7 @@ func TestGetExclusionSet(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) + mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) if tt.activeGVs != nil { mgr.currentlyActiveGVs.Store(tt.activeGVs) @@ -166,7 +167,7 @@ func TestReapExpiredGVs(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mgr := NewGVExclusionManager(tt.gracePeriod, 50*time.Millisecond, &atomic.Value{}, &atomic.Pointer[func()]{}) + mgr := NewGVExclusionManager(tt.gracePeriod, &atomic.Value{}, &atomic.Pointer[func()]{}) mgr.recentlyDeletedGVs.Store(tt.deletedGVs) mgr.updateRecentlyDeletedGVs(nil) result := mgr.loadRecentlyDeletedGVs() @@ -181,7 +182,7 @@ func TestReapExpiredGVs(t *testing.T) { } func TestDetectDiff(t *testing.T) { - _ = NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) + _ = NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) tests := []struct { name string @@ -259,15 +260,8 @@ func TestDetectDiff(t *testing.T) { t.Errorf("diffGVs() deleted count = %d, want %d", len(deletedGVs), len(tt.wantDeleted)) } for _, wantGV := range tt.wantDeleted { - found := false - for _, gotGV := range deletedGVs { - if gotGV == wantGV { - found = true - break - } - } - if !found { - t.Errorf("diffGVs() missing deleted GV %v", wantGV) + if !slices.Contains(deletedGVs, wantGV) { + t.Errorf("diffGVs() deleted missing GV %v", wantGV) } } }) @@ -376,7 +370,7 @@ func TestFilterPeerDiscoveryCache(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) + mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) if tt.activeGVs != nil { mgr.currentlyActiveGVs.Store(tt.activeGVs) @@ -483,7 +477,7 @@ func TestFilterPeerCacheEntry(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) + mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) filtered := mgr.filterPeerCacheEntry(tt.entry, tt.exclusionSet) @@ -555,7 +549,7 @@ func TestFilterGroupDiscovery(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) + mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) filtered := mgr.filterGroupDiscovery(tt.groupDiscoveries, tt.exclusionSet) for groupName, wantVersionStrs := range tt.wantGroups { diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go index bc3a44c954b..9644be4cfd0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go @@ -55,9 +55,6 @@ const ( // the deleted CRD or aggregated API before this server stops excluding it // in peer-aggregated discovery and while proxying requests to peers. defaultExclusionGracePeriod = 5 * time.Minute - // defaultExclusionReaperInterval is the interval at which the we - // clean up deleted groups from the exclusion list. - defaultExclusionReaperInterval = 1 * time.Minute ) // Interface defines how the Mixed Version Proxy filter interacts with the underlying system. @@ -100,16 +97,10 @@ type Interface interface { // RunPeerDiscoveryActiveGVTracker starts a worker that processes CRD/APIService informer // events to rebuild the set of actively served GroupVersions. This worker is triggered // whenever a CRD or APIService is added or updated and updates the exclusion - // set accordingly. + // set accordingly. When a GV is deleted, a delayed sync is automatically scheduled + // after the grace period to reap expired GVs from the exclusion set. RunPeerDiscoveryActiveGVTracker(ctx context.Context) - // RunPeerDiscoveryReaper starts a background worker that periodically removes expired - // GroupVersions from the exclusion set. When a CRD/APIService is deleted, its GV remains - // in the exclusion set for a grace period (default 5 minutes) to allow all peer API servers - // to observe the deletion. The reaper runs at a configured interval (default 1 minute) - // and removes GVs whose grace period has elapsed. - RunPeerDiscoveryReaper(ctx context.Context) - // RunPeerDiscoveryRefilter starts a worker that re-applies exclusion filtering to the // cached peer discovery data whenever the exclusion set changes. This ensures that // already-cached peer discovery responses are immediately updated to exclude newly added @@ -149,7 +140,6 @@ func NewPeerProxyHandler( h.gvExclusionManager = NewGVExclusionManager( defaultExclusionGracePeriod, - defaultExclusionReaperInterval, &h.rawPeerDiscoveryCache, &h.cacheInvalidationCallback, ) @@ -235,13 +225,6 @@ func (h *peerProxyHandler) RunPeerDiscoveryActiveGVTracker(ctx context.Context) } } -// RunPeerDiscoveryReaper starts the worker that removes expired GVs from the exclusion set. -func (h *peerProxyHandler) RunPeerDiscoveryReaper(ctx context.Context) { - if h.gvExclusionManager != nil { - h.gvExclusionManager.RunPeerDiscoveryReaper(ctx) - } -} - // RunPeerDiscoveryRefilter starts the worker that refilters peer discovery cache. func (h *peerProxyHandler) RunPeerDiscoveryRefilter(ctx context.Context) { if h.gvExclusionManager != nil {