diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 8d5dd5bc903..d2b610610fe 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -1308,17 +1308,7 @@ func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, consistentReadSupported := delegator.ConsistentReadSupported() c.watchCache.RLock() defer c.watchCache.RUnlock() - if requestedWatchRV > 0 && requestedWatchRV > c.watchCache.resourceVersion { - if consistentReadSupported { - c.watchCache.waitingUntilFresh.Add() - err := c.watchCache.waitUntilFreshLocked(ctx, requestedWatchRV) - c.watchCache.waitingUntilFresh.Remove() - return err - } else { - return c.watchCache.waitUntilFreshLocked(ctx, requestedWatchRV) - } - } - return nil + return c.watchCache.waitUntilFreshLocked(ctx, consistentReadSupported, requestedWatchRV) } return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go index 3bf2d2c7c64..5803b0abd57 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go @@ -161,7 +161,7 @@ func compactWatch(c *CacheDelegator, client *clientv3.Client) storagetesting.Com } c.cacher.watchCache.RLock() - err = c.cacher.watchCache.waitUntilFreshLocked(context.TODO(), rv) + err = c.cacher.watchCache.waitUntilFreshLocked(context.TODO(), false, rv) c.cacher.watchCache.RUnlock() if err != nil { t.Fatalf("WatchCache didn't caught up to RV: %v", rv) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index abd2c0b060a..5419be91139 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -442,11 +442,14 @@ func (w *watchCache) List() []interface{} { } // waitUntilFreshLocked waits until cache is at least as fresh as given resourceVersion. -func (w *watchCache) waitUntilFreshLocked(ctx context.Context, resourceVersion uint64) error { +func (w *watchCache) waitUntilFreshLocked(ctx context.Context, consistentReadSupported bool, resourceVersion uint64) error { if resourceVersion == 0 || resourceVersion <= w.resourceVersion { return nil } - + if consistentReadSupported { + w.waitingUntilFresh.Add() + defer w.waitingUntilFresh.Remove() + } startTime := w.clock.Now() defer func() { if resourceVersion > 0 { @@ -540,15 +543,7 @@ func (w *watchCache) WaitUntilFreshAndGetKeys(ctx context.Context, resourceVersi consistentReadSupported := delegator.ConsistentReadSupported() w.RLock() defer w.RUnlock() - if resourceVersion > 0 && resourceVersion > w.resourceVersion { - if consistentReadSupported { - w.waitingUntilFresh.Add() - err = w.waitUntilFreshLocked(ctx, resourceVersion) - w.waitingUntilFresh.Remove() - } else { - err = w.waitUntilFreshLocked(ctx, resourceVersion) - } - } + err = w.waitUntilFreshLocked(ctx, consistentReadSupported, resourceVersion) if err != nil { return nil, err } @@ -595,15 +590,7 @@ func (w *watchCache) waitAndListExactRV(ctx context.Context, key, continueKey st consistentReadSupported := delegator.ConsistentReadSupported() w.RLock() defer w.RUnlock() - if resourceVersion > 0 && resourceVersion > w.resourceVersion { - if consistentReadSupported { - w.waitingUntilFresh.Add() - err = w.waitUntilFreshLocked(ctx, resourceVersion) - w.waitingUntilFresh.Remove() - } else { - err = w.waitUntilFreshLocked(ctx, resourceVersion) - } - } + err = w.waitUntilFreshLocked(ctx, consistentReadSupported, resourceVersion) if err != nil { return listResp{}, "", err } @@ -634,15 +621,7 @@ func (w *watchCache) waitAndListLatestRV(ctx context.Context, resourceVersion ui consistentReadSupported := delegator.ConsistentReadSupported() w.RLock() defer w.RUnlock() - if resourceVersion > 0 && resourceVersion > w.resourceVersion { - if consistentReadSupported { - w.waitingUntilFresh.Add() - err = w.waitUntilFreshLocked(ctx, resourceVersion) - w.waitingUntilFresh.Remove() - } else { - err = w.waitUntilFreshLocked(ctx, resourceVersion) - } - } + err = w.waitUntilFreshLocked(ctx, consistentReadSupported, resourceVersion) if err != nil { return listResp{}, "", err } @@ -694,19 +673,10 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool { // WaitUntilFreshAndGet returns a pointers to object. func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) { - var err error consistentReadSupported := delegator.ConsistentReadSupported() w.RLock() defer w.RUnlock() - if resourceVersion > 0 && resourceVersion > w.resourceVersion { - if consistentReadSupported { - w.waitingUntilFresh.Add() - err = w.waitUntilFreshLocked(ctx, resourceVersion) - w.waitingUntilFresh.Remove() - } else { - err = w.waitUntilFreshLocked(ctx, resourceVersion) - } - } + err := w.waitUntilFreshLocked(ctx, consistentReadSupported, resourceVersion) if err != nil { return nil, false, 0, err }