diff --git a/pkg/controlplane/apiserver/server.go b/pkg/controlplane/apiserver/server.go index 8ce38ad15d9..5cc74fdf9b6 100644 --- a/pkg/controlplane/apiserver/server.go +++ b/pkg/controlplane/apiserver/server.go @@ -216,23 +216,11 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele return err }) - // Run peer-discovery sync loop - s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-cache-sync", func(context genericapiserver.PostStartHookContext) error { + // Run peer-discovery workers + s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-workers", func(context genericapiserver.PostStartHookContext) error { go c.Extra.PeerProxy.RunPeerDiscoveryCacheSync(context, 1) - return nil - }) - - // RunGVDeletionWorkers processes GVs from deleted CRDs/APIServices. If a GV is no longer in use, - // it is marked for removal from peer-discovery (with a deletion timestamp), triggering a grace period before cleanup. - s.GenericAPIServer.AddPostStartHookOrDie("gv-deletion-workers", func(context genericapiserver.PostStartHookContext) error { - go c.Extra.PeerProxy.RunGVDeletionWorkers(context, 1) - return nil - }) - - // RunExcludedGVsReaper removes GVs from the peer-discovery exclusion list after their grace period expires. - // This ensures we don't include stale CRDs/aggregated APIs from peer discovery in the aggregated discovery. - s.GenericAPIServer.AddPostStartHookOrDie("excluded-groups-reaper", func(context genericapiserver.PostStartHookContext) error { - go c.Extra.PeerProxy.RunExcludedGVsReaper(context.Done()) + go c.Extra.PeerProxy.RunPeerDiscoveryActiveGVTracker(context) + go c.Extra.PeerProxy.RunPeerDiscoveryRefilter(context) return nil }) } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/exclusion_filter.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/exclusion_filter.go deleted file mode 100644 index a808d06157a..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/exclusion_filter.go +++ /dev/null @@ -1,368 +0,0 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package peerproxy - -import ( - "context" - "time" - - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - - apidiscoveryv2 "k8s.io/api/apidiscovery/v2" -) - -// GVExtractor is a function that extracts group-versions from an object. -// It returns a slice of GroupVersions belonging to CRDs or aggregated APIs that -// should be excluded from peer-discovery to avoid advertising stale CRDs/aggregated APIs -// in peer-aggregated discovery that were deleted but still appear in a peer's discovery. -type GVExtractor func(obj interface{}) []schema.GroupVersion - -// RegisterCRDInformerHandlers registers event handlers for CRD informer using a custom extractor. -// The extractor function is responsible for extracting GroupVersions from CRD objects. -func (h *peerProxyHandler) RegisterCRDInformerHandlers(crdInformer cache.SharedIndexInformer, extractor GVExtractor) error { - h.crdInformer = crdInformer - h.crdExtractor = extractor - _, err := h.crdInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - gvs := extractor(obj) - if gvs == nil { - return - } - h.addExcludedGVs(gvs) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - gvs := extractor(newObj) - if gvs == nil { - return - } - h.addExcludedGVs(gvs) - }, - DeleteFunc: func(obj interface{}) { - // Handle tombstone objects - if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { - obj = tombstone.Obj - } - gvs := extractor(obj) - if gvs == nil { - return - } - for _, gv := range gvs { - h.gvDeletionQueue.Add(gv.String()) - } - }, - }) - return err -} - -// RegisterAPIServiceInformerHandlers registers event handlers for APIService informer using a custom extractor. -// The extractor function is responsible for extracting GroupVersions from APIService objects -// and determining if they represent aggregated APIs. -func (h *peerProxyHandler) RegisterAPIServiceInformerHandlers(apiServiceInformer cache.SharedIndexInformer, extractor GVExtractor) error { - h.apiServiceInformer = apiServiceInformer - h.apiServiceExtractor = extractor - _, err := h.apiServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - gvs := extractor(obj) - if gvs == nil { - return - } - h.addExcludedGVs(gvs) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - gvs := extractor(newObj) - if gvs == nil { - return - } - h.addExcludedGVs(gvs) - }, - DeleteFunc: func(obj interface{}) { - // Handle tombstone objects - if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { - obj = tombstone.Obj - } - - gvs := extractor(obj) - if gvs == nil { - return - } - for _, gv := range gvs { - h.gvDeletionQueue.Add(gv.String()) - } - }, - }) - return err -} - -// addExcludedGVs adds group-versions to the exclusion set, filters them from the peer discovery cache, -// and triggers invalidation callbacks if the cache changed. -func (h *peerProxyHandler) addExcludedGVs(gvs []schema.GroupVersion) { - if len(gvs) == 0 { - return - } - - h.excludedGVsMu.Lock() - for _, gv := range gvs { - h.excludedGVs[gv] = nil - } - h.excludedGVsMu.Unlock() - - // Load current peer cache and filter it - cache := h.peerDiscoveryInfoCache.Load() - if cache == nil { - return - } - - cacheMap, ok := cache.(map[string]PeerDiscoveryCacheEntry) - if !ok { - klog.Warning("Invalid cache type in peerDiscoveryInfoCache") - return - } - - // Always trigger filter and callbacks, to ensure late-appearing GVs are filtered. - if peerDiscovery, peerDiscoveryChanged := h.filterPeerDiscoveryCache(cacheMap); peerDiscoveryChanged { - h.storePeerDiscoveryCacheAndInvalidate(peerDiscovery) - } -} - -// filterPeerDiscoveryCache filters the provided peer discovery cache, -// excluding GVs in h.excludedGVs. -func (h *peerProxyHandler) filterPeerDiscoveryCache(cacheMap map[string]PeerDiscoveryCacheEntry) (map[string]PeerDiscoveryCacheEntry, bool) { - h.excludedGVsMu.RLock() - if len(h.excludedGVs) == 0 { - // No exclusions, no filtering needed. - h.excludedGVsMu.RUnlock() - return cacheMap, false - } - - excludedCloned := make(map[schema.GroupVersion]struct{}, len(h.excludedGVs)) - for gv := range h.excludedGVs { - excludedCloned[gv] = struct{}{} - } - h.excludedGVsMu.RUnlock() - - filtered := make(map[string]PeerDiscoveryCacheEntry, len(cacheMap)) - peerDiscoveryChanged := false - for peerID, entry := range cacheMap { - newEntry, peerChanged := h.filterPeerCacheEntry(entry, excludedCloned) - filtered[peerID] = newEntry - if peerChanged { - peerDiscoveryChanged = true - } - } - - return filtered, peerDiscoveryChanged -} - -// filterPeerCacheEntry filters a single peer's cache entry for excluded groups. -// Returns the filtered entry and whether any excluded group was present. -func (h *peerProxyHandler) filterPeerCacheEntry(entry PeerDiscoveryCacheEntry, excluded map[schema.GroupVersion]struct{}) (PeerDiscoveryCacheEntry, bool) { - filteredGVRs := make(map[schema.GroupVersionResource]bool, len(entry.GVRs)) - peerDiscoveryChanged := false - - for existingGVR, v := range entry.GVRs { - gv := schema.GroupVersion{Group: existingGVR.Group, Version: existingGVR.Version} - if _, skip := excluded[gv]; skip { - peerDiscoveryChanged = true - continue - } - filteredGVRs[existingGVR] = v - } - - if !peerDiscoveryChanged { - return entry, false - } - - // Filter the group discovery list - filteredGroups := h.filterGroupDiscovery(entry.GroupDiscovery, excluded) - return PeerDiscoveryCacheEntry{ - GVRs: filteredGVRs, - GroupDiscovery: filteredGroups, - }, true -} - -func (h *peerProxyHandler) filterGroupDiscovery(groupDiscoveries []apidiscoveryv2.APIGroupDiscovery, excluded map[schema.GroupVersion]struct{}) []apidiscoveryv2.APIGroupDiscovery { - var filteredDiscovery []apidiscoveryv2.APIGroupDiscovery - for _, groupDiscovery := range groupDiscoveries { - filteredGroup := apidiscoveryv2.APIGroupDiscovery{ - ObjectMeta: groupDiscovery.ObjectMeta, - } - - for _, version := range groupDiscovery.Versions { - gv := schema.GroupVersion{Group: groupDiscovery.Name, Version: version.Version} - if _, found := excluded[gv]; found { - // This version is excluded, skip it - continue - } - filteredGroup.Versions = append(filteredGroup.Versions, version) - } - - // Only add the group to the final list if it still has any versions left - if len(filteredGroup.Versions) > 0 { - filteredDiscovery = append(filteredDiscovery, filteredGroup) - } - } - return filteredDiscovery -} - -// RunGVDeletionWorkers starts workers to process GroupVersions from deleted CRDs and APIServices. -// When a GV is processed, it is checked for usage by other resources. If no longer in use, -// it is marked with a deletion timestamp, initiating a grace period. After this period, -// the RunExcludedGVsReaper is responsible for removing the GV from the exclusion set. -func (h *peerProxyHandler) RunGVDeletionWorkers(ctx context.Context, workers int) { - defer func() { - klog.Info("Shutting down GV deletion workers") - h.gvDeletionQueue.ShutDown() - }() - klog.Infof("Starting %d GV deletion worker(s)", workers) - for i := 0; i < workers; i++ { - go func(workerID int) { - for h.processNextGVDeletion() { - select { - case <-ctx.Done(): - klog.Infof("GV deletion worker %d shutting down", workerID) - return - default: - } - } - }(i) - } - - <-ctx.Done() // Wait till context is done to call deferred shutdown function -} - -// processNextGVDeletion processes a single item from the GV deletion queue. -func (h *peerProxyHandler) processNextGVDeletion() bool { - gvString, shutdown := h.gvDeletionQueue.Get() - if shutdown { - return false - } - defer h.gvDeletionQueue.Done(gvString) - - gv, err := schema.ParseGroupVersion(gvString) - if err != nil { - klog.Errorf("Failed to parse GroupVersion %q: %v", gvString, err) - return true - } - - h.markGVForDeletionIfUnused(gv) - return true -} - -// markGVForDeletionIfUnused checks if a group-version is still in use. -// If not, it marks it for deletion by setting a timestamp. -func (h *peerProxyHandler) markGVForDeletionIfUnused(gv schema.GroupVersion) { - // Best-effort check if GV is still in use. This is racy - a new CRD/APIService could be - // created after this check completes and returns false. - if h.isGVUsed(gv) { - return - } - - h.excludedGVsMu.Lock() - defer h.excludedGVsMu.Unlock() - - // Only mark if it's currently active (nil timestamp) - if ts, exists := h.excludedGVs[gv]; exists && ts == nil { - now := time.Now() - h.excludedGVs[gv] = &now - klog.V(4).Infof("Marking group-version %q for deletion, grace period started", gv) - } -} - -// isGVUsed checks the informer stores to see if any CRD or APIService -// is still using the given group-version. -// -// This is necessary because multiple CRDs or aggregated APIs can share the same group-version, -// but define different resources (kinds) or versions. Deleting one CRD or APIService -// for a group-version does not mean the group-version is entirely gone—other CRDs or APIs in that group-version -// may still exist. We must only remove a group-version from the exclusion set when there are -// no remaining CRDs or APIService objects for that group-version, to ensure we do not prematurely -// allow peer-aggregated discovery or proxying for APIs that are still served locally. -func (h *peerProxyHandler) isGVUsed(gv schema.GroupVersion) bool { - // Check CRD informer store for the specific group-version - if h.crdInformer != nil && h.crdExtractor != nil { - crdList := h.crdInformer.GetStore().List() - for _, item := range crdList { - gvs := h.crdExtractor(item) - if gvs == nil { - continue - } - for _, extractedGV := range gvs { - if extractedGV == gv { - return true - } - } - } - } - - // Check APIService informer store for the specific group-version - if h.apiServiceInformer != nil && h.apiServiceExtractor != nil { - apiSvcList := h.apiServiceInformer.GetStore().List() - for _, item := range apiSvcList { - gvs := h.apiServiceExtractor(item) - if gvs == nil { - continue - } - for _, extractedGV := range gvs { - if extractedGV == gv { - return true - } - } - } - } - - return false -} - -// RunExcludedGVsReaper starts a goroutine that periodically cleans up -// excluded group-versions that have passed their grace period. -func (h *peerProxyHandler) RunExcludedGVsReaper(stopCh <-chan struct{}) { - klog.Infof("Starting excluded group-versions reaper with %s interval", h.reaperCheckInterval) - ticker := time.NewTicker(h.reaperCheckInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - h.reapExcludedGVs() - case <-stopCh: - klog.Info("Shutting down excluded group-versions reaper") - return - } - } -} - -// reapExcludedGVs is the garbage collector function. -// It removes group-versions from excludedGVs if their grace period has expired. -func (h *peerProxyHandler) reapExcludedGVs() { - h.excludedGVsMu.Lock() - defer h.excludedGVsMu.Unlock() - - now := time.Now() - for gv, deletionTime := range h.excludedGVs { - if deletionTime == nil { - // Still actively excluded - continue - } - - if now.Sub(*deletionTime) > h.exclusionGracePeriod { - klog.V(4).Infof("Reaping excluded group-version %q (grace period expired)", gv) - delete(h.excludedGVs, gv) - } - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/exclusion_filter_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/exclusion_filter_test.go deleted file mode 100644 index bafa19341ba..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/exclusion_filter_test.go +++ /dev/null @@ -1,132 +0,0 @@ -/* -Copyright 2025 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package peerproxy - -import ( - "reflect" - "testing" - - "k8s.io/apimachinery/pkg/runtime/schema" - - apidiscoveryv2 "k8s.io/api/apidiscovery/v2" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func TestFilteringLogic(t *testing.T) { - gvr := func(g, v, r string) schema.GroupVersionResource { - return schema.GroupVersionResource{Group: g, Version: v, Resource: r} - } - gv := func(g, v string) schema.GroupVersion { - return schema.GroupVersion{Group: g, Version: v} - } - - testCases := []struct { - name string - initialPeerCache PeerDiscoveryCacheEntry - excludedGVs map[schema.GroupVersion]struct{} - wantFilteredGVRs map[schema.GroupVersionResource]bool - wantFilteredGroups []apidiscoveryv2.APIGroupDiscovery - wantChange bool - }{ - { - name: "no exclusions", - initialPeerCache: PeerDiscoveryCacheEntry{ - GVRs: map[schema.GroupVersionResource]bool{gvr("apps", "v1", "deployments"): true}, - GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{{ - ObjectMeta: metav1.ObjectMeta{Name: "apps"}, - Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}}, - }}, - }, - excludedGVs: map[schema.GroupVersion]struct{}{}, - wantFilteredGVRs: map[schema.GroupVersionResource]bool{gvr("apps", "v1", "deployments"): true}, - wantFilteredGroups: []apidiscoveryv2.APIGroupDiscovery{{ - ObjectMeta: metav1.ObjectMeta{Name: "apps"}, - Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}}, - }}, - wantChange: false, - }, - { - name: "exclude one GV that exists", - initialPeerCache: PeerDiscoveryCacheEntry{ - GVRs: map[schema.GroupVersionResource]bool{ - gvr("apps", "v1", "deployments"): true, - gvr("apps", "v1", "statefulsets"): true, - gvr("batch", "v1", "jobs"): true, - }, - GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{ - { - ObjectMeta: metav1.ObjectMeta{Name: "apps"}, - Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "batch"}, - Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}}, - }, - }, - }, - excludedGVs: map[schema.GroupVersion]struct{}{ - gv("apps", "v1"): {}, - }, - wantFilteredGVRs: map[schema.GroupVersionResource]bool{gvr("batch", "v1", "jobs"): true}, - wantFilteredGroups: []apidiscoveryv2.APIGroupDiscovery{ - { - ObjectMeta: metav1.ObjectMeta{Name: "batch"}, - Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}}, - }, - }, - wantChange: true, - }, - { - name: "exclude a GV that does not exist", - initialPeerCache: PeerDiscoveryCacheEntry{ - GVRs: map[schema.GroupVersionResource]bool{gvr("apps", "v1", "deployments"): true}, - GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{{ - ObjectMeta: metav1.ObjectMeta{Name: "apps"}, - Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}}, - }}, - }, - excludedGVs: map[schema.GroupVersion]struct{}{ - gv("foo", "v1"): {}, - }, - wantFilteredGVRs: map[schema.GroupVersionResource]bool{gvr("apps", "v1", "deployments"): true}, - wantFilteredGroups: []apidiscoveryv2.APIGroupDiscovery{{ - ObjectMeta: metav1.ObjectMeta{Name: "apps"}, - Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}}, - }}, - wantChange: false, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - h := &peerProxyHandler{} - filteredEntry, changed := h.filterPeerCacheEntry(tc.initialPeerCache, tc.excludedGVs) - - if changed != tc.wantChange { - t.Errorf("want change to be %v, got %v", tc.wantChange, changed) - } - - if !reflect.DeepEqual(filteredEntry.GVRs, tc.wantFilteredGVRs) { - t.Errorf("filtered GVRs mismatch: got %v, want %v", filteredEntry.GVRs, tc.wantFilteredGVRs) - } - - if !reflect.DeepEqual(filteredEntry.GroupDiscovery, tc.wantFilteredGroups) { - t.Errorf("filtered GroupDiscovery mismatch: got %v, want %v", filteredEntry.GroupDiscovery, tc.wantFilteredGroups) - } - }) - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go new file mode 100644 index 00000000000..e7fb0071031 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager.go @@ -0,0 +1,492 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package peerproxy + +import ( + "context" + "sync/atomic" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + apidiscoveryv2 "k8s.io/api/apidiscovery/v2" +) + +// GVExtractor is a function that extracts group-versions from an object. +// It returns a slice of GroupVersions belonging to CRDs or aggregated APIs that +// should be excluded from peer-discovery to avoid advertising stale CRDs/aggregated APIs +// in peer-aggregated discovery that were deleted but still appear in a peer's discovery. +type GVExtractor func(obj interface{}) []schema.GroupVersion + +// GVExclusionManager manages the exclusion of group-versions from peer discovery. +// It maintains two atomic maps: +// - currentlyActiveGVs: All GVs currently served by CRDs and aggregated APIServices +// - recentlyDeletedGVs: GVs belonging to CRDs and aggregated APIServices that were recently deleted, +// tracked with deletion timestamp for grace period +// +// It runs two workers: +// 1. Active GV Tracker: Triggered on CRD/APIService events, rebuilds active GVs +// and reaps expired deleted GVs. When a GV is deleted, a delayed sync is scheduled +// after the grace period to reap it. +// 2. Peer Discovery Re-filter: Rate-limited worker that filters peer cache +type GVExclusionManager struct { + // Atomic maps for lock-free access + currentlyActiveGVs atomic.Value // map[schema.GroupVersion]struct{} + recentlyDeletedGVs atomic.Value // map[schema.GroupVersion]time.Time + + // Informers for fetching active GVs + crdInformer cache.SharedIndexInformer + crdExtractor GVExtractor + apiServiceInformer cache.SharedIndexInformer + apiServiceExtractor GVExtractor + + // Worker 1: triggered by CRD/APIService events or delayed reap scheduling + activeGVQueue workqueue.TypedRateLimitingInterface[string] + // Grace period before reaping deleted GVs from the exclusion set + exclusionGracePeriod time.Duration + // Worker 2: triggered by Active/Deleted GV changes + refilterQueue workqueue.TypedRateLimitingInterface[string] + + // rawPeerDiscoveryCache is written only by peerLeaseQueue worker + // when peer leases change + rawPeerDiscoveryCache *atomic.Value // map[string]PeerDiscoveryCacheEntry + // filteredPeerDiscoveryCache is written only by the refilter worker + // when raw cache or exclusion set changes. + filteredPeerDiscoveryCache atomic.Value // map[string]PeerDiscoveryCacheEntry + invalidationCallback *atomic.Pointer[func()] +} + +// NewGVExclusionManager creates a new GV exclusion manager. +func NewGVExclusionManager( + exclusionGracePeriod time.Duration, + rawPeerDiscoveryCache *atomic.Value, + invalidationCallback *atomic.Pointer[func()], +) *GVExclusionManager { + mgr := &GVExclusionManager{ + exclusionGracePeriod: exclusionGracePeriod, + rawPeerDiscoveryCache: rawPeerDiscoveryCache, + invalidationCallback: invalidationCallback, + activeGVQueue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[string](), + workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "active-gv-tracker", + }), + refilterQueue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[string](), + workqueue.TypedRateLimitingQueueConfig[string]{ + Name: "peer-discovery-refilter", + }), + } + + mgr.currentlyActiveGVs.Store(map[schema.GroupVersion]struct{}{}) + mgr.recentlyDeletedGVs.Store(map[schema.GroupVersion]time.Time{}) + mgr.filteredPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{}) + + return mgr +} + +// GetFilteredPeerDiscoveryCache returns the filtered peer discovery cache. +func (m *GVExclusionManager) GetFilteredPeerDiscoveryCache() map[string]PeerDiscoveryCacheEntry { + if cache := m.filteredPeerDiscoveryCache.Load(); cache != nil { + if cacheMap, ok := cache.(map[string]PeerDiscoveryCacheEntry); ok { + return cacheMap + } + } + return map[string]PeerDiscoveryCacheEntry{} +} + +// RegisterCRDInformerHandlers registers event handlers for CRD informer using a custom extractor. +// The extractor function is responsible for extracting GroupVersions from CRD objects. +func (m *GVExclusionManager) RegisterCRDInformerHandlers(crdInformer cache.SharedIndexInformer, extractor GVExtractor) error { + m.crdInformer = crdInformer + m.crdExtractor = extractor + _, err := m.crdInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + m.handleGVUpdate() + }, + UpdateFunc: func(oldObj, newObj interface{}) { + m.handleGVUpdate() + }, + DeleteFunc: func(obj interface{}) { + m.handleGVUpdate() + }, + }) + return err +} + +// RegisterAPIServiceInformerHandlers registers event handlers for APIService informer using a custom extractor. +// The extractor function is responsible for extracting GroupVersions from APIService objects +// and determining if they represent aggregated APIs. +func (m *GVExclusionManager) RegisterAPIServiceInformerHandlers(apiServiceInformer cache.SharedIndexInformer, extractor GVExtractor) error { + m.apiServiceInformer = apiServiceInformer + m.apiServiceExtractor = extractor + _, err := m.apiServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + m.handleGVUpdate() + }, + UpdateFunc: func(oldObj, newObj interface{}) { + m.handleGVUpdate() + }, + DeleteFunc: func(obj interface{}) { + m.handleGVUpdate() + }, + }) + return err +} + +// WaitForCacheSync waits for the informer caches to sync. +func (m *GVExclusionManager) WaitForCacheSync(stopCh <-chan struct{}) bool { + synced := []cache.InformerSynced{} + if m.crdInformer != nil { + synced = append(synced, m.crdInformer.HasSynced) + } + if m.apiServiceInformer != nil { + synced = append(synced, m.apiServiceInformer.HasSynced) + } + if len(synced) == 0 { + return true + } + return cache.WaitForNamedCacheSync("gv-exclusion-manager", stopCh, synced...) +} + +// getExclusionSet returns the combined exclusion set i.e., union(currentlyActiveGVs, recentlyDeletedGVs) +func (m *GVExclusionManager) getExclusionSet() map[schema.GroupVersion]struct{} { + activeMap := m.loadCurrentlyActiveGVs() + deletedMap := m.loadRecentlyDeletedGVs() + + exclusionSet := make(map[schema.GroupVersion]struct{}, len(activeMap)+len(deletedMap)) + for gv := range activeMap { + exclusionSet[gv] = struct{}{} + } + for gv := range deletedMap { + exclusionSet[gv] = struct{}{} + } + + return exclusionSet +} + +// handleGVUpdate is called when a CRD or APIService event occurs. +// This triggers reconciliation which rebuilds the active GV set +// and also reaps expired GVs if indicated so. +func (m *GVExclusionManager) handleGVUpdate() { + m.activeGVQueue.Add("sync") +} + +// RunPeerDiscoveryActiveGVTracker runs the Active GV Tracker worker. +// This worker is triggered by CRD/APIService events and +// rebuilds the active GV set and reaps expired GVs. +func (m *GVExclusionManager) RunPeerDiscoveryActiveGVTracker(ctx context.Context) { + defer m.activeGVQueue.ShutDown() + + klog.Info("Starting Active GV Tracker worker") + go wait.UntilWithContext(ctx, m.runActiveGVTrackerWorker, time.Second) + + <-ctx.Done() + klog.Info("Active GV Tracker workers stopped") +} + +func (m *GVExclusionManager) runActiveGVTrackerWorker(ctx context.Context) { + for m.processNextActiveGV(ctx) { + } +} + +func (m *GVExclusionManager) processNextActiveGV(ctx context.Context) bool { + key, shutdown := m.activeGVQueue.Get() + if shutdown { + return false + } + defer m.activeGVQueue.Done(key) + + select { + case <-ctx.Done(): + return false + default: + } + + m.reconcileActiveGVs() + return true +} + +// reconcileActiveGVs does the following +// 1. fetches all GVs from CRD and APIService informers +// 2. detects diffs with the previous state +// 3. adds deleted GVs to recentlyDeletedGVs +// 4. reaps expired GVs, and queues Worker 2 if changes detected. +func (m *GVExclusionManager) reconcileActiveGVs() { + freshGVs := make(map[schema.GroupVersion]struct{}) + + // Fetch GVs from CRD informer + if m.crdInformer != nil && m.crdExtractor != nil { + crdList := m.crdInformer.GetStore().List() + for _, item := range crdList { + gvs := m.crdExtractor(item) + for _, gv := range gvs { + freshGVs[gv] = struct{}{} + } + } + } + + // Fetch GVs from APIService informer + if m.apiServiceInformer != nil && m.apiServiceExtractor != nil { + apiSvcList := m.apiServiceInformer.GetStore().List() + for _, item := range apiSvcList { + gvs := m.apiServiceExtractor(item) + for _, gv := range gvs { + freshGVs[gv] = struct{}{} + } + } + } + + // Load previous active GVs and detect diff + previousGVs := m.loadCurrentlyActiveGVs() + deletedGVs, activeGVsChanged := diffGVs(previousGVs, freshGVs) + + // Update recentlyDeletedGVs: add newly deleted GVs and reap expired ones + recentlyDeletedChanged := m.updateRecentlyDeletedGVs(deletedGVs) + + if activeGVsChanged || recentlyDeletedChanged { + if activeGVsChanged { + m.currentlyActiveGVs.Store(freshGVs) + klog.V(4).Infof("Active GVs updated: %d GVs now active", len(freshGVs)) + } + m.TriggerRefilter() + } else { + klog.V(4).Infof("No changes detected in active or recently deleted GVs") + } +} + +// updateRecentlyDeletedGVs adds newly deleted GVs to recentlyDeletedGVs +// and reaps expired ones. Returns true if any changes were made. +func (m *GVExclusionManager) updateRecentlyDeletedGVs(deletedGVs []schema.GroupVersion) bool { + deletedMap := m.loadRecentlyDeletedGVs() + // Early return if nothing to do + if len(deletedGVs) == 0 && len(deletedMap) == 0 { + return false + } + + now := time.Now() + newDeletedMap := make(map[schema.GroupVersion]time.Time, len(deletedMap)+len(deletedGVs)) + changed := false + + // Copy existing entries, reaping expired ones + for gv, deletionTime := range deletedMap { + if now.Sub(deletionTime) > m.exclusionGracePeriod { + klog.V(4).Infof("Reaping GV %s (grace period expired)", gv.String()) + changed = true + // Don't add to new map (effectively removing it) + } else { + newDeletedMap[gv] = deletionTime + } + } + + // Schedule a delayed sync to reap expired GVs after the grace period. + if len(deletedGVs) > 0 { + m.activeGVQueue.AddAfter("sync", m.exclusionGracePeriod) + } + + for _, gv := range deletedGVs { + newDeletedMap[gv] = now + klog.V(4).Infof("GV %s deleted: moved to recentlyDeletedGVs", gv.String()) + changed = true + } + + if changed { + m.recentlyDeletedGVs.Store(newDeletedMap) + } + + return changed +} + +// diffGVs compares old and new GV maps, returns GVs that were deleted (in old but not new) +// and a boolean indicating if there were any changes (additions or deletions). +func diffGVs(old, new map[schema.GroupVersion]struct{}) ([]schema.GroupVersion, bool) { + var deletedGVs []schema.GroupVersion + hasChanges := len(old) != len(new) + + // Find deleted GVs (in old but not in new) + for gv := range old { + if _, exists := new[gv]; !exists { + deletedGVs = append(deletedGVs, gv) + hasChanges = true + } + } + + return deletedGVs, hasChanges +} + +// RunPeerDiscoveryRefilter runs the Peer Discovery Re-filter worker. +// This worker filters the peer discovery cache using the exclusion set. +func (m *GVExclusionManager) RunPeerDiscoveryRefilter(ctx context.Context) { + defer m.refilterQueue.ShutDown() + + klog.Info("Starting Peer Discovery Re-filter worker") + go wait.UntilWithContext(ctx, m.runRefilterWorker, time.Second) + + <-ctx.Done() + klog.Info("Peer Discovery Re-filter workers stopped") +} + +func (m *GVExclusionManager) runRefilterWorker(ctx context.Context) { + for m.processNextRefilter(ctx) { + } +} + +func (m *GVExclusionManager) processNextRefilter(ctx context.Context) bool { + key, shutdown := m.refilterQueue.Get() + if shutdown { + return false + } + defer m.refilterQueue.Done(key) + + select { + case <-ctx.Done(): + return false + default: + } + + m.refilterPeerDiscoveryCache() + return true +} + +// refilterPeerDiscoveryCache reads the raw peer discovery cache, +// applies exclusion filtering, and stores the result to the filtered cache. +func (m *GVExclusionManager) refilterPeerDiscoveryCache() { + var cacheMap map[string]PeerDiscoveryCacheEntry + if m.rawPeerDiscoveryCache != nil { + if rawCache := m.rawPeerDiscoveryCache.Load(); rawCache != nil { + cacheMap, _ = rawCache.(map[string]PeerDiscoveryCacheEntry) + } + } + + if len(cacheMap) == 0 { + m.filteredPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{}) + klog.V(4).Infof("Raw peer discovery cache is empty or unavailable") + return + } + + filteredCache := m.filterPeerDiscoveryCache(cacheMap) + m.filteredPeerDiscoveryCache.Store(filteredCache) + + if m.invalidationCallback != nil { + if callback := m.invalidationCallback.Load(); callback != nil { + (*callback)() + } + } + + klog.V(4).Infof("Peer discovery cache re-filtered, %d GVs in exclusion set", len(m.getExclusionSet())) + +} + +// filterPeerDiscoveryCache applies the current exclusion set to the provided peer cache. +func (m *GVExclusionManager) filterPeerDiscoveryCache(cacheMap map[string]PeerDiscoveryCacheEntry) map[string]PeerDiscoveryCacheEntry { + exclusionSet := m.getExclusionSet() + if len(exclusionSet) == 0 { + return cacheMap + } + + filtered := make(map[string]PeerDiscoveryCacheEntry, len(cacheMap)) + for peerID, entry := range cacheMap { + filtered[peerID] = m.filterPeerCacheEntry(entry, exclusionSet) + } + + return filtered +} + +// filterPeerCacheEntry filters a single peer's cache entry for excluded GVs. +func (m *GVExclusionManager) filterPeerCacheEntry( + entry PeerDiscoveryCacheEntry, + exclusionSet map[schema.GroupVersion]struct{}, +) PeerDiscoveryCacheEntry { + filteredGVRs := make(map[schema.GroupVersionResource]bool, len(entry.GVRs)) + anyExcluded := false + for existingGVR, v := range entry.GVRs { + gv := schema.GroupVersion{Group: existingGVR.Group, Version: existingGVR.Version} + if _, excluded := exclusionSet[gv]; excluded { + anyExcluded = true + continue + } + filteredGVRs[existingGVR] = v + } + + // If no GVRs were excluded, the exclusion set doesn't intersect with this peer's GVs, + // so we can return the entry unchanged without filtering GroupDiscovery. + if !anyExcluded { + return entry + } + filteredGroups := m.filterGroupDiscovery(entry.GroupDiscovery, exclusionSet) + + return PeerDiscoveryCacheEntry{ + GVRs: filteredGVRs, + GroupDiscovery: filteredGroups, + } +} + +// filterGroupDiscovery filters group discovery entries, removing excluded GVs. +func (m *GVExclusionManager) filterGroupDiscovery( + groupDiscoveries []apidiscoveryv2.APIGroupDiscovery, + exclusionSet map[schema.GroupVersion]struct{}, +) []apidiscoveryv2.APIGroupDiscovery { + var filteredDiscovery []apidiscoveryv2.APIGroupDiscovery + for _, groupDiscovery := range groupDiscoveries { + filteredGroup := apidiscoveryv2.APIGroupDiscovery{ + ObjectMeta: groupDiscovery.ObjectMeta, + } + + for _, version := range groupDiscovery.Versions { + gv := schema.GroupVersion{Group: groupDiscovery.Name, Version: version.Version} + if _, found := exclusionSet[gv]; found { + // This version is excluded, skip it + continue + } + filteredGroup.Versions = append(filteredGroup.Versions, version) + } + + // Only add the group to the final list if it still has any versions left + if len(filteredGroup.Versions) > 0 { + filteredDiscovery = append(filteredDiscovery, filteredGroup) + } + } + return filteredDiscovery +} + +func (m *GVExclusionManager) loadCurrentlyActiveGVs() map[schema.GroupVersion]struct{} { + if val := m.currentlyActiveGVs.Load(); val != nil { + if gvMap, ok := val.(map[schema.GroupVersion]struct{}); ok { + return gvMap + } + } + return map[schema.GroupVersion]struct{}{} +} + +func (m *GVExclusionManager) loadRecentlyDeletedGVs() map[schema.GroupVersion]time.Time { + if val := m.recentlyDeletedGVs.Load(); val != nil { + if gvMap, ok := val.(map[schema.GroupVersion]time.Time); ok { + return gvMap + } + } + return map[schema.GroupVersion]time.Time{} +} + +// TriggerRefilter triggers the refilter worker to apply exclusions to the filtered cache. +// This should be called by peerLeaseQueue after updating the raw peer discovery cache. +func (m *GVExclusionManager) TriggerRefilter() { + m.refilterQueue.Add("refilter") +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go new file mode 100644 index 00000000000..bc8125bf20e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/gv_exclusion_manager_test.go @@ -0,0 +1,589 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package peerproxy + +import ( + "slices" + "sync/atomic" + "testing" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + + apidiscoveryv2 "k8s.io/api/apidiscovery/v2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestGetExclusionSet(t *testing.T) { + tests := []struct { + name string + activeGVs map[schema.GroupVersion]struct{} + deletedGVs map[schema.GroupVersion]time.Time + wantGVs []schema.GroupVersion + }{ + { + name: "empty state", + activeGVs: nil, + deletedGVs: nil, + wantGVs: []schema.GroupVersion{}, + }, + { + name: "only active GVs", + activeGVs: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + {Group: "batch", Version: "v1"}: {}, + {Group: "custom", Version: "v1alpha1"}: {}, + }, + wantGVs: []schema.GroupVersion{ + {Group: "apps", Version: "v1"}, + {Group: "batch", Version: "v1"}, + {Group: "custom", Version: "v1alpha1"}, + }, + }, + { + name: "only deleted GVs", + deletedGVs: map[schema.GroupVersion]time.Time{ + {Group: "deprecated", Version: "v1beta1"}: time.Now(), + {Group: "legacy", Version: "v1alpha1"}: time.Now(), + }, + // Reaper hasnt removed deleted GVs yet. + wantGVs: []schema.GroupVersion{ + {Group: "deprecated", Version: "v1beta1"}, + {Group: "legacy", Version: "v1alpha1"}, + }, + }, + { + name: "different active and deleted GVs", + activeGVs: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + {Group: "batch", Version: "v1"}: {}, + }, + deletedGVs: map[schema.GroupVersion]time.Time{ + {Group: "old", Version: "v1"}: time.Now(), + }, + // Include both active GVs and recently deleted GVs. + // Deleted GVs remain in the exclusion set until the reaper removes them after the grace period. + wantGVs: []schema.GroupVersion{ + {Group: "apps", Version: "v1"}, + {Group: "batch", Version: "v1"}, + {Group: "old", Version: "v1"}, + }, + }, + { + // A GV can appear in both active and deleted sets if: + // 1. CRD was deleted (moved from active to deleted) + // 2. CRD was recreated (added back to active) + // 3. Reaper hasn't cleaned up the deleted entry yet + name: "same GV in both active and deleted", + activeGVs: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + deletedGVs: map[schema.GroupVersion]time.Time{ + {Group: "apps", Version: "v1"}: time.Now(), + }, + wantGVs: []schema.GroupVersion{ + {Group: "apps", Version: "v1"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) + + if tt.activeGVs != nil { + mgr.currentlyActiveGVs.Store(tt.activeGVs) + } + if tt.deletedGVs != nil { + mgr.recentlyDeletedGVs.Store(tt.deletedGVs) + } + + exclusionSet := mgr.getExclusionSet() + if len(exclusionSet) != len(tt.wantGVs) { + t.Errorf("Want exclusion set size %d, got %d", len(tt.wantGVs), len(exclusionSet)) + } + + for _, gv := range tt.wantGVs { + if _, found := exclusionSet[gv]; !found { + t.Errorf("Want GV %v in exclusion set, but not found", gv) + } + } + }) + } +} + +func TestReapExpiredGVs(t *testing.T) { + tests := []struct { + name string + gracePeriod time.Duration + deletedGVs map[schema.GroupVersion]time.Time + wantReaped []schema.GroupVersion + }{ + { + name: "empty state", + gracePeriod: 100 * time.Millisecond, + deletedGVs: map[schema.GroupVersion]time.Time{}, + wantReaped: []schema.GroupVersion{}, + }, + { + name: "reap old GV keep recent", + gracePeriod: 100 * time.Millisecond, + deletedGVs: map[schema.GroupVersion]time.Time{ + {Group: "old", Version: "v1"}: time.Now().Add(-200 * time.Millisecond), + {Group: "recent", Version: "v1"}: time.Now().Add(-50 * time.Millisecond), + {Group: "new", Version: "v1"}: time.Now(), + }, + wantReaped: []schema.GroupVersion{ + {Group: "old", Version: "v1"}, + }, + }, + { + name: "all expired", + gracePeriod: 50 * time.Millisecond, + deletedGVs: map[schema.GroupVersion]time.Time{ + {Group: "old1", Version: "v1"}: time.Now().Add(-100 * time.Millisecond), + {Group: "old2", Version: "v1"}: time.Now().Add(-200 * time.Millisecond), + }, + wantReaped: []schema.GroupVersion{ + {Group: "old1", Version: "v1"}, + {Group: "old2", Version: "v1"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := NewGVExclusionManager(tt.gracePeriod, &atomic.Value{}, &atomic.Pointer[func()]{}) + mgr.recentlyDeletedGVs.Store(tt.deletedGVs) + mgr.updateRecentlyDeletedGVs(nil) + result := mgr.loadRecentlyDeletedGVs() + + for _, gv := range tt.wantReaped { + if _, found := result[gv]; found { + t.Errorf("GV %v should have been reaped but still exists", gv) + } + } + }) + } +} + +func TestDetectDiff(t *testing.T) { + _ = NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) + + tests := []struct { + name string + old map[schema.GroupVersion]struct{} + new map[schema.GroupVersion]struct{} + wantChanged bool + wantDeleted []schema.GroupVersion + }{ + { + name: "both empty", + old: map[schema.GroupVersion]struct{}{}, + new: map[schema.GroupVersion]struct{}{}, + wantChanged: false, + wantDeleted: nil, + }, + { + name: "identical", + old: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + new: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + wantChanged: false, + wantDeleted: nil, + }, + { + name: "added GV", + old: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + new: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + {Group: "batch", Version: "v1"}: {}, + }, + wantChanged: true, + wantDeleted: nil, // No deletions, only addition + }, + { + name: "removed GV", + old: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + {Group: "batch", Version: "v1"}: {}, + }, + new: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + wantChanged: true, + wantDeleted: []schema.GroupVersion{ + {Group: "batch", Version: "v1"}, + }, + }, + { + name: "different GVs same size", + old: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + new: map[schema.GroupVersion]struct{}{ + {Group: "batch", Version: "v1"}: {}, + }, + wantChanged: true, + wantDeleted: []schema.GroupVersion{ + {Group: "apps", Version: "v1"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + deletedGVs, changed := diffGVs(tt.old, tt.new) + if changed != tt.wantChanged { + t.Errorf("diffGVs() changed = %v, want %v", changed, tt.wantChanged) + } + if len(deletedGVs) != len(tt.wantDeleted) { + t.Errorf("diffGVs() deleted count = %d, want %d", len(deletedGVs), len(tt.wantDeleted)) + } + for _, wantGV := range tt.wantDeleted { + if !slices.Contains(deletedGVs, wantGV) { + t.Errorf("diffGVs() deleted missing GV %v", wantGV) + } + } + }) + } +} + +func TestFilterPeerDiscoveryCache(t *testing.T) { + tests := []struct { + name string + activeGVs map[schema.GroupVersion]struct{} + deletedGVs map[schema.GroupVersion]time.Time + cacheMap map[string]PeerDiscoveryCacheEntry + wantPeerGVRs map[string]int // peer name -> GVR count + }{ + { + name: "empty exclusion no changes", + activeGVs: nil, + cacheMap: map[string]PeerDiscoveryCacheEntry{ + "peer1": { + GVRs: map[schema.GroupVersionResource]bool{ + {Group: "apps", Version: "v1", Resource: "deployments"}: true, + }, + }, + "peer2": { + GVRs: map[schema.GroupVersionResource]bool{ + {Group: "batch", Version: "v1", Resource: "jobs"}: true, + }, + }, + }, + wantPeerGVRs: map[string]int{ + "peer1": 1, + "peer2": 1, + }, + }, + { + name: "filter active GVs", + activeGVs: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + cacheMap: map[string]PeerDiscoveryCacheEntry{ + "peer1": { + GVRs: map[schema.GroupVersionResource]bool{ + {Group: "apps", Version: "v1", Resource: "deployments"}: true, + }, + }, + "peer2": { + GVRs: map[schema.GroupVersionResource]bool{ + {Group: "batch", Version: "v1", Resource: "jobs"}: true, + }, + }, + }, + wantPeerGVRs: map[string]int{ + "peer1": 0, // apps/v1 filtered out + "peer2": 1, // unchanged + }, + }, + { + // Recently deleted GVs are still filtered from peer discovery during the + // grace period (before the reaper cleans them up) to avoid routing requests + // to peers for GVs that were just deleted locally. + name: "filter deleted GVs", + deletedGVs: map[schema.GroupVersion]time.Time{ + {Group: "custom", Version: "v1alpha1"}: time.Now(), + }, + cacheMap: map[string]PeerDiscoveryCacheEntry{ + "peer1": { + GVRs: map[schema.GroupVersionResource]bool{ + {Group: "custom", Version: "v1alpha1", Resource: "myresources"}: true, + }, + }, + "peer2": { + GVRs: map[schema.GroupVersionResource]bool{ + {Group: "apps", Version: "v1", Resource: "deployments"}: true, + }, + }, + }, + wantPeerGVRs: map[string]int{ + "peer1": 0, // custom/v1alpha1 filtered out + "peer2": 1, // unchanged + }, + }, + { + // Both active GVs and recently deleted GVs (within grace period, not yet + // cleaned up by the reaper) are filtered from peer discovery. + name: "filter both active and deleted GVs", + activeGVs: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + deletedGVs: map[schema.GroupVersion]time.Time{ + {Group: "custom", Version: "v1alpha1"}: time.Now(), + }, + cacheMap: map[string]PeerDiscoveryCacheEntry{ + "peer1": { + GVRs: map[schema.GroupVersionResource]bool{ + {Group: "apps", Version: "v1", Resource: "deployments"}: true, + {Group: "custom", Version: "v1alpha1", Resource: "myresources"}: true, + {Group: "batch", Version: "v1", Resource: "jobs"}: true, + }, + }, + }, + wantPeerGVRs: map[string]int{ + "peer1": 1, // apps/v1 and custom/v1alpha1 filtered out, batch/v1 remains + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) + + if tt.activeGVs != nil { + mgr.currentlyActiveGVs.Store(tt.activeGVs) + } + if tt.deletedGVs != nil { + mgr.recentlyDeletedGVs.Store(tt.deletedGVs) + } + + filtered := mgr.filterPeerDiscoveryCache(tt.cacheMap) + + if len(filtered) != len(tt.cacheMap) { + t.Errorf("Want %d peers in filtered cache, got %d", len(tt.cacheMap), len(filtered)) + } + + for peerName, wantCount := range tt.wantPeerGVRs { + entry, found := filtered[peerName] + if !found { + t.Errorf("Peer %s not found in filtered cache", peerName) + continue + } + if len(entry.GVRs) != wantCount { + t.Errorf("Peer %s: want %d GVRs, got %d", peerName, wantCount, len(entry.GVRs)) + } + } + }) + } +} + +func TestFilterPeerCacheEntry(t *testing.T) { + tests := []struct { + name string + entry PeerDiscoveryCacheEntry + exclusionSet map[schema.GroupVersion]struct{} + wantGVRs []schema.GroupVersionResource + }{ + { + name: "empty exclusion set returns entry unchanged", + entry: PeerDiscoveryCacheEntry{ + GVRs: map[schema.GroupVersionResource]bool{ + {Group: "apps", Version: "v1", Resource: "deployments"}: true, + {Group: "batch", Version: "v1", Resource: "jobs"}: true, + }, + GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{ + { + ObjectMeta: metav1.ObjectMeta{Name: "apps"}, + Versions: []apidiscoveryv2.APIVersionDiscovery{ + {Version: "v1"}, + }, + }, + }, + }, + exclusionSet: map[schema.GroupVersion]struct{}{}, + wantGVRs: []schema.GroupVersionResource{ + {Group: "apps", Version: "v1", Resource: "deployments"}, + {Group: "batch", Version: "v1", Resource: "jobs"}, + }, + }, + { + name: "filter GVRs and groups", + entry: PeerDiscoveryCacheEntry{ + GVRs: map[schema.GroupVersionResource]bool{ + {Group: "apps", Version: "v1", Resource: "deployments"}: true, + {Group: "batch", Version: "v1", Resource: "jobs"}: true, + {Group: "custom", Version: "v1", Resource: "myresources"}: true, + }, + GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{ + { + ObjectMeta: metav1.ObjectMeta{Name: "apps"}, + Versions: []apidiscoveryv2.APIVersionDiscovery{ + {Version: "v1"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "batch"}, + Versions: []apidiscoveryv2.APIVersionDiscovery{ + {Version: "v1"}, + }, + }, + }, + }, + exclusionSet: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + wantGVRs: []schema.GroupVersionResource{ + {Group: "batch", Version: "v1", Resource: "jobs"}, + {Group: "custom", Version: "v1", Resource: "myresources"}, + }, + }, + { + name: "no changes when exclusion doesn't match", + entry: PeerDiscoveryCacheEntry{ + GVRs: map[schema.GroupVersionResource]bool{ + {Group: "batch", Version: "v1", Resource: "jobs"}: true, + }, + }, + exclusionSet: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + wantGVRs: []schema.GroupVersionResource{ + {Group: "batch", Version: "v1", Resource: "jobs"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) + + filtered := mgr.filterPeerCacheEntry(tt.entry, tt.exclusionSet) + + for _, gvr := range tt.wantGVRs { + if _, found := filtered.GVRs[gvr]; !found { + t.Errorf("Want GVR %v to be present", gvr) + } + } + }) + } +} + +func TestFilterGroupDiscovery(t *testing.T) { + tests := []struct { + name string + groupDiscoveries []apidiscoveryv2.APIGroupDiscovery + exclusionSet map[schema.GroupVersion]struct{} + wantGroups map[string][]string // group name -> list of versions + }{ + { + name: "partial version exclusion", + groupDiscoveries: []apidiscoveryv2.APIGroupDiscovery{ + { + ObjectMeta: metav1.ObjectMeta{Name: "apps"}, + Versions: []apidiscoveryv2.APIVersionDiscovery{ + {Version: "v1"}, + {Version: "v1beta1"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "batch"}, + Versions: []apidiscoveryv2.APIVersionDiscovery{ + {Version: "v1"}, + }, + }, + }, + exclusionSet: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + wantGroups: map[string][]string{ + "apps": {"v1beta1"}, + "batch": {"v1"}, + }, + }, + { + name: "all versions excluded", + groupDiscoveries: []apidiscoveryv2.APIGroupDiscovery{ + { + ObjectMeta: metav1.ObjectMeta{Name: "apps"}, + Versions: []apidiscoveryv2.APIVersionDiscovery{ + {Version: "v1"}, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{Name: "batch"}, + Versions: []apidiscoveryv2.APIVersionDiscovery{ + {Version: "v1"}, + }, + }, + }, + exclusionSet: map[schema.GroupVersion]struct{}{ + {Group: "apps", Version: "v1"}: {}, + }, + wantGroups: map[string][]string{ + "batch": {"v1"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{}) + + filtered := mgr.filterGroupDiscovery(tt.groupDiscoveries, tt.exclusionSet) + for groupName, wantVersionStrs := range tt.wantGroups { + var group *apidiscoveryv2.APIGroupDiscovery + for i := range filtered { + if filtered[i].Name == groupName { + group = &filtered[i] + break + } + } + + if group == nil { + t.Errorf("Want group %s not found in filtered results", groupName) + continue + } + + if len(group.Versions) != len(wantVersionStrs) { + t.Errorf("Group %s: want %d versions, got %d", groupName, len(wantVersionStrs), len(group.Versions)) + continue + } + + for _, wantVer := range wantVersionStrs { + found := false + for _, ver := range group.Versions { + if ver.Version == wantVer { + found = true + break + } + } + if !found { + t.Errorf("Group %s: want version %s not found", groupName, wantVer) + } + } + } + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go index 12d06dd4a7e..041377d0817 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go @@ -115,25 +115,13 @@ func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error { } } - // Apply exclusion filter to the cache. - if len(newCache) != 0 { - if filteredCache, peerDiscoveryChanged := h.filterPeerDiscoveryCache(newCache); peerDiscoveryChanged { - newCache = filteredCache - } - } - - h.storePeerDiscoveryCacheAndInvalidate(newCache) + // Store unfiltered data to raw cache and trigger refilter. + // The refilter worker (single writer to filtered cache) will apply exclusions. + h.rawPeerDiscoveryCache.Store(newCache) + h.gvExclusionManager.TriggerRefilter() return fetchDiscoveryErr } -// storePeerDiscoveryCacheAndInvalidate stores the new peer discovery cache and always calls the invalidation callback if set. -func (h *peerProxyHandler) storePeerDiscoveryCacheAndInvalidate(newCache map[string]PeerDiscoveryCacheEntry) { - h.peerDiscoveryInfoCache.Store(newCache) - if callback := h.cacheInvalidationCallback.Load(); callback != nil { - (*callback)() - } -} - func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID string) (PeerDiscoveryCacheEntry, error) { hostport, err := h.hostportInfo(serverID) if err != nil { @@ -287,14 +275,8 @@ func (h *peerProxyHandler) isValidPeerIdentityLease(obj interface{}) (*v1.Lease, func (h *peerProxyHandler) findServiceableByPeerFromPeerDiscoveryCache(gvr schema.GroupVersionResource) []string { var serviceableByIDs []string - cache := h.peerDiscoveryInfoCache.Load() - if cache == nil { - return serviceableByIDs - } - - cacheMap, ok := cache.(map[string]PeerDiscoveryCacheEntry) - if !ok { - klog.Warning("Invalid cache type in peerDiscoveryInfoCache") + cacheMap := h.gvExclusionManager.GetFilteredPeerDiscoveryCache() + if len(cacheMap) == 0 { return serviceableByIDs } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go index e15e4c314b9..a275e112ff6 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go @@ -192,6 +192,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { } go h.RunPeerDiscoveryCacheSync(ctx, 1) + go h.RunPeerDiscoveryRefilter(ctx) // Wait for initial cache update. initialCache := map[string]PeerDiscoveryCacheEntry{} @@ -204,7 +205,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { return false, ctx.Err() default: } - gotCache := h.peerDiscoveryInfoCache.Load() + gotCache := h.gvExclusionManager.GetFilteredPeerDiscoveryCache() return assert.ObjectsAreEqual(initialCache, gotCache), nil }) if err != nil { @@ -251,7 +252,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) { return false, ctx.Err() default: } - gotCache := h.peerDiscoveryInfoCache.Load() + gotCache := h.gvExclusionManager.GetFilteredPeerDiscoveryCache() r := assert.ObjectsAreEqual(tt.wantCache, gotCache) return r, nil }) diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go index e89eea6ad35..9644be4cfd0 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go @@ -49,15 +49,12 @@ const ( // available on this server. localDiscoveryRefreshInterval = 30 * time.Minute // defaultExclusionGracePeriod is the default duration to wait before - // removing a group from the exclusion set after it is deleted from + // removing a groupversion from the exclusion set after it is deleted from // CRDs and aggregated APIs. // This is to allow time for all peer API servers to also observe // the deleted CRD or aggregated API before this server stops excluding it // in peer-aggregated discovery and while proxying requests to peers. defaultExclusionGracePeriod = 5 * time.Minute - // defaultExclusionReaperInterval is the interval at which the we - // clean up deleted groups from the exclusion list. - defaultExclusionReaperInterval = 1 * time.Minute ) // Interface defines how the Mixed Version Proxy filter interacts with the underlying system. @@ -67,12 +64,49 @@ type Interface interface { HasFinishedSync() bool RunLocalDiscoveryCacheSync(stopCh <-chan struct{}) error RunPeerDiscoveryCacheSync(ctx context.Context, workers int) - RunExcludedGVsReaper(stopCh <-chan struct{}) - RunGVDeletionWorkers(ctx context.Context, workers int) GetPeerResources() map[string][]apidiscoveryv2.APIGroupDiscovery RegisterCacheInvalidationCallback(cb func()) + + // RegisterCRDInformerHandlers registers event handlers on the CRD informer to track + // which GroupVersions are served locally by CRDs. When a CRD is created or updated, + // its GV is added to the exclusion set. When deleted, the GV is marked for exclusion + // during a grace period to allow peers to observe the deletion. The extractor function + // extracts the GroupVersion from a CRD object. + // + // This exclusion is necessary because peer discovery is not refreshed when a local + // CRD is deleted. Without exclusion, the deleted GV might still appear in cached peer + // discovery data, causing requests to be incorrectly routed to a peer for a GV that + // no longer exists locally. Therefore, we intentionally exclude CRD GVs from peer + // discovery from the start and only rely on the local apiserver's view of the CRD + // to serve it in peer-aggregated discovery. RegisterCRDInformerHandlers(crdInformer cache.SharedIndexInformer, extractor GVExtractor) error + + // RegisterAPIServiceInformerHandlers registers event handlers on the APIService informer + // to track which GroupVersions are served locally by aggregated APIServices. When an + // APIService is created or updated, its GV is added to the exclusion set. When deleted, + // the GV is marked for exclusion during a grace period. + // + // This exclusion is necessary because peer discovery is not refreshed when a local + // aggregated APIService is deleted. Without exclusion, the deleted GV might still appear + // in cached peer discovery data, causing requests to be incorrectly routed to a peer. + // Therefore, we intentionally exclude aggregated APIService GVs from peer discovery + // from the start and only rely on the local apiserver's view to serve them in + // peer-aggregated discovery. RegisterAPIServiceInformerHandlers(apiServiceInformer cache.SharedIndexInformer, extractor GVExtractor) error + + // RunPeerDiscoveryActiveGVTracker starts a worker that processes CRD/APIService informer + // events to rebuild the set of actively served GroupVersions. This worker is triggered + // whenever a CRD or APIService is added or updated and updates the exclusion + // set accordingly. When a GV is deleted, a delayed sync is automatically scheduled + // after the grace period to reap expired GVs from the exclusion set. + RunPeerDiscoveryActiveGVTracker(ctx context.Context) + + // RunPeerDiscoveryRefilter starts a worker that re-applies exclusion filtering to the + // cached peer discovery data whenever the exclusion set changes. This ensures that + // already-cached peer discovery responses are immediately updated to exclude newly added + // or updated local GVs, rather than waiting for the next peer lease event to trigger a + // cache refresh of peer discovery data. + RunPeerDiscoveryRefilter(ctx context.Context) } // New creates a new instance to implement unknown version proxy @@ -95,23 +129,21 @@ func NewPeerProxyHandler( localDiscoveryInfoCache: atomic.Value{}, localDiscoveryCacheTicker: time.NewTicker(localDiscoveryRefreshInterval), localDiscoveryInfoCachePopulated: make(chan struct{}), - peerDiscoveryInfoCache: atomic.Value{}, + rawPeerDiscoveryCache: atomic.Value{}, peerLeaseQueue: workqueue.NewTypedRateLimitingQueueWithConfig( workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{ Name: peerDiscoveryControllerName, }), apiserverIdentityInformer: leaseInformer, - excludedGVs: make(map[schema.GroupVersion]*time.Time), - exclusionGracePeriod: defaultExclusionGracePeriod, - reaperCheckInterval: defaultExclusionReaperInterval, - gvDeletionQueue: workqueue.NewTypedRateLimitingQueueWithConfig( - workqueue.DefaultTypedControllerRateLimiter[string](), - workqueue.TypedRateLimitingQueueConfig[string]{ - Name: "gv-deletion", - }), } + h.gvExclusionManager = NewGVExclusionManager( + defaultExclusionGracePeriod, + &h.rawPeerDiscoveryCache, + &h.cacheInvalidationCallback, + ) + if parts := strings.Split(identityLeaseLabelSelector, "="); len(parts) != 2 { return nil, fmt.Errorf("invalid identityLeaseLabelSelector provided, must be of the form key=value, received: %v", identityLeaseLabelSelector) } @@ -134,7 +166,7 @@ func NewPeerProxyHandler( discoveryClient.NoPeerDiscovery = true h.discoveryClient = discoveryClient h.localDiscoveryInfoCache.Store(map[schema.GroupVersionResource]bool{}) - h.peerDiscoveryInfoCache.Store(map[string]PeerDiscoveryCacheEntry{}) + h.rawPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{}) proxyTransport, err := transport.New(proxyClientConfig) if err != nil { @@ -169,3 +201,33 @@ func NewPeerProxyHandler( h.leaseRegistration = peerDiscoveryRegistration return h, nil } + +// RegisterCRDInformerHandlers registers event handlers for CRD informer. +func (h *peerProxyHandler) RegisterCRDInformerHandlers(crdInformer cache.SharedIndexInformer, extractor GVExtractor) error { + if h.gvExclusionManager != nil { + return h.gvExclusionManager.RegisterCRDInformerHandlers(crdInformer, extractor) + } + return nil +} + +// RegisterAPIServiceInformerHandlers registers event handlers for APIService informer. +func (h *peerProxyHandler) RegisterAPIServiceInformerHandlers(apiServiceInformer cache.SharedIndexInformer, extractor GVExtractor) error { + if h.gvExclusionManager != nil { + return h.gvExclusionManager.RegisterAPIServiceInformerHandlers(apiServiceInformer, extractor) + } + return nil +} + +// RunPeerDiscoveryActiveGVTracker starts the worker that tracks active GVs from CRDs/APIServices. +func (h *peerProxyHandler) RunPeerDiscoveryActiveGVTracker(ctx context.Context) { + if h.gvExclusionManager != nil { + h.gvExclusionManager.RunPeerDiscoveryActiveGVTracker(ctx) + } +} + +// RunPeerDiscoveryRefilter starts the worker that refilters peer discovery cache. +func (h *peerProxyHandler) RunPeerDiscoveryRefilter(ctx context.Context) { + if h.gvExclusionManager != nil { + h.gvExclusionManager.RunPeerDiscoveryRefilter(ctx) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go index aed2909ca9a..cd7227f220e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go @@ -79,36 +79,17 @@ type peerProxyHandler struct { localDiscoveryCacheTicker *time.Ticker localDiscoveryInfoCachePopulated chan struct{} localDiscoveryInfoCachePopulatedOnce sync.Once - // Cache that stores resources and groups served by peer apiservers. - // The map is from string to PeerDiscoveryCacheEntry where the string - // is the serverID of the peer apiserver. - // Refreshed if a new apiserver identity lease is added, deleted or - // holderIndentity change is observed in the lease. - peerDiscoveryInfoCache atomic.Value // map[string]struct{GVRs map[schema.GroupVersionResource]bool; Groups []apidiscoveryv2.APIGroupDiscovery} - proxyTransport http.RoundTripper - // Worker queue that keeps the peerDiscoveryInfoCache up-to-date. + // rawPeerDiscoveryCache stores unfiltered resources and groups served by peer apiservers. + // The map is from string (serverID) to PeerDiscoveryCacheEntry. + // Written ONLY by peerLeaseQueue worker when peer leases change. + rawPeerDiscoveryCache atomic.Value // map[string]PeerDiscoveryCacheEntry + proxyTransport http.RoundTripper + // Worker queue that keeps the rawPeerDiscoveryCache up-to-date. peerLeaseQueue workqueue.TypedRateLimitingInterface[string] serializer runtime.NegotiatedSerializer cacheInvalidationCallback atomic.Pointer[func()] - // Exclusion set for groups that should not be included in peer proxying - // or peer-aggregated discovery (e.g., CRDs/APIServices) - // - // This map has three states for a group: - // - Not in map: Group is not excluded. - // - In map, value is nil: Group is actively excluded. - // - In map, value is non-nil: Group is pending deletion (grace period). - excludedGVs map[schema.GroupVersion]*time.Time - excludedGVsMu sync.RWMutex - crdInformer cache.SharedIndexInformer - crdExtractor GVExtractor - apiServiceInformer cache.SharedIndexInformer - apiServiceExtractor GVExtractor - - exclusionGracePeriod time.Duration - reaperCheckInterval time.Duration - - // Worker queue for processing GV deletions asynchronously - gvDeletionQueue workqueue.TypedRateLimitingInterface[string] + // Manager for GV exclusions (CRDs/APIServices) + gvExclusionManager *GVExclusionManager } // PeerDiscoveryCacheEntry holds the GVRs and group-level discovery info for a peer. @@ -141,12 +122,10 @@ func (h *peerProxyHandler) WaitForCacheSync(stopCh <-chan struct{}) error { return fmt.Errorf("error while waiting for peer-identity-lease event handler registration sync") } - if h.crdInformer != nil && !cache.WaitForNamedCacheSync("peer-discovery-crd-informer", stopCh, h.crdInformer.HasSynced) { - return fmt.Errorf("error while waiting for crd informer sync") - } - - if h.apiServiceInformer != nil && !cache.WaitForNamedCacheSync("peer-discovery-api-service-informer", stopCh, h.apiServiceInformer.HasSynced) { - return fmt.Errorf("error while waiting for apiservice informer sync") + if h.gvExclusionManager != nil { + if !h.gvExclusionManager.WaitForCacheSync(stopCh) { + return fmt.Errorf("error while waiting for gv exclusion manager cache sync") + } } // Wait for localDiscoveryInfoCache to be populated. @@ -303,16 +282,9 @@ func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { // Returns a map of serverID -> []apidiscoveryv2.APIGroupDiscovery served by peer servers func (h *peerProxyHandler) GetPeerResources() map[string][]apidiscoveryv2.APIGroupDiscovery { result := make(map[string][]apidiscoveryv2.APIGroupDiscovery) - - peerCache := h.peerDiscoveryInfoCache.Load() - if peerCache == nil { - klog.V(4).Infof("GetPeerResources: peer cache is nil") - return result - } - - cacheMap, ok := peerCache.(map[string]PeerDiscoveryCacheEntry) - if !ok { - klog.Warning("Invalid cache type in peerDiscoveryGVRCache") + cacheMap := h.gvExclusionManager.GetFilteredPeerDiscoveryCache() + if len(cacheMap) == 0 { + klog.V(4).Infof("GetPeerResources: peer cache is empty") return result } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go index 95983701142..afa4ca86ef4 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go @@ -387,7 +387,7 @@ func newFakePeerProxyHandler(informerFinishedSync bool, return nil, err } ppH.localDiscoveryInfoCache.Store(localCache) - ppH.peerDiscoveryInfoCache.Store(peerCache) + ppH.gvExclusionManager.filteredPeerDiscoveryCache.Store(peerCache) ppH.finishedSync.Store(informerFinishedSync) return ppH, nil