From 5d62486d1ceaf3abb16be27c26e1e3e2b39ee323 Mon Sep 17 00:00:00 2001 From: Jefftree Date: Tue, 2 Jun 2026 15:31:33 -0400 Subject: [PATCH 1/4] apiserver: add EtcdRangeStream feature gate for watch cache initial sync --- pkg/features/kube_features.go | 6 + .../apiserver/pkg/features/kube_features.go | 9 ++ .../storage/cacher/cacher_init_bench_test.go | 45 ++++-- .../cacher/cacher_testing_utils_test.go | 2 +- .../apiserver/pkg/storage/etcd3/watcher.go | 48 +++++++ .../pkg/storage/etcd3/watcher_test.go | 134 +++++++++++++++++- .../reference/feature_list.md | 1 + .../reference/versioned_feature_list.yaml | 6 + 8 files changed, 233 insertions(+), 18 deletions(-) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 6b6cd9ae65e..38c20c34026 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -2116,6 +2116,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta}, }, + genericfeatures.EtcdRangeStream: { + {Version: version.MustParse("1.37"), Default: false, PreRelease: featuregate.Beta}, + }, + genericfeatures.KMSv1: { {Version: version.MustParse("1.0"), Default: true, PreRelease: featuregate.GA}, {Version: version.MustParse("1.28"), Default: true, PreRelease: featuregate.Deprecated}, @@ -2606,6 +2610,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature genericfeatures.DetectCacheInconsistency: {}, + genericfeatures.EtcdRangeStream: {}, + genericfeatures.KMSv1: {}, genericfeatures.ListFromCacheSnapshot: {}, diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index a2f3f8a53e8..18416b7be10 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -146,6 +146,11 @@ const ( // Enabled cache inconsistency detection. DetectCacheInconsistency featuregate.Feature = "DetectCacheInconsistency" + // owner: @jefftree + // + // Enables the RangeStream RPC for list operations in etcd. + EtcdRangeStream featuregate.Feature = "EtcdRangeStream" + // owner: @aramase // kep: https://kep.k8s.io/3299 // deprecated: v1.28 @@ -354,6 +359,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta}, }, + EtcdRangeStream: { + {Version: version.MustParse("1.37"), Default: false, PreRelease: featuregate.Beta}, + }, + KMSv1: { {Version: version.MustParse("1.0"), Default: true, PreRelease: featuregate.GA}, {Version: version.MustParse("1.28"), Default: true, PreRelease: featuregate.Deprecated}, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go index b01e88f31f0..6e8c60d79f5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go @@ -30,8 +30,11 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" ) @@ -61,22 +64,34 @@ func BenchmarkCacherInit(b *testing.B) { Clock: clock.RealClock{}, } - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - cacher, err := NewCacherFromConfig(config) - if err != nil { - b.Fatal(err) - } - if err := cacher.Wait(ctx); err != nil { - b.Fatal(err) - } - b.StopTimer() - cacher.Stop() - etcd3.TestOnlyResetResourceSizeEstimator(etcdStorage) - b.StartTimer() + for _, m := range []struct { + name string + rangeStream bool + }{ + {name: "Paginated", rangeStream: false}, + {name: "RangeStream", rangeStream: true}, + } { + b.Run(fmt.Sprintf("Mode=%s", m.name), func(b *testing.B) { + featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.EtcdRangeStream, m.rangeStream) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + cacher, err := NewCacherFromConfig(config) + if err != nil { + b.Fatal(err) + } + if err := cacher.Wait(ctx); err != nil { + b.Fatal(err) + } + b.StopTimer() + cacher.Stop() + etcd3.TestOnlyResetResourceSizeEstimator(etcdStorage) + b.StartTimer() + } + b.ReportMetric(float64(pods), "pods/cache") + }) } - b.ReportMetric(float64(pods), "pods/cache") } func loadExemplarPod(b *testing.B) *corev1.Pod { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go index 12950c41c82..ca7513fcba7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go @@ -61,7 +61,7 @@ func init() { utilruntime.Must(metav1.AddMetaToScheme(scheme)) scheme.AddUnversionedTypes(corev1.SchemeGroupVersion, &metav1.Status{}) pb := protobuf.NewSerializer(scheme, scheme) - corev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{corev1.SchemeGroupVersion}, nil) + corev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{corev1.SchemeGroupVersion}, schema.GroupVersions{corev1.SchemeGroupVersion}) examplev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{examplev1.SchemeGroupVersion}, nil) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index 85214cbb58f..e5afed3356b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -234,6 +234,10 @@ type grpcError interface { GRPCStatus() *grpcstatus.Status } +func isUnimplementedErr(err error) bool { + return grpcstatus.Code(err) == grpccodes.Unimplemented +} + func isCancelError(err error) bool { if err == nil { return false @@ -291,6 +295,20 @@ func (wc *watchChan) RequestWatchProgress() error { // The revision to watch will be set to the revision in response. // All events sent will have isCreated=true func (wc *watchChan) sync() error { + if wc.recursive && utilfeature.DefaultFeatureGate.Enabled(features.EtcdRangeStream) { + err := wc.syncStream() + if err == nil { + return nil + } + if !isUnimplementedErr(err) { + return interpretListError(err, true, wc.key, wc.key) + } + klog.V(4).Infof("etcd server does not support RangeStream for %v; falling back to paginated list", wc.watcher.groupResource) + } + return wc.syncPaginated() +} + +func (wc *watchChan) syncPaginated() error { opts := []clientv3.OpOption{} if wc.recursive { opts = append(opts, clientv3.WithLimit(defaultWatcherMaxLimit)) @@ -347,6 +365,36 @@ func (wc *watchChan) sync() error { } } +func (wc *watchChan) syncStream() error { + opts := []clientv3.OpOption{ + clientv3.WithRange(clientv3.GetPrefixRangeEnd(wc.key)), + } + startTime := time.Now() + streamResp, err := wc.watcher.client.KV.GetStream(wc.ctx, wc.key, opts...) + metrics.RecordEtcdRequest("listStream", wc.watcher.groupResource, err, startTime) + if err != nil { + return err + } + + var streamRev int64 + for r := range streamResp { + if err := r.Err(); err != nil { + return err + } + rangeResp := r.RangeResponse + for i, kv := range rangeResp.Kvs { + wc.queueEvent(parseKV(kv)) + rangeResp.Kvs[i] = nil + } + if streamRev == 0 && rangeResp.Header != nil { + streamRev = rangeResp.Header.Revision + } + } + + wc.initialRev = streamRev + return nil +} + func logWatchChannelErr(err error) { switch { case strings.Contains(err.Error(), "mvcc: required revision has been compacted"): diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 6064ddcd8a3..0b9362f522e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -25,6 +25,8 @@ import ( "github.com/google/go-cmp/cmp" clientv3 "go.etcd.io/etcd/client/v3" + grpccodes "google.golang.org/grpc/codes" + grpcstatus "google.golang.org/grpc/status" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -230,7 +232,7 @@ func TestTooLargeResourceVersionErrorForWatchList(t *testing.T) { } } -func TestWatchChanSync(t *testing.T) { +func TestWatchChanSyncPaginated(t *testing.T) { testCases := []struct { name string watchKey string @@ -262,6 +264,8 @@ func TestWatchChanSync(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { + orig := defaultWatcherMaxLimit + defer func() { defaultWatcherMaxLimit = orig }() defaultWatcherMaxLimit = testCase.watcherMaxLimit origCtx, store, _ := testSetup(t) @@ -292,7 +296,7 @@ func TestWatchChanSync(t *testing.T) { false, storage.Everything) - err = w.sync() + err = w.syncPaginated() if err != nil { t.Fatal(err) } @@ -317,11 +321,129 @@ func TestWatchChanSync(t *testing.T) { } } +func TestWatchChanSyncStream(t *testing.T) { + origCtx, store, _ := testSetup(t) + initList, err := initStoreData(origCtx, store) + if err != nil { + t.Fatal(err) + } + + w := store.watcher.createWatchChan( + origCtx, + "/pods/", + 0, + true, + false, + storage.Everything) + + if err := w.syncStream(); err != nil { + t.Fatalf("syncStream failed: %v", err) + } + + if w.initialRev <= 0 { + t.Errorf("expected initialRev to be set from the stream header, got %d", w.initialRev) + } + + // close incomingEventChan so we can read it non-blocking + close(w.incomingEventChan) + + eventsReceived := 0 + for event := range w.incomingEventChan { + eventsReceived++ + storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) + } + + if eventsReceived != len(initList) { + t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, len(initList)) + } +} + +// TestWatchChanSyncStreamMatchesPaginated verifies syncStream queues the same +// key/value/revision set as syncPaginated for the same etcd state. +func TestWatchChanSyncStreamMatchesPaginated(t *testing.T) { + origCtx, store, _ := testSetup(t) + + want := map[string]struct{}{} + for i := range 20 { + pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: fmt.Sprintf("ns-%d", i%3), Name: fmt.Sprintf("pod-%d", i)}} + key := fmt.Sprintf("/pods/%s/%s", pod.Namespace, pod.Name) + if err := store.Create(origCtx, key, pod, &example.Pod{}, 0); err != nil { + t.Fatalf("failed to create object: %v", err) + } + want[key] = struct{}{} + } + + stream := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncStream() }) + paginated := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncPaginated() }) + + if len(stream) != len(want) { + t.Errorf("syncStream queued %d events, expected %d", len(stream), len(want)) + } + if diff := cmp.Diff(paginated, stream, cmp.AllowUnexported(event{})); diff != "" { + t.Errorf("syncStream and syncPaginated queued different events (-paginated +stream):\n%s", diff) + } +} + +func TestWatchChanSyncStreamFallsBackToPaginated(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EtcdRangeStream, true) + + origCtx, store, _ := testSetup(t) + initList, err := initStoreData(origCtx, store) + if err != nil { + t.Fatal(err) + } + + kvWrapper := newEtcdClientKVWrapper(store.client.KV) + kvWrapper.streamUnimplemented = true + store.client.KV = kvWrapper + + w := store.watcher.createWatchChan(origCtx, "/pods/", 0, true, false, storage.Everything) + + if err := w.sync(); err != nil { + t.Fatalf("sync failed: %v", err) + } + + if kvWrapper.getStreamCallCounter != 1 { + t.Errorf("expected GetStream to be called once, got %d", kvWrapper.getStreamCallCounter) + } + if w.initialRev <= 0 { + t.Errorf("expected initialRev to be set by the paginated fallback, got %d", w.initialRev) + } + + close(w.incomingEventChan) + eventsReceived := 0 + for event := range w.incomingEventChan { + eventsReceived++ + storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) + } + if eventsReceived != len(initList) { + t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, len(initList)) + } +} + +func drainSync(t *testing.T, store *store, ctx context.Context, sync func(*watchChan) error) map[string]*event { + t.Helper() + wc := store.watcher.createWatchChan(ctx, "/pods/", 0, true, false, storage.Everything) + if err := sync(wc); err != nil { + t.Fatalf("sync failed: %v", err) + } + close(wc.incomingEventChan) + out := map[string]*event{} + for e := range wc.incomingEventChan { + out[e.key] = e + } + return out +} + // NOTE: it's not thread-safe type etcdClientKVWrapper struct { clientv3.KV // keeps track of the number of times Get method is called getCallCounter int + // keeps track of the number of times GetStream method is called + getStreamCallCounter int + // when true, GetStream returns a gRPC Unimplemented error + streamUnimplemented bool // getReactors is called after the etcd KV's get function is executed. getReactors []func() } @@ -333,6 +455,14 @@ func newEtcdClientKVWrapper(kv clientv3.KV) *etcdClientKVWrapper { } } +func (ecw *etcdClientKVWrapper) GetStream(ctx context.Context, key string, opts ...clientv3.OpOption) (clientv3.GetStreamChan, error) { + ecw.getStreamCallCounter++ + if ecw.streamUnimplemented { + return nil, grpcstatus.Error(grpccodes.Unimplemented, "RangeStream is unimplemented") + } + return ecw.KV.GetStream(ctx, key, opts...) +} + func (ecw *etcdClientKVWrapper) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { resp, err := ecw.KV.Get(ctx, key, opts...) ecw.getCallCounter++ diff --git a/test/compatibility_lifecycle/reference/feature_list.md b/test/compatibility_lifecycle/reference/feature_list.md index 7ae319ae4c5..baa2a1b0c7b 100644 --- a/test/compatibility_lifecycle/reference/feature_list.md +++ b/test/compatibility_lifecycle/reference/feature_list.md @@ -73,6 +73,7 @@ | DisableNodeKubeProxyVersion | :ballot_box_with_check: 1.33+ | :closed_lock_with_key: 1.36+ | 1.29–1.30 | | | 1.31– | | [code](https://cs.k8s.io/?q=%5CbDisableNodeKubeProxyVersion%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbDisableNodeKubeProxyVersion%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | DynamicResourceAllocation | :ballot_box_with_check: 1.34+ | :closed_lock_with_key: 1.35+ | 1.26–1.31 | 1.32–1.33 | 1.34– | | | [code](https://cs.k8s.io/?q=%5CbDynamicResourceAllocation%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbDynamicResourceAllocation%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | EnvFiles | :ballot_box_with_check: 1.35+ | | 1.34 | 1.35– | | | | [code](https://cs.k8s.io/?q=%5CbEnvFiles%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbEnvFiles%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | +| EtcdRangeStream | | | | 1.37– | | | | [code](https://cs.k8s.io/?q=%5CbEtcdRangeStream%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbEtcdRangeStream%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | EventedPLEG | | | 1.26– | | | | | [code](https://cs.k8s.io/?q=%5CbEventedPLEG%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbEventedPLEG%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | ExecProbeTimeout | :ballot_box_with_check: 1.20+ | :closed_lock_with_key: 1.35+ | | | 1.20– | | | [code](https://cs.k8s.io/?q=%5CbExecProbeTimeout%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbExecProbeTimeout%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | | ExtendWebSocketsToKubelet | :ballot_box_with_check: 1.36+ | | | 1.36– | | | NodeDeclaredFeatures | [code](https://cs.k8s.io/?q=%5CbExtendWebSocketsToKubelet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbExtendWebSocketsToKubelet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) | diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index 663d317f246..35d9a55017d 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -611,6 +611,12 @@ lockToDefault: false preRelease: Beta version: "1.35" +- name: EtcdRangeStream + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Beta + version: "1.37" - name: EventedPLEG versionedSpecs: - default: false From 3ea54dafffcc537b475bb4cff4e6fc437d375d49 Mon Sep 17 00:00:00 2001 From: Jefftree Date: Wed, 3 Jun 2026 09:11:22 -0400 Subject: [PATCH 2/4] apiserver: address RangeStream review feedback --- .../storage/cacher/cacher_init_bench_test.go | 45 ++--- .../cacher/cacher_testing_utils_test.go | 2 +- .../apiserver/pkg/storage/etcd3/watcher.go | 23 ++- .../pkg/storage/etcd3/watcher_test.go | 172 ++++++++---------- 4 files changed, 108 insertions(+), 134 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go index 6e8c60d79f5..b01e88f31f0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_init_bench_test.go @@ -30,11 +30,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" - "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3" - utilfeature "k8s.io/apiserver/pkg/util/feature" - featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" ) @@ -64,34 +61,22 @@ func BenchmarkCacherInit(b *testing.B) { Clock: clock.RealClock{}, } - for _, m := range []struct { - name string - rangeStream bool - }{ - {name: "Paginated", rangeStream: false}, - {name: "RangeStream", rangeStream: true}, - } { - b.Run(fmt.Sprintf("Mode=%s", m.name), func(b *testing.B) { - featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, features.EtcdRangeStream, m.rangeStream) - - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - cacher, err := NewCacherFromConfig(config) - if err != nil { - b.Fatal(err) - } - if err := cacher.Wait(ctx); err != nil { - b.Fatal(err) - } - b.StopTimer() - cacher.Stop() - etcd3.TestOnlyResetResourceSizeEstimator(etcdStorage) - b.StartTimer() - } - b.ReportMetric(float64(pods), "pods/cache") - }) + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + cacher, err := NewCacherFromConfig(config) + if err != nil { + b.Fatal(err) + } + if err := cacher.Wait(ctx); err != nil { + b.Fatal(err) + } + b.StopTimer() + cacher.Stop() + etcd3.TestOnlyResetResourceSizeEstimator(etcdStorage) + b.StartTimer() } + b.ReportMetric(float64(pods), "pods/cache") } func loadExemplarPod(b *testing.B) *corev1.Pod { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go index ca7513fcba7..12950c41c82 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go @@ -61,7 +61,7 @@ func init() { utilruntime.Must(metav1.AddMetaToScheme(scheme)) scheme.AddUnversionedTypes(corev1.SchemeGroupVersion, &metav1.Status{}) pb := protobuf.NewSerializer(scheme, scheme) - corev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{corev1.SchemeGroupVersion}, schema.GroupVersions{corev1.SchemeGroupVersion}) + corev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{corev1.SchemeGroupVersion}, nil) examplev1ProtoCodec = codecs.CodecForVersions(pb, pb, schema.GroupVersions{examplev1.SchemeGroupVersion}, nil) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index e5afed3356b..cb71179cf17 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -296,10 +296,7 @@ func (wc *watchChan) RequestWatchProgress() error { // All events sent will have isCreated=true func (wc *watchChan) sync() error { if wc.recursive && utilfeature.DefaultFeatureGate.Enabled(features.EtcdRangeStream) { - err := wc.syncStream() - if err == nil { - return nil - } + err := wc.syncStreamRecursive() if !isUnimplementedErr(err) { return interpretListError(err, true, wc.key, wc.key) } @@ -365,10 +362,14 @@ func (wc *watchChan) syncPaginated() error { } } -func (wc *watchChan) syncStream() error { +func (wc *watchChan) syncStreamRecursive() error { + if !wc.recursive { + return fmt.Errorf("syncStreamRecursive called on a non-recursive watch") + } opts := []clientv3.OpOption{ clientv3.WithRange(clientv3.GetPrefixRangeEnd(wc.key)), } + startTime := time.Now() streamResp, err := wc.watcher.client.KV.GetStream(wc.ctx, wc.key, opts...) metrics.RecordEtcdRequest("listStream", wc.watcher.groupResource, err, startTime) @@ -376,7 +377,7 @@ func (wc *watchChan) syncStream() error { return err } - var streamRev int64 + var initialRev int64 for r := range streamResp { if err := r.Err(); err != nil { return err @@ -384,14 +385,18 @@ func (wc *watchChan) syncStream() error { rangeResp := r.RangeResponse for i, kv := range rangeResp.Kvs { wc.queueEvent(parseKV(kv)) + // free kv early. Long lists can take O(seconds) to decode. rangeResp.Kvs[i] = nil } - if streamRev == 0 && rangeResp.Header != nil { - streamRev = rangeResp.Header.Revision + if initialRev == 0 && rangeResp.Header != nil { + initialRev = rangeResp.Header.Revision } } + if initialRev == 0 { + return fmt.Errorf("rangeStream for %q completed without a revision", wc.key) + } - wc.initialRev = streamRev + wc.initialRev = initialRev return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 0b9362f522e..13723ac0668 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -232,7 +232,15 @@ func TestTooLargeResourceVersionErrorForWatchList(t *testing.T) { } } -func TestWatchChanSyncPaginated(t *testing.T) { +func TestWatchChanSync(t *testing.T) { + modes := []struct { + name string + rangeStream bool + }{ + {name: "Paginated"}, + {name: "RangeStream", rangeStream: true}, + } + testCases := []struct { name string watchKey string @@ -262,103 +270,79 @@ func TestWatchChanSyncPaginated(t *testing.T) { }, } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - orig := defaultWatcherMaxLimit - defer func() { defaultWatcherMaxLimit = orig }() - defaultWatcherMaxLimit = testCase.watcherMaxLimit + for _, mode := range modes { + for _, testCase := range testCases { + t.Run(mode.name+"/"+testCase.name, func(t *testing.T) { + orig := defaultWatcherMaxLimit + defer func() { defaultWatcherMaxLimit = orig }() + defaultWatcherMaxLimit = testCase.watcherMaxLimit - origCtx, store, _ := testSetup(t) - initList, err := initStoreData(origCtx, store) - if err != nil { - t.Fatal(err) - } - - kvWrapper := newEtcdClientKVWrapper(store.client.KV) - kvWrapper.getReactors = append(kvWrapper.getReactors, func() { - barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}} - podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name) - storedObj := &example.Pod{} - - err := store.Create(context.Background(), podKey, barThird, storedObj, 0) + origCtx, store, _ := testSetup(t) + initList, err := initStoreData(origCtx, store) if err != nil { - t.Errorf("failed to create object: %v", err) + t.Fatal(err) + } + + kvWrapper := newEtcdClientKVWrapper(store.client.KV) + kvWrapper.getReactors = append(kvWrapper.getReactors, func() { + barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}} + podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name) + storedObj := &example.Pod{} + + err := store.Create(context.Background(), podKey, barThird, storedObj, 0) + if err != nil { + t.Errorf("failed to create object: %v", err) + } + }) + + store.client.KV = kvWrapper + + w := store.watcher.createWatchChan( + origCtx, + testCase.watchKey, + 0, + true, + false, + storage.Everything) + + sync := w.syncPaginated + if mode.rangeStream { + sync = w.syncStreamRecursive + } + if err := sync(); err != nil { + t.Fatal(err) + } + + if w.initialRev <= 0 { + t.Errorf("expected initialRev to be set, got %d", w.initialRev) + } + + // close incomingEventChan so we can read incomingEventChan non-blocking + close(w.incomingEventChan) + + eventsReceived := 0 + for event := range w.incomingEventChan { + eventsReceived++ + storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) + } + + if eventsReceived != testCase.expectEventCount { + t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount) + } + + if mode.rangeStream { + if kvWrapper.getStreamCallCounter != 1 { + t.Errorf("Unexpected called times of client.KV.GetStream() : %v, expected: 1", kvWrapper.getStreamCallCounter) + } + } else if kvWrapper.getCallCounter != testCase.expectGetCount { + t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount) } }) - - store.client.KV = kvWrapper - - w := store.watcher.createWatchChan( - origCtx, - testCase.watchKey, - 0, - true, - false, - storage.Everything) - - err = w.syncPaginated() - if err != nil { - t.Fatal(err) - } - - // close incomingEventChan so we can read incomingEventChan non-blocking - close(w.incomingEventChan) - - eventsReceived := 0 - for event := range w.incomingEventChan { - eventsReceived++ - storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) - } - - if eventsReceived != testCase.expectEventCount { - t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount) - } - - if kvWrapper.getCallCounter != testCase.expectGetCount { - t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount) - } - }) + } } } -func TestWatchChanSyncStream(t *testing.T) { - origCtx, store, _ := testSetup(t) - initList, err := initStoreData(origCtx, store) - if err != nil { - t.Fatal(err) - } - - w := store.watcher.createWatchChan( - origCtx, - "/pods/", - 0, - true, - false, - storage.Everything) - - if err := w.syncStream(); err != nil { - t.Fatalf("syncStream failed: %v", err) - } - - if w.initialRev <= 0 { - t.Errorf("expected initialRev to be set from the stream header, got %d", w.initialRev) - } - - // close incomingEventChan so we can read it non-blocking - close(w.incomingEventChan) - - eventsReceived := 0 - for event := range w.incomingEventChan { - eventsReceived++ - storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key) - } - - if eventsReceived != len(initList) { - t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, len(initList)) - } -} - -// TestWatchChanSyncStreamMatchesPaginated verifies syncStream queues the same +// TestWatchChanSyncStreamMatchesPaginated verifies syncStreamRecursive queues the same // key/value/revision set as syncPaginated for the same etcd state. func TestWatchChanSyncStreamMatchesPaginated(t *testing.T) { origCtx, store, _ := testSetup(t) @@ -373,14 +357,14 @@ func TestWatchChanSyncStreamMatchesPaginated(t *testing.T) { want[key] = struct{}{} } - stream := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncStream() }) + stream := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncStreamRecursive() }) paginated := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncPaginated() }) if len(stream) != len(want) { - t.Errorf("syncStream queued %d events, expected %d", len(stream), len(want)) + t.Errorf("syncStreamRecursive queued %d events, expected %d", len(stream), len(want)) } if diff := cmp.Diff(paginated, stream, cmp.AllowUnexported(event{})); diff != "" { - t.Errorf("syncStream and syncPaginated queued different events (-paginated +stream):\n%s", diff) + t.Errorf("syncStreamRecursive and syncPaginated queued different events (-paginated +stream):\n%s", diff) } } From dbbc4dc17699a71d1a30e718dedb79a21d8fc288 Mon Sep 17 00:00:00 2001 From: Jefftree Date: Fri, 5 Jun 2026 10:38:38 -0400 Subject: [PATCH 3/4] apiserver: interpret RangeStream errors in syncStreamRecursive --- staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index cb71179cf17..dbfe556f334 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -295,10 +295,11 @@ func (wc *watchChan) RequestWatchProgress() error { // The revision to watch will be set to the revision in response. // All events sent will have isCreated=true func (wc *watchChan) sync() error { + // TODO(jefftree): detect RangeStream support via the etcd feature checker. if wc.recursive && utilfeature.DefaultFeatureGate.Enabled(features.EtcdRangeStream) { err := wc.syncStreamRecursive() if !isUnimplementedErr(err) { - return interpretListError(err, true, wc.key, wc.key) + return err } klog.V(4).Infof("etcd server does not support RangeStream for %v; falling back to paginated list", wc.watcher.groupResource) } @@ -380,7 +381,9 @@ func (wc *watchChan) syncStreamRecursive() error { var initialRev int64 for r := range streamResp { if err := r.Err(); err != nil { - return err + // paging=false: a compaction mid-stream can't be resumed with a + // continue token, so surface it as ResourceExpired for a relist. + return interpretListError(err, false, wc.key, wc.key) } rangeResp := r.RangeResponse for i, kv := range rangeResp.Kvs { From 33d1d2e6561ce5d3e40f48ede564dcced019266e Mon Sep 17 00:00:00 2001 From: Jefftree Date: Fri, 5 Jun 2026 11:07:21 -0400 Subject: [PATCH 4/4] apiserver: inline isUnimplementedErr and clarify sync fallback flow --- .../src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go index dbfe556f334..c1c62d4e1fd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go @@ -234,10 +234,6 @@ type grpcError interface { GRPCStatus() *grpcstatus.Status } -func isUnimplementedErr(err error) bool { - return grpcstatus.Code(err) == grpccodes.Unimplemented -} - func isCancelError(err error) bool { if err == nil { return false @@ -298,7 +294,10 @@ func (wc *watchChan) sync() error { // TODO(jefftree): detect RangeStream support via the etcd feature checker. if wc.recursive && utilfeature.DefaultFeatureGate.Enabled(features.EtcdRangeStream) { err := wc.syncStreamRecursive() - if !isUnimplementedErr(err) { + if err == nil { + return nil + } + if grpcstatus.Code(err) != grpccodes.Unimplemented { return err } klog.V(4).Infof("etcd server does not support RangeStream for %v; falling back to paginated list", wc.watcher.groupResource)