diff --git a/pkg/kubelet/configmap/configmap_manager.go b/pkg/kubelet/configmap/configmap_manager.go index 08bc9674668..822d08af685 100644 --- a/pkg/kubelet/configmap/configmap_manager.go +++ b/pkg/kubelet/configmap/configmap_manager.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" podutil "k8s.io/kubernetes/pkg/api/v1/pod" corev1 "k8s.io/kubernetes/pkg/apis/core/v1" "k8s.io/kubernetes/pkg/kubelet/util/manager" @@ -151,8 +152,11 @@ func NewWatchingConfigMapManager(kubeClient clientset.Interface, resyncInterval } return false } + listWatcherWithWatchListSemanticsWrapper := func(lw *cache.ListWatch) cache.ListerWatcher { + return cache.ToListWatcherWithWatchListSemantics(lw, kubeClient) + } gr := corev1.Resource("configmap") return &configMapManager{ - manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, resyncInterval, getConfigMapNames), + manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, listWatcherWithWatchListSemanticsWrapper, gr, resyncInterval, getConfigMapNames), } } diff --git a/pkg/kubelet/secret/secret_manager.go b/pkg/kubelet/secret/secret_manager.go index dd6ec256ad6..eace33c5a14 100644 --- a/pkg/kubelet/secret/secret_manager.go +++ b/pkg/kubelet/secret/secret_manager.go @@ -23,6 +23,7 @@ import ( v1 "k8s.io/api/core/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" podutil "k8s.io/kubernetes/pkg/api/v1/pod" corev1 "k8s.io/kubernetes/pkg/apis/core/v1" "k8s.io/kubernetes/pkg/kubelet/util/manager" @@ -152,8 +153,11 @@ func NewWatchingSecretManager(kubeClient clientset.Interface, resyncInterval tim } return false } + listWatcherWithWatchListSemanticsWrapper := func(lw *cache.ListWatch) cache.ListerWatcher { + return cache.ToListWatcherWithWatchListSemantics(lw, kubeClient) + } gr := corev1.Resource("secret") return &secretManager{ - manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, resyncInterval, getSecretNames), + manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, listWatcherWithWatchListSemanticsWrapper, gr, resyncInterval, getSecretNames), } } diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go index 5def888fbfe..6abf0b31ddd 100644 --- a/pkg/kubelet/util/manager/watch_based_manager.go +++ b/pkg/kubelet/util/manager/watch_based_manager.go @@ -44,6 +44,8 @@ type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error) type newObjectFunc func() runtime.Object type isImmutableFunc func(runtime.Object) bool +type listWatcherWithWatchListSemanticsWrapperFunc func(lw *cache.ListWatch) cache.ListerWatcher + // objectCacheItem is a single item stored in objectCache. type objectCacheItem struct { refMap map[types.UID]int @@ -158,13 +160,14 @@ func (c *cacheStore) unsetInitialized() { // objectCache is a local cache of objects propagated via // individual watches. type objectCache struct { - listObject listObjectFunc - watchObject watchObjectFunc - newObject newObjectFunc - isImmutable isImmutableFunc - groupResource schema.GroupResource - clock clock.Clock - maxIdleTime time.Duration + listObject listObjectFunc + watchObject watchObjectFunc + newObject newObjectFunc + isImmutable isImmutableFunc + listWatcherWithWatchListSemanticsWrapper listWatcherWithWatchListSemanticsWrapperFunc + groupResource schema.GroupResource + clock clock.Clock + maxIdleTime time.Duration lock sync.RWMutex items map[objectKey]*objectCacheItem @@ -179,6 +182,7 @@ func NewObjectCache( watchObject watchObjectFunc, newObject newObjectFunc, isImmutable isImmutableFunc, + listWatcherWithWatchListSemanticsWrapper listWatcherWithWatchListSemanticsWrapperFunc, groupResource schema.GroupResource, clock clock.Clock, maxIdleTime time.Duration, @@ -189,14 +193,15 @@ func NewObjectCache( } store := &objectCache{ - listObject: listObject, - watchObject: watchObject, - newObject: newObject, - isImmutable: isImmutable, - groupResource: groupResource, - clock: clock, - maxIdleTime: maxIdleTime, - items: make(map[objectKey]*objectCacheItem), + listObject: listObject, + watchObject: watchObject, + newObject: newObject, + isImmutable: isImmutable, + listWatcherWithWatchListSemanticsWrapper: listWatcherWithWatchListSemanticsWrapper, + groupResource: groupResource, + clock: clock, + maxIdleTime: maxIdleTime, + items: make(map[objectKey]*objectCacheItem), } go wait.Until(store.startRecycleIdleWatch, time.Minute, stopCh) @@ -226,7 +231,7 @@ func (c *objectCache) newReflectorLocked(namespace, name string) *objectCacheIte } store := c.newStore() reflector := cache.NewReflectorWithOptions( - &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}, + c.listWatcherWithWatchListSemanticsWrapper(&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}), c.newObject(), store, cache.ReflectorOptions{ @@ -392,6 +397,7 @@ func NewWatchBasedManager( watchObject watchObjectFunc, newObject newObjectFunc, isImmutable isImmutableFunc, + listWatcherWithWatchListSemanticsWrapper listWatcherWithWatchListSemanticsWrapperFunc, groupResource schema.GroupResource, resyncInterval time.Duration, getReferencedObjects func(*v1.Pod) sets.Set[string]) Manager { @@ -403,6 +409,6 @@ func NewWatchBasedManager( maxIdleTime := resyncInterval * 5 // TODO propagate stopCh from the higher level. - objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime, wait.NeverStop) + objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, listWatcherWithWatchListSemanticsWrapper, groupResource, clock.RealClock{}, maxIdleTime, wait.NeverStop) return NewCacheBasedManager(objectStore, getReferencedObjects) } diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go index 552cfb2b160..5e793b652e6 100644 --- a/pkg/kubelet/util/manager/watch_based_manager_test.go +++ b/pkg/kubelet/util/manager/watch_based_manager_test.go @@ -19,6 +19,8 @@ package manager import ( "context" "fmt" + "net/http" + "net/http/httptest" "strings" "testing" "time" @@ -29,15 +31,17 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - + clientfeatures "k8s.io/client-go/features" + clientfeaturestesting "k8s.io/client-go/features/testing" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/rest" core "k8s.io/client-go/testing" - + "k8s.io/client-go/tools/cache" corev1 "k8s.io/kubernetes/pkg/apis/core/v1" - "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/clock" testingclock "k8s.io/utils/clock/testing" @@ -66,10 +70,13 @@ func isSecretImmutable(object runtime.Object) bool { func newSecretCache(ctx context.Context, fakeClient clientset.Interface, fakeClock clock.Clock, maxIdleTime time.Duration) *objectCache { return &objectCache{ - listObject: listSecret(ctx, fakeClient), - watchObject: watchSecret(ctx, fakeClient), - newObject: func() runtime.Object { return &v1.Secret{} }, - isImmutable: isSecretImmutable, + listObject: listSecret(ctx, fakeClient), + watchObject: watchSecret(ctx, fakeClient), + newObject: func() runtime.Object { return &v1.Secret{} }, + isImmutable: isSecretImmutable, + listWatcherWithWatchListSemanticsWrapper: func(lw *cache.ListWatch) cache.ListerWatcher { + return cache.ToListWatcherWithWatchListSemantics(lw, fakeClient) + }, groupResource: corev1.Resource("secret"), clock: fakeClock, maxIdleTime: maxIdleTime, @@ -633,3 +640,91 @@ func TestRefMapHandlesReferencesCorrectly(t *testing.T) { }) } } + +func TestUnSupportWatchListSemantics(t *testing.T) { + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true) + + fakeClock := testingclock.NewFakeClock(time.Now()) + // The fake client doesn’t support WatchList semantics, + // so we don’t need to prepare a response. + fakeClient := fake.NewClientset() + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + target := newSecretCache(context.TODO(), fakeClient, fakeClock, time.Minute) + + ret := target.newReflectorLocked("ns", "obj") + defer ret.stop() + + if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, true, func(ctx context.Context) (bool, error) { + return ret.store.hasSynced(), nil + }); err != nil { + t.Fatal(err) + } +} + +func TestWatchListSemanticsSimple(t *testing.T) { + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if _, ok := req.URL.Query()["watch"]; !ok { + t.Errorf("expected a watch request, params: %v", req.URL.Query()) + http.Error(w, fmt.Errorf("unexpected request").Error(), http.StatusInternalServerError) + return + } + + obj := &v1.Secret{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Secret", + }, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + metav1.InitialEventsAnnotationKey: "true", + }, + }, + } + rawObj, err := json.Marshal(obj) + if err != nil { + t.Errorf("failed to marshal rawObj: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + watchEvent := &metav1.WatchEvent{ + Type: string(watch.Bookmark), + Object: runtime.RawExtension{Raw: rawObj}, + } + rawRsp, err := json.Marshal(watchEvent) + if err != nil { + t.Errorf("failed to marshal watchEvent: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + _, err = w.Write(rawRsp) + if err != nil { + t.Fatalf("failed to write response: %v", err) + } + })) + defer server.Close() + + cfg := &rest.Config{Host: server.URL} + client, err := clientset.NewForConfig(cfg) + if err != nil { + t.Fatal(err) + } + + fakeClock := testingclock.NewFakeClock(time.Now()) + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + target := newSecretCache(context.TODO(), client, fakeClock, time.Second) + + ret := target.newReflectorLocked("ns", "obj") + defer ret.stop() + + if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, true, func(ctx context.Context) (bool, error) { + return ret.store.hasSynced(), nil + }); err != nil { + t.Fatal(err) + } +} diff --git a/test/integration/kubelet/watch_manager_test.go b/test/integration/kubelet/watch_manager_test.go index 5754c4eb112..0e3804b001e 100644 --- a/test/integration/kubelet/watch_manager_test.go +++ b/test/integration/kubelet/watch_manager_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/kubelet/util/manager" "k8s.io/kubernetes/test/integration/framework" @@ -66,12 +67,15 @@ func TestWatchBasedManager(t *testing.T) { // We want all watches to be up and running to stress test it. // So don't treat any secret as immutable here. isImmutable := func(_ runtime.Object) bool { return false } + listWatcherWithWatchListSemanticsWrapper := func(lw *cache.ListWatch) cache.ListerWatcher { + return cache.ToListWatcherWithWatchListSemantics(lw, client) + } fakeClock := testingclock.NewFakeClock(time.Now()) stopCh := make(chan struct{}) t.Cleanup(func() { close(stopCh) }) - store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute, stopCh) + store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, listWatcherWithWatchListSemanticsWrapper, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute, stopCh) // create 1000 secrets in parallel t.Log(time.Now(), "creating 1000 secrets")