separate raw and filtered peer discovery caches

This commit is contained in:
Richa Banker 2026-01-24 20:33:37 -08:00
parent aaed7525dd
commit 4df467d42d
7 changed files with 147 additions and 138 deletions

View file

@ -66,22 +66,27 @@ type GVExclusionManager struct {
// Worker 2: triggered by Active/Deleted GV changes
refilterQueue workqueue.TypedRateLimitingInterface[string]
peerDiscoveryCache *atomic.Value // peerProxyHandler.peerDiscoveryInfoCache
invalidationCallback *atomic.Pointer[func()]
// rawPeerDiscoveryCache is written only by peerLeaseQueue worker
// when peer leases change
rawPeerDiscoveryCache *atomic.Value // map[string]PeerDiscoveryCacheEntry
// filteredPeerDiscoveryCache is written only by the refilter worker
// when raw cache or exclusion set changes.
filteredPeerDiscoveryCache atomic.Value // map[string]PeerDiscoveryCacheEntry
invalidationCallback *atomic.Pointer[func()]
}
// NewGVExclusionManager creates a new GV exclusion manager.
func NewGVExclusionManager(
exclusionGracePeriod time.Duration,
reaperCheckInterval time.Duration,
peerDiscoveryCache *atomic.Value,
rawPeerDiscoveryCache *atomic.Value,
invalidationCallback *atomic.Pointer[func()],
) *GVExclusionManager {
mgr := &GVExclusionManager{
exclusionGracePeriod: exclusionGracePeriod,
reaperCheckInterval: reaperCheckInterval,
peerDiscoveryCache: peerDiscoveryCache,
invalidationCallback: invalidationCallback,
exclusionGracePeriod: exclusionGracePeriod,
reaperCheckInterval: reaperCheckInterval,
rawPeerDiscoveryCache: rawPeerDiscoveryCache,
invalidationCallback: invalidationCallback,
activeGVQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
@ -96,10 +101,21 @@ func NewGVExclusionManager(
mgr.currentlyActiveGVs.Store(map[schema.GroupVersion]struct{}{})
mgr.recentlyDeletedGVs.Store(map[schema.GroupVersion]time.Time{})
mgr.filteredPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{})
return mgr
}
// GetFilteredPeerDiscoveryCache returns the map currently held in
// filteredPeerDiscoveryCache. When nothing has been stored yet, or the stored
// value is not a map[string]PeerDiscoveryCacheEntry, a fresh empty map is
// returned instead.
func (m *GVExclusionManager) GetFilteredPeerDiscoveryCache() map[string]PeerDiscoveryCacheEntry {
	// A comma-ok assertion on a nil interface yields ok == false rather than
	// panicking, so the "never stored" and "wrong type" cases share one branch.
	stored, ok := m.filteredPeerDiscoveryCache.Load().(map[string]PeerDiscoveryCacheEntry)
	if !ok {
		return map[string]PeerDiscoveryCacheEntry{}
	}
	return stored
}
// RegisterCRDInformerHandlers registers event handlers for CRD informer using a custom extractor.
// The extractor function is responsible for extracting GroupVersions from CRD objects.
func (m *GVExclusionManager) RegisterCRDInformerHandlers(crdInformer cache.SharedIndexInformer, extractor GVExtractor) error {
@ -256,8 +272,7 @@ func (m *GVExclusionManager) reconcileActiveGVs() {
m.currentlyActiveGVs.Store(freshGVs)
klog.V(4).Infof("Active GVs updated: %d GVs now active", len(freshGVs))
}
// Queue re-filtering of peer discovery cache
m.refilterQueue.Add("refilter")
m.TriggerRefilter()
} else {
klog.V(4).Infof("No changes detected in active or recently deleted GVs")
}
@ -373,83 +388,57 @@ func (m *GVExclusionManager) processNextRefilter(ctx context.Context) bool {
return true
}
// refilterPeerDiscoveryCache filters the peer discovery cache using the exclusion set.
// Only updates the cache and calls invalidation callback if filtering actually changed content.
// refilterPeerDiscoveryCache reads the raw peer discovery cache,
// applies exclusion filtering, and stores the result to the filtered cache.
func (m *GVExclusionManager) refilterPeerDiscoveryCache() {
if m.peerDiscoveryCache == nil {
klog.Warning("peerDiscoveryCache reference not set")
return
}
cache := m.peerDiscoveryCache.Load()
if cache == nil {
klog.V(4).Infof("Peer discovery cache is empty, skipping re-filter")
return
}
cacheMap, ok := cache.(map[string]PeerDiscoveryCacheEntry)
if !ok {
klog.Warning("Invalid cache type in peerDiscoveryInfoCache")
return
var cacheMap map[string]PeerDiscoveryCacheEntry
if m.rawPeerDiscoveryCache != nil {
if rawCache := m.rawPeerDiscoveryCache.Load(); rawCache != nil {
cacheMap, _ = rawCache.(map[string]PeerDiscoveryCacheEntry)
}
}
if len(cacheMap) == 0 {
klog.V(4).Infof("Peer discovery cache is empty, skipping re-filter")
m.filteredPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{})
klog.V(4).Infof("Raw peer discovery cache is empty or unavailable")
return
}
// Filter the cache
filteredCache, changed := m.FilterPeerDiscoveryCache(cacheMap)
filteredCache := m.filterPeerDiscoveryCache(cacheMap)
m.filteredPeerDiscoveryCache.Store(filteredCache)
if changed {
// Atomic swap peer cache
m.peerDiscoveryCache.Store(filteredCache)
// Call invalidation callback
if m.invalidationCallback != nil {
if callback := m.invalidationCallback.Load(); callback != nil {
(*callback)()
}
if m.invalidationCallback != nil {
if callback := m.invalidationCallback.Load(); callback != nil {
(*callback)()
}
klog.V(4).Infof("Peer discovery cache re-filtered, %d GVs excluded", len(m.getExclusionSet()))
} else {
klog.V(4).Infof("No changes after filtering peer discovery cache")
}
klog.V(4).Infof("Peer discovery cache re-filtered, %d GVs in exclusion set", len(m.getExclusionSet()))
}
// FilterPeerDiscoveryCache applies the current exclusion set to the provided peer cache.
// This should be called when new peers are added to ensure their discovery is filtered.
// Returns the filtered cache and a boolean indicating if any changes were made.
func (m *GVExclusionManager) FilterPeerDiscoveryCache(cacheMap map[string]PeerDiscoveryCacheEntry) (map[string]PeerDiscoveryCacheEntry, bool) {
// filterPeerDiscoveryCache applies the current exclusion set to the provided peer cache.
func (m *GVExclusionManager) filterPeerDiscoveryCache(cacheMap map[string]PeerDiscoveryCacheEntry) map[string]PeerDiscoveryCacheEntry {
exclusionSet := m.getExclusionSet()
if len(exclusionSet) == 0 {
return cacheMap, false
return cacheMap
}
filtered := make(map[string]PeerDiscoveryCacheEntry, len(cacheMap))
anyChanged := false
for peerID, entry := range cacheMap {
filteredEntry, changed := m.filterPeerCacheEntry(entry, exclusionSet)
filtered[peerID] = filteredEntry
if changed {
anyChanged = true
}
filtered[peerID] = m.filterPeerCacheEntry(entry, exclusionSet)
}
return filtered, anyChanged
return filtered
}
// filterPeerCacheEntry filters a single peer's cache entry for excluded GVs.
// Returns the filtered entry and whether any GVs were excluded.
func (m *GVExclusionManager) filterPeerCacheEntry(
entry PeerDiscoveryCacheEntry,
exclusionSet map[schema.GroupVersion]struct{},
) (PeerDiscoveryCacheEntry, bool) {
) PeerDiscoveryCacheEntry {
filteredGVRs := make(map[schema.GroupVersionResource]bool, len(entry.GVRs))
anyExcluded := false
for existingGVR, v := range entry.GVRs {
gv := schema.GroupVersion{Group: existingGVR.Group, Version: existingGVR.Version}
if _, excluded := exclusionSet[gv]; excluded {
@ -459,17 +448,17 @@ func (m *GVExclusionManager) filterPeerCacheEntry(
filteredGVRs[existingGVR] = v
}
// If no GVRs were excluded, the exclusion set doesn't intersect with this peer's GVs,
// so we can return the entry unchanged without filtering GroupDiscovery.
if !anyExcluded {
return entry, false
return entry
}
// Filter the group discovery list
filteredGroups := m.filterGroupDiscovery(entry.GroupDiscovery, exclusionSet)
return PeerDiscoveryCacheEntry{
GVRs: filteredGVRs,
GroupDiscovery: filteredGroups,
}, true
}
}
// filterGroupDiscovery filters group discovery entries, removing excluded GVs.
@ -517,3 +506,9 @@ func (m *GVExclusionManager) loadRecentlyDeletedGVs() map[schema.GroupVersion]ti
}
return map[schema.GroupVersion]time.Time{}
}
// TriggerRefilter asks the refilter worker to run by adding the "refilter"
// key to refilterQueue. It only enqueues; the filtering itself is done by
// whichever worker drains that queue. Callers that replace the raw peer
// discovery cache should invoke it afterwards so the filtered cache is rebuilt.
func (m *GVExclusionManager) TriggerRefilter() {
	const refilterKey = "refilter"
	m.refilterQueue.Add(refilterKey)
}

View file

@ -184,16 +184,18 @@ func TestDetectDiff(t *testing.T) {
_ = NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
tests := []struct {
name string
old map[schema.GroupVersion]struct{}
new map[schema.GroupVersion]struct{}
want bool
name string
old map[schema.GroupVersion]struct{}
new map[schema.GroupVersion]struct{}
wantChanged bool
wantDeleted []schema.GroupVersion
}{
{
name: "both empty",
old: map[schema.GroupVersion]struct{}{},
new: map[schema.GroupVersion]struct{}{},
want: false,
name: "both empty",
old: map[schema.GroupVersion]struct{}{},
new: map[schema.GroupVersion]struct{}{},
wantChanged: false,
wantDeleted: nil,
},
{
name: "identical",
@ -203,7 +205,8 @@ func TestDetectDiff(t *testing.T) {
new: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
want: false,
wantChanged: false,
wantDeleted: nil,
},
{
name: "added GV",
@ -214,7 +217,8 @@ func TestDetectDiff(t *testing.T) {
{Group: "apps", Version: "v1"}: {},
{Group: "batch", Version: "v1"}: {},
},
want: true,
wantChanged: true,
wantDeleted: nil, // No deletions, only addition
},
{
name: "removed GV",
@ -225,7 +229,10 @@ func TestDetectDiff(t *testing.T) {
new: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
want: true,
wantChanged: true,
wantDeleted: []schema.GroupVersion{
{Group: "batch", Version: "v1"},
},
},
{
name: "different GVs same size",
@ -235,15 +242,33 @@ func TestDetectDiff(t *testing.T) {
new: map[schema.GroupVersion]struct{}{
{Group: "batch", Version: "v1"}: {},
},
want: true,
wantChanged: true,
wantDeleted: []schema.GroupVersion{
{Group: "apps", Version: "v1"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, result := diffGVs(tt.old, tt.new)
if result != tt.want {
t.Errorf("diffGVs() = %v, want %v", result, tt.want)
deletedGVs, changed := diffGVs(tt.old, tt.new)
if changed != tt.wantChanged {
t.Errorf("diffGVs() changed = %v, want %v", changed, tt.wantChanged)
}
if len(deletedGVs) != len(tt.wantDeleted) {
t.Errorf("diffGVs() deleted count = %d, want %d", len(deletedGVs), len(tt.wantDeleted))
}
for _, wantGV := range tt.wantDeleted {
found := false
for _, gotGV := range deletedGVs {
if gotGV == wantGV {
found = true
break
}
}
if !found {
t.Errorf("diffGVs() missing deleted GV %v", wantGV)
}
}
})
}
@ -255,7 +280,6 @@ func TestFilterPeerDiscoveryCache(t *testing.T) {
activeGVs map[schema.GroupVersion]struct{}
deletedGVs map[schema.GroupVersion]time.Time
cacheMap map[string]PeerDiscoveryCacheEntry
wantChanged bool
wantPeerGVRs map[string]int // peer name -> GVR count
}{
{
@ -273,7 +297,6 @@ func TestFilterPeerDiscoveryCache(t *testing.T) {
},
},
},
wantChanged: false,
wantPeerGVRs: map[string]int{
"peer1": 1,
"peer2": 1,
@ -296,7 +319,6 @@ func TestFilterPeerDiscoveryCache(t *testing.T) {
},
},
},
wantChanged: true,
wantPeerGVRs: map[string]int{
"peer1": 0, // apps/v1 filtered out
"peer2": 1, // unchanged
@ -322,7 +344,6 @@ func TestFilterPeerDiscoveryCache(t *testing.T) {
},
},
},
wantChanged: true,
wantPeerGVRs: map[string]int{
"peer1": 0, // custom/v1alpha1 filtered out
"peer2": 1, // unchanged
@ -347,7 +368,6 @@ func TestFilterPeerDiscoveryCache(t *testing.T) {
},
},
},
wantChanged: true,
wantPeerGVRs: map[string]int{
"peer1": 1, // apps/v1 and custom/v1alpha1 filtered out, batch/v1 remains
},
@ -365,10 +385,7 @@ func TestFilterPeerDiscoveryCache(t *testing.T) {
mgr.recentlyDeletedGVs.Store(tt.deletedGVs)
}
filtered, changed := mgr.FilterPeerDiscoveryCache(tt.cacheMap)
if changed != tt.wantChanged {
t.Errorf("Want changed=%v, got %v", tt.wantChanged, changed)
}
filtered := mgr.filterPeerDiscoveryCache(tt.cacheMap)
if len(filtered) != len(tt.cacheMap) {
t.Errorf("Want %d peers in filtered cache, got %d", len(tt.cacheMap), len(filtered))
@ -393,9 +410,30 @@ func TestFilterPeerCacheEntry(t *testing.T) {
name string
entry PeerDiscoveryCacheEntry
exclusionSet map[schema.GroupVersion]struct{}
wantChanged bool
wantGVRs []schema.GroupVersionResource
}{
{
name: "empty exclusion set returns entry unchanged",
entry: PeerDiscoveryCacheEntry{
GVRs: map[schema.GroupVersionResource]bool{
{Group: "apps", Version: "v1", Resource: "deployments"}: true,
{Group: "batch", Version: "v1", Resource: "jobs"}: true,
},
GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{Name: "apps"},
Versions: []apidiscoveryv2.APIVersionDiscovery{
{Version: "v1"},
},
},
},
},
exclusionSet: map[schema.GroupVersion]struct{}{},
wantGVRs: []schema.GroupVersionResource{
{Group: "apps", Version: "v1", Resource: "deployments"},
{Group: "batch", Version: "v1", Resource: "jobs"},
},
},
{
name: "filter GVRs and groups",
entry: PeerDiscoveryCacheEntry{
@ -422,7 +460,6 @@ func TestFilterPeerCacheEntry(t *testing.T) {
exclusionSet: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
wantChanged: true,
wantGVRs: []schema.GroupVersionResource{
{Group: "batch", Version: "v1", Resource: "jobs"},
{Group: "custom", Version: "v1", Resource: "myresources"},
@ -438,7 +475,6 @@ func TestFilterPeerCacheEntry(t *testing.T) {
exclusionSet: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
wantChanged: false,
wantGVRs: []schema.GroupVersionResource{
{Group: "batch", Version: "v1", Resource: "jobs"},
},
@ -449,11 +485,7 @@ func TestFilterPeerCacheEntry(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
filtered, changed := mgr.filterPeerCacheEntry(tt.entry, tt.exclusionSet)
if changed != tt.wantChanged {
t.Errorf("Want changed=%v, got %v", tt.wantChanged, changed)
}
filtered := mgr.filterPeerCacheEntry(tt.entry, tt.exclusionSet)
for _, gvr := range tt.wantGVRs {
if _, found := filtered.GVRs[gvr]; !found {

View file

@ -115,21 +115,17 @@ func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error {
}
}
// Apply current exclusion filter to new peer cache entries.
if filtered, changed := h.gvExclusionManager.FilterPeerDiscoveryCache(newCache); changed {
newCache = filtered
}
h.storePeerDiscoveryCacheAndInvalidate(newCache)
// Store unfiltered data to raw cache and trigger refilter.
// The refilter worker (single writer to filtered cache) will apply exclusions.
h.storeRawPeerDiscoveryCacheAndTriggerRefilter(newCache)
return fetchDiscoveryErr
}
// storePeerDiscoveryCacheAndInvalidate stores the new peer discovery cache and always calls the invalidation callback if set.
func (h *peerProxyHandler) storePeerDiscoveryCacheAndInvalidate(newCache map[string]PeerDiscoveryCacheEntry) {
h.peerDiscoveryInfoCache.Store(newCache)
if callback := h.cacheInvalidationCallback.Load(); callback != nil {
(*callback)()
}
// storeRawPeerDiscoveryCacheAndTriggerRefilter stores newCache, unfiltered, in
// rawPeerDiscoveryCache and then asks the GV exclusion manager to refilter.
// TriggerRefilter only enqueues work on the refilter queue, so the filtered
// cache is updated later by the refilter worker, not before this call returns.
func (h *peerProxyHandler) storeRawPeerDiscoveryCacheAndTriggerRefilter(newCache map[string]PeerDiscoveryCacheEntry) {
// Store first so the refilter request is queued only after the new raw data is in place.
h.rawPeerDiscoveryCache.Store(newCache)
h.gvExclusionManager.TriggerRefilter()
}
func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID string) (PeerDiscoveryCacheEntry, error) {
@ -285,14 +281,8 @@ func (h *peerProxyHandler) isValidPeerIdentityLease(obj interface{}) (*v1.Lease,
func (h *peerProxyHandler) findServiceableByPeerFromPeerDiscoveryCache(gvr schema.GroupVersionResource) []string {
var serviceableByIDs []string
cache := h.peerDiscoveryInfoCache.Load()
if cache == nil {
return serviceableByIDs
}
cacheMap, ok := cache.(map[string]PeerDiscoveryCacheEntry)
if !ok {
klog.Warning("Invalid cache type in peerDiscoveryInfoCache")
cacheMap := h.gvExclusionManager.GetFilteredPeerDiscoveryCache()
if len(cacheMap) == 0 {
return serviceableByIDs
}

View file

@ -192,6 +192,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
}
go h.RunPeerDiscoveryCacheSync(ctx, 1)
go h.RunPeerDiscoveryRefilter(ctx, 1)
// Wait for initial cache update.
initialCache := map[string]PeerDiscoveryCacheEntry{}
@ -204,7 +205,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
return false, ctx.Err()
default:
}
gotCache := h.peerDiscoveryInfoCache.Load()
gotCache := h.gvExclusionManager.GetFilteredPeerDiscoveryCache()
return assert.ObjectsAreEqual(initialCache, gotCache), nil
})
if err != nil {
@ -251,7 +252,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
return false, ctx.Err()
default:
}
gotCache := h.peerDiscoveryInfoCache.Load()
gotCache := h.gvExclusionManager.GetFilteredPeerDiscoveryCache()
r := assert.ObjectsAreEqual(tt.wantCache, gotCache)
return r, nil
})

View file

@ -138,7 +138,7 @@ func NewPeerProxyHandler(
localDiscoveryInfoCache: atomic.Value{},
localDiscoveryCacheTicker: time.NewTicker(localDiscoveryRefreshInterval),
localDiscoveryInfoCachePopulated: make(chan struct{}),
peerDiscoveryInfoCache: atomic.Value{},
rawPeerDiscoveryCache: atomic.Value{},
peerLeaseQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
@ -150,7 +150,7 @@ func NewPeerProxyHandler(
h.gvExclusionManager = NewGVExclusionManager(
defaultExclusionGracePeriod,
defaultExclusionReaperInterval,
&h.peerDiscoveryInfoCache,
&h.rawPeerDiscoveryCache,
&h.cacheInvalidationCallback,
)
@ -176,7 +176,7 @@ func NewPeerProxyHandler(
discoveryClient.NoPeerDiscovery = true
h.discoveryClient = discoveryClient
h.localDiscoveryInfoCache.Store(map[schema.GroupVersionResource]bool{})
h.peerDiscoveryInfoCache.Store(map[string]PeerDiscoveryCacheEntry{})
h.rawPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{})
proxyTransport, err := transport.New(proxyClientConfig)
if err != nil {

View file

@ -79,14 +79,12 @@ type peerProxyHandler struct {
localDiscoveryCacheTicker *time.Ticker
localDiscoveryInfoCachePopulated chan struct{}
localDiscoveryInfoCachePopulatedOnce sync.Once
// Cache that stores resources and groups served by peer apiservers.
// The map is from string to PeerDiscoveryCacheEntry where the string
// is the serverID of the peer apiserver.
// Refreshed if a new apiserver identity lease is added, deleted or
// holderIdentity change is observed in the lease.
peerDiscoveryInfoCache atomic.Value // map[string]struct{GVRs map[schema.GroupVersionResource]bool; Groups []apidiscoveryv2.APIGroupDiscovery}
proxyTransport http.RoundTripper
// Worker queue that keeps the peerDiscoveryInfoCache up-to-date.
// rawPeerDiscoveryCache stores unfiltered resources and groups served by peer apiservers.
// The map is from string (serverID) to PeerDiscoveryCacheEntry.
// Written ONLY by peerLeaseQueue worker when peer leases change.
rawPeerDiscoveryCache atomic.Value // map[string]PeerDiscoveryCacheEntry
proxyTransport http.RoundTripper
// Worker queue that keeps the rawPeerDiscoveryCache up-to-date.
peerLeaseQueue workqueue.TypedRateLimitingInterface[string]
serializer runtime.NegotiatedSerializer
cacheInvalidationCallback atomic.Pointer[func()]
@ -284,16 +282,9 @@ func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
// Returns a map of serverID -> []apidiscoveryv2.APIGroupDiscovery served by peer servers
func (h *peerProxyHandler) GetPeerResources() map[string][]apidiscoveryv2.APIGroupDiscovery {
result := make(map[string][]apidiscoveryv2.APIGroupDiscovery)
peerCache := h.peerDiscoveryInfoCache.Load()
if peerCache == nil {
klog.V(4).Infof("GetPeerResources: peer cache is nil")
return result
}
cacheMap, ok := peerCache.(map[string]PeerDiscoveryCacheEntry)
if !ok {
klog.Warning("Invalid cache type in peerDiscoveryGVRCache")
cacheMap := h.gvExclusionManager.GetFilteredPeerDiscoveryCache()
if len(cacheMap) == 0 {
klog.V(4).Infof("GetPeerResources: peer cache is empty")
return result
}

View file

@ -387,7 +387,7 @@ func newFakePeerProxyHandler(informerFinishedSync bool,
return nil, err
}
ppH.localDiscoveryInfoCache.Store(localCache)
ppH.peerDiscoveryInfoCache.Store(peerCache)
ppH.gvExclusionManager.filteredPeerDiscoveryCache.Store(peerCache)
ppH.finishedSync.Store(informerFinishedSync)
return ppH, nil