Consolidate delete handler, reaper logic into same sync loop using the same worker - activeGVQueue

This commit is contained in:
Richa Banker 2026-01-21 16:41:01 -08:00
parent 1713b6d8f2
commit aaed7525dd
3 changed files with 85 additions and 135 deletions

View file

@ -224,30 +224,11 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
return err
})
// Run peer-discovery sync loop
s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-cache-sync", func(context genericapiserver.PostStartHookContext) error {
// Run peer-discovery workers
s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-workers", func(context genericapiserver.PostStartHookContext) error {
go c.Extra.PeerProxy.RunPeerDiscoveryCacheSync(context, 1)
return nil
})
// RunActiveGVTracker monitors CRDs/APIServices and maintains the active GV list for exclusion from peer-discovery.
s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-gv-active-tracker", func(context genericapiserver.PostStartHookContext) error {
go c.Extra.PeerProxy.RunPeerDiscoveryActiveGVTracker(context, 1)
return nil
})
// RunReaper removes expired GVs from the exclusion set for peer-discovery after their grace period.
// This ensures we don't indefinitely exclude GVs that are no longer present.
s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-gv-exclusion-reaper", func(context genericapiserver.PostStartHookContext) error {
go c.Extra.PeerProxy.RunPeerDiscoveryReaper(context)
return nil
})
// RunPeerDiscoveryRefilter re-applies the exclusion filter to the existing peer discovery cache
// whenever the exclusion set changes (e.g., CRD or aggregated API added/deleted). This is different from the
// initial filtering that happens when peer discovery is first fetched - this worker ensures
// the already-cached data stays consistent with the current exclusion set.
s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-refilter", func(context genericapiserver.PostStartHookContext) error {
go c.Extra.PeerProxy.RunPeerDiscoveryRefilter(context, 1)
return nil
})

View file

@ -42,10 +42,11 @@ type GVExtractor func(obj interface{}) []schema.GroupVersion
// - recentlyDeletedGVs: GVs belonging to CRDs and aggregated APIServices that were recently deleted,
// tracked with deletion timestamp for grace period
//
// It runs three workers:
// 1. Active GV Tracker: Triggered on CRD/APIService events, rebuilds active GVs
// 2. Reaper: Periodically removes expired GVs from recentlyDeletedGVs
// 3. Peer Discovery Re-filter: Rate-limited worker that filters peer cache
// It runs two workers and a periodic ticker:
// 1. Active GV Tracker: Triggered on CRD/APIService events or reaper ticks,
// rebuilds active GVs and reaps expired deleted GVs
// 2. Peer Discovery Re-filter: Rate-limited worker that filters peer cache
// 3. Reaper Ticker: Periodically triggers the Active GV Tracker to reap expired GVs
type GVExclusionManager struct {
// Atomic maps for lock-free access
currentlyActiveGVs atomic.Value // map[schema.GroupVersion]struct{}
@ -57,12 +58,12 @@ type GVExclusionManager struct {
apiServiceInformer cache.SharedIndexInformer
apiServiceExtractor GVExtractor
// Worker 1: triggered by CRD/APIService events
// Worker 1: triggered by CRD/APIService events or reaper ticks
activeGVQueue workqueue.TypedRateLimitingInterface[string]
// Worker 2: periodic reaper's configuration
// Reaper ticker configuration
exclusionGracePeriod time.Duration
reaperCheckInterval time.Duration
// Worker 3: triggered by Active/Deleted GV changes
// Worker 2: triggered by Active/Deleted GV changes
refilterQueue workqueue.TypedRateLimitingInterface[string]
peerDiscoveryCache *atomic.Value // peerProxyHandler.peerDiscoveryInfoCache
@ -112,15 +113,7 @@ func (m *GVExclusionManager) RegisterCRDInformerHandlers(crdInformer cache.Share
m.handleGVUpdate()
},
DeleteFunc: func(obj interface{}) {
// Handle tombstone objects
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
gvs := extractor(obj)
if gvs == nil {
return
}
m.onDeleteEvent(gvs)
m.handleGVUpdate()
},
})
return err
@ -140,16 +133,7 @@ func (m *GVExclusionManager) RegisterAPIServiceInformerHandlers(apiServiceInform
m.handleGVUpdate()
},
DeleteFunc: func(obj interface{}) {
// Handle tombstone objects
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
gvs := extractor(obj)
if gvs == nil {
return
}
m.onDeleteEvent(gvs)
m.handleGVUpdate()
},
})
return err
@ -187,49 +171,15 @@ func (m *GVExclusionManager) getExclusionSet() map[schema.GroupVersion]struct{}
}
// handleGVUpdate is called when a CRD or APIService event occurs.
// This triggers Worker 1 to rebuild the active GV set.
// This triggers reconciliation which rebuilds the active GV set
// and also reaps expired GVs if indicated so.
func (m *GVExclusionManager) handleGVUpdate() {
m.activeGVQueue.Add("sync")
}
// onDeleteEvent is called when a CRD or APIService is deleted.
// This removes the GVs from currentlyActiveGVs, adds them to recentlyDeletedGVs
// with current timestamp, and queues Worker 3 to refilter the peer discovery cache.
func (m *GVExclusionManager) onDeleteEvent(gvs []schema.GroupVersion) {
if len(gvs) == 0 {
return
}
// Remove from currentlyActiveGVs
activeMap := m.loadCurrentlyActiveGVs()
newActiveMap := make(map[schema.GroupVersion]struct{}, len(activeMap))
for k, v := range activeMap {
newActiveMap[k] = v
}
for _, gv := range gvs {
delete(newActiveMap, gv)
}
m.currentlyActiveGVs.Store(newActiveMap)
// Add to recentlyDeletedGVs
deletedMap := m.loadRecentlyDeletedGVs()
newDeletedMap := make(map[schema.GroupVersion]time.Time, len(deletedMap)+len(gvs))
for k, v := range deletedMap {
newDeletedMap[k] = v
}
now := time.Now()
for _, gv := range gvs {
newDeletedMap[gv] = now
klog.V(4).Infof("GV %s deleted: moved to recentlyDeletedGVs", gv.String())
}
m.recentlyDeletedGVs.Store(newDeletedMap)
// Queue Worker 3
m.refilterQueue.Add("refilter")
}
// RunPeerDiscoveryActiveGVTracker runs Worker 1: Active GV Tracker
// This worker is triggered by CRD/APIService events and rebuilds the active GV set.
// This worker is triggered by CRD/APIService events and
// rebuilds the active GV set and reaps expired GVs if indicated so.
func (m *GVExclusionManager) RunPeerDiscoveryActiveGVTracker(ctx context.Context, workers int) {
defer m.activeGVQueue.ShutDown()
@ -264,8 +214,11 @@ func (m *GVExclusionManager) processNextActiveGV(ctx context.Context) bool {
return true
}
// reconcileActiveGVs fetches all GVs from CRD and APIService informers,
// detects diffs with the previous state, and queues Worker 3 if changes detected.
// reconcileActiveGVs does the following
// 1. fetches all GVs from CRD and APIService informers
// 2. detects diffs with the previous state
// 3. adds deleted GVs to recentlyDeletedGVs
// 4. reaps expired GVs, and queues Worker 2 if changes detected.
func (m *GVExclusionManager) reconcileActiveGVs() {
freshGVs := make(map[schema.GroupVersion]struct{})
@ -293,42 +246,80 @@ func (m *GVExclusionManager) reconcileActiveGVs() {
// Load previous active GVs and detect diff
previousGVs := m.loadCurrentlyActiveGVs()
diffDetected := m.diffGVs(previousGVs, freshGVs)
deletedGVs, activeGVsChanged := diffGVs(previousGVs, freshGVs)
if diffDetected {
m.currentlyActiveGVs.Store(freshGVs)
klog.V(4).Infof("Active GVs updated: %d GVs now active", len(freshGVs))
// Update recentlyDeletedGVs: add newly deleted GVs and reap expired ones
recentlyDeletedChanged := m.updateRecentlyDeletedGVs(deletedGVs)
// Queue Worker 3 for re-filtering
if activeGVsChanged || recentlyDeletedChanged {
if activeGVsChanged {
m.currentlyActiveGVs.Store(freshGVs)
klog.V(4).Infof("Active GVs updated: %d GVs now active", len(freshGVs))
}
// Queue re-filtering of peer discovery cache
m.refilterQueue.Add("refilter")
} else {
klog.V(4).Infof("No diff detected in active GVs")
klog.V(4).Infof("No changes detected in active or recently deleted GVs")
}
}
// diffGVs checks if there's a presence/absence difference between two GV maps
func (m *GVExclusionManager) diffGVs(old, new map[schema.GroupVersion]struct{}) bool {
if len(old) != len(new) {
return true
// updateRecentlyDeletedGVs adds newly deleted GVs to recentlyDeletedGVs
// and reaps expired ones. Returns true if any changes were made.
func (m *GVExclusionManager) updateRecentlyDeletedGVs(deletedGVs []schema.GroupVersion) bool {
deletedMap := m.loadRecentlyDeletedGVs()
// Early return if nothing to do
if len(deletedGVs) == 0 && len(deletedMap) == 0 {
return false
}
now := time.Now()
newDeletedMap := make(map[schema.GroupVersion]time.Time, len(deletedMap)+len(deletedGVs))
changed := false
// Copy existing entries, reaping expired ones
for gv, deletionTime := range deletedMap {
if now.Sub(deletionTime) > m.exclusionGracePeriod {
klog.V(4).Infof("Reaping GV %s (grace period expired)", gv.String())
changed = true
// Don't add to new map (effectively removing it)
} else {
newDeletedMap[gv] = deletionTime
}
}
// Add newly deleted GVs
for _, gv := range deletedGVs {
newDeletedMap[gv] = now
klog.V(4).Infof("GV %s deleted: moved to recentlyDeletedGVs", gv.String())
changed = true
}
if changed {
m.recentlyDeletedGVs.Store(newDeletedMap)
}
return changed
}
// diffGVs compares old and new GV maps, returns GVs that were deleted (in old but not new)
// and a boolean indicating if there were any changes (additions or deletions).
func diffGVs(old, new map[schema.GroupVersion]struct{}) ([]schema.GroupVersion, bool) {
var deletedGVs []schema.GroupVersion
hasChanges := len(old) != len(new)
// Find deleted GVs (in old but not in new)
for gv := range old {
if _, exists := new[gv]; !exists {
return true
deletedGVs = append(deletedGVs, gv)
hasChanges = true
}
}
for gv := range new {
if _, exists := old[gv]; !exists {
return true
}
}
return false
return deletedGVs, hasChanges
}
// RunPeerDiscoveryReaper runs Worker 2: Reaper
// This worker periodically removes expired GVs from recentlyDeletedGVs.
// This worker periodically triggers reconciliation which also reaps expired GVs.
func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) {
klog.Infof("Starting GV Reaper with %s interval", m.reaperCheckInterval)
ticker := time.NewTicker(m.reaperCheckInterval)
@ -337,7 +328,8 @@ func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) {
for {
select {
case <-ticker.C:
m.reapExpiredGVs()
// Trigger reconciliation which will also reap expired GVs
m.activeGVQueue.Add("reap")
case <-ctx.Done():
klog.Info("GV Reaper stopped")
return
@ -345,30 +337,7 @@ func (m *GVExclusionManager) RunPeerDiscoveryReaper(ctx context.Context) {
}
}
// reapExpiredGVs removes GVs from recentlyDeletedGVs that have exceeded the grace period.
func (m *GVExclusionManager) reapExpiredGVs() {
deletedMap := m.loadRecentlyDeletedGVs()
now := time.Now()
newDeletedMap := make(map[schema.GroupVersion]time.Time)
anyReaped := false
for gv, deletionTime := range deletedMap {
if now.Sub(deletionTime) > m.exclusionGracePeriod {
klog.V(4).Infof("Reaping GV %s (grace period expired)", gv.String())
anyReaped = true
// Don't add to new map (effectively removing it)
} else {
newDeletedMap[gv] = deletionTime
}
}
if anyReaped {
// Atomic swap
m.recentlyDeletedGVs.Store(newDeletedMap)
}
}
// RunPeerDiscoveryRefilter runs Worker 3: Peer Discovery Re-filter
// RunPeerDiscoveryRefilter runs Worker 2: Peer Discovery Re-filter
// This worker filters the peer discovery cache using the exclusion set.
func (m *GVExclusionManager) RunPeerDiscoveryRefilter(ctx context.Context, workers int) {
defer m.refilterQueue.ShutDown()

View file

@ -168,7 +168,7 @@ func TestReapExpiredGVs(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(tt.gracePeriod, 50*time.Millisecond, &atomic.Value{}, &atomic.Pointer[func()]{})
mgr.recentlyDeletedGVs.Store(tt.deletedGVs)
mgr.reapExpiredGVs()
mgr.updateRecentlyDeletedGVs(nil)
result := mgr.loadRecentlyDeletedGVs()
for _, gv := range tt.wantReaped {
@ -181,7 +181,7 @@ func TestReapExpiredGVs(t *testing.T) {
}
func TestDetectDiff(t *testing.T) {
mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
_ = NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
tests := []struct {
name string
@ -241,9 +241,9 @@ func TestDetectDiff(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := mgr.diffGVs(tt.old, tt.new)
_, result := diffGVs(tt.old, tt.new)
if result != tt.want {
t.Errorf("detectDiff() = %v, want %v", result, tt.want)
t.Errorf("diffGVs() = %v, want %v", result, tt.want)
}
})
}