From 9fb9e933b2ebae2d94edd29b7dec2c1df9e625a6 Mon Sep 17 00:00:00 2001 From: Michael Aspinwall Date: Wed, 4 Mar 2026 22:13:54 +0000 Subject: [PATCH] Add metric tracking the latest cached rv of informers --- .../client-go/tools/cache/controller.go | 12 +- .../client-go/tools/cache/fifo_metrics.go | 55 ++- .../client-go/tools/cache/shared_informer.go | 14 +- .../src/k8s.io/client-go/tools/cache/store.go | 35 +- .../client-go/tools/cache/the_real_fifo.go | 2 +- .../tools/cache/thread_safe_store.go | 68 +++- .../prometheus/clientgo/fifo/metrics.go | 30 +- .../client/metrics/fifo_metrics_test.go | 342 ++++++++++++------ 8 files changed, 408 insertions(+), 150 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index 55ab62c763d..b78ed522e24 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -452,9 +452,9 @@ type InformerOptions struct { // If not set, metrics will not be published. Identifier InformerNameAndResource - // FIFOMetricsProvider is the metrics provider for the FIFO queue. + // InformerMetricsProvider is the metrics provider for the informer. // If not set, metrics will be no-ops. - FIFOMetricsProvider FIFOMetricsProvider + InformerMetricsProvider InformerMetricsProvider } // NewInformerWithOptions returns a Store and a controller for populating the store @@ -464,9 +464,9 @@ type InformerOptions struct { func NewInformerWithOptions(options InformerOptions) (Store, Controller) { var clientState Store if options.Indexers == nil { - clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc) + clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider)) } else { - clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers) + clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider)) } return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc) } @@ -814,7 +814,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co if options.Logger != nil { logger = *options.Logger } - logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) + logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.InformerMetricsProvider) cfg := &Config{ Queue: fifo, @@ -844,7 +844,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co // It returns the FIFO and the logger used by the FIFO. // That logger includes the name used for the FIFO, // in contrast to the logger which was passed in. -func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) (klog.Logger, Queue) { +func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider InformerMetricsProvider) (klog.Logger, Queue) { if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { options := RealFIFOOptions{ Logger: &logger, diff --git a/staging/src/k8s.io/client-go/tools/cache/fifo_metrics.go b/staging/src/k8s.io/client-go/tools/cache/fifo_metrics.go index 9e8ee069b2a..7a678172409 100644 --- a/staging/src/k8s.io/client-go/tools/cache/fifo_metrics.go +++ b/staging/src/k8s.io/client-go/tools/cache/fifo_metrics.go @@ -28,14 +28,14 @@ import ( ) var ( - globalFIFOMetricsProvider FIFOMetricsProvider = noopFIFOMetricsProvider{} - setFIFOMetricsProviderOnce sync.Once + globalInformerMetricsProvider InformerMetricsProvider = noopInformerMetricsProvider{} + setInformerMetricsProviderOnce sync.Once ) -type noopFIFOMetricsProvider struct{} +type noopInformerMetricsProvider struct{} -// FIFOMetricsProvider defines an interface for creating metrics that track FIFO queue operations. -type FIFOMetricsProvider interface { +// InformerMetricsProvider defines an interface for creating metrics that track informer operations. +type InformerMetricsProvider interface { // NewQueuedItemMetric returns a gauge metric for tracking the total number of items // currently queued and waiting to be processed. // The returned metric should check id.Reserved() before updating to support @@ -51,6 +51,11 @@ type FIFOMetricsProvider interface { // The returned metric should check id.Reserved() before updating to support // dynamic informers that may shut down while the process is still running. NewProcessingLatencyMetric(id InformerNameAndResource) HistogramMetric + + // NewStoreResourceVersionMetric returns a gauge metric for tracking the resource version of the store. + // The returned metric should check id.Reserved() before updating to support + // dynamic informers that may shut down while the process is still running. + NewStoreResourceVersionMetric(id InformerNameAndResource) GaugeMetric } // fifoMetrics holds all metrics for a FIFO. @@ -59,17 +64,22 @@ type fifoMetrics struct { processingLatency HistogramMetric } -// SetFIFOMetricsProvider sets the metrics provider for all subsequently created +// storeMetrics holds all metrics for a store. +type storeMetrics struct { + storeResourceVersion GaugeMetric +} + +// SetInformerMetricsProvider sets the metrics provider for all subsequently created // FIFOs. Only the first call has an effect. -func SetFIFOMetricsProvider(metricsProvider FIFOMetricsProvider) { - setFIFOMetricsProviderOnce.Do(func() { - globalFIFOMetricsProvider = metricsProvider +func SetInformerMetricsProvider(metricsProvider InformerMetricsProvider) { + setInformerMetricsProviderOnce.Do(func() { + globalInformerMetricsProvider = metricsProvider }) } -func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvider) *fifoMetrics { +func newFIFOMetrics(id InformerNameAndResource, metricsProvider InformerMetricsProvider) *fifoMetrics { if metricsProvider == nil { - metricsProvider = globalFIFOMetricsProvider + metricsProvider = globalInformerMetricsProvider } metrics := &fifoMetrics{ numberOfQueuedItem: noopMetric{}, @@ -84,10 +94,29 @@ func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvi return metrics } -func (noopFIFOMetricsProvider) NewQueuedItemMetric(InformerNameAndResource) GaugeMetric { +func newStoreMetrics(id InformerNameAndResource, metricsProvider InformerMetricsProvider) *storeMetrics { + if metricsProvider == nil { + metricsProvider = globalInformerMetricsProvider + } + metrics := &storeMetrics{ + storeResourceVersion: noopMetric{}, + } + + if id.Reserved() { + metrics.storeResourceVersion = metricsProvider.NewStoreResourceVersionMetric(id) + } + + return metrics +} + +func (noopInformerMetricsProvider) NewQueuedItemMetric(InformerNameAndResource) GaugeMetric { return noopMetric{} } -func (noopFIFOMetricsProvider) NewProcessingLatencyMetric(InformerNameAndResource) HistogramMetric { +func (noopInformerMetricsProvider) NewProcessingLatencyMetric(InformerNameAndResource) HistogramMetric { + return noopMetric{} +} + +func (noopInformerMetricsProvider) NewStoreResourceVersionMetric(InformerNameAndResource) GaugeMetric { return noopMetric{} } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 0f145b72afe..8132fa71ed4 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -322,7 +322,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker()) return &sharedIndexInformer{ - indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers), + indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider)), processor: processor, synced: make(chan struct{}), listerWatcher: lw, @@ -333,7 +333,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O clock: realClock, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), identifier: options.Identifier, - fifoMetricsProvider: options.FIFOMetricsProvider, + informerMetricsProvider: options.InformerMetricsProvider, keyFunc: DeletionHandlingMetaNamespaceKeyFunc, } } @@ -355,9 +355,9 @@ type SharedIndexInformerOptions struct { // If not set, metrics will not be published. Identifier InformerNameAndResource - // FIFOMetricsProvider is the metrics provider for the FIFO queue. + // InformerMetricsProvider is the metrics provider for the FIFO queue. // If not set, metrics will be no-ops. - FIFOMetricsProvider FIFOMetricsProvider + InformerMetricsProvider InformerMetricsProvider } // InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. @@ -629,8 +629,8 @@ type sharedIndexInformer struct { // identifier is used to identify this informer for metrics and logging purposes. identifier InformerNameAndResource - // fifoMetricsProvider is the metrics provider for the FIFO queue. - fifoMetricsProvider FIFOMetricsProvider + // informerMetricsProvider is the metrics provider for the FIFO queue. + informerMetricsProvider InformerMetricsProvider // keyFunc is called when processing deltas by the underlying process function. keyFunc KeyFunc @@ -728,7 +728,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { s.startedLock.Lock() defer s.startedLock.Unlock() - logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider) + logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.informerMetricsProvider) cfg := &Config{ Queue: fifo, diff --git a/staging/src/k8s.io/client-go/tools/cache/store.go b/staging/src/k8s.io/client-go/tools/cache/store.go index 805d42ceec0..261ed60d9ca 100644 --- a/staging/src/k8s.io/client-go/tools/cache/store.go +++ b/staging/src/k8s.io/client-go/tools/cache/store.go @@ -209,6 +209,10 @@ type cache struct { keyFunc KeyFunc // Called with every object put in the cache. transformer TransformFunc + // identifier is used to identify the store for metrics. + identifier InformerNameAndResource + // metrics is the metrics provider for the store. + metrics InformerMetricsProvider } var _ Store = &cache{} @@ -395,22 +399,41 @@ func WithTransformer(transformer TransformFunc) StoreOption { } } +func WithStoreMetrics(identifier InformerNameAndResource, metrics InformerMetricsProvider) StoreOption { + return func(c *cache) { + c.identifier = identifier + c.metrics = metrics + } +} + // NewStore returns a Store implemented simply with a map and a lock. func NewStore(keyFunc KeyFunc, opts ...StoreOption) Store { c := &cache{ - cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}), - keyFunc: keyFunc, + keyFunc: keyFunc, } for _, opt := range opts { opt(c) } + threadSafeOpts := []ThreadSafeStoreOption{} + if c.metrics != nil { + threadSafeOpts = append(threadSafeOpts, WithThreadSafeStoreMetrics(c.identifier, c.metrics)) + } + c.cacheStorage = NewThreadSafeStore(Indexers{}, Indices{}, threadSafeOpts...) return c } // NewIndexer returns an Indexer implemented simply with a map and a lock. -func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { - return &cache{ - cacheStorage: NewThreadSafeStore(indexers, Indices{}), - keyFunc: keyFunc, +func NewIndexer(keyFunc KeyFunc, indexers Indexers, opts ...StoreOption) Indexer { + c := &cache{ + keyFunc: keyFunc, } + for _, opt := range opts { + opt(c) + } + threadSafeOpts := []ThreadSafeStoreOption{} + if c.metrics != nil { + threadSafeOpts = append(threadSafeOpts, WithThreadSafeStoreMetrics(c.identifier, c.metrics)) + } + c.cacheStorage = NewThreadSafeStore(indexers, Indices{}, threadSafeOpts...) + return c } diff --git a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go index bd6a3791fe8..f29c66653cb 100644 --- a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go @@ -72,7 +72,7 @@ type RealFIFOOptions struct { Identifier InformerNameAndResource // MetricsProvider is used to create metrics for the FIFO. - MetricsProvider FIFOMetricsProvider + MetricsProvider InformerMetricsProvider // EmitDeltaTypeBookmark is used to specify whether the RealFIFO will emit // bookmark deltas or not. This can only be set if AtomicEvents is true. diff --git a/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go b/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go index a306468ec80..74ac8f1ab26 100644 --- a/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go +++ b/staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go @@ -18,6 +18,7 @@ package cache import ( "fmt" + "strconv" "sync" "time" @@ -81,6 +82,14 @@ type ThreadSafeStoreTransaction struct { Key string } +type ThreadSafeStoreOption = func(*threadSafeMap) + +func WithThreadSafeStoreMetrics(identifier InformerNameAndResource, metricsProvider InformerMetricsProvider) ThreadSafeStoreOption { + return func(c *threadSafeMap) { + c.metrics = newStoreMetrics(identifier, metricsProvider) + } +} + // storeIndex implements the indexing functionality for Store interface type storeIndex struct { // indexers maps a name to an IndexFunc @@ -251,6 +260,10 @@ type threadSafeMap struct { // index implements the indexing functionality index *storeIndex rv string + + // metrics is used to expose metrics about the store + // and must be non-nil. If not provided, a noop implementation will be used. + metrics *storeMetrics } func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) { @@ -259,6 +272,7 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) { } finalObj := txns[len(txns)-1].Object rv, rvErr := rvFromObject(finalObj) + rvInt, parseErr := parseRVForMetricsWithTruncation(rv) c.lock.Lock() defer c.lock.Unlock() trace := utiltrace.New("ThreadSafeMap Transaction Process", @@ -278,6 +292,9 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) { } if rvErr == nil { c.rv = rv + if parseErr == nil { + c.metrics.storeResourceVersion.Set(float64(rvInt)) + } } } @@ -291,11 +308,15 @@ func (c *threadSafeMap) addLocked(key string, obj interface{}) { func (c *threadSafeMap) Update(key string, obj interface{}) { rv, rvErr := rvFromObject(obj) + rvInt, parseErr := parseRVForMetricsWithTruncation(rv) c.lock.Lock() defer c.lock.Unlock() c.updateLocked(key, obj) if rvErr == nil { c.rv = rv + if parseErr == nil { + c.metrics.storeResourceVersion.Set(float64(rvInt)) + } } } @@ -311,15 +332,20 @@ func (c *threadSafeMap) Delete(key string) { func (c *threadSafeMap) DeleteWithObject(key string, obj interface{}) { var rv string - var rvErr error + var rvInt int64 + var rvErr, parseErr error if obj != nil { rv, rvErr = rvFromObject(obj) + rvInt, parseErr = parseRVForMetricsWithTruncation(rv) } c.lock.Lock() defer c.lock.Unlock() c.deleteLocked(key) if obj != nil && rvErr == nil { c.rv = rv + if parseErr == nil { + c.metrics.storeResourceVersion.Set(float64(rvInt)) + } } } @@ -360,10 +386,18 @@ func (c *threadSafeMap) ListKeys() []string { } func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) { + var rvInt int64 + var parseErr error + if resourceVersion != "" { + rvInt, parseErr = parseRVForMetricsWithTruncation(resourceVersion) + } c.lock.Lock() defer c.lock.Unlock() c.items = items c.rv = resourceVersion + if parseErr == nil { + c.metrics.storeResourceVersion.Set(float64(rvInt)) + } // rebuild any index c.index.reset() for key, item := range c.items { @@ -411,9 +445,17 @@ func (c *threadSafeMap) LastStoreSyncResourceVersion() string { // Bookmark sets the latest resource version that the store has seen. func (c *threadSafeMap) Bookmark(rv string) { + var rvInt int64 + var parseErr error + if rv != "" { + rvInt, parseErr = parseRVForMetricsWithTruncation(rv) + } c.lock.Lock() defer c.lock.Unlock() c.rv = rv + if parseErr == nil { + c.metrics.storeResourceVersion.Set(float64(rvInt)) + } } // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value @@ -480,13 +522,31 @@ func (c *threadSafeMap) Resync() error { return nil } -// NewThreadSafeStore creates a new instance of ThreadSafeStore. -func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { - return &threadSafeMap{ +func NewThreadSafeStore(indexers Indexers, indices Indices, opts ...ThreadSafeStoreOption) ThreadSafeStore { + store := &threadSafeMap{ items: map[string]interface{}{}, index: &storeIndex{ indexers: indexers, indices: indices, }, } + for _, opt := range opts { + opt(store) + } + if store.metrics == nil { + store.metrics = newStoreMetrics(InformerNameAndResource{}, noopInformerMetricsProvider{}) + } + return store +} + +func parseRVForMetricsWithTruncation(rv string) (int64, error) { + if rv == "" { + return 0, nil + } + // Truncate to last 15 digits to ensure metrics are always less than 2^53-1 + // and avoid imprecise float64 representation. + if len(rv) > 15 { + rv = rv[len(rv)-15:] + } + return strconv.ParseInt(rv, 10, 64) } diff --git a/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/fifo/metrics.go b/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/fifo/metrics.go index 9eb04b30f4a..edc835437d5 100644 --- a/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/fifo/metrics.go +++ b/staging/src/k8s.io/component-base/metrics/prometheus/clientgo/fifo/metrics.go @@ -46,6 +46,15 @@ var ( }, []string{"name", "group", "version", "resource"}, ) + storeResourceVersion = k8smetrics.NewGaugeVec( + &k8smetrics.GaugeOpts{ + Subsystem: "informer", + Name: "store_resource_version", + Help: "The 15 least significant digits of the resource version of the store.", + StabilityLevel: k8smetrics.ALPHA, + }, + []string{"name", "group", "version", "resource"}, + ) registerOnce sync.Once ) @@ -58,13 +67,14 @@ func Register() { registerOnce.Do(func() { legacyregistry.MustRegister(fifoQueuedItems) legacyregistry.MustRegister(fifoProcessingLatency) + legacyregistry.MustRegister(storeResourceVersion) }) - cache.SetFIFOMetricsProvider(fifoMetricsProvider{}) + cache.SetInformerMetricsProvider(informerMetricsProvider{}) } -type fifoMetricsProvider struct{} +type informerMetricsProvider struct{} -func (fifoMetricsProvider) NewQueuedItemMetric(id cache.InformerNameAndResource) cache.GaugeMetric { +func (informerMetricsProvider) NewQueuedItemMetric(id cache.InformerNameAndResource) cache.GaugeMetric { return &reservedGaugeMetric{ id: id, gauge: fifoQueuedItems.WithLabelValues( @@ -76,7 +86,7 @@ func (fifoMetricsProvider) NewQueuedItemMetric(id cache.InformerNameAndResource) } } -func (fifoMetricsProvider) NewProcessingLatencyMetric(id cache.InformerNameAndResource) cache.HistogramMetric { +func (informerMetricsProvider) NewProcessingLatencyMetric(id cache.InformerNameAndResource) cache.HistogramMetric { return &reservedHistogramMetric{ id: id, histogram: fifoProcessingLatency.WithLabelValues( @@ -88,6 +98,18 @@ func (fifoMetricsProvider) NewProcessingLatencyMetric(id cache.InformerNameAndRe } } +func (informerMetricsProvider) NewStoreResourceVersionMetric(id cache.InformerNameAndResource) cache.GaugeMetric { + return &reservedGaugeMetric{ + id: id, + gauge: storeResourceVersion.WithLabelValues( + id.Name(), + id.GroupVersionResource().Group, + id.GroupVersionResource().Version, + id.GroupVersionResource().Resource, + ), + } +} + // reservedGaugeMetric wraps a gauge and only updates it if the identifier // is still reserved. This supports dynamic informers (e.g., GC, ResourceQuota) // that may shut down while the process is still running. diff --git a/test/integration/client/metrics/fifo_metrics_test.go b/test/integration/client/metrics/fifo_metrics_test.go index 2bf76e1b5ef..848180809dc 100644 --- a/test/integration/client/metrics/fifo_metrics_test.go +++ b/test/integration/client/metrics/fifo_metrics_test.go @@ -21,6 +21,8 @@ import ( "strings" "testing" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" "k8s.io/component-base/metrics" @@ -44,40 +46,40 @@ func TestRealFIFO_Metrics(t *testing.T) { { name: "Add increases metric", actions: []func(f *cache.RealFIFO){ - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) }, }, expectedQueuedItems: 1, }, { name: "multiple Adds increase metric", actions: []func(f *cache.RealFIFO){ - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) }, - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", 2)) }, - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("baz", 3)) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", "2")) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("baz", "3")) }, }, expectedQueuedItems: 3, }, { name: "Update increases metric", actions: []func(f *cache.RealFIFO){ - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) }, - func(f *cache.RealFIFO) { _ = f.Update(mkFifoObj("foo", 2)) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) }, + func(f *cache.RealFIFO) { _ = f.Update(mkFifoObj("foo", "2")) }, }, expectedQueuedItems: 2, }, { name: "Delete increases metric", actions: []func(f *cache.RealFIFO){ - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) }, - func(f *cache.RealFIFO) { _ = f.Delete(mkFifoObj("foo", 2)) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) }, + func(f *cache.RealFIFO) { _ = f.Delete(mkFifoObj("foo", "2")) }, }, expectedQueuedItems: 2, }, { name: "Pop decreases metric and records latency", actions: []func(f *cache.RealFIFO){ - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) }, - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", 2)) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", "2")) }, func(f *cache.RealFIFO) { _, _ = f.Pop(func(obj interface{}, isInInitialList bool) error { return nil }) }, @@ -88,9 +90,9 @@ func TestRealFIFO_Metrics(t *testing.T) { { name: "PopBatch decreases metric and records latency", actions: []func(f *cache.RealFIFO){ - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) }, - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", 2)) }, - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("baz", 3)) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", "2")) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("baz", "3")) }, func(f *cache.RealFIFO) { _ = f.PopBatch( func(deltas []cache.Delta, isInInitialList bool) error { return nil }, @@ -104,12 +106,12 @@ func TestRealFIFO_Metrics(t *testing.T) { { name: "Replace sets metric to new count", actions: []func(f *cache.RealFIFO){ - func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("old", 1)) }, + func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("old", "1")) }, func(f *cache.RealFIFO) { _ = f.Replace([]interface{}{ - mkFifoObj("foo", 1), - mkFifoObj("bar", 2), - }, "0") + mkFifoObj("foo", "1"), + mkFifoObj("bar", "2"), + }, "50") }, }, // 1 (Add) + 1 (Delete for "old") + 2 (Replace items) = 4 @@ -119,7 +121,7 @@ func TestRealFIFO_Metrics(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - metricsProvider := newTestFIFOMetricsProvider() + metricsProvider := newTestInformerMetricsProvider() informerName, err := cache.NewInformerName("test-fifo") if err != nil { t.Fatalf("NewInformerName() unexpected error: %v", err) @@ -128,8 +130,12 @@ func TestRealFIFO_Metrics(t *testing.T) { id := informerName.WithResource(podsGVR) f := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{ - KeyFunction: testFifoObjectKeyFunc, - KnownObjects: emptyKnownObjects(), + KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc, + KnownObjects: cache.NewIndexer( + cache.DeletionHandlingMetaNamespaceKeyFunc, + cache.Indexers{}, + cache.WithStoreMetrics(id, metricsProvider), + ), Identifier: id, MetricsProvider: metricsProvider, }) @@ -148,20 +154,24 @@ func TestRealFIFO_Metrics(t *testing.T) { } func TestRealFIFO_MetricsNotPublishedForUnnamedFIFO(t *testing.T) { - metricsProvider := newTestFIFOMetricsProvider() + metricsProvider := newTestInformerMetricsProvider() // No InformerName configured - should not publish metrics var id cache.InformerNameAndResource f := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{ - KeyFunction: testFifoObjectKeyFunc, - KnownObjects: emptyKnownObjects(), + KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc, + KnownObjects: cache.NewIndexer( + cache.DeletionHandlingMetaNamespaceKeyFunc, + cache.Indexers{}, + cache.WithStoreMetrics(id, metricsProvider), + ), Identifier: id, MetricsProvider: metricsProvider, }) // Perform operations - _ = f.Add(mkFifoObj("foo", 1)) - _ = f.Add(mkFifoObj("bar", 2)) + _ = f.Add(mkFifoObj("foo", "1")) + _ = f.Add(mkFifoObj("bar", "2")) // No metrics should be created because there's no identifier configured want := "" @@ -171,7 +181,7 @@ func TestRealFIFO_MetricsNotPublishedForUnnamedFIFO(t *testing.T) { } func TestRealFIFO_MetricsNotPublishedForDuplicateGVR(t *testing.T) { - metricsProvider := newTestFIFOMetricsProvider() + metricsProvider := newTestInformerMetricsProvider() // Create InformerName informerName, err := cache.NewInformerName("duplicate-test") @@ -185,8 +195,12 @@ func TestRealFIFO_MetricsNotPublishedForDuplicateGVR(t *testing.T) { t.Fatal("Expected first identifier to be reserved") } f1 := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{ - KeyFunction: testFifoObjectKeyFunc, - KnownObjects: emptyKnownObjects(), + KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc, + KnownObjects: cache.NewIndexer( + cache.DeletionHandlingMetaNamespaceKeyFunc, + cache.Indexers{}, + cache.WithStoreMetrics(id1, metricsProvider), + ), Identifier: id1, MetricsProvider: metricsProvider, }) @@ -197,22 +211,26 @@ func TestRealFIFO_MetricsNotPublishedForDuplicateGVR(t *testing.T) { t.Fatal("Expected second identifier with same GVR to not be reserved") } f2 := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{ - KeyFunction: testFifoObjectKeyFunc, - KnownObjects: emptyKnownObjects(), + KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc, + KnownObjects: cache.NewIndexer( + cache.DeletionHandlingMetaNamespaceKeyFunc, + cache.Indexers{}, + cache.WithStoreMetrics(id2, metricsProvider), + ), Identifier: id2, MetricsProvider: metricsProvider, }) // Add items to both FIFOs - _ = f1.Add(mkFifoObj("foo", 1)) - _ = f2.Add(mkFifoObj("bar", 2)) + _ = f1.Add(mkFifoObj("foo", "1")) + _ = f2.Add(mkFifoObj("bar", "2")) // Only f1's metric should be published, f2 uses noopMetric verifyQueuedItems(t, metricsProvider, "duplicate-test", podsGVR, 1) } func TestRealFIFO_MetricsTrackedIndependentlyForDifferentFIFOs(t *testing.T) { - metricsProvider := newTestFIFOMetricsProvider() + metricsProvider := newTestInformerMetricsProvider() // Create two InformerNames with different names - both should be unique informerName1, err := cache.NewInformerName("fifo-1") @@ -223,8 +241,12 @@ func TestRealFIFO_MetricsTrackedIndependentlyForDifferentFIFOs(t *testing.T) { id1 := informerName1.WithResource(podsGVR) f1 := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{ - KeyFunction: testFifoObjectKeyFunc, - KnownObjects: emptyKnownObjects(), + KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc, + KnownObjects: cache.NewIndexer( + cache.DeletionHandlingMetaNamespaceKeyFunc, + cache.Indexers{}, + cache.WithStoreMetrics(id1, metricsProvider), + ), Identifier: id1, MetricsProvider: metricsProvider, }) @@ -237,18 +259,22 @@ func TestRealFIFO_MetricsTrackedIndependentlyForDifferentFIFOs(t *testing.T) { id2 := informerName2.WithResource(podsGVR) f2 := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{ - KeyFunction: testFifoObjectKeyFunc, - KnownObjects: emptyKnownObjects(), + KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc, + KnownObjects: cache.NewIndexer( + cache.DeletionHandlingMetaNamespaceKeyFunc, + cache.Indexers{}, + cache.WithStoreMetrics(id2, metricsProvider), + ), Identifier: id2, MetricsProvider: metricsProvider, }) // Add items to f1 - _ = f1.Add(mkFifoObj("foo", 1)) - _ = f1.Add(mkFifoObj("bar", 2)) + _ = f1.Add(mkFifoObj("foo", "1")) + _ = f1.Add(mkFifoObj("bar", "2")) // Add items to f2 - _ = f2.Add(mkFifoObj("baz", 3)) + _ = f2.Add(mkFifoObj("baz", "3")) // Verify metrics are tracked independently wantInformerQueuedItemsMetric := `# HELP informer_queued_items [ALPHA] Number of items currently queued in the FIFO. @@ -284,73 +310,145 @@ informer_processing_latency_seconds_count{group="",name="fifo-2",resource="pods" } } -type testFifoObject struct { - name string - val interface{} -} - -func testFifoObjectKeyFunc(obj interface{}) (string, error) { - return obj.(testFifoObject).name, nil -} - -func mkFifoObj(name string, val interface{}) testFifoObject { - return testFifoObject{name: name, val: val} -} - -type literalListerGetter func() []testFifoObject - -func (l literalListerGetter) List() []interface{} { - if l == nil { - return nil - } - result := []interface{}{} - for _, item := range l() { - result = append(result, item) - } - return result -} - -func (l literalListerGetter) ListKeys() []string { - if l == nil { - return nil - } - result := []string{} - for _, item := range l() { - result = append(result, item.name) - } - return result -} - -func (l literalListerGetter) Get(key string) (interface{}, bool, error) { - for _, item := range l() { - if item.name == key { - return item, true, nil - } - } - return nil, false, nil -} - -func (l literalListerGetter) GetByKey(key string) (interface{}, bool, error) { - return l.Get(key) -} - -func emptyKnownObjects() cache.KeyListerGetter { - return literalListerGetter( - func() []testFifoObject { - return []testFifoObject{} +func TestStore_MetricsTrackedForResourceVersions(t *testing.T) { + tests := []struct { + name string + actions []func(f cache.Store) + expectedMetric int64 + }{ + { + name: "empty store has zero metric", + actions: []func(f cache.Store){}, + expectedMetric: 0, }, + { + name: "Add sets metric", + actions: []func(f cache.Store){ + func(f cache.Store) { _ = f.Add(mkFifoObj("foo", "1")) }, + }, + expectedMetric: 1, + }, + { + name: "multiple Adds increase metric", + actions: []func(f cache.Store){ + func(f cache.Store) { _ = f.Add(mkFifoObj("bar", "2")) }, + func(f cache.Store) { _ = f.Add(mkFifoObj("baz", "3")) }, + }, + expectedMetric: 3, + }, + { + name: "Update increases metric", + actions: []func(f cache.Store){ + func(f cache.Store) { _ = f.Add(mkFifoObj("foo", "1")) }, + func(f cache.Store) { _ = f.Update(mkFifoObj("foo", "2")) }, + }, + expectedMetric: 2, + }, + { + name: "Delete sets metric", + actions: []func(f cache.Store){ + func(f cache.Store) { _ = f.Add(mkFifoObj("foo", "1")) }, + func(f cache.Store) { _ = f.Delete(mkFifoObj("foo", "2")) }, + }, + expectedMetric: 2, + }, + { + name: "Replace sets metric to new count and updates store resource version metric", + actions: []func(f cache.Store){ + func(f cache.Store) { _ = f.Add(mkFifoObj("old", "1")) }, + func(f cache.Store) { + _ = f.Replace([]interface{}{ + mkFifoObj("foo", "10"), + mkFifoObj("bar", "20"), + }, "50") + }, + }, + expectedMetric: 50, + }, + { + name: "Metrics are truncated to last 15 digits", + actions: []func(f cache.Store){ + func(f cache.Store) { _ = f.Add(mkFifoObj("old", "1")) }, + func(f cache.Store) { + _ = f.Replace([]interface{}{ + mkFifoObj("foo", "10"), + mkFifoObj("bar", "20"), + }, "123456789012345678901234567890") + }, + }, + expectedMetric: 678901234567890, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + metricsProvider := newTestInformerMetricsProvider() + informerName, err := cache.NewInformerName("test-fifo") + if err != nil { + t.Fatalf("NewInformerName() unexpected error: %v", err) + } + defer informerName.Release() + id := informerName.WithResource(podsGVR) + store := cache.NewIndexer( + cache.DeletionHandlingMetaNamespaceKeyFunc, + cache.Indexers{}, + cache.WithStoreMetrics(id, metricsProvider), + ) + + for _, action := range tt.actions { + action(store) + } + + verifyStoreResourceVersion(t, metricsProvider, "test-fifo", podsGVR, tt.expectedMetric) + }) + } +} + +func BenchmarkStoreWithMetrics(b *testing.B) { + metricsProvider := newTestInformerMetricsProvider() + informerName, err := cache.NewInformerName("test-fifo") + if err != nil { + b.Fatalf("NewInformerName() unexpected error: %v", err) + } + defer informerName.Release() + id := informerName.WithResource(podsGVR) + store := cache.NewIndexer( + cache.DeletionHandlingMetaNamespaceKeyFunc, + cache.Indexers{}, + cache.WithStoreMetrics(id, metricsProvider), ) + + objectCount := 5000 + objects := make([]*v1.Pod, 0, 5000) + for i := range objectCount { + objects = append(objects, mkFifoObj(fmt.Sprintf("object-number-%d", i), fmt.Sprintf("%d", i))) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = store.Update(objects[i%objectCount]) + } } -// testFIFOMetricsProvider is a test implementation of cache.FIFOMetricsProvider +func mkFifoObj(name string, resourceVersion string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + ResourceVersion: resourceVersion, + }, + } +} + +// testInformerMetricsProvider is a test implementation of cache.InformerMetricsProvider // that uses real component-base metrics registered with a custom registry. -type testFIFOMetricsProvider struct { - registry metrics.KubeRegistry - gauge *metrics.GaugeVec - histogram *metrics.HistogramVec +type testInformerMetricsProvider struct { + registry metrics.KubeRegistry + gauge *metrics.GaugeVec + histogram *metrics.HistogramVec + storeResourceVersion *metrics.GaugeVec } -func newTestFIFOMetricsProvider() *testFIFOMetricsProvider { +func newTestInformerMetricsProvider() *testInformerMetricsProvider { registry := metrics.NewKubeRegistry() gauge := metrics.NewGaugeVec( &metrics.GaugeOpts{ @@ -371,24 +469,39 @@ func newTestFIFOMetricsProvider() *testFIFOMetricsProvider { }, []string{"name", "group", "version", "resource"}, ) + storeResourceVersion := metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: "informer", + Name: "store_resource_version", + Help: "The 15 least significant digits of the resource version of the store.", + StabilityLevel: metrics.ALPHA, + }, + []string{"name", "group", "version", "resource"}, + ) registry.MustRegister(gauge) registry.MustRegister(histogram) - return &testFIFOMetricsProvider{ - registry: registry, - gauge: gauge, - histogram: histogram, + registry.MustRegister(storeResourceVersion) + return &testInformerMetricsProvider{ + registry: registry, + gauge: gauge, + histogram: histogram, + storeResourceVersion: storeResourceVersion, } } -func (p *testFIFOMetricsProvider) NewQueuedItemMetric(id cache.InformerNameAndResource) cache.GaugeMetric { +func (p *testInformerMetricsProvider) NewQueuedItemMetric(id cache.InformerNameAndResource) cache.GaugeMetric { return p.gauge.WithLabelValues(id.Name(), id.GroupVersionResource().Group, id.GroupVersionResource().Version, id.GroupVersionResource().Resource) } -func (p *testFIFOMetricsProvider) NewProcessingLatencyMetric(id cache.InformerNameAndResource) cache.HistogramMetric { +func (p *testInformerMetricsProvider) NewProcessingLatencyMetric(id cache.InformerNameAndResource) cache.HistogramMetric { return p.histogram.WithLabelValues(id.Name(), id.GroupVersionResource().Group, id.GroupVersionResource().Version, id.GroupVersionResource().Resource) } -func verifyQueuedItems(t *testing.T, metricsProvider *testFIFOMetricsProvider, informerName string, gvr schema.GroupVersionResource, expected int) { +func (p *testInformerMetricsProvider) NewStoreResourceVersionMetric(id cache.InformerNameAndResource) cache.GaugeMetric { + return p.storeResourceVersion.WithLabelValues(id.Name(), id.GroupVersionResource().Group, id.GroupVersionResource().Version, id.GroupVersionResource().Resource) +} + +func verifyQueuedItems(t *testing.T, metricsProvider *testInformerMetricsProvider, informerName string, gvr schema.GroupVersionResource, expected int) { t.Helper() want := fmt.Sprintf(`# HELP informer_queued_items [ALPHA] Number of items currently queued in the FIFO. # TYPE informer_queued_items gauge @@ -402,7 +515,7 @@ informer_queued_items{group="%s",name="%s",resource="%s",version="%s"} %d // verifyLatencyObservations checks the histogram observation count using a custom gatherer // that strips timing-dependent values (bucket counts and sum) from the comparison, so that // we only verify the number of observations without being affected by the duration values. -func verifyLatencyObservations(t *testing.T, metricsProvider *testFIFOMetricsProvider, informerName string, gvr schema.GroupVersionResource, expected uint64) { +func verifyLatencyObservations(t *testing.T, metricsProvider *testInformerMetricsProvider, informerName string, gvr schema.GroupVersionResource, expected uint64) { t.Helper() if expected == 0 { return @@ -417,7 +530,18 @@ informer_processing_latency_seconds_count{group="%s",name="%s",resource="%s",ver } } -func (p *testFIFOMetricsProvider) gatherWithoutDurations() testutil.GathererFunc { +func verifyStoreResourceVersion(t *testing.T, metricsProvider *testInformerMetricsProvider, informerName string, gvr schema.GroupVersionResource, expected int64) { + t.Helper() + want := fmt.Sprintf(`# HELP informer_store_resource_version [ALPHA] The 15 least significant digits of the resource version of the store. +# TYPE informer_store_resource_version gauge +informer_store_resource_version{group="%s",name="%s",resource="%s",version="%s"} %d +`, gvr.Group, informerName, gvr.Resource, gvr.Version, expected) + if err := testutil.GatherAndCompare(metricsProvider.registry, strings.NewReader(want), "informer_store_resource_version"); err != nil { + t.Fatal(err) + } +} + +func (p *testInformerMetricsProvider) gatherWithoutDurations() testutil.GathererFunc { return func() ([]*testutil.MetricFamily, error) { got, err := p.registry.Gather() for _, mf := range got {