Merge pull request #132479 from p0lyn0mial/upstream-watchlist-cacher-listwatcher

apiserver/cacher: properly wire listwatch options to the listwatcher
This commit is contained in:
Kubernetes Prow Robot 2025-06-24 03:48:29 -07:00 committed by GitHub
commit ae15bc5613
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 65 additions and 4 deletions

View file

@ -75,11 +75,14 @@ func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error
// Implements cache.ListerWatcher interface.
func (lw *listerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
pred := storage.Everything
pred.AllowWatchBookmarks = options.AllowWatchBookmarks
opts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
Predicate: storage.Everything,
Recursive: true,
ProgressNotify: true,
ResourceVersion: options.ResourceVersion,
Predicate: pred,
Recursive: true,
ProgressNotify: true,
SendInitialEvents: options.SendInitialEvents,
}
ctx := context.Background()
if lw.contextMetadata != nil {

View file

@ -22,7 +22,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
"k8s.io/apiserver/pkg/features"
storagetesting "k8s.io/apiserver/pkg/storage/testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/ptr"
)
func TestCacherListerWatcher(t *testing.T) {
@ -119,3 +126,54 @@ func TestCacherListerWatcherPagination(t *testing.T) {
}
}
func TestCacherListerWatcherListWatch(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)
prefix := "pods"
fn := func() runtime.Object { return &example.PodList{} }
server, store := newEtcdTestStorage(t, prefix)
defer server.Terminate(t)
makePodFn := func() *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"},
}
}
ctx := context.TODO()
pod := makePodFn()
key := computePodKey(pod)
createdPod := &example.Pod{}
if err := store.Create(ctx, key, makePodFn(), createdPod, 0); err != nil {
t.Fatalf("Create failed: %v", err)
}
lw := NewListerWatcher(store, prefix, fn, nil)
target := cache.ToListerWatcherWithContext(lw)
watchListOptions := metav1.ListOptions{
Watch: true,
AllowWatchBookmarks: true,
SendInitialEvents: ptr.To(true),
}
w, err := target.WatchWithContext(ctx, watchListOptions)
if err != nil {
t.Fatal(err)
}
defer w.Stop()
expectedWatchEvents := []watch.Event{
{Type: watch.Added, Object: createdPod},
{
Type: watch.Bookmark,
Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: createdPod.ResourceVersion,
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
},
},
},
}
storagetesting.TestCheckResultsInStrictOrder(t, w, expectedWatchEvents)
storagetesting.TestCheckNoMoreResultsWithIgnoreFunc(t, w, nil)
}