diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 5d03f02c01e..8d5dd5bc903 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -1305,13 +1305,20 @@ func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context, // // In this very rare scenario, the worst case will be that this // request will wait for 3 seconds before it fails. - if etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) && c.watchCache.notFresh(requestedWatchRV) { - c.watchCache.waitingUntilFresh.Add() - defer c.watchCache.waitingUntilFresh.Remove() - } - err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV) + consistentReadSupported := delegator.ConsistentReadSupported() + c.watchCache.RLock() defer c.watchCache.RUnlock() - return err + if requestedWatchRV > 0 && requestedWatchRV > c.watchCache.resourceVersion { + if consistentReadSupported { + c.watchCache.waitingUntilFresh.Add() + err := c.watchCache.waitUntilFreshLocked(ctx, requestedWatchRV) + c.watchCache.waitingUntilFresh.Remove() + return err + } else { + return c.watchCache.waitUntilFreshLocked(ctx, requestedWatchRV) + } + } + return nil } return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go index 12950c41c82..1c3ab10beea 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go @@ -160,12 +160,12 @@ func compactWatch(c *CacheDelegator, client *clientv3.Client) storagetesting.Com t.Fatal(err) } - err = c.cacher.watchCache.waitUntilFreshAndBlock(context.TODO(), rv) + c.cacher.watchCache.RLock() + err = c.cacher.watchCache.waitUntilFreshLocked(context.TODO(), rv) + c.cacher.watchCache.RUnlock() if err != nil { t.Fatalf("WatchCache didn't caught up to RV: %v", rv) } - c.cacher.watchCache.RUnlock() - c.cacher.watchCache.Lock() defer c.cacher.watchCache.Unlock() c.cacher.Lock() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 0a6b8757257..abd2c0b060a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -441,10 +441,12 @@ func (w *watchCache) List() []interface{} { return w.store.List() } -// waitUntilFreshAndBlock waits until cache is at least as fresh as given . -// NOTE: This function acquired lock and doesn't release it. -// You HAVE TO explicitly call w.RUnlock() after this function. -func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion uint64) error { +// waitUntilFreshLocked waits until cache is at least as fresh as given resourceVersion. +func (w *watchCache) waitUntilFreshLocked(ctx context.Context, resourceVersion uint64) error { + if resourceVersion == 0 || resourceVersion <= w.resourceVersion { + return nil + } + startTime := w.clock.Now() defer func() { if resourceVersion > 0 { @@ -473,7 +475,6 @@ func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion }() } - w.RLock() span := tracing.SpanFromContext(ctx) span.AddEvent("watchCache locked acquired") for w.resourceVersion < resourceVersion { @@ -536,15 +537,18 @@ func (c *watchCache) waitUntilFreshAndGetList(ctx context.Context, key string, o // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along // with their ResourceVersion and the name of the index, if any, that was used. func (w *watchCache) WaitUntilFreshAndGetKeys(ctx context.Context, resourceVersion uint64) (keys []string, err error) { - if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) { - w.waitingUntilFresh.Add() - err = w.waitUntilFreshAndBlock(ctx, resourceVersion) - w.waitingUntilFresh.Remove() - } else { - err = w.waitUntilFreshAndBlock(ctx, resourceVersion) - } - + consistentReadSupported := delegator.ConsistentReadSupported() + w.RLock() defer w.RUnlock() + if resourceVersion > 0 && resourceVersion > w.resourceVersion { + if consistentReadSupported { + w.waitingUntilFresh.Add() + err = w.waitUntilFreshLocked(ctx, resourceVersion) + w.waitingUntilFresh.Remove() + } else { + err = w.waitUntilFreshLocked(ctx, resourceVersion) + } + } if err != nil { return nil, err } @@ -588,14 +592,18 @@ func (w *watchCache) waitUntilFreshAndList(ctx context.Context, key string, opts } func (w *watchCache) waitAndListExactRV(ctx context.Context, key, continueKey string, resourceVersion uint64) (resp listResp, index string, err error) { - if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) { - w.waitingUntilFresh.Add() - err = w.waitUntilFreshAndBlock(ctx, resourceVersion) - w.waitingUntilFresh.Remove() - } else { - err = w.waitUntilFreshAndBlock(ctx, resourceVersion) - } + consistentReadSupported := delegator.ConsistentReadSupported() + w.RLock() defer w.RUnlock() + if resourceVersion > 0 && resourceVersion > w.resourceVersion { + if consistentReadSupported { + w.waitingUntilFresh.Add() + err = w.waitUntilFreshLocked(ctx, resourceVersion) + w.waitingUntilFresh.Remove() + } else { + err = w.waitUntilFreshLocked(ctx, resourceVersion) + } + } if err != nil { return listResp{}, "", err } @@ -623,14 +631,18 @@ func (w *watchCache) waitAndListConsistent(ctx context.Context, key, continueKey } func (w *watchCache) waitAndListLatestRV(ctx context.Context, resourceVersion uint64, key, continueKey string, matchValues []storage.MatchValue) (resp listResp, index string, err error) { - if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) { - w.waitingUntilFresh.Add() - err = w.waitUntilFreshAndBlock(ctx, resourceVersion) - w.waitingUntilFresh.Remove() - } else { - err = w.waitUntilFreshAndBlock(ctx, resourceVersion) - } + consistentReadSupported := delegator.ConsistentReadSupported() + w.RLock() defer w.RUnlock() + if resourceVersion > 0 && resourceVersion > w.resourceVersion { + if consistentReadSupported { + w.waitingUntilFresh.Add() + err = w.waitUntilFreshLocked(ctx, resourceVersion) + w.waitingUntilFresh.Remove() + } else { + err = w.waitUntilFreshLocked(ctx, resourceVersion) + } + } if err != nil { return listResp{}, "", err } @@ -683,14 +695,18 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool { // WaitUntilFreshAndGet returns a pointers to object. func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) { var err error - if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) { - w.waitingUntilFresh.Add() - err = w.waitUntilFreshAndBlock(ctx, resourceVersion) - w.waitingUntilFresh.Remove() - } else { - err = w.waitUntilFreshAndBlock(ctx, resourceVersion) - } + consistentReadSupported := delegator.ConsistentReadSupported() + w.RLock() defer w.RUnlock() + if resourceVersion > 0 && resourceVersion > w.resourceVersion { + if consistentReadSupported { + w.waitingUntilFresh.Add() + err = w.waitUntilFreshLocked(ctx, resourceVersion) + w.waitingUntilFresh.Remove() + } else { + err = w.waitUntilFreshLocked(ctx, resourceVersion) + } + } if err != nil { return nil, false, 0, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 2ceb008c17b..8e7ba8c0c78 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -1231,59 +1231,77 @@ func TestHistogramCacheReadWait(t *testing.T) { testedMetrics := "apiserver_watch_cache_read_wait_seconds" store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() - - // In background, update the store. - go func() { - if err := store.Add(makeTestPod("foo", 2)); err != nil { - t.Errorf("unexpected error: %v", err) - } - if err := store.Add(makeTestPod("bar", 5)); err != nil { - t.Errorf("unexpected error: %v", err) - } - }() + fakeClock := store.clock.(*testingclock.FakeClock) testCases := []struct { desc string resourceVersion uint64 + run func(t *testing.T) want string }{ { desc: "resourceVersion is non-zero", resourceVersion: 5, + run: func(t *testing.T) { + if err := store.Add(makeTestPod("foo", 2)); err != nil { + t.Errorf("unexpected error: %v", err) + } + + getCompleted := make(chan struct{}) + go func() { + defer close(getCompleted) + if _, _, _, err := store.WaitUntilFreshAndGet(ctx, 5, "prefix/ns/bar"); err != nil { + t.Errorf("unexpected error: %v", err) + } + }() + + time.Sleep(10 * time.Millisecond) + + fakeClock.Step(1 * time.Second) + + if err := store.Add(makeTestPod("bar", 5)); err != nil { + t.Errorf("unexpected error: %v", err) + } + + <-getCompleted + }, want: ` - # HELP apiserver_watch_cache_read_wait_seconds [ALPHA] Histogram of time spent waiting for a watch cache to become fresh. - # TYPE apiserver_watch_cache_read_wait_seconds histogram - apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.005"} 1 - apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.025"} 1 - apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.05"} 1 - apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.1"} 1 - apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.2"} 1 - apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.4"} 1 - apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.6"} 1 - apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.8"} 1 + # HELP apiserver_watch_cache_read_wait_seconds [ALPHA] Histogram of time spent waiting for a watch cache to become fresh. + # TYPE apiserver_watch_cache_read_wait_seconds histogram + apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.005"} 0 + apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.025"} 0 + apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.05"} 0 + apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.1"} 0 + apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.2"} 0 + apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.4"} 0 + apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.6"} 0 + apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.8"} 0 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1.25"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1.5"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="2"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="3"} 1 apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="+Inf"} 1 - apiserver_watch_cache_read_wait_seconds_sum{group="",resource="pods"} 0 + apiserver_watch_cache_read_wait_seconds_sum{group="",resource="pods"} 1 apiserver_watch_cache_read_wait_seconds_count{group="",resource="pods"} 1 `, }, { desc: "resourceVersion is 0", resourceVersion: 0, - want: ``, + run: func(t *testing.T) { + if _, _, _, err := store.WaitUntilFreshAndGet(ctx, 0, "prefix/ns/bar"); err != nil { + t.Errorf("unexpected error: %v", err) + } + }, + want: ``, }, } for _, test := range testCases { t.Run(test.desc, func(t *testing.T) { defer registry.Reset() - if _, _, _, err := store.WaitUntilFreshAndGet(ctx, test.resourceVersion, "prefix/ns/bar"); err != nil { - t.Errorf("unexpected error: %v", err) - } + test.run(t) if err := testutil.GatherAndCompare(registry, strings.NewReader(test.want), testedMetrics); err != nil { t.Errorf("unexpected error: %v", err) }