mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-08 16:30:57 -04:00
apiserver: address RangeStream review feedback
This commit is contained in:
parent
5d62486d1c
commit
3ea54dafff
4 changed files with 108 additions and 134 deletions
|
|
@ -30,11 +30,8 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilrand "k8s.io/apimachinery/pkg/util/rand"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
|
|
@ -64,34 +61,22 @@ func BenchmarkCacherInit(b *testing.B) {
|
|||
Clock: clock.RealClock{},
|
||||
}
|
||||
|
||||
for _, m := range []struct {
|
||||
name string
|
||||
rangeStream bool
|
||||
}{
|
||||
{name: "Paginated", rangeStream: false},
|
||||
{name: "RangeStream", rangeStream: true},
|
||||
} {
|
||||
b.Run(fmt.Sprintf("Mode=%s", m.name), func(b *testing.B) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.EtcdRangeStream, m.rangeStream)
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
cacher, err := NewCacherFromConfig(config)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
if err := cacher.Wait(ctx); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
b.StopTimer()
|
||||
cacher.Stop()
|
||||
etcd3.TestOnlyResetResourceSizeEstimator(etcdStorage)
|
||||
b.StartTimer()
|
||||
}
|
||||
b.ReportMetric(float64(pods), "pods/cache")
|
||||
})
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
cacher, err := NewCacherFromConfig(config)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
if err := cacher.Wait(ctx); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
b.StopTimer()
|
||||
cacher.Stop()
|
||||
etcd3.TestOnlyResetResourceSizeEstimator(etcdStorage)
|
||||
b.StartTimer()
|
||||
}
|
||||
b.ReportMetric(float64(pods), "pods/cache")
|
||||
}
|
||||
|
||||
func loadExemplarPod(b *testing.B) *corev1.Pod {
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ func init() {
|
|||
utilruntime.Must(metav1.AddMetaToScheme(scheme))
|
||||
scheme.AddUnversionedTypes(corev1.SchemeGroupVersion, &metav1.Status{})
|
||||
pb := protobuf.NewSerializer(scheme, scheme)
|
||||
corev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{corev1.SchemeGroupVersion}, schema.GroupVersions{corev1.SchemeGroupVersion})
|
||||
corev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{corev1.SchemeGroupVersion}, nil)
|
||||
examplev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{examplev1.SchemeGroupVersion}, nil)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -296,10 +296,7 @@ func (wc *watchChan) RequestWatchProgress() error {
|
|||
// All events sent will have isCreated=true
|
||||
func (wc *watchChan) sync() error {
|
||||
if wc.recursive && utilfeature.DefaultFeatureGate.Enabled(features.EtcdRangeStream) {
|
||||
err := wc.syncStream()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
err := wc.syncStreamRecursive()
|
||||
if !isUnimplementedErr(err) {
|
||||
return interpretListError(err, true, wc.key, wc.key)
|
||||
}
|
||||
|
|
@ -365,10 +362,14 @@ func (wc *watchChan) syncPaginated() error {
|
|||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) syncStream() error {
|
||||
func (wc *watchChan) syncStreamRecursive() error {
|
||||
if !wc.recursive {
|
||||
return fmt.Errorf("syncStreamRecursive called on a non-recursive watch")
|
||||
}
|
||||
opts := []clientv3.OpOption{
|
||||
clientv3.WithRange(clientv3.GetPrefixRangeEnd(wc.key)),
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
streamResp, err := wc.watcher.client.KV.GetStream(wc.ctx, wc.key, opts...)
|
||||
metrics.RecordEtcdRequest("listStream", wc.watcher.groupResource, err, startTime)
|
||||
|
|
@ -376,7 +377,7 @@ func (wc *watchChan) syncStream() error {
|
|||
return err
|
||||
}
|
||||
|
||||
var streamRev int64
|
||||
var initialRev int64
|
||||
for r := range streamResp {
|
||||
if err := r.Err(); err != nil {
|
||||
return err
|
||||
|
|
@ -384,14 +385,18 @@ func (wc *watchChan) syncStream() error {
|
|||
rangeResp := r.RangeResponse
|
||||
for i, kv := range rangeResp.Kvs {
|
||||
wc.queueEvent(parseKV(kv))
|
||||
// free kv early. Long lists can take O(seconds) to decode.
|
||||
rangeResp.Kvs[i] = nil
|
||||
}
|
||||
if streamRev == 0 && rangeResp.Header != nil {
|
||||
streamRev = rangeResp.Header.Revision
|
||||
if initialRev == 0 && rangeResp.Header != nil {
|
||||
initialRev = rangeResp.Header.Revision
|
||||
}
|
||||
}
|
||||
if initialRev == 0 {
|
||||
return fmt.Errorf("rangeStream for %q completed without a revision", wc.key)
|
||||
}
|
||||
|
||||
wc.initialRev = streamRev
|
||||
wc.initialRev = initialRev
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -232,7 +232,15 @@ func TestTooLargeResourceVersionErrorForWatchList(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestWatchChanSyncPaginated(t *testing.T) {
|
||||
func TestWatchChanSync(t *testing.T) {
|
||||
modes := []struct {
|
||||
name string
|
||||
rangeStream bool
|
||||
}{
|
||||
{name: "Paginated"},
|
||||
{name: "RangeStream", rangeStream: true},
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
watchKey string
|
||||
|
|
@ -262,103 +270,79 @@ func TestWatchChanSyncPaginated(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
orig := defaultWatcherMaxLimit
|
||||
defer func() { defaultWatcherMaxLimit = orig }()
|
||||
defaultWatcherMaxLimit = testCase.watcherMaxLimit
|
||||
for _, mode := range modes {
|
||||
for _, testCase := range testCases {
|
||||
t.Run(mode.name+"/"+testCase.name, func(t *testing.T) {
|
||||
orig := defaultWatcherMaxLimit
|
||||
defer func() { defaultWatcherMaxLimit = orig }()
|
||||
defaultWatcherMaxLimit = testCase.watcherMaxLimit
|
||||
|
||||
origCtx, store, _ := testSetup(t)
|
||||
initList, err := initStoreData(origCtx, store)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
kvWrapper := newEtcdClientKVWrapper(store.client.KV)
|
||||
kvWrapper.getReactors = append(kvWrapper.getReactors, func() {
|
||||
barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}}
|
||||
podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name)
|
||||
storedObj := &example.Pod{}
|
||||
|
||||
err := store.Create(context.Background(), podKey, barThird, storedObj, 0)
|
||||
origCtx, store, _ := testSetup(t)
|
||||
initList, err := initStoreData(origCtx, store)
|
||||
if err != nil {
|
||||
t.Errorf("failed to create object: %v", err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
kvWrapper := newEtcdClientKVWrapper(store.client.KV)
|
||||
kvWrapper.getReactors = append(kvWrapper.getReactors, func() {
|
||||
barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}}
|
||||
podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name)
|
||||
storedObj := &example.Pod{}
|
||||
|
||||
err := store.Create(context.Background(), podKey, barThird, storedObj, 0)
|
||||
if err != nil {
|
||||
t.Errorf("failed to create object: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
store.client.KV = kvWrapper
|
||||
|
||||
w := store.watcher.createWatchChan(
|
||||
origCtx,
|
||||
testCase.watchKey,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
storage.Everything)
|
||||
|
||||
sync := w.syncPaginated
|
||||
if mode.rangeStream {
|
||||
sync = w.syncStreamRecursive
|
||||
}
|
||||
if err := sync(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if w.initialRev <= 0 {
|
||||
t.Errorf("expected initialRev to be set, got %d", w.initialRev)
|
||||
}
|
||||
|
||||
// close incomingEventChan so we can read incomingEventChan non-blocking
|
||||
close(w.incomingEventChan)
|
||||
|
||||
eventsReceived := 0
|
||||
for event := range w.incomingEventChan {
|
||||
eventsReceived++
|
||||
storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key)
|
||||
}
|
||||
|
||||
if eventsReceived != testCase.expectEventCount {
|
||||
t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount)
|
||||
}
|
||||
|
||||
if mode.rangeStream {
|
||||
if kvWrapper.getStreamCallCounter != 1 {
|
||||
t.Errorf("Unexpected called times of client.KV.GetStream() : %v, expected: 1", kvWrapper.getStreamCallCounter)
|
||||
}
|
||||
} else if kvWrapper.getCallCounter != testCase.expectGetCount {
|
||||
t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount)
|
||||
}
|
||||
})
|
||||
|
||||
store.client.KV = kvWrapper
|
||||
|
||||
w := store.watcher.createWatchChan(
|
||||
origCtx,
|
||||
testCase.watchKey,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
storage.Everything)
|
||||
|
||||
err = w.syncPaginated()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// close incomingEventChan so we can read incomingEventChan non-blocking
|
||||
close(w.incomingEventChan)
|
||||
|
||||
eventsReceived := 0
|
||||
for event := range w.incomingEventChan {
|
||||
eventsReceived++
|
||||
storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key)
|
||||
}
|
||||
|
||||
if eventsReceived != testCase.expectEventCount {
|
||||
t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount)
|
||||
}
|
||||
|
||||
if kvWrapper.getCallCounter != testCase.expectGetCount {
|
||||
t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchChanSyncStream(t *testing.T) {
|
||||
origCtx, store, _ := testSetup(t)
|
||||
initList, err := initStoreData(origCtx, store)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
w := store.watcher.createWatchChan(
|
||||
origCtx,
|
||||
"/pods/",
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
storage.Everything)
|
||||
|
||||
if err := w.syncStream(); err != nil {
|
||||
t.Fatalf("syncStream failed: %v", err)
|
||||
}
|
||||
|
||||
if w.initialRev <= 0 {
|
||||
t.Errorf("expected initialRev to be set from the stream header, got %d", w.initialRev)
|
||||
}
|
||||
|
||||
// close incomingEventChan so we can read it non-blocking
|
||||
close(w.incomingEventChan)
|
||||
|
||||
eventsReceived := 0
|
||||
for event := range w.incomingEventChan {
|
||||
eventsReceived++
|
||||
storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key)
|
||||
}
|
||||
|
||||
if eventsReceived != len(initList) {
|
||||
t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, len(initList))
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchChanSyncStreamMatchesPaginated verifies syncStream queues the same
|
||||
// TestWatchChanSyncStreamMatchesPaginated verifies syncStreamRecursive queues the same
|
||||
// key/value/revision set as syncPaginated for the same etcd state.
|
||||
func TestWatchChanSyncStreamMatchesPaginated(t *testing.T) {
|
||||
origCtx, store, _ := testSetup(t)
|
||||
|
|
@ -373,14 +357,14 @@ func TestWatchChanSyncStreamMatchesPaginated(t *testing.T) {
|
|||
want[key] = struct{}{}
|
||||
}
|
||||
|
||||
stream := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncStream() })
|
||||
stream := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncStreamRecursive() })
|
||||
paginated := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncPaginated() })
|
||||
|
||||
if len(stream) != len(want) {
|
||||
t.Errorf("syncStream queued %d events, expected %d", len(stream), len(want))
|
||||
t.Errorf("syncStreamRecursive queued %d events, expected %d", len(stream), len(want))
|
||||
}
|
||||
if diff := cmp.Diff(paginated, stream, cmp.AllowUnexported(event{})); diff != "" {
|
||||
t.Errorf("syncStream and syncPaginated queued different events (-paginated +stream):\n%s", diff)
|
||||
t.Errorf("syncStreamRecursive and syncPaginated queued different events (-paginated +stream):\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue