Reduce times lock is aquired in watch cache during reads

This commit is contained in:
Marek Siarkowicz 2026-06-04 09:45:23 +02:00
parent fbcbb81625
commit f4555eaa7e
4 changed files with 109 additions and 68 deletions

View file

@ -1305,13 +1305,20 @@ func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context,
//
// In this very rare scenario, the worst case will be that this
// request will wait for 3 seconds before it fails.
if etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) && c.watchCache.notFresh(requestedWatchRV) {
c.watchCache.waitingUntilFresh.Add()
defer c.watchCache.waitingUntilFresh.Remove()
}
err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV)
consistentReadSupported := delegator.ConsistentReadSupported()
c.watchCache.RLock()
defer c.watchCache.RUnlock()
return err
if requestedWatchRV > 0 && requestedWatchRV > c.watchCache.resourceVersion {
if consistentReadSupported {
c.watchCache.waitingUntilFresh.Add()
err := c.watchCache.waitUntilFreshLocked(ctx, requestedWatchRV)
c.watchCache.waitingUntilFresh.Remove()
return err
} else {
return c.watchCache.waitUntilFreshLocked(ctx, requestedWatchRV)
}
}
return nil
}
return nil
}

View file

@ -160,12 +160,12 @@ func compactWatch(c *CacheDelegator, client *clientv3.Client) storagetesting.Com
t.Fatal(err)
}
err = c.cacher.watchCache.waitUntilFreshAndBlock(context.TODO(), rv)
c.cacher.watchCache.RLock()
err = c.cacher.watchCache.waitUntilFreshLocked(context.TODO(), rv)
c.cacher.watchCache.RUnlock()
if err != nil {
t.Fatalf("WatchCache didn't caught up to RV: %v", rv)
}
c.cacher.watchCache.RUnlock()
c.cacher.watchCache.Lock()
defer c.cacher.watchCache.Unlock()
c.cacher.Lock()

View file

@ -441,10 +441,12 @@ func (w *watchCache) List() []interface{} {
return w.store.List()
}
// waitUntilFreshAndBlock waits until cache is at least as fresh as given <resourceVersion>.
// NOTE: This function acquired lock and doesn't release it.
// You HAVE TO explicitly call w.RUnlock() after this function.
func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion uint64) error {
// waitUntilFreshLocked waits until cache is at least as fresh as given resourceVersion.
func (w *watchCache) waitUntilFreshLocked(ctx context.Context, resourceVersion uint64) error {
if resourceVersion == 0 || resourceVersion <= w.resourceVersion {
return nil
}
startTime := w.clock.Now()
defer func() {
if resourceVersion > 0 {
@ -473,7 +475,6 @@ func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion
}()
}
w.RLock()
span := tracing.SpanFromContext(ctx)
span.AddEvent("watchCache locked acquired")
for w.resourceVersion < resourceVersion {
@ -536,15 +537,18 @@ func (c *watchCache) waitUntilFreshAndGetList(ctx context.Context, key string, o
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndGetKeys(ctx context.Context, resourceVersion uint64) (keys []string, err error) {
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
}
consistentReadSupported := delegator.ConsistentReadSupported()
w.RLock()
defer w.RUnlock()
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
if consistentReadSupported {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshLocked(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshLocked(ctx, resourceVersion)
}
}
if err != nil {
return nil, err
}
@ -588,14 +592,18 @@ func (w *watchCache) waitUntilFreshAndList(ctx context.Context, key string, opts
}
func (w *watchCache) waitAndListExactRV(ctx context.Context, key, continueKey string, resourceVersion uint64) (resp listResp, index string, err error) {
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
}
consistentReadSupported := delegator.ConsistentReadSupported()
w.RLock()
defer w.RUnlock()
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
if consistentReadSupported {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshLocked(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshLocked(ctx, resourceVersion)
}
}
if err != nil {
return listResp{}, "", err
}
@ -623,14 +631,18 @@ func (w *watchCache) waitAndListConsistent(ctx context.Context, key, continueKey
}
func (w *watchCache) waitAndListLatestRV(ctx context.Context, resourceVersion uint64, key, continueKey string, matchValues []storage.MatchValue) (resp listResp, index string, err error) {
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
}
consistentReadSupported := delegator.ConsistentReadSupported()
w.RLock()
defer w.RUnlock()
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
if consistentReadSupported {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshLocked(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshLocked(ctx, resourceVersion)
}
}
if err != nil {
return listResp{}, "", err
}
@ -683,14 +695,18 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool {
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
var err error
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
}
consistentReadSupported := delegator.ConsistentReadSupported()
w.RLock()
defer w.RUnlock()
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
if consistentReadSupported {
w.waitingUntilFresh.Add()
err = w.waitUntilFreshLocked(ctx, resourceVersion)
w.waitingUntilFresh.Remove()
} else {
err = w.waitUntilFreshLocked(ctx, resourceVersion)
}
}
if err != nil {
return nil, false, 0, err
}

View file

@ -1231,59 +1231,77 @@ func TestHistogramCacheReadWait(t *testing.T) {
testedMetrics := "apiserver_watch_cache_read_wait_seconds"
store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// In background, update the store.
go func() {
if err := store.Add(makeTestPod("foo", 2)); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := store.Add(makeTestPod("bar", 5)); err != nil {
t.Errorf("unexpected error: %v", err)
}
}()
fakeClock := store.clock.(*testingclock.FakeClock)
testCases := []struct {
desc string
resourceVersion uint64
run func(t *testing.T)
want string
}{
{
desc: "resourceVersion is non-zero",
resourceVersion: 5,
run: func(t *testing.T) {
if err := store.Add(makeTestPod("foo", 2)); err != nil {
t.Errorf("unexpected error: %v", err)
}
getCompleted := make(chan struct{})
go func() {
defer close(getCompleted)
if _, _, _, err := store.WaitUntilFreshAndGet(ctx, 5, "prefix/ns/bar"); err != nil {
t.Errorf("unexpected error: %v", err)
}
}()
time.Sleep(10 * time.Millisecond)
fakeClock.Step(1 * time.Second)
if err := store.Add(makeTestPod("bar", 5)); err != nil {
t.Errorf("unexpected error: %v", err)
}
<-getCompleted
},
want: `
# HELP apiserver_watch_cache_read_wait_seconds [ALPHA] Histogram of time spent waiting for a watch cache to become fresh.
# TYPE apiserver_watch_cache_read_wait_seconds histogram
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.005"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.025"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.05"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.1"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.2"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.4"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.6"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.8"} 1
# HELP apiserver_watch_cache_read_wait_seconds [ALPHA] Histogram of time spent waiting for a watch cache to become fresh.
# TYPE apiserver_watch_cache_read_wait_seconds histogram
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.005"} 0
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.025"} 0
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.05"} 0
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.1"} 0
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.2"} 0
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.4"} 0
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.6"} 0
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.8"} 0
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1.25"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1.5"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="2"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="3"} 1
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="+Inf"} 1
apiserver_watch_cache_read_wait_seconds_sum{group="",resource="pods"} 0
apiserver_watch_cache_read_wait_seconds_sum{group="",resource="pods"} 1
apiserver_watch_cache_read_wait_seconds_count{group="",resource="pods"} 1
`,
},
{
desc: "resourceVersion is 0",
resourceVersion: 0,
want: ``,
run: func(t *testing.T) {
if _, _, _, err := store.WaitUntilFreshAndGet(ctx, 0, "prefix/ns/bar"); err != nil {
t.Errorf("unexpected error: %v", err)
}
},
want: ``,
},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
defer registry.Reset()
if _, _, _, err := store.WaitUntilFreshAndGet(ctx, test.resourceVersion, "prefix/ns/bar"); err != nil {
t.Errorf("unexpected error: %v", err)
}
test.run(t)
if err := testutil.GatherAndCompare(registry, strings.NewReader(test.want), testedMetrics); err != nil {
t.Errorf("unexpected error: %v", err)
}