diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 1fc9f59f6ae..3c5e3e48ada 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -3155,7 +3155,7 @@ func (f fakeOrderedLister) Add(obj interface{}) error { return nil } func (f fakeOrderedLister) Update(obj interface{}) error { return nil } func (f fakeOrderedLister) Delete(obj interface{}) error { return nil } func (f fakeOrderedLister) Clone() store.OrderedLister { return f } -func (f fakeOrderedLister) ListPrefix(prefixKey, continueKey string) []interface{} { +func (f fakeOrderedLister) OrderedListPrefix(prefixKey, continueKey string) []interface{} { return nil } func (f fakeOrderedLister) Count(prefixKey, continueKey string) int { return 0 } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store.go index 89de895d9c0..2a8175f5a8f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store.go @@ -58,6 +58,11 @@ const ( btreeDegree = 16 ) +type OrderedIndexer interface { + Indexer + OrderedLister +} + type Indexer interface { Add(obj interface{}) error Update(obj interface{}) error @@ -71,12 +76,12 @@ type Indexer interface { } type OrderedLister interface { - ListPrefix(prefix, continueKey string) []interface{} + OrderedListPrefix(prefix, continueKey string) []interface{} Count(prefix, continueKey string) (count int) Clone() OrderedLister } -func NewIndexer(indexers *cache.Indexers) Indexer { +func NewIndexer(indexers *cache.Indexers) OrderedIndexer { return newThreadedBtreeStoreIndexer(ElementIndexers(indexers), btreeDegree) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store_btree.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store_btree.go index 33e8e4f1024..c4309225521 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store_btree.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store_btree.go @@ -99,10 +99,10 @@ func (si *threadedStoreIndexer) List() []interface{} { return si.store.List() } -func (si *threadedStoreIndexer) ListPrefix(prefix, continueKey string) []interface{} { +func (si *threadedStoreIndexer) OrderedListPrefix(prefix, continueKey string) []interface{} { si.lock.RLock() defer si.lock.RUnlock() - return si.store.ListPrefix(prefix, continueKey) + return si.store.OrderedListPrefix(prefix, continueKey) } func (si *threadedStoreIndexer) ListKeys() []string { @@ -253,7 +253,7 @@ func (s *btreeStore) getByKey(key string) (item interface{}, exists bool, err er return item, exists, nil } -func (s *btreeStore) ListPrefix(prefix, continueKey string) []interface{} { +func (s *btreeStore) OrderedListPrefix(prefix, continueKey string) []interface{} { if continueKey == "" { continueKey = prefix } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store_btree_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store_btree_test.go index 2ea923d6441..d23951be0d9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store_btree_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/store/store_btree_test.go @@ -41,30 +41,30 @@ func TestStoreListPrefix(t *testing.T) { assert.NoError(t, store.Add(testStorageElement("foo2", "bar1", 3))) assert.NoError(t, store.Add(testStorageElement("bar", "baz", 4))) - items := store.ListPrefix("foo", "") + items := store.OrderedListPrefix("foo", "") assert.Equal(t, []interface{}{ testStorageElement("foo1", "bar2", 2), testStorageElement("foo2", "bar1", 3), testStorageElement("foo3", "bar3", 1), }, items) - items = store.ListPrefix("foo2", "") + items = store.OrderedListPrefix("foo2", "") assert.Equal(t, []interface{}{ testStorageElement("foo2", "bar1", 3), }, items) - items = store.ListPrefix("foo", "foo1\x00") + items = store.OrderedListPrefix("foo", "foo1\x00") assert.Equal(t, []interface{}{ testStorageElement("foo2", "bar1", 3), testStorageElement("foo3", "bar3", 1), }, items) - items = store.ListPrefix("foo", "foo2\x00") + items = store.OrderedListPrefix("foo", "foo2\x00") assert.Equal(t, []interface{}{ testStorageElement("foo3", "bar3", 1), }, items) - items = store.ListPrefix("bar", "") + items = store.OrderedListPrefix("bar", "") assert.Equal(t, []interface{}{ testStorageElement("bar", "baz", 4), }, items) @@ -133,7 +133,7 @@ func (f fakeOrderedLister) Add(obj interface{}) error { return nil } func (f fakeOrderedLister) Update(obj interface{}) error { return nil } func (f fakeOrderedLister) Delete(obj interface{}) error { return nil } func (f fakeOrderedLister) Clone() OrderedLister { return f } -func (f fakeOrderedLister) ListPrefix(prefixKey, continueKey string) []interface{} { +func (f fakeOrderedLister) OrderedListPrefix(prefixKey, continueKey string) []interface{} { return nil } func (f fakeOrderedLister) Count(prefixKey, continueKey string) int { return 0 } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index fc73708a8ce..0a6b8757257 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -122,7 +122,7 @@ type watchCache struct { // history" i.e. from the moment just after the newest cached watched event. // It is necessary to effectively allow clients to start watching at now. // NOTE: We assume that is thread-safe. - store store.Indexer + store store.OrderedIndexer // ResourceVersion up to which the watchCache is propagated. resourceVersion uint64 @@ -332,13 +332,11 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd return err } if w.snapshots != nil && w.snapshottingEnabled.Load() { - if orderedLister, ordered := w.store.(store.OrderedLister); ordered { - if w.isCacheFullLocked() { - oldestRV := w.cache[w.startIndex%w.capacity].ResourceVersion - w.snapshots.RemoveLess(oldestRV) - } - w.snapshots.Add(w.resourceVersion, orderedLister) + if w.isCacheFullLocked() { + oldestRV := w.cache[w.startIndex%w.capacity].ResourceVersion + w.snapshots.RemoveLess(oldestRV) } + w.snapshots.Add(w.resourceVersion, w.store) } return err }(); err != nil { @@ -609,7 +607,7 @@ func (w *watchCache) waitAndListExactRV(ctx context.Context, key, continueKey st if !ok { return listResp{}, "", errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion)) } - items := store.ListPrefix(key, continueKey) + items := store.OrderedListPrefix(key, continueKey) return listResp{ Items: items, ResourceVersion: resourceVersion, @@ -653,19 +651,11 @@ func (w *watchCache) listLatestRV(key, continueKey string, matchValues []storage }, matchValue.IndexName, err } } - if store, ok := w.store.(store.OrderedLister); ok { - result := store.ListPrefix(key, continueKey) - return listResp{ - Items: result, - ResourceVersion: w.resourceVersion, - }, "", nil - } - result := w.store.List() - result, err = filterPrefixAndOrder(key, result) + result := w.store.OrderedListPrefix(key, continueKey) return listResp{ Items: result, ResourceVersion: w.resourceVersion, - }, "", err + }, "", nil } func filterPrefixAndOrder(prefix string, items []interface{}) ([]interface{}, error) { @@ -778,8 +768,8 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { } if w.snapshots != nil { w.snapshots.Reset() - if orderedLister, ordered := w.store.(store.OrderedLister); ordered && w.snapshottingEnabled.Load() { - w.snapshots.Add(version, orderedLister) + if w.snapshottingEnabled.Load() { + w.snapshots.Add(version, w.store) } } w.listResourceVersion = version diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go index 4192566ab72..2698ac7e335 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go @@ -18,7 +18,6 @@ package cacher import ( "fmt" - "sort" "sync" "k8s.io/apimachinery/pkg/watch" @@ -119,20 +118,6 @@ func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValida } } -type sortableWatchCacheEvents []*watchCacheEvent - -func (s sortableWatchCacheEvents) Len() int { - return len(s) -} - -func (s sortableWatchCacheEvents) Less(i, j int) bool { - return s[i].Key < s[j].Key -} - -func (s sortableWatchCacheEvents) Swap(i, j int) { - s[i], s[j] = s[j], s[i] -} - // newCacheIntervalFromStore is meant to handle the case of rv=0, such that the events // returned by Next() need to be events from a List() done on the underlying store of // the watch cache. @@ -168,9 +153,6 @@ func newCacheIntervalFromStore(resourceVersion uint64, indexer store.Indexer, ke } buffer.endIndex++ } - if _, ordered := indexer.(store.OrderedLister); !ordered { - sort.Sort(sortableWatchCacheEvents(buffer.buffer)) - } ci := &watchCacheInterval{ startIndex: 0, // Simulate that we already have all the events we're looking for. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go index f5139067c46..97054527d25 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go @@ -435,9 +435,8 @@ func TestCacheIntervalNextFromStore(t *testing.T) { func TestCacheIntervalFromStoreSorted(t *testing.T) { cases := []struct { name string - indexer store.Indexer + indexer store.OrderedIndexer }{ - {"legacy", cache.NewIndexer(store.ElementKey, store.ElementIndexers(nil))}, {"btree", store.NewIndexer(nil)}, } for _, tc := range cases { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 6b88daade87..2ceb008c17b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -1314,7 +1314,7 @@ func TestCacheSnapshots(t *testing.T) { assert.False(t, found, "Expected store to not include rev 99") lister, found := s.snapshots.GetLessOrEqual(100) assert.True(t, found, "Expected store to not include rev 100") - elements := lister.ListPrefix("", "") + elements := lister.OrderedListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 100), elements[0].(*store.Element).Object) @@ -1326,20 +1326,20 @@ func TestCacheSnapshots(t *testing.T) { t.Log("Test cache on rev 200") lister, found = s.snapshots.GetLessOrEqual(200) assert.True(t, found, "Expected store to still keep rev 200") - elements = lister.ListPrefix("", "") + elements = lister.OrderedListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 200), elements[0].(*store.Element).Object) t.Log("Test cache on rev 300") lister, found = s.snapshots.GetLessOrEqual(300) assert.True(t, found, "Expected store to still keep rev 300") - elements = lister.ListPrefix("", "") + elements = lister.OrderedListPrefix("", "") assert.Empty(t, elements) t.Log("Test cache on rev 400") lister, found = s.snapshots.GetLessOrEqual(400) assert.True(t, found, "Expected store to still keep rev 400") - elements = lister.ListPrefix("", "") + elements = lister.OrderedListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 400), elements[0].(*store.Element).Object) @@ -1355,7 +1355,7 @@ func TestCacheSnapshots(t *testing.T) { t.Log("Test cache on rev 500") lister, found = s.snapshots.GetLessOrEqual(500) assert.True(t, found, "Expected store to still keep rev 500") - elements = lister.ListPrefix("", "") + elements = lister.OrderedListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 500), elements[0].(*store.Element).Object) @@ -1367,7 +1367,7 @@ func TestCacheSnapshots(t *testing.T) { t.Log("Test cache on rev 600") lister, found = s.snapshots.GetLessOrEqual(600) assert.True(t, found, "Expected replace to be snapshotted") - elements = lister.ListPrefix("", "") + elements = lister.OrderedListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 600), elements[0].(*store.Element).Object) @@ -1384,7 +1384,7 @@ func TestCacheSnapshots(t *testing.T) { t.Log("Test cache on rev 700") lister, found = s.snapshots.GetLessOrEqual(700) assert.True(t, found, "Expected replace to be snapshotted") - elements = lister.ListPrefix("", "") + elements = lister.OrderedListPrefix("", "") assert.Len(t, elements, 1) assert.Equal(t, makeTestPod("foo", 600), elements[0].(*store.Element).Object) }