Merge pull request #135675 from richabanker/merged-discovery

Peer-aggregated discovery: add GV Exclusion Manager
This commit is contained in:
Kubernetes Prow Robot 2026-02-11 06:12:07 +05:30 committed by GitHub
commit 46ac9df8c8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 1188 additions and 602 deletions

View file

@ -216,23 +216,11 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
return err
})
// Run peer-discovery sync loop
s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-cache-sync", func(context genericapiserver.PostStartHookContext) error {
// Run peer-discovery workers
s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-workers", func(context genericapiserver.PostStartHookContext) error {
go c.Extra.PeerProxy.RunPeerDiscoveryCacheSync(context, 1)
return nil
})
// RunGVDeletionWorkers processes GVs from deleted CRDs/APIServices. If a GV is no longer in use,
// it is marked for removal from peer-discovery (with a deletion timestamp), triggering a grace period before cleanup.
s.GenericAPIServer.AddPostStartHookOrDie("gv-deletion-workers", func(context genericapiserver.PostStartHookContext) error {
go c.Extra.PeerProxy.RunGVDeletionWorkers(context, 1)
return nil
})
// RunExcludedGVsReaper removes GVs from the peer-discovery exclusion list after their grace period expires.
// This ensures we don't include stale CRDs/aggregated APIs from peer discovery in the aggregated discovery.
s.GenericAPIServer.AddPostStartHookOrDie("excluded-groups-reaper", func(context genericapiserver.PostStartHookContext) error {
go c.Extra.PeerProxy.RunExcludedGVsReaper(context.Done())
go c.Extra.PeerProxy.RunPeerDiscoveryActiveGVTracker(context)
go c.Extra.PeerProxy.RunPeerDiscoveryRefilter(context)
return nil
})
}

View file

@ -1,368 +0,0 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"context"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
)
// GVExtractor is a function that extracts group-versions from an object.
// It returns a slice of GroupVersions belonging to CRDs or aggregated APIs that
// should be excluded from peer-discovery to avoid advertising stale CRDs/aggregated APIs
// in peer-aggregated discovery that were deleted but still appear in a peer's discovery.
type GVExtractor func(obj interface{}) []schema.GroupVersion
// RegisterCRDInformerHandlers registers event handlers for CRD informer using a custom extractor.
// The extractor function is responsible for extracting GroupVersions from CRD objects.
func (h *peerProxyHandler) RegisterCRDInformerHandlers(crdInformer cache.SharedIndexInformer, extractor GVExtractor) error {
h.crdInformer = crdInformer
h.crdExtractor = extractor
_, err := h.crdInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
gvs := extractor(obj)
if gvs == nil {
return
}
h.addExcludedGVs(gvs)
},
UpdateFunc: func(oldObj, newObj interface{}) {
gvs := extractor(newObj)
if gvs == nil {
return
}
h.addExcludedGVs(gvs)
},
DeleteFunc: func(obj interface{}) {
// Handle tombstone objects
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
gvs := extractor(obj)
if gvs == nil {
return
}
for _, gv := range gvs {
h.gvDeletionQueue.Add(gv.String())
}
},
})
return err
}
// RegisterAPIServiceInformerHandlers registers event handlers for APIService informer using a custom extractor.
// The extractor function is responsible for extracting GroupVersions from APIService objects
// and determining if they represent aggregated APIs.
func (h *peerProxyHandler) RegisterAPIServiceInformerHandlers(apiServiceInformer cache.SharedIndexInformer, extractor GVExtractor) error {
h.apiServiceInformer = apiServiceInformer
h.apiServiceExtractor = extractor
_, err := h.apiServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
gvs := extractor(obj)
if gvs == nil {
return
}
h.addExcludedGVs(gvs)
},
UpdateFunc: func(oldObj, newObj interface{}) {
gvs := extractor(newObj)
if gvs == nil {
return
}
h.addExcludedGVs(gvs)
},
DeleteFunc: func(obj interface{}) {
// Handle tombstone objects
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
gvs := extractor(obj)
if gvs == nil {
return
}
for _, gv := range gvs {
h.gvDeletionQueue.Add(gv.String())
}
},
})
return err
}
// addExcludedGVs adds group-versions to the exclusion set, filters them from the peer discovery cache,
// and triggers invalidation callbacks if the cache changed.
func (h *peerProxyHandler) addExcludedGVs(gvs []schema.GroupVersion) {
if len(gvs) == 0 {
return
}
h.excludedGVsMu.Lock()
for _, gv := range gvs {
h.excludedGVs[gv] = nil
}
h.excludedGVsMu.Unlock()
// Load current peer cache and filter it
cache := h.peerDiscoveryInfoCache.Load()
if cache == nil {
return
}
cacheMap, ok := cache.(map[string]PeerDiscoveryCacheEntry)
if !ok {
klog.Warning("Invalid cache type in peerDiscoveryInfoCache")
return
}
// Always trigger filter and callbacks, to ensure late-appearing GVs are filtered.
if peerDiscovery, peerDiscoveryChanged := h.filterPeerDiscoveryCache(cacheMap); peerDiscoveryChanged {
h.storePeerDiscoveryCacheAndInvalidate(peerDiscovery)
}
}
// filterPeerDiscoveryCache filters the provided peer discovery cache,
// excluding GVs in h.excludedGVs.
func (h *peerProxyHandler) filterPeerDiscoveryCache(cacheMap map[string]PeerDiscoveryCacheEntry) (map[string]PeerDiscoveryCacheEntry, bool) {
h.excludedGVsMu.RLock()
if len(h.excludedGVs) == 0 {
// No exclusions, no filtering needed.
h.excludedGVsMu.RUnlock()
return cacheMap, false
}
excludedCloned := make(map[schema.GroupVersion]struct{}, len(h.excludedGVs))
for gv := range h.excludedGVs {
excludedCloned[gv] = struct{}{}
}
h.excludedGVsMu.RUnlock()
filtered := make(map[string]PeerDiscoveryCacheEntry, len(cacheMap))
peerDiscoveryChanged := false
for peerID, entry := range cacheMap {
newEntry, peerChanged := h.filterPeerCacheEntry(entry, excludedCloned)
filtered[peerID] = newEntry
if peerChanged {
peerDiscoveryChanged = true
}
}
return filtered, peerDiscoveryChanged
}
// filterPeerCacheEntry filters a single peer's cache entry for excluded groups.
// Returns the filtered entry and whether any excluded group was present.
func (h *peerProxyHandler) filterPeerCacheEntry(entry PeerDiscoveryCacheEntry, excluded map[schema.GroupVersion]struct{}) (PeerDiscoveryCacheEntry, bool) {
filteredGVRs := make(map[schema.GroupVersionResource]bool, len(entry.GVRs))
peerDiscoveryChanged := false
for existingGVR, v := range entry.GVRs {
gv := schema.GroupVersion{Group: existingGVR.Group, Version: existingGVR.Version}
if _, skip := excluded[gv]; skip {
peerDiscoveryChanged = true
continue
}
filteredGVRs[existingGVR] = v
}
if !peerDiscoveryChanged {
return entry, false
}
// Filter the group discovery list
filteredGroups := h.filterGroupDiscovery(entry.GroupDiscovery, excluded)
return PeerDiscoveryCacheEntry{
GVRs: filteredGVRs,
GroupDiscovery: filteredGroups,
}, true
}
func (h *peerProxyHandler) filterGroupDiscovery(groupDiscoveries []apidiscoveryv2.APIGroupDiscovery, excluded map[schema.GroupVersion]struct{}) []apidiscoveryv2.APIGroupDiscovery {
var filteredDiscovery []apidiscoveryv2.APIGroupDiscovery
for _, groupDiscovery := range groupDiscoveries {
filteredGroup := apidiscoveryv2.APIGroupDiscovery{
ObjectMeta: groupDiscovery.ObjectMeta,
}
for _, version := range groupDiscovery.Versions {
gv := schema.GroupVersion{Group: groupDiscovery.Name, Version: version.Version}
if _, found := excluded[gv]; found {
// This version is excluded, skip it
continue
}
filteredGroup.Versions = append(filteredGroup.Versions, version)
}
// Only add the group to the final list if it still has any versions left
if len(filteredGroup.Versions) > 0 {
filteredDiscovery = append(filteredDiscovery, filteredGroup)
}
}
return filteredDiscovery
}
// RunGVDeletionWorkers starts workers to process GroupVersions from deleted CRDs and APIServices.
// When a GV is processed, it is checked for usage by other resources. If no longer in use,
// it is marked with a deletion timestamp, initiating a grace period. After this period,
// the RunExcludedGVsReaper is responsible for removing the GV from the exclusion set.
func (h *peerProxyHandler) RunGVDeletionWorkers(ctx context.Context, workers int) {
defer func() {
klog.Info("Shutting down GV deletion workers")
h.gvDeletionQueue.ShutDown()
}()
klog.Infof("Starting %d GV deletion worker(s)", workers)
for i := 0; i < workers; i++ {
go func(workerID int) {
for h.processNextGVDeletion() {
select {
case <-ctx.Done():
klog.Infof("GV deletion worker %d shutting down", workerID)
return
default:
}
}
}(i)
}
<-ctx.Done() // Wait till context is done to call deferred shutdown function
}
// processNextGVDeletion processes a single item from the GV deletion queue.
func (h *peerProxyHandler) processNextGVDeletion() bool {
gvString, shutdown := h.gvDeletionQueue.Get()
if shutdown {
return false
}
defer h.gvDeletionQueue.Done(gvString)
gv, err := schema.ParseGroupVersion(gvString)
if err != nil {
klog.Errorf("Failed to parse GroupVersion %q: %v", gvString, err)
return true
}
h.markGVForDeletionIfUnused(gv)
return true
}
// markGVForDeletionIfUnused checks if a group-version is still in use.
// If not, it marks it for deletion by setting a timestamp.
func (h *peerProxyHandler) markGVForDeletionIfUnused(gv schema.GroupVersion) {
// Best-effort check if GV is still in use. This is racy - a new CRD/APIService could be
// created after this check completes and returns false.
if h.isGVUsed(gv) {
return
}
h.excludedGVsMu.Lock()
defer h.excludedGVsMu.Unlock()
// Only mark if it's currently active (nil timestamp)
if ts, exists := h.excludedGVs[gv]; exists && ts == nil {
now := time.Now()
h.excludedGVs[gv] = &now
klog.V(4).Infof("Marking group-version %q for deletion, grace period started", gv)
}
}
// isGVUsed checks the informer stores to see if any CRD or APIService
// is still using the given group-version.
//
// This is necessary because multiple CRDs or aggregated APIs can share the same group-version,
// but define different resources (kinds) or versions. Deleting one CRD or APIService
// for a group-version does not mean the group-version is entirely gone—other CRDs or APIs in that group-version
// may still exist. We must only remove a group-version from the exclusion set when there are
// no remaining CRDs or APIService objects for that group-version, to ensure we do not prematurely
// allow peer-aggregated discovery or proxying for APIs that are still served locally.
func (h *peerProxyHandler) isGVUsed(gv schema.GroupVersion) bool {
// Check CRD informer store for the specific group-version
if h.crdInformer != nil && h.crdExtractor != nil {
crdList := h.crdInformer.GetStore().List()
for _, item := range crdList {
gvs := h.crdExtractor(item)
if gvs == nil {
continue
}
for _, extractedGV := range gvs {
if extractedGV == gv {
return true
}
}
}
}
// Check APIService informer store for the specific group-version
if h.apiServiceInformer != nil && h.apiServiceExtractor != nil {
apiSvcList := h.apiServiceInformer.GetStore().List()
for _, item := range apiSvcList {
gvs := h.apiServiceExtractor(item)
if gvs == nil {
continue
}
for _, extractedGV := range gvs {
if extractedGV == gv {
return true
}
}
}
}
return false
}
// RunExcludedGVsReaper starts a goroutine that periodically cleans up
// excluded group-versions that have passed their grace period.
func (h *peerProxyHandler) RunExcludedGVsReaper(stopCh <-chan struct{}) {
klog.Infof("Starting excluded group-versions reaper with %s interval", h.reaperCheckInterval)
ticker := time.NewTicker(h.reaperCheckInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.reapExcludedGVs()
case <-stopCh:
klog.Info("Shutting down excluded group-versions reaper")
return
}
}
}
// reapExcludedGVs is the garbage collector function.
// It removes group-versions from excludedGVs if their grace period has expired.
func (h *peerProxyHandler) reapExcludedGVs() {
h.excludedGVsMu.Lock()
defer h.excludedGVsMu.Unlock()
now := time.Now()
for gv, deletionTime := range h.excludedGVs {
if deletionTime == nil {
// Still actively excluded
continue
}
if now.Sub(*deletionTime) > h.exclusionGracePeriod {
klog.V(4).Infof("Reaping excluded group-version %q (grace period expired)", gv)
delete(h.excludedGVs, gv)
}
}
}

View file

@ -1,132 +0,0 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"reflect"
"testing"
"k8s.io/apimachinery/pkg/runtime/schema"
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestFilteringLogic(t *testing.T) {
gvr := func(g, v, r string) schema.GroupVersionResource {
return schema.GroupVersionResource{Group: g, Version: v, Resource: r}
}
gv := func(g, v string) schema.GroupVersion {
return schema.GroupVersion{Group: g, Version: v}
}
testCases := []struct {
name string
initialPeerCache PeerDiscoveryCacheEntry
excludedGVs map[schema.GroupVersion]struct{}
wantFilteredGVRs map[schema.GroupVersionResource]bool
wantFilteredGroups []apidiscoveryv2.APIGroupDiscovery
wantChange bool
}{
{
name: "no exclusions",
initialPeerCache: PeerDiscoveryCacheEntry{
GVRs: map[schema.GroupVersionResource]bool{gvr("apps", "v1", "deployments"): true},
GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{{
ObjectMeta: metav1.ObjectMeta{Name: "apps"},
Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}},
}},
},
excludedGVs: map[schema.GroupVersion]struct{}{},
wantFilteredGVRs: map[schema.GroupVersionResource]bool{gvr("apps", "v1", "deployments"): true},
wantFilteredGroups: []apidiscoveryv2.APIGroupDiscovery{{
ObjectMeta: metav1.ObjectMeta{Name: "apps"},
Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}},
}},
wantChange: false,
},
{
name: "exclude one GV that exists",
initialPeerCache: PeerDiscoveryCacheEntry{
GVRs: map[schema.GroupVersionResource]bool{
gvr("apps", "v1", "deployments"): true,
gvr("apps", "v1", "statefulsets"): true,
gvr("batch", "v1", "jobs"): true,
},
GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{Name: "apps"},
Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "batch"},
Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}},
},
},
},
excludedGVs: map[schema.GroupVersion]struct{}{
gv("apps", "v1"): {},
},
wantFilteredGVRs: map[schema.GroupVersionResource]bool{gvr("batch", "v1", "jobs"): true},
wantFilteredGroups: []apidiscoveryv2.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{Name: "batch"},
Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}},
},
},
wantChange: true,
},
{
name: "exclude a GV that does not exist",
initialPeerCache: PeerDiscoveryCacheEntry{
GVRs: map[schema.GroupVersionResource]bool{gvr("apps", "v1", "deployments"): true},
GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{{
ObjectMeta: metav1.ObjectMeta{Name: "apps"},
Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}},
}},
},
excludedGVs: map[schema.GroupVersion]struct{}{
gv("foo", "v1"): {},
},
wantFilteredGVRs: map[schema.GroupVersionResource]bool{gvr("apps", "v1", "deployments"): true},
wantFilteredGroups: []apidiscoveryv2.APIGroupDiscovery{{
ObjectMeta: metav1.ObjectMeta{Name: "apps"},
Versions: []apidiscoveryv2.APIVersionDiscovery{{Version: "v1"}},
}},
wantChange: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
h := &peerProxyHandler{}
filteredEntry, changed := h.filterPeerCacheEntry(tc.initialPeerCache, tc.excludedGVs)
if changed != tc.wantChange {
t.Errorf("want change to be %v, got %v", tc.wantChange, changed)
}
if !reflect.DeepEqual(filteredEntry.GVRs, tc.wantFilteredGVRs) {
t.Errorf("filtered GVRs mismatch: got %v, want %v", filteredEntry.GVRs, tc.wantFilteredGVRs)
}
if !reflect.DeepEqual(filteredEntry.GroupDiscovery, tc.wantFilteredGroups) {
t.Errorf("filtered GroupDiscovery mismatch: got %v, want %v", filteredEntry.GroupDiscovery, tc.wantFilteredGroups)
}
})
}
}

View file

@ -0,0 +1,492 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"context"
"sync/atomic"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
)
// GVExtractor is a function that extracts group-versions from an object.
// It returns a slice of GroupVersions belonging to CRDs or aggregated APIs that
// should be excluded from peer-discovery to avoid advertising stale CRDs/aggregated APIs
// in peer-aggregated discovery that were deleted but still appear in a peer's discovery.
type GVExtractor func(obj interface{}) []schema.GroupVersion
// GVExclusionManager manages the exclusion of group-versions from peer discovery.
// It maintains two atomic maps:
// - currentlyActiveGVs: All GVs currently served by CRDs and aggregated APIServices
// - recentlyDeletedGVs: GVs belonging to CRDs and aggregated APIServices that were recently deleted,
// tracked with deletion timestamp for grace period
//
// It runs two workers:
// 1. Active GV Tracker: Triggered on CRD/APIService events, rebuilds active GVs
// and reaps expired deleted GVs. When a GV is deleted, a delayed sync is scheduled
// after the grace period to reap it.
// 2. Peer Discovery Re-filter: Rate-limited worker that filters peer cache
type GVExclusionManager struct {
// Atomic maps for lock-free access
currentlyActiveGVs atomic.Value // map[schema.GroupVersion]struct{}
recentlyDeletedGVs atomic.Value // map[schema.GroupVersion]time.Time
// Informers for fetching active GVs
crdInformer cache.SharedIndexInformer
crdExtractor GVExtractor
apiServiceInformer cache.SharedIndexInformer
apiServiceExtractor GVExtractor
// Worker 1: triggered by CRD/APIService events or delayed reap scheduling
activeGVQueue workqueue.TypedRateLimitingInterface[string]
// Grace period before reaping deleted GVs from the exclusion set
exclusionGracePeriod time.Duration
// Worker 2: triggered by Active/Deleted GV changes
refilterQueue workqueue.TypedRateLimitingInterface[string]
// rawPeerDiscoveryCache is written only by peerLeaseQueue worker
// when peer leases change
rawPeerDiscoveryCache *atomic.Value // map[string]PeerDiscoveryCacheEntry
// filteredPeerDiscoveryCache is written only by the refilter worker
// when raw cache or exclusion set changes.
filteredPeerDiscoveryCache atomic.Value // map[string]PeerDiscoveryCacheEntry
invalidationCallback *atomic.Pointer[func()]
}
// NewGVExclusionManager creates a new GV exclusion manager.
func NewGVExclusionManager(
exclusionGracePeriod time.Duration,
rawPeerDiscoveryCache *atomic.Value,
invalidationCallback *atomic.Pointer[func()],
) *GVExclusionManager {
mgr := &GVExclusionManager{
exclusionGracePeriod: exclusionGracePeriod,
rawPeerDiscoveryCache: rawPeerDiscoveryCache,
invalidationCallback: invalidationCallback,
activeGVQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "active-gv-tracker",
}),
refilterQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "peer-discovery-refilter",
}),
}
mgr.currentlyActiveGVs.Store(map[schema.GroupVersion]struct{}{})
mgr.recentlyDeletedGVs.Store(map[schema.GroupVersion]time.Time{})
mgr.filteredPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{})
return mgr
}
// GetFilteredPeerDiscoveryCache returns the filtered peer discovery cache.
func (m *GVExclusionManager) GetFilteredPeerDiscoveryCache() map[string]PeerDiscoveryCacheEntry {
if cache := m.filteredPeerDiscoveryCache.Load(); cache != nil {
if cacheMap, ok := cache.(map[string]PeerDiscoveryCacheEntry); ok {
return cacheMap
}
}
return map[string]PeerDiscoveryCacheEntry{}
}
// RegisterCRDInformerHandlers registers event handlers for CRD informer using a custom extractor.
// The extractor function is responsible for extracting GroupVersions from CRD objects.
func (m *GVExclusionManager) RegisterCRDInformerHandlers(crdInformer cache.SharedIndexInformer, extractor GVExtractor) error {
m.crdInformer = crdInformer
m.crdExtractor = extractor
_, err := m.crdInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
m.handleGVUpdate()
},
UpdateFunc: func(oldObj, newObj interface{}) {
m.handleGVUpdate()
},
DeleteFunc: func(obj interface{}) {
m.handleGVUpdate()
},
})
return err
}
// RegisterAPIServiceInformerHandlers registers event handlers for APIService informer using a custom extractor.
// The extractor function is responsible for extracting GroupVersions from APIService objects
// and determining if they represent aggregated APIs.
func (m *GVExclusionManager) RegisterAPIServiceInformerHandlers(apiServiceInformer cache.SharedIndexInformer, extractor GVExtractor) error {
m.apiServiceInformer = apiServiceInformer
m.apiServiceExtractor = extractor
_, err := m.apiServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
m.handleGVUpdate()
},
UpdateFunc: func(oldObj, newObj interface{}) {
m.handleGVUpdate()
},
DeleteFunc: func(obj interface{}) {
m.handleGVUpdate()
},
})
return err
}
// WaitForCacheSync waits for the informer caches to sync.
func (m *GVExclusionManager) WaitForCacheSync(stopCh <-chan struct{}) bool {
synced := []cache.InformerSynced{}
if m.crdInformer != nil {
synced = append(synced, m.crdInformer.HasSynced)
}
if m.apiServiceInformer != nil {
synced = append(synced, m.apiServiceInformer.HasSynced)
}
if len(synced) == 0 {
return true
}
return cache.WaitForNamedCacheSync("gv-exclusion-manager", stopCh, synced...)
}
// getExclusionSet returns the combined exclusion set i.e., union(currentlyActiveGVs, recentlyDeletedGVs)
func (m *GVExclusionManager) getExclusionSet() map[schema.GroupVersion]struct{} {
activeMap := m.loadCurrentlyActiveGVs()
deletedMap := m.loadRecentlyDeletedGVs()
exclusionSet := make(map[schema.GroupVersion]struct{}, len(activeMap)+len(deletedMap))
for gv := range activeMap {
exclusionSet[gv] = struct{}{}
}
for gv := range deletedMap {
exclusionSet[gv] = struct{}{}
}
return exclusionSet
}
// handleGVUpdate is called when a CRD or APIService event occurs.
// This triggers reconciliation which rebuilds the active GV set
// and also reaps expired GVs if indicated so.
func (m *GVExclusionManager) handleGVUpdate() {
m.activeGVQueue.Add("sync")
}
// RunPeerDiscoveryActiveGVTracker runs the Active GV Tracker worker.
// This worker is triggered by CRD/APIService events and
// rebuilds the active GV set and reaps expired GVs.
func (m *GVExclusionManager) RunPeerDiscoveryActiveGVTracker(ctx context.Context) {
defer m.activeGVQueue.ShutDown()
klog.Info("Starting Active GV Tracker worker")
go wait.UntilWithContext(ctx, m.runActiveGVTrackerWorker, time.Second)
<-ctx.Done()
klog.Info("Active GV Tracker workers stopped")
}
func (m *GVExclusionManager) runActiveGVTrackerWorker(ctx context.Context) {
for m.processNextActiveGV(ctx) {
}
}
func (m *GVExclusionManager) processNextActiveGV(ctx context.Context) bool {
key, shutdown := m.activeGVQueue.Get()
if shutdown {
return false
}
defer m.activeGVQueue.Done(key)
select {
case <-ctx.Done():
return false
default:
}
m.reconcileActiveGVs()
return true
}
// reconcileActiveGVs does the following
// 1. fetches all GVs from CRD and APIService informers
// 2. detects diffs with the previous state
// 3. adds deleted GVs to recentlyDeletedGVs
// 4. reaps expired GVs, and queues Worker 2 if changes detected.
func (m *GVExclusionManager) reconcileActiveGVs() {
freshGVs := make(map[schema.GroupVersion]struct{})
// Fetch GVs from CRD informer
if m.crdInformer != nil && m.crdExtractor != nil {
crdList := m.crdInformer.GetStore().List()
for _, item := range crdList {
gvs := m.crdExtractor(item)
for _, gv := range gvs {
freshGVs[gv] = struct{}{}
}
}
}
// Fetch GVs from APIService informer
if m.apiServiceInformer != nil && m.apiServiceExtractor != nil {
apiSvcList := m.apiServiceInformer.GetStore().List()
for _, item := range apiSvcList {
gvs := m.apiServiceExtractor(item)
for _, gv := range gvs {
freshGVs[gv] = struct{}{}
}
}
}
// Load previous active GVs and detect diff
previousGVs := m.loadCurrentlyActiveGVs()
deletedGVs, activeGVsChanged := diffGVs(previousGVs, freshGVs)
// Update recentlyDeletedGVs: add newly deleted GVs and reap expired ones
recentlyDeletedChanged := m.updateRecentlyDeletedGVs(deletedGVs)
if activeGVsChanged || recentlyDeletedChanged {
if activeGVsChanged {
m.currentlyActiveGVs.Store(freshGVs)
klog.V(4).Infof("Active GVs updated: %d GVs now active", len(freshGVs))
}
m.TriggerRefilter()
} else {
klog.V(4).Infof("No changes detected in active or recently deleted GVs")
}
}
// updateRecentlyDeletedGVs adds newly deleted GVs to recentlyDeletedGVs
// and reaps expired ones. Returns true if any changes were made.
func (m *GVExclusionManager) updateRecentlyDeletedGVs(deletedGVs []schema.GroupVersion) bool {
deletedMap := m.loadRecentlyDeletedGVs()
// Early return if nothing to do
if len(deletedGVs) == 0 && len(deletedMap) == 0 {
return false
}
now := time.Now()
newDeletedMap := make(map[schema.GroupVersion]time.Time, len(deletedMap)+len(deletedGVs))
changed := false
// Copy existing entries, reaping expired ones
for gv, deletionTime := range deletedMap {
if now.Sub(deletionTime) > m.exclusionGracePeriod {
klog.V(4).Infof("Reaping GV %s (grace period expired)", gv.String())
changed = true
// Don't add to new map (effectively removing it)
} else {
newDeletedMap[gv] = deletionTime
}
}
// Schedule a delayed sync to reap expired GVs after the grace period.
if len(deletedGVs) > 0 {
m.activeGVQueue.AddAfter("sync", m.exclusionGracePeriod)
}
for _, gv := range deletedGVs {
newDeletedMap[gv] = now
klog.V(4).Infof("GV %s deleted: moved to recentlyDeletedGVs", gv.String())
changed = true
}
if changed {
m.recentlyDeletedGVs.Store(newDeletedMap)
}
return changed
}
// diffGVs compares old and new GV maps, returns GVs that were deleted (in old but not new)
// and a boolean indicating if there were any changes (additions or deletions).
func diffGVs(old, new map[schema.GroupVersion]struct{}) ([]schema.GroupVersion, bool) {
var deletedGVs []schema.GroupVersion
hasChanges := len(old) != len(new)
// Find deleted GVs (in old but not in new)
for gv := range old {
if _, exists := new[gv]; !exists {
deletedGVs = append(deletedGVs, gv)
hasChanges = true
}
}
return deletedGVs, hasChanges
}
// RunPeerDiscoveryRefilter runs the Peer Discovery Re-filter worker.
// This worker filters the peer discovery cache using the exclusion set.
func (m *GVExclusionManager) RunPeerDiscoveryRefilter(ctx context.Context) {
defer m.refilterQueue.ShutDown()
klog.Info("Starting Peer Discovery Re-filter worker")
go wait.UntilWithContext(ctx, m.runRefilterWorker, time.Second)
<-ctx.Done()
klog.Info("Peer Discovery Re-filter workers stopped")
}
func (m *GVExclusionManager) runRefilterWorker(ctx context.Context) {
for m.processNextRefilter(ctx) {
}
}
func (m *GVExclusionManager) processNextRefilter(ctx context.Context) bool {
key, shutdown := m.refilterQueue.Get()
if shutdown {
return false
}
defer m.refilterQueue.Done(key)
select {
case <-ctx.Done():
return false
default:
}
m.refilterPeerDiscoveryCache()
return true
}
// refilterPeerDiscoveryCache reads the raw peer discovery cache,
// applies exclusion filtering, and stores the result to the filtered cache.
func (m *GVExclusionManager) refilterPeerDiscoveryCache() {
var cacheMap map[string]PeerDiscoveryCacheEntry
if m.rawPeerDiscoveryCache != nil {
if rawCache := m.rawPeerDiscoveryCache.Load(); rawCache != nil {
cacheMap, _ = rawCache.(map[string]PeerDiscoveryCacheEntry)
}
}
if len(cacheMap) == 0 {
m.filteredPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{})
klog.V(4).Infof("Raw peer discovery cache is empty or unavailable")
return
}
filteredCache := m.filterPeerDiscoveryCache(cacheMap)
m.filteredPeerDiscoveryCache.Store(filteredCache)
if m.invalidationCallback != nil {
if callback := m.invalidationCallback.Load(); callback != nil {
(*callback)()
}
}
klog.V(4).Infof("Peer discovery cache re-filtered, %d GVs in exclusion set", len(m.getExclusionSet()))
}
// filterPeerDiscoveryCache applies the current exclusion set to the provided peer cache.
func (m *GVExclusionManager) filterPeerDiscoveryCache(cacheMap map[string]PeerDiscoveryCacheEntry) map[string]PeerDiscoveryCacheEntry {
exclusionSet := m.getExclusionSet()
if len(exclusionSet) == 0 {
return cacheMap
}
filtered := make(map[string]PeerDiscoveryCacheEntry, len(cacheMap))
for peerID, entry := range cacheMap {
filtered[peerID] = m.filterPeerCacheEntry(entry, exclusionSet)
}
return filtered
}
// filterPeerCacheEntry filters a single peer's cache entry for excluded GVs.
func (m *GVExclusionManager) filterPeerCacheEntry(
entry PeerDiscoveryCacheEntry,
exclusionSet map[schema.GroupVersion]struct{},
) PeerDiscoveryCacheEntry {
filteredGVRs := make(map[schema.GroupVersionResource]bool, len(entry.GVRs))
anyExcluded := false
for existingGVR, v := range entry.GVRs {
gv := schema.GroupVersion{Group: existingGVR.Group, Version: existingGVR.Version}
if _, excluded := exclusionSet[gv]; excluded {
anyExcluded = true
continue
}
filteredGVRs[existingGVR] = v
}
// If no GVRs were excluded, the exclusion set doesn't intersect with this peer's GVs,
// so we can return the entry unchanged without filtering GroupDiscovery.
if !anyExcluded {
return entry
}
filteredGroups := m.filterGroupDiscovery(entry.GroupDiscovery, exclusionSet)
return PeerDiscoveryCacheEntry{
GVRs: filteredGVRs,
GroupDiscovery: filteredGroups,
}
}
// filterGroupDiscovery filters group discovery entries, removing excluded GVs.
func (m *GVExclusionManager) filterGroupDiscovery(
groupDiscoveries []apidiscoveryv2.APIGroupDiscovery,
exclusionSet map[schema.GroupVersion]struct{},
) []apidiscoveryv2.APIGroupDiscovery {
var filteredDiscovery []apidiscoveryv2.APIGroupDiscovery
for _, groupDiscovery := range groupDiscoveries {
filteredGroup := apidiscoveryv2.APIGroupDiscovery{
ObjectMeta: groupDiscovery.ObjectMeta,
}
for _, version := range groupDiscovery.Versions {
gv := schema.GroupVersion{Group: groupDiscovery.Name, Version: version.Version}
if _, found := exclusionSet[gv]; found {
// This version is excluded, skip it
continue
}
filteredGroup.Versions = append(filteredGroup.Versions, version)
}
// Only add the group to the final list if it still has any versions left
if len(filteredGroup.Versions) > 0 {
filteredDiscovery = append(filteredDiscovery, filteredGroup)
}
}
return filteredDiscovery
}
func (m *GVExclusionManager) loadCurrentlyActiveGVs() map[schema.GroupVersion]struct{} {
if val := m.currentlyActiveGVs.Load(); val != nil {
if gvMap, ok := val.(map[schema.GroupVersion]struct{}); ok {
return gvMap
}
}
return map[schema.GroupVersion]struct{}{}
}
func (m *GVExclusionManager) loadRecentlyDeletedGVs() map[schema.GroupVersion]time.Time {
if val := m.recentlyDeletedGVs.Load(); val != nil {
if gvMap, ok := val.(map[schema.GroupVersion]time.Time); ok {
return gvMap
}
}
return map[schema.GroupVersion]time.Time{}
}
// TriggerRefilter triggers the refilter worker to apply exclusions to the filtered cache.
// This should be called by peerLeaseQueue after updating the raw peer discovery cache.
func (m *GVExclusionManager) TriggerRefilter() {
m.refilterQueue.Add("refilter")
}

View file

@ -0,0 +1,589 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"slices"
"sync/atomic"
"testing"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestGetExclusionSet(t *testing.T) {
tests := []struct {
name string
activeGVs map[schema.GroupVersion]struct{}
deletedGVs map[schema.GroupVersion]time.Time
wantGVs []schema.GroupVersion
}{
{
name: "empty state",
activeGVs: nil,
deletedGVs: nil,
wantGVs: []schema.GroupVersion{},
},
{
name: "only active GVs",
activeGVs: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
{Group: "batch", Version: "v1"}: {},
{Group: "custom", Version: "v1alpha1"}: {},
},
wantGVs: []schema.GroupVersion{
{Group: "apps", Version: "v1"},
{Group: "batch", Version: "v1"},
{Group: "custom", Version: "v1alpha1"},
},
},
{
name: "only deleted GVs",
deletedGVs: map[schema.GroupVersion]time.Time{
{Group: "deprecated", Version: "v1beta1"}: time.Now(),
{Group: "legacy", Version: "v1alpha1"}: time.Now(),
},
// Reaper hasnt removed deleted GVs yet.
wantGVs: []schema.GroupVersion{
{Group: "deprecated", Version: "v1beta1"},
{Group: "legacy", Version: "v1alpha1"},
},
},
{
name: "different active and deleted GVs",
activeGVs: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
{Group: "batch", Version: "v1"}: {},
},
deletedGVs: map[schema.GroupVersion]time.Time{
{Group: "old", Version: "v1"}: time.Now(),
},
// Include both active GVs and recently deleted GVs.
// Deleted GVs remain in the exclusion set until the reaper removes them after the grace period.
wantGVs: []schema.GroupVersion{
{Group: "apps", Version: "v1"},
{Group: "batch", Version: "v1"},
{Group: "old", Version: "v1"},
},
},
{
// A GV can appear in both active and deleted sets if:
// 1. CRD was deleted (moved from active to deleted)
// 2. CRD was recreated (added back to active)
// 3. Reaper hasn't cleaned up the deleted entry yet
name: "same GV in both active and deleted",
activeGVs: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
deletedGVs: map[schema.GroupVersion]time.Time{
{Group: "apps", Version: "v1"}: time.Now(),
},
wantGVs: []schema.GroupVersion{
{Group: "apps", Version: "v1"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
if tt.activeGVs != nil {
mgr.currentlyActiveGVs.Store(tt.activeGVs)
}
if tt.deletedGVs != nil {
mgr.recentlyDeletedGVs.Store(tt.deletedGVs)
}
exclusionSet := mgr.getExclusionSet()
if len(exclusionSet) != len(tt.wantGVs) {
t.Errorf("Want exclusion set size %d, got %d", len(tt.wantGVs), len(exclusionSet))
}
for _, gv := range tt.wantGVs {
if _, found := exclusionSet[gv]; !found {
t.Errorf("Want GV %v in exclusion set, but not found", gv)
}
}
})
}
}
func TestReapExpiredGVs(t *testing.T) {
tests := []struct {
name string
gracePeriod time.Duration
deletedGVs map[schema.GroupVersion]time.Time
wantReaped []schema.GroupVersion
}{
{
name: "empty state",
gracePeriod: 100 * time.Millisecond,
deletedGVs: map[schema.GroupVersion]time.Time{},
wantReaped: []schema.GroupVersion{},
},
{
name: "reap old GV keep recent",
gracePeriod: 100 * time.Millisecond,
deletedGVs: map[schema.GroupVersion]time.Time{
{Group: "old", Version: "v1"}: time.Now().Add(-200 * time.Millisecond),
{Group: "recent", Version: "v1"}: time.Now().Add(-50 * time.Millisecond),
{Group: "new", Version: "v1"}: time.Now(),
},
wantReaped: []schema.GroupVersion{
{Group: "old", Version: "v1"},
},
},
{
name: "all expired",
gracePeriod: 50 * time.Millisecond,
deletedGVs: map[schema.GroupVersion]time.Time{
{Group: "old1", Version: "v1"}: time.Now().Add(-100 * time.Millisecond),
{Group: "old2", Version: "v1"}: time.Now().Add(-200 * time.Millisecond),
},
wantReaped: []schema.GroupVersion{
{Group: "old1", Version: "v1"},
{Group: "old2", Version: "v1"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(tt.gracePeriod, &atomic.Value{}, &atomic.Pointer[func()]{})
mgr.recentlyDeletedGVs.Store(tt.deletedGVs)
mgr.updateRecentlyDeletedGVs(nil)
result := mgr.loadRecentlyDeletedGVs()
for _, gv := range tt.wantReaped {
if _, found := result[gv]; found {
t.Errorf("GV %v should have been reaped but still exists", gv)
}
}
})
}
}
func TestDetectDiff(t *testing.T) {
_ = NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
tests := []struct {
name string
old map[schema.GroupVersion]struct{}
new map[schema.GroupVersion]struct{}
wantChanged bool
wantDeleted []schema.GroupVersion
}{
{
name: "both empty",
old: map[schema.GroupVersion]struct{}{},
new: map[schema.GroupVersion]struct{}{},
wantChanged: false,
wantDeleted: nil,
},
{
name: "identical",
old: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
new: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
wantChanged: false,
wantDeleted: nil,
},
{
name: "added GV",
old: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
new: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
{Group: "batch", Version: "v1"}: {},
},
wantChanged: true,
wantDeleted: nil, // No deletions, only addition
},
{
name: "removed GV",
old: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
{Group: "batch", Version: "v1"}: {},
},
new: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
wantChanged: true,
wantDeleted: []schema.GroupVersion{
{Group: "batch", Version: "v1"},
},
},
{
name: "different GVs same size",
old: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
new: map[schema.GroupVersion]struct{}{
{Group: "batch", Version: "v1"}: {},
},
wantChanged: true,
wantDeleted: []schema.GroupVersion{
{Group: "apps", Version: "v1"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
deletedGVs, changed := diffGVs(tt.old, tt.new)
if changed != tt.wantChanged {
t.Errorf("diffGVs() changed = %v, want %v", changed, tt.wantChanged)
}
if len(deletedGVs) != len(tt.wantDeleted) {
t.Errorf("diffGVs() deleted count = %d, want %d", len(deletedGVs), len(tt.wantDeleted))
}
for _, wantGV := range tt.wantDeleted {
if !slices.Contains(deletedGVs, wantGV) {
t.Errorf("diffGVs() deleted missing GV %v", wantGV)
}
}
})
}
}
func TestFilterPeerDiscoveryCache(t *testing.T) {
tests := []struct {
name string
activeGVs map[schema.GroupVersion]struct{}
deletedGVs map[schema.GroupVersion]time.Time
cacheMap map[string]PeerDiscoveryCacheEntry
wantPeerGVRs map[string]int // peer name -> GVR count
}{
{
name: "empty exclusion no changes",
activeGVs: nil,
cacheMap: map[string]PeerDiscoveryCacheEntry{
"peer1": {
GVRs: map[schema.GroupVersionResource]bool{
{Group: "apps", Version: "v1", Resource: "deployments"}: true,
},
},
"peer2": {
GVRs: map[schema.GroupVersionResource]bool{
{Group: "batch", Version: "v1", Resource: "jobs"}: true,
},
},
},
wantPeerGVRs: map[string]int{
"peer1": 1,
"peer2": 1,
},
},
{
name: "filter active GVs",
activeGVs: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
cacheMap: map[string]PeerDiscoveryCacheEntry{
"peer1": {
GVRs: map[schema.GroupVersionResource]bool{
{Group: "apps", Version: "v1", Resource: "deployments"}: true,
},
},
"peer2": {
GVRs: map[schema.GroupVersionResource]bool{
{Group: "batch", Version: "v1", Resource: "jobs"}: true,
},
},
},
wantPeerGVRs: map[string]int{
"peer1": 0, // apps/v1 filtered out
"peer2": 1, // unchanged
},
},
{
// Recently deleted GVs are still filtered from peer discovery during the
// grace period (before the reaper cleans them up) to avoid routing requests
// to peers for GVs that were just deleted locally.
name: "filter deleted GVs",
deletedGVs: map[schema.GroupVersion]time.Time{
{Group: "custom", Version: "v1alpha1"}: time.Now(),
},
cacheMap: map[string]PeerDiscoveryCacheEntry{
"peer1": {
GVRs: map[schema.GroupVersionResource]bool{
{Group: "custom", Version: "v1alpha1", Resource: "myresources"}: true,
},
},
"peer2": {
GVRs: map[schema.GroupVersionResource]bool{
{Group: "apps", Version: "v1", Resource: "deployments"}: true,
},
},
},
wantPeerGVRs: map[string]int{
"peer1": 0, // custom/v1alpha1 filtered out
"peer2": 1, // unchanged
},
},
{
// Both active GVs and recently deleted GVs (within grace period, not yet
// cleaned up by the reaper) are filtered from peer discovery.
name: "filter both active and deleted GVs",
activeGVs: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
deletedGVs: map[schema.GroupVersion]time.Time{
{Group: "custom", Version: "v1alpha1"}: time.Now(),
},
cacheMap: map[string]PeerDiscoveryCacheEntry{
"peer1": {
GVRs: map[schema.GroupVersionResource]bool{
{Group: "apps", Version: "v1", Resource: "deployments"}: true,
{Group: "custom", Version: "v1alpha1", Resource: "myresources"}: true,
{Group: "batch", Version: "v1", Resource: "jobs"}: true,
},
},
},
wantPeerGVRs: map[string]int{
"peer1": 1, // apps/v1 and custom/v1alpha1 filtered out, batch/v1 remains
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
if tt.activeGVs != nil {
mgr.currentlyActiveGVs.Store(tt.activeGVs)
}
if tt.deletedGVs != nil {
mgr.recentlyDeletedGVs.Store(tt.deletedGVs)
}
filtered := mgr.filterPeerDiscoveryCache(tt.cacheMap)
if len(filtered) != len(tt.cacheMap) {
t.Errorf("Want %d peers in filtered cache, got %d", len(tt.cacheMap), len(filtered))
}
for peerName, wantCount := range tt.wantPeerGVRs {
entry, found := filtered[peerName]
if !found {
t.Errorf("Peer %s not found in filtered cache", peerName)
continue
}
if len(entry.GVRs) != wantCount {
t.Errorf("Peer %s: want %d GVRs, got %d", peerName, wantCount, len(entry.GVRs))
}
}
})
}
}
func TestFilterPeerCacheEntry(t *testing.T) {
tests := []struct {
name string
entry PeerDiscoveryCacheEntry
exclusionSet map[schema.GroupVersion]struct{}
wantGVRs []schema.GroupVersionResource
}{
{
name: "empty exclusion set returns entry unchanged",
entry: PeerDiscoveryCacheEntry{
GVRs: map[schema.GroupVersionResource]bool{
{Group: "apps", Version: "v1", Resource: "deployments"}: true,
{Group: "batch", Version: "v1", Resource: "jobs"}: true,
},
GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{Name: "apps"},
Versions: []apidiscoveryv2.APIVersionDiscovery{
{Version: "v1"},
},
},
},
},
exclusionSet: map[schema.GroupVersion]struct{}{},
wantGVRs: []schema.GroupVersionResource{
{Group: "apps", Version: "v1", Resource: "deployments"},
{Group: "batch", Version: "v1", Resource: "jobs"},
},
},
{
name: "filter GVRs and groups",
entry: PeerDiscoveryCacheEntry{
GVRs: map[schema.GroupVersionResource]bool{
{Group: "apps", Version: "v1", Resource: "deployments"}: true,
{Group: "batch", Version: "v1", Resource: "jobs"}: true,
{Group: "custom", Version: "v1", Resource: "myresources"}: true,
},
GroupDiscovery: []apidiscoveryv2.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{Name: "apps"},
Versions: []apidiscoveryv2.APIVersionDiscovery{
{Version: "v1"},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "batch"},
Versions: []apidiscoveryv2.APIVersionDiscovery{
{Version: "v1"},
},
},
},
},
exclusionSet: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
wantGVRs: []schema.GroupVersionResource{
{Group: "batch", Version: "v1", Resource: "jobs"},
{Group: "custom", Version: "v1", Resource: "myresources"},
},
},
{
name: "no changes when exclusion doesn't match",
entry: PeerDiscoveryCacheEntry{
GVRs: map[schema.GroupVersionResource]bool{
{Group: "batch", Version: "v1", Resource: "jobs"}: true,
},
},
exclusionSet: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
wantGVRs: []schema.GroupVersionResource{
{Group: "batch", Version: "v1", Resource: "jobs"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
filtered := mgr.filterPeerCacheEntry(tt.entry, tt.exclusionSet)
for _, gvr := range tt.wantGVRs {
if _, found := filtered.GVRs[gvr]; !found {
t.Errorf("Want GVR %v to be present", gvr)
}
}
})
}
}
func TestFilterGroupDiscovery(t *testing.T) {
tests := []struct {
name string
groupDiscoveries []apidiscoveryv2.APIGroupDiscovery
exclusionSet map[schema.GroupVersion]struct{}
wantGroups map[string][]string // group name -> list of versions
}{
{
name: "partial version exclusion",
groupDiscoveries: []apidiscoveryv2.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{Name: "apps"},
Versions: []apidiscoveryv2.APIVersionDiscovery{
{Version: "v1"},
{Version: "v1beta1"},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "batch"},
Versions: []apidiscoveryv2.APIVersionDiscovery{
{Version: "v1"},
},
},
},
exclusionSet: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
wantGroups: map[string][]string{
"apps": {"v1beta1"},
"batch": {"v1"},
},
},
{
name: "all versions excluded",
groupDiscoveries: []apidiscoveryv2.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{Name: "apps"},
Versions: []apidiscoveryv2.APIVersionDiscovery{
{Version: "v1"},
},
},
{
ObjectMeta: metav1.ObjectMeta{Name: "batch"},
Versions: []apidiscoveryv2.APIVersionDiscovery{
{Version: "v1"},
},
},
},
exclusionSet: map[schema.GroupVersion]struct{}{
{Group: "apps", Version: "v1"}: {},
},
wantGroups: map[string][]string{
"batch": {"v1"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mgr := NewGVExclusionManager(5*time.Minute, &atomic.Value{}, &atomic.Pointer[func()]{})
filtered := mgr.filterGroupDiscovery(tt.groupDiscoveries, tt.exclusionSet)
for groupName, wantVersionStrs := range tt.wantGroups {
var group *apidiscoveryv2.APIGroupDiscovery
for i := range filtered {
if filtered[i].Name == groupName {
group = &filtered[i]
break
}
}
if group == nil {
t.Errorf("Want group %s not found in filtered results", groupName)
continue
}
if len(group.Versions) != len(wantVersionStrs) {
t.Errorf("Group %s: want %d versions, got %d", groupName, len(wantVersionStrs), len(group.Versions))
continue
}
for _, wantVer := range wantVersionStrs {
found := false
for _, ver := range group.Versions {
if ver.Version == wantVer {
found = true
break
}
}
if !found {
t.Errorf("Group %s: want version %s not found", groupName, wantVer)
}
}
}
})
}
}

View file

@ -115,25 +115,13 @@ func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error {
}
}
// Apply exclusion filter to the cache.
if len(newCache) != 0 {
if filteredCache, peerDiscoveryChanged := h.filterPeerDiscoveryCache(newCache); peerDiscoveryChanged {
newCache = filteredCache
}
}
h.storePeerDiscoveryCacheAndInvalidate(newCache)
// Store unfiltered data to raw cache and trigger refilter.
// The refilter worker (single writer to filtered cache) will apply exclusions.
h.rawPeerDiscoveryCache.Store(newCache)
h.gvExclusionManager.TriggerRefilter()
return fetchDiscoveryErr
}
// storePeerDiscoveryCacheAndInvalidate stores the new peer discovery cache and always calls the invalidation callback if set.
func (h *peerProxyHandler) storePeerDiscoveryCacheAndInvalidate(newCache map[string]PeerDiscoveryCacheEntry) {
h.peerDiscoveryInfoCache.Store(newCache)
if callback := h.cacheInvalidationCallback.Load(); callback != nil {
(*callback)()
}
}
func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID string) (PeerDiscoveryCacheEntry, error) {
hostport, err := h.hostportInfo(serverID)
if err != nil {
@ -287,14 +275,8 @@ func (h *peerProxyHandler) isValidPeerIdentityLease(obj interface{}) (*v1.Lease,
func (h *peerProxyHandler) findServiceableByPeerFromPeerDiscoveryCache(gvr schema.GroupVersionResource) []string {
var serviceableByIDs []string
cache := h.peerDiscoveryInfoCache.Load()
if cache == nil {
return serviceableByIDs
}
cacheMap, ok := cache.(map[string]PeerDiscoveryCacheEntry)
if !ok {
klog.Warning("Invalid cache type in peerDiscoveryInfoCache")
cacheMap := h.gvExclusionManager.GetFilteredPeerDiscoveryCache()
if len(cacheMap) == 0 {
return serviceableByIDs
}

View file

@ -192,6 +192,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
}
go h.RunPeerDiscoveryCacheSync(ctx, 1)
go h.RunPeerDiscoveryRefilter(ctx)
// Wait for initial cache update.
initialCache := map[string]PeerDiscoveryCacheEntry{}
@ -204,7 +205,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
return false, ctx.Err()
default:
}
gotCache := h.peerDiscoveryInfoCache.Load()
gotCache := h.gvExclusionManager.GetFilteredPeerDiscoveryCache()
return assert.ObjectsAreEqual(initialCache, gotCache), nil
})
if err != nil {
@ -251,7 +252,7 @@ func TestRunPeerDiscoveryCacheSync(t *testing.T) {
return false, ctx.Err()
default:
}
gotCache := h.peerDiscoveryInfoCache.Load()
gotCache := h.gvExclusionManager.GetFilteredPeerDiscoveryCache()
r := assert.ObjectsAreEqual(tt.wantCache, gotCache)
return r, nil
})

View file

@ -49,15 +49,12 @@ const (
// available on this server.
localDiscoveryRefreshInterval = 30 * time.Minute
// defaultExclusionGracePeriod is the default duration to wait before
// removing a group from the exclusion set after it is deleted from
// removing a groupversion from the exclusion set after it is deleted from
// CRDs and aggregated APIs.
// This is to allow time for all peer API servers to also observe
// the deleted CRD or aggregated API before this server stops excluding it
// in peer-aggregated discovery and while proxying requests to peers.
defaultExclusionGracePeriod = 5 * time.Minute
// defaultExclusionReaperInterval is the interval at which the we
// clean up deleted groups from the exclusion list.
defaultExclusionReaperInterval = 1 * time.Minute
)
// Interface defines how the Mixed Version Proxy filter interacts with the underlying system.
@ -67,12 +64,49 @@ type Interface interface {
HasFinishedSync() bool
RunLocalDiscoveryCacheSync(stopCh <-chan struct{}) error
RunPeerDiscoveryCacheSync(ctx context.Context, workers int)
RunExcludedGVsReaper(stopCh <-chan struct{})
RunGVDeletionWorkers(ctx context.Context, workers int)
GetPeerResources() map[string][]apidiscoveryv2.APIGroupDiscovery
RegisterCacheInvalidationCallback(cb func())
// RegisterCRDInformerHandlers registers event handlers on the CRD informer to track
// which GroupVersions are served locally by CRDs. When a CRD is created or updated,
// its GV is added to the exclusion set. When deleted, the GV is marked for exclusion
// during a grace period to allow peers to observe the deletion. The extractor function
// extracts the GroupVersion from a CRD object.
//
// This exclusion is necessary because peer discovery is not refreshed when a local
// CRD is deleted. Without exclusion, the deleted GV might still appear in cached peer
// discovery data, causing requests to be incorrectly routed to a peer for a GV that
// no longer exists locally. Therefore, we intentionally exclude CRD GVs from peer
// discovery from the start and only rely on the local apiserver's view of the CRD
// to serve it in peer-aggregated discovery.
RegisterCRDInformerHandlers(crdInformer cache.SharedIndexInformer, extractor GVExtractor) error
// RegisterAPIServiceInformerHandlers registers event handlers on the APIService informer
// to track which GroupVersions are served locally by aggregated APIServices. When an
// APIService is created or updated, its GV is added to the exclusion set. When deleted,
// the GV is marked for exclusion during a grace period.
//
// This exclusion is necessary because peer discovery is not refreshed when a local
// aggregated APIService is deleted. Without exclusion, the deleted GV might still appear
// in cached peer discovery data, causing requests to be incorrectly routed to a peer.
// Therefore, we intentionally exclude aggregated APIService GVs from peer discovery
// from the start and only rely on the local apiserver's view to serve them in
// peer-aggregated discovery.
RegisterAPIServiceInformerHandlers(apiServiceInformer cache.SharedIndexInformer, extractor GVExtractor) error
// RunPeerDiscoveryActiveGVTracker starts a worker that processes CRD/APIService informer
// events to rebuild the set of actively served GroupVersions. This worker is triggered
// whenever a CRD or APIService is added or updated and updates the exclusion
// set accordingly. When a GV is deleted, a delayed sync is automatically scheduled
// after the grace period to reap expired GVs from the exclusion set.
RunPeerDiscoveryActiveGVTracker(ctx context.Context)
// RunPeerDiscoveryRefilter starts a worker that re-applies exclusion filtering to the
// cached peer discovery data whenever the exclusion set changes. This ensures that
// already-cached peer discovery responses are immediately updated to exclude newly added
// or updated local GVs, rather than waiting for the next peer lease event to trigger a
// cache refresh of peer discovery data.
RunPeerDiscoveryRefilter(ctx context.Context)
}
// New creates a new instance to implement unknown version proxy
@ -95,23 +129,21 @@ func NewPeerProxyHandler(
localDiscoveryInfoCache: atomic.Value{},
localDiscoveryCacheTicker: time.NewTicker(localDiscoveryRefreshInterval),
localDiscoveryInfoCachePopulated: make(chan struct{}),
peerDiscoveryInfoCache: atomic.Value{},
rawPeerDiscoveryCache: atomic.Value{},
peerLeaseQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: peerDiscoveryControllerName,
}),
apiserverIdentityInformer: leaseInformer,
excludedGVs: make(map[schema.GroupVersion]*time.Time),
exclusionGracePeriod: defaultExclusionGracePeriod,
reaperCheckInterval: defaultExclusionReaperInterval,
gvDeletionQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "gv-deletion",
}),
}
h.gvExclusionManager = NewGVExclusionManager(
defaultExclusionGracePeriod,
&h.rawPeerDiscoveryCache,
&h.cacheInvalidationCallback,
)
if parts := strings.Split(identityLeaseLabelSelector, "="); len(parts) != 2 {
return nil, fmt.Errorf("invalid identityLeaseLabelSelector provided, must be of the form key=value, received: %v", identityLeaseLabelSelector)
}
@ -134,7 +166,7 @@ func NewPeerProxyHandler(
discoveryClient.NoPeerDiscovery = true
h.discoveryClient = discoveryClient
h.localDiscoveryInfoCache.Store(map[schema.GroupVersionResource]bool{})
h.peerDiscoveryInfoCache.Store(map[string]PeerDiscoveryCacheEntry{})
h.rawPeerDiscoveryCache.Store(map[string]PeerDiscoveryCacheEntry{})
proxyTransport, err := transport.New(proxyClientConfig)
if err != nil {
@ -169,3 +201,33 @@ func NewPeerProxyHandler(
h.leaseRegistration = peerDiscoveryRegistration
return h, nil
}
// RegisterCRDInformerHandlers registers event handlers for CRD informer.
func (h *peerProxyHandler) RegisterCRDInformerHandlers(crdInformer cache.SharedIndexInformer, extractor GVExtractor) error {
if h.gvExclusionManager != nil {
return h.gvExclusionManager.RegisterCRDInformerHandlers(crdInformer, extractor)
}
return nil
}
// RegisterAPIServiceInformerHandlers registers event handlers for APIService informer.
func (h *peerProxyHandler) RegisterAPIServiceInformerHandlers(apiServiceInformer cache.SharedIndexInformer, extractor GVExtractor) error {
if h.gvExclusionManager != nil {
return h.gvExclusionManager.RegisterAPIServiceInformerHandlers(apiServiceInformer, extractor)
}
return nil
}
// RunPeerDiscoveryActiveGVTracker starts the worker that tracks active GVs from CRDs/APIServices.
func (h *peerProxyHandler) RunPeerDiscoveryActiveGVTracker(ctx context.Context) {
if h.gvExclusionManager != nil {
h.gvExclusionManager.RunPeerDiscoveryActiveGVTracker(ctx)
}
}
// RunPeerDiscoveryRefilter starts the worker that refilters peer discovery cache.
func (h *peerProxyHandler) RunPeerDiscoveryRefilter(ctx context.Context) {
if h.gvExclusionManager != nil {
h.gvExclusionManager.RunPeerDiscoveryRefilter(ctx)
}
}

View file

@ -79,36 +79,17 @@ type peerProxyHandler struct {
localDiscoveryCacheTicker *time.Ticker
localDiscoveryInfoCachePopulated chan struct{}
localDiscoveryInfoCachePopulatedOnce sync.Once
// Cache that stores resources and groups served by peer apiservers.
// The map is from string to PeerDiscoveryCacheEntry where the string
// is the serverID of the peer apiserver.
// Refreshed if a new apiserver identity lease is added, deleted or
// holderIndentity change is observed in the lease.
peerDiscoveryInfoCache atomic.Value // map[string]struct{GVRs map[schema.GroupVersionResource]bool; Groups []apidiscoveryv2.APIGroupDiscovery}
proxyTransport http.RoundTripper
// Worker queue that keeps the peerDiscoveryInfoCache up-to-date.
// rawPeerDiscoveryCache stores unfiltered resources and groups served by peer apiservers.
// The map is from string (serverID) to PeerDiscoveryCacheEntry.
// Written ONLY by peerLeaseQueue worker when peer leases change.
rawPeerDiscoveryCache atomic.Value // map[string]PeerDiscoveryCacheEntry
proxyTransport http.RoundTripper
// Worker queue that keeps the rawPeerDiscoveryCache up-to-date.
peerLeaseQueue workqueue.TypedRateLimitingInterface[string]
serializer runtime.NegotiatedSerializer
cacheInvalidationCallback atomic.Pointer[func()]
// Exclusion set for groups that should not be included in peer proxying
// or peer-aggregated discovery (e.g., CRDs/APIServices)
//
// This map has three states for a group:
// - Not in map: Group is not excluded.
// - In map, value is nil: Group is actively excluded.
// - In map, value is non-nil: Group is pending deletion (grace period).
excludedGVs map[schema.GroupVersion]*time.Time
excludedGVsMu sync.RWMutex
crdInformer cache.SharedIndexInformer
crdExtractor GVExtractor
apiServiceInformer cache.SharedIndexInformer
apiServiceExtractor GVExtractor
exclusionGracePeriod time.Duration
reaperCheckInterval time.Duration
// Worker queue for processing GV deletions asynchronously
gvDeletionQueue workqueue.TypedRateLimitingInterface[string]
// Manager for GV exclusions (CRDs/APIServices)
gvExclusionManager *GVExclusionManager
}
// PeerDiscoveryCacheEntry holds the GVRs and group-level discovery info for a peer.
@ -141,12 +122,10 @@ func (h *peerProxyHandler) WaitForCacheSync(stopCh <-chan struct{}) error {
return fmt.Errorf("error while waiting for peer-identity-lease event handler registration sync")
}
if h.crdInformer != nil && !cache.WaitForNamedCacheSync("peer-discovery-crd-informer", stopCh, h.crdInformer.HasSynced) {
return fmt.Errorf("error while waiting for crd informer sync")
}
if h.apiServiceInformer != nil && !cache.WaitForNamedCacheSync("peer-discovery-api-service-informer", stopCh, h.apiServiceInformer.HasSynced) {
return fmt.Errorf("error while waiting for apiservice informer sync")
if h.gvExclusionManager != nil {
if !h.gvExclusionManager.WaitForCacheSync(stopCh) {
return fmt.Errorf("error while waiting for gv exclusion manager cache sync")
}
}
// Wait for localDiscoveryInfoCache to be populated.
@ -303,16 +282,9 @@ func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
// Returns a map of serverID -> []apidiscoveryv2.APIGroupDiscovery served by peer servers
func (h *peerProxyHandler) GetPeerResources() map[string][]apidiscoveryv2.APIGroupDiscovery {
result := make(map[string][]apidiscoveryv2.APIGroupDiscovery)
peerCache := h.peerDiscoveryInfoCache.Load()
if peerCache == nil {
klog.V(4).Infof("GetPeerResources: peer cache is nil")
return result
}
cacheMap, ok := peerCache.(map[string]PeerDiscoveryCacheEntry)
if !ok {
klog.Warning("Invalid cache type in peerDiscoveryGVRCache")
cacheMap := h.gvExclusionManager.GetFilteredPeerDiscoveryCache()
if len(cacheMap) == 0 {
klog.V(4).Infof("GetPeerResources: peer cache is empty")
return result
}

View file

@ -387,7 +387,7 @@ func newFakePeerProxyHandler(informerFinishedSync bool,
return nil, err
}
ppH.localDiscoveryInfoCache.Store(localCache)
ppH.peerDiscoveryInfoCache.Store(peerCache)
ppH.gvExclusionManager.filteredPeerDiscoveryCache.Store(peerCache)
ppH.finishedSync.Store(informerFinishedSync)
return ppH, nil