From fdcbb6cba9a04c028b158bf66d505df7431f63fe Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Tue, 18 Nov 2025 12:39:11 +0100
Subject: [PATCH] client-go cache: wait for cache sync via channels, better logging

The main advantage is that waiting on channels creates a causal relationship between goroutines which is visible to synctest. When a controller running in a test's background goroutine inside a synctest bubble calls WaitFor, the test can use synctest.Wait to wait for completion of the cache sync, without requiring any test-specific "has controller synced" API. Without this, the test had to poll or otherwise wait for the controller. The polling in WaitForCacheSync moved the virtual clock forward by a random amount, depending on how often it had to check in wait.Poll. Now tests can be written such that all events during a test happen at a predictable time. This will be demonstrated in a separate commit for the pkg/controller/devicetainteviction unit test.

The benefit for normal production use is immediate continuation as soon as the last informer is synced (not really a problem, but still...) and, more importantly, nicer logging thanks to the names associated with the things being waited for. The caller decides whether logging is enabled or disabled and describes what is being waited for (typically informer caches, but possibly also event handlers or even something else entirely, as long as it implements the DoneChecker interface).

Before:

Waiting for caches to sync
Caches are synced

After:

Waiting for="cache and event handler sync"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.Pod"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceClaim"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceSlice"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.DeviceClass"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1alpha3.DeviceTaintRule"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceClaim + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.Pod + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1alpha3.DeviceTaintRule + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"
Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceSlice + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"

The "SharedIndexInformer *v1.Pod" is also how this appears in metrics.
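For illustration, a typical caller of the new API looks roughly like this (the function name is just an example; the usual "context", "fmt" and "k8s.io/client-go/tools/cache" imports are assumed):

    // waitForSync registers an event handler and then waits until both the
    // informer cache and the handler have synced, without polling.
    // WaitFor blocks on the Done() channels of the passed DoneCheckers and
    // logs one "Done waiting" line per checker, using the checker's Name().
    func waitForSync(ctx context.Context, informer cache.SharedIndexInformer, handler cache.ResourceEventHandler) error {
        registration, err := informer.AddEventHandler(handler)
        if err != nil {
            return err
        }
        if !cache.WaitFor(ctx, "cache and event handler sync",
            informer.HasSyncedChecker(), registration.HasSyncedChecker()) {
            // WaitFor returns false only if the context got canceled first.
            return fmt.Errorf("informer did not sync: %w", context.Cause(ctx))
        }
        return nil
    }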
--- .../util/assumecache/assume_cache.go | 16 +- .../policy/internal/generic/controller.go | 36 ++- .../client-go/tools/cache/controller.go | 48 +++- .../tools/cache/controller_bench_test.go | 8 +- .../client-go/tools/cache/controller_test.go | 10 +- .../client-go/tools/cache/delta_fifo.go | 46 +++- .../tools/cache/event_handler_name.go | 119 ++++++++++ .../tools/cache/event_handler_name_test.go | 87 +++++++ .../src/k8s.io/client-go/tools/cache/fifo.go | 50 +++- .../tools/cache/processor_listener_test.go | 22 +- ...eflector_data_consistency_detector_test.go | 2 +- .../client-go/tools/cache/shared_informer.go | 216 ++++++++++++++++-- .../tools/cache/shared_informer_test.go | 9 +- .../tools/cache/synctrack/synctrack.go | 132 +++++++++-- .../tools/cache/synctrack/synctrack_test.go | 195 ++++++++-------- .../client-go/tools/cache/the_real_fifo.go | 63 ++++- .../k8s.io/client-go/tools/cache/wait_test.go | 200 ++++++++++++++++ .../resourceslice/tracker/tracker.go | 59 ++++- 18 files changed, 1118 insertions(+), 200 deletions(-) create mode 100644 staging/src/k8s.io/client-go/tools/cache/event_handler_name.go create mode 100644 staging/src/k8s.io/client-go/tools/cache/event_handler_name_test.go create mode 100644 staging/src/k8s.io/client-go/tools/cache/wait_test.go diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index fae3230bc3a..928c985ee87 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -508,7 +508,11 @@ func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache. if c.handlerRegistration == nil { // No informer, so immediately synced. - return syncedHandlerRegistration{} + s := syncedHandlerRegistration{ + synced: make(chan struct{}), + } + close(s.synced) + return s } return c.handlerRegistration @@ -557,6 +561,14 @@ func (c *AssumeCache) emitEvents() { // syncedHandlerRegistration is an implementation of ResourceEventHandlerRegistration // which always returns true. 
-type syncedHandlerRegistration struct{} +type syncedHandlerRegistration struct { + synced chan struct{} +} func (syncedHandlerRegistration) HasSynced() bool { return true } + +func (s syncedHandlerRegistration) HasSyncedChecker() cache.DoneChecker { return s } + +func (s syncedHandlerRegistration) Name() string { return "AssumeCache" } + +func (s syncedHandlerRegistration) Done() <-chan struct{} { return s.synced } diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic/controller.go index a94004c272d..65ffee4e2f6 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic/controller.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic/controller.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "time" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -48,10 +47,7 @@ type controller[T runtime.Object] struct { options ControllerOptions - // must hold a func() bool or nil - notificationsDelivered atomic.Value - - hasProcessed synctrack.AsyncTracker[string] + hasProcessed *synctrack.AsyncTracker[string] } type ControllerOptions struct { @@ -77,17 +73,11 @@ func NewController[T runtime.Object]( } c := &controller[T]{ - options: options, - informer: informer, - reconciler: reconciler, - queue: nil, - } - c.hasProcessed.UpstreamHasSynced = func() bool { - f := c.notificationsDelivered.Load() - if f == nil { - return false - } - return f.(func() bool)() + options: options, + informer: informer, + reconciler: reconciler, + queue: nil, + hasProcessed: synctrack.NewAsyncTracker[string](options.Name), } return c } @@ -159,12 +149,9 @@ func (c *controller[T]) Run(ctx context.Context) error { return err } - c.notificationsDelivered.Store(registration.HasSynced) - // Make sure event handler is removed from informer in case return early from // an error defer func() { - c.notificationsDelivered.Store(func() bool { return false }) // Remove event handler and Handle Error here. Error should only be raised // for improper usage of event handler API. if err := c.informer.RemoveEventHandler(registration); err != nil { @@ -174,7 +161,12 @@ func (c *controller[T]) Run(ctx context.Context) error { // Wait for initial cache list to complete before beginning to reconcile // objects. - if !cache.WaitForNamedCacheSyncWithContext(ctx, c.informer.HasSynced) { + if !cache.WaitFor(ctx, "caches", c.informer.HasSyncedChecker(), registration.HasSyncedChecker()) { + // TODO: should cache.WaitFor return an error? + // ctx.Err() or context.Cause(ctx)? + // Either of them would make dead code like the "if err == nil" + // below more obvious. + // ctx cancelled during cache sync. return early err := ctx.Err() if err == nil { @@ -184,6 +176,10 @@ func (c *controller[T]) Run(ctx context.Context) error { return err } + // c.informer *and* our handler have synced, which implies that our AddFunc(= enqueue) + // and thus c.hasProcessed.Start have been called for the initial list => upstream is done. 
+ c.hasProcessed.UpstreamHasSynced() + waitGroup := sync.WaitGroup{} for i := uint(0); i < c.options.Workers; i++ { diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index a5d47716f70..75d2ca979f1 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -142,6 +142,11 @@ type Controller interface { // HasSynced delegates to the Config's Queue HasSynced() bool + // HasSyncedChecker enables waiting for syncing without polling. + // The returned DoneChecker can be passed to WaitFor. + // It delegates to the Config's Queue. + HasSyncedChecker() DoneChecker + // LastSyncResourceVersion delegates to the Reflector when there // is one, otherwise returns the empty string LastSyncResourceVersion() string @@ -168,11 +173,13 @@ func (c *controller) RunWithContext(ctx context.Context) { <-ctx.Done() c.config.Queue.Close() }() + logger := klog.FromContext(ctx) r := NewReflectorWithOptions( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, ReflectorOptions{ + Logger: &logger, ResyncPeriod: c.config.FullResyncPeriod, MinWatchTimeout: c.config.MinWatchTimeout, TypeDescription: c.config.ObjectDescription, @@ -206,6 +213,13 @@ func (c *controller) HasSynced() bool { return c.config.Queue.HasSynced() } +// HasSyncedChecker enables waiting for syncing without polling. +// The returned DoneChecker can be passed to [WaitFor]. +// It delegates to the Config's Queue. +func (c *controller) HasSyncedChecker() DoneChecker { + return c.config.Queue.HasSyncedChecker() +} + func (c *controller) LastSyncResourceVersion() string { c.reflectorMutex.RLock() defer c.reflectorMutex.RUnlock() @@ -591,6 +605,7 @@ func NewTransformingIndexerInformer( // Multiplexes updates in the form of a list of Deltas into a Store, and informs // a given handler of events OnUpdate, OnAdd, OnDelete func processDeltas( + logger klog.Logger, // Object which receives event notifications from the given deltas handler ResourceEventHandler, clientState Store, @@ -608,7 +623,7 @@ func processDeltas( if !ok { return fmt.Errorf("ReplacedAll did not contain ReplacedAllInfo: %T", obj) } - if err := processReplacedAllInfo(handler, info, clientState, isInInitialList, keyFunc); err != nil { + if err := processReplacedAllInfo(logger, handler, info, clientState, isInInitialList, keyFunc); err != nil { return err } case SyncAll: @@ -653,6 +668,7 @@ func processDeltas( // Returns an error if any Delta or transaction fails. For TransactionError, // only successful operations trigger callbacks. 
func processDeltasInBatch( + logger klog.Logger, handler ResourceEventHandler, clientState Store, deltas []Delta, @@ -666,7 +682,7 @@ func processDeltasInBatch( if !txnSupported { var errs []error for _, delta := range deltas { - if err := processDeltas(handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil { + if err := processDeltas(logger, handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil { errs = append(errs, err) } } @@ -731,7 +747,7 @@ func processDeltasInBatch( return nil } -func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error { +func processReplacedAllInfo(logger klog.Logger, handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error { var deletions []DeletedFinalStateUnknown type replacement struct { oldObj interface{} @@ -739,7 +755,7 @@ func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo, } replacements := make([]replacement, 0, len(info.Objects)) - err := reconcileReplacement(nil, clientState, info.Objects, keyFunc, + err := reconcileReplacement(logger, nil, clientState, info.Objects, keyFunc, func(obj DeletedFinalStateUnknown) error { deletions = append(deletions, obj) return nil @@ -792,7 +808,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co if options.Logger != nil { logger = *options.Logger } - fifo := newQueueFIFO(logger, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) + logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) cfg := &Config{ Queue: fifo, @@ -803,21 +819,30 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(options.Handler, clientState, deltas, isInInitialList, keyFunc) + // This must be the logger *of the fifo*. + return processDeltas(logger, options.Handler, clientState, deltas, isInInitialList, keyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(options.Handler, clientState, deltaList, isInInitialList, keyFunc) + // Same here. + return processDeltasInBatch(logger, options.Handler, clientState, deltaList, isInInitialList, keyFunc) }, } return New(cfg) } -func newQueueFIFO(logger klog.Logger, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue { +// newQueueFIFO constructs a new FIFO, choosing between real and delta FIFO +// depending on the InOrderInformers feature gate. +// +// It returns the FIFO and the logger used by the FIFO. +// That logger includes the name used for the FIFO, +// in contrast to the logger which was passed in. 
+func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) (klog.Logger, Queue) { if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { options := RealFIFOOptions{ Logger: &logger, + Name: fmt.Sprintf("RealFIFO %T", objectType), KeyFunction: MetaNamespaceKeyFunc, Transformer: transform, Identifier: identifier, @@ -830,13 +855,16 @@ func newQueueFIFO(logger klog.Logger, clientState Store, transform TransformFunc } else { options.KnownObjects = clientState } - return NewRealFIFOWithOptions(options) + f := NewRealFIFOWithOptions(options) + return f.logger, f } else { - return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ Logger: &logger, + Name: fmt.Sprintf("DeltaFIFO %T", objectType), KnownObjects: clientState, EmitDeltaTypeReplaced: true, Transformer: transform, }) + return f.logger, f } } diff --git a/staging/src/k8s.io/client-go/tools/cache/controller_bench_test.go b/staging/src/k8s.io/client-go/tools/cache/controller_bench_test.go index 754ab3f1236..f81a3d3f97f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller_bench_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller_bench_test.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fcache "k8s.io/client-go/tools/cache/testing" + "k8s.io/klog/v2/ktesting" ) const handlerWaitTime = time.Millisecond @@ -33,7 +34,8 @@ const handlerWaitTime = time.Millisecond func BenchmarkAddWithSlowHandlers(b *testing.B) { for _, unlockWhileProcessing := range []bool{false, true} { b.Run(fmt.Sprintf("unlockWhileProcessing=%t", unlockWhileProcessing), func(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(b) + ctx, cancel := context.WithCancel(ctx) source := fcache.NewFakeControllerSource() b.Cleanup(func() { cancel() @@ -65,12 +67,12 @@ func BenchmarkAddWithSlowHandlers(b *testing.B) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltas(logger, handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(logger, handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) diff --git a/staging/src/k8s.io/client-go/tools/cache/controller_test.go b/staging/src/k8s.io/client-go/tools/cache/controller_test.go index c4ee71f477e..9528dc6caec 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller_test.go @@ -834,6 +834,7 @@ func TestProcessDeltasInBatch(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) mockStore := &mockTxnStore{ Store: NewStore(MetaNamespaceKeyFunc), failingObjs: tc.failingObjects, @@ -851,6 +852,7 @@ func TestProcessDeltasInBatch(t *testing.T) { }, } err := processDeltasInBatch( + logger, dummyListener, mockStore, tc.deltaList, @@ -929,12 +931,12 @@ func 
TestReplaceEvents(t *testing.T) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) @@ -1066,12 +1068,12 @@ func TestResetWatch(t *testing.T) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 12157dad5c6..b57cc472831 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -126,6 +126,11 @@ type DeltaFIFO struct { // A key is in `queue` if and only if it is in `items`. queue []string + // synced is initially an open channel. It gets closed (once!) by checkSynced_locked + // as soon as the initial sync is considered complete. + synced chan struct{} + syncedClosed bool + // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update/AddIfNotPresent was called first. populated bool @@ -272,6 +277,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { f := &DeltaFIFO{ logger: klog.Background(), name: "DeltaFIFO", + synced: make(chan struct{}), items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, @@ -283,8 +289,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { if opts.Logger != nil { f.logger = *opts.Logger } - if opts.Name != "" { - f.name = opts.Name + if name := opts.Name; name != "" { + f.name = name } f.logger = klog.LoggerWithName(f.logger, f.name) f.cond.L = &f.lock @@ -294,6 +300,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { var ( _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue _ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations + _ = DoneChecker(&DeltaFIFO{}) // DeltaFIFO implements DoneChecker. ) var ( @@ -339,8 +346,36 @@ func (f *DeltaFIFO) HasSynced() bool { return f.hasSynced_locked() } +// HasSyncedChecker is done if an Add/Update/Delete/AddIfNotPresent are called first, +// or the first batch of items inserted by Replace() has been popped. 
+func (f *DeltaFIFO) HasSyncedChecker() DoneChecker { + return f +} + +// Name implements [DoneChecker.Name] +func (f *DeltaFIFO) Name() string { + return f.name +} + +// Done implements [DoneChecker.Done] +func (f *DeltaFIFO) Done() <-chan struct{} { + return f.synced +} + +// hasSynced_locked returns the result of a prior checkSynced_locked call. func (f *DeltaFIFO) hasSynced_locked() bool { - return f.populated && f.initialPopulationCount == 0 + return f.syncedClosed +} + +// checkSynced_locked checks whether the initial is completed. +// It must be called whenever populated or initialPopulationCount change. +func (f *DeltaFIFO) checkSynced_locked() { + synced := f.populated && f.initialPopulationCount == 0 + if synced && !f.syncedClosed { + // Initial sync is complete. + f.syncedClosed = true + close(f.synced) + } } // Add inserts an item, and puts it in the queue. The item is only enqueued @@ -349,6 +384,7 @@ func (f *DeltaFIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() return f.queueActionLocked(Added, obj) } @@ -357,6 +393,7 @@ func (f *DeltaFIFO) Update(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() return f.queueActionLocked(Updated, obj) } @@ -373,6 +410,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() if f.knownObjects == nil { if _, exists := f.items[id]; !exists { // Presumably, this was deleted when a relist happened. @@ -538,6 +576,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { depth := len(f.queue) if f.initialPopulationCount > 0 { f.initialPopulationCount-- + f.checkSynced_locked() } item, ok := f.items[id] if !ok { @@ -650,6 +689,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions + f.checkSynced_locked() } return nil diff --git a/staging/src/k8s.io/client-go/tools/cache/event_handler_name.go b/staging/src/k8s.io/client-go/tools/cache/event_handler_name.go new file mode 100644 index 00000000000..9489f9b47ba --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/event_handler_name.go @@ -0,0 +1,119 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "reflect" + "runtime" + "strings" +) + +func nameForHandler(handler ResourceEventHandler) (name string) { + defer func() { + // Last resort: let Sprintf handle it. + if name == "" { + name = fmt.Sprintf("%T", handler) + } + }() + + if handler == nil { + return "" + } + switch handler := handler.(type) { + case *ResourceEventHandlerFuncs: + return nameForHandlerFuncs(*handler) + case ResourceEventHandlerFuncs: + return nameForHandlerFuncs(handler) + default: + // We can use the fully qualified name of whatever + // provides the interface. 
We don't care whether + // it contains fields or methods which provide + // the interface methods. + value := reflect.ValueOf(handler) + if value.Type().Kind() == reflect.Interface { + // Probably not needed, but let's play it safe. + value = value.Elem() + } + if value.Type().Kind() == reflect.Pointer { + value = value.Elem() + } + name := value.Type().PkgPath() + if name != "" { + name += "." + } + if typeName := value.Type().Name(); typeName != "" { + name += typeName + } + return name + } +} + +func nameForHandlerFuncs(funcs ResourceEventHandlerFuncs) string { + return nameForFunctions(funcs.AddFunc, funcs.UpdateFunc, funcs.DeleteFunc) +} + +func nameForFunctions(fs ...any) string { + // If all functions are defined in the same place, then we + // don't care about the actual function name in + // e.g. "main.FuncName" or "main.(*Foo).FuncName-fm", instead + // we use the common qualifier. + // + // But we don't know that yet, so we also collect all names. + var qualifier string + singleQualifier := true + var names []string + for _, f := range fs { + if f == nil { + continue + } + name := nameForFunction(f) + if name == "" { + continue + } + names = append(names, name) + + newQualifier := name + index := strings.LastIndexByte(newQualifier, '.') + if index > 0 { + newQualifier = newQualifier[:index] + } + switch qualifier { + case "": + qualifier = newQualifier + case newQualifier: + // So far, so good... + default: + // Nope, different. + singleQualifier = false + } + } + + if singleQualifier { + return qualifier + } + + return strings.Join(names, "+") +} + +func nameForFunction(f any) string { + fn := runtime.FuncForPC(reflect.ValueOf(f).Pointer()) + if fn == nil { + return "" + } + return fn.Name() +} diff --git a/staging/src/k8s.io/client-go/tools/cache/event_handler_name_test.go b/staging/src/k8s.io/client-go/tools/cache/event_handler_name_test.go new file mode 100644 index 00000000000..d17f66a0866 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/event_handler_name_test.go @@ -0,0 +1,87 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" +) + +type mockHandler struct{} + +func (m mockHandler) OnAdd(any, bool) {} +func (m mockHandler) OnUpdate(any, any) {} +func (m mockHandler) OnDelete(any) {} + +func TestNameForHandler(t *testing.T) { + emptyHandler := ResourceEventHandlerFuncs{} + + for name, tc := range map[string]struct { + handler ResourceEventHandler + wantName string + }{ + "mixture": { + handler: ResourceEventHandlerFuncs{ + UpdateFunc: emptyHandler.OnUpdate, + DeleteFunc: func(any) {}, + }, + wantName: "k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnUpdate-fm+k8s.io/client-go/tools/cache.TestNameForHandler.func1", // Testcase must come first to get func1. 
+ }, + "add": { + handler: ResourceEventHandlerFuncs{AddFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "update": { + handler: ResourceEventHandlerFuncs{UpdateFunc: func(any, any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "delete": { + handler: ResourceEventHandlerFuncs{DeleteFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "all": { + handler: ResourceEventHandlerFuncs{ + AddFunc: func(any) {}, + UpdateFunc: func(any, any) {}, + DeleteFunc: func(any) {}, + }, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "ptrToFuncs": { + handler: &ResourceEventHandlerFuncs{AddFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "struct": { + handler: mockHandler{}, + wantName: "k8s.io/client-go/tools/cache.mockHandler", + }, + "ptrToStruct": { + handler: &mockHandler{}, + wantName: "k8s.io/client-go/tools/cache.mockHandler", + }, + "nil": { + handler: nil, + wantName: "", + }, + } { + t.Run(name, func(t *testing.T) { + gotName := nameForHandler(tc.handler) + if gotName != tc.wantName { + t.Errorf("Got name:\n %s\nWanted name:\n %s", gotName, tc.wantName) + } + }) + } +} diff --git a/staging/src/k8s.io/client-go/tools/cache/fifo.go b/staging/src/k8s.io/client-go/tools/cache/fifo.go index 44d006b6987..e31e2d27039 100644 --- a/staging/src/k8s.io/client-go/tools/cache/fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/fifo.go @@ -58,6 +58,12 @@ type Queue interface { // Update, or Delete; otherwise the first batch is empty. HasSynced() bool + // HasSyncedChecker is done once the first batch of keys have all been + // popped. The first batch of keys are those of the first Replace + // operation if that happened before any Add, AddIfNotPresent, + // Update, or Delete; otherwise the first batch is empty. + HasSyncedChecker() DoneChecker + // Close the queue Close() } @@ -110,6 +116,11 @@ type FIFO struct { items map[string]interface{} queue []string + // synced is initially an open channel. It gets closed (once!) by checkSynced + // as soon as the initial sync is considered complete. + synced chan struct{} + syncedClosed bool + // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update was called first. populated bool @@ -127,7 +138,8 @@ type FIFO struct { } var ( - _ = Queue(&FIFO{}) // FIFO is a Queue + _ = Queue(&FIFO{}) // FIFO is a Queue + _ = DoneChecker(&FIFO{}) // ... and implements DoneChecker. ) // Close the queue. @@ -146,8 +158,36 @@ func (f *FIFO) HasSynced() bool { return f.hasSynced_locked() } +// HasSyncedChecker is done if an Add/Update/Delete/AddIfNotPresent are called first, +// or the first batch of items inserted by Replace() has been popped. +func (f *FIFO) HasSyncedChecker() DoneChecker { + return f +} + +// Name implements [DoneChecker.Name] +func (f *FIFO) Name() string { + return "FIFO" // FIFO doesn't seem to be used outside of a few tests, so changing the NewFIFO API to pass in a name doesn't seem worth it. +} + +// Done implements [DoneChecker.Done] +func (f *FIFO) Done() <-chan struct{} { + return f.synced +} + +// hasSynced_locked returns the result of a prior checkSynced call. func (f *FIFO) hasSynced_locked() bool { - return f.populated && f.initialPopulationCount == 0 + return f.syncedClosed +} + +// checkSynced checks whether the initial sync is completed. 
+// It must be called whenever populated or initialPopulationCount change +// while the mutex is still locked. +func (f *FIFO) checkSynced() { + synced := f.populated && f.initialPopulationCount == 0 + if synced && !f.syncedClosed { + f.syncedClosed = true + close(f.synced) + } } // Add inserts an item, and puts it in the queue. The item is only enqueued @@ -160,6 +200,7 @@ func (f *FIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced() if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } @@ -184,6 +225,7 @@ func (f *FIFO) Delete(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced() delete(f.items, id) return err } @@ -220,6 +262,8 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- + // Must be done *after* process has completed. + defer f.checkSynced() } item, ok := f.items[id] if !ok { @@ -252,6 +296,7 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { if !f.populated { f.populated = true f.initialPopulationCount = len(items) + f.checkSynced() } f.items = items @@ -290,6 +335,7 @@ func (f *FIFO) Resync() error { // process. func NewFIFO(keyFunc KeyFunc) *FIFO { f := &FIFO{ + synced: make(chan struct{}), items: map[string]interface{}{}, queue: []string{}, keyFunc: keyFunc, diff --git a/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go b/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go index 517258a160d..cd8c5c739bf 100644 --- a/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package cache import ( + "context" "sync" "testing" "time" @@ -29,6 +30,25 @@ const ( concurrencyLevel = 5 ) +type mockSynced struct { + context.Context + cancel func() +} + +func newMockSynced(tb testing.TB, synced bool) *mockSynced { + m := &mockSynced{} + m.Context, m.cancel = context.WithCancel(context.Background()) + if synced { + m.cancel() + } + tb.Cleanup(m.cancel) + return m +} + +func (m *mockSynced) Name() string { + return "mock" +} + func BenchmarkListener(b *testing.B) { var notification addNotification @@ -40,7 +60,7 @@ func BenchmarkListener(b *testing.B) { AddFunc: func(obj interface{}) { swg.Done() }, - }, 0, 0, time.Now(), 1024*1024, func() bool { return true }) + }, 0, 0, time.Now(), 1024*1024, newMockSynced(b, true)) var wg wait.Group defer wg.Wait() // Wait for .run and .pop to stop defer close(pl.addCh) // Tell .run and .pop to stop diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go index f6670a42c0e..fc9c1529e19 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go @@ -73,7 +73,7 @@ func runTestReflectorDataConsistencyDetector(t *testing.T, transformer Transform defer cancel() store := NewStore(MetaNamespaceKeyFunc) - fifo := newQueueFIFO(logger, store, transformer, InformerNameAndResource{}, nil) + _, fifo := newQueueFIFO(logger, nil, store, transformer, InformerNameAndResource{}, nil) lw := &ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 45daafa6177..503cd2e2dac 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "slices" "sync" "sync/atomic" "time" @@ -194,6 +195,14 @@ type SharedInformer interface { // For that, please call HasSynced on the handle returned by // AddEventHandler. HasSynced() bool + // HasSyncedChecker completes if the shared informer's store has been + // informed by at least one full LIST of the authoritative state + // of the informer's object collection. This is unrelated to "resync". + // + // Note that this doesn't tell you if an individual handler is synced!! + // For that, please use HasSyncedChecker on the handle returned by + // AddEventHandler. + HasSyncedChecker() DoneChecker // LastSyncResourceVersion is the resource version observed when last synced with the underlying // store. The value returned is not synchronized with access to the underlying store and is not // thread-safe. @@ -247,6 +256,10 @@ type ResourceEventHandlerRegistration interface { // HasSynced reports if both the parent has synced and all pre-sync // events have been delivered. HasSynced() bool + + // HasSyncedChecker reports if both the parent has synced and all pre-sync + // events have been delivered. + HasSyncedChecker() DoneChecker } // Optional configuration options for [SharedInformer.AddEventHandlerWithOptions]. 
@@ -309,6 +322,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O return &sharedIndexInformer{ indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers), processor: processor, + synced: make(chan struct{}), listerWatcher: lw, objectType: exampleObject, objectDescription: options.ObjectDescription, @@ -414,6 +428,107 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool return true } +// WaitFor waits for a set of activities to complete, like cache syncing. +// It returns true if it was successful, false if the context was canceled +// before all activities are completed. +// +// If a non-nil "what" is provided, then progress information is logged +// while waiting ("Waiting", for=""). +// +// In contrast to other WaitForCacheSync alternatives, this one here doesn't +// need polling, which makes it react immediately. When used in a synctest unit +// test, waiting completes without moving time forward randomly, which +// makes tests more predictable. +func WaitFor(ctx context.Context, what string, checkers ...DoneChecker) bool { + logger := klog.FromContext(ctx) + if what != "" { + helper, l := logger.WithCallStackHelper() + logger = l + helper() + logger.Info("Waiting", "for", what) + } + + // Check in parallel to ensure that we log "Done waiting" as soon + // as possible for each checker. The timing may be useful to know. + // We cannot log inside the goroutine, the stack unwinding wouldn't + // work, so instead each goroutine just notifies the parent + // goroutine when it's checker is done and the main goroutine then + // logs it. + var wg sync.WaitGroup + type result struct { + checker DoneChecker + done bool + } + doneChecker := make(chan result) + for _, checker := range checkers { + wg.Go(func() { + select { + case <-checker.Done(): + doneChecker <- result{checker, true} + case <-ctx.Done(): + // We can end up here even when the checker is already done, + // select is not deterministic. Check once more without blocking + // before finally giving up. + select { + case <-checker.Done(): + doneChecker <- result{checker, true} + default: + doneChecker <- result{checker, false} + } + } + }) + } + instances := make([]string, 0, len(checkers)) + for range len(checkers) { + // We are guaranteed to get exactly one result from each goroutine, so this won't block forever. + result := <-doneChecker + if result.done { + if what != "" { + logger.Info("Done waiting", "for", what, "instance", result.checker.Name()) + } + } else { + // We don't need this information unless we are a) logging or b) debugging interactively. + instances = append(instances, result.checker.Name()) + } + } + wg.Wait() + + if what != "" && len(instances) > 0 { + slices.Sort(instances) + logger.Info("Timed out waiting", "for", what, "cause", context.Cause(ctx), "instances", instances) + } + + done := len(instances) == 0 + return done +} + +// DoneChecker, in contrast to [InformerSynced], supports waiting +// for some activity to finish without polling and has a name +// that describes itself. +// +// To check for completion without blocking, use [IsDone]. +type DoneChecker interface { + // Name returns a string describing the entity that is being waited for. + // + // Note that this name might be computed, so callers should only + // get the name outside of a hot code path. + Name() string + + // Done returns a channel that will be closed on completion + // of the activity. 
+ Done() <-chan struct{} +} + +// IsDone returns true if the activity is done, false otherwise. +func IsDone(checker DoneChecker) bool { + select { + case <-checker.Done(): + return true + default: + return false + } +} + // `*sharedIndexInformer` implements SharedIndexInformer and has three // main components. One is an indexed local cache, `indexer Indexer`. // The second main component is a Controller that pulls @@ -431,6 +546,10 @@ type sharedIndexInformer struct { indexer Indexer controller Controller + // synced gets created when creating the sharedIndexInformer. + // It gets closed when Run detects that the processor created + synced chan struct{} + processor *sharedProcessor cacheMutationDetector MutationDetector @@ -494,6 +613,10 @@ func (v *dummyController) HasSynced() bool { return v.informer.HasSynced() } +func (v *dummyController) HasSyncedChecker() DoneChecker { + return v.informer.HasSyncedChecker() +} + func (v *dummyController) LastSyncResourceVersion() string { if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) { return v.informer.LastSyncResourceVersion() @@ -563,7 +686,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { s.startedLock.Lock() defer s.startedLock.Unlock() - fifo := newQueueFIFO(logger, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider) + logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider) cfg := &Config{ Queue: fifo, @@ -573,8 +696,12 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { FullResyncPeriod: s.resyncCheckPeriod, ShouldResync: s.processor.shouldResync, - Process: s.HandleDeltas, - ProcessBatch: s.HandleBatchDeltas, + Process: func(obj interface{}, isInInitialList bool) error { + return s.handleDeltas(logger, obj, isInInitialList) + }, + ProcessBatch: func(deltas []Delta, isInInitialList bool) error { + return s.handleBatchDeltas(logger, deltas, isInInitialList) + }, WatchErrorHandlerWithContext: s.watchErrorHandler, } @@ -594,6 +721,15 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { // has a RunWithContext method that we can use here. wg.StartWithChannel(processorStopCtx.Done(), s.cacheMutationDetector.Run) wg.StartWithContext(processorStopCtx, s.processor.run) + wg.Start(func() { + select { + case <-ctx.Done(): + // We were stopped without completing the sync. + case <-s.controller.HasSyncedChecker().Done(): + // Controller has synced and thus so have we. + close(s.synced) + } + }) defer func() { s.startedLock.Lock() @@ -610,13 +746,31 @@ func (s *sharedIndexInformer) HasStarted() bool { } func (s *sharedIndexInformer) HasSynced() bool { - s.startedLock.Lock() - defer s.startedLock.Unlock() - - if s.controller == nil { + select { + case <-s.synced: + return true + default: return false } - return s.controller.HasSynced() +} + +func (s *sharedIndexInformer) HasSyncedChecker() DoneChecker { + return &sharedIndexInformerDone{ + s: s, + } +} + +// sharedIndexInformerDone implements [NamedCacheSync] for a [sharedIndexInformer]. 
+type sharedIndexInformerDone struct { + s *sharedIndexInformer +} + +func (sd *sharedIndexInformerDone) Name() string { + return fmt.Sprintf("SharedIndexInformer %T", sd.s.objectType) +} + +func (sd *sharedIndexInformerDone) Done() <-chan struct{} { + return sd.s.synced } func (s *sharedIndexInformer) LastSyncResourceVersion() string { @@ -708,7 +862,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa } } - listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced) + listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSyncedChecker()) if !s.started { return s.processor.addListener(listener), nil @@ -737,20 +891,20 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa return handle, nil } -func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error { +func (s *sharedIndexInformer) handleDeltas(logger klog.Logger, obj interface{}, isInInitialList bool) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() if deltas, ok := obj.(Deltas); ok { - return processDeltas(s, s.indexer, deltas, isInInitialList, s.keyFunc) + return processDeltas(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc) } return errors.New("object given as Process argument is not Deltas") } -func (s *sharedIndexInformer) HandleBatchDeltas(deltas []Delta, isInInitialList bool) error { +func (s *sharedIndexInformer) handleBatchDeltas(logger klog.Logger, deltas []Delta, isInInitialList bool) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - return processDeltasInBatch(s, s.indexer, deltas, isInInitialList, s.keyFunc) + return processDeltasInBatch(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc) } // Conforms to ResourceEventHandler @@ -854,6 +1008,7 @@ func (p *sharedProcessor) addListener(listener *processorListener) ResourceEvent p.listeners[listener] = true if p.listenersStarted { + p.wg.Start(listener.watchSynced) p.wg.Start(listener.run) p.wg.Start(listener.pop) } @@ -925,6 +1080,7 @@ func (p *sharedProcessor) run(ctx context.Context) { p.listenersLock.Lock() defer p.listenersLock.Unlock() for listener := range p.listeners { + p.wg.Start(listener.watchSynced) p.wg.Start(listener.run) p.wg.Start(listener.pop) } @@ -986,7 +1142,7 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe } // processorListener relays notifications from a sharedProcessor to -// one ResourceEventHandler --- using two goroutines, two unbuffered +// one ResourceEventHandler --- using three goroutines, two unbuffered // channels, and an unbounded ring buffer. The `add(notification)` // function sends the given notification to `addCh`. One goroutine // runs `pop()`, which pumps notifications from `addCh` to `nextCh` @@ -994,16 +1150,23 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe // Another goroutine runs `run()`, which receives notifications from // `nextCh` and synchronously invokes the appropriate handler method. // +// The third goroutine watches the upstream "has synced" channel +// and notifies a SingleFileTracker instance. That instance then +// combines the upstream state and the processListener state to +// implement the overall "event handler has synced". 
+// // processorListener also keeps track of the adjusted requested resync // period of the listener. type processorListener struct { logger klog.Logger nextCh chan interface{} addCh chan interface{} + done chan struct{} handler ResourceEventHandler - syncTracker *synctrack.SingleFileTracker + syncTracker *synctrack.SingleFileTracker + upstreamHasSynced DoneChecker // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed. // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications @@ -1041,13 +1204,21 @@ func (p *processorListener) HasSynced() bool { return p.syncTracker.HasSynced() } -func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener { +// HasNamedSync is done if the source informer has synced, and all +// corresponding events have been delivered. +func (p *processorListener) HasSyncedChecker() DoneChecker { + return p.syncTracker +} + +func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced DoneChecker) *processorListener { ret := &processorListener{ logger: logger, nextCh: make(chan interface{}), addCh: make(chan interface{}), + done: make(chan struct{}), + upstreamHasSynced: hasSynced, handler: handler, - syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced}, + syncTracker: synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), nameForHandler(handler))), pendingNotifications: *buffer.NewRingGrowing(bufferSize), requestedResyncPeriod: requestedResyncPeriod, resyncPeriod: resyncPeriod, @@ -1068,6 +1239,7 @@ func (p *processorListener) add(notification interface{}) { func (p *processorListener) pop() { defer utilruntime.HandleCrashWithLogger(p.logger) defer close(p.nextCh) // Tell .run() to stop + defer close(p.done) // Tell .watchSynced() to stop var nextCh chan<- interface{} var notification interface{} @@ -1131,6 +1303,16 @@ func (p *processorListener) run() { } } +func (p *processorListener) watchSynced() { + select { + case <-p.upstreamHasSynced.Done(): + // Notify tracker that the upstream has synced. + p.syncTracker.UpstreamHasSynced() + case <-p.done: + // Give up waiting for sync. + } +} + // shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0, // this always returns false. 
func (p *processorListener) shouldResync(now time.Time) bool { diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 0b40315a8c2..835e6debdcb 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -1134,8 +1134,13 @@ func TestAddWhileActive(t *testing.T) { return } - if !handle1.HasSynced() { - t.Error("Not synced after Run??") + select { + case <-handle1.HasSyncedChecker().Done(): + if !handle1.HasSynced() { + t.Error("Not synced after channel said we are synced??") + } + case <-time.After(10 * time.Second): + t.Error("Not synced 10 seconds after Run??") } listener2.lock.Lock() // ensure we observe it before it has synced diff --git a/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack.go b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack.go index 3fa2beb6b71..e941dafb85a 100644 --- a/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack.go +++ b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack.go @@ -20,6 +20,7 @@ limitations under the License. package synctrack import ( + "context" "sync" "sync/atomic" @@ -27,11 +28,32 @@ import ( ) // AsyncTracker helps propagate HasSynced in the face of multiple worker threads. +// The user has to monitor the upstream "has synced" +// and notify the tracker when that changes from false to true. type AsyncTracker[T comparable] struct { - UpstreamHasSynced func() bool + // name describes the instance. + name string + + // upstreamHasSynced is changed from false (initial value) to true + // when UpstreamHasSynced is called. + upstreamHasSynced atomic.Bool lock sync.Mutex waiting sets.Set[T] + + // synced gets canceled once both the tracker and upstream are synced. + // A context is convenient for this because it gives us a channel + // and handles thread-safety. + synced context.Context + cancel func() +} + +func NewAsyncTracker[T comparable](name string) *AsyncTracker[T] { + t := &AsyncTracker[T]{ + name: name, + } + t.synced, t.cancel = context.WithCancel(context.Background()) + return t } // Start should be called prior to processing each key which is part of the @@ -57,6 +79,28 @@ func (t *AsyncTracker[T]) Finished(key T) { if t.waiting != nil { t.waiting.Delete(key) } + + // Maybe synced now? + if t.upstreamHasSynced.Load() && len(t.waiting) == 0 { + // Mark as synced. + t.cancel() + } +} + +// UpstreamHasSynced needs to be called at least once as soon as +// the upstream "has synced" becomes true. It tells AsyncTracker +// that the source is synced. +// +// Must be called after handing over the initial list to Start. +func (t *AsyncTracker[T]) UpstreamHasSynced() { + // Upstream is done, but we might not be yet. + t.upstreamHasSynced.Store(true) + t.lock.Lock() + defer t.lock.Unlock() + if len(t.waiting) == 0 { + // Mark as synced. + t.cancel() + } } // HasSynced returns true if the source is synced and every key present in the @@ -64,27 +108,51 @@ func (t *AsyncTracker[T]) Finished(key T) { // itself synced until *after* it has delivered the notification for the last // key, and that notification handler must have called Start. func (t *AsyncTracker[T]) HasSynced() bool { - // Call UpstreamHasSynced first: it might take a lock, which might take - // a significant amount of time, and we can't hold our lock while - // waiting on that or a user is likely to get a deadlock. 
- if !t.UpstreamHasSynced() { - return false - } - t.lock.Lock() - defer t.lock.Unlock() - return t.waiting.Len() == 0 + return t.synced.Err() != nil +} + +// Done returns a channel that is closed if the source is synced and every key present in the +// initial list has been processed. This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. +func (t *AsyncTracker[T]) Done() <-chan struct{} { + return t.synced.Done() +} + +func (t *AsyncTracker[T]) Name() string { + return t.name } // SingleFileTracker helps propagate HasSynced when events are processed in -// order (i.e. via a queue). +// order (i.e. via a queue). The user has to monitor the upstream "has synced" +// and notify the tracker when that changes from false to true. type SingleFileTracker struct { + // name describes the instance. + name string + // Important: count is used with atomic operations so it must be 64-bit // aligned, otherwise atomic operations will panic. Having it at the top of // the struct will guarantee that, even on 32-bit arches. // See https://pkg.go.dev/sync/atomic#pkg-note-BUG for more information. count int64 - UpstreamHasSynced func() bool + // upstreamHasSynced is changed from false (initial value) to true + // when UpstreamHasSynced is called. + upstreamHasSynced atomic.Bool + + // synced gets canceled once both the tracker and upstream are synced. + // A context is convenient for this because it gives us a channel + // and handles thread-safety. + synced context.Context + cancel func() +} + +func NewSingleFileTracker(name string) *SingleFileTracker { + t := &SingleFileTracker{ + name: name, + } + t.synced, t.cancel = context.WithCancel(context.Background()) + return t } // Start should be called prior to processing each key which is part of the @@ -103,6 +171,26 @@ func (t *SingleFileTracker) Finished() { if result < 0 { panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value") } + + // Maybe synced now? + if result == 0 && t.upstreamHasSynced.Load() { + // Mark as synced. + t.cancel() + } +} + +// UpstreamHasSynced needs to be called at least once as soon as +// the upstream "has synced" becomes true. It tells SingleFileTracker +// that the source is synced. +// +// Must be called after handing over the initial list to Start. +func (t *SingleFileTracker) UpstreamHasSynced() { + // Upstream is done, but we might not be yet. + t.upstreamHasSynced.Store(true) + if atomic.LoadInt64(&t.count) == 0 { + // Mark as synced. + t.cancel() + } } // HasSynced returns true if the source is synced and every key present in the @@ -110,11 +198,17 @@ func (t *SingleFileTracker) Finished() { // itself synced until *after* it has delivered the notification for the last // key, and that notification handler must have called Start. func (t *SingleFileTracker) HasSynced() bool { - // Call UpstreamHasSynced first: it might take a lock, which might take - // a significant amount of time, and we don't want to then act on a - // stale count value. - if !t.UpstreamHasSynced() { - return false - } - return atomic.LoadInt64(&t.count) <= 0 + return t.synced.Err() != nil +} + +// Done returns a channel that is closed if the source is synced and every key present in the +// initial list has been processed. 
This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. +func (t *SingleFileTracker) Done() <-chan struct{} { + return t.synced.Done() +} + +func (t *SingleFileTracker) Name() string { + return t.name } diff --git a/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack_test.go b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack_test.go index 4cf089e225c..bff8838e8ae 100644 --- a/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack_test.go @@ -19,29 +19,24 @@ package synctrack import ( "strings" "sync" - "time" "testing" ) -func testSingleFileFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { - tracker := SingleFileTracker{ - UpstreamHasSynced: upstreamHasSynced, - } - return tracker.Start, tracker.Finished, tracker.HasSynced +func testSingleFileFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) { + tracker := NewSingleFileTracker("") + return tracker.UpstreamHasSynced, tracker.Start, tracker.Finished, tracker.HasSynced, tracker.Done() } -func testAsyncFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { - tracker := AsyncTracker[string]{ - UpstreamHasSynced: upstreamHasSynced, - } - return func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced +func testAsyncFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) { + tracker := NewAsyncTracker[string]("") + return tracker.UpstreamHasSynced, func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced, tracker.Done() } func TestBasicLogic(t *testing.T) { table := []struct { name string - construct func(func() bool) (func(), func(), func() bool) + construct func() (func(), func(), func(), func() bool, <-chan struct{}) }{ {"SingleFile", testSingleFileFuncs}, {"Async", testAsyncFuncs}, @@ -50,27 +45,87 @@ func TestBasicLogic(t *testing.T) { for _, entry := range table { t.Run(entry.name, func(t *testing.T) { table := []struct { - synced bool + synced bool + syncedBeforeFinish bool + start bool finish bool expectSynced bool }{ - {false, true, true, false}, - {true, true, false, false}, - {false, true, false, false}, - {true, true, true, true}, + {false, false, true, true, false}, + {true, false, true, false, false}, + {true, true, true, false, false}, + {false, false, true, false, false}, + {true, false, true, true, true}, + {true, true, true, true, true}, } for _, tt := range table { - Start, Finished, HasSynced := entry.construct(func() bool { return tt.synced }) + upstreamHasSynced, start, finished, hasSynced, synced := entry.construct() + syncedDone := func() bool { + select { + case <-synced: + return true + default: + return false + } + } + + if hasSynced() { + t.Errorf("for %#v got HasSynced() true before start (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true before start (wanted false)", tt) + } + if tt.start { - Start() + start() + } + + if hasSynced() { + t.Errorf("for %#v got HasSynced() true after start (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true after start (wanted false)", tt) + } + + // "upstream has synced" may occur before or after finished, but not before start. 
+				if tt.synced && tt.syncedBeforeFinish {
+					upstreamHasSynced()
+					if hasSynced() {
+						t.Errorf("for %#v got HasSynced() true after upstreamHasSynced and before finish (wanted false)", tt)
+					}
+					if syncedDone() {
+						t.Errorf("for %#v got Done() true after upstreamHasSynced and before finish (wanted false)", tt)
+					}
 				}
 				if tt.finish {
-					Finished()
+					finished()
 				}
-				got := HasSynced()
-				if e, a := tt.expectSynced, got; e != a {
-					t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
+				if tt.synced && !tt.syncedBeforeFinish {
+					if hasSynced() {
+						t.Errorf("for %#v got HasSynced() true after finish and before upstreamHasSynced (wanted false)", tt)
+					}
+					if syncedDone() {
+						t.Errorf("for %#v got Done() true after finish and before upstreamHasSynced (wanted false)", tt)
+					}
+					upstreamHasSynced()
+				}
+				if e, a := tt.expectSynced, hasSynced(); e != a {
+					t.Errorf("for %#v got HasSynced() %v (wanted %v)", tt, a, e)
+				}
+				if e, a := tt.expectSynced, syncedDone(); e != a {
+					t.Errorf("for %#v got Done() %v (wanted %v)", tt, a, e)
+				}
+
+				select {
+				case <-synced:
+					if !tt.expectSynced {
+						t.Errorf("for %#v got done (wanted not done)", tt)
+					}
+				default:
+					if tt.expectSynced {
+						t.Errorf("for %#v got not done (wanted done)", tt)
+					}
+				}
 			}
 		})
@@ -78,7 +133,7 @@ func TestBasicLogic(t *testing.T) {
 }
 
 func TestAsyncLocking(t *testing.T) {
-	aft := AsyncTracker[int]{UpstreamHasSynced: func() bool { return true }}
+	aft := NewAsyncTracker[int]("")
 
 	var wg sync.WaitGroup
 	for _, i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
@@ -92,6 +147,7 @@ func TestAsyncLocking(t *testing.T) {
 		}(i)
 	}
 	wg.Wait()
+	aft.UpstreamHasSynced()
 	if !aft.HasSynced() {
 		t.Errorf("async tracker must have made a threading error?")
 	}
@@ -99,7 +155,7 @@ func TestAsyncLocking(t *testing.T) {
 }
 
 func TestSingleFileCounting(t *testing.T) {
-	sft := SingleFileTracker{UpstreamHasSynced: func() bool { return true }}
+	sft := NewSingleFileTracker("")
 
 	for i := 0; i < 100; i++ {
 		sft.Start()
@@ -115,6 +171,8 @@ func TestSingleFileCounting(t *testing.T) {
 	}
 
 	sft.Finished()
+	sft.UpstreamHasSynced()
+
 	if !sft.HasSynced() {
 		t.Fatal("Unexpectedly not synced?")
 	}
@@ -148,24 +206,35 @@ func TestSingleFileCounting(t *testing.T) {
 
 func TestSingleFile(t *testing.T) {
 	table := []struct {
-		synced       bool
+		synced            bool
+		syncedBeforeStops bool
+
 		starts       int
 		stops        int
 		expectSynced bool
 	}{
-		{false, 1, 1, false},
-		{true, 1, 0, false},
-		{false, 1, 0, false},
-		{true, 1, 1, true},
+		{false, false, 1, 1, false},
+		{true, false, 1, 0, false},
+		{true, true, 1, 0, false},
+		{false, false, 1, 0, false},
+		{true, false, 1, 1, true},
+		{true, true, 1, 1, true},
 	}
 
 	for _, tt := range table {
-		sft := SingleFileTracker{UpstreamHasSynced: func() bool { return tt.synced }}
+		sft := NewSingleFileTracker("")
 		for i := 0; i < tt.starts; i++ {
 			sft.Start()
 		}
+		// "upstream has synced" may occur before or after finished, but not before start.
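TestAsyncLocking above hammers AsyncTracker from many goroutines; the same API outside of a test looks roughly like this hypothetical sketch (keys and names are invented, not code from the patch):

	package main

	import (
		"fmt"

		"k8s.io/client-go/tools/cache/synctrack"
	)

	func main() {
		tracker := synctrack.NewAsyncTracker[string]("example")

		for _, key := range []string{"a", "b", "c"} {
			tracker.Start(key) // key handed to a worker
			go func(key string) {
				defer tracker.Finished(key) // called once the key is fully processed
				fmt.Println("processing", key)
			}(key)
		}

		// The initial keys have all been handed over via Start, so the
		// upstream source may now report itself as synced.
		tracker.UpstreamHasSynced()

		// Closed once upstream is synced and no initial key is still in flight.
		<-tracker.Done()
		fmt.Println("HasSynced:", tracker.HasSynced()) // true
	}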
+		if tt.synced && tt.syncedBeforeStops {
+			sft.UpstreamHasSynced()
+		}
 		for i := 0; i < tt.stops; i++ {
 			sft.Finished()
 		}
+		if tt.synced && !tt.syncedBeforeStops {
+			sft.UpstreamHasSynced()
+		}
 		got := sft.HasSynced()
 		if e, a := tt.expectSynced, got; e != a {
 			t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
@@ -173,67 +242,3 @@ func TestSingleFile(t *testing.T) {
 	}
 }
-
-func TestNoStaleValue(t *testing.T) {
-	table := []struct {
-		name      string
-		construct func(func() bool) (func(), func(), func() bool)
-	}{
-		{"SingleFile", testSingleFileFuncs},
-		{"Async", testAsyncFuncs},
-	}
-
-	for _, entry := range table {
-		t.Run(entry.name, func(t *testing.T) {
-			var lock sync.Mutex
-			upstreamHasSynced := func() bool {
-				lock.Lock()
-				defer lock.Unlock()
-				return true
-			}
-
-			Start, Finished, HasSynced := entry.construct(upstreamHasSynced)
-
-			// Ordinarily the corresponding lock would be held and you wouldn't be
-			// able to call this function at this point.
-			if !HasSynced() {
-				t.Fatal("Unexpectedly not synced??")
-			}
-
-			Start()
-			if HasSynced() {
-				t.Fatal("Unexpectedly synced??")
-			}
-			Finished()
-			if !HasSynced() {
-				t.Fatal("Unexpectedly not synced??")
-			}
-
-			// Now we will prove that if the lock is held, you can't get a false
-			// HasSynced return.
-			lock.Lock()
-
-			// This goroutine calls HasSynced
-			var wg sync.WaitGroup
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				if HasSynced() {
-					t.Error("Unexpectedly synced??")
-				}
-			}()
-
-			// This goroutine increments + unlocks. The sleep is to bias the
-			// runtime such that the other goroutine usually wins (it needs to work
-			// in both orderings, this one is more likely to be buggy).
-			go func() {
-				time.Sleep(time.Millisecond)
-				Start()
-				lock.Unlock()
-			}()
-
-			wg.Wait()
-		})
-	}
-
-}
diff --git a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go
index c60d2ebb8b2..4856a9bafff 100644
--- a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go
+++ b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go
@@ -34,6 +34,7 @@ type RealFIFOOptions struct {
 	Logger *klog.Logger
 
 	// Name can be used to override the default "RealFIFO" name for the new instance.
+	// Optional. Used only if Identifier.Name returns an empty string.
 	Name string
 
 	// KeyFunction is used to figure out what key an object should have. (It's
@@ -98,6 +99,11 @@ type RealFIFO struct {
 
 	items []Delta
 
+	// synced is initially an open channel. It gets closed (once!) by checkSynced_locked
+	// as soon as the initial sync is considered complete.
+	synced       chan struct{}
+	syncedClosed bool
+
 	// populated is true if the first batch of items inserted by Replace() has been populated
 	// or Delete/Add/Update was called first.
 	populated bool
@@ -160,6 +166,7 @@ type SyncAllInfo struct{}
 var (
 	_ = Queue(&RealFIFO{})             // RealFIFO is a Queue
 	_ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations
+	_ = DoneChecker(&RealFIFO{})       // RealFIFO also implements DoneChecker.
 )
 
 // Close the queue.
@@ -196,11 +203,37 @@ func (f *RealFIFO) HasSynced() bool {
 	return f.hasSynced_locked()
 }
 
-// ignoring lint to reduce delta to the original for review. It's ok adjust later.
-//
-//lint:file-ignore ST1003: should not use underscores in Go names
+// HasSyncedChecker returns a DoneChecker which is done once Add/Update/Delete/AddIfNotPresent has been called first,
+// or the first batch of items inserted by Replace() has been popped.
+func (f *RealFIFO) HasSyncedChecker() DoneChecker { + return f +} + +// Name implements [DoneChecker.Name] +func (f *RealFIFO) Name() string { + return f.name +} + +// Done implements [DoneChecker.Done] +func (f *RealFIFO) Done() <-chan struct{} { + return f.synced +} + +// hasSynced_locked returns the result of a prior checkSynced_locked call. func (f *RealFIFO) hasSynced_locked() bool { - return f.populated && f.initialPopulationCount == 0 + return f.syncedClosed +} + +// checkSynced_locked checks whether the initial batch of items (set via Replace) has been delivered +// and closes the synced channel as needed. It must be called after changing f.populated and/or +// f.initialPopulationCount while the mutex is still locked. +func (f *RealFIFO) checkSynced_locked() { + synced := f.populated && f.initialPopulationCount == 0 + if synced && !f.syncedClosed { + // Initial sync is complete. + f.syncedClosed = true + close(f.synced) + } } // addToItems_locked appends to the delta list. @@ -291,6 +324,7 @@ func (f *RealFIFO) Add(obj interface{}) error { defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() retErr := f.addToItems_locked(Added, false, obj) return retErr @@ -302,6 +336,7 @@ func (f *RealFIFO) Update(obj interface{}) error { defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() retErr := f.addToItems_locked(Updated, false, obj) return retErr @@ -315,6 +350,7 @@ func (f *RealFIFO) Delete(obj interface{}) error { defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() retErr := f.addToItems_locked(Deleted, false, obj) return retErr @@ -362,6 +398,7 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { defer func() { if f.initialPopulationCount > 0 { f.initialPopulationCount-- + f.checkSynced_locked() } }() @@ -482,7 +519,6 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc unique.Insert(id) moveDeltaToProcessList(i) } - f.items = f.items[len(deltas):] // Decrement initialPopulationCount if needed. // This is done in a defer so we only do this *after* processing is complete, @@ -490,6 +526,7 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc defer func() { if f.initialPopulationCount > 0 { f.initialPopulationCount -= len(deltas) + f.checkSynced_locked() } }() @@ -539,7 +576,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error if f.emitAtomicEvents { err = f.addReplaceToItemsLocked(newItems, resourceVersion) } else { - err = reconcileReplacement(f.items, f.knownObjects, newItems, f.keyOf, + err = reconcileReplacement(f.logger, f.items, f.knownObjects, newItems, f.keyOf, func(obj DeletedFinalStateUnknown) error { return f.addToItems_locked(Deleted, true, obj) }, @@ -554,6 +591,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error if !f.populated { f.populated = true f.initialPopulationCount = len(f.items) + f.checkSynced_locked() } return nil @@ -563,6 +601,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error // and based upon the state of the items in the queue and known objects will call onDelete and onReplace // depending upon whether the item is being deleted or replaced/added. 
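checkSynced_locked above boils down to a close-exactly-once signal guarded by the queue's mutex: a channel plus a bool, so that Done can be handed to any number of waiters while close happens at most once. A stand-alone sketch of just that pattern (the syncedFlag type is invented for illustration, not part of this patch):

	package main

	import (
		"fmt"
		"sync"
	)

	// syncedFlag is a minimal stand-in for RealFIFO's synced/syncedClosed pair.
	type syncedFlag struct {
		mu     sync.Mutex
		synced chan struct{}
		closed bool
	}

	func newSyncedFlag() *syncedFlag {
		return &syncedFlag{synced: make(chan struct{})}
	}

	// markSyncedLocked mirrors checkSynced_locked: call it with mu held whenever
	// the condition may have become true; it closes the channel at most once.
	func (s *syncedFlag) markSyncedLocked(condition bool) {
		if condition && !s.closed {
			s.closed = true
			close(s.synced)
		}
	}

	// Done can be handed out freely; receivers just wait for the close.
	func (s *syncedFlag) Done() <-chan struct{} { return s.synced }

	func main() {
		f := newSyncedFlag()
		go func() {
			f.mu.Lock()
			defer f.mu.Unlock()
			f.markSyncedLocked(true)
		}()
		<-f.Done()
		fmt.Println("synced")
	}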
func reconcileReplacement( + logger klog.Logger, queuedItems []Delta, knownObjects KeyListerGetter, newItems []interface{}, @@ -638,10 +677,10 @@ func reconcileReplacement( deletedObj, exists, err := knownObjects.GetByKey(knownKey) if err != nil { deletedObj = nil - utilruntime.HandleErrorWithLogger(klog.TODO(), err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey) + utilruntime.HandleErrorWithLogger(logger, err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey) } else if !exists { deletedObj = nil - utilruntime.HandleErrorWithLogger(klog.TODO(), nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey) + utilruntime.HandleErrorWithLogger(logger, nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey) } retErr := onDelete(DeletedFinalStateUnknown{ Key: knownKey, @@ -757,6 +796,7 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { logger: klog.Background(), name: "RealFIFO", items: make([]Delta, 0, 10), + synced: make(chan struct{}), keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, transformer: opts.Transformer, @@ -769,8 +809,11 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { if opts.Logger != nil { f.logger = *opts.Logger } - if opts.Name != "" { - f.name = opts.Name + if name := opts.Name; name != "" { + f.name = name + } + if name := opts.Identifier.Name(); name != "" { + f.name = name } f.logger = klog.LoggerWithName(f.logger, f.name) f.cond.L = &f.lock diff --git a/staging/src/k8s.io/client-go/tools/cache/wait_test.go b/staging/src/k8s.io/client-go/tools/cache/wait_test.go new file mode 100644 index 00000000000..b5368368903 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/wait_test.go @@ -0,0 +1,200 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "runtime" + "sync" + "testing" + "testing/synctest" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/klog/v2" + "k8s.io/klog/v2/textlogger" +) + +func init() { + // The test below is sensitive to the time zone, log output uses time.Local. + time.Local = time.UTC +} + +func TestWaitFor(t *testing.T) { + for name, tc := range map[string]struct { + what string + checkers []DoneChecker + timeout time.Duration + timeoutReason string + + expectDone bool + + // Time is predictable and starts at the synctest epoch. + // %[1]d is the pid, %[2]d the line number of the WaitFor call. 
+ expectOutput string + }{ + "empty": { + expectDone: true, + }, + "no-caches": { + what: "my-caches", + expectDone: true, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +`, + }, + "no-logging": { + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)}, + expectDone: true, + }, + "with-logging": { + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)}, + expectDone: true, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="second" +I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first" +`, + }, + "some-timeout": { + timeout: time.Minute, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first" +I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["second"] +`, + }, + "some-canceled": { + timeout: -1, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context canceled" instances=["first","second"] +`, + }, + "more": { + timeoutReason: "go fish", + timeout: 5 * time.Second, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="go fish" instances=["first","second"] +`, + }, + "all": { + timeout: time.Minute, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", -1), newMockChecker("second", -1), newMockChecker("last", -1)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["first","last","second"] +`, + }, + } { + t.Run(name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var buffer bytes.Buffer + logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(&buffer))) + ctx := klog.NewContext(context.Background(), logger) + var wg sync.WaitGroup + defer wg.Wait() + if 
tc.timeout != 0 {
+				switch tc.timeoutReason {
+				case "":
+					if tc.timeout > 0 {
+						c, cancel := context.WithTimeout(ctx, tc.timeout)
+						defer cancel()
+						ctx = c
+					} else {
+						c, cancel := context.WithCancel(ctx)
+						cancel()
+						ctx = c
+					}
+				default:
+					c, cancel := context.WithCancelCause(ctx)
+					wg.Go(func() {
+						time.Sleep(tc.timeout)
+						cancel(errors.New(tc.timeoutReason))
+					})
+					ctx = c
+				}
+			}
+			_, _, line, _ := runtime.Caller(0)
+			done := WaitFor(ctx, tc.what, tc.checkers...)
+			expectOutput := tc.expectOutput
+			if expectOutput != "" {
+				expectOutput = fmt.Sprintf(expectOutput, os.Getpid(), line+1)
+			}
+			assert.Equal(t, tc.expectDone, done, "done")
+			assert.Equal(t, expectOutput, buffer.String(), "output")
+		})
+		})
+	}
+}
+
+// newMockChecker returns a mock DoneChecker which can be created outside of a synctest bubble.
+// The channel gets constructed inside the bubble when Done is called for the first time.
+func newMockChecker(name string, delay time.Duration) DoneChecker {
+	return &mockChecker{
+		name:  name,
+		delay: delay,
+	}
+}
+
+type mockChecker struct {
+	name        string
+	delay       time.Duration
+	initialized bool
+	done        <-chan struct{}
+}
+
+func (m *mockChecker) Name() string { return m.name }
+func (m *mockChecker) Done() <-chan struct{} {
+	if !m.initialized {
+		switch {
+		case m.delay > 0:
+			// In the future.
+			ctx := context.Background()
+			// This leaks a cancel, but that is hard to avoid (cannot use the parent's t.Cleanup, no other way to delay calling it) and doesn't matter in a unit test.
+			//nolint:govet
+			ctx, _ = context.WithTimeout(ctx, m.delay)
+			m.done = ctx.Done()
+		case m.delay == 0:
+			// Immediately.
+			c := make(chan struct{})
+			close(c)
+			m.done = c
+		default:
+			// Never.
+			c := make(chan struct{})
+			m.done = c
+		}
+		m.initialized = true
+	}
+	return m.done
+}
diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker.go
index 5ca9719b288..2c94f309e49 100644
--- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker.go
+++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker.go
@@ -71,8 +71,16 @@ type Tracker struct {
 	// may be overridden in tests.
 	handleError func(context.Context, error, string, ...any)
 
+	// wg tracks the tracker's background goroutines, cancel stops them.
+	wg     sync.WaitGroup
+	cancel func(error)
+
 	// Synchronizes updates to these fields related to event handlers.
 	rwMutex sync.RWMutex
+
+	// synced gets closed once all the tracker's event handlers are synced.
+	synced chan struct{}
+
 	// All registered event handlers.
 	eventHandlers []cache.ResourceEventHandler
 	// The eventQueue contains functions which deliver an event to one
@@ -155,6 +163,8 @@ func newTracker(ctx context.Context, opts Options) (finalT *Tracker, finalErr er
 		deviceClasses:         opts.ClassInformer.Informer(),
 		patchedResourceSlices: cache.NewStore(cache.MetaNamespaceKeyFunc),
 		handleError:           utilruntime.HandleErrorWithContext,
+		synced:                make(chan struct{}),
+		cancel:                func(error) {}, // Real function set in initInformers.
 		eventQueue:            *buffer.NewRing[func()](buffer.RingOptions{InitialSize: 0, NormalSize: 4}),
 	}
 	defer func() {
@@ -212,6 +222,22 @@ func (t *Tracker) initInformers(ctx context.Context) error {
 		return fmt.Errorf("add event handler for DeviceClasses: %w", err)
 	}
 
+	// This usually short-lived goroutine monitors our upstream event handlers and
+	// closes our own synced channel once all of them are synced.
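The resourceslice tracker around this point consumes HasSyncedChecker().Done() directly in its own monitoring goroutine; a typical controller would instead hand its checkers to WaitFor, roughly as in this hypothetical sketch (the controller type and its fields are made up; WaitFor, HasSyncedChecker and DoneChecker are the APIs exercised by the test above and used by the tracker below):

	package controller

	import (
		"context"

		"k8s.io/client-go/tools/cache"
	)

	type controller struct {
		podInformer cache.SharedIndexInformer
		handle      cache.ResourceEventHandlerRegistration
	}

	func (c *controller) run(ctx context.Context) {
		if !cache.WaitFor(ctx, "cache and event handler sync",
			c.podInformer.HasSyncedChecker(),
			c.handle.HasSyncedChecker(),
		) {
			// Context canceled or timed out before everything was synced;
			// WaitFor has already logged which instances were still pending.
			return
		}
		// ... start workers ...
	}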
+ monitorCtx, cancel := context.WithCancelCause(ctx) + t.cancel = cancel + t.wg.Go(func() { + for _, handle := range []cache.ResourceEventHandlerRegistration{t.resourceSlicesHandle, t.deviceTaintsHandle, t.deviceClassesHandle} { + select { + case <-handle.HasSyncedChecker().Done(): + case <-monitorCtx.Done(): + // Abort without closing our synced channel. + return + } + } + close(t.synced) + }) + return nil } @@ -220,21 +246,28 @@ func (t *Tracker) initInformers(ctx context.Context) error { // point is possible and will emit events with up-to-date ResourceSlice // objects. func (t *Tracker) HasSynced() bool { + select { + case <-t.HasSyncedChecker().Done(): + return true + default: + return false + } +} + +func (t *Tracker) HasSyncedChecker() cache.DoneChecker { if !t.enableDeviceTaintRules { - return t.resourceSlices.HasSynced() + return t.resourceSlices.HasSyncedChecker() } - if t.resourceSlicesHandle != nil && !t.resourceSlicesHandle.HasSynced() { - return false - } - if t.deviceTaintsHandle != nil && !t.deviceTaintsHandle.HasSynced() { - return false - } - if t.deviceClassesHandle != nil && !t.deviceClassesHandle.HasSynced() { - return false - } + return trackerHasSynced{t} +} - return true +type trackerHasSynced struct{ t *Tracker } + +func (s trackerHasSynced) Name() string { return "ResourceSlice tracker" } + +func (s trackerHasSynced) Done() <-chan struct{} { + return s.t.synced } // Stop ends all background activity and blocks until that shutdown is complete. @@ -243,12 +276,16 @@ func (t *Tracker) Stop() { return } + t.cancel(errors.New("stopped")) + if t.broadcaster != nil { t.broadcaster.Shutdown() } _ = t.resourceSlices.RemoveEventHandler(t.resourceSlicesHandle) _ = t.deviceTaints.RemoveEventHandler(t.deviceTaintsHandle) _ = t.deviceClasses.RemoveEventHandler(t.deviceClassesHandle) + + t.wg.Wait() } // ListPatchedResourceSlices returns all ResourceSlices in the cluster with