client-go testing: support List+Watch with ResourceVersion

Quite a lot of unit tests set up informers with a fake client, call
informerFactory.WaitForCacheSync, then create or modify objects. Such tests
suffered from a race: because the fake client only delivered objects to a
watch after that watch had been created, creating an object too early caused
that object to not get delivered to the informer.

Usually the timing worked out okay because WaitForCacheSync typically slept a
bit while polling, giving the Watch call time to complete, but this race has
also gone wrong occasionally. Now with WaitForCacheSync returning more promptly
without polling (work in progress), the race goes wrong more often.

Instead of working around this in unit tests it's better to improve the fake
client such that List+Watch works reliably, regardless of the timing. The fake
client has traditionally not touched ResourceVersion in stored objects and
doing so now might break unit tests, so the added support for ResourceVersion
is intentionally limited to List+Watch.

The test simulates "real" usage of informers. It runs in a synctest bubble and
completes quickly:

    go test -v .
    === RUN   TestListAndWatch
        listandwatch_test.go:67: I0101 01:00:00.000000] Listed configMaps="&ConfigMapList{ListMeta:{ 1  <nil>},Items:[]ConfigMap{ConfigMap{ObjectMeta:{cm1  default    0 0001-01-01 00:00:00 +0000 UTC <nil> <nil> map[] map[] [] [] []},Data:map[string]string{},BinaryData:map[string][]byte{},Immutable:nil,},},}" err=null
        listandwatch_test.go:79: I0101 01:00:00.000000] Delaying Watch...
        listandwatch_test.go:90: I0101 01:00:00.100000] Caches synced
        listandwatch_test.go:107: I0101 01:00:00.100000] Created second ConfigMap
        listandwatch_test.go:81: I0101 01:00:00.100000] Continuing Watch...
    --- PASS: TestListAndWatch (0.00s)
    PASS
    ok  	k8s.io/client-go/testing/internal	0.009s

Some users of the fake client need to be updated to avoid test failures:
- ListMeta comparisons have to be updated.
- Optional: pass ListOptions into tracker.Watch. It's optional because
  the implementation behaves as before when options are missing,
  but the List+Watch race fix only works when options are passed.
This commit is contained in:
Patrick Ohly 2025-12-27 21:57:54 +01:00
parent d04610bbfb
commit 5644850607
9 changed files with 224 additions and 24 deletions

View file

@ -164,7 +164,7 @@ func NewPolicyTestContext[P, B runtime.Object, E Evaluator](
return policiesAndBindingsTracker.List(fakePolicyGVR, fakePolicyGVK, "")
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return policiesAndBindingsTracker.Watch(fakePolicyGVR, "")
return policiesAndBindingsTracker.Watch(fakePolicyGVR, "", options)
},
}, policiesAndBindingsTracker),
Pexample,
@ -177,7 +177,7 @@ func NewPolicyTestContext[P, B runtime.Object, E Evaluator](
return policiesAndBindingsTracker.List(fakeBindingGVR, fakeBindingGVK, "")
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return policiesAndBindingsTracker.Watch(fakeBindingGVR, "")
return policiesAndBindingsTracker.Watch(fakeBindingGVR, "", options)
},
}, policiesAndBindingsTracker),
Bexample,

View file

@ -117,7 +117,7 @@ func setupTest(ctx context.Context, customReconciler func(string, string, runtim
return tracker.List(fakeGVR, fakeGVK, "")
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return tracker.Watch(fakeGVR, "")
return tracker.Watch(fakeGVR, "", options)
},
}, tracker), &unstructured.Unstructured{}, 30*time.Second, nil)}

View file

@ -114,9 +114,13 @@ func NewSimpleDynamicClientWithCustomListKinds(scheme *runtime.Scheme, gvrToList
cs := &FakeDynamicClient{scheme: scheme, gvrToListKind: completeGVRToListKind, tracker: o}
cs.AddReactor("*", "*", testing.ObjectReaction(o))
cs.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
var opts metav1.ListOptions
if watchAction, ok := action.(testing.WatchActionImpl); ok {
opts = watchAction.ListOptions
}
gvr := action.GetResource()
ns := action.GetNamespace()
watch, err := o.Watch(gvr, ns)
watch, err := o.Watch(gvr, ns, opts)
if err != nil {
return false, nil, err
}

View file

@ -186,7 +186,7 @@ func Test_ListKind(t *testing.T) {
"kind": "TheKindList",
"metadata": map[string]interface{}{
"continue": "",
"resourceVersion": "",
"resourceVersion": "3", // Three objects created so far.
},
},
Items: []unstructured.Unstructured{
@ -340,7 +340,7 @@ func TestListWithUnstructuredObjectsAndTypedScheme(t *testing.T) {
expectedList := &unstructured.UnstructuredList{}
expectedList.SetGroupVersionKind(listGVK)
expectedList.SetResourceVersion("") // by product of the fake setting resource version
expectedList.SetResourceVersion("1") // One object created so far.
expectedList.SetContinue("")
expectedList.Items = append(expectedList.Items, u)
@ -402,7 +402,7 @@ func TestListWithNoScheme(t *testing.T) {
expectedList := &unstructured.UnstructuredList{}
expectedList.SetGroupVersionKind(listGVK)
expectedList.SetResourceVersion("") // by product of the fake setting resource version
expectedList.SetResourceVersion("1") // One object created so far.
expectedList.SetContinue("")
expectedList.Items = append(expectedList.Items, u)
@ -443,7 +443,7 @@ func TestListWithTypedFixtures(t *testing.T) {
expectedList := &unstructured.UnstructuredList{}
expectedList.SetGroupVersionKind(listGVK)
expectedList.SetResourceVersion("") // by product of the fake setting resource version
expectedList.SetResourceVersion("1") // One object created so far.
expectedList.SetContinue("")
expectedList.Items = []unstructured.Unstructured{u}

View file

@ -41,9 +41,13 @@ func TestFakeClient(t *testing.T) {
client := fake.NewSimpleClientset()
// A catch-all watch reactor that allows us to inject the watcherStarted channel.
client.PrependWatchReactor("*", func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
var opts metav1.ListOptions
if watchAction, ok := action.(clienttesting.WatchActionImpl); ok {
opts = watchAction.ListOptions
}
gvr := action.GetResource()
ns := action.GetNamespace()
watch, err := client.Tracker().Watch(gvr, ns)
watch, err := client.Tracker().Watch(gvr, ns, opts)
if err != nil {
return false, nil, err
}

View file

@ -68,9 +68,13 @@ func NewSimpleMetadataClient(scheme *runtime.Scheme, objects ...runtime.Object)
cs := &FakeMetadataClient{scheme: scheme, tracker: o}
cs.AddReactor("*", "*", testing.ObjectReaction(o))
cs.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
var opts metav1.ListOptions
if watchAction, ok := action.(testing.WatchActionImpl); ok {
opts = watchAction.ListOptions
}
gvr := action.GetResource()
ns := action.GetNamespace()
watch, err := o.Watch(gvr, ns)
watch, err := o.Watch(gvr, ns, opts)
if err != nil {
return false, nil, err
}

View file

@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"sort"
"strconv"
"strings"
"sync"
@ -288,13 +289,41 @@ type tracker struct {
scheme ObjectScheme
decoder runtime.Decoder
lock sync.RWMutex
objects map[schema.GroupVersionResource]map[types.NamespacedName]runtime.Object
objects map[schema.GroupVersionResource]map[types.NamespacedName]versionedObject
// The value type of watchers is a map of which the key is either a namespace or
// all/non namespace aka "" and its value is list of fake watchers.
// Manipulations on resources will broadcast the notification events into the
// watchers' channel. Note that too many unhandled events (currently 100,
// see apimachinery/pkg/watch.DefaultChanSize) will cause a panic.
watchers map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher
// resourceVersions is the highest resource version of any tracked object with
// a certain gvr. The resource version for that set of objects gets bumped before
// storing a new or modified object, so all entries are larger than 0.
// Object content does not get changed to preserve the traditional behavior
// (hence also the versionedObject type instead of storing a runtime.Object
// with modified ResourceVersion).
//
// Resource version support (https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions)
// is very limited. It only supports one particular use case:
// List (no resource version check, returned ListMeta has ResourceVersion set) +
// Watch (Exact match for the ResourceVersion returned by List).
//
// This is sufficient for Reflector.ListAndWatch (https://github.com/kubernetes/kubernetes/blob/b53b9fb5573323484af9a19cf3f5bfe80760abba/staging/src/k8s.io/client-go/tools/cache/reflector.go#L401)
// when setting up informers in an informer factory.
//
// Strictly speaking, this should be by GroupVersion. But objects are
// also tracked by GroupVersionResource instead of GroupVersion, so the
// same is done here to match how List is implemented.
resourceVersions map[schema.GroupVersionResource]int64
}
// versionedObject stores an object together with the resource version that was
// assigned to it by the tracker. The version could be stored inline in the object,
// but this is not how fake client-go has traditionally worked and starting to do
// that now might break tests.
//
// NOTE(review): the tracker builds values of this type with positional
// literals (versionedObject{resourceVersion, obj}), so the field order below
// must not be changed without updating those call sites.
type versionedObject struct {
// resourceVersion is the value of the tracker's per-GroupVersionResource
// counter at the time the object was stored (created or replaced).
resourceVersion int64
// Object is the stored object itself. Its own metadata is left untouched;
// in particular the tracker does not write resourceVersion into it.
runtime.Object
}
var _ ObjectTracker = &tracker{}
@ -303,10 +332,11 @@ var _ ObjectTracker = &tracker{}
// of objects for the fake clientset. Mostly useful for unit tests.
func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracker {
return &tracker{
scheme: scheme,
decoder: decoder,
objects: make(map[schema.GroupVersionResource]map[types.NamespacedName]runtime.Object),
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
scheme: scheme,
decoder: decoder,
objects: make(map[schema.GroupVersionResource]map[types.NamespacedName]versionedObject),
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.RaceFreeFakeWatcher),
resourceVersions: make(map[schema.GroupVersionResource]int64),
}
}
@ -343,13 +373,20 @@ func (t *tracker) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionK
return list, nil
}
matchingObjs, err := filterByNamespace(objs, ns)
matchingVersionedObjs, err := filterByNamespace(objs, ns)
if err != nil {
return nil, err
}
matchingObjs := make([]runtime.Object, len(matchingVersionedObjs))
for i, obj := range matchingVersionedObjs {
matchingObjs[i] = obj.Object
}
if err := meta.SetList(list, matchingObjs); err != nil {
return nil, err
}
if listMeta, err := meta.ListAccessor(list); err == nil {
listMeta.SetResourceVersion(fmt.Sprintf("%d", t.resourceVersions[gvr]))
}
return list.DeepCopyObject(), nil
}
@ -358,6 +395,17 @@ func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string, opts ...meta
if err != nil {
return nil, err
}
// By default, emulate the traditional behavior of the tracker and don't deliver
// *any* existing objects. This ensures compatibility with users
// who don't pass options and therefore don't expect the additional objects.
resourceVersion := int64(-1)
if len(opts) > 0 && opts[0].ResourceVersion != "" {
rv, err := strconv.ParseInt(opts[0].ResourceVersion, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid ResourceVersion %q in ListOptions, must be int64: %w", opts[0].ResourceVersion, err)
}
resourceVersion = rv
}
t.lock.Lock()
defer t.lock.Unlock()
@ -368,6 +416,22 @@ func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string, opts ...meta
t.watchers[gvr] = make(map[string][]*watch.RaceFreeFakeWatcher)
}
t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher)
// Deliver all objects that match the list options, for example
// between the initial List and the following Watch.
if resourceVersion != -1 {
objs := t.objects[gvr]
matchingObjs, err := filterByNamespace(objs, ns)
if err != nil {
return nil, err
}
for _, obj := range matchingObjs {
if resourceVersion == 0 || resourceVersion < obj.resourceVersion {
fakewatcher.Add(obj.Object)
}
}
}
return fakewatcher, nil
}
@ -565,17 +629,19 @@ func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns st
_, ok := t.objects[gvr]
if !ok {
t.objects[gvr] = make(map[types.NamespacedName]runtime.Object)
t.objects[gvr] = make(map[types.NamespacedName]versionedObject)
}
namespacedName := types.NamespacedName{Namespace: newMeta.GetNamespace(), Name: newMeta.GetName()}
if _, ok = t.objects[gvr][namespacedName]; ok {
if replaceExisting {
resourceVersion := t.resourceVersions[gvr] + 1
t.resourceVersions[gvr] = resourceVersion
for _, w := range t.getWatches(gvr, ns) {
// To avoid the object from being accidentally modified by watcher
w.Modify(obj.DeepCopyObject())
}
t.objects[gvr][namespacedName] = obj
t.objects[gvr][namespacedName] = versionedObject{resourceVersion, obj}
return nil
}
return apierrors.NewAlreadyExists(gr, newMeta.GetName())
@ -586,7 +652,9 @@ func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns st
return apierrors.NewNotFound(gr, newMeta.GetName())
}
t.objects[gvr][namespacedName] = obj
resourceVersion := t.resourceVersions[gvr] + 1
t.resourceVersions[gvr] = resourceVersion
t.objects[gvr][namespacedName] = versionedObject{resourceVersion, obj}
for _, w := range t.getWatches(gvr, ns) {
// To avoid the object from being accidentally modified by watcher
@ -841,11 +909,11 @@ func (d *objectDefaulter) Default(_ runtime.Object) {}
// filterByNamespace returns all objects in the collection that
// match provided namespace. Empty namespace matches
// non-namespaced objects.
func filterByNamespace(objs map[types.NamespacedName]runtime.Object, ns string) ([]runtime.Object, error) {
var res []runtime.Object
func filterByNamespace(objs map[types.NamespacedName]versionedObject, ns string) ([]versionedObject, error) {
var res []versionedObject
for _, obj := range objs {
acc, err := meta.Accessor(obj)
acc, err := meta.Accessor(obj.Object)
if err != nil {
return nil, err
}
@ -857,8 +925,8 @@ func filterByNamespace(objs map[types.NamespacedName]runtime.Object, ns string)
// Sort res to get deterministic order.
sort.Slice(res, func(i, j int) bool {
acc1, _ := meta.Accessor(res[i])
acc2, _ := meta.Accessor(res[j])
acc1, _ := meta.Accessor(res[i].Object)
acc2, _ := meta.Accessor(res[j].Object)
if acc1.GetNamespace() != acc2.GetNamespace() {
return acc1.GetNamespace() < acc2.GetNamespace()
}

View file

@ -0,0 +1,117 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package internal
import (
"context"
"testing"
"testing/synctest"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2/ktesting"
_ "k8s.io/klog/v2/ktesting/init" // for -testing.v
)
// TestListAndWatch mirrors how fake client-go is often used with real
// informers. It enforces a timing such that List completes, a new
// object gets created because of the completed cache sync, and only
// then is the Watch call in the reflector's "ListAndWatch" allowed to
// continue.
//
// The fake Watch implementation then must use the ResourceVersion to
// detect that it must send some (but not all!) objects to the new watch.
//
// This runs in a synctest bubble, therefore time is virtual.
func TestListAndWatch(t *testing.T) { synctest.Test(t, testListAndWatch) }
func testListAndWatch(t *testing.T) {
logger, ctx := ktesting.NewTestContext(t)
cm := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "cm1",
Namespace: "default",
},
}
client := fake.NewClientset(cm)
stopCh := make(chan struct{})
defer close(stopCh)
createDone := make(chan struct{})
f := informers.NewSharedInformerFactory(client, 0)
configMapInformer := f.InformerFor(&v1.ConfigMap{}, func(client kubernetes.Interface, defaultEventHandlerResyncPeriod time.Duration) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(cache.ToListWatcherWithWatchListSemantics(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
objs, err := client.CoreV1().ConfigMaps("").List(context.Background(), options)
logger.Info("Listed", "configMaps", objs, "err", err)
if err != nil {
t.Errorf("Unexpected List error: %v", err)
} else if objs.ResourceVersion != "1" {
t.Errorf("Expected ListMeta ResourceVersion 1, got %q", objs.ResourceVersion)
}
return objs, err
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if options.ResourceVersion != "1" {
t.Errorf("Expected ListOptions ResourceVersion 1, got %q", options.ResourceVersion)
}
logger.Info("Delaying Watch...")
<-createDone
logger.Info("Continuing Watch...")
return client.CoreV1().ConfigMaps("").Watch(context.Background(), options)
},
}, client), &v1.ConfigMap{}, defaultEventHandlerResyncPeriod, nil)
})
configMapStore := configMapInformer.GetStore()
f.Start(stopCh)
f.WaitForCacheSync(stopCh)
logger.Info("Caches synced")
objs := configMapStore.List()
if len(objs) != 1 {
t.Fatalf("Unexpected item(s) in informer cache, want 1, got %d = %v", len(objs), objs)
}
cm = &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "cm2",
Namespace: "default",
},
}
_, err := client.CoreV1().ConfigMaps(cm.Namespace).Create(ctx, cm, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Unexpected error creating ConfigMap: %v", err)
}
logger.Info("Created second ConfigMap")
close(createDone)
// Wait for watch setup and event processing.
synctest.Wait()
objs = configMapStore.List()
if len(objs) != 2 {
t.Fatalf("Unexpected item(s) in informer cache, want 2, got %d = %v", len(objs), objs)
}
}

View file

@ -50,6 +50,9 @@ func TestGetPodList(t *testing.T) {
podList: newPodList(2, -1, -1, labelSet),
sortBy: func(pods []*corev1.Pod) sort.Interface { return podutils.ByLogging(pods) },
expected: &corev1.PodList{
ListMeta: metav1.ListMeta{
ResourceVersion: "2", // Two objects created.
},
Items: []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{