diff --git a/pkg/scheduler/util/assumecache/assume_cache.go b/pkg/scheduler/util/assumecache/assume_cache.go index fae3230bc3a..928c985ee87 100644 --- a/pkg/scheduler/util/assumecache/assume_cache.go +++ b/pkg/scheduler/util/assumecache/assume_cache.go @@ -508,7 +508,11 @@ func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache. if c.handlerRegistration == nil { // No informer, so immediately synced. - return syncedHandlerRegistration{} + s := syncedHandlerRegistration{ + synced: make(chan struct{}), + } + close(s.synced) + return s } return c.handlerRegistration @@ -557,6 +561,14 @@ func (c *AssumeCache) emitEvents() { // syncedHandlerRegistration is an implementation of ResourceEventHandlerRegistration // which always returns true. -type syncedHandlerRegistration struct{} +type syncedHandlerRegistration struct { + synced chan struct{} +} func (syncedHandlerRegistration) HasSynced() bool { return true } + +func (s syncedHandlerRegistration) HasSyncedChecker() cache.DoneChecker { return s } + +func (s syncedHandlerRegistration) Name() string { return "AssumeCache" } + +func (s syncedHandlerRegistration) Done() <-chan struct{} { return s.synced } diff --git a/staging/src/k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic/controller.go b/staging/src/k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic/controller.go index a94004c272d..65ffee4e2f6 100644 --- a/staging/src/k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic/controller.go +++ b/staging/src/k8s.io/apiserver/pkg/admission/plugin/policy/internal/generic/controller.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "time" kerrors "k8s.io/apimachinery/pkg/api/errors" @@ -48,10 +47,7 @@ type controller[T runtime.Object] struct { options ControllerOptions - // must hold a func() bool or nil - notificationsDelivered atomic.Value - - hasProcessed synctrack.AsyncTracker[string] + hasProcessed *synctrack.AsyncTracker[string] } type ControllerOptions struct { @@ -77,17 +73,11 @@ func NewController[T runtime.Object]( } c := &controller[T]{ - options: options, - informer: informer, - reconciler: reconciler, - queue: nil, - } - c.hasProcessed.UpstreamHasSynced = func() bool { - f := c.notificationsDelivered.Load() - if f == nil { - return false - } - return f.(func() bool)() + options: options, + informer: informer, + reconciler: reconciler, + queue: nil, + hasProcessed: synctrack.NewAsyncTracker[string](options.Name), } return c } @@ -159,12 +149,9 @@ func (c *controller[T]) Run(ctx context.Context) error { return err } - c.notificationsDelivered.Store(registration.HasSynced) - // Make sure event handler is removed from informer in case return early from // an error defer func() { - c.notificationsDelivered.Store(func() bool { return false }) // Remove event handler and Handle Error here. Error should only be raised // for improper usage of event handler API. if err := c.informer.RemoveEventHandler(registration); err != nil { @@ -174,7 +161,12 @@ func (c *controller[T]) Run(ctx context.Context) error { // Wait for initial cache list to complete before beginning to reconcile // objects. - if !cache.WaitForNamedCacheSyncWithContext(ctx, c.informer.HasSynced) { + if !cache.WaitFor(ctx, "caches", c.informer.HasSyncedChecker(), registration.HasSyncedChecker()) { + // TODO: should cache.WaitFor return an error? + // ctx.Err() or context.Cause(ctx)? + // Either of them would make dead code like the "if err == nil" + // below more obvious. + // ctx cancelled during cache sync. 
return early err := ctx.Err() if err == nil { @@ -184,6 +176,10 @@ func (c *controller[T]) Run(ctx context.Context) error { return err } + // c.informer *and* our handler have synced, which implies that our AddFunc(= enqueue) + // and thus c.hasProcessed.Start have been called for the initial list => upstream is done. + c.hasProcessed.UpstreamHasSynced() + waitGroup := sync.WaitGroup{} for i := uint(0); i < c.options.Workers; i++ { diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go index a5d47716f70..75d2ca979f1 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller.go @@ -142,6 +142,11 @@ type Controller interface { // HasSynced delegates to the Config's Queue HasSynced() bool + // HasSyncedChecker enables waiting for syncing without polling. + // The returned DoneChecker can be passed to WaitFor. + // It delegates to the Config's Queue. + HasSyncedChecker() DoneChecker + // LastSyncResourceVersion delegates to the Reflector when there // is one, otherwise returns the empty string LastSyncResourceVersion() string @@ -168,11 +173,13 @@ func (c *controller) RunWithContext(ctx context.Context) { <-ctx.Done() c.config.Queue.Close() }() + logger := klog.FromContext(ctx) r := NewReflectorWithOptions( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, ReflectorOptions{ + Logger: &logger, ResyncPeriod: c.config.FullResyncPeriod, MinWatchTimeout: c.config.MinWatchTimeout, TypeDescription: c.config.ObjectDescription, @@ -206,6 +213,13 @@ func (c *controller) HasSynced() bool { return c.config.Queue.HasSynced() } +// HasSyncedChecker enables waiting for syncing without polling. +// The returned DoneChecker can be passed to [WaitFor]. +// It delegates to the Config's Queue. +func (c *controller) HasSyncedChecker() DoneChecker { + return c.config.Queue.HasSyncedChecker() +} + func (c *controller) LastSyncResourceVersion() string { c.reflectorMutex.RLock() defer c.reflectorMutex.RUnlock() @@ -591,6 +605,7 @@ func NewTransformingIndexerInformer( // Multiplexes updates in the form of a list of Deltas into a Store, and informs // a given handler of events OnUpdate, OnAdd, OnDelete func processDeltas( + logger klog.Logger, // Object which receives event notifications from the given deltas handler ResourceEventHandler, clientState Store, @@ -608,7 +623,7 @@ func processDeltas( if !ok { return fmt.Errorf("ReplacedAll did not contain ReplacedAllInfo: %T", obj) } - if err := processReplacedAllInfo(handler, info, clientState, isInInitialList, keyFunc); err != nil { + if err := processReplacedAllInfo(logger, handler, info, clientState, isInInitialList, keyFunc); err != nil { return err } case SyncAll: @@ -653,6 +668,7 @@ func processDeltas( // Returns an error if any Delta or transaction fails. For TransactionError, // only successful operations trigger callbacks. 
func processDeltasInBatch( + logger klog.Logger, handler ResourceEventHandler, clientState Store, deltas []Delta, @@ -666,7 +682,7 @@ func processDeltasInBatch( if !txnSupported { var errs []error for _, delta := range deltas { - if err := processDeltas(handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil { + if err := processDeltas(logger, handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil { errs = append(errs, err) } } @@ -731,7 +747,7 @@ func processDeltasInBatch( return nil } -func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error { +func processReplacedAllInfo(logger klog.Logger, handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error { var deletions []DeletedFinalStateUnknown type replacement struct { oldObj interface{} @@ -739,7 +755,7 @@ func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo, } replacements := make([]replacement, 0, len(info.Objects)) - err := reconcileReplacement(nil, clientState, info.Objects, keyFunc, + err := reconcileReplacement(logger, nil, clientState, info.Objects, keyFunc, func(obj DeletedFinalStateUnknown) error { deletions = append(deletions, obj) return nil @@ -792,7 +808,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co if options.Logger != nil { logger = *options.Logger } - fifo := newQueueFIFO(logger, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) + logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider) cfg := &Config{ Queue: fifo, @@ -803,21 +819,30 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(options.Handler, clientState, deltas, isInInitialList, keyFunc) + // This must be the logger *of the fifo*. + return processDeltas(logger, options.Handler, clientState, deltas, isInInitialList, keyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(options.Handler, clientState, deltaList, isInInitialList, keyFunc) + // Same here. + return processDeltasInBatch(logger, options.Handler, clientState, deltaList, isInInitialList, keyFunc) }, } return New(cfg) } -func newQueueFIFO(logger klog.Logger, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue { +// newQueueFIFO constructs a new FIFO, choosing between real and delta FIFO +// depending on the InOrderInformers feature gate. +// +// It returns the FIFO and the logger used by the FIFO. +// That logger includes the name used for the FIFO, +// in contrast to the logger which was passed in. 
+func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) (klog.Logger, Queue) { if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) { options := RealFIFOOptions{ Logger: &logger, + Name: fmt.Sprintf("RealFIFO %T", objectType), KeyFunction: MetaNamespaceKeyFunc, Transformer: transform, Identifier: identifier, @@ -830,13 +855,16 @@ func newQueueFIFO(logger klog.Logger, clientState Store, transform TransformFunc } else { options.KnownObjects = clientState } - return NewRealFIFOWithOptions(options) + f := NewRealFIFOWithOptions(options) + return f.logger, f } else { - return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ Logger: &logger, + Name: fmt.Sprintf("DeltaFIFO %T", objectType), KnownObjects: clientState, EmitDeltaTypeReplaced: true, Transformer: transform, }) + return f.logger, f } } diff --git a/staging/src/k8s.io/client-go/tools/cache/controller_bench_test.go b/staging/src/k8s.io/client-go/tools/cache/controller_bench_test.go index 754ab3f1236..f81a3d3f97f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller_bench_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller_bench_test.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" fcache "k8s.io/client-go/tools/cache/testing" + "k8s.io/klog/v2/ktesting" ) const handlerWaitTime = time.Millisecond @@ -33,7 +34,8 @@ const handlerWaitTime = time.Millisecond func BenchmarkAddWithSlowHandlers(b *testing.B) { for _, unlockWhileProcessing := range []bool{false, true} { b.Run(fmt.Sprintf("unlockWhileProcessing=%t", unlockWhileProcessing), func(b *testing.B) { - ctx, cancel := context.WithCancel(context.Background()) + logger, ctx := ktesting.NewTestContext(b) + ctx, cancel := context.WithCancel(ctx) source := fcache.NewFakeControllerSource() b.Cleanup(func() { cancel() @@ -65,12 +67,12 @@ func BenchmarkAddWithSlowHandlers(b *testing.B) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltas(logger, handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(logger, handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) diff --git a/staging/src/k8s.io/client-go/tools/cache/controller_test.go b/staging/src/k8s.io/client-go/tools/cache/controller_test.go index c4ee71f477e..9528dc6caec 100644 --- a/staging/src/k8s.io/client-go/tools/cache/controller_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/controller_test.go @@ -834,6 +834,7 @@ func TestProcessDeltasInBatch(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { + logger, _ := ktesting.NewTestContext(t) mockStore := &mockTxnStore{ Store: NewStore(MetaNamespaceKeyFunc), failingObjs: tc.failingObjects, @@ -851,6 +852,7 @@ func TestProcessDeltasInBatch(t *testing.T) { }, } err := processDeltasInBatch( + logger, dummyListener, mockStore, tc.deltaList, @@ -929,12 +931,12 @@ func 
TestReplaceEvents(t *testing.T) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) @@ -1066,12 +1068,12 @@ func TestResetWatch(t *testing.T) { Process: func(obj interface{}, isInInitialList bool) error { if deltas, ok := obj.(Deltas); ok { - return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) } return errors.New("object given as Process argument is not Deltas") }, ProcessBatch: func(deltaList []Delta, isInInitialList bool) error { - return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) + return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc) }, } c := New(cfg) diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 12157dad5c6..b57cc472831 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -126,6 +126,11 @@ type DeltaFIFO struct { // A key is in `queue` if and only if it is in `items`. queue []string + // synced is initially an open channel. It gets closed (once!) by checkSynced_locked + // as soon as the initial sync is considered complete. + synced chan struct{} + syncedClosed bool + // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update/AddIfNotPresent was called first. populated bool @@ -272,6 +277,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { f := &DeltaFIFO{ logger: klog.Background(), name: "DeltaFIFO", + synced: make(chan struct{}), items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, @@ -283,8 +289,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { if opts.Logger != nil { f.logger = *opts.Logger } - if opts.Name != "" { - f.name = opts.Name + if name := opts.Name; name != "" { + f.name = name } f.logger = klog.LoggerWithName(f.logger, f.name) f.cond.L = &f.lock @@ -294,6 +300,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { var ( _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue _ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations + _ = DoneChecker(&DeltaFIFO{}) // DeltaFIFO implements DoneChecker. ) var ( @@ -339,8 +346,36 @@ func (f *DeltaFIFO) HasSynced() bool { return f.hasSynced_locked() } +// HasSyncedChecker is done if an Add/Update/Delete/AddIfNotPresent are called first, +// or the first batch of items inserted by Replace() has been popped. 
+func (f *DeltaFIFO) HasSyncedChecker() DoneChecker { + return f +} + +// Name implements [DoneChecker.Name] +func (f *DeltaFIFO) Name() string { + return f.name +} + +// Done implements [DoneChecker.Done] +func (f *DeltaFIFO) Done() <-chan struct{} { + return f.synced +} + +// hasSynced_locked returns the result of a prior checkSynced_locked call. func (f *DeltaFIFO) hasSynced_locked() bool { - return f.populated && f.initialPopulationCount == 0 + return f.syncedClosed +} + +// checkSynced_locked checks whether the initial is completed. +// It must be called whenever populated or initialPopulationCount change. +func (f *DeltaFIFO) checkSynced_locked() { + synced := f.populated && f.initialPopulationCount == 0 + if synced && !f.syncedClosed { + // Initial sync is complete. + f.syncedClosed = true + close(f.synced) + } } // Add inserts an item, and puts it in the queue. The item is only enqueued @@ -349,6 +384,7 @@ func (f *DeltaFIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() return f.queueActionLocked(Added, obj) } @@ -357,6 +393,7 @@ func (f *DeltaFIFO) Update(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() return f.queueActionLocked(Updated, obj) } @@ -373,6 +410,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() if f.knownObjects == nil { if _, exists := f.items[id]; !exists { // Presumably, this was deleted when a relist happened. @@ -538,6 +576,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { depth := len(f.queue) if f.initialPopulationCount > 0 { f.initialPopulationCount-- + f.checkSynced_locked() } item, ok := f.items[id] if !ok { @@ -650,6 +689,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error { if !f.populated { f.populated = true f.initialPopulationCount = keys.Len() + queuedDeletions + f.checkSynced_locked() } return nil diff --git a/staging/src/k8s.io/client-go/tools/cache/event_handler_name.go b/staging/src/k8s.io/client-go/tools/cache/event_handler_name.go new file mode 100644 index 00000000000..9489f9b47ba --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/event_handler_name.go @@ -0,0 +1,119 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "fmt" + "reflect" + "runtime" + "strings" +) + +func nameForHandler(handler ResourceEventHandler) (name string) { + defer func() { + // Last resort: let Sprintf handle it. + if name == "" { + name = fmt.Sprintf("%T", handler) + } + }() + + if handler == nil { + return "" + } + switch handler := handler.(type) { + case *ResourceEventHandlerFuncs: + return nameForHandlerFuncs(*handler) + case ResourceEventHandlerFuncs: + return nameForHandlerFuncs(handler) + default: + // We can use the fully qualified name of whatever + // provides the interface. 
We don't care whether + // it contains fields or methods which provide + // the interface methods. + value := reflect.ValueOf(handler) + if value.Type().Kind() == reflect.Interface { + // Probably not needed, but let's play it safe. + value = value.Elem() + } + if value.Type().Kind() == reflect.Pointer { + value = value.Elem() + } + name := value.Type().PkgPath() + if name != "" { + name += "." + } + if typeName := value.Type().Name(); typeName != "" { + name += typeName + } + return name + } +} + +func nameForHandlerFuncs(funcs ResourceEventHandlerFuncs) string { + return nameForFunctions(funcs.AddFunc, funcs.UpdateFunc, funcs.DeleteFunc) +} + +func nameForFunctions(fs ...any) string { + // If all functions are defined in the same place, then we + // don't care about the actual function name in + // e.g. "main.FuncName" or "main.(*Foo).FuncName-fm", instead + // we use the common qualifier. + // + // But we don't know that yet, so we also collect all names. + var qualifier string + singleQualifier := true + var names []string + for _, f := range fs { + if f == nil { + continue + } + name := nameForFunction(f) + if name == "" { + continue + } + names = append(names, name) + + newQualifier := name + index := strings.LastIndexByte(newQualifier, '.') + if index > 0 { + newQualifier = newQualifier[:index] + } + switch qualifier { + case "": + qualifier = newQualifier + case newQualifier: + // So far, so good... + default: + // Nope, different. + singleQualifier = false + } + } + + if singleQualifier { + return qualifier + } + + return strings.Join(names, "+") +} + +func nameForFunction(f any) string { + fn := runtime.FuncForPC(reflect.ValueOf(f).Pointer()) + if fn == nil { + return "" + } + return fn.Name() +} diff --git a/staging/src/k8s.io/client-go/tools/cache/event_handler_name_test.go b/staging/src/k8s.io/client-go/tools/cache/event_handler_name_test.go new file mode 100644 index 00000000000..d17f66a0866 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/event_handler_name_test.go @@ -0,0 +1,87 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" +) + +type mockHandler struct{} + +func (m mockHandler) OnAdd(any, bool) {} +func (m mockHandler) OnUpdate(any, any) {} +func (m mockHandler) OnDelete(any) {} + +func TestNameForHandler(t *testing.T) { + emptyHandler := ResourceEventHandlerFuncs{} + + for name, tc := range map[string]struct { + handler ResourceEventHandler + wantName string + }{ + "mixture": { + handler: ResourceEventHandlerFuncs{ + UpdateFunc: emptyHandler.OnUpdate, + DeleteFunc: func(any) {}, + }, + wantName: "k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnUpdate-fm+k8s.io/client-go/tools/cache.TestNameForHandler.func1", // Testcase must come first to get func1. 
+ }, + "add": { + handler: ResourceEventHandlerFuncs{AddFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "update": { + handler: ResourceEventHandlerFuncs{UpdateFunc: func(any, any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "delete": { + handler: ResourceEventHandlerFuncs{DeleteFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "all": { + handler: ResourceEventHandlerFuncs{ + AddFunc: func(any) {}, + UpdateFunc: func(any, any) {}, + DeleteFunc: func(any) {}, + }, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "ptrToFuncs": { + handler: &ResourceEventHandlerFuncs{AddFunc: func(any) {}}, + wantName: "k8s.io/client-go/tools/cache.TestNameForHandler", + }, + "struct": { + handler: mockHandler{}, + wantName: "k8s.io/client-go/tools/cache.mockHandler", + }, + "ptrToStruct": { + handler: &mockHandler{}, + wantName: "k8s.io/client-go/tools/cache.mockHandler", + }, + "nil": { + handler: nil, + wantName: "", + }, + } { + t.Run(name, func(t *testing.T) { + gotName := nameForHandler(tc.handler) + if gotName != tc.wantName { + t.Errorf("Got name:\n %s\nWanted name:\n %s", gotName, tc.wantName) + } + }) + } +} diff --git a/staging/src/k8s.io/client-go/tools/cache/fifo.go b/staging/src/k8s.io/client-go/tools/cache/fifo.go index 44d006b6987..e31e2d27039 100644 --- a/staging/src/k8s.io/client-go/tools/cache/fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/fifo.go @@ -58,6 +58,12 @@ type Queue interface { // Update, or Delete; otherwise the first batch is empty. HasSynced() bool + // HasSyncedChecker is done once the first batch of keys have all been + // popped. The first batch of keys are those of the first Replace + // operation if that happened before any Add, AddIfNotPresent, + // Update, or Delete; otherwise the first batch is empty. + HasSyncedChecker() DoneChecker + // Close the queue Close() } @@ -110,6 +116,11 @@ type FIFO struct { items map[string]interface{} queue []string + // synced is initially an open channel. It gets closed (once!) by checkSynced + // as soon as the initial sync is considered complete. + synced chan struct{} + syncedClosed bool + // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update was called first. populated bool @@ -127,7 +138,8 @@ type FIFO struct { } var ( - _ = Queue(&FIFO{}) // FIFO is a Queue + _ = Queue(&FIFO{}) // FIFO is a Queue + _ = DoneChecker(&FIFO{}) // ... and implements DoneChecker. ) // Close the queue. @@ -146,8 +158,36 @@ func (f *FIFO) HasSynced() bool { return f.hasSynced_locked() } +// HasSyncedChecker is done if an Add/Update/Delete/AddIfNotPresent are called first, +// or the first batch of items inserted by Replace() has been popped. +func (f *FIFO) HasSyncedChecker() DoneChecker { + return f +} + +// Name implements [DoneChecker.Name] +func (f *FIFO) Name() string { + return "FIFO" // FIFO doesn't seem to be used outside of a few tests, so changing the NewFIFO API to pass in a name doesn't seem worth it. +} + +// Done implements [DoneChecker.Done] +func (f *FIFO) Done() <-chan struct{} { + return f.synced +} + +// hasSynced_locked returns the result of a prior checkSynced call. func (f *FIFO) hasSynced_locked() bool { - return f.populated && f.initialPopulationCount == 0 + return f.syncedClosed +} + +// checkSynced checks whether the initial sync is completed. 
+// It must be called whenever populated or initialPopulationCount change +// while the mutex is still locked. +func (f *FIFO) checkSynced() { + synced := f.populated && f.initialPopulationCount == 0 + if synced && !f.syncedClosed { + f.syncedClosed = true + close(f.synced) + } } // Add inserts an item, and puts it in the queue. The item is only enqueued @@ -160,6 +200,7 @@ func (f *FIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced() if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } @@ -184,6 +225,7 @@ func (f *FIFO) Delete(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true + f.checkSynced() delete(f.items, id) return err } @@ -220,6 +262,8 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- + // Must be done *after* process has completed. + defer f.checkSynced() } item, ok := f.items[id] if !ok { @@ -252,6 +296,7 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { if !f.populated { f.populated = true f.initialPopulationCount = len(items) + f.checkSynced() } f.items = items @@ -290,6 +335,7 @@ func (f *FIFO) Resync() error { // process. func NewFIFO(keyFunc KeyFunc) *FIFO { f := &FIFO{ + synced: make(chan struct{}), items: map[string]interface{}{}, queue: []string{}, keyFunc: keyFunc, diff --git a/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go b/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go index 517258a160d..cd8c5c739bf 100644 --- a/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package cache import ( + "context" "sync" "testing" "time" @@ -29,6 +30,25 @@ const ( concurrencyLevel = 5 ) +type mockSynced struct { + context.Context + cancel func() +} + +func newMockSynced(tb testing.TB, synced bool) *mockSynced { + m := &mockSynced{} + m.Context, m.cancel = context.WithCancel(context.Background()) + if synced { + m.cancel() + } + tb.Cleanup(m.cancel) + return m +} + +func (m *mockSynced) Name() string { + return "mock" +} + func BenchmarkListener(b *testing.B) { var notification addNotification @@ -40,7 +60,7 @@ func BenchmarkListener(b *testing.B) { AddFunc: func(obj interface{}) { swg.Done() }, - }, 0, 0, time.Now(), 1024*1024, func() bool { return true }) + }, 0, 0, time.Now(), 1024*1024, newMockSynced(b, true)) var wg wait.Group defer wg.Wait() // Wait for .run and .pop to stop defer close(pl.addCh) // Tell .run and .pop to stop diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go index f6670a42c0e..fc9c1529e19 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_data_consistency_detector_test.go @@ -73,7 +73,7 @@ func runTestReflectorDataConsistencyDetector(t *testing.T, transformer Transform defer cancel() store := NewStore(MetaNamespaceKeyFunc) - fifo := newQueueFIFO(logger, store, transformer, InformerNameAndResource{}, nil) + _, fifo := newQueueFIFO(logger, nil, store, transformer, InformerNameAndResource{}, nil) lw := &ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 45daafa6177..503cd2e2dac 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "slices" "sync" "sync/atomic" "time" @@ -194,6 +195,14 @@ type SharedInformer interface { // For that, please call HasSynced on the handle returned by // AddEventHandler. HasSynced() bool + // HasSyncedChecker completes if the shared informer's store has been + // informed by at least one full LIST of the authoritative state + // of the informer's object collection. This is unrelated to "resync". + // + // Note that this doesn't tell you if an individual handler is synced!! + // For that, please use HasSyncedChecker on the handle returned by + // AddEventHandler. + HasSyncedChecker() DoneChecker // LastSyncResourceVersion is the resource version observed when last synced with the underlying // store. The value returned is not synchronized with access to the underlying store and is not // thread-safe. @@ -247,6 +256,10 @@ type ResourceEventHandlerRegistration interface { // HasSynced reports if both the parent has synced and all pre-sync // events have been delivered. HasSynced() bool + + // HasSyncedChecker reports if both the parent has synced and all pre-sync + // events have been delivered. + HasSyncedChecker() DoneChecker } // Optional configuration options for [SharedInformer.AddEventHandlerWithOptions]. 
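For illustration, a minimal sketch of how a caller might consume this checker-based API instead of polling WaitForCacheSync; the lister-watcher, the Pod object type, and the handler body are assumptions, and the usual k8s.io/client-go/tools/cache, k8s.io/api/core/v1 and standard library imports are implied:

    // Sketch only: lw is an assumed cache.ListerWatcher for Pods.
    func waitForPodCache(ctx context.Context, lw cache.ListerWatcher) error {
        informer := cache.NewSharedIndexInformer(lw, &v1.Pod{}, 0, cache.Indexers{})
        reg, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: func(obj interface{}) { /* enqueue obj */ },
        })
        if err != nil {
            return err
        }
        go informer.RunWithContext(ctx)
        // Blocks until both the informer's store and the handler have synced,
        // or until ctx is canceled; no polling interval involved.
        if !cache.WaitFor(ctx, "pod cache", informer.HasSyncedChecker(), reg.HasSyncedChecker()) {
            return context.Cause(ctx)
        }
        return nil
    }

Because WaitFor blocks on the Done channels directly, it reacts as soon as both checkers complete rather than on the next poll tick.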
@@ -309,6 +322,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O return &sharedIndexInformer{ indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers), processor: processor, + synced: make(chan struct{}), listerWatcher: lw, objectType: exampleObject, objectDescription: options.ObjectDescription, @@ -414,6 +428,107 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool return true } +// WaitFor waits for a set of activities to complete, like cache syncing. +// It returns true if it was successful, false if the context was canceled +// before all activities are completed. +// +// If a non-nil "what" is provided, then progress information is logged +// while waiting ("Waiting", for=""). +// +// In contrast to other WaitForCacheSync alternatives, this one here doesn't +// need polling, which makes it react immediately. When used in a synctest unit +// test, waiting completes without moving time forward randomly, which +// makes tests more predictable. +func WaitFor(ctx context.Context, what string, checkers ...DoneChecker) bool { + logger := klog.FromContext(ctx) + if what != "" { + helper, l := logger.WithCallStackHelper() + logger = l + helper() + logger.Info("Waiting", "for", what) + } + + // Check in parallel to ensure that we log "Done waiting" as soon + // as possible for each checker. The timing may be useful to know. + // We cannot log inside the goroutine, the stack unwinding wouldn't + // work, so instead each goroutine just notifies the parent + // goroutine when it's checker is done and the main goroutine then + // logs it. + var wg sync.WaitGroup + type result struct { + checker DoneChecker + done bool + } + doneChecker := make(chan result) + for _, checker := range checkers { + wg.Go(func() { + select { + case <-checker.Done(): + doneChecker <- result{checker, true} + case <-ctx.Done(): + // We can end up here even when the checker is already done, + // select is not deterministic. Check once more without blocking + // before finally giving up. + select { + case <-checker.Done(): + doneChecker <- result{checker, true} + default: + doneChecker <- result{checker, false} + } + } + }) + } + instances := make([]string, 0, len(checkers)) + for range len(checkers) { + // We are guaranteed to get exactly one result from each goroutine, so this won't block forever. + result := <-doneChecker + if result.done { + if what != "" { + logger.Info("Done waiting", "for", what, "instance", result.checker.Name()) + } + } else { + // We don't need this information unless we are a) logging or b) debugging interactively. + instances = append(instances, result.checker.Name()) + } + } + wg.Wait() + + if what != "" && len(instances) > 0 { + slices.Sort(instances) + logger.Info("Timed out waiting", "for", what, "cause", context.Cause(ctx), "instances", instances) + } + + done := len(instances) == 0 + return done +} + +// DoneChecker, in contrast to [InformerSynced], supports waiting +// for some activity to finish without polling and has a name +// that describes itself. +// +// To check for completion without blocking, use [IsDone]. +type DoneChecker interface { + // Name returns a string describing the entity that is being waited for. + // + // Note that this name might be computed, so callers should only + // get the name outside of a hot code path. + Name() string + + // Done returns a channel that will be closed on completion + // of the activity. 
+ Done() <-chan struct{} +} + +// IsDone returns true if the activity is done, false otherwise. +func IsDone(checker DoneChecker) bool { + select { + case <-checker.Done(): + return true + default: + return false + } +} + // `*sharedIndexInformer` implements SharedIndexInformer and has three // main components. One is an indexed local cache, `indexer Indexer`. // The second main component is a Controller that pulls @@ -431,6 +546,10 @@ type sharedIndexInformer struct { indexer Indexer controller Controller + // synced gets created when creating the sharedIndexInformer. + // It gets closed when Run detects that the processor created + synced chan struct{} + processor *sharedProcessor cacheMutationDetector MutationDetector @@ -494,6 +613,10 @@ func (v *dummyController) HasSynced() bool { return v.informer.HasSynced() } +func (v *dummyController) HasSyncedChecker() DoneChecker { + return v.informer.HasSyncedChecker() +} + func (v *dummyController) LastSyncResourceVersion() string { if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) { return v.informer.LastSyncResourceVersion() @@ -563,7 +686,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { s.startedLock.Lock() defer s.startedLock.Unlock() - fifo := newQueueFIFO(logger, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider) + logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider) cfg := &Config{ Queue: fifo, @@ -573,8 +696,12 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { FullResyncPeriod: s.resyncCheckPeriod, ShouldResync: s.processor.shouldResync, - Process: s.HandleDeltas, - ProcessBatch: s.HandleBatchDeltas, + Process: func(obj interface{}, isInInitialList bool) error { + return s.handleDeltas(logger, obj, isInInitialList) + }, + ProcessBatch: func(deltas []Delta, isInInitialList bool) error { + return s.handleBatchDeltas(logger, deltas, isInInitialList) + }, WatchErrorHandlerWithContext: s.watchErrorHandler, } @@ -594,6 +721,15 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) { // has a RunWithContext method that we can use here. wg.StartWithChannel(processorStopCtx.Done(), s.cacheMutationDetector.Run) wg.StartWithContext(processorStopCtx, s.processor.run) + wg.Start(func() { + select { + case <-ctx.Done(): + // We were stopped without completing the sync. + case <-s.controller.HasSyncedChecker().Done(): + // Controller has synced and thus so have we. + close(s.synced) + } + }) defer func() { s.startedLock.Lock() @@ -610,13 +746,31 @@ func (s *sharedIndexInformer) HasStarted() bool { } func (s *sharedIndexInformer) HasSynced() bool { - s.startedLock.Lock() - defer s.startedLock.Unlock() - - if s.controller == nil { + select { + case <-s.synced: + return true + default: return false } - return s.controller.HasSynced() +} + +func (s *sharedIndexInformer) HasSyncedChecker() DoneChecker { + return &sharedIndexInformerDone{ + s: s, + } +} + +// sharedIndexInformerDone implements [NamedCacheSync] for a [sharedIndexInformer]. 
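For illustration, any component can make itself waitable with WaitFor by satisfying DoneChecker; a hypothetical helper backed by a cancelable context (the same approach used by the synctrack trackers in this change and by the mockSynced test helper above) might look like this:

    // doneSignal is a hypothetical DoneChecker backed by a context;
    // the returned markDone function closes the Done channel exactly once.
    type doneSignal struct {
        name string
        ctx  context.Context
    }

    func newDoneSignal(name string) (*doneSignal, func()) {
        ctx, markDone := context.WithCancel(context.Background())
        return &doneSignal{name: name, ctx: ctx}, markDone
    }

    func (d *doneSignal) Name() string          { return d.name }
    func (d *doneSignal) Done() <-chan struct{} { return d.ctx.Done() }

After markDone has been called, IsDone reports true and WaitFor returns immediately for this checker.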
+type sharedIndexInformerDone struct { + s *sharedIndexInformer +} + +func (sd *sharedIndexInformerDone) Name() string { + return fmt.Sprintf("SharedIndexInformer %T", sd.s.objectType) +} + +func (sd *sharedIndexInformerDone) Done() <-chan struct{} { + return sd.s.synced } func (s *sharedIndexInformer) LastSyncResourceVersion() string { @@ -708,7 +862,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa } } - listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced) + listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSyncedChecker()) if !s.started { return s.processor.addListener(listener), nil @@ -737,20 +891,20 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa return handle, nil } -func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error { +func (s *sharedIndexInformer) handleDeltas(logger klog.Logger, obj interface{}, isInInitialList bool) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() if deltas, ok := obj.(Deltas); ok { - return processDeltas(s, s.indexer, deltas, isInInitialList, s.keyFunc) + return processDeltas(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc) } return errors.New("object given as Process argument is not Deltas") } -func (s *sharedIndexInformer) HandleBatchDeltas(deltas []Delta, isInInitialList bool) error { +func (s *sharedIndexInformer) handleBatchDeltas(logger klog.Logger, deltas []Delta, isInInitialList bool) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() - return processDeltasInBatch(s, s.indexer, deltas, isInInitialList, s.keyFunc) + return processDeltasInBatch(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc) } // Conforms to ResourceEventHandler @@ -854,6 +1008,7 @@ func (p *sharedProcessor) addListener(listener *processorListener) ResourceEvent p.listeners[listener] = true if p.listenersStarted { + p.wg.Start(listener.watchSynced) p.wg.Start(listener.run) p.wg.Start(listener.pop) } @@ -925,6 +1080,7 @@ func (p *sharedProcessor) run(ctx context.Context) { p.listenersLock.Lock() defer p.listenersLock.Unlock() for listener := range p.listeners { + p.wg.Start(listener.watchSynced) p.wg.Start(listener.run) p.wg.Start(listener.pop) } @@ -986,7 +1142,7 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe } // processorListener relays notifications from a sharedProcessor to -// one ResourceEventHandler --- using two goroutines, two unbuffered +// one ResourceEventHandler --- using three goroutines, two unbuffered // channels, and an unbounded ring buffer. The `add(notification)` // function sends the given notification to `addCh`. One goroutine // runs `pop()`, which pumps notifications from `addCh` to `nextCh` @@ -994,16 +1150,23 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe // Another goroutine runs `run()`, which receives notifications from // `nextCh` and synchronously invokes the appropriate handler method. // +// The third goroutine watches the upstream "has synced" channel +// and notifies a SingleFileTracker instance. That instance then +// combines the upstream state and the processListener state to +// implement the overall "event handler has synced". 
+// // processorListener also keeps track of the adjusted requested resync // period of the listener. type processorListener struct { logger klog.Logger nextCh chan interface{} addCh chan interface{} + done chan struct{} handler ResourceEventHandler - syncTracker *synctrack.SingleFileTracker + syncTracker *synctrack.SingleFileTracker + upstreamHasSynced DoneChecker // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed. // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications @@ -1041,13 +1204,21 @@ func (p *processorListener) HasSynced() bool { return p.syncTracker.HasSynced() } -func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener { +// HasNamedSync is done if the source informer has synced, and all +// corresponding events have been delivered. +func (p *processorListener) HasSyncedChecker() DoneChecker { + return p.syncTracker +} + +func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced DoneChecker) *processorListener { ret := &processorListener{ logger: logger, nextCh: make(chan interface{}), addCh: make(chan interface{}), + done: make(chan struct{}), + upstreamHasSynced: hasSynced, handler: handler, - syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced}, + syncTracker: synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), nameForHandler(handler))), pendingNotifications: *buffer.NewRingGrowing(bufferSize), requestedResyncPeriod: requestedResyncPeriod, resyncPeriod: resyncPeriod, @@ -1068,6 +1239,7 @@ func (p *processorListener) add(notification interface{}) { func (p *processorListener) pop() { defer utilruntime.HandleCrashWithLogger(p.logger) defer close(p.nextCh) // Tell .run() to stop + defer close(p.done) // Tell .watchSynced() to stop var nextCh chan<- interface{} var notification interface{} @@ -1131,6 +1303,16 @@ func (p *processorListener) run() { } } +func (p *processorListener) watchSynced() { + select { + case <-p.upstreamHasSynced.Done(): + // Notify tracker that the upstream has synced. + p.syncTracker.UpstreamHasSynced() + case <-p.done: + // Give up waiting for sync. + } +} + // shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0, // this always returns false. 
func (p *processorListener) shouldResync(now time.Time) bool { diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 0b40315a8c2..835e6debdcb 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -1134,8 +1134,13 @@ func TestAddWhileActive(t *testing.T) { return } - if !handle1.HasSynced() { - t.Error("Not synced after Run??") + select { + case <-handle1.HasSyncedChecker().Done(): + if !handle1.HasSynced() { + t.Error("Not synced after channel said we are synced??") + } + case <-time.After(10 * time.Second): + t.Error("Not synced 10 seconds after Run??") } listener2.lock.Lock() // ensure we observe it before it has synced diff --git a/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack.go b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack.go index 3fa2beb6b71..e941dafb85a 100644 --- a/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack.go +++ b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack.go @@ -20,6 +20,7 @@ limitations under the License. package synctrack import ( + "context" "sync" "sync/atomic" @@ -27,11 +28,32 @@ import ( ) // AsyncTracker helps propagate HasSynced in the face of multiple worker threads. +// The user has to monitor the upstream "has synced" +// and notify the tracker when that changes from false to true. type AsyncTracker[T comparable] struct { - UpstreamHasSynced func() bool + // name describes the instance. + name string + + // upstreamHasSynced is changed from false (initial value) to true + // when UpstreamHasSynced is called. + upstreamHasSynced atomic.Bool lock sync.Mutex waiting sets.Set[T] + + // synced gets canceled once both the tracker and upstream are synced. + // A context is convenient for this because it gives us a channel + // and handles thread-safety. + synced context.Context + cancel func() +} + +func NewAsyncTracker[T comparable](name string) *AsyncTracker[T] { + t := &AsyncTracker[T]{ + name: name, + } + t.synced, t.cancel = context.WithCancel(context.Background()) + return t } // Start should be called prior to processing each key which is part of the @@ -57,6 +79,28 @@ func (t *AsyncTracker[T]) Finished(key T) { if t.waiting != nil { t.waiting.Delete(key) } + + // Maybe synced now? + if t.upstreamHasSynced.Load() && len(t.waiting) == 0 { + // Mark as synced. + t.cancel() + } +} + +// UpstreamHasSynced needs to be called at least once as soon as +// the upstream "has synced" becomes true. It tells AsyncTracker +// that the source is synced. +// +// Must be called after handing over the initial list to Start. +func (t *AsyncTracker[T]) UpstreamHasSynced() { + // Upstream is done, but we might not be yet. + t.upstreamHasSynced.Store(true) + t.lock.Lock() + defer t.lock.Unlock() + if len(t.waiting) == 0 { + // Mark as synced. + t.cancel() + } } // HasSynced returns true if the source is synced and every key present in the @@ -64,27 +108,51 @@ func (t *AsyncTracker[T]) Finished(key T) { // itself synced until *after* it has delivered the notification for the last // key, and that notification handler must have called Start. func (t *AsyncTracker[T]) HasSynced() bool { - // Call UpstreamHasSynced first: it might take a lock, which might take - // a significant amount of time, and we can't hold our lock while - // waiting on that or a user is likely to get a deadlock. 
- if !t.UpstreamHasSynced() { - return false - } - t.lock.Lock() - defer t.lock.Unlock() - return t.waiting.Len() == 0 + return t.synced.Err() != nil +} + +// Done returns a channel that is closed if the source is synced and every key present in the +// initial list has been processed. This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. +func (t *AsyncTracker[T]) Done() <-chan struct{} { + return t.synced.Done() +} + +func (t *AsyncTracker[T]) Name() string { + return t.name } // SingleFileTracker helps propagate HasSynced when events are processed in -// order (i.e. via a queue). +// order (i.e. via a queue). The user has to monitor the upstream "has synced" +// and notify the tracker when that changes from false to true. type SingleFileTracker struct { + // name describes the instance. + name string + // Important: count is used with atomic operations so it must be 64-bit // aligned, otherwise atomic operations will panic. Having it at the top of // the struct will guarantee that, even on 32-bit arches. // See https://pkg.go.dev/sync/atomic#pkg-note-BUG for more information. count int64 - UpstreamHasSynced func() bool + // upstreamHasSynced is changed from false (initial value) to true + // when UpstreamHasSynced is called. + upstreamHasSynced atomic.Bool + + // synced gets canceled once both the tracker and upstream are synced. + // A context is convenient for this because it gives us a channel + // and handles thread-safety. + synced context.Context + cancel func() +} + +func NewSingleFileTracker(name string) *SingleFileTracker { + t := &SingleFileTracker{ + name: name, + } + t.synced, t.cancel = context.WithCancel(context.Background()) + return t } // Start should be called prior to processing each key which is part of the @@ -103,6 +171,26 @@ func (t *SingleFileTracker) Finished() { if result < 0 { panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value") } + + // Maybe synced now? + if result == 0 && t.upstreamHasSynced.Load() { + // Mark as synced. + t.cancel() + } +} + +// UpstreamHasSynced needs to be called at least once as soon as +// the upstream "has synced" becomes true. It tells SingleFileTracker +// that the source is synced. +// +// Must be called after handing over the initial list to Start. +func (t *SingleFileTracker) UpstreamHasSynced() { + // Upstream is done, but we might not be yet. + t.upstreamHasSynced.Store(true) + if atomic.LoadInt64(&t.count) == 0 { + // Mark as synced. + t.cancel() + } } // HasSynced returns true if the source is synced and every key present in the @@ -110,11 +198,17 @@ func (t *SingleFileTracker) Finished() { // itself synced until *after* it has delivered the notification for the last // key, and that notification handler must have called Start. func (t *SingleFileTracker) HasSynced() bool { - // Call UpstreamHasSynced first: it might take a lock, which might take - // a significant amount of time, and we don't want to then act on a - // stale count value. - if !t.UpstreamHasSynced() { - return false - } - return atomic.LoadInt64(&t.count) <= 0 + return t.synced.Err() != nil +} + +// Done returns a channel that is closed if the source is synced and every key present in the +// initial list has been processed. 
This relies on the source not considering +// itself synced until *after* it has delivered the notification for the last +// key, and that notification handler must have called Start. +func (t *SingleFileTracker) Done() <-chan struct{} { + return t.synced.Done() +} + +func (t *SingleFileTracker) Name() string { + return t.name } diff --git a/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack_test.go b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack_test.go index 4cf089e225c..bff8838e8ae 100644 --- a/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/synctrack/synctrack_test.go @@ -19,29 +19,24 @@ package synctrack import ( "strings" "sync" - "time" "testing" ) -func testSingleFileFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { - tracker := SingleFileTracker{ - UpstreamHasSynced: upstreamHasSynced, - } - return tracker.Start, tracker.Finished, tracker.HasSynced +func testSingleFileFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) { + tracker := NewSingleFileTracker("") + return tracker.UpstreamHasSynced, tracker.Start, tracker.Finished, tracker.HasSynced, tracker.Done() } -func testAsyncFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) { - tracker := AsyncTracker[string]{ - UpstreamHasSynced: upstreamHasSynced, - } - return func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced +func testAsyncFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) { + tracker := NewAsyncTracker[string]("") + return tracker.UpstreamHasSynced, func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced, tracker.Done() } func TestBasicLogic(t *testing.T) { table := []struct { name string - construct func(func() bool) (func(), func(), func() bool) + construct func() (func(), func(), func(), func() bool, <-chan struct{}) }{ {"SingleFile", testSingleFileFuncs}, {"Async", testAsyncFuncs}, @@ -50,27 +45,87 @@ func TestBasicLogic(t *testing.T) { for _, entry := range table { t.Run(entry.name, func(t *testing.T) { table := []struct { - synced bool + synced bool + syncedBeforeFinish bool + start bool finish bool expectSynced bool }{ - {false, true, true, false}, - {true, true, false, false}, - {false, true, false, false}, - {true, true, true, true}, + {false, false, true, true, false}, + {true, false, true, false, false}, + {true, true, true, false, false}, + {false, false, true, false, false}, + {true, false, true, true, true}, + {true, true, true, true, true}, } for _, tt := range table { - Start, Finished, HasSynced := entry.construct(func() bool { return tt.synced }) + upstreamHasSynced, start, finished, hasSynced, synced := entry.construct() + syncedDone := func() bool { + select { + case <-synced: + return true + default: + return false + } + } + + if hasSynced() { + t.Errorf("for %#v got HasSynced() true before start (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true before start (wanted false)", tt) + } + if tt.start { - Start() + start() + } + + if hasSynced() { + t.Errorf("for %#v got HasSynced() true after start (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true after start (wanted false)", tt) + } + + // "upstream has synced" may occur before or after finished, but not before start. 
+ if tt.synced && tt.syncedBeforeFinish { + upstreamHasSynced() + if hasSynced() { + t.Errorf("for %#v got HasSynced() true after upstreamHasSynced and before finish (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true after upstreamHasSynced and before finish (wanted false)", tt) + } } if tt.finish { - Finished() + finished() } - got := HasSynced() - if e, a := tt.expectSynced, got; e != a { - t.Errorf("for %#v got %v (wanted %v)", tt, a, e) + if tt.synced && !tt.syncedBeforeFinish { + if hasSynced() { + t.Errorf("for %#v got HasSynced() true after finish and before upstreamHasSynced (wanted false)", tt) + } + if syncedDone() { + t.Errorf("for %#v got Done() true after finish and before upstreamHasSynced (wanted false)", tt) + } + upstreamHasSynced() + } + if e, a := tt.expectSynced, hasSynced(); e != a { + t.Errorf("for %#v got HasSynced() %v (wanted %v)", tt, a, e) + } + if e, a := tt.expectSynced, syncedDone(); e != a { + t.Errorf("for %#v got Done() %v (wanted %v)", tt, a, e) + } + + select { + case <-synced: + if !tt.expectSynced { + t.Errorf("for %#v got done (wanted not done)", tt) + } + default: + if tt.expectSynced { + t.Errorf("for %#v got done (wanted not done)", tt) + } } } }) @@ -78,7 +133,7 @@ func TestBasicLogic(t *testing.T) { } func TestAsyncLocking(t *testing.T) { - aft := AsyncTracker[int]{UpstreamHasSynced: func() bool { return true }} + aft := NewAsyncTracker[int]("") var wg sync.WaitGroup for _, i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} { @@ -92,6 +147,7 @@ func TestAsyncLocking(t *testing.T) { }(i) } wg.Wait() + aft.UpstreamHasSynced() if !aft.HasSynced() { t.Errorf("async tracker must have made a threading error?") } @@ -99,7 +155,7 @@ func TestAsyncLocking(t *testing.T) { } func TestSingleFileCounting(t *testing.T) { - sft := SingleFileTracker{UpstreamHasSynced: func() bool { return true }} + sft := NewSingleFileTracker("") for i := 0; i < 100; i++ { sft.Start() @@ -115,6 +171,8 @@ func TestSingleFileCounting(t *testing.T) { } sft.Finished() + sft.UpstreamHasSynced() + if !sft.HasSynced() { t.Fatal("Unexpectedly not synced?") } @@ -148,24 +206,35 @@ func TestSingleFileCounting(t *testing.T) { func TestSingleFile(t *testing.T) { table := []struct { - synced bool + synced bool + syncedBeforeStops bool + starts int stops int expectSynced bool }{ - {false, 1, 1, false}, - {true, 1, 0, false}, - {false, 1, 0, false}, - {true, 1, 1, true}, + {false, false, 1, 1, false}, + {true, false, 1, 0, false}, + {true, true, 1, 0, false}, + {false, false, 1, 0, false}, + {true, false, 1, 1, true}, + {true, true, 1, 1, true}, } for _, tt := range table { - sft := SingleFileTracker{UpstreamHasSynced: func() bool { return tt.synced }} + sft := NewSingleFileTracker("") for i := 0; i < tt.starts; i++ { sft.Start() } + // "upstream has synced" may occur before or after finished, but not before start. 
+ if tt.synced && tt.syncedBeforeStops { + sft.UpstreamHasSynced() + } for i := 0; i < tt.stops; i++ { sft.Finished() } + if tt.synced && !tt.syncedBeforeStops { + sft.UpstreamHasSynced() + } got := sft.HasSynced() if e, a := tt.expectSynced, got; e != a { t.Errorf("for %#v got %v (wanted %v)", tt, a, e) @@ -173,67 +242,3 @@ func TestSingleFile(t *testing.T) { } } - -func TestNoStaleValue(t *testing.T) { - table := []struct { - name string - construct func(func() bool) (func(), func(), func() bool) - }{ - {"SingleFile", testSingleFileFuncs}, - {"Async", testAsyncFuncs}, - } - - for _, entry := range table { - t.Run(entry.name, func(t *testing.T) { - var lock sync.Mutex - upstreamHasSynced := func() bool { - lock.Lock() - defer lock.Unlock() - return true - } - - Start, Finished, HasSynced := entry.construct(upstreamHasSynced) - - // Ordinarily the corresponding lock would be held and you wouldn't be - // able to call this function at this point. - if !HasSynced() { - t.Fatal("Unexpectedly not synced??") - } - - Start() - if HasSynced() { - t.Fatal("Unexpectedly synced??") - } - Finished() - if !HasSynced() { - t.Fatal("Unexpectedly not synced??") - } - - // Now we will prove that if the lock is held, you can't get a false - // HasSynced return. - lock.Lock() - - // This goroutine calls HasSynced - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if HasSynced() { - t.Error("Unexpectedly synced??") - } - }() - - // This goroutine increments + unlocks. The sleep is to bias the - // runtime such that the other goroutine usually wins (it needs to work - // in both orderings, this one is more likely to be buggy). - go func() { - time.Sleep(time.Millisecond) - Start() - lock.Unlock() - }() - - wg.Wait() - }) - } - -} diff --git a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go index c60d2ebb8b2..4856a9bafff 100644 --- a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go @@ -34,6 +34,7 @@ type RealFIFOOptions struct { Logger *klog.Logger // Name can be used to override the default "RealFIFO" name for the new instance. + // Optional. Used only if Identifier.Name returns an empty string. Name string // KeyFunction is used to figure out what key an object should have. (It's @@ -98,6 +99,11 @@ type RealFIFO struct { items []Delta + // synced is initially an open channel. It gets closed (once!) by checkSynced_locked + // as soon as the initial sync is considered complete. + synced chan struct{} + syncedClosed bool + // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update was called first. populated bool @@ -160,6 +166,7 @@ type SyncAllInfo struct{} var ( _ = Queue(&RealFIFO{}) // RealFIFO is a Queue _ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations + _ = DoneChecker(&RealFIFO{}) // RealFIFO and implements DoneChecker. ) // Close the queue. @@ -196,11 +203,37 @@ func (f *RealFIFO) HasSynced() bool { return f.hasSynced_locked() } -// ignoring lint to reduce delta to the original for review. It's ok adjust later. -// -//lint:file-ignore ST1003: should not use underscores in Go names +// HasSyncedChecker is done if an Add/Update/Delete/AddIfNotPresent are called first, +// or the first batch of items inserted by Replace() has been popped. 
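With the DoneChecker assertion above, RealFIFO's initial sync can be awaited with an ordinary select instead of polling HasSynced in a loop. A sketch assuming only the two-method DoneChecker interface (Name, Done) this patch introduces; waitOne and alreadyDone are illustrative helpers, not part of the change:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/client-go/tools/cache"
)

// waitOne blocks until a single checker is done or the context ends;
// cache.WaitFor (exercised later in this patch) generalizes this to several
// checkers and adds logging, but the core is just this select.
func waitOne(ctx context.Context, c cache.DoneChecker) bool {
	select {
	case <-c.Done():
		return true
	case <-ctx.Done():
		fmt.Printf("gave up waiting for %q: %v\n", c.Name(), context.Cause(ctx))
		return false
	}
}

// alreadyDone stands in for a real checker such as the one returned by
// RealFIFO.HasSyncedChecker(); it is done from the start.
type alreadyDone struct{ done chan struct{} }

func newAlreadyDone() alreadyDone {
	c := alreadyDone{done: make(chan struct{})}
	close(c.done)
	return c
}

func (c alreadyDone) Name() string          { return "example" }
func (c alreadyDone) Done() <-chan struct{} { return c.done }

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println("done:", waitOne(ctx, newAlreadyDone())) // done: true
}
```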
+func (f *RealFIFO) HasSyncedChecker() DoneChecker { + return f +} + +// Name implements [DoneChecker.Name] +func (f *RealFIFO) Name() string { + return f.name +} + +// Done implements [DoneChecker.Done] +func (f *RealFIFO) Done() <-chan struct{} { + return f.synced +} + +// hasSynced_locked returns the result of a prior checkSynced_locked call. func (f *RealFIFO) hasSynced_locked() bool { - return f.populated && f.initialPopulationCount == 0 + return f.syncedClosed +} + +// checkSynced_locked checks whether the initial batch of items (set via Replace) has been delivered +// and closes the synced channel as needed. It must be called after changing f.populated and/or +// f.initialPopulationCount while the mutex is still locked. +func (f *RealFIFO) checkSynced_locked() { + synced := f.populated && f.initialPopulationCount == 0 + if synced && !f.syncedClosed { + // Initial sync is complete. + f.syncedClosed = true + close(f.synced) + } } // addToItems_locked appends to the delta list. @@ -291,6 +324,7 @@ func (f *RealFIFO) Add(obj interface{}) error { defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() retErr := f.addToItems_locked(Added, false, obj) return retErr @@ -302,6 +336,7 @@ func (f *RealFIFO) Update(obj interface{}) error { defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() retErr := f.addToItems_locked(Updated, false, obj) return retErr @@ -315,6 +350,7 @@ func (f *RealFIFO) Delete(obj interface{}) error { defer f.lock.Unlock() f.populated = true + f.checkSynced_locked() retErr := f.addToItems_locked(Deleted, false, obj) return retErr @@ -362,6 +398,7 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) { defer func() { if f.initialPopulationCount > 0 { f.initialPopulationCount-- + f.checkSynced_locked() } }() @@ -482,7 +519,6 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc unique.Insert(id) moveDeltaToProcessList(i) } - f.items = f.items[len(deltas):] // Decrement initialPopulationCount if needed. // This is done in a defer so we only do this *after* processing is complete, @@ -490,6 +526,7 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc defer func() { if f.initialPopulationCount > 0 { f.initialPopulationCount -= len(deltas) + f.checkSynced_locked() } }() @@ -539,7 +576,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error if f.emitAtomicEvents { err = f.addReplaceToItemsLocked(newItems, resourceVersion) } else { - err = reconcileReplacement(f.items, f.knownObjects, newItems, f.keyOf, + err = reconcileReplacement(f.logger, f.items, f.knownObjects, newItems, f.keyOf, func(obj DeletedFinalStateUnknown) error { return f.addToItems_locked(Deleted, true, obj) }, @@ -554,6 +591,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error if !f.populated { f.populated = true f.initialPopulationCount = len(f.items) + f.checkSynced_locked() } return nil @@ -563,6 +601,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error // and based upon the state of the items in the queue and known objects will call onDelete and onReplace // depending upon whether the item is being deleted or replaced/added. 
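The syncedClosed guard matters because closing an already-closed channel panics, so checkSynced_locked must close the channel exactly once, on the first transition to synced. The same idiom in isolation (onceSignal is an illustrative name; unlike checkSynced_locked, which runs with f.lock already held, this helper takes its own lock):

```go
package main

import (
	"fmt"
	"sync"
)

// onceSignal is a channel that is closed exactly once, the first time a
// condition is observed to be true. The boolean guard prevents a double
// close, mirroring syncedClosed above.
type onceSignal struct {
	mu     sync.Mutex
	done   chan struct{}
	closed bool
}

func newOnceSignal() *onceSignal {
	return &onceSignal{done: make(chan struct{})}
}

// check re-evaluates the condition after a state change, just as
// checkSynced_locked is called after every mutation of populated and
// initialPopulationCount.
func (o *onceSignal) check(conditionMet bool) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if conditionMet && !o.closed {
		o.closed = true
		close(o.done)
	}
}

func (o *onceSignal) Done() <-chan struct{} { return o.done }

func main() {
	s := newOnceSignal()
	s.check(false) // condition not met yet: channel stays open
	s.check(true)  // first transition: channel closed
	s.check(true)  // guarded: no second close, no panic

	<-s.Done()
	fmt.Println("signaled")
}
```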
func reconcileReplacement( + logger klog.Logger, queuedItems []Delta, knownObjects KeyListerGetter, newItems []interface{}, @@ -638,10 +677,10 @@ func reconcileReplacement( deletedObj, exists, err := knownObjects.GetByKey(knownKey) if err != nil { deletedObj = nil - utilruntime.HandleErrorWithLogger(klog.TODO(), err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey) + utilruntime.HandleErrorWithLogger(logger, err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey) } else if !exists { deletedObj = nil - utilruntime.HandleErrorWithLogger(klog.TODO(), nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey) + utilruntime.HandleErrorWithLogger(logger, nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey) } retErr := onDelete(DeletedFinalStateUnknown{ Key: knownKey, @@ -757,6 +796,7 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { logger: klog.Background(), name: "RealFIFO", items: make([]Delta, 0, 10), + synced: make(chan struct{}), keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, transformer: opts.Transformer, @@ -769,8 +809,11 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO { if opts.Logger != nil { f.logger = *opts.Logger } - if opts.Name != "" { - f.name = opts.Name + if name := opts.Name; name != "" { + f.name = name + } + if name := opts.Identifier.Name(); name != "" { + f.name = name } f.logger = klog.LoggerWithName(f.logger, f.name) f.cond.L = &f.lock diff --git a/staging/src/k8s.io/client-go/tools/cache/wait_test.go b/staging/src/k8s.io/client-go/tools/cache/wait_test.go new file mode 100644 index 00000000000..b5368368903 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/cache/wait_test.go @@ -0,0 +1,200 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "runtime" + "sync" + "testing" + "testing/synctest" + "time" + + "github.com/stretchr/testify/assert" + "k8s.io/klog/v2" + "k8s.io/klog/v2/textlogger" +) + +func init() { + // The test below is sensitive to the time zone, log output uses time.Local. + time.Local = time.UTC +} + +func TestWaitFor(t *testing.T) { + for name, tc := range map[string]struct { + what string + checkers []DoneChecker + timeout time.Duration + timeoutReason string + + expectDone bool + + // Time is predictable and starts at the synctest epoch. + // %[1]d is the pid, %[2]d the line number of the WaitFor call. 
+ expectOutput string + }{ + "empty": { + expectDone: true, + }, + "no-caches": { + what: "my-caches", + expectDone: true, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +`, + }, + "no-logging": { + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)}, + expectDone: true, + }, + "with-logging": { + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)}, + expectDone: true, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="second" +I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first" +`, + }, + "some-timeout": { + timeout: time.Minute, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first" +I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["second"] +`, + }, + "some-canceled": { + timeout: -1, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context canceled" instances=["first","second"] +`, + }, + "more": { + timeoutReason: "go fish", + timeout: 5 * time.Second, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last" +I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="go fish" instances=["first","second"] +`, + }, + "all": { + timeout: time.Minute, + what: "my-caches", + checkers: []DoneChecker{newMockChecker("first", -1), newMockChecker("second", -1), newMockChecker("last", -1)}, + expectDone: false, + expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches" +I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["first","last","second"] +`, + }, + } { + t.Run(name, func(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + var buffer bytes.Buffer + logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(&buffer))) + ctx := klog.NewContext(context.Background(), logger) + var wg sync.WaitGroup + defer wg.Wait() + if 
tc.timeout != 0 { + switch tc.timeoutReason { + case "": + if tc.timeout > 0 { + c, cancel := context.WithTimeout(ctx, tc.timeout) + defer cancel() + ctx = c + } else { + c, cancel := context.WithCancel(ctx) + cancel() + ctx = c + } + default: + c, cancel := context.WithCancelCause(ctx) + wg.Go(func() { + time.Sleep(tc.timeout) + cancel(errors.New(tc.timeoutReason)) + }) + ctx = c + } + } + _, _, line, _ := runtime.Caller(0) + done := WaitFor(ctx, tc.what, tc.checkers...) + expectOutput := tc.expectOutput + if expectOutput != "" { + expectOutput = fmt.Sprintf(expectOutput, os.Getpid(), line+1) + } + assert.Equal(t, tc.expectDone, done, "done") + assert.Equal(t, expectOutput, buffer.String(), "output") + }) + }) + } +} + +// newMockChecker can be created outside of a synctest bubble. +// It constructs the channel inside when Done is first called. +func newMockChecker(name string, delay time.Duration) DoneChecker { + return &mockChecker{ + name: name, + delay: delay, + } +} + +type mockChecker struct { + name string + delay time.Duration + initialized bool + done <-chan struct{} +} + +func (m *mockChecker) Name() string { return m.name } +func (m *mockChecker) Done() <-chan struct{} { + if !m.initialized { + switch { + case m.delay > 0: + // In the future. + ctx := context.Background() + // This leaks a cancel, but is hard to avoid (cannot use the parent t.Cleanup, no other way to delay calling it). Doesn't matter in a unit test. + //nolint:govet + ctx, _ = context.WithTimeout(ctx, m.delay) + m.done = ctx.Done() + case m.delay == 0: + // Immediately. + c := make(chan struct{}) + close(c) + m.done = c + default: + // Never. + c := make(chan struct{}) + m.done = c + } + m.initialized = true + } + return m.done +} diff --git a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker.go b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker.go index 5ca9719b288..2c94f309e49 100644 --- a/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker.go +++ b/staging/src/k8s.io/dynamic-resource-allocation/resourceslice/tracker/tracker.go @@ -71,8 +71,16 @@ type Tracker struct { // may be overridden in tests. handleError func(context.Context, error, string, ...any) + // wg and cancel track resp. kill goroutines. + wg sync.WaitGroup + cancel func(error) + // Synchronizes updates to these fields related to event handlers. rwMutex sync.RWMutex + + // synced gets closed once all the tracker's event handlers are synced. + synced chan struct{} + // All registered event handlers. eventHandlers []cache.ResourceEventHandler // The eventQueue contains functions which deliver an event to one @@ -155,6 +163,8 @@ func newTracker(ctx context.Context, opts Options) (finalT *Tracker, finalErr er deviceClasses: opts.ClassInformer.Informer(), patchedResourceSlices: cache.NewStore(cache.MetaNamespaceKeyFunc), handleError: utilruntime.HandleErrorWithContext, + synced: make(chan struct{}), + cancel: func(error) {}, // Real function set in initInformers. eventQueue: *buffer.NewRing[func()](buffer.RingOptions{InitialSize: 0, NormalSize: 4}), } defer func() { @@ -212,6 +222,22 @@ func (t *Tracker) initInformers(ctx context.Context) error { return fmt.Errorf("add event handler for DeviceClasses: %w", err) } + // This usually short-lived goroutines monitors our upstream event handlers and + // closes our own synced channel when they are synced. 
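For orientation, the call pattern the test above exercises: WaitFor(ctx, what, checkers...) blocks until every DoneChecker is done or the context ends, logs progress, and reports whether all of them finished. A hedged sketch of a caller, assuming that signature and the DoneChecker interface from this patch (run, startWorkers, and the "my-controller caches" label are illustrative); the tracker's monitor goroutine below takes the more direct route of selecting on each handler's Done channel:

```go
package main

import (
	"context"
	"time"

	"k8s.io/client-go/tools/cache"
)

// run sketches a controller start-up. informerChecker and handlerChecker are
// hypothetical names for values obtained from e.g. informer.HasSyncedChecker()
// and registration.HasSyncedChecker(); startWorkers stands in for whatever
// happens once the caches are warm.
func run(ctx context.Context, informerChecker, handlerChecker cache.DoneChecker, startWorkers func()) {
	// Bound the wait; WaitFor returns false if the context ends first and
	// logs which checkers were still pending.
	waitCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	if !cache.WaitFor(waitCtx, "my-controller caches", informerChecker, handlerChecker) {
		return // caches did not sync in time
	}
	startWorkers()
}

func main() {
	// With no checkers WaitFor is done immediately (the "empty" case in the test above).
	_ = cache.WaitFor(context.Background(), "nothing to wait for")
}
```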
+ monitorCtx, cancel := context.WithCancelCause(ctx) + t.cancel = cancel + t.wg.Go(func() { + for _, handle := range []cache.ResourceEventHandlerRegistration{t.resourceSlicesHandle, t.deviceTaintsHandle, t.deviceClassesHandle} { + select { + case <-handle.HasSyncedChecker().Done(): + case <-monitorCtx.Done(): + // Abort without closing our synced channel. + return + } + } + close(t.synced) + }) + return nil } @@ -220,21 +246,28 @@ func (t *Tracker) initInformers(ctx context.Context) error { // point is possible and will emit events with up-to-date ResourceSlice // objects. func (t *Tracker) HasSynced() bool { + select { + case <-t.HasSyncedChecker().Done(): + return true + default: + return false + } +} + +func (t *Tracker) HasSyncedChecker() cache.DoneChecker { if !t.enableDeviceTaintRules { - return t.resourceSlices.HasSynced() + return t.resourceSlices.HasSyncedChecker() } - if t.resourceSlicesHandle != nil && !t.resourceSlicesHandle.HasSynced() { - return false - } - if t.deviceTaintsHandle != nil && !t.deviceTaintsHandle.HasSynced() { - return false - } - if t.deviceClassesHandle != nil && !t.deviceClassesHandle.HasSynced() { - return false - } + return trackerHasSynced{t} +} - return true +type trackerHasSynced struct{ t *Tracker } + +func (s trackerHasSynced) Name() string { return "ResourceSlice tracker" } + +func (s trackerHasSynced) Done() <-chan struct{} { + return s.t.synced } // Stop ends all background activity and blocks until that shutdown is complete. @@ -243,12 +276,16 @@ func (t *Tracker) Stop() { return } + t.cancel(errors.New("stopped")) + if t.broadcaster != nil { t.broadcaster.Shutdown() } _ = t.resourceSlices.RemoveEventHandler(t.resourceSlicesHandle) _ = t.deviceTaints.RemoveEventHandler(t.deviceTaintsHandle) _ = t.deviceClasses.RemoveEventHandler(t.deviceClassesHandle) + + t.wg.Wait() } // ListPatchedResourceSlices returns all ResourceSlices in the cluster with