pkg/kubelet/watch_base_manager: wraps the LW with WatchList semantics

This commit is contained in:
Lukasz Szaszkiewicz 2025-10-20 14:21:55 +02:00
parent e2453c503e
commit da77cf84ba
5 changed files with 140 additions and 27 deletions

View file

@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/kubelet/util/manager"
@ -151,8 +152,11 @@ func NewWatchingConfigMapManager(kubeClient clientset.Interface, resyncInterval
}
return false
}
listWatcherWithWatchListSemanticsWrapper := func(lw *cache.ListWatch) cache.ListerWatcher {
return cache.ToListWatcherWithWatchListSemantics(lw, kubeClient)
}
gr := corev1.Resource("configmap")
return &configMapManager{
manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, gr, resyncInterval, getConfigMapNames),
manager: manager.NewWatchBasedManager(listConfigMap, watchConfigMap, newConfigMap, isImmutable, listWatcherWithWatchListSemanticsWrapper, gr, resyncInterval, getConfigMapNames),
}
}

View file

@ -23,6 +23,7 @@ import (
v1 "k8s.io/api/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/kubelet/util/manager"
@ -152,8 +153,11 @@ func NewWatchingSecretManager(kubeClient clientset.Interface, resyncInterval tim
}
return false
}
listWatcherWithWatchListSemanticsWrapper := func(lw *cache.ListWatch) cache.ListerWatcher {
return cache.ToListWatcherWithWatchListSemantics(lw, kubeClient)
}
gr := corev1.Resource("secret")
return &secretManager{
manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, gr, resyncInterval, getSecretNames),
manager: manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, isImmutable, listWatcherWithWatchListSemanticsWrapper, gr, resyncInterval, getSecretNames),
}
}

View file

@ -44,6 +44,8 @@ type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
type newObjectFunc func() runtime.Object
type isImmutableFunc func(runtime.Object) bool
type listWatcherWithWatchListSemanticsWrapperFunc func(lw *cache.ListWatch) cache.ListerWatcher
// objectCacheItem is a single item stored in objectCache.
type objectCacheItem struct {
refMap map[types.UID]int
@ -158,13 +160,14 @@ func (c *cacheStore) unsetInitialized() {
// objectCache is a local cache of objects propagated via
// individual watches.
type objectCache struct {
listObject listObjectFunc
watchObject watchObjectFunc
newObject newObjectFunc
isImmutable isImmutableFunc
groupResource schema.GroupResource
clock clock.Clock
maxIdleTime time.Duration
listObject listObjectFunc
watchObject watchObjectFunc
newObject newObjectFunc
isImmutable isImmutableFunc
listWatcherWithWatchListSemanticsWrapper listWatcherWithWatchListSemanticsWrapperFunc
groupResource schema.GroupResource
clock clock.Clock
maxIdleTime time.Duration
lock sync.RWMutex
items map[objectKey]*objectCacheItem
@ -179,6 +182,7 @@ func NewObjectCache(
watchObject watchObjectFunc,
newObject newObjectFunc,
isImmutable isImmutableFunc,
listWatcherWithWatchListSemanticsWrapper listWatcherWithWatchListSemanticsWrapperFunc,
groupResource schema.GroupResource,
clock clock.Clock,
maxIdleTime time.Duration,
@ -189,14 +193,15 @@ func NewObjectCache(
}
store := &objectCache{
listObject: listObject,
watchObject: watchObject,
newObject: newObject,
isImmutable: isImmutable,
groupResource: groupResource,
clock: clock,
maxIdleTime: maxIdleTime,
items: make(map[objectKey]*objectCacheItem),
listObject: listObject,
watchObject: watchObject,
newObject: newObject,
isImmutable: isImmutable,
listWatcherWithWatchListSemanticsWrapper: listWatcherWithWatchListSemanticsWrapper,
groupResource: groupResource,
clock: clock,
maxIdleTime: maxIdleTime,
items: make(map[objectKey]*objectCacheItem),
}
go wait.Until(store.startRecycleIdleWatch, time.Minute, stopCh)
@ -226,7 +231,7 @@ func (c *objectCache) newReflectorLocked(namespace, name string) *objectCacheIte
}
store := c.newStore()
reflector := cache.NewReflectorWithOptions(
&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
c.listWatcherWithWatchListSemanticsWrapper(&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}),
c.newObject(),
store,
cache.ReflectorOptions{
@ -392,6 +397,7 @@ func NewWatchBasedManager(
watchObject watchObjectFunc,
newObject newObjectFunc,
isImmutable isImmutableFunc,
listWatcherWithWatchListSemanticsWrapper listWatcherWithWatchListSemanticsWrapperFunc,
groupResource schema.GroupResource,
resyncInterval time.Duration,
getReferencedObjects func(*v1.Pod) sets.Set[string]) Manager {
@ -403,6 +409,6 @@ func NewWatchBasedManager(
maxIdleTime := resyncInterval * 5
// TODO propagate stopCh from the higher level.
objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, groupResource, clock.RealClock{}, maxIdleTime, wait.NeverStop)
objectStore := NewObjectCache(listObject, watchObject, newObject, isImmutable, listWatcherWithWatchListSemanticsWrapper, groupResource, clock.RealClock{}, maxIdleTime, wait.NeverStop)
return NewCacheBasedManager(objectStore, getReferencedObjects)
}

View file

@ -19,6 +19,8 @@ package manager
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
@ -29,15 +31,17 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
clientfeaturestesting "k8s.io/client-go/features/testing"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/test/utils/ktesting"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
@ -66,10 +70,13 @@ func isSecretImmutable(object runtime.Object) bool {
func newSecretCache(ctx context.Context, fakeClient clientset.Interface, fakeClock clock.Clock, maxIdleTime time.Duration) *objectCache {
return &objectCache{
listObject: listSecret(ctx, fakeClient),
watchObject: watchSecret(ctx, fakeClient),
newObject: func() runtime.Object { return &v1.Secret{} },
isImmutable: isSecretImmutable,
listObject: listSecret(ctx, fakeClient),
watchObject: watchSecret(ctx, fakeClient),
newObject: func() runtime.Object { return &v1.Secret{} },
isImmutable: isSecretImmutable,
listWatcherWithWatchListSemanticsWrapper: func(lw *cache.ListWatch) cache.ListerWatcher {
return cache.ToListWatcherWithWatchListSemantics(lw, fakeClient)
},
groupResource: corev1.Resource("secret"),
clock: fakeClock,
maxIdleTime: maxIdleTime,
@ -633,3 +640,91 @@ func TestRefMapHandlesReferencesCorrectly(t *testing.T) {
})
}
}
func TestUnSupportWatchListSemantics(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
fakeClock := testingclock.NewFakeClock(time.Now())
// The fake client doesnt support WatchList semantics,
// so we dont need to prepare a response.
fakeClient := fake.NewClientset()
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
target := newSecretCache(context.TODO(), fakeClient, fakeClock, time.Minute)
ret := target.newReflectorLocked("ns", "obj")
defer ret.stop()
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, true, func(ctx context.Context) (bool, error) {
return ret.store.hasSynced(), nil
}); err != nil {
t.Fatal(err)
}
}
func TestWatchListSemanticsSimple(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if _, ok := req.URL.Query()["watch"]; !ok {
t.Errorf("expected a watch request, params: %v", req.URL.Query())
http.Error(w, fmt.Errorf("unexpected request").Error(), http.StatusInternalServerError)
return
}
obj := &v1.Secret{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Secret",
},
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
},
}
rawObj, err := json.Marshal(obj)
if err != nil {
t.Errorf("failed to marshal rawObj: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
watchEvent := &metav1.WatchEvent{
Type: string(watch.Bookmark),
Object: runtime.RawExtension{Raw: rawObj},
}
rawRsp, err := json.Marshal(watchEvent)
if err != nil {
t.Errorf("failed to marshal watchEvent: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(rawRsp)
if err != nil {
t.Fatalf("failed to write response: %v", err)
}
}))
defer server.Close()
cfg := &rest.Config{Host: server.URL}
client, err := clientset.NewForConfig(cfg)
if err != nil {
t.Fatal(err)
}
fakeClock := testingclock.NewFakeClock(time.Now())
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
target := newSecretCache(context.TODO(), client, fakeClock, time.Second)
ret := target.newReflectorLocked("ns", "obj")
defer ret.stop()
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, true, func(ctx context.Context) (bool, error) {
return ret.store.hasSynced(), nil
}); err != nil {
t.Fatal(err)
}
}

View file

@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/kubelet/util/manager"
"k8s.io/kubernetes/test/integration/framework"
@ -66,12 +67,15 @@ func TestWatchBasedManager(t *testing.T) {
// We want all watches to be up and running to stress test it.
// So don't treat any secret as immutable here.
isImmutable := func(_ runtime.Object) bool { return false }
listWatcherWithWatchListSemanticsWrapper := func(lw *cache.ListWatch) cache.ListerWatcher {
return cache.ToListWatcherWithWatchListSemantics(lw, client)
}
fakeClock := testingclock.NewFakeClock(time.Now())
stopCh := make(chan struct{})
t.Cleanup(func() { close(stopCh) })
store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute, stopCh)
store := manager.NewObjectCache(listObj, watchObj, newObj, isImmutable, listWatcherWithWatchListSemanticsWrapper, schema.GroupResource{Group: "v1", Resource: "secrets"}, fakeClock, time.Minute, stopCh)
// create 1000 secrets in parallel
t.Log(time.Now(), "creating 1000 secrets")