From 4df467d42d103bb512441d250336ffd38fe12105 Mon Sep 17 00:00:00 2001 From: Richa Banker Date: Sat, 24 Jan 2026 20:33:37 -0800 Subject: [PATCH] separate raw and filtered peer discovery caches --- .../util/peerproxy/gv_exclusion_manager.go | 119 +++++++++--------- .../peerproxy/gv_exclusion_manager_test.go | 96 +++++++++----- .../pkg/util/peerproxy/peer_discovery.go | 30 ++--- .../pkg/util/peerproxy/peer_discovery_test.go | 5 +- .../apiserver/pkg/util/peerproxy/peerproxy.go | 6 +- .../pkg/util/peerproxy/peerproxy_handler.go | 27 ++-- .../util/peerproxy/peerproxy_handler_test.go | 2 +- 7 files changed, 147 insertions(+), 138 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go index 950a05f2635..9a1ede91767 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go @@ -66,22 +66,27 @@ type GVExclusionManager struct { // Worker 2: triggered by Active/Deleted GV changes refilterQueue workqueue.TypedRateLimitingInterface[string] - peerDiscoveryCache *atomic.Value // peerProxyHandler.peerDiscoveryInfoCache - invalidationCallback *atomic.Pointer[func()] + // rawPeerDiscoveryCache is written only by peerLeaseQueue worker + // when peer leases change + rawPeerDiscoveryCache *atomic.Value // map[string]PeerDiscoveryCacheEntry + // filteredPeerDiscoveryCache is written only by the refilter worker + // when raw cache or exclusion set changes. + filteredPeerDiscoveryCache atomic.Value // map[string]PeerDiscoveryCacheEntry + invalidationCallback *atomic.Pointer[func()] } // NewGVExclusionManager creates a new GV exclusion manager. func NewGVExclusionManager( exclusionGracePeriod time.Duration, reaperCheckInterval time.Duration, - peerDiscoveryCache *atomic.Value, + rawPeerDiscoveryCache *atomic.Value, invalidationCallback *atomic.Pointer[func()], ) *GVExclusionManager { mgr := &GVExclusionManager{ - exclusionGracePeriod: exclusionGracePeriod, - reaperCheckInterval: reaperCheckInterval, - peerDiscoveryCache: peerDiscoveryCache, - invalidationCallback: invalidationCallback, + exclusionGracePeriod: exclusionGracePeriod, + reaperCheckInterval: reaperCheckInterval, + rawPeerDiscoveryCache: rawPeerDiscoveryCache, + invalidationCallback: invalidationCallback, activeGVQueue: workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{ @@ -96,10 +101,21 @@ func NewGVExclusionManager( mgr.currentlyActiveGVs.Store(map[schema.GroupVersion]struct{}{}) mgr.recentlyDeletedGVs.Store(map[schema.GroupVersion]time.Time{}) + mgr.filteredPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{}) return mgr } +// GetFilteredPeerDiscoveryCache returns the filtered peer discovery cache. +func (m *GVExclusionManager) GetFilteredPeerDiscoveryCache() map[string]PeerDiscoveryCacheEntry { + if cache := m.filteredPeerDiscoveryCache.Load(); cache != nil { + if cacheMap, ok := cache.(map[string]PeerDiscoveryCacheEntry); ok { + return cacheMap + } + } + return map[string]PeerDiscoveryCacheEntry{} +} + // RegisterCRDInformerHandlers registers event handlers for CRD informer using a custom extractor. // The extractor function is responsible for extracting GroupVersions from CRD objects. func (m *GVExclusionManager) RegisterCRDInformerHandlers(crdInformer cache.SharedIndexInformer, extractor GVExtractor) error { @@ -256,8 +272,7 @@ func (m *GVExclusionManager) reconcileActiveGVs() { m.currentlyActiveGVs.Store(freshGVs) klog.V(4).Infof("Active GVs updated: %d GVs now active", len(freshGVs)) } - // Queue re-filtering of peer discovery cache - m.refilterQueue.Add("refilter") + m.TriggerRefilter() } else { klog.V(4).Infof("No changes detected in active or recently deleted GVs") } @@ -373,83 +388,57 @@ func (m *GVExclusionManager) processNextRefilter(ctx context.Context) bool { return true } -// refilterPeerDiscoveryCache filters the peer discovery cache using the exclusion set. -// Only updates the cache and calls invalidation callback if filtering actually changed content. +// refilterPeerDiscoveryCache reads the raw peer discovery cache, +// applies exclusion filtering, and stores the result to the filtered cache. func (m *GVExclusionManager) refilterPeerDiscoveryCache() { - if m.peerDiscoveryCache == nil { - klog.Warning("peerDiscoveryCache reference not set") - return - } - - cache := m.peerDiscoveryCache.Load() - if cache == nil { - klog.V(4).Infof("Peer discovery cache is empty, skipping re-filter") - return - } - - cacheMap, ok := cache.(map[string]PeerDiscoveryCacheEntry) - if !ok { - klog.Warning("Invalid cache type in peerDiscoveryInfoCache") - return + var cacheMap map[string]PeerDiscoveryCacheEntry + if m.rawPeerDiscoveryCache != nil { + if rawCache := m.rawPeerDiscoveryCache.Load(); rawCache != nil { + cacheMap, _ = rawCache.(map[string]PeerDiscoveryCacheEntry) + } } if len(cacheMap) == 0 { - klog.V(4).Infof("Peer discovery cache is empty, skipping re-filter") + m.filteredPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{}) + klog.V(4).Infof("Raw peer discovery cache is empty or unavailable") return } - // Filter the cache - filteredCache, changed := m.FilterPeerDiscoveryCache(cacheMap) + filteredCache := m.filterPeerDiscoveryCache(cacheMap) + m.filteredPeerDiscoveryCache.Store(filteredCache) - if changed { - // Atomic swap peer cache - m.peerDiscoveryCache.Store(filteredCache) - - // Call invalidation callback - if m.invalidationCallback != nil { - if callback := m.invalidationCallback.Load(); callback != nil { - (*callback)() - } + if m.invalidationCallback != nil { + if callback := m.invalidationCallback.Load(); callback != nil { + (*callback)() } - - klog.V(4).Infof("Peer discovery cache re-filtered, %d GVs excluded", len(m.getExclusionSet())) - } else { - klog.V(4).Infof("No changes after filtering peer discovery cache") } + + klog.V(4).Infof("Peer discovery cache re-filtered, %d GVs in exclusion set", len(m.getExclusionSet())) + } -// FilterPeerDiscoveryCache applies the current exclusion set to the provided peer cache. -// This should be called when new peers are added to ensure their discovery is filtered. -// Returns the filtered cache and a boolean indicating if any changes were made. -func (m *GVExclusionManager) FilterPeerDiscoveryCache(cacheMap map[string]PeerDiscoveryCacheEntry) (map[string]PeerDiscoveryCacheEntry, bool) { +// filterPeerDiscoveryCache applies the current exclusion set to the provided peer cache. +func (m *GVExclusionManager) filterPeerDiscoveryCache(cacheMap map[string]PeerDiscoveryCacheEntry) map[string]PeerDiscoveryCacheEntry { exclusionSet := m.getExclusionSet() if len(exclusionSet) == 0 { - return cacheMap, false + return cacheMap } filtered := make(map[string]PeerDiscoveryCacheEntry, len(cacheMap)) - anyChanged := false - for peerID, entry := range cacheMap { - filteredEntry, changed := m.filterPeerCacheEntry(entry, exclusionSet) - filtered[peerID] = filteredEntry - if changed { - anyChanged = true - } + filtered[peerID] = m.filterPeerCacheEntry(entry, exclusionSet) } - return filtered, anyChanged + return filtered } // filterPeerCacheEntry filters a single peer's cache entry for excluded GVs. -// Returns the filtered entry and whether any GVs were excluded. func (m *GVExclusionManager) filterPeerCacheEntry( entry PeerDiscoveryCacheEntry, exclusionSet map[schema.GroupVersion]struct{}, -) (PeerDiscoveryCacheEntry, bool) { +) PeerDiscoveryCacheEntry { filteredGVRs := make(map[schema.GroupVersionResource]bool, len(entry.GVRs)) anyExcluded := false - for existingGVR, v := range entry.GVRs { gv := schema.GroupVersion{Group: existingGVR.Group, Version: existingGVR.Version} if _, excluded := exclusionSet[gv]; excluded { @@ -459,17 +448,17 @@ func (m *GVExclusionManager) filterPeerCacheEntry( filteredGVRs[existingGVR] = v } + // If no GVRs were excluded, the exclusion set doesn't intersect with this peer's GVs, + // so we can return the entry unchanged without filtering GroupDiscovery. if !anyExcluded { - return entry, false + return entry } - - // Filter the group discovery list filteredGroups := m.filterGroupDiscovery(entry.GroupDiscovery, exclusionSet) return PeerDiscoveryCacheEntry{ GVRs: filteredGVRs, GroupDiscovery: filteredGroups, - }, true + } } // filterGroupDiscovery filters group discovery entries, removing excluded GVs. @@ -517,3 +506,9 @@ func (m *GVExclusionManager) loadRecentlyDeletedGVs() map[schema.GroupVersion]ti } return map[schema.GroupVersion]time.Time{} } + +// TriggerRefilter triggers the refilter worker to apply exclusions to the filtered cache. +// This should be called by peerLeaseQueue after updating the raw peer discovery cache. +func (m *GVExclusionManager) TriggerRefilter() { + m.refilterQueue.Add("refilter") +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go index 1e3ea220e32..85e921ccc45 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go @@ -184,16 +184,18 @@ func TestDetectDiff(t *testing.T) { _ = NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) tests := []struct { - name string - old map[schema.GroupVersion]struct{} - new map[schema.GroupVersion]struct{} - want bool + name string + old map[schema.GroupVersion]struct{} + new map[schema.GroupVersion]struct{} + wantChanged bool + wantDeleted []schema.GroupVersion }{ { - name: "both empty", - old: map[schema.GroupVersion]struct{}{}, - new: map[schema.GroupVersion]struct{}{}, - want: false, + name: "both empty", + old: map[schema.GroupVersion]struct{}{}, + new: map[schema.GroupVersion]struct{}{}, + wantChanged: false, + wantDeleted: nil, }, { name: "identical", @@ -203,7 +205,8 @@ func TestDetectDiff(t *testing.T) { new: map[schema.GroupVersion]struct{}{ {Group: "apps", Version: "v1"}: {}, }, - want: false, + wantChanged: false, + wantDeleted: nil, }, { name: "added GV", @@ -214,7 +217,8 @@ func TestDetectDiff(t *testing.T) { {Group: "apps", Version: "v1"}: {}, {Group: "batch", Version: "v1"}: {}, }, - want: true, + wantChanged: true, + wantDeleted: nil, // No deletions, only addition }, { name: "removed GV", @@ -225,7 +229,10 @@ func TestDetectDiff(t *testing.T) { new: map[schema.GroupVersion]struct{}{ {Group: "apps", Version: "v1"}: {}, }, - want: true, + wantChanged: true, + wantDeleted: []schema.GroupVersion{ + {Group: "batch", Version: "v1"}, + }, }, { name: "different GVs same size", @@ -235,15 +242,33 @@ func TestDetectDiff(t *testing.T) { new: map[schema.GroupVersion]struct{}{ {Group: "batch", Version: "v1"}: {}, }, - want: true, + wantChanged: true, + wantDeleted: []schema.GroupVersion{ + {Group: "apps", Version: "v1"}, + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, result := diffGVs(tt.old, tt.new) - if result != tt.want { - t.Errorf("diffGVs() = %v, want %v", result, tt.want) + deletedGVs, changed := diffGVs(tt.old, tt.new) + if changed != tt.wantChanged { + t.Errorf("diffGVs() changed = %v, want %v", changed, tt.wantChanged) + } + if len(deletedGVs) != len(tt.wantDeleted) { + t.Errorf("diffGVs() deleted count = %d, want %d", len(deletedGVs), len(tt.wantDeleted)) + } + for _, wantGV := range tt.wantDeleted { + found := false + for _, gotGV := range deletedGVs { + if gotGV == wantGV { + found = true + break + } + } + if !found { + t.Errorf("diffGVs() missing deleted GV %v", wantGV) + } } }) } @@ -255,7 +280,6 @@ func TestFilterPeerDiscoveryCache(t *testing.T) { activeGVs map[schema.GroupVersion]struct{} deletedGVs map[schema.GroupVersion]time.Time cacheMap map[string]PeerDiscoveryCacheEntry - wantChanged bool wantPeerGVRs map[string]int // peer name -> GVR count }{ { @@ -273,7 +297,6 @@ func TestFilterPeerDiscoveryCache(t *testing.T) { }, }, }, - wantChanged: false, wantPeerGVRs: map[string]int{ "peer1": 1, "peer2": 1, @@ -296,7 +319,6 @@ func TestFilterPeerDiscoveryCache(t *testing.T) { }, }, }, - wantChanged: true, wantPeerGVRs: map[string]int{ "peer1": 0, // apps/v1 filtered out "peer2": 1, // unchanged @@ -322,7 +344,6 @@ func TestFilterPeerDiscoveryCache(t *testing.T) { }, }, }, - wantChanged: true, wantPeerGVRs: map[string]int{ "peer1": 0, // custom/v1alpha1 filtered out "peer2": 1, // unchanged @@ -347,7 +368,6 @@ func TestFilterPeerDiscoveryCache(t *testing.T) { }, }, }, - wantChanged: true, wantPeerGVRs: map[string]int{ "peer1": 1, // apps/v1 and custom/v1alpha1 filtered out, batch/v1 remains }, @@ -365,10 +385,7 @@ func TestFilterPeerDiscoveryCache(t *testing.T) { mgr.recentlyDeletedGVs.Store(tt.deletedGVs) } - filtered, changed := mgr.FilterPeerDiscoveryCache(tt.cacheMap) - if changed != tt.wantChanged { - t.Errorf("Want changed=%v, got %v", tt.wantChanged, changed) - } + filtered := mgr.filterPeerDiscoveryCache(tt.cacheMap) if len(filtered) != len(tt.cacheMap) { t.Errorf("Want %d peers in filtered cache, got %d", len(tt.cacheMap), len(filtered)) @@ -393,9 +410,30 @@ func TestFilterPeerCacheEntry(t *testing.T) { name string entry PeerDiscoveryCacheEntry exclusionSet map[schema.GroupVersion]struct{} - wantChanged bool wantGVRs []schema.GroupVersionResource }{ + { + name: "empty exclusion set returns entry unchanged", + entry: PeerDiscoveryCacheEntry{ + GVRs: map[schema.GroupVersionResource]bool{ + {Group: "apps", Version: "v1", Resource: "deployments"}: true, + {Group: "batch", Version: "v1", Resource: "jobs"}: true, + }, + GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{ + { + ObjectMeta: metav1.ObjectMeta{Name: "apps"}, + Versions: []apidiscoveryv2.APIVersionDiscovery{ + {Version: "v1"}, + }, + }, + }, + }, + exclusionSet: map[schema.GroupVersion]struct{}{}, + wantGVRs: []schema.GroupVersionResource{ + {Group: "apps", Version: "v1", Resource: "deployments"}, + {Group: "batch", Version: "v1", Resource: "jobs"}, + }, + }, { name: "filter GVRs and groups", entry: PeerDiscoveryCacheEntry{ @@ -422,7 +460,6 @@ func TestFilterPeerCacheEntry(t *testing.T) { exclusionSet: map[schema.GroupVersion]struct{}{ {Group: "apps", Version: "v1"}: {}, }, - wantChanged: true, wantGVRs: []schema.GroupVersionResource{ {Group: "batch", Version: "v1", Resource: "jobs"}, {Group: "custom", Version: "v1", Resource: "myresources"}, @@ -438,7 +475,6 @@ func TestFilterPeerCacheEntry(t *testing.T) { exclusionSet: map[schema.GroupVersion]struct{}{ {Group: "apps", Version: "v1"}: {}, }, - wantChanged: false, wantGVRs: []schema.GroupVersionResource{ {Group: "batch", Version: "v1", Resource: "jobs"}, }, @@ -449,11 +485,7 @@ func TestFilterPeerCacheEntry(t *testing.T) { t.Run(tt.name, func(t *testing.T) { mgr := NewGVExclusionManager(5*time.Minute, 1*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) - filtered, changed := mgr.filterPeerCacheEntry(tt.entry, tt.exclusionSet) - - if changed != tt.wantChanged { - t.Errorf("Want changed=%v, got %v", tt.wantChanged, changed) - } + filtered := mgr.filterPeerCacheEntry(tt.entry, tt.exclusionSet) for _, gvr := range tt.wantGVRs { if _, found := filtered.GVRs[gvr]; !found { diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go index 31b2bbc037a..b7e9d362d69 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go @@ -115,21 +115,17 @@ func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error { } } - // Apply current exclusion filter to new peer cache entries. - if filtered, changed := h.gvExclusionManager.FilterPeerDiscoveryCache(newCache); changed { - newCache = filtered - } - - h.storePeerDiscoveryCacheAndInvalidate(newCache) + // Store unfiltered data to raw cache and trigger refilter. + // The refilter worker (single writer to filtered cache) will apply exclusions. + h.storeRawPeerDiscoveryCacheAndTriggerRefilter(newCache) return fetchDiscoveryErr } -// storePeerDiscoveryCacheAndInvalidate stores the new peer discovery cache and always calls the invalidation callback if set. -func (h *peerProxyHandler) storePeerDiscoveryCacheAndInvalidate(newCache map[string]PeerDiscoveryCacheEntry) { - h.peerDiscoveryInfoCache.Store(newCache) - if callback := h.cacheInvalidationCallback.Load(); callback != nil { - (*callback)() - } +// storeRawPeerDiscoveryCacheAndTriggerRefilter stores the raw (unfiltered) peer discovery cache +// and immediately refilters to update the filtered cache. +func (h *peerProxyHandler) storeRawPeerDiscoveryCacheAndTriggerRefilter(newCache map[string]PeerDiscoveryCacheEntry) { + h.rawPeerDiscoveryCache.Store(newCache) + h.gvExclusionManager.TriggerRefilter() } func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID string) (PeerDiscoveryCacheEntry, error) { @@ -285,14 +281,8 @@ func (h *peerProxyHandler) isValidPeerIdentityLease(obj interface{}) (*v1.Lease, func (h *peerProxyHandler) findServiceableByPeerFromPeerDiscoveryCache(gvr schema.GroupVersionResource) []string { var serviceableByIDs []string - cache := h.peerDiscoveryInfoCache.Load() - if cache == nil { - return serviceableByIDs - } - - cacheMap, ok := cache.(map[string]PeerDiscoveryCacheEntry) - if !ok { - klog.Warning("Invalid cache type in peerDiscoveryInfoCache") + cacheMap := h.gvExclusionManager.GetFilteredPeerDiscoveryCache() + if len(cacheMap) == 0 { return serviceableByIDs } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go index e15e4c314b9..c479fbcece3 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go @@ -192,6 +192,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { } go h.RunPeerDiscoveryCacheSync(ctx, 1) + go h.RunPeerDiscoveryRefilter(ctx, 1) // Wait for initial cache update. initialCache := map[string]PeerDiscoveryCacheEntry{} @@ -204,7 +205,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { return false, ctx.Err() default: } - gotCache := h.peerDiscoveryInfoCache.Load() + gotCache := h.gvExclusionManager.GetFilteredPeerDiscoveryCache() return assert.ObjectsAreEqual(initialCache, gotCache), nil }) if err != nil { @@ -251,7 +252,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { return false, ctx.Err() default: } - gotCache := h.peerDiscoveryInfoCache.Load() + gotCache := h.gvExclusionManager.GetFilteredPeerDiscoveryCache() r := assert.ObjectsAreEqual(tt.wantCache, gotCache) return r, nil }) diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go index b6dce3b1ac0..ca0a383c22d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go @@ -138,7 +138,7 @@ func NewPeerProxyHandler( localDiscoveryInfoCache: atomic.Value{}, localDiscoveryCacheTicker: time.NewTicker(localDiscoveryRefreshInterval), localDiscoveryInfoCachePopulated: make(chan struct{}), - peerDiscoveryInfoCache: atomic.Value{}, + rawPeerDiscoveryCache: atomic.Value{}, peerLeaseQueue: workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{ @@ -150,7 +150,7 @@ func NewPeerProxyHandler( h.gvExclusionManager = NewGVExclusionManager( defaultExclusionGracePeriod, defaultExclusionReaperInterval, - &h.peerDiscoveryInfoCache, + &h.rawPeerDiscoveryCache, &h.cacheInvalidationCallback, ) @@ -176,7 +176,7 @@ func NewPeerProxyHandler( discoveryClient.NoPeerDiscovery = true h.discoveryClient = discoveryClient h.localDiscoveryInfoCache.Store(map[schema.GroupVersionResource]bool{}) - h.peerDiscoveryInfoCache.Store(map[string]PeerDiscoveryCacheEntry{}) + h.rawPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{}) proxyTransport, err := transport.New(proxyClientConfig) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go index 87b86de39e3..cd7227f220e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go @@ -79,14 +79,12 @@ type peerProxyHandler struct { localDiscoveryCacheTicker *time.Ticker localDiscoveryInfoCachePopulated chan struct{} localDiscoveryInfoCachePopulatedOnce sync.Once - // Cache that stores resources and groups served by peer apiservers. - // The map is from string to PeerDiscoveryCacheEntry where the string - // is the serverID of the peer apiserver. - // Refreshed if a new apiserver identity lease is added, deleted or - // holderIndentity change is observed in the lease. - peerDiscoveryInfoCache atomic.Value // map[string]struct{GVRs map[schema.GroupVersionResource]bool; Groups []apidiscoveryv2.APIGroupDiscovery} - proxyTransport http.RoundTripper - // Worker queue that keeps the peerDiscoveryInfoCache up-to-date. + // rawPeerDiscoveryCache stores unfiltered resources and groups served by peer apiservers. + // The map is from string (serverID) to PeerDiscoveryCacheEntry. + // Written ONLY by peerLeaseQueue worker when peer leases change. + rawPeerDiscoveryCache atomic.Value // map[string]PeerDiscoveryCacheEntry + proxyTransport http.RoundTripper + // Worker queue that keeps the rawPeerDiscoveryCache up-to-date. peerLeaseQueue workqueue.TypedRateLimitingInterface[string] serializer runtime.NegotiatedSerializer cacheInvalidationCallback atomic.Pointer[func()] @@ -284,16 +282,9 @@ func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { // Returns a map of serverID -> []apidiscoveryv2.APIGroupDiscovery served by peer servers func (h *peerProxyHandler) GetPeerResources() map[string][]apidiscoveryv2.APIGroupDiscovery { result := make(map[string][]apidiscoveryv2.APIGroupDiscovery) - - peerCache := h.peerDiscoveryInfoCache.Load() - if peerCache == nil { - klog.V(4).Infof("GetPeerResources: peer cache is nil") - return result - } - - cacheMap, ok := peerCache.(map[string]PeerDiscoveryCacheEntry) - if !ok { - klog.Warning("Invalid cache type in peerDiscoveryGVRCache") + cacheMap := h.gvExclusionManager.GetFilteredPeerDiscoveryCache() + if len(cacheMap) == 0 { + klog.V(4).Infof("GetPeerResources: peer cache is empty") return result } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go index 95983701142..afa4ca86ef4 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go @@ -387,7 +387,7 @@ func newFakePeerProxyHandler(informerFinishedSync bool, return nil, err } ppH.localDiscoveryInfoCache.Store(localCache) - ppH.peerDiscoveryInfoCache.Store(peerCache) + ppH.gvExclusionManager.filteredPeerDiscoveryCache.Store(peerCache) ppH.finishedSync.Store(informerFinishedSync) return ppH, nil