mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-09 00:34:10 -04:00
Merge pull request #139533 from serathius/watchcache-snapshot-lock
List snapshot data outside of lock
This commit is contained in:
commit
de7f0f85c8
1 changed files with 20 additions and 12 deletions
|
|
@ -587,21 +587,10 @@ func (w *watchCache) waitUntilFreshAndList(ctx context.Context, key string, opts
|
|||
}
|
||||
|
||||
func (w *watchCache) waitAndListExactRV(ctx context.Context, key, continueKey string, resourceVersion uint64) (resp listResp, index string, err error) {
|
||||
consistentReadSupported := delegator.ConsistentReadSupported()
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
err = w.waitUntilFreshLocked(ctx, consistentReadSupported, resourceVersion)
|
||||
store, err := w.waitAndGetExactSnapshot(ctx, resourceVersion)
|
||||
if err != nil {
|
||||
return listResp{}, "", err
|
||||
}
|
||||
|
||||
if w.snapshots == nil {
|
||||
return listResp{}, "", errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion))
|
||||
}
|
||||
store, ok := w.snapshots.GetLessOrEqual(resourceVersion)
|
||||
if !ok {
|
||||
return listResp{}, "", errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion))
|
||||
}
|
||||
items := store.OrderedListPrefix(key, continueKey)
|
||||
return listResp{
|
||||
Items: items,
|
||||
|
|
@ -609,6 +598,25 @@ func (w *watchCache) waitAndListExactRV(ctx context.Context, key, continueKey st
|
|||
}, "", nil
|
||||
}
|
||||
|
||||
func (w *watchCache) waitAndGetExactSnapshot(ctx context.Context, resourceVersion uint64) (store store.OrderedLister, err error) {
|
||||
consistentReadSupported := delegator.ConsistentReadSupported()
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
err = w.waitUntilFreshLocked(ctx, consistentReadSupported, resourceVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if w.snapshots == nil {
|
||||
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion))
|
||||
}
|
||||
store, ok := w.snapshots.GetLessOrEqual(resourceVersion)
|
||||
if !ok {
|
||||
return nil, errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion))
|
||||
}
|
||||
return store, nil
|
||||
}
|
||||
|
||||
func (w *watchCache) waitAndListConsistent(ctx context.Context, key, continueKey string, matchValues []storage.MatchValue) (resp listResp, index string, err error) {
|
||||
resourceVersion, err := w.getCurrentRV(ctx)
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Reference in a new issue