Merge pull request #133053 from serathius/jitter

Add jitter to periodically executed processes in storage to avoid too many concurrent executions
This commit is contained in:
Kubernetes Prow Robot 2025-07-18 14:08:32 -07:00 committed by GitHub
commit bfc6d87ac1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 44 additions and 30 deletions

View file

@ -321,13 +321,14 @@ type getLister interface {
func (c consistencyChecker) startChecking(stopCh <-chan struct{}) {
klog.V(3).InfoS("Cache consistency check start", "group", c.groupResource.Group, "resource", c.groupResource.Resource)
err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), ConsistencyCheckPeriod, false, func(ctx context.Context) (done bool, err error) {
c.check(ctx)
return false, nil
})
if err != nil {
klog.V(3).InfoS("Cache consistency check exiting", "group", c.groupResource.Group, "resource", c.groupResource.Resource, "err", err)
jitter := 0.5 // Period between [interval, interval * (1.0 + jitter)]
sliding := true
// wait.JitterUntilWithContext starts work immediately, so wait first.
select {
case <-time.After(wait.Jitter(ConsistencyCheckPeriod, jitter)):
case <-stopCh:
}
wait.JitterUntilWithContext(wait.ContextForChannel(stopCh), c.check, ConsistencyCheckPeriod, jitter, sliding)
}
func (c *consistencyChecker) check(ctx context.Context) {

View file

@ -62,13 +62,15 @@ func newStatsCache(prefix string, getKeys storage.KeysFunc) *statsCache {
// thus we run a background goroutine to periodically cleanup keys if needed.
type statsCache struct {
prefix string
getKeys storage.KeysFunc
stop chan struct{}
wg sync.WaitGroup
lastKeyCleanup atomic.Pointer[time.Time]
lock sync.Mutex
keys map[string]sizeRevision
getKeysLock sync.Mutex
getKeys storage.KeysFunc
keysLock sync.Mutex
keys map[string]sizeRevision
}
type sizeRevision struct {
@ -77,15 +79,15 @@ type sizeRevision struct {
}
func (sc *statsCache) Stats(ctx context.Context) (storage.Stats, error) {
keys, err := sc.getKeys(ctx)
keys, err := sc.GetKeys(ctx)
if err != nil {
return storage.Stats{}, err
}
stats := storage.Stats{
ObjectCount: int64(len(keys)),
}
sc.lock.Lock()
defer sc.lock.Unlock()
sc.keysLock.Lock()
defer sc.keysLock.Unlock()
sc.cleanKeys(keys)
if len(sc.keys) != 0 {
stats.EstimatedAverageObjectSizeBytes = sc.keySizes() / int64(len(sc.keys))
@ -93,7 +95,18 @@ func (sc *statsCache) Stats(ctx context.Context) (storage.Stats, error) {
return stats, nil
}
// GetKeys calls the currently configured keys function with ctx and returns
// its result unchanged.
//
// The function value is copied while getKeysLock is held and invoked only
// after the lock has been released, so the call itself never runs under
// getKeysLock.
func (sc *statsCache) GetKeys(ctx context.Context) ([]string, error) {
	// Snapshot the callback under the lock.
	sc.getKeysLock.Lock()
	fn := sc.getKeys
	sc.getKeysLock.Unlock()

	// Invoke the snapshot outside the critical section.
	return fn(ctx)
}
// SetKeysFunc stores keys as the function used to list keys, replacing the
// previously configured one. The assignment is made while holding
// getKeysLock.
func (sc *statsCache) SetKeysFunc(keys storage.KeysFunc) {
	sc.getKeysLock.Lock()
	sc.getKeys = keys
	sc.getKeysLock.Unlock()
}
@ -103,13 +116,14 @@ func (sc *statsCache) Close() {
}
func (sc *statsCache) run() {
err := wait.PollUntilContextCancel(wait.ContextForChannel(sc.stop), sizerRefreshInterval, false, func(ctx context.Context) (done bool, err error) {
sc.cleanKeysIfNeeded(ctx)
return false, nil
})
if err != nil {
klog.InfoS("Sizer exiting")
jitter := 0.5 // Period between [interval, interval * (1.0 + jitter)]
sliding := true
// wait.JitterUntilWithContext starts work immediately, so wait first.
select {
case <-time.After(wait.Jitter(sizerRefreshInterval, jitter)):
case <-sc.stop:
}
wait.JitterUntilWithContext(wait.ContextForChannel(sc.stop), sc.cleanKeysIfNeeded, sizerRefreshInterval, jitter, sliding)
}
func (sc *statsCache) cleanKeysIfNeeded(ctx context.Context) {
@ -117,13 +131,12 @@ func (sc *statsCache) cleanKeysIfNeeded(ctx context.Context) {
if lastKeyCleanup != nil && time.Since(*lastKeyCleanup) < sizerRefreshInterval {
return
}
// Don't execute getKeys under lock.
keys, err := sc.getKeys(ctx)
keys, err := sc.GetKeys(ctx)
if err != nil {
klog.InfoS("Error getting keys", "err", err)
}
sc.lock.Lock()
defer sc.lock.Unlock()
sc.keysLock.Lock()
defer sc.keysLock.Unlock()
sc.cleanKeys(keys)
}
@ -157,16 +170,16 @@ func (sc *statsCache) keySizes() (totalSize int64) {
}
func (sc *statsCache) Update(kvs []*mvccpb.KeyValue) {
sc.lock.Lock()
defer sc.lock.Unlock()
sc.keysLock.Lock()
defer sc.keysLock.Unlock()
for _, kv := range kvs {
sc.updateKey(kv)
}
}
func (sc *statsCache) UpdateKey(kv *mvccpb.KeyValue) {
sc.lock.Lock()
defer sc.lock.Unlock()
sc.keysLock.Lock()
defer sc.keysLock.Unlock()
sc.updateKey(kv)
}
@ -185,8 +198,8 @@ func (sc *statsCache) updateKey(kv *mvccpb.KeyValue) {
}
func (sc *statsCache) DeleteKey(kv *mvccpb.KeyValue) {
sc.lock.Lock()
defer sc.lock.Unlock()
sc.keysLock.Lock()
defer sc.keysLock.Unlock()
key := string(kv.Key)
keySizeRevision := sc.keys[key]

View file

@ -645,8 +645,8 @@ func TestWatchCacheConsistencyCheckMetrics(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// Do at least 2 scrape cycles to require 2 successes
delay := 2 * period
// Wait 3 periods to allow for 2 scrape cycles (each takes 1-1.5 periods) and require 2 successes
delay := 3 * period
time.Sleep(delay)
resp, err := rt.RoundTrip(req)
if err != nil {