mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-08 16:30:57 -04:00
Merge pull request #139495 from serathius/watchcache-locking
Reduce times lock is aquired in watch cache during reads
This commit is contained in:
commit
6ebbd2fc51
4 changed files with 109 additions and 68 deletions
|
|
@ -1305,13 +1305,20 @@ func (c *Cacher) waitUntilWatchCacheFreshAndForceAllEvents(ctx context.Context,
|
|||
//
|
||||
// In this very rare scenario, the worst case will be that this
|
||||
// request will wait for 3 seconds before it fails.
|
||||
if etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) && c.watchCache.notFresh(requestedWatchRV) {
|
||||
c.watchCache.waitingUntilFresh.Add()
|
||||
defer c.watchCache.waitingUntilFresh.Remove()
|
||||
}
|
||||
err := c.watchCache.waitUntilFreshAndBlock(ctx, requestedWatchRV)
|
||||
consistentReadSupported := delegator.ConsistentReadSupported()
|
||||
c.watchCache.RLock()
|
||||
defer c.watchCache.RUnlock()
|
||||
return err
|
||||
if requestedWatchRV > 0 && requestedWatchRV > c.watchCache.resourceVersion {
|
||||
if consistentReadSupported {
|
||||
c.watchCache.waitingUntilFresh.Add()
|
||||
err := c.watchCache.waitUntilFreshLocked(ctx, requestedWatchRV)
|
||||
c.watchCache.waitingUntilFresh.Remove()
|
||||
return err
|
||||
} else {
|
||||
return c.watchCache.waitUntilFreshLocked(ctx, requestedWatchRV)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -160,12 +160,12 @@ func compactWatch(c *CacheDelegator, client *clientv3.Client) storagetesting.Com
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = c.cacher.watchCache.waitUntilFreshAndBlock(context.TODO(), rv)
|
||||
c.cacher.watchCache.RLock()
|
||||
err = c.cacher.watchCache.waitUntilFreshLocked(context.TODO(), rv)
|
||||
c.cacher.watchCache.RUnlock()
|
||||
if err != nil {
|
||||
t.Fatalf("WatchCache didn't caught up to RV: %v", rv)
|
||||
}
|
||||
c.cacher.watchCache.RUnlock()
|
||||
|
||||
c.cacher.watchCache.Lock()
|
||||
defer c.cacher.watchCache.Unlock()
|
||||
c.cacher.Lock()
|
||||
|
|
|
|||
|
|
@ -441,10 +441,12 @@ func (w *watchCache) List() []interface{} {
|
|||
return w.store.List()
|
||||
}
|
||||
|
||||
// waitUntilFreshAndBlock waits until cache is at least as fresh as given <resourceVersion>.
|
||||
// NOTE: This function acquired lock and doesn't release it.
|
||||
// You HAVE TO explicitly call w.RUnlock() after this function.
|
||||
func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion uint64) error {
|
||||
// waitUntilFreshLocked waits until cache is at least as fresh as given resourceVersion.
|
||||
func (w *watchCache) waitUntilFreshLocked(ctx context.Context, resourceVersion uint64) error {
|
||||
if resourceVersion == 0 || resourceVersion <= w.resourceVersion {
|
||||
return nil
|
||||
}
|
||||
|
||||
startTime := w.clock.Now()
|
||||
defer func() {
|
||||
if resourceVersion > 0 {
|
||||
|
|
@ -473,7 +475,6 @@ func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion
|
|||
}()
|
||||
}
|
||||
|
||||
w.RLock()
|
||||
span := tracing.SpanFromContext(ctx)
|
||||
span.AddEvent("watchCache locked acquired")
|
||||
for w.resourceVersion < resourceVersion {
|
||||
|
|
@ -536,15 +537,18 @@ func (c *watchCache) waitUntilFreshAndGetList(ctx context.Context, key string, o
|
|||
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
|
||||
// with their ResourceVersion and the name of the index, if any, that was used.
|
||||
func (w *watchCache) WaitUntilFreshAndGetKeys(ctx context.Context, resourceVersion uint64) (keys []string, err error) {
|
||||
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
|
||||
w.waitingUntilFresh.Add()
|
||||
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
|
||||
w.waitingUntilFresh.Remove()
|
||||
} else {
|
||||
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
|
||||
}
|
||||
|
||||
consistentReadSupported := delegator.ConsistentReadSupported()
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
|
||||
if consistentReadSupported {
|
||||
w.waitingUntilFresh.Add()
|
||||
err = w.waitUntilFreshLocked(ctx, resourceVersion)
|
||||
w.waitingUntilFresh.Remove()
|
||||
} else {
|
||||
err = w.waitUntilFreshLocked(ctx, resourceVersion)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -588,14 +592,18 @@ func (w *watchCache) waitUntilFreshAndList(ctx context.Context, key string, opts
|
|||
}
|
||||
|
||||
func (w *watchCache) waitAndListExactRV(ctx context.Context, key, continueKey string, resourceVersion uint64) (resp listResp, index string, err error) {
|
||||
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
|
||||
w.waitingUntilFresh.Add()
|
||||
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
|
||||
w.waitingUntilFresh.Remove()
|
||||
} else {
|
||||
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
|
||||
}
|
||||
consistentReadSupported := delegator.ConsistentReadSupported()
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
|
||||
if consistentReadSupported {
|
||||
w.waitingUntilFresh.Add()
|
||||
err = w.waitUntilFreshLocked(ctx, resourceVersion)
|
||||
w.waitingUntilFresh.Remove()
|
||||
} else {
|
||||
err = w.waitUntilFreshLocked(ctx, resourceVersion)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return listResp{}, "", err
|
||||
}
|
||||
|
|
@ -623,14 +631,18 @@ func (w *watchCache) waitAndListConsistent(ctx context.Context, key, continueKey
|
|||
}
|
||||
|
||||
func (w *watchCache) waitAndListLatestRV(ctx context.Context, resourceVersion uint64, key, continueKey string, matchValues []storage.MatchValue) (resp listResp, index string, err error) {
|
||||
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
|
||||
w.waitingUntilFresh.Add()
|
||||
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
|
||||
w.waitingUntilFresh.Remove()
|
||||
} else {
|
||||
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
|
||||
}
|
||||
consistentReadSupported := delegator.ConsistentReadSupported()
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
|
||||
if consistentReadSupported {
|
||||
w.waitingUntilFresh.Add()
|
||||
err = w.waitUntilFreshLocked(ctx, resourceVersion)
|
||||
w.waitingUntilFresh.Remove()
|
||||
} else {
|
||||
err = w.waitUntilFreshLocked(ctx, resourceVersion)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return listResp{}, "", err
|
||||
}
|
||||
|
|
@ -683,14 +695,18 @@ func (w *watchCache) notFresh(resourceVersion uint64) bool {
|
|||
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
|
||||
func (w *watchCache) WaitUntilFreshAndGet(ctx context.Context, resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
|
||||
var err error
|
||||
if delegator.ConsistentReadSupported() && w.notFresh(resourceVersion) {
|
||||
w.waitingUntilFresh.Add()
|
||||
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
|
||||
w.waitingUntilFresh.Remove()
|
||||
} else {
|
||||
err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
|
||||
}
|
||||
consistentReadSupported := delegator.ConsistentReadSupported()
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
if resourceVersion > 0 && resourceVersion > w.resourceVersion {
|
||||
if consistentReadSupported {
|
||||
w.waitingUntilFresh.Add()
|
||||
err = w.waitUntilFreshLocked(ctx, resourceVersion)
|
||||
w.waitingUntilFresh.Remove()
|
||||
} else {
|
||||
err = w.waitUntilFreshLocked(ctx, resourceVersion)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, false, 0, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1231,59 +1231,77 @@ func TestHistogramCacheReadWait(t *testing.T) {
|
|||
testedMetrics := "apiserver_watch_cache_read_wait_seconds"
|
||||
store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{})
|
||||
defer store.Stop()
|
||||
|
||||
// In background, update the store.
|
||||
go func() {
|
||||
if err := store.Add(makeTestPod("foo", 2)); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if err := store.Add(makeTestPod("bar", 5)); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
}()
|
||||
fakeClock := store.clock.(*testingclock.FakeClock)
|
||||
|
||||
testCases := []struct {
|
||||
desc string
|
||||
resourceVersion uint64
|
||||
run func(t *testing.T)
|
||||
want string
|
||||
}{
|
||||
{
|
||||
desc: "resourceVersion is non-zero",
|
||||
resourceVersion: 5,
|
||||
run: func(t *testing.T) {
|
||||
if err := store.Add(makeTestPod("foo", 2)); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
getCompleted := make(chan struct{})
|
||||
go func() {
|
||||
defer close(getCompleted)
|
||||
if _, _, _, err := store.WaitUntilFreshAndGet(ctx, 5, "prefix/ns/bar"); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
fakeClock.Step(1 * time.Second)
|
||||
|
||||
if err := store.Add(makeTestPod("bar", 5)); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
<-getCompleted
|
||||
},
|
||||
want: `
|
||||
# HELP apiserver_watch_cache_read_wait_seconds [ALPHA] Histogram of time spent waiting for a watch cache to become fresh.
|
||||
# TYPE apiserver_watch_cache_read_wait_seconds histogram
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.005"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.025"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.05"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.1"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.2"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.4"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.6"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.8"} 1
|
||||
# HELP apiserver_watch_cache_read_wait_seconds [ALPHA] Histogram of time spent waiting for a watch cache to become fresh.
|
||||
# TYPE apiserver_watch_cache_read_wait_seconds histogram
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.005"} 0
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.025"} 0
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.05"} 0
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.1"} 0
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.2"} 0
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.4"} 0
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.6"} 0
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="0.8"} 0
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1.25"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="1.5"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="2"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="3"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_bucket{group="",resource="pods",le="+Inf"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_sum{group="",resource="pods"} 0
|
||||
apiserver_watch_cache_read_wait_seconds_sum{group="",resource="pods"} 1
|
||||
apiserver_watch_cache_read_wait_seconds_count{group="",resource="pods"} 1
|
||||
`,
|
||||
},
|
||||
{
|
||||
desc: "resourceVersion is 0",
|
||||
resourceVersion: 0,
|
||||
want: ``,
|
||||
run: func(t *testing.T) {
|
||||
if _, _, _, err := store.WaitUntilFreshAndGet(ctx, 0, "prefix/ns/bar"); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
},
|
||||
want: ``,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
defer registry.Reset()
|
||||
if _, _, _, err := store.WaitUntilFreshAndGet(ctx, test.resourceVersion, "prefix/ns/bar"); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
test.run(t)
|
||||
if err := testutil.GatherAndCompare(registry, strings.NewReader(test.want), testedMetrics); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue