Merge pull request #137419 from michaelasp/rvInformerMetrics

Add metric tracking the latest cached rv of informers
This commit is contained in:
Kubernetes Prow Robot 2026-03-11 03:53:24 +05:30 committed by GitHub
commit fdbea74545
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 408 additions and 150 deletions

View file

@ -452,9 +452,9 @@ type InformerOptions struct {
// If not set, metrics will not be published.
Identifier InformerNameAndResource
// FIFOMetricsProvider is the metrics provider for the FIFO queue.
// InformerMetricsProvider is the metrics provider for the informer.
// If not set, metrics will be no-ops.
FIFOMetricsProvider FIFOMetricsProvider
InformerMetricsProvider InformerMetricsProvider
}
// NewInformerWithOptions returns a Store and a controller for populating the store
@ -464,9 +464,9 @@ type InformerOptions struct {
func NewInformerWithOptions(options InformerOptions) (Store, Controller) {
var clientState Store
if options.Indexers == nil {
clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
clientState = NewStore(DeletionHandlingMetaNamespaceKeyFunc, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider))
} else {
clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers)
clientState = NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider))
}
return clientState, newInformer(clientState, options, DeletionHandlingMetaNamespaceKeyFunc)
}
@ -814,7 +814,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co
if options.Logger != nil {
logger = *options.Logger
}
logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider)
logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.InformerMetricsProvider)
cfg := &Config{
Queue: fifo,
@ -844,7 +844,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co
// It returns the FIFO and the logger used by the FIFO.
// That logger includes the name used for the FIFO,
// in contrast to the logger which was passed in.
func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) (klog.Logger, Queue) {
func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider InformerMetricsProvider) (klog.Logger, Queue) {
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
options := RealFIFOOptions{
Logger: &logger,

View file

@ -28,14 +28,14 @@ import (
)
var (
globalFIFOMetricsProvider FIFOMetricsProvider = noopFIFOMetricsProvider{}
setFIFOMetricsProviderOnce sync.Once
globalInformerMetricsProvider InformerMetricsProvider = noopInformerMetricsProvider{}
setInformerMetricsProviderOnce sync.Once
)
type noopFIFOMetricsProvider struct{}
type noopInformerMetricsProvider struct{}
// FIFOMetricsProvider defines an interface for creating metrics that track FIFO queue operations.
type FIFOMetricsProvider interface {
// InformerMetricsProvider defines an interface for creating metrics that track informer operations.
type InformerMetricsProvider interface {
// NewQueuedItemMetric returns a gauge metric for tracking the total number of items
// currently queued and waiting to be processed.
// The returned metric should check id.Reserved() before updating to support
@ -51,6 +51,11 @@ type FIFOMetricsProvider interface {
// The returned metric should check id.Reserved() before updating to support
// dynamic informers that may shut down while the process is still running.
NewProcessingLatencyMetric(id InformerNameAndResource) HistogramMetric
// NewStoreResourceVersionMetric returns a gauge metric for tracking the resource version of the store.
// The returned metric should check id.Reserved() before updating to support
// dynamic informers that may shut down while the process is still running.
NewStoreResourceVersionMetric(id InformerNameAndResource) GaugeMetric
}
// fifoMetrics holds all metrics for a FIFO.
@ -59,17 +64,22 @@ type fifoMetrics struct {
processingLatency HistogramMetric
}
// SetFIFOMetricsProvider sets the metrics provider for all subsequently created
// storeMetrics holds all metrics for a store.
type storeMetrics struct {
// storeResourceVersion is the gauge used to publish the store's resource version.
storeResourceVersion GaugeMetric
}
// SetInformerMetricsProvider sets the metrics provider for all subsequently created
// FIFOs. Only the first call has an effect.
func SetFIFOMetricsProvider(metricsProvider FIFOMetricsProvider) {
setFIFOMetricsProviderOnce.Do(func() {
globalFIFOMetricsProvider = metricsProvider
func SetInformerMetricsProvider(metricsProvider InformerMetricsProvider) {
setInformerMetricsProviderOnce.Do(func() {
globalInformerMetricsProvider = metricsProvider
})
}
func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvider) *fifoMetrics {
func newFIFOMetrics(id InformerNameAndResource, metricsProvider InformerMetricsProvider) *fifoMetrics {
if metricsProvider == nil {
metricsProvider = globalFIFOMetricsProvider
metricsProvider = globalInformerMetricsProvider
}
metrics := &fifoMetrics{
numberOfQueuedItem: noopMetric{},
@ -84,10 +94,29 @@ func newFIFOMetrics(id InformerNameAndResource, metricsProvider FIFOMetricsProvi
return metrics
}
func (noopFIFOMetricsProvider) NewQueuedItemMetric(InformerNameAndResource) GaugeMetric {
// newStoreMetrics builds the metric set for a store.
//
// A nil metricsProvider is replaced by the package-level
// globalInformerMetricsProvider. The provider is only asked for a real gauge
// when id.Reserved() reports true; otherwise the store keeps a no-op gauge.
func newStoreMetrics(id InformerNameAndResource, metricsProvider InformerMetricsProvider) *storeMetrics {
	provider := metricsProvider
	if provider == nil {
		provider = globalInformerMetricsProvider
	}

	if !id.Reserved() {
		return &storeMetrics{storeResourceVersion: noopMetric{}}
	}
	return &storeMetrics{storeResourceVersion: provider.NewStoreResourceVersionMetric(id)}
}
// NewQueuedItemMetric ignores the identifier and returns a no-op gauge.
func (noopInformerMetricsProvider) NewQueuedItemMetric(InformerNameAndResource) GaugeMetric {
return noopMetric{}
}
func (noopFIFOMetricsProvider) NewProcessingLatencyMetric(InformerNameAndResource) HistogramMetric {
// NewProcessingLatencyMetric ignores the identifier and returns a no-op histogram.
func (noopInformerMetricsProvider) NewProcessingLatencyMetric(InformerNameAndResource) HistogramMetric {
return noopMetric{}
}
// NewStoreResourceVersionMetric ignores the identifier and returns a no-op gauge.
func (noopInformerMetricsProvider) NewStoreResourceVersionMetric(InformerNameAndResource) GaugeMetric {
return noopMetric{}
}

View file

@ -323,7 +323,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O
processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker())
return &sharedIndexInformer{
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers, WithStoreMetrics(options.Identifier, options.InformerMetricsProvider)),
processor: processor,
synced: make(chan struct{}),
listerWatcher: lw,
@ -334,7 +334,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O
clock: realClock,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
identifier: options.Identifier,
fifoMetricsProvider: options.FIFOMetricsProvider,
informerMetricsProvider: options.InformerMetricsProvider,
keyFunc: DeletionHandlingMetaNamespaceKeyFunc,
}
}
@ -356,9 +356,9 @@ type SharedIndexInformerOptions struct {
// If not set, metrics will not be published.
Identifier InformerNameAndResource
// FIFOMetricsProvider is the metrics provider for the FIFO queue.
// InformerMetricsProvider is the metrics provider for the informer's FIFO queue and store.
// If not set, metrics will be no-ops.
FIFOMetricsProvider FIFOMetricsProvider
InformerMetricsProvider InformerMetricsProvider
}
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
@ -630,8 +630,8 @@ type sharedIndexInformer struct {
// identifier is used to identify this informer for metrics and logging purposes.
identifier InformerNameAndResource
// fifoMetricsProvider is the metrics provider for the FIFO queue.
fifoMetricsProvider FIFOMetricsProvider
// informerMetricsProvider is the metrics provider for the informer's FIFO queue and store.
informerMetricsProvider InformerMetricsProvider
// keyFunc is called when processing deltas by the underlying process function.
keyFunc KeyFunc
@ -729,7 +729,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider)
logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.informerMetricsProvider)
cfg := &Config{
Queue: fifo,

View file

@ -209,6 +209,10 @@ type cache struct {
keyFunc KeyFunc
// Called with every object put in the cache.
transformer TransformFunc
// identifier is used to identify the store for metrics.
identifier InformerNameAndResource
// metrics is the metrics provider for the store.
metrics InformerMetricsProvider
}
var _ Store = &cache{}
@ -395,22 +399,41 @@ func WithTransformer(transformer TransformFunc) StoreOption {
}
}
// WithStoreMetrics returns a StoreOption that records the given identifier
// and metrics provider on the cache being constructed.
func WithStoreMetrics(identifier InformerNameAndResource, metrics InformerMetricsProvider) StoreOption {
	return func(store *cache) {
		store.metrics = metrics
		store.identifier = identifier
	}
}
// NewStore returns a Store implemented simply with a map and a lock.
//
// Options are applied before the backing ThreadSafeStore is created so that an
// identifier and metrics provider supplied via WithStoreMetrics reach the
// thread-safe store.
func NewStore(keyFunc KeyFunc, opts ...StoreOption) Store {
	c := &cache{
		keyFunc: keyFunc,
	}
	for _, opt := range opts {
		opt(c)
	}
	// Always forward the identifier and provider. newStoreMetrics substitutes
	// the global provider when the provider is nil and keeps a no-op gauge when
	// the identifier is not reserved, so gating on a non-nil provider here
	// would only disable the global-provider fallback.
	c.cacheStorage = NewThreadSafeStore(Indexers{}, Indices{}, WithThreadSafeStoreMetrics(c.identifier, c.metrics))
	return c
}
// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
func NewIndexer(keyFunc KeyFunc, indexers Indexers, opts ...StoreOption) Indexer {
c := &cache{
keyFunc: keyFunc,
}
for _, opt := range opts {
opt(c)
}
threadSafeOpts := []ThreadSafeStoreOption{}
if c.metrics != nil {
threadSafeOpts = append(threadSafeOpts, WithThreadSafeStoreMetrics(c.identifier, c.metrics))
}
c.cacheStorage = NewThreadSafeStore(indexers, Indices{}, threadSafeOpts...)
return c
}

View file

@ -72,7 +72,7 @@ type RealFIFOOptions struct {
Identifier InformerNameAndResource
// MetricsProvider is used to create metrics for the FIFO.
MetricsProvider FIFOMetricsProvider
MetricsProvider InformerMetricsProvider
// EmitDeltaTypeBookmark is used to specify whether the RealFIFO will emit
// bookmark deltas or not. This can only be set if AtomicEvents is true.

View file

@ -18,6 +18,7 @@ package cache
import (
"fmt"
"strconv"
"sync"
"time"
@ -81,6 +82,14 @@ type ThreadSafeStoreTransaction struct {
Key string
}
// ThreadSafeStoreOption configures a threadSafeMap; options are applied by
// NewThreadSafeStore while the store is being constructed.
type ThreadSafeStoreOption = func(*threadSafeMap)
// WithThreadSafeStoreMetrics returns a ThreadSafeStoreOption that installs the
// metrics built by newStoreMetrics for the given identifier and provider.
// The metrics are created when the option is applied, not when it is built.
func WithThreadSafeStoreMetrics(identifier InformerNameAndResource, metricsProvider InformerMetricsProvider) ThreadSafeStoreOption {
	return func(store *threadSafeMap) {
		built := newStoreMetrics(identifier, metricsProvider)
		store.metrics = built
	}
}
// storeIndex implements the indexing functionality for Store interface
type storeIndex struct {
// indexers maps a name to an IndexFunc
@ -251,6 +260,10 @@ type threadSafeMap struct {
// index implements the indexing functionality
index *storeIndex
rv string
// metrics is used to expose metrics about the store
// and must be non-nil. If not provided, a noop implementation will be used.
metrics *storeMetrics
}
func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
@ -259,6 +272,7 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
}
finalObj := txns[len(txns)-1].Object
rv, rvErr := rvFromObject(finalObj)
rvInt, parseErr := parseRVForMetricsWithTruncation(rv)
c.lock.Lock()
defer c.lock.Unlock()
trace := utiltrace.New("ThreadSafeMap Transaction Process",
@ -278,6 +292,9 @@ func (c *threadSafeMap) Transaction(txns ...ThreadSafeStoreTransaction) {
}
if rvErr == nil {
c.rv = rv
if parseErr == nil {
c.metrics.storeResourceVersion.Set(float64(rvInt))
}
}
}
@ -291,11 +308,15 @@ func (c *threadSafeMap) addLocked(key string, obj interface{}) {
// Update stores obj under key and, when the object's resource version can be
// read, records it as the store's latest resource version. The gauge is only
// updated when that version also parses as a number.
func (c *threadSafeMap) Update(key string, obj interface{}) {
	// Extract and parse the resource version before taking the lock.
	rv, rvErr := rvFromObject(obj)
	metricValue, parseErr := parseRVForMetricsWithTruncation(rv)

	c.lock.Lock()
	defer c.lock.Unlock()

	c.updateLocked(key, obj)
	if rvErr != nil {
		return
	}
	c.rv = rv
	if parseErr != nil {
		return
	}
	c.metrics.storeResourceVersion.Set(float64(metricValue))
}
@ -311,15 +332,20 @@ func (c *threadSafeMap) Delete(key string) {
func (c *threadSafeMap) DeleteWithObject(key string, obj interface{}) {
var rv string
var rvErr error
var rvInt int64
var rvErr, parseErr error
if obj != nil {
rv, rvErr = rvFromObject(obj)
rvInt, parseErr = parseRVForMetricsWithTruncation(rv)
}
c.lock.Lock()
defer c.lock.Unlock()
c.deleteLocked(key)
if obj != nil && rvErr == nil {
c.rv = rv
if parseErr == nil {
c.metrics.storeResourceVersion.Set(float64(rvInt))
}
}
}
@ -360,10 +386,18 @@ func (c *threadSafeMap) ListKeys() []string {
}
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
var rvInt int64
var parseErr error
if resourceVersion != "" {
rvInt, parseErr = parseRVForMetricsWithTruncation(resourceVersion)
}
c.lock.Lock()
defer c.lock.Unlock()
c.items = items
c.rv = resourceVersion
if parseErr == nil {
c.metrics.storeResourceVersion.Set(float64(rvInt))
}
// rebuild any index
c.index.reset()
for key, item := range c.items {
@ -411,9 +445,17 @@ func (c *threadSafeMap) LastStoreSyncResourceVersion() string {
// Bookmark sets the latest resource version that the store has seen.
// The resource version gauge is updated as well, unless rv cannot be parsed.
func (c *threadSafeMap) Bookmark(rv string) {
	// Parse before taking the lock. An empty rv parses to (0, nil), so no
	// separate empty-string check is needed.
	metricValue, parseErr := parseRVForMetricsWithTruncation(rv)

	c.lock.Lock()
	defer c.lock.Unlock()

	c.rv = rv
	if parseErr != nil {
		return
	}
	c.metrics.storeResourceVersion.Set(float64(metricValue))
}
// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
@ -480,13 +522,31 @@ func (c *threadSafeMap) Resync() error {
return nil
}
// NewThreadSafeStore creates a new instance of ThreadSafeStore.
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
return &threadSafeMap{
func NewThreadSafeStore(indexers Indexers, indices Indices, opts ...ThreadSafeStoreOption) ThreadSafeStore {
store := &threadSafeMap{
items: map[string]interface{}{},
index: &storeIndex{
indexers: indexers,
indices: indices,
},
}
for _, opt := range opts {
opt(store)
}
if store.metrics == nil {
store.metrics = newStoreMetrics(InformerNameAndResource{}, noopInformerMetricsProvider{})
}
return store
}
// parseRVForMetricsWithTruncation converts a resource version string into an
// integer that can be published as a float64 gauge value without losing
// precision.
//
// An empty string yields (0, nil). Any string containing a character other
// than a decimal digit (including a leading sign) yields a *strconv.NumError.
// Otherwise only the 15 least significant digits are parsed and returned.
func parseRVForMetricsWithTruncation(rv string) (int64, error) {
	if rv == "" {
		return 0, nil
	}
	// Validate the whole string before truncating. Otherwise a non-numeric
	// value with a long numeric tail would be accepted, and a sign character
	// landing at the truncation boundary would produce a negative metric.
	for i := 0; i < len(rv); i++ {
		if rv[i] < '0' || rv[i] > '9' {
			return 0, &strconv.NumError{Func: "ParseInt", Num: rv, Err: strconv.ErrSyntax}
		}
	}
	// Truncate to last 15 digits to ensure metrics are always less than 2^53-1
	// and avoid imprecise float64 representation.
	if len(rv) > 15 {
		rv = rv[len(rv)-15:]
	}
	return strconv.ParseInt(rv, 10, 64)
}

View file

@ -46,6 +46,15 @@ var (
},
[]string{"name", "group", "version", "resource"},
)
storeResourceVersion = k8smetrics.NewGaugeVec(
&k8smetrics.GaugeOpts{
Subsystem: "informer",
Name: "store_resource_version",
Help: "The 15 least significant digits of the resource version of the store.",
StabilityLevel: k8smetrics.ALPHA,
},
[]string{"name", "group", "version", "resource"},
)
registerOnce sync.Once
)
@ -58,13 +67,14 @@ func Register() {
registerOnce.Do(func() {
legacyregistry.MustRegister(fifoQueuedItems)
legacyregistry.MustRegister(fifoProcessingLatency)
legacyregistry.MustRegister(storeResourceVersion)
})
cache.SetFIFOMetricsProvider(fifoMetricsProvider{})
cache.SetInformerMetricsProvider(informerMetricsProvider{})
}
type fifoMetricsProvider struct{}
type informerMetricsProvider struct{}
func (fifoMetricsProvider) NewQueuedItemMetric(id cache.InformerNameAndResource) cache.GaugeMetric {
func (informerMetricsProvider) NewQueuedItemMetric(id cache.InformerNameAndResource) cache.GaugeMetric {
return &reservedGaugeMetric{
id: id,
gauge: fifoQueuedItems.WithLabelValues(
@ -76,7 +86,7 @@ func (fifoMetricsProvider) NewQueuedItemMetric(id cache.InformerNameAndResource)
}
}
func (fifoMetricsProvider) NewProcessingLatencyMetric(id cache.InformerNameAndResource) cache.HistogramMetric {
func (informerMetricsProvider) NewProcessingLatencyMetric(id cache.InformerNameAndResource) cache.HistogramMetric {
return &reservedHistogramMetric{
id: id,
histogram: fifoProcessingLatency.WithLabelValues(
@ -88,6 +98,18 @@ func (fifoMetricsProvider) NewProcessingLatencyMetric(id cache.InformerNameAndRe
}
}
// NewStoreResourceVersionMetric returns the storeResourceVersion gauge labelled
// with the informer name and group/version/resource of id, wrapped in a
// reservedGaugeMetric that carries the identifier.
func (informerMetricsProvider) NewStoreResourceVersionMetric(id cache.InformerNameAndResource) cache.GaugeMetric {
	name := id.Name()
	gvr := id.GroupVersionResource()
	labelled := storeResourceVersion.WithLabelValues(name, gvr.Group, gvr.Version, gvr.Resource)
	return &reservedGaugeMetric{
		id:    id,
		gauge: labelled,
	}
}
// reservedGaugeMetric wraps a gauge and only updates it if the identifier
// is still reserved. This supports dynamic informers (e.g., GC, ResourceQuota)
// that may shut down while the process is still running.

View file

@ -21,6 +21,8 @@ import (
"strings"
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/metrics"
@ -44,40 +46,40 @@ func TestRealFIFO_Metrics(t *testing.T) {
{
name: "Add increases metric",
actions: []func(f *cache.RealFIFO){
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) },
},
expectedQueuedItems: 1,
},
{
name: "multiple Adds increase metric",
actions: []func(f *cache.RealFIFO){
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", 2)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("baz", 3)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", "2")) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("baz", "3")) },
},
expectedQueuedItems: 3,
},
{
name: "Update increases metric",
actions: []func(f *cache.RealFIFO){
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) },
func(f *cache.RealFIFO) { _ = f.Update(mkFifoObj("foo", 2)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) },
func(f *cache.RealFIFO) { _ = f.Update(mkFifoObj("foo", "2")) },
},
expectedQueuedItems: 2,
},
{
name: "Delete increases metric",
actions: []func(f *cache.RealFIFO){
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) },
func(f *cache.RealFIFO) { _ = f.Delete(mkFifoObj("foo", 2)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) },
func(f *cache.RealFIFO) { _ = f.Delete(mkFifoObj("foo", "2")) },
},
expectedQueuedItems: 2,
},
{
name: "Pop decreases metric and records latency",
actions: []func(f *cache.RealFIFO){
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", 2)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", "2")) },
func(f *cache.RealFIFO) {
_, _ = f.Pop(func(obj interface{}, isInInitialList bool) error { return nil })
},
@ -88,9 +90,9 @@ func TestRealFIFO_Metrics(t *testing.T) {
{
name: "PopBatch decreases metric and records latency",
actions: []func(f *cache.RealFIFO){
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", 1)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", 2)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("baz", 3)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("foo", "1")) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("bar", "2")) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("baz", "3")) },
func(f *cache.RealFIFO) {
_ = f.PopBatch(
func(deltas []cache.Delta, isInInitialList bool) error { return nil },
@ -104,12 +106,12 @@ func TestRealFIFO_Metrics(t *testing.T) {
{
name: "Replace sets metric to new count",
actions: []func(f *cache.RealFIFO){
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("old", 1)) },
func(f *cache.RealFIFO) { _ = f.Add(mkFifoObj("old", "1")) },
func(f *cache.RealFIFO) {
_ = f.Replace([]interface{}{
mkFifoObj("foo", 1),
mkFifoObj("bar", 2),
}, "0")
mkFifoObj("foo", "1"),
mkFifoObj("bar", "2"),
}, "50")
},
},
// 1 (Add) + 1 (Delete for "old") + 2 (Replace items) = 4
@ -119,7 +121,7 @@ func TestRealFIFO_Metrics(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metricsProvider := newTestFIFOMetricsProvider()
metricsProvider := newTestInformerMetricsProvider()
informerName, err := cache.NewInformerName("test-fifo")
if err != nil {
t.Fatalf("NewInformerName() unexpected error: %v", err)
@ -128,8 +130,12 @@ func TestRealFIFO_Metrics(t *testing.T) {
id := informerName.WithResource(podsGVR)
f := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: emptyKnownObjects(),
KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc,
KnownObjects: cache.NewIndexer(
cache.DeletionHandlingMetaNamespaceKeyFunc,
cache.Indexers{},
cache.WithStoreMetrics(id, metricsProvider),
),
Identifier: id,
MetricsProvider: metricsProvider,
})
@ -148,20 +154,24 @@ func TestRealFIFO_Metrics(t *testing.T) {
}
func TestRealFIFO_MetricsNotPublishedForUnnamedFIFO(t *testing.T) {
metricsProvider := newTestFIFOMetricsProvider()
metricsProvider := newTestInformerMetricsProvider()
// No InformerName configured - should not publish metrics
var id cache.InformerNameAndResource
f := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: emptyKnownObjects(),
KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc,
KnownObjects: cache.NewIndexer(
cache.DeletionHandlingMetaNamespaceKeyFunc,
cache.Indexers{},
cache.WithStoreMetrics(id, metricsProvider),
),
Identifier: id,
MetricsProvider: metricsProvider,
})
// Perform operations
_ = f.Add(mkFifoObj("foo", 1))
_ = f.Add(mkFifoObj("bar", 2))
_ = f.Add(mkFifoObj("foo", "1"))
_ = f.Add(mkFifoObj("bar", "2"))
// No metrics should be created because there's no identifier configured
want := ""
@ -171,7 +181,7 @@ func TestRealFIFO_MetricsNotPublishedForUnnamedFIFO(t *testing.T) {
}
func TestRealFIFO_MetricsNotPublishedForDuplicateGVR(t *testing.T) {
metricsProvider := newTestFIFOMetricsProvider()
metricsProvider := newTestInformerMetricsProvider()
// Create InformerName
informerName, err := cache.NewInformerName("duplicate-test")
@ -185,8 +195,12 @@ func TestRealFIFO_MetricsNotPublishedForDuplicateGVR(t *testing.T) {
t.Fatal("Expected first identifier to be reserved")
}
f1 := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: emptyKnownObjects(),
KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc,
KnownObjects: cache.NewIndexer(
cache.DeletionHandlingMetaNamespaceKeyFunc,
cache.Indexers{},
cache.WithStoreMetrics(id1, metricsProvider),
),
Identifier: id1,
MetricsProvider: metricsProvider,
})
@ -197,22 +211,26 @@ func TestRealFIFO_MetricsNotPublishedForDuplicateGVR(t *testing.T) {
t.Fatal("Expected second identifier with same GVR to not be reserved")
}
f2 := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: emptyKnownObjects(),
KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc,
KnownObjects: cache.NewIndexer(
cache.DeletionHandlingMetaNamespaceKeyFunc,
cache.Indexers{},
cache.WithStoreMetrics(id2, metricsProvider),
),
Identifier: id2,
MetricsProvider: metricsProvider,
})
// Add items to both FIFOs
_ = f1.Add(mkFifoObj("foo", 1))
_ = f2.Add(mkFifoObj("bar", 2))
_ = f1.Add(mkFifoObj("foo", "1"))
_ = f2.Add(mkFifoObj("bar", "2"))
// Only f1's metric should be published, f2 uses noopMetric
verifyQueuedItems(t, metricsProvider, "duplicate-test", podsGVR, 1)
}
func TestRealFIFO_MetricsTrackedIndependentlyForDifferentFIFOs(t *testing.T) {
metricsProvider := newTestFIFOMetricsProvider()
metricsProvider := newTestInformerMetricsProvider()
// Create two InformerNames with different names - both should be unique
informerName1, err := cache.NewInformerName("fifo-1")
@ -223,8 +241,12 @@ func TestRealFIFO_MetricsTrackedIndependentlyForDifferentFIFOs(t *testing.T) {
id1 := informerName1.WithResource(podsGVR)
f1 := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: emptyKnownObjects(),
KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc,
KnownObjects: cache.NewIndexer(
cache.DeletionHandlingMetaNamespaceKeyFunc,
cache.Indexers{},
cache.WithStoreMetrics(id1, metricsProvider),
),
Identifier: id1,
MetricsProvider: metricsProvider,
})
@ -237,18 +259,22 @@ func TestRealFIFO_MetricsTrackedIndependentlyForDifferentFIFOs(t *testing.T) {
id2 := informerName2.WithResource(podsGVR)
f2 := cache.NewRealFIFOWithOptions(cache.RealFIFOOptions{
KeyFunction: testFifoObjectKeyFunc,
KnownObjects: emptyKnownObjects(),
KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc,
KnownObjects: cache.NewIndexer(
cache.DeletionHandlingMetaNamespaceKeyFunc,
cache.Indexers{},
cache.WithStoreMetrics(id2, metricsProvider),
),
Identifier: id2,
MetricsProvider: metricsProvider,
})
// Add items to f1
_ = f1.Add(mkFifoObj("foo", 1))
_ = f1.Add(mkFifoObj("bar", 2))
_ = f1.Add(mkFifoObj("foo", "1"))
_ = f1.Add(mkFifoObj("bar", "2"))
// Add items to f2
_ = f2.Add(mkFifoObj("baz", 3))
_ = f2.Add(mkFifoObj("baz", "3"))
// Verify metrics are tracked independently
wantInformerQueuedItemsMetric := `# HELP informer_queued_items [ALPHA] Number of items currently queued in the FIFO.
@ -284,73 +310,145 @@ informer_processing_latency_seconds_count{group="",name="fifo-2",resource="pods"
}
}
type testFifoObject struct {
name string
val interface{}
}
func testFifoObjectKeyFunc(obj interface{}) (string, error) {
return obj.(testFifoObject).name, nil
}
func mkFifoObj(name string, val interface{}) testFifoObject {
return testFifoObject{name: name, val: val}
}
type literalListerGetter func() []testFifoObject
func (l literalListerGetter) List() []interface{} {
if l == nil {
return nil
}
result := []interface{}{}
for _, item := range l() {
result = append(result, item)
}
return result
}
func (l literalListerGetter) ListKeys() []string {
if l == nil {
return nil
}
result := []string{}
for _, item := range l() {
result = append(result, item.name)
}
return result
}
func (l literalListerGetter) Get(key string) (interface{}, bool, error) {
for _, item := range l() {
if item.name == key {
return item, true, nil
}
}
return nil, false, nil
}
func (l literalListerGetter) GetByKey(key string) (interface{}, bool, error) {
return l.Get(key)
}
func emptyKnownObjects() cache.KeyListerGetter {
return literalListerGetter(
func() []testFifoObject {
return []testFifoObject{}
// TestStore_MetricsTrackedForResourceVersions checks the
// informer_store_resource_version gauge after a sequence of store operations.
// Each case runs against a fresh metrics provider and a fresh indexer created
// with WithStoreMetrics, then compares the gathered gauge with expectedMetric.
func TestStore_MetricsTrackedForResourceVersions(t *testing.T) {
tests := []struct {
name string
actions []func(f cache.Store)
expectedMetric int64
}{
{
name: "empty store has zero metric",
actions: []func(f cache.Store){},
expectedMetric: 0,
},
{
name: "Add sets metric",
actions: []func(f cache.Store){
func(f cache.Store) { _ = f.Add(mkFifoObj("foo", "1")) },
},
expectedMetric: 1,
},
{
name: "multiple Adds increase metric",
actions: []func(f cache.Store){
func(f cache.Store) { _ = f.Add(mkFifoObj("bar", "2")) },
func(f cache.Store) { _ = f.Add(mkFifoObj("baz", "3")) },
},
expectedMetric: 3,
},
{
name: "Update increases metric",
actions: []func(f cache.Store){
func(f cache.Store) { _ = f.Add(mkFifoObj("foo", "1")) },
func(f cache.Store) { _ = f.Update(mkFifoObj("foo", "2")) },
},
expectedMetric: 2,
},
{
name: "Delete sets metric",
actions: []func(f cache.Store){
func(f cache.Store) { _ = f.Add(mkFifoObj("foo", "1")) },
func(f cache.Store) { _ = f.Delete(mkFifoObj("foo", "2")) },
},
expectedMetric: 2,
},
{
// The expected value is the list resource version passed to Replace,
// not the resource version of any individual item.
name: "Replace sets metric to new count and updates store resource version metric",
actions: []func(f cache.Store){
func(f cache.Store) { _ = f.Add(mkFifoObj("old", "1")) },
func(f cache.Store) {
_ = f.Replace([]interface{}{
mkFifoObj("foo", "10"),
mkFifoObj("bar", "20"),
}, "50")
},
},
expectedMetric: 50,
},
{
// A 30-digit resource version is expected to be reported as its last 15 digits.
name: "Metrics are truncated to last 15 digits",
actions: []func(f cache.Store){
func(f cache.Store) { _ = f.Add(mkFifoObj("old", "1")) },
func(f cache.Store) {
_ = f.Replace([]interface{}{
mkFifoObj("foo", "10"),
mkFifoObj("bar", "20"),
}, "123456789012345678901234567890")
},
},
expectedMetric: 678901234567890,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metricsProvider := newTestInformerMetricsProvider()
informerName, err := cache.NewInformerName("test-fifo")
if err != nil {
t.Fatalf("NewInformerName() unexpected error: %v", err)
}
// Release the name when the subtest ends so the next case can create it again.
defer informerName.Release()
id := informerName.WithResource(podsGVR)
store := cache.NewIndexer(
cache.DeletionHandlingMetaNamespaceKeyFunc,
cache.Indexers{},
cache.WithStoreMetrics(id, metricsProvider),
)
for _, action := range tt.actions {
action(store)
}
verifyStoreResourceVersion(t, metricsProvider, "test-fifo", podsGVR, tt.expectedMetric)
})
}
}
// BenchmarkStoreWithMetrics measures Store.Update on an indexer created with
// WithStoreMetrics, cycling through a pre-built set of pods so that only the
// update path is inside the timed loop.
func BenchmarkStoreWithMetrics(b *testing.B) {
	metricsProvider := newTestInformerMetricsProvider()
	informerName, err := cache.NewInformerName("test-fifo")
	if err != nil {
		b.Fatalf("NewInformerName() unexpected error: %v", err)
	}
	defer informerName.Release()
	id := informerName.WithResource(podsGVR)

	store := cache.NewIndexer(
		cache.DeletionHandlingMetaNamespaceKeyFunc,
		cache.Indexers{},
		cache.WithStoreMetrics(id, metricsProvider),
	)

	objectCount := 5000
	// Size the slice from objectCount so the capacity cannot drift from the
	// number of objects actually generated.
	objects := make([]*v1.Pod, 0, objectCount)
	for i := range objectCount {
		objects = append(objects, mkFifoObj(fmt.Sprintf("object-number-%d", i), fmt.Sprintf("%d", i)))
	}

	// Exclude fixture construction from the measurement.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		_ = store.Update(objects[i%objectCount])
	}
}
// testFIFOMetricsProvider is a test implementation of cache.FIFOMetricsProvider
// mkFifoObj builds a Pod fixture carrying only the given name and resource version.
func mkFifoObj(name string, resourceVersion string) *v1.Pod {
	meta := metav1.ObjectMeta{
		Name:            name,
		ResourceVersion: resourceVersion,
	}
	return &v1.Pod{ObjectMeta: meta}
}
// testInformerMetricsProvider is a test implementation of cache.InformerMetricsProvider
// that uses real component-base metrics registered with a custom registry.
type testFIFOMetricsProvider struct {
registry metrics.KubeRegistry
gauge *metrics.GaugeVec
histogram *metrics.HistogramVec
type testInformerMetricsProvider struct {
registry metrics.KubeRegistry
gauge *metrics.GaugeVec
histogram *metrics.HistogramVec
storeResourceVersion *metrics.GaugeVec
}
func newTestFIFOMetricsProvider() *testFIFOMetricsProvider {
func newTestInformerMetricsProvider() *testInformerMetricsProvider {
registry := metrics.NewKubeRegistry()
gauge := metrics.NewGaugeVec(
&metrics.GaugeOpts{
@ -371,24 +469,39 @@ func newTestFIFOMetricsProvider() *testFIFOMetricsProvider {
},
[]string{"name", "group", "version", "resource"},
)
storeResourceVersion := metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: "informer",
Name: "store_resource_version",
Help: "The 15 least significant digits of the resource version of the store.",
StabilityLevel: metrics.ALPHA,
},
[]string{"name", "group", "version", "resource"},
)
registry.MustRegister(gauge)
registry.MustRegister(histogram)
return &testFIFOMetricsProvider{
registry: registry,
gauge: gauge,
histogram: histogram,
registry.MustRegister(storeResourceVersion)
return &testInformerMetricsProvider{
registry: registry,
gauge: gauge,
histogram: histogram,
storeResourceVersion: storeResourceVersion,
}
}
func (p *testFIFOMetricsProvider) NewQueuedItemMetric(id cache.InformerNameAndResource) cache.GaugeMetric {
func (p *testInformerMetricsProvider) NewQueuedItemMetric(id cache.InformerNameAndResource) cache.GaugeMetric {
return p.gauge.WithLabelValues(id.Name(), id.GroupVersionResource().Group, id.GroupVersionResource().Version, id.GroupVersionResource().Resource)
}
func (p *testFIFOMetricsProvider) NewProcessingLatencyMetric(id cache.InformerNameAndResource) cache.HistogramMetric {
func (p *testInformerMetricsProvider) NewProcessingLatencyMetric(id cache.InformerNameAndResource) cache.HistogramMetric {
return p.histogram.WithLabelValues(id.Name(), id.GroupVersionResource().Group, id.GroupVersionResource().Version, id.GroupVersionResource().Resource)
}
func verifyQueuedItems(t *testing.T, metricsProvider *testFIFOMetricsProvider, informerName string, gvr schema.GroupVersionResource, expected int) {
// NewStoreResourceVersionMetric returns the test registry's store resource
// version gauge labelled with the informer name and group/version/resource of id.
func (p *testInformerMetricsProvider) NewStoreResourceVersionMetric(id cache.InformerNameAndResource) cache.GaugeMetric {
	name := id.Name()
	gvr := id.GroupVersionResource()
	return p.storeResourceVersion.WithLabelValues(name, gvr.Group, gvr.Version, gvr.Resource)
}
func verifyQueuedItems(t *testing.T, metricsProvider *testInformerMetricsProvider, informerName string, gvr schema.GroupVersionResource, expected int) {
t.Helper()
want := fmt.Sprintf(`# HELP informer_queued_items [ALPHA] Number of items currently queued in the FIFO.
# TYPE informer_queued_items gauge
@ -402,7 +515,7 @@ informer_queued_items{group="%s",name="%s",resource="%s",version="%s"} %d
// verifyLatencyObservations checks the histogram observation count using a custom gatherer
// that strips timing-dependent values (bucket counts and sum) from the comparison, so that
// we only verify the number of observations without being affected by the duration values.
func verifyLatencyObservations(t *testing.T, metricsProvider *testFIFOMetricsProvider, informerName string, gvr schema.GroupVersionResource, expected uint64) {
func verifyLatencyObservations(t *testing.T, metricsProvider *testInformerMetricsProvider, informerName string, gvr schema.GroupVersionResource, expected uint64) {
t.Helper()
if expected == 0 {
return
@ -417,7 +530,18 @@ informer_processing_latency_seconds_count{group="%s",name="%s",resource="%s",ver
}
}
func (p *testFIFOMetricsProvider) gatherWithoutDurations() testutil.GathererFunc {
// verifyStoreResourceVersion fails the test unless the provider's registry
// exposes informer_store_resource_version with the given labels and value.
func verifyStoreResourceVersion(t *testing.T, metricsProvider *testInformerMetricsProvider, informerName string, gvr schema.GroupVersionResource, expected int64) {
	t.Helper()

	const expositionFormat = `# HELP informer_store_resource_version [ALPHA] The 15 least significant digits of the resource version of the store.
# TYPE informer_store_resource_version gauge
informer_store_resource_version{group="%s",name="%s",resource="%s",version="%s"} %d
`
	wantText := fmt.Sprintf(expositionFormat, gvr.Group, informerName, gvr.Resource, gvr.Version, expected)

	err := testutil.GatherAndCompare(metricsProvider.registry, strings.NewReader(wantText), "informer_store_resource_version")
	if err != nil {
		t.Fatal(err)
	}
}
func (p *testInformerMetricsProvider) gatherWithoutDurations() testutil.GathererFunc {
return func() ([]*testutil.MetricFamily, error) {
got, err := p.registry.Gather()
for _, mf := range got {