diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 74372ce6af5..a96849af298 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -2128,6 +2128,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta}, }, + genericfeatures.EtcdRangeStream: { + {Version: version.MustParse("1.37"), Default: false, PreRelease: featuregate.Beta}, + }, + genericfeatures.KMSv1: { {Version: version.MustParse("1.0"), Default: true, PreRelease: featuregate.GA}, {Version: version.MustParse("1.28"), Default: true, PreRelease: featuregate.Deprecated}, @@ -2621,6 +2625,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature genericfeatures.DetectCacheInconsistency: {}, + genericfeatures.EtcdRangeStream: {}, + genericfeatures.KMSv1: {}, genericfeatures.ListFromCacheSnapshot: {}, diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index c6ad171857c..8e6cc4664ed 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -146,6 +146,11 @@ const ( // Enabled cache inconsistency detection. DetectCacheInconsistency featuregate.Feature = "DetectCacheInconsistency" + // owner: @jefftree + // + // Enables the RangeStream RPC for list operations in etcd. + EtcdRangeStream featuregate.Feature = "EtcdRangeStream" + // owner: @aramase // kep: https://kep.k8s.io/3299 // deprecated: v1.28 @@ -354,6 +359,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta}, }, + EtcdRangeStream: { + {Version: version.MustParse("1.37"), Default: false, PreRelease: featuregate.Beta}, + }, + KMSv1: { {Version: version.MustParse("1.0"), Default: true, PreRelease: featuregate.GA}, {Version: version.MustParse("1.28"), Default: true, PreRelease: featuregate.Deprecated}, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 85214cbb58f..c1c62d4e1fd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -291,6 +291,21 @@ func (wc *watchChan) RequestWatchProgress() error { // The revision to watch will be set to the revision in response. // All events sent will have isCreated=true func (wc *watchChan) sync() error { + // TODO(jefftree): detect RangeStream support via the etcd feature checker. + if wc.recursive && utilfeature.DefaultFeatureGate.Enabled(features.EtcdRangeStream) { + err := wc.syncStreamRecursive() + if err == nil { + return nil + } + if grpcstatus.Code(err) != grpccodes.Unimplemented { + return err + } + klog.V(4).Infof("etcd server does not support RangeStream for %v; falling back to paginated list", wc.watcher.groupResource) + } + return wc.syncPaginated() +} + +func (wc *watchChan) syncPaginated() error { opts := []clientv3.OpOption{} if wc.recursive { opts = append(opts, clientv3.WithLimit(defaultWatcherMaxLimit)) @@ -347,6 +362,46 @@ func (wc *watchChan) sync() error { } } +func (wc *watchChan) syncStreamRecursive() error { + if !wc.recursive { + return fmt.Errorf("syncStreamRecursive called on a non-recursive watch") + } + opts := []clientv3.OpOption{ + clientv3.WithRange(clientv3.GetPrefixRangeEnd(wc.key)), + } + + startTime := time.Now() + streamResp, err := wc.watcher.client.KV.GetStream(wc.ctx, wc.key, opts...) + metrics.RecordEtcdRequest("listStream", wc.watcher.groupResource, err, startTime) + if err != nil { + return err + } + + var initialRev int64 + for r := range streamResp { + if err := r.Err(); err != nil { + // paging=false: a compaction mid-stream can't be resumed with a + // continue token, so surface it as ResourceExpired for a relist. + return interpretListError(err, false, wc.key, wc.key) + } + rangeResp := r.RangeResponse + for i, kv := range rangeResp.Kvs { + wc.queueEvent(parseKV(kv)) + // free kv early. Long lists can take O(seconds) to decode. + rangeResp.Kvs[i] = nil + } + if initialRev == 0 && rangeResp.Header != nil { + initialRev = rangeResp.Header.Revision + } + } + if initialRev == 0 { + return fmt.Errorf("rangeStream for %q completed without a revision", wc.key) + } + + wc.initialRev = initialRev + return nil +} + func logWatchChannelErr(err error) { switch { case strings.Contains(err.Error(), "mvcc: required revision has been compacted"): diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 6064ddcd8a3..13723ac0668 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -25,6 +25,8 @@ import ( "github.com/google/go-cmp/cmp" clientv3 "go.etcd.io/etcd/client/v3" + grpccodes "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -231,6 +233,14 @@ func TestTooLargeResourceVersionErrorForWatchList(t *testing.T) { } func TestWatchChanSync(t *testing.T) { + modes := []struct { + name string + rangeStream bool + }{ + {name: "Paginated"}, + {name: "RangeStream", rangeStream: true}, + } + testCases := []struct { name string watchKey string @@ -260,68 +270,164 @@ func TestWatchChanSync(t *testing.T) { }, } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - defaultWatcherMaxLimit = testCase.watcherMaxLimit + for _, mode := range modes { + for _, testCase := range testCases { + t.Run(mode.name+"/"+testCase.name, func(t *testing.T) { + orig := defaultWatcherMaxLimit + defer func() { defaultWatcherMaxLimit = orig }() + defaultWatcherMaxLimit = testCase.watcherMaxLimit - origCtx, store, _ := testSetup(t) - initList, err := initStoreData(origCtx, store) - if err != nil { - t.Fatal(err) - } - - kvWrapper := newEtcdClientKVWrapper(store.client.KV) - kvWrapper.getReactors = append(kvWrapper.getReactors, func() { - barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}} - podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name) - storedObj := &example.Pod{} - - err := store.Create(context.Background(), podKey, barThird, storedObj, 0) + origCtx, store, _ := testSetup(t) + initList, err := initStoreData(origCtx, store) if err != nil { - t.Errorf("failed to create object: %v", err) + t.Fatal(err) + } + + kvWrapper := newEtcdClientKVWrapper(store.client.KV) + kvWrapper.getReactors = append(kvWrapper.getReactors, func() { + barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}} + podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name) + storedObj := &example.Pod{} + + err := store.Create(context.Background(), podKey, barThird, storedObj, 0) + if err != nil { + t.Errorf("failed to create object: %v", err) + } + }) + + store.client.KV = kvWrapper + + w := store.watcher.createWatchChan( + origCtx, + testCase.watchKey, + 0, + true, + false, + storage.Everything) + + sync := w.syncPaginated + if mode.rangeStream { + sync = w.syncStreamRecursive + } + if err := sync(); err != nil { + t.Fatal(err) + } + + if w.initialRev <= 0 { + t.Errorf("expected initialRev to be set, got %d", w.initialRev) + } + + // close incomingEventChan so we can read incomingEventChan non-blocking + close(w.incomingEventChan) + + eventsReceived := 0 + for event := range w.incomingEventChan { + eventsReceived++ + storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) + } + + if eventsReceived != testCase.expectEventCount { + t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount) + } + + if mode.rangeStream { + if kvWrapper.getStreamCallCounter != 1 { + t.Errorf("Unexpected called times of client.KV.GetStream() : %v, expected: 1", kvWrapper.getStreamCallCounter) + } + } else if kvWrapper.getCallCounter != testCase.expectGetCount { + t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount) } }) - - store.client.KV = kvWrapper - - w := store.watcher.createWatchChan( - origCtx, - testCase.watchKey, - 0, - true, - false, - storage.Everything) - - err = w.sync() - if err != nil { - t.Fatal(err) - } - - // close incomingEventChan so we can read incomingEventChan non-blocking - close(w.incomingEventChan) - - eventsReceived := 0 - for event := range w.incomingEventChan { - eventsReceived++ - storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) - } - - if eventsReceived != testCase.expectEventCount { - t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount) - } - - if kvWrapper.getCallCounter != testCase.expectGetCount { - t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount) - } - }) + } } } +// TestWatchChanSyncStreamMatchesPaginated verifies syncStreamRecursive queues the same +// key/value/revision set as syncPaginated for the same etcd state. +func TestWatchChanSyncStreamMatchesPaginated(t *testing.T) { + origCtx, store, _ := testSetup(t) + + want := map[string]struct{}{} + for i := range 20 { + pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: fmt.Sprintf("ns-%d", i%3), Name: fmt.Sprintf("pod-%d", i)}} + key := fmt.Sprintf("/pods/%s/%s", pod.Namespace, pod.Name) + if err := store.Create(origCtx, key, pod, &example.Pod{}, 0); err != nil { + t.Fatalf("failed to create object: %v", err) + } + want[key] = struct{}{} + } + + stream := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncStreamRecursive() }) + paginated := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncPaginated() }) + + if len(stream) != len(want) { + t.Errorf("syncStreamRecursive queued %d events, expected %d", len(stream), len(want)) + } + if diff := cmp.Diff(paginated, stream, cmp.AllowUnexported(event{})); diff != "" { + t.Errorf("syncStreamRecursive and syncPaginated queued different events (-paginated +stream):\n%s", diff) + } +} + +func TestWatchChanSyncStreamFallsBackToPaginated(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EtcdRangeStream, true) + + origCtx, store, _ := testSetup(t) + initList, err := initStoreData(origCtx, store) + if err != nil { + t.Fatal(err) + } + + kvWrapper := newEtcdClientKVWrapper(store.client.KV) + kvWrapper.streamUnimplemented = true + store.client.KV = kvWrapper + + w := store.watcher.createWatchChan(origCtx, "/pods/", 0, true, false, storage.Everything) + + if err := w.sync(); err != nil { + t.Fatalf("sync failed: %v", err) + } + + if kvWrapper.getStreamCallCounter != 1 { + t.Errorf("expected GetStream to be called once, got %d", kvWrapper.getStreamCallCounter) + } + if w.initialRev <= 0 { + t.Errorf("expected initialRev to be set by the paginated fallback, got %d", w.initialRev) + } + + close(w.incomingEventChan) + eventsReceived := 0 + for event := range w.incomingEventChan { + eventsReceived++ + storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) + } + if eventsReceived != len(initList) { + t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, len(initList)) + } +} + +func drainSync(t *testing.T, store *store, ctx context.Context, sync func(*watchChan) error) map[string]*event { + t.Helper() + wc := store.watcher.createWatchChan(ctx, "/pods/", 0, true, false, storage.Everything) + if err := sync(wc); err != nil { + t.Fatalf("sync failed: %v", err) + } + close(wc.incomingEventChan) + out := map[string]*event{} + for e := range wc.incomingEventChan { + out[e.key] = e + } + return out +} + // NOTE: it's not thread-safe type etcdClientKVWrapper struct { clientv3.KV // keeps track of the number of times Get method is called getCallCounter int + // keeps track of the number of times GetStream method is called + getStreamCallCounter int + // when true, GetStream returns a gRPC Unimplemented error + streamUnimplemented bool // getReactors is called after the etcd KV's get function is executed. getReactors []func() } @@ -333,6 +439,14 @@ func newEtcdClientKVWrapper(kv clientv3.KV) *etcdClientKVWrapper { } } +func (ecw *etcdClientKVWrapper) GetStream(ctx context.Context, key string, opts ...clientv3.OpOption) (clientv3.GetStreamChan, error) { + ecw.getStreamCallCounter++ + if ecw.streamUnimplemented { + return nil, grpcstatus.Error(grpccodes.Unimplemented, "RangeStream is unimplemented") + } + return ecw.KV.GetStream(ctx, key, opts...) +} + func (ecw *etcdClientKVWrapper) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { resp, err := ecw.KV.Get(ctx, key, opts...) ecw.getCallCounter++ diff --git a/test/compatibility_lifecycle/reference/feature_list.md b/test/compatibility_lifecycle/reference/feature_list.md index f3ac652a3ab..94e93ab5e08 100644 --- a/test/compatibility_lifecycle/reference/feature_list.md +++ b/test/compatibility_lifecycle/reference/feature_list.md @@ -73,6 +73,7 @@ | DisableNodeKubeProxyVersion | :ballot_box_with_check: 1.33+ | :closed_lock_with_key: 1.36+ | 1.29–1.30 | | | 1.31– | | [code](https://cs.k8s.io/?q=%5CbDisableNodeKubeProxyVersion%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbDisableNodeKubeProxyVersion%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | DynamicResourceAllocation | :ballot_box_with_check: 1.34+ | :closed_lock_with_key: 1.35+ | 1.26–1.31 | 1.32–1.33 | 1.34– | | | [code](https://cs.k8s.io/?q=%5CbDynamicResourceAllocation%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbDynamicResourceAllocation%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | EnvFiles | :ballot_box_with_check: 1.35+ | | 1.34 | 1.35– | | | | [code](https://cs.k8s.io/?q=%5CbEnvFiles%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbEnvFiles%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | +| EtcdRangeStream | | | | 1.37– | | | | [code](https://cs.k8s.io/?q=%5CbEtcdRangeStream%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbEtcdRangeStream%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | EventedPLEG | | | 1.26– | | | | | [code](https://cs.k8s.io/?q=%5CbEventedPLEG%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbEventedPLEG%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | ExecProbeTimeout | :ballot_box_with_check: 1.20+ | :closed_lock_with_key: 1.35+ | | | 1.20– | | | [code](https://cs.k8s.io/?q=%5CbExecProbeTimeout%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbExecProbeTimeout%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | ExtendWebSocketsToKubelet | :ballot_box_with_check: 1.36+ | | | 1.36– | | | NodeDeclaredFeatures | [code](https://cs.k8s.io/?q=%5CbExtendWebSocketsToKubelet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbExtendWebSocketsToKubelet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index ea12ee19a46..9399331d9e6 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -615,6 +615,12 @@ lockToDefault: false preRelease: Beta version: "1.35" +- name: EtcdRangeStream + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Beta + version: "1.37" - name: EventedPLEG versionedSpecs: - default: false