Use AddAfter for GV reaping instead of periodic ticker

This commit is contained in:
Richa Banker 2026-02-08 22:03:00 -08:00
parent d03e9c8708
commit f4882eeaa6
4 changed files with 22 additions and 65 deletions

View file

@ -228,7 +228,6 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-workers", func(context genericapiserver.PostStartHookContext) error {
go c.Extra.PeerProxy.RunPeerDiscoveryCacheSync(context, 1)
go c.Extra.PeerProxy.RunPeerDiscoveryActiveGVTracker(context)
go c.Extra.PeerProxy.RunPeerDiscoveryReaper(context)
go c.Extra.PeerProxy.RunPeerDiscoveryRefilter(context)
return nil
})

View file

@ -42,11 +42,11 @@ type GVExtractor func(obj interface{}) []schema.GroupVersion
// - recentlyDeletedGVs: GVs belonging to CRDs and aggregated APIServices that were recently deleted,
// tracked with deletion timestamp for grace period
//
// It runs two workers and a periodic ticker:
// 1. Active GV Tracker: Triggered on CRD/APIService events or reaper ticks,
// rebuilds active GVs and reaps expired deleted GVs
// It runs two workers:
// 1. Active GV Tracker: Triggered on CRD/APIService events, rebuilds active GVs
// and reaps expired deleted GVs. When a GV is deleted, a delayed sync is scheduled
// after the grace period to reap it.
// 2. Peer Discovery Re-filter: Rate-limited worker that filters peer cache
// 3. Reaper Ticker: Periodically triggers the Active GV Tracker to reap expired GVs
type GVExclusionManager struct {
// Atomic maps for lock-free access
currentlyActiveGVs atomic.Value // map[schema.GroupVersion]struct{}
@ -58,11 +58,10 @@ type GVExclusionManager struct {
apiServiceInformer cache.SharedIndexInformer
apiServiceExtractor GVExtractor
// Worker 1: triggered by CRD/APIService events or reaper ticks
// Worker 1: triggered by CRD/APIService events or delayed reap scheduling
activeGVQueue workqueue.TypedRateLimitingInterface[string]
// Reaper ticker configuration
// Grace period before reaping deleted GVs from the exclusion set
exclusionGracePeriod time.Duration
reaperCheckInterval time.Duration
// Worker 2: triggered by Active/Deleted GV changes
refilterQueue workqueue.TypedRateLimitingInterface[string]
@ -78,13 +77,11 @@ type GVExclusionManager struct {
// NewGVExclusionManager creates a new GV exclusion manager.
func NewGVExclusionManager(
exclusionGracePeriod time.Duration,
reaperCheckInterval time.Duration,
rawPeerDiscoveryCache *atomic.Value,
invalidationCallback *atomic.Pointer[func()],
) *GVExclusionManager {
mgr := &GVExclusionManager{
exclusionGracePeriod: exclusionGracePeriod,
reaperCheckInterval: reaperCheckInterval,
rawPeerDiscoveryCache: rawPeerDiscoveryCache,
invalidationCallback: invalidationCallback,
activeGVQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
@ -196,7 +193,6 @@ func (m *GVExclusionManager) handleGVUpdate() {
// RunPeerDiscoveryActiveGVTracker runs the Active GV Tracker worker.
// This worker is triggered by CRD/APIService events and
// rebuilds the active GV set and reaps expired GVs.
// Only a single worker is used to avoid race conditions on atomic store operations.
func (m *GVExclusionManager) RunPeerDiscoveryActiveGVTracker(ctx context.Context) {
defer m.activeGVQueue.ShutDown()
@ -301,7 +297,11 @@ func (m *GVExclusionManager) updateRecentlyDeletedGVs(deletedGVs []schema.GroupV
}
}
// Add newly deleted GVs
// Schedule a delayed sync to reap expired GVs after the grace period.
if len(deletedGVs) > 0 {
m.activeGVQueue.AddAfter("sync", m.exclusionGracePeriod)
}
for _, gv := range deletedGVs {
newDeletedMap[gv] = now
klog.V(4).Infof("GV %s deleted: moved to recentlyDeletedGVs", gv.String())
@ -332,25 +332,6 @@ func diffGVs(old, new map[schema.GroupVersion]struct{}) ([]schema.GroupVersion,
return deletedGVs, hasChanges
}
// RunPeerDiscoveryReaper runs Worker 2: Reaper
// This worker periodically triggers reconciliation which also reaps expired GVs.
func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) {
klog.Infof("Starting GV Reaper with %s interval", m.reaperCheckInterval)
ticker := time.NewTicker(m.reaperCheckInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Trigger reconciliation which will also reap expired GVs
m.activeGVQueue.Add("sync")
case <-ctx.Done():
klog.Info("GV Reaper stopped")
return
}
}
}
// RunPeerDiscoveryRefilter runs the Peer Discovery Re-filter worker.
// This worker filters the peer discovery cache using the exclusion set.
func (m *GVExclusionManager) RunPeerDiscoveryRefilter(ctx context.Context) {

View file

@ -17,6 +17,7 @@ limitations under the License.
package peerproxy
import (
"slices"
"sync/atomic"
"testing"
"time"
@ -102,7 +103,7 @@ func TestGetExclusionSet(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
if tt.activeGVs != nil {
mgr.currentlyActiveGVs.Store(tt.activeGVs)
@ -166,7 +167,7 @@ func TestReapExpiredGVs(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(tt.gracePeriod, 50*time.Millisecond, &atomic.Value{}, &atomic.Pointer[func()]{})
mgr := NewGVExclusionManager(tt.gracePeriod, &atomic.Value{}, &atomic.Pointer[func()]{})
mgr.recentlyDeletedGVs.Store(tt.deletedGVs)
mgr.updateRecentlyDeletedGVs(nil)
result := mgr.loadRecentlyDeletedGVs()
@ -181,7 +182,7 @@ func TestReapExpiredGVs(t *testing.T) {
}
func TestDetectDiff(t *testing.T) {
_ = NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
_ = NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
tests := []struct {
name string
@ -259,15 +260,8 @@ func TestDetectDiff(t *testing.T) {
t.Errorf("diffGVs() deleted count = %d, want %d", len(deletedGVs), len(tt.wantDeleted))
}
for _, wantGV := range tt.wantDeleted {
found := false
for _, gotGV := range deletedGVs {
if gotGV == wantGV {
found = true
break
}
}
if !found {
t.Errorf("diffGVs() missing deleted GV %v", wantGV)
if !slices.Contains(deletedGVs, wantGV) {
t.Errorf("diffGVs() deleted missing GV %v", wantGV)
}
}
})
@ -376,7 +370,7 @@ func TestFilterPeerDiscoveryCache(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
if tt.activeGVs != nil {
mgr.currentlyActiveGVs.Store(tt.activeGVs)
@ -483,7 +477,7 @@ func TestFilterPeerCacheEntry(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
filtered := mgr.filterPeerCacheEntry(tt.entry, tt.exclusionSet)
@ -555,7 +549,7 @@ func TestFilterGroupDiscovery(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
filtered := mgr.filterGroupDiscovery(tt.groupDiscoveries, tt.exclusionSet)
for groupName, wantVersionStrs := range tt.wantGroups {

View file

@ -55,9 +55,6 @@ const (
// the deleted CRD or aggregated API before this server stops excluding it
// in peer-aggregated discovery and while proxying requests to peers.
defaultExclusionGracePeriod = 5 * time.Minute
// defaultExclusionReaperInterval is the interval at which the we
// clean up deleted groups from the exclusion list.
defaultExclusionReaperInterval = 1 * time.Minute
)
// Interface defines how the Mixed Version Proxy filter interacts with the underlying system.
@ -100,16 +97,10 @@ type Interface interface {
// RunPeerDiscoveryActiveGVTracker starts a worker that processes CRD/APIService informer
// events to rebuild the set of actively served GroupVersions. This worker is triggered
// whenever a CRD or APIService is added or updated and updates the exclusion
// set accordingly.
// set accordingly. When a GV is deleted, a delayed sync is automatically scheduled
// after the grace period to reap expired GVs from the exclusion set.
RunPeerDiscoveryActiveGVTracker(ctx context.Context)
// RunPeerDiscoveryReaper starts a background worker that periodically removes expired
// GroupVersions from the exclusion set. When a CRD/APIService is deleted, its GV remains
// in the exclusion set for a grace period (default 5 minutes) to allow all peer API servers
// to observe the deletion. The reaper runs at a configured interval (default 1 minute)
// and removes GVs whose grace period has elapsed.
RunPeerDiscoveryReaper(ctx context.Context)
// RunPeerDiscoveryRefilter starts a worker that re-applies exclusion filtering to the
// cached peer discovery data whenever the exclusion set changes. This ensures that
// already-cached peer discovery responses are immediately updated to exclude newly added
@ -149,7 +140,6 @@ func NewPeerProxyHandler(
h.gvExclusionManager = NewGVExclusionManager(
defaultExclusionGracePeriod,
defaultExclusionReaperInterval,
&h.rawPeerDiscoveryCache,
&h.cacheInvalidationCallback,
)
@ -235,13 +225,6 @@ func (h *peerProxyHandler) RunPeerDiscoveryActiveGVTracker(ctx context.Context)
}
}
// RunPeerDiscoveryReaper starts the worker that removes expired GVs from the exclusion set.
func (h *peerProxyHandler) RunPeerDiscoveryReaper(ctx context.Context) {
if h.gvExclusionManager != nil {
h.gvExclusionManager.RunPeerDiscoveryReaper(ctx)
}
}
// RunPeerDiscoveryRefilter starts the worker that refilters peer discovery cache.
func (h *peerProxyHandler) RunPeerDiscoveryRefilter(ctx context.Context) {
if h.gvExclusionManager != nil {