From 79dc0b8a4146a411d8c106705558deedd8f7f8bf Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 18 Jul 2025 10:57:49 +0200 Subject: [PATCH] Add jitter to priodically executed process in storage to avoid too concurrent executions --- .../apiserver/pkg/storage/cacher/delegator.go | 13 +++-- .../apiserver/pkg/storage/etcd3/stats.go | 57 ++++++++++++------- test/integration/metrics/metrics_test.go | 4 +- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go index 914f0eaced5..68d269ca35e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/delegator.go @@ -321,13 +321,14 @@ type getLister interface { func (c consistencyChecker) startChecking(stopCh <-chan struct{}) { klog.V(3).InfoS("Cache consistency check start", "group", c.groupResource.Group, "resource", c.groupResource.Resource) - err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), ConsistencyCheckPeriod, false, func(ctx context.Context) (done bool, err error) { - c.check(ctx) - return false, nil - }) - if err != nil { - klog.V(3).InfoS("Cache consistency check exiting", "group", c.groupResource.Group, "resource", c.groupResource.Resource, "err", err) + jitter := 0.5 // Period between [interval, interval * (1.0 + jitter)] + sliding := true + // wait.JitterUntilWithContext starts work immediately, so wait first. + select { + case <-time.After(wait.Jitter(ConsistencyCheckPeriod, jitter)): + case <-stopCh: } + wait.JitterUntilWithContext(wait.ContextForChannel(stopCh), c.check, ConsistencyCheckPeriod, jitter, sliding) } func (c *consistencyChecker) check(ctx context.Context) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats.go index fa1b1caa8d9..227c46e10ff 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/stats.go @@ -62,13 +62,15 @@ func newStatsCache(prefix string, getKeys storage.KeysFunc) *statsCache { // thus we run a background goroutine to periodically cleanup keys if needed. type statsCache struct { prefix string - getKeys storage.KeysFunc stop chan struct{} wg sync.WaitGroup lastKeyCleanup atomic.Pointer[time.Time] - lock sync.Mutex - keys map[string]sizeRevision + getKeysLock sync.Mutex + getKeys storage.KeysFunc + + keysLock sync.Mutex + keys map[string]sizeRevision } type sizeRevision struct { @@ -77,15 +79,15 @@ type sizeRevision struct { } func (sc *statsCache) Stats(ctx context.Context) (storage.Stats, error) { - keys, err := sc.getKeys(ctx) + keys, err := sc.GetKeys(ctx) if err != nil { return storage.Stats{}, err } stats := storage.Stats{ ObjectCount: int64(len(keys)), } - sc.lock.Lock() - defer sc.lock.Unlock() + sc.keysLock.Lock() + defer sc.keysLock.Unlock() sc.cleanKeys(keys) if len(sc.keys) != 0 { stats.EstimatedAverageObjectSizeBytes = sc.keySizes() / int64(len(sc.keys)) @@ -93,7 +95,18 @@ func (sc *statsCache) Stats(ctx context.Context) (storage.Stats, error) { return stats, nil } +func (sc *statsCache) GetKeys(ctx context.Context) ([]string, error) { + sc.getKeysLock.Lock() + getKeys := sc.getKeys + sc.getKeysLock.Unlock() + + // Don't execute getKeys under lock. + return getKeys(ctx) +} + func (sc *statsCache) SetKeysFunc(keys storage.KeysFunc) { + sc.getKeysLock.Lock() + defer sc.getKeysLock.Unlock() sc.getKeys = keys } @@ -103,13 +116,14 @@ func (sc *statsCache) Close() { } func (sc *statsCache) run() { - err := wait.PollUntilContextCancel(wait.ContextForChannel(sc.stop), sizerRefreshInterval, false, func(ctx context.Context) (done bool, err error) { - sc.cleanKeysIfNeeded(ctx) - return false, nil - }) - if err != nil { - klog.InfoS("Sizer exiting") + jitter := 0.5 // Period between [interval, interval * (1.0 + jitter)] + sliding := true + // wait.JitterUntilWithContext starts work immediately, so wait first. + select { + case <-time.After(wait.Jitter(sizerRefreshInterval, jitter)): + case <-sc.stop: } + wait.JitterUntilWithContext(wait.ContextForChannel(sc.stop), sc.cleanKeysIfNeeded, sizerRefreshInterval, jitter, sliding) } func (sc *statsCache) cleanKeysIfNeeded(ctx context.Context) { @@ -117,13 +131,12 @@ func (sc *statsCache) cleanKeysIfNeeded(ctx context.Context) { if lastKeyCleanup != nil && time.Since(*lastKeyCleanup) < sizerRefreshInterval { return } - // Don't execute getKeys under lock. - keys, err := sc.getKeys(ctx) + keys, err := sc.GetKeys(ctx) if err != nil { klog.InfoS("Error getting keys", "err", err) } - sc.lock.Lock() - defer sc.lock.Unlock() + sc.keysLock.Lock() + defer sc.keysLock.Unlock() sc.cleanKeys(keys) } @@ -157,16 +170,16 @@ func (sc *statsCache) keySizes() (totalSize int64) { } func (sc *statsCache) Update(kvs []*mvccpb.KeyValue) { - sc.lock.Lock() - defer sc.lock.Unlock() + sc.keysLock.Lock() + defer sc.keysLock.Unlock() for _, kv := range kvs { sc.updateKey(kv) } } func (sc *statsCache) UpdateKey(kv *mvccpb.KeyValue) { - sc.lock.Lock() - defer sc.lock.Unlock() + sc.keysLock.Lock() + defer sc.keysLock.Unlock() sc.updateKey(kv) } @@ -185,8 +198,8 @@ func (sc *statsCache) updateKey(kv *mvccpb.KeyValue) { } func (sc *statsCache) DeleteKey(kv *mvccpb.KeyValue) { - sc.lock.Lock() - defer sc.lock.Unlock() + sc.keysLock.Lock() + defer sc.keysLock.Unlock() key := string(kv.Key) keySizeRevision := sc.keys[key] diff --git a/test/integration/metrics/metrics_test.go b/test/integration/metrics/metrics_test.go index 3aaa759d032..b7888e15931 100644 --- a/test/integration/metrics/metrics_test.go +++ b/test/integration/metrics/metrics_test.go @@ -645,8 +645,8 @@ func TestWatchCacheConsistencyCheckMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - // Do at least 2 scrape cycles to require 2 successes - delay := 2 * period + // wait 3 periods to for 2 scrape cycles (takes 1-1.5 period) and require 2 successes + delay := 3 * period time.Sleep(delay) resp, err := rt.RoundTrip(req) if err != nil {