diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go index 6e8c60d79f5..b01e88f31f0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go @@ -30,11 +30,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3" - utilfeature "k8s.io/apiserver/pkg/util/feature" - featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" ) @@ -64,34 +61,22 @@ func BenchmarkCacherInit(b *testing.B) { Clock: clock.RealClock{}, } - for _, m := range []struct { - name string - rangeStream bool - }{ - {name: "Paginated", rangeStream: false}, - {name: "RangeStream", rangeStream: true}, - } { - b.Run(fmt.Sprintf("Mode=%s", m.name), func(b *testing.B) { - featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.EtcdRangeStream, m.rangeStream) - - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - cacher, err := NewCacherFromConfig(config) - if err != nil { - b.Fatal(err) - } - if err := cacher.Wait(ctx); err != nil { - b.Fatal(err) - } - b.StopTimer() - cacher.Stop() - etcd3.TestOnlyResetResourceSizeEstimator(etcdStorage) - b.StartTimer() - } - b.ReportMetric(float64(pods), "pods/cache") - }) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + cacher, err := NewCacherFromConfig(config) + if err != nil { + b.Fatal(err) + } + if err := cacher.Wait(ctx); err != nil { + b.Fatal(err) + } + b.StopTimer() + cacher.Stop() + etcd3.TestOnlyResetResourceSizeEstimator(etcdStorage) + b.StartTimer() } + b.ReportMetric(float64(pods), "pods/cache") } func loadExemplarPod(b *testing.B) *corev1.Pod { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go index ca7513fcba7..12950c41c82 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go @@ -61,7 +61,7 @@ func init() { utilruntime.Must(metav1.AddMetaToScheme(scheme)) scheme.AddUnversionedTypes(corev1.SchemeGroupVersion, &metav1.Status{}) pb := protobuf.NewSerializer(scheme, scheme) - corev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{corev1.SchemeGroupVersion}, schema.GroupVersions{corev1.SchemeGroupVersion}) + corev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{corev1.SchemeGroupVersion}, nil) examplev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{examplev1.SchemeGroupVersion}, nil) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index e5afed3356b..cb71179cf17 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -296,10 +296,7 @@ func (wc *watchChan) RequestWatchProgress() error { // All events sent will have isCreated=true func (wc *watchChan) sync() error { if wc.recursive && utilfeature.DefaultFeatureGate.Enabled(features.EtcdRangeStream) { - err := wc.syncStream() - if err == nil { - return nil - } + err := wc.syncStreamRecursive() if !isUnimplementedErr(err) { return interpretListError(err, true, wc.key, wc.key) } @@ -365,10 +362,14 @@ func (wc *watchChan) syncPaginated() error { } } -func (wc *watchChan) syncStream() error { +func (wc *watchChan) syncStreamRecursive() error { + if !wc.recursive { + return fmt.Errorf("syncStreamRecursive called on a non-recursive watch") + } opts := []clientv3.OpOption{ clientv3.WithRange(clientv3.GetPrefixRangeEnd(wc.key)), } + startTime := time.Now() streamResp, err := wc.watcher.client.KV.GetStream(wc.ctx, wc.key, opts...) metrics.RecordEtcdRequest("listStream", wc.watcher.groupResource, err, startTime) @@ -376,7 +377,7 @@ func (wc *watchChan) syncStream() error { return err } - var streamRev int64 + var initialRev int64 for r := range streamResp { if err := r.Err(); err != nil { return err @@ -384,14 +385,18 @@ func (wc *watchChan) syncStream() error { rangeResp := r.RangeResponse for i, kv := range rangeResp.Kvs { wc.queueEvent(parseKV(kv)) + // free kv early. Long lists can take O(seconds) to decode. rangeResp.Kvs[i] = nil } - if streamRev == 0 && rangeResp.Header != nil { - streamRev = rangeResp.Header.Revision + if initialRev == 0 && rangeResp.Header != nil { + initialRev = rangeResp.Header.Revision } } + if initialRev == 0 { + return fmt.Errorf("rangeStream for %q completed without a revision", wc.key) + } - wc.initialRev = streamRev + wc.initialRev = initialRev return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 0b9362f522e..13723ac0668 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -232,7 +232,15 @@ func TestTooLargeResourceVersionErrorForWatchList(t *testing.T) { } } -func TestWatchChanSyncPaginated(t *testing.T) { +func TestWatchChanSync(t *testing.T) { + modes := []struct { + name string + rangeStream bool + }{ + {name: "Paginated"}, + {name: "RangeStream", rangeStream: true}, + } + testCases := []struct { name string watchKey string @@ -262,103 +270,79 @@ func TestWatchChanSyncPaginated(t *testing.T) { }, } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - orig := defaultWatcherMaxLimit - defer func() { defaultWatcherMaxLimit = orig }() - defaultWatcherMaxLimit = testCase.watcherMaxLimit + for _, mode := range modes { + for _, testCase := range testCases { + t.Run(mode.name+"/"+testCase.name, func(t *testing.T) { + orig := defaultWatcherMaxLimit + defer func() { defaultWatcherMaxLimit = orig }() + defaultWatcherMaxLimit = testCase.watcherMaxLimit - origCtx, store, _ := testSetup(t) - initList, err := initStoreData(origCtx, store) - if err != nil { - t.Fatal(err) - } - - kvWrapper := newEtcdClientKVWrapper(store.client.KV) - kvWrapper.getReactors = append(kvWrapper.getReactors, func() { - barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}} - podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name) - storedObj := &example.Pod{} - - err := store.Create(context.Background(), podKey, barThird, storedObj, 0) + origCtx, store, _ := testSetup(t) + initList, err := initStoreData(origCtx, store) if err != nil { - t.Errorf("failed to create object: %v", err) + t.Fatal(err) + } + + kvWrapper := newEtcdClientKVWrapper(store.client.KV) + kvWrapper.getReactors = append(kvWrapper.getReactors, func() { + barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}} + podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name) + storedObj := &example.Pod{} + + err := store.Create(context.Background(), podKey, barThird, storedObj, 0) + if err != nil { + t.Errorf("failed to create object: %v", err) + } + }) + + store.client.KV = kvWrapper + + w := store.watcher.createWatchChan( + origCtx, + testCase.watchKey, + 0, + true, + false, + storage.Everything) + + sync := w.syncPaginated + if mode.rangeStream { + sync = w.syncStreamRecursive + } + if err := sync(); err != nil { + t.Fatal(err) + } + + if w.initialRev <= 0 { + t.Errorf("expected initialRev to be set, got %d", w.initialRev) + } + + // close incomingEventChan so we can read incomingEventChan non-blocking + close(w.incomingEventChan) + + eventsReceived := 0 + for event := range w.incomingEventChan { + eventsReceived++ + storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) + } + + if eventsReceived != testCase.expectEventCount { + t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount) + } + + if mode.rangeStream { + if kvWrapper.getStreamCallCounter != 1 { + t.Errorf("Unexpected called times of client.KV.GetStream() : %v, expected: 1", kvWrapper.getStreamCallCounter) + } + } else if kvWrapper.getCallCounter != testCase.expectGetCount { + t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount) } }) - - store.client.KV = kvWrapper - - w := store.watcher.createWatchChan( - origCtx, - testCase.watchKey, - 0, - true, - false, - storage.Everything) - - err = w.syncPaginated() - if err != nil { - t.Fatal(err) - } - - // close incomingEventChan so we can read incomingEventChan non-blocking - close(w.incomingEventChan) - - eventsReceived := 0 - for event := range w.incomingEventChan { - eventsReceived++ - storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) - } - - if eventsReceived != testCase.expectEventCount { - t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount) - } - - if kvWrapper.getCallCounter != testCase.expectGetCount { - t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount) - } - }) + } } } -func TestWatchChanSyncStream(t *testing.T) { - origCtx, store, _ := testSetup(t) - initList, err := initStoreData(origCtx, store) - if err != nil { - t.Fatal(err) - } - - w := store.watcher.createWatchChan( - origCtx, - "/pods/", - 0, - true, - false, - storage.Everything) - - if err := w.syncStream(); err != nil { - t.Fatalf("syncStream failed: %v", err) - } - - if w.initialRev <= 0 { - t.Errorf("expected initialRev to be set from the stream header, got %d", w.initialRev) - } - - // close incomingEventChan so we can read it non-blocking - close(w.incomingEventChan) - - eventsReceived := 0 - for event := range w.incomingEventChan { - eventsReceived++ - storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) - } - - if eventsReceived != len(initList) { - t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, len(initList)) - } -} - -// TestWatchChanSyncStreamMatchesPaginated verifies syncStream queues the same +// TestWatchChanSyncStreamMatchesPaginated verifies syncStreamRecursive queues the same // key/value/revision set as syncPaginated for the same etcd state. func TestWatchChanSyncStreamMatchesPaginated(t *testing.T) { origCtx, store, _ := testSetup(t) @@ -373,14 +357,14 @@ func TestWatchChanSyncStreamMatchesPaginated(t *testing.T) { want[key] = struct{}{} } - stream := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncStream() }) + stream := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncStreamRecursive() }) paginated := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncPaginated() }) if len(stream) != len(want) { - t.Errorf("syncStream queued %d events, expected %d", len(stream), len(want)) + t.Errorf("syncStreamRecursive queued %d events, expected %d", len(stream), len(want)) } if diff := cmp.Diff(paginated, stream, cmp.AllowUnexported(event{})); diff != "" { - t.Errorf("syncStream and syncPaginated queued different events (-paginated +stream):\n%s", diff) + t.Errorf("syncStreamRecursive and syncPaginated queued different events (-paginated +stream):\n%s", diff) } }