mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-08 16:30:57 -04:00
Merge pull request #136915 from Jefftree/range-stream
KEP 5966: Implement RangeStream for watch cache
This commit is contained in:
commit
1b7d495d61
6 changed files with 241 additions and 50 deletions
|
|
@ -2128,6 +2128,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
|
|||
{Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta},
|
||||
},
|
||||
|
||||
genericfeatures.EtcdRangeStream: {
|
||||
{Version: version.MustParse("1.37"), Default: false, PreRelease: featuregate.Beta},
|
||||
},
|
||||
|
||||
genericfeatures.KMSv1: {
|
||||
{Version: version.MustParse("1.0"), Default: true, PreRelease: featuregate.GA},
|
||||
{Version: version.MustParse("1.28"), Default: true, PreRelease: featuregate.Deprecated},
|
||||
|
|
@ -2621,6 +2625,8 @@ var defaultKubernetesFeatureGateDependencies = map[featuregate.Feature][]feature
|
|||
|
||||
genericfeatures.DetectCacheInconsistency: {},
|
||||
|
||||
genericfeatures.EtcdRangeStream: {},
|
||||
|
||||
genericfeatures.KMSv1: {},
|
||||
|
||||
genericfeatures.ListFromCacheSnapshot: {},
|
||||
|
|
|
|||
|
|
@ -146,6 +146,11 @@ const (
|
|||
// Enabled cache inconsistency detection.
|
||||
DetectCacheInconsistency featuregate.Feature = "DetectCacheInconsistency"
|
||||
|
||||
// owner: @jefftree
// kep: https://kep.k8s.io/5966
|
||||
//
|
||||
// Enables the RangeStream RPC for list operations in etcd.
|
||||
EtcdRangeStream featuregate.Feature = "EtcdRangeStream"
|
||||
|
||||
// owner: @aramase
|
||||
// kep: https://kep.k8s.io/3299
|
||||
// deprecated: v1.28
|
||||
|
|
@ -354,6 +359,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
|
|||
{Version: version.MustParse("1.34"), Default: true, PreRelease: featuregate.Beta},
|
||||
},
|
||||
|
||||
EtcdRangeStream: {
|
||||
{Version: version.MustParse("1.37"), Default: false, PreRelease: featuregate.Beta},
|
||||
},
|
||||
|
||||
KMSv1: {
|
||||
{Version: version.MustParse("1.0"), Default: true, PreRelease: featuregate.GA},
|
||||
{Version: version.MustParse("1.28"), Default: true, PreRelease: featuregate.Deprecated},
|
||||
|
|
|
|||
|
|
@ -291,6 +291,21 @@ func (wc *watchChan) RequestWatchProgress() error {
|
|||
// The revision to watch will be set to the revision in response.
|
||||
// All events sent will have isCreated=true
|
||||
func (wc *watchChan) sync() error {
|
||||
// TODO(jefftree): detect RangeStream support via the etcd feature checker.
|
||||
if wc.recursive && utilfeature.DefaultFeatureGate.Enabled(features.EtcdRangeStream) {
|
||||
err := wc.syncStreamRecursive()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if grpcstatus.Code(err) != grpccodes.Unimplemented {
|
||||
return err
|
||||
}
|
||||
klog.V(4).Infof("etcd server does not support RangeStream for %v; falling back to paginated list", wc.watcher.groupResource)
|
||||
}
|
||||
return wc.syncPaginated()
|
||||
}
|
||||
|
||||
func (wc *watchChan) syncPaginated() error {
|
||||
opts := []clientv3.OpOption{}
|
||||
if wc.recursive {
|
||||
opts = append(opts, clientv3.WithLimit(defaultWatcherMaxLimit))
|
||||
|
|
@ -347,6 +362,46 @@ func (wc *watchChan) sync() error {
|
|||
}
|
||||
}
|
||||
|
||||
func (wc *watchChan) syncStreamRecursive() error {
|
||||
if !wc.recursive {
|
||||
return fmt.Errorf("syncStreamRecursive called on a non-recursive watch")
|
||||
}
|
||||
opts := []clientv3.OpOption{
|
||||
clientv3.WithRange(clientv3.GetPrefixRangeEnd(wc.key)),
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
streamResp, err := wc.watcher.client.KV.GetStream(wc.ctx, wc.key, opts...)
|
||||
metrics.RecordEtcdRequest("listStream", wc.watcher.groupResource, err, startTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var initialRev int64
|
||||
for r := range streamResp {
|
||||
if err := r.Err(); err != nil {
|
||||
// paging=false: a compaction mid-stream can't be resumed with a
|
||||
// continue token, so surface it as ResourceExpired for a relist.
|
||||
return interpretListError(err, false, wc.key, wc.key)
|
||||
}
|
||||
rangeResp := r.RangeResponse
|
||||
for i, kv := range rangeResp.Kvs {
|
||||
wc.queueEvent(parseKV(kv))
|
||||
// free kv early. Long lists can take O(seconds) to decode.
|
||||
rangeResp.Kvs[i] = nil
|
||||
}
|
||||
if initialRev == 0 && rangeResp.Header != nil {
|
||||
initialRev = rangeResp.Header.Revision
|
||||
}
|
||||
}
|
||||
if initialRev == 0 {
|
||||
return fmt.Errorf("rangeStream for %q completed without a revision", wc.key)
|
||||
}
|
||||
|
||||
wc.initialRev = initialRev
|
||||
return nil
|
||||
}
|
||||
|
||||
func logWatchChannelErr(err error) {
|
||||
switch {
|
||||
case strings.Contains(err.Error(), "mvcc: required revision has been compacted"):
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ import (
|
|||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
grpccodes "google.golang.org/grpc/codes"
|
||||
grpcstatus "google.golang.org/grpc/status"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
|
@ -231,6 +233,14 @@ func TestTooLargeResourceVersionErrorForWatchList(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestWatchChanSync(t *testing.T) {
|
||||
modes := []struct {
|
||||
name string
|
||||
rangeStream bool
|
||||
}{
|
||||
{name: "Paginated"},
|
||||
{name: "RangeStream", rangeStream: true},
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
watchKey string
|
||||
|
|
@ -260,68 +270,164 @@ func TestWatchChanSync(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
for _, testCase := range testCases {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
defaultWatcherMaxLimit = testCase.watcherMaxLimit
|
||||
for _, mode := range modes {
|
||||
for _, testCase := range testCases {
|
||||
t.Run(mode.name+"/"+testCase.name, func(t *testing.T) {
|
||||
orig := defaultWatcherMaxLimit
|
||||
defer func() { defaultWatcherMaxLimit = orig }()
|
||||
defaultWatcherMaxLimit = testCase.watcherMaxLimit
|
||||
|
||||
origCtx, store, _ := testSetup(t)
|
||||
initList, err := initStoreData(origCtx, store)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
kvWrapper := newEtcdClientKVWrapper(store.client.KV)
|
||||
kvWrapper.getReactors = append(kvWrapper.getReactors, func() {
|
||||
barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}}
|
||||
podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name)
|
||||
storedObj := &example.Pod{}
|
||||
|
||||
err := store.Create(context.Background(), podKey, barThird, storedObj, 0)
|
||||
origCtx, store, _ := testSetup(t)
|
||||
initList, err := initStoreData(origCtx, store)
|
||||
if err != nil {
|
||||
t.Errorf("failed to create object: %v", err)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
kvWrapper := newEtcdClientKVWrapper(store.client.KV)
|
||||
kvWrapper.getReactors = append(kvWrapper.getReactors, func() {
|
||||
barThird := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "third", Name: "bar"}}
|
||||
podKey := fmt.Sprintf("/pods/%s/%s", barThird.Namespace, barThird.Name)
|
||||
storedObj := &example.Pod{}
|
||||
|
||||
err := store.Create(context.Background(), podKey, barThird, storedObj, 0)
|
||||
if err != nil {
|
||||
t.Errorf("failed to create object: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
store.client.KV = kvWrapper
|
||||
|
||||
w := store.watcher.createWatchChan(
|
||||
origCtx,
|
||||
testCase.watchKey,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
storage.Everything)
|
||||
|
||||
sync := w.syncPaginated
|
||||
if mode.rangeStream {
|
||||
sync = w.syncStreamRecursive
|
||||
}
|
||||
if err := sync(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if w.initialRev <= 0 {
|
||||
t.Errorf("expected initialRev to be set, got %d", w.initialRev)
|
||||
}
|
||||
|
||||
// close incomingEventChan so we can read incomingEventChan non-blocking
|
||||
close(w.incomingEventChan)
|
||||
|
||||
eventsReceived := 0
|
||||
for event := range w.incomingEventChan {
|
||||
eventsReceived++
|
||||
storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key)
|
||||
}
|
||||
|
||||
if eventsReceived != testCase.expectEventCount {
|
||||
t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount)
|
||||
}
|
||||
|
||||
if mode.rangeStream {
|
||||
if kvWrapper.getStreamCallCounter != 1 {
|
||||
t.Errorf("Unexpected called times of client.KV.GetStream() : %v, expected: 1", kvWrapper.getStreamCallCounter)
|
||||
}
|
||||
} else if kvWrapper.getCallCounter != testCase.expectGetCount {
|
||||
t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount)
|
||||
}
|
||||
})
|
||||
|
||||
store.client.KV = kvWrapper
|
||||
|
||||
w := store.watcher.createWatchChan(
|
||||
origCtx,
|
||||
testCase.watchKey,
|
||||
0,
|
||||
true,
|
||||
false,
|
||||
storage.Everything)
|
||||
|
||||
err = w.sync()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// close incomingEventChan so we can read incomingEventChan non-blocking
|
||||
close(w.incomingEventChan)
|
||||
|
||||
eventsReceived := 0
|
||||
for event := range w.incomingEventChan {
|
||||
eventsReceived++
|
||||
storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key)
|
||||
}
|
||||
|
||||
if eventsReceived != testCase.expectEventCount {
|
||||
t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, testCase.expectEventCount)
|
||||
}
|
||||
|
||||
if kvWrapper.getCallCounter != testCase.expectGetCount {
|
||||
t.Errorf("Unexpected called times of client.KV.Get() : %v, expected: %v", kvWrapper.getCallCounter, testCase.expectGetCount)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestWatchChanSyncStreamMatchesPaginated verifies syncStreamRecursive queues the same
|
||||
// key/value/revision set as syncPaginated for the same etcd state.
|
||||
func TestWatchChanSyncStreamMatchesPaginated(t *testing.T) {
|
||||
origCtx, store, _ := testSetup(t)
|
||||
|
||||
want := map[string]struct{}{}
|
||||
for i := range 20 {
|
||||
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: fmt.Sprintf("ns-%d", i%3), Name: fmt.Sprintf("pod-%d", i)}}
|
||||
key := fmt.Sprintf("/pods/%s/%s", pod.Namespace, pod.Name)
|
||||
if err := store.Create(origCtx, key, pod, &example.Pod{}, 0); err != nil {
|
||||
t.Fatalf("failed to create object: %v", err)
|
||||
}
|
||||
want[key] = struct{}{}
|
||||
}
|
||||
|
||||
stream := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncStreamRecursive() })
|
||||
paginated := drainSync(t, store, origCtx, func(wc *watchChan) error { return wc.syncPaginated() })
|
||||
|
||||
if len(stream) != len(want) {
|
||||
t.Errorf("syncStreamRecursive queued %d events, expected %d", len(stream), len(want))
|
||||
}
|
||||
if diff := cmp.Diff(paginated, stream, cmp.AllowUnexported(event{})); diff != "" {
|
||||
t.Errorf("syncStreamRecursive and syncPaginated queued different events (-paginated +stream):\n%s", diff)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchChanSyncStreamFallsBackToPaginated(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EtcdRangeStream, true)
|
||||
|
||||
origCtx, store, _ := testSetup(t)
|
||||
initList, err := initStoreData(origCtx, store)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
kvWrapper := newEtcdClientKVWrapper(store.client.KV)
|
||||
kvWrapper.streamUnimplemented = true
|
||||
store.client.KV = kvWrapper
|
||||
|
||||
w := store.watcher.createWatchChan(origCtx, "/pods/", 0, true, false, storage.Everything)
|
||||
|
||||
if err := w.sync(); err != nil {
|
||||
t.Fatalf("sync failed: %v", err)
|
||||
}
|
||||
|
||||
if kvWrapper.getStreamCallCounter != 1 {
|
||||
t.Errorf("expected GetStream to be called once, got %d", kvWrapper.getStreamCallCounter)
|
||||
}
|
||||
if w.initialRev <= 0 {
|
||||
t.Errorf("expected initialRev to be set by the paginated fallback, got %d", w.initialRev)
|
||||
}
|
||||
|
||||
close(w.incomingEventChan)
|
||||
eventsReceived := 0
|
||||
for event := range w.incomingEventChan {
|
||||
eventsReceived++
|
||||
storagetesting.ExpectContains(t, "incorrect list pods", initList, event.key)
|
||||
}
|
||||
if eventsReceived != len(initList) {
|
||||
t.Errorf("Unexpected number of events: %v, expected: %v", eventsReceived, len(initList))
|
||||
}
|
||||
}
|
||||
|
||||
func drainSync(t *testing.T, store *store, ctx context.Context, sync func(*watchChan) error) map[string]*event {
|
||||
t.Helper()
|
||||
wc := store.watcher.createWatchChan(ctx, "/pods/", 0, true, false, storage.Everything)
|
||||
if err := sync(wc); err != nil {
|
||||
t.Fatalf("sync failed: %v", err)
|
||||
}
|
||||
close(wc.incomingEventChan)
|
||||
out := map[string]*event{}
|
||||
for e := range wc.incomingEventChan {
|
||||
out[e.key] = e
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// NOTE: it's not thread-safe
|
||||
type etcdClientKVWrapper struct {
|
||||
clientv3.KV
|
||||
// keeps track of the number of times Get method is called
|
||||
getCallCounter int
|
||||
// keeps track of the number of times GetStream method is called
|
||||
getStreamCallCounter int
|
||||
// when true, GetStream returns a gRPC Unimplemented error
|
||||
streamUnimplemented bool
|
||||
// getReactors is called after the etcd KV's get function is executed.
|
||||
getReactors []func()
|
||||
}
|
||||
|
|
@ -333,6 +439,14 @@ func newEtcdClientKVWrapper(kv clientv3.KV) *etcdClientKVWrapper {
|
|||
}
|
||||
}
|
||||
|
||||
func (ecw *etcdClientKVWrapper) GetStream(ctx context.Context, key string, opts ...clientv3.OpOption) (clientv3.GetStreamChan, error) {
|
||||
ecw.getStreamCallCounter++
|
||||
if ecw.streamUnimplemented {
|
||||
return nil, grpcstatus.Error(grpccodes.Unimplemented, "RangeStream is unimplemented")
|
||||
}
|
||||
return ecw.KV.GetStream(ctx, key, opts...)
|
||||
}
|
||||
|
||||
func (ecw *etcdClientKVWrapper) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
|
||||
resp, err := ecw.KV.Get(ctx, key, opts...)
|
||||
ecw.getCallCounter++
|
||||
|
|
|
|||
|
|
@ -73,6 +73,7 @@
|
|||
| DisableNodeKubeProxyVersion | :ballot_box_with_check: 1.33+ | :closed_lock_with_key: 1.36+ | 1.29–1.30 | | | 1.31– | | [code](https://cs.k8s.io/?q=%5CbDisableNodeKubeProxyVersion%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbDisableNodeKubeProxyVersion%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| DynamicResourceAllocation | :ballot_box_with_check: 1.34+ | :closed_lock_with_key: 1.35+ | 1.26–1.31 | 1.32–1.33 | 1.34– | | | [code](https://cs.k8s.io/?q=%5CbDynamicResourceAllocation%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbDynamicResourceAllocation%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| EnvFiles | :ballot_box_with_check: 1.35+ | | 1.34 | 1.35– | | | | [code](https://cs.k8s.io/?q=%5CbEnvFiles%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbEnvFiles%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| EtcdRangeStream | | | | 1.37– | | | | [code](https://cs.k8s.io/?q=%5CbEtcdRangeStream%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbEtcdRangeStream%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| EventedPLEG | | | 1.26– | | | | | [code](https://cs.k8s.io/?q=%5CbEventedPLEG%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbEventedPLEG%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| ExecProbeTimeout | :ballot_box_with_check: 1.20+ | :closed_lock_with_key: 1.35+ | | | 1.20– | | | [code](https://cs.k8s.io/?q=%5CbExecProbeTimeout%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbExecProbeTimeout%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
| ExtendWebSocketsToKubelet | :ballot_box_with_check: 1.36+ | | | 1.36– | | | NodeDeclaredFeatures | [code](https://cs.k8s.io/?q=%5CbExtendWebSocketsToKubelet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/kubernetes) [KEPs](https://cs.k8s.io/?q=%5CbExtendWebSocketsToKubelet%5Cb&i=nope&files=&excludeFiles=CHANGELOG&repos=kubernetes/enhancements) |
|
||||
|
|
|
|||
|
|
@ -615,6 +615,12 @@
|
|||
lockToDefault: false
|
||||
preRelease: Beta
|
||||
version: "1.35"
|
||||
- name: EtcdRangeStream
|
||||
versionedSpecs:
|
||||
- default: false
|
||||
lockToDefault: false
|
||||
preRelease: Beta
|
||||
version: "1.37"
|
||||
- name: EventedPLEG
|
||||
versionedSpecs:
|
||||
- default: false
|
||||
|
|
|
|||
Loading…
Reference in a new issue