Merge pull request #139532 from serathius/watchcache-waituntilfresh-cleanup

Cleanup if statements by making waitUntilFreshLocked to handle sending progress notifies if consistent read is supported
This commit is contained in:
Kubernetes Prow Robot 2026-06-06 22:14:27 +05:30 committed by GitHub
commit 643e407efe
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 11 additions and 51 deletions

View file

@ -1308,17 +1308,7 @@ func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context,
consistentReadSupported := delegator.ConsistentReadSupported()
c.watchCache.RLock()
defer c.watchCache.RUnlock()
if requestedWatchRV > 0 && requestedWatchRV > c.watchCache.resourceVersion {
if consistentReadSupported {
c.watchCache.waitingUntilFresh.Add()
err := c.watchCache.waitUntilFreshLocked(ctx, requestedWatchRV)
c.watchCache.waitingUntilFresh.Remove()
return err
} else {
return c.watchCache.waitUntilFreshLocked(ctx, requestedWatchRV)
}
}
return nil
return c.watchCache.waitUntilFreshLocked(ctx, consistentReadSupported, requestedWatchRV)
}
return nil
}

View file

@ -161,7 +161,7 @@ func compactWatch(c *CacheDelegator, client *clientv3.Client) storagetesting.Com
}
c.cacher.watchCache.RLock()
err = c.cacher.watchCache.waitUntilFreshLocked(context.TODO(), rv)
err = c.cacher.watchCache.waitUntilFreshLocked(context.TODO(), false, rv)
c.cacher.watchCache.RUnlock()
if err != nil {
t.Fatalf("WatchCache didn't caught up to RV: %v", rv)

View file

@ -442,11 +442,14 @@ func (w *watchCache) List() []interface{} {
}
// waitUntilFreshLocked waits until cache is at least as fresh as given resourceVersion.
func (w *watchCache) waitUntilFreshLocked(ctx context.Context, resourceVersion uint64) error {
func (w *watchCache) waitUntilFreshLocked(ctx context.Context, consistentReadSupported bool, resourceVersion uint64) error {
if resourceVersion == 0 || resourceVersion <= w.resourceVersion {
return nil
}
if consistentReadSupported {
w.waitingUntilFresh.Add()
defer w.waitingUntilFresh.Remove()
}
startTime := w.clock.Now()
defer func() {
if resourceVersion > 0 {
@ -540,15 +543,7 @@ func (w *watchCache) WaitUntilFreshAndGetKeys(ctx context.Context, resourceVersi
consistentReadSupported := delegator.ConsistentReadSupported()
w.RLock()
defer w.RUnlock()
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
if consistentReadSupported {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshLocked(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshLocked(ctx, resourceVersion)
}
}
err = w.waitUntilFreshLocked(ctx, consistentReadSupported, resourceVersion)
if err != nil {
return nil, err
}
@ -595,15 +590,7 @@ func (w *watchCache) waitAndListExactRV(ctx context.Context, key, continueKey st
consistentReadSupported := delegator.ConsistentReadSupported()
w.RLock()
defer w.RUnlock()
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
if consistentReadSupported {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshLocked(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshLocked(ctx, resourceVersion)
}
}
err = w.waitUntilFreshLocked(ctx, consistentReadSupported, resourceVersion)
if err != nil {
return listResp{}, "", err
}
@ -634,15 +621,7 @@ func (w *watchCache) waitAndListLatestRV(ctx context.Context, resourceVersion ui
consistentReadSupported := delegator.ConsistentReadSupported()
w.RLock()
defer w.RUnlock()
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
if consistentReadSupported {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshLocked(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshLocked(ctx, resourceVersion)
}
}
err = w.waitUntilFreshLocked(ctx, consistentReadSupported, resourceVersion)
if err != nil {
return listResp{}, "", err
}
@ -694,19 +673,10 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool {
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
var err error
consistentReadSupported := delegator.ConsistentReadSupported()
w.RLock()
defer w.RUnlock()
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
if consistentReadSupported {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshLocked(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshLocked(ctx, resourceVersion)
}
}
err := w.waitUntilFreshLocked(ctx, consistentReadSupported, resourceVersion)
if err != nil {
return nil, false, 0, err
}