diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go index 2817a93dd0c..b3b215ba3ac 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go @@ -75,11 +75,14 @@ func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error // Implements cache.ListerWatcher interface. func (lw *listerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { + pred := storage.Everything + pred.AllowWatchBookmarks = options.AllowWatchBookmarks opts := storage.ListOptions{ - ResourceVersion: options.ResourceVersion, - Predicate: storage.Everything, - Recursive: true, - ProgressNotify: true, + ResourceVersion: options.ResourceVersion, + Predicate: pred, + Recursive: true, + ProgressNotify: true, + SendInitialEvents: options.SendInitialEvents, } ctx := context.Background() if lw.contextMetadata != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go index a5a279cb699..18a918506ea 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go @@ -22,7 +22,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" + "k8s.io/apiserver/pkg/features" + storagetesting "k8s.io/apiserver/pkg/storage/testing" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/tools/cache" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/utils/ptr" ) func TestCacherListerWatcher(t *testing.T) { @@ -119,3 +126,54 @@ func TestCacherListerWatcherPagination(t *testing.T) { } } + +func TestCacherListerWatcherListWatch(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true) + + prefix := "pods" + fn := func() runtime.Object { return &example.PodList{} } + server, store := newEtcdTestStorage(t, prefix) + defer server.Terminate(t) + + makePodFn := func() *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}, + } + } + ctx := context.TODO() + pod := makePodFn() + key := computePodKey(pod) + createdPod := &example.Pod{} + if err := store.Create(ctx, key, makePodFn(), createdPod, 0); err != nil { + t.Fatalf("Create failed: %v", err) + } + + lw := NewListerWatcher(store, prefix, fn, nil) + target := cache.ToListerWatcherWithContext(lw) + watchListOptions := metav1.ListOptions{ + Watch: true, + AllowWatchBookmarks: true, + SendInitialEvents: ptr.To(true), + } + w, err := target.WatchWithContext(ctx, watchListOptions) + if err != nil { + t.Fatal(err) + } + defer w.Stop() + + expectedWatchEvents := []watch.Event{ + {Type: watch.Added, Object: createdPod}, + { + Type: watch.Bookmark, + Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: createdPod.ResourceVersion, + Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, + }, + }, + }, + } + + storagetesting.TestCheckResultsInStrictOrder(t, w, expectedWatchEvents) + storagetesting.TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil) +}