mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-09 00:34:10 -04:00
Cleanup ordered listing after removal of btree feature gate
This commit is contained in:
parent
48d5bff656
commit
94a03e35c0
8 changed files with 35 additions and 59 deletions
|
|
@ -3155,7 +3155,7 @@ func (f fakeOrderedLister) Add(obj interface{}) error { return nil }
|
|||
func (f fakeOrderedLister) Update(obj interface{}) error { return nil }
|
||||
func (f fakeOrderedLister) Delete(obj interface{}) error { return nil }
|
||||
func (f fakeOrderedLister) Clone() store.OrderedLister { return f }
|
||||
func (f fakeOrderedLister) ListPrefix(prefixKey, continueKey string) []interface{} {
|
||||
func (f fakeOrderedLister) OrderedListPrefix(prefixKey, continueKey string) []interface{} {
|
||||
return nil
|
||||
}
|
||||
func (f fakeOrderedLister) Count(prefixKey, continueKey string) int { return 0 }
|
||||
|
|
|
|||
|
|
@ -58,6 +58,11 @@ const (
|
|||
btreeDegree = 16
|
||||
)
|
||||
|
||||
type OrderedIndexer interface {
|
||||
Indexer
|
||||
OrderedLister
|
||||
}
|
||||
|
||||
type Indexer interface {
|
||||
Add(obj interface{}) error
|
||||
Update(obj interface{}) error
|
||||
|
|
@ -71,12 +76,12 @@ type Indexer interface {
|
|||
}
|
||||
|
||||
type OrderedLister interface {
|
||||
ListPrefix(prefix, continueKey string) []interface{}
|
||||
OrderedListPrefix(prefix, continueKey string) []interface{}
|
||||
Count(prefix, continueKey string) (count int)
|
||||
Clone() OrderedLister
|
||||
}
|
||||
|
||||
func NewIndexer(indexers *cache.Indexers) Indexer {
|
||||
func NewIndexer(indexers *cache.Indexers) OrderedIndexer {
|
||||
return newThreadedBtreeStoreIndexer(ElementIndexers(indexers), btreeDegree)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -99,10 +99,10 @@ func (si *threadedStoreIndexer) List() []interface{} {
|
|||
return si.store.List()
|
||||
}
|
||||
|
||||
func (si *threadedStoreIndexer) ListPrefix(prefix, continueKey string) []interface{} {
|
||||
func (si *threadedStoreIndexer) OrderedListPrefix(prefix, continueKey string) []interface{} {
|
||||
si.lock.RLock()
|
||||
defer si.lock.RUnlock()
|
||||
return si.store.ListPrefix(prefix, continueKey)
|
||||
return si.store.OrderedListPrefix(prefix, continueKey)
|
||||
}
|
||||
|
||||
func (si *threadedStoreIndexer) ListKeys() []string {
|
||||
|
|
@ -253,7 +253,7 @@ func (s *btreeStore) getByKey(key string) (item interface{}, exists bool, err er
|
|||
return item, exists, nil
|
||||
}
|
||||
|
||||
func (s *btreeStore) ListPrefix(prefix, continueKey string) []interface{} {
|
||||
func (s *btreeStore) OrderedListPrefix(prefix, continueKey string) []interface{} {
|
||||
if continueKey == "" {
|
||||
continueKey = prefix
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,30 +41,30 @@ func TestStoreListPrefix(t *testing.T) {
|
|||
assert.NoError(t, store.Add(testStorageElement("foo2", "bar1", 3)))
|
||||
assert.NoError(t, store.Add(testStorageElement("bar", "baz", 4)))
|
||||
|
||||
items := store.ListPrefix("foo", "")
|
||||
items := store.OrderedListPrefix("foo", "")
|
||||
assert.Equal(t, []interface{}{
|
||||
testStorageElement("foo1", "bar2", 2),
|
||||
testStorageElement("foo2", "bar1", 3),
|
||||
testStorageElement("foo3", "bar3", 1),
|
||||
}, items)
|
||||
|
||||
items = store.ListPrefix("foo2", "")
|
||||
items = store.OrderedListPrefix("foo2", "")
|
||||
assert.Equal(t, []interface{}{
|
||||
testStorageElement("foo2", "bar1", 3),
|
||||
}, items)
|
||||
|
||||
items = store.ListPrefix("foo", "foo1\x00")
|
||||
items = store.OrderedListPrefix("foo", "foo1\x00")
|
||||
assert.Equal(t, []interface{}{
|
||||
testStorageElement("foo2", "bar1", 3),
|
||||
testStorageElement("foo3", "bar3", 1),
|
||||
}, items)
|
||||
|
||||
items = store.ListPrefix("foo", "foo2\x00")
|
||||
items = store.OrderedListPrefix("foo", "foo2\x00")
|
||||
assert.Equal(t, []interface{}{
|
||||
testStorageElement("foo3", "bar3", 1),
|
||||
}, items)
|
||||
|
||||
items = store.ListPrefix("bar", "")
|
||||
items = store.OrderedListPrefix("bar", "")
|
||||
assert.Equal(t, []interface{}{
|
||||
testStorageElement("bar", "baz", 4),
|
||||
}, items)
|
||||
|
|
@ -133,7 +133,7 @@ func (f fakeOrderedLister) Add(obj interface{}) error { return nil }
|
|||
func (f fakeOrderedLister) Update(obj interface{}) error { return nil }
|
||||
func (f fakeOrderedLister) Delete(obj interface{}) error { return nil }
|
||||
func (f fakeOrderedLister) Clone() OrderedLister { return f }
|
||||
func (f fakeOrderedLister) ListPrefix(prefixKey, continueKey string) []interface{} {
|
||||
func (f fakeOrderedLister) OrderedListPrefix(prefixKey, continueKey string) []interface{} {
|
||||
return nil
|
||||
}
|
||||
func (f fakeOrderedLister) Count(prefixKey, continueKey string) int { return 0 }
|
||||
|
|
|
|||
|
|
@ -122,7 +122,7 @@ type watchCache struct {
|
|||
// history" i.e. from the moment just after the newest cached watched event.
|
||||
// It is necessary to effectively allow clients to start watching at now.
|
||||
// NOTE: We assume that <store> is thread-safe.
|
||||
store store.Indexer
|
||||
store store.OrderedIndexer
|
||||
|
||||
// ResourceVersion up to which the watchCache is propagated.
|
||||
resourceVersion uint64
|
||||
|
|
@ -332,13 +332,11 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
|
|||
return err
|
||||
}
|
||||
if w.snapshots != nil && w.snapshottingEnabled.Load() {
|
||||
if orderedLister, ordered := w.store.(store.OrderedLister); ordered {
|
||||
if w.isCacheFullLocked() {
|
||||
oldestRV := w.cache[w.startIndex%w.capacity].ResourceVersion
|
||||
w.snapshots.RemoveLess(oldestRV)
|
||||
}
|
||||
w.snapshots.Add(w.resourceVersion, orderedLister)
|
||||
if w.isCacheFullLocked() {
|
||||
oldestRV := w.cache[w.startIndex%w.capacity].ResourceVersion
|
||||
w.snapshots.RemoveLess(oldestRV)
|
||||
}
|
||||
w.snapshots.Add(w.resourceVersion, w.store)
|
||||
}
|
||||
return err
|
||||
}(); err != nil {
|
||||
|
|
@ -609,7 +607,7 @@ func (w *watchCache) waitAndListExactRV(ctx context.Context, key, continueKey st
|
|||
if !ok {
|
||||
return listResp{}, "", errors.NewResourceExpired(fmt.Sprintf("too old resource version: %d", resourceVersion))
|
||||
}
|
||||
items := store.ListPrefix(key, continueKey)
|
||||
items := store.OrderedListPrefix(key, continueKey)
|
||||
return listResp{
|
||||
Items: items,
|
||||
ResourceVersion: resourceVersion,
|
||||
|
|
@ -653,19 +651,11 @@ func (w *watchCache) listLatestRV(key, continueKey string, matchValues []storage
|
|||
}, matchValue.IndexName, err
|
||||
}
|
||||
}
|
||||
if store, ok := w.store.(store.OrderedLister); ok {
|
||||
result := store.ListPrefix(key, continueKey)
|
||||
return listResp{
|
||||
Items: result,
|
||||
ResourceVersion: w.resourceVersion,
|
||||
}, "", nil
|
||||
}
|
||||
result := w.store.List()
|
||||
result, err = filterPrefixAndOrder(key, result)
|
||||
result := w.store.OrderedListPrefix(key, continueKey)
|
||||
return listResp{
|
||||
Items: result,
|
||||
ResourceVersion: w.resourceVersion,
|
||||
}, "", err
|
||||
}, "", nil
|
||||
}
|
||||
|
||||
func filterPrefixAndOrder(prefix string, items []interface{}) ([]interface{}, error) {
|
||||
|
|
@ -778,8 +768,8 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
|
|||
}
|
||||
if w.snapshots != nil {
|
||||
w.snapshots.Reset()
|
||||
if orderedLister, ordered := w.store.(store.OrderedLister); ordered && w.snapshottingEnabled.Load() {
|
||||
w.snapshots.Add(version, orderedLister)
|
||||
if w.snapshottingEnabled.Load() {
|
||||
w.snapshots.Add(version, w.store)
|
||||
}
|
||||
}
|
||||
w.listResourceVersion = version
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ package cacher
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
|
|
@ -119,20 +118,6 @@ func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValida
|
|||
}
|
||||
}
|
||||
|
||||
type sortableWatchCacheEvents []*watchCacheEvent
|
||||
|
||||
func (s sortableWatchCacheEvents) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
func (s sortableWatchCacheEvents) Less(i, j int) bool {
|
||||
return s[i].Key < s[j].Key
|
||||
}
|
||||
|
||||
func (s sortableWatchCacheEvents) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
|
||||
// newCacheIntervalFromStore is meant to handle the case of rv=0, such that the events
|
||||
// returned by Next() need to be events from a List() done on the underlying store of
|
||||
// the watch cache.
|
||||
|
|
@ -168,9 +153,6 @@ func newCacheIntervalFromStore(resourceVersion uint64, indexer store.Indexer, ke
|
|||
}
|
||||
buffer.endIndex++
|
||||
}
|
||||
if _, ordered := indexer.(store.OrderedLister); !ordered {
|
||||
sort.Sort(sortableWatchCacheEvents(buffer.buffer))
|
||||
}
|
||||
ci := &watchCacheInterval{
|
||||
startIndex: 0,
|
||||
// Simulate that we already have all the events we're looking for.
|
||||
|
|
|
|||
|
|
@ -435,9 +435,8 @@ func TestCacheIntervalNextFromStore(t *testing.T) {
|
|||
func TestCacheIntervalFromStoreSorted(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
indexer store.Indexer
|
||||
indexer store.OrderedIndexer
|
||||
}{
|
||||
{"legacy", cache.NewIndexer(store.ElementKey, store.ElementIndexers(nil))},
|
||||
{"btree", store.NewIndexer(nil)},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
|
|
|
|||
|
|
@ -1314,7 +1314,7 @@ func TestCacheSnapshots(t *testing.T) {
|
|||
assert.False(t, found, "Expected store to not include rev 99")
|
||||
lister, found := s.snapshots.GetLessOrEqual(100)
|
||||
assert.True(t, found, "Expected store to not include rev 100")
|
||||
elements := lister.ListPrefix("", "")
|
||||
elements := lister.OrderedListPrefix("", "")
|
||||
assert.Len(t, elements, 1)
|
||||
assert.Equal(t, makeTestPod("foo", 100), elements[0].(*store.Element).Object)
|
||||
|
||||
|
|
@ -1326,20 +1326,20 @@ func TestCacheSnapshots(t *testing.T) {
|
|||
t.Log("Test cache on rev 200")
|
||||
lister, found = s.snapshots.GetLessOrEqual(200)
|
||||
assert.True(t, found, "Expected store to still keep rev 200")
|
||||
elements = lister.ListPrefix("", "")
|
||||
elements = lister.OrderedListPrefix("", "")
|
||||
assert.Len(t, elements, 1)
|
||||
assert.Equal(t, makeTestPod("foo", 200), elements[0].(*store.Element).Object)
|
||||
|
||||
t.Log("Test cache on rev 300")
|
||||
lister, found = s.snapshots.GetLessOrEqual(300)
|
||||
assert.True(t, found, "Expected store to still keep rev 300")
|
||||
elements = lister.ListPrefix("", "")
|
||||
elements = lister.OrderedListPrefix("", "")
|
||||
assert.Empty(t, elements)
|
||||
|
||||
t.Log("Test cache on rev 400")
|
||||
lister, found = s.snapshots.GetLessOrEqual(400)
|
||||
assert.True(t, found, "Expected store to still keep rev 400")
|
||||
elements = lister.ListPrefix("", "")
|
||||
elements = lister.OrderedListPrefix("", "")
|
||||
assert.Len(t, elements, 1)
|
||||
assert.Equal(t, makeTestPod("foo", 400), elements[0].(*store.Element).Object)
|
||||
|
||||
|
|
@ -1355,7 +1355,7 @@ func TestCacheSnapshots(t *testing.T) {
|
|||
t.Log("Test cache on rev 500")
|
||||
lister, found = s.snapshots.GetLessOrEqual(500)
|
||||
assert.True(t, found, "Expected store to still keep rev 500")
|
||||
elements = lister.ListPrefix("", "")
|
||||
elements = lister.OrderedListPrefix("", "")
|
||||
assert.Len(t, elements, 1)
|
||||
assert.Equal(t, makeTestPod("foo", 500), elements[0].(*store.Element).Object)
|
||||
|
||||
|
|
@ -1367,7 +1367,7 @@ func TestCacheSnapshots(t *testing.T) {
|
|||
t.Log("Test cache on rev 600")
|
||||
lister, found = s.snapshots.GetLessOrEqual(600)
|
||||
assert.True(t, found, "Expected replace to be snapshotted")
|
||||
elements = lister.ListPrefix("", "")
|
||||
elements = lister.OrderedListPrefix("", "")
|
||||
assert.Len(t, elements, 1)
|
||||
assert.Equal(t, makeTestPod("foo", 600), elements[0].(*store.Element).Object)
|
||||
|
||||
|
|
@ -1384,7 +1384,7 @@ func TestCacheSnapshots(t *testing.T) {
|
|||
t.Log("Test cache on rev 700")
|
||||
lister, found = s.snapshots.GetLessOrEqual(700)
|
||||
assert.True(t, found, "Expected replace to be snapshotted")
|
||||
elements = lister.ListPrefix("", "")
|
||||
elements = lister.OrderedListPrefix("", "")
|
||||
assert.Len(t, elements, 1)
|
||||
assert.Equal(t, makeTestPod("foo", 600), elements[0].(*store.Element).Object)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue