client-go cache: wait for cache sync via channels, better logging

The main advantage is that waiting on channels creates a causal relationship
between goroutines which is visible to synctest. When a controller running in
a test's background goroutine inside a synctest bubble calls WaitFor, the test
can use synctest.Wait to wait for the cache sync to complete, without
requiring any test-specific "has controller synced" API. Without this, the
test had to poll or otherwise wait for the controller.

The polling in WaitForCacheSync moved the virtual clock forward by an
unpredictable amount, depending on how often wait.Poll had to check. Now tests
can be written such that all events during a test happen at a predictable
time. This will be demonstrated in a separate commit for the
pkg/controller/devicetainteviction unit test.
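
As a rough sketch, such a test can run the controller's channel-based wait in
a background goroutine inside a synctest bubble. Only cache.WaitFor and
HasSyncedChecker are the API added by this commit; the test name, the use of a
fake clientset, and Go's testing/synctest package (Go 1.25, or
GOEXPERIMENT=synctest on 1.24) are illustrative assumptions:

    package example

    import (
        "context"
        "testing"
        "testing/synctest" // Go 1.25, or GOEXPERIMENT=synctest on Go 1.24

        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/tools/cache"
    )

    func TestSyncWithoutPolling(t *testing.T) {
        synctest.Test(t, func(t *testing.T) {
            ctx, cancel := context.WithCancel(context.Background())
            defer cancel()

            client := fake.NewClientset()
            factory := informers.NewSharedInformerFactory(client, 0)
            pods := factory.Core().V1().Pods().Informer()

            // Stand-in for a controller's Run: block on channels until the
            // cache has synced, then do the real work.
            go func() {
                if !cache.WaitFor(ctx, "caches", pods.HasSyncedChecker()) {
                    return
                }
                // ... controller work ...
            }()

            factory.Start(ctx.Done())

            // The wait above blocks on channels rather than timers, so
            // synctest.Wait returns once all goroutines in the bubble are
            // durably blocked, i.e. after the cache sync completed, without
            // the fake clock having to advance by some arbitrary poll interval.
            synctest.Wait()

            // ... drive the rest of the test at predictable virtual times ...
        })
    }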

The benefit for normal production use is immediate continuation as soon as the
last informer is synced (not really a problem, but still...) and, more
importantly, nicer logging thanks to the names associated with whatever is
being waited for. The caller decides whether logging is enabled and describes
what is being waited for (typically informer caches, but possibly also event
handlers or something else entirely, as long as it implements the DoneChecker
interface; see the sketch after the log example below).

Before:

    Waiting for caches to sync
    Caches are synced

After:

    Waiting for="cache and event handler sync"
    Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.Pod"
    Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceClaim"
    Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceSlice"
    Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.DeviceClass"
    Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1alpha3.DeviceTaintRule"
    Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceClaim + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"
    Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.Pod + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"
    Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1alpha3.DeviceTaintRule + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"
    Done waiting for="cache and event handler sync" instance="SharedIndexInformer *v1.ResourceSlice + event handler k8s.io/kubernetes/pkg/controller/devicetainteviction.(*Controller).Run"

The "SharedIndexInformer *v1.Pod" is also how this appears in metrics.
Patrick Ohly 2025-11-18 12:39:11 +01:00
parent 45251e5f65
commit fdcbb6cba9
18 changed files with 1118 additions and 200 deletions

View file

@ -508,7 +508,11 @@ func (c *AssumeCache) AddEventHandler(handler cache.ResourceEventHandler) cache.
if c.handlerRegistration == nil {
// No informer, so immediately synced.
return syncedHandlerRegistration{}
s := syncedHandlerRegistration{
synced: make(chan struct{}),
}
close(s.synced)
return s
}
return c.handlerRegistration
@ -557,6 +561,14 @@ func (c *AssumeCache) emitEvents() {
// syncedHandlerRegistration is an implementation of ResourceEventHandlerRegistration
// which always returns true.
type syncedHandlerRegistration struct{}
type syncedHandlerRegistration struct {
synced chan struct{}
}
func (syncedHandlerRegistration) HasSynced() bool { return true }
func (s syncedHandlerRegistration) HasSyncedChecker() cache.DoneChecker { return s }
func (s syncedHandlerRegistration) Name() string { return "AssumeCache" }
func (s syncedHandlerRegistration) Done() <-chan struct{} { return s.synced }

View file

@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
kerrors "k8s.io/apimachinery/pkg/api/errors"
@ -48,10 +47,7 @@ type controller[T runtime.Object] struct {
options ControllerOptions
// must hold a func() bool or nil
notificationsDelivered atomic.Value
hasProcessed synctrack.AsyncTracker[string]
hasProcessed *synctrack.AsyncTracker[string]
}
type ControllerOptions struct {
@ -77,17 +73,11 @@ func NewController[T runtime.Object](
}
c := &controller[T]{
options: options,
informer: informer,
reconciler: reconciler,
queue: nil,
}
c.hasProcessed.UpstreamHasSynced = func() bool {
f := c.notificationsDelivered.Load()
if f == nil {
return false
}
return f.(func() bool)()
options: options,
informer: informer,
reconciler: reconciler,
queue: nil,
hasProcessed: synctrack.NewAsyncTracker[string](options.Name),
}
return c
}
@ -159,12 +149,9 @@ func (c *controller[T]) Run(ctx context.Context) error {
return err
}
c.notificationsDelivered.Store(registration.HasSynced)
// Make sure the event handler is removed from the informer in case we
// return early on an error
defer func() {
c.notificationsDelivered.Store(func() bool { return false })
// Remove event handler and Handle Error here. Error should only be raised
// for improper usage of event handler API.
if err := c.informer.RemoveEventHandler(registration); err != nil {
@ -174,7 +161,12 @@ func (c *controller[T]) Run(ctx context.Context) error {
// Wait for initial cache list to complete before beginning to reconcile
// objects.
if !cache.WaitForNamedCacheSyncWithContext(ctx, c.informer.HasSynced) {
if !cache.WaitFor(ctx, "caches", c.informer.HasSyncedChecker(), registration.HasSyncedChecker()) {
// TODO: should cache.WaitFor return an error?
// ctx.Err() or context.Cause(ctx)?
// Either of them would make dead code like the "if err == nil"
// below more obvious.
// ctx cancelled during cache sync. return early
err := ctx.Err()
if err == nil {
@ -184,6 +176,10 @@ func (c *controller[T]) Run(ctx context.Context) error {
return err
}
// c.informer *and* our handler have synced, which implies that our AddFunc(= enqueue)
// and thus c.hasProcessed.Start have been called for the initial list => upstream is done.
c.hasProcessed.UpstreamHasSynced()
waitGroup := sync.WaitGroup{}
for i := uint(0); i < c.options.Workers; i++ {

View file

@ -142,6 +142,11 @@ type Controller interface {
// HasSynced delegates to the Config's Queue
HasSynced() bool
// HasSyncedChecker enables waiting for syncing without polling.
// The returned DoneChecker can be passed to WaitFor.
// It delegates to the Config's Queue.
HasSyncedChecker() DoneChecker
// LastSyncResourceVersion delegates to the Reflector when there
// is one, otherwise returns the empty string
LastSyncResourceVersion() string
@ -168,11 +173,13 @@ func (c *controller) RunWithContext(ctx context.Context) {
<-ctx.Done()
c.config.Queue.Close()
}()
logger := klog.FromContext(ctx)
r := NewReflectorWithOptions(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
ReflectorOptions{
Logger: &logger,
ResyncPeriod: c.config.FullResyncPeriod,
MinWatchTimeout: c.config.MinWatchTimeout,
TypeDescription: c.config.ObjectDescription,
@ -206,6 +213,13 @@ func (c *controller) HasSynced() bool {
return c.config.Queue.HasSynced()
}
// HasSyncedChecker enables waiting for syncing without polling.
// The returned DoneChecker can be passed to [WaitFor].
// It delegates to the Config's Queue.
func (c *controller) HasSyncedChecker() DoneChecker {
return c.config.Queue.HasSyncedChecker()
}
func (c *controller) LastSyncResourceVersion() string {
c.reflectorMutex.RLock()
defer c.reflectorMutex.RUnlock()
@ -591,6 +605,7 @@ func NewTransformingIndexerInformer(
// Multiplexes updates in the form of a list of Deltas into a Store, and informs
// a given handler of events OnUpdate, OnAdd, OnDelete
func processDeltas(
logger klog.Logger,
// Object which receives event notifications from the given deltas
handler ResourceEventHandler,
clientState Store,
@ -608,7 +623,7 @@ func processDeltas(
if !ok {
return fmt.Errorf("ReplacedAll did not contain ReplacedAllInfo: %T", obj)
}
if err := processReplacedAllInfo(handler, info, clientState, isInInitialList, keyFunc); err != nil {
if err := processReplacedAllInfo(logger, handler, info, clientState, isInInitialList, keyFunc); err != nil {
return err
}
case SyncAll:
@ -653,6 +668,7 @@ func processDeltas(
// Returns an error if any Delta or transaction fails. For TransactionError,
// only successful operations trigger callbacks.
func processDeltasInBatch(
logger klog.Logger,
handler ResourceEventHandler,
clientState Store,
deltas []Delta,
@ -666,7 +682,7 @@ func processDeltasInBatch(
if !txnSupported {
var errs []error
for _, delta := range deltas {
if err := processDeltas(handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil {
if err := processDeltas(logger, handler, clientState, Deltas{delta}, isInInitialList, keyFunc); err != nil {
errs = append(errs, err)
}
}
@ -731,7 +747,7 @@ func processDeltasInBatch(
return nil
}
func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error {
func processReplacedAllInfo(logger klog.Logger, handler ResourceEventHandler, info ReplacedAllInfo, clientState Store, isInInitialList bool, keyFunc KeyFunc) error {
var deletions []DeletedFinalStateUnknown
type replacement struct {
oldObj interface{}
@ -739,7 +755,7 @@ func processReplacedAllInfo(handler ResourceEventHandler, info ReplacedAllInfo,
}
replacements := make([]replacement, 0, len(info.Objects))
err := reconcileReplacement(nil, clientState, info.Objects, keyFunc,
err := reconcileReplacement(logger, nil, clientState, info.Objects, keyFunc,
func(obj DeletedFinalStateUnknown) error {
deletions = append(deletions, obj)
return nil
@ -792,7 +808,7 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co
if options.Logger != nil {
logger = *options.Logger
}
fifo := newQueueFIFO(logger, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider)
logger, fifo := newQueueFIFO(logger, options.ObjectType, clientState, options.Transform, options.Identifier, options.FIFOMetricsProvider)
cfg := &Config{
Queue: fifo,
@ -803,21 +819,30 @@ func newInformer(clientState Store, options InformerOptions, keyFunc KeyFunc) Co
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(options.Handler, clientState, deltas, isInInitialList, keyFunc)
// This must be the logger *of the fifo*.
return processDeltas(logger, options.Handler, clientState, deltas, isInInitialList, keyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(options.Handler, clientState, deltaList, isInInitialList, keyFunc)
// Same here.
return processDeltasInBatch(logger, options.Handler, clientState, deltaList, isInInitialList, keyFunc)
},
}
return New(cfg)
}
func newQueueFIFO(logger klog.Logger, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) Queue {
// newQueueFIFO constructs a new FIFO, choosing between real and delta FIFO
// depending on the InOrderInformers feature gate.
//
// It returns the logger used by the FIFO and the FIFO itself.
// That logger includes the name used for the FIFO,
// in contrast to the logger which was passed in.
func newQueueFIFO(logger klog.Logger, objectType any, clientState Store, transform TransformFunc, identifier InformerNameAndResource, metricsProvider FIFOMetricsProvider) (klog.Logger, Queue) {
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
options := RealFIFOOptions{
Logger: &logger,
Name: fmt.Sprintf("RealFIFO %T", objectType),
KeyFunction: MetaNamespaceKeyFunc,
Transformer: transform,
Identifier: identifier,
@ -830,13 +855,16 @@ func newQueueFIFO(logger klog.Logger, clientState Store, transform TransformFunc
} else {
options.KnownObjects = clientState
}
return NewRealFIFOWithOptions(options)
f := NewRealFIFOWithOptions(options)
return f.logger, f
} else {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
Logger: &logger,
Name: fmt.Sprintf("DeltaFIFO %T", objectType),
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
Transformer: transform,
})
return f.logger, f
}
}

View file

@ -26,6 +26,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fcache "k8s.io/client-go/tools/cache/testing"
"k8s.io/klog/v2/ktesting"
)
const handlerWaitTime = time.Millisecond
@ -33,7 +34,8 @@ const handlerWaitTime = time.Millisecond
func BenchmarkAddWithSlowHandlers(b *testing.B) {
for _, unlockWhileProcessing := range []bool{false, true} {
b.Run(fmt.Sprintf("unlockWhileProcessing=%t", unlockWhileProcessing), func(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
logger, ctx := ktesting.NewTestContext(b)
ctx, cancel := context.WithCancel(ctx)
source := fcache.NewFakeControllerSource()
b.Cleanup(func() {
cancel()
@ -65,12 +67,12 @@ func BenchmarkAddWithSlowHandlers(b *testing.B) {
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltas(logger, handler, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltasInBatch(logger, handler, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
},
}
c := New(cfg)

View file

@ -834,6 +834,7 @@ func TestProcessDeltasInBatch(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
mockStore := &mockTxnStore{
Store: NewStore(MetaNamespaceKeyFunc),
failingObjs: tc.failingObjects,
@ -851,6 +852,7 @@ func TestProcessDeltasInBatch(t *testing.T) {
},
}
err := processDeltasInBatch(
logger,
dummyListener,
mockStore,
tc.deltaList,
@ -929,12 +931,12 @@ func TestReplaceEvents(t *testing.T) {
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
},
}
c := New(cfg)
@ -1066,12 +1068,12 @@ func TestResetWatch(t *testing.T) {
Process: func(obj interface{}, isInInitialList bool) error {
if deltas, ok := obj.(Deltas); ok {
return processDeltas(recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltas(fifo.logger, recorder, store, deltas, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
}
return errors.New("object given as Process argument is not Deltas")
},
ProcessBatch: func(deltaList []Delta, isInInitialList bool) error {
return processDeltasInBatch(recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
return processDeltasInBatch(fifo.logger, recorder, store, deltaList, isInInitialList, DeletionHandlingMetaNamespaceKeyFunc)
},
}
c := New(cfg)

View file

@ -126,6 +126,11 @@ type DeltaFIFO struct {
// A key is in `queue` if and only if it is in `items`.
queue []string
// synced is initially an open channel. It gets closed (once!) by checkSynced_locked
// as soon as the initial sync is considered complete.
synced chan struct{}
syncedClosed bool
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update/AddIfNotPresent was called first.
populated bool
@ -272,6 +277,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
f := &DeltaFIFO{
logger: klog.Background(),
name: "DeltaFIFO",
synced: make(chan struct{}),
items: map[string]Deltas{},
queue: []string{},
keyFunc: opts.KeyFunction,
@ -283,8 +289,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
if opts.Logger != nil {
f.logger = *opts.Logger
}
if opts.Name != "" {
f.name = opts.Name
if name := opts.Name; name != "" {
f.name = name
}
f.logger = klog.LoggerWithName(f.logger, f.name)
f.cond.L = &f.lock
@ -294,6 +300,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
var (
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
_ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations
_ = DoneChecker(&DeltaFIFO{}) // DeltaFIFO implements DoneChecker.
)
var (
@ -339,8 +346,36 @@ func (f *DeltaFIFO) HasSynced() bool {
return f.hasSynced_locked()
}
// HasSyncedChecker is done if one of Add/Update/Delete/AddIfNotPresent is called first,
// or the first batch of items inserted by Replace() has been popped.
func (f *DeltaFIFO) HasSyncedChecker() DoneChecker {
return f
}
// Name implements [DoneChecker.Name]
func (f *DeltaFIFO) Name() string {
return f.name
}
// Done implements [DoneChecker.Done]
func (f *DeltaFIFO) Done() <-chan struct{} {
return f.synced
}
// hasSynced_locked returns the result of a prior checkSynced_locked call.
func (f *DeltaFIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
return f.syncedClosed
}
// checkSynced_locked checks whether the initial sync is complete.
// It must be called whenever populated or initialPopulationCount change.
func (f *DeltaFIFO) checkSynced_locked() {
synced := f.populated && f.initialPopulationCount == 0
if synced && !f.syncedClosed {
// Initial sync is complete.
f.syncedClosed = true
close(f.synced)
}
}
// Add inserts an item, and puts it in the queue. The item is only enqueued
@ -349,6 +384,7 @@ func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
return f.queueActionLocked(Added, obj)
}
@ -357,6 +393,7 @@ func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
return f.queueActionLocked(Updated, obj)
}
@ -373,6 +410,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
if f.knownObjects == nil {
if _, exists := f.items[id]; !exists {
// Presumably, this was deleted when a relist happened.
@ -538,6 +576,7 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
f.checkSynced_locked()
}
item, ok := f.items[id]
if !ok {
@ -650,6 +689,7 @@ func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
if !f.populated {
f.populated = true
f.initialPopulationCount = keys.Len() + queuedDeletions
f.checkSynced_locked()
}
return nil

View file

@ -0,0 +1,119 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"fmt"
"reflect"
"runtime"
"strings"
)
func nameForHandler(handler ResourceEventHandler) (name string) {
defer func() {
// Last resort: let Sprintf handle it.
if name == "" {
name = fmt.Sprintf("%T", handler)
}
}()
if handler == nil {
return ""
}
switch handler := handler.(type) {
case *ResourceEventHandlerFuncs:
return nameForHandlerFuncs(*handler)
case ResourceEventHandlerFuncs:
return nameForHandlerFuncs(handler)
default:
// We can use the fully qualified name of whatever
// provides the interface. We don't care whether
// it contains fields or methods which provide
// the interface methods.
value := reflect.ValueOf(handler)
if value.Type().Kind() == reflect.Interface {
// Probably not needed, but let's play it safe.
value = value.Elem()
}
if value.Type().Kind() == reflect.Pointer {
value = value.Elem()
}
name := value.Type().PkgPath()
if name != "" {
name += "."
}
if typeName := value.Type().Name(); typeName != "" {
name += typeName
}
return name
}
}
func nameForHandlerFuncs(funcs ResourceEventHandlerFuncs) string {
return nameForFunctions(funcs.AddFunc, funcs.UpdateFunc, funcs.DeleteFunc)
}
func nameForFunctions(fs ...any) string {
// If all functions are defined in the same place, then we
// don't care about the actual function name in
// e.g. "main.FuncName" or "main.(*Foo).FuncName-fm", instead
// we use the common qualifier.
//
// But we don't know that yet, so we also collect all names.
var qualifier string
singleQualifier := true
var names []string
for _, f := range fs {
if f == nil {
continue
}
name := nameForFunction(f)
if name == "" {
continue
}
names = append(names, name)
newQualifier := name
index := strings.LastIndexByte(newQualifier, '.')
if index > 0 {
newQualifier = newQualifier[:index]
}
switch qualifier {
case "":
qualifier = newQualifier
case newQualifier:
// So far, so good...
default:
// Nope, different.
singleQualifier = false
}
}
if singleQualifier {
return qualifier
}
return strings.Join(names, "+")
}
func nameForFunction(f any) string {
fn := runtime.FuncForPC(reflect.ValueOf(f).Pointer())
if fn == nil {
return ""
}
return fn.Name()
}

View file

@ -0,0 +1,87 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"testing"
)
type mockHandler struct{}
func (m mockHandler) OnAdd(any, bool) {}
func (m mockHandler) OnUpdate(any, any) {}
func (m mockHandler) OnDelete(any) {}
func TestNameForHandler(t *testing.T) {
emptyHandler := ResourceEventHandlerFuncs{}
for name, tc := range map[string]struct {
handler ResourceEventHandler
wantName string
}{
"mixture": {
handler: ResourceEventHandlerFuncs{
UpdateFunc: emptyHandler.OnUpdate,
DeleteFunc: func(any) {},
},
wantName: "k8s.io/client-go/tools/cache.ResourceEventHandlerFuncs.OnUpdate-fm+k8s.io/client-go/tools/cache.TestNameForHandler.func1", // Testcase must come first to get func1.
},
"add": {
handler: ResourceEventHandlerFuncs{AddFunc: func(any) {}},
wantName: "k8s.io/client-go/tools/cache.TestNameForHandler",
},
"update": {
handler: ResourceEventHandlerFuncs{UpdateFunc: func(any, any) {}},
wantName: "k8s.io/client-go/tools/cache.TestNameForHandler",
},
"delete": {
handler: ResourceEventHandlerFuncs{DeleteFunc: func(any) {}},
wantName: "k8s.io/client-go/tools/cache.TestNameForHandler",
},
"all": {
handler: ResourceEventHandlerFuncs{
AddFunc: func(any) {},
UpdateFunc: func(any, any) {},
DeleteFunc: func(any) {},
},
wantName: "k8s.io/client-go/tools/cache.TestNameForHandler",
},
"ptrToFuncs": {
handler: &ResourceEventHandlerFuncs{AddFunc: func(any) {}},
wantName: "k8s.io/client-go/tools/cache.TestNameForHandler",
},
"struct": {
handler: mockHandler{},
wantName: "k8s.io/client-go/tools/cache.mockHandler",
},
"ptrToStruct": {
handler: &mockHandler{},
wantName: "k8s.io/client-go/tools/cache.mockHandler",
},
"nil": {
handler: nil,
wantName: "<nil>",
},
} {
t.Run(name, func(t *testing.T) {
gotName := nameForHandler(tc.handler)
if gotName != tc.wantName {
t.Errorf("Got name:\n %s\nWanted name:\n %s", gotName, tc.wantName)
}
})
}
}

View file

@ -58,6 +58,12 @@ type Queue interface {
// Update, or Delete; otherwise the first batch is empty.
HasSynced() bool
// HasSyncedChecker is done once the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, AddIfNotPresent,
// Update, or Delete; otherwise the first batch is empty.
HasSyncedChecker() DoneChecker
// Close the queue
Close()
}
@ -110,6 +116,11 @@ type FIFO struct {
items map[string]interface{}
queue []string
// synced is initially an open channel. It gets closed (once!) by checkSynced
// as soon as the initial sync is considered complete.
synced chan struct{}
syncedClosed bool
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
@ -127,7 +138,8 @@ type FIFO struct {
}
var (
_ = Queue(&FIFO{}) // FIFO is a Queue
_ = Queue(&FIFO{}) // FIFO is a Queue
_ = DoneChecker(&FIFO{}) // ... and implements DoneChecker.
)
// Close the queue.
@ -146,8 +158,36 @@ func (f *FIFO) HasSynced() bool {
return f.hasSynced_locked()
}
// HasSyncedChecker is done if one of Add/Update/Delete/AddIfNotPresent is called first,
// or the first batch of items inserted by Replace() has been popped.
func (f *FIFO) HasSyncedChecker() DoneChecker {
return f
}
// Name implements [DoneChecker.Name]
func (f *FIFO) Name() string {
return "FIFO" // FIFO doesn't seem to be used outside of a few tests, so changing the NewFIFO API to pass in a name doesn't seem worth it.
}
// Done implements [DoneChecker.Done]
func (f *FIFO) Done() <-chan struct{} {
return f.synced
}
// hasSynced_locked returns the result of a prior checkSynced call.
func (f *FIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
return f.syncedClosed
}
// checkSynced checks whether the initial sync is completed.
// It must be called whenever populated or initialPopulationCount change
// while the mutex is still locked.
func (f *FIFO) checkSynced() {
synced := f.populated && f.initialPopulationCount == 0
if synced && !f.syncedClosed {
f.syncedClosed = true
close(f.synced)
}
}
// Add inserts an item, and puts it in the queue. The item is only enqueued
@ -160,6 +200,7 @@ func (f *FIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
f.checkSynced()
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
@ -184,6 +225,7 @@ func (f *FIFO) Delete(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
f.checkSynced()
delete(f.items, id)
return err
}
@ -220,6 +262,8 @@ func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
// Must be done *after* process has completed.
defer f.checkSynced()
}
item, ok := f.items[id]
if !ok {
@ -252,6 +296,7 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
if !f.populated {
f.populated = true
f.initialPopulationCount = len(items)
f.checkSynced()
}
f.items = items
@ -290,6 +335,7 @@ func (f *FIFO) Resync() error {
// process.
func NewFIFO(keyFunc KeyFunc) *FIFO {
f := &FIFO{
synced: make(chan struct{}),
items: map[string]interface{}{},
queue: []string{},
keyFunc: keyFunc,

View file

@ -17,6 +17,7 @@ limitations under the License.
package cache
import (
"context"
"sync"
"testing"
"time"
@ -29,6 +30,25 @@ const (
concurrencyLevel = 5
)
type mockSynced struct {
context.Context
cancel func()
}
func newMockSynced(tb testing.TB, synced bool) *mockSynced {
m := &mockSynced{}
m.Context, m.cancel = context.WithCancel(context.Background())
if synced {
m.cancel()
}
tb.Cleanup(m.cancel)
return m
}
func (m *mockSynced) Name() string {
return "mock"
}
func BenchmarkListener(b *testing.B) {
var notification addNotification
@ -40,7 +60,7 @@ func BenchmarkListener(b *testing.B) {
AddFunc: func(obj interface{}) {
swg.Done()
},
}, 0, 0, time.Now(), 1024*1024, func() bool { return true })
}, 0, 0, time.Now(), 1024*1024, newMockSynced(b, true))
var wg wait.Group
defer wg.Wait() // Wait for .run and .pop to stop
defer close(pl.addCh) // Tell .run and .pop to stop

View file

@ -73,7 +73,7 @@ func runTestReflectorDataConsistencyDetector(t *testing.T, transformer Transform
defer cancel()
store := NewStore(MetaNamespaceKeyFunc)
fifo := newQueueFIFO(logger, store, transformer, InformerNameAndResource{}, nil)
_, fifo := newQueueFIFO(logger, nil, store, transformer, InformerNameAndResource{}, nil)
lw := &ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {

View file

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
@ -194,6 +195,14 @@ type SharedInformer interface {
// For that, please call HasSynced on the handle returned by
// AddEventHandler.
HasSynced() bool
// HasSyncedChecker completes if the shared informer's store has been
// informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync".
//
// Note that this doesn't tell you if an individual handler is synced!!
// For that, please use HasSyncedChecker on the handle returned by
// AddEventHandler.
HasSyncedChecker() DoneChecker
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
@ -247,6 +256,10 @@ type ResourceEventHandlerRegistration interface {
// HasSynced reports if both the parent has synced and all pre-sync
// events have been delivered.
HasSynced() bool
// HasSyncedChecker reports if both the parent has synced and all pre-sync
// events have been delivered.
HasSyncedChecker() DoneChecker
}
// Optional configuration options for [SharedInformer.AddEventHandlerWithOptions].
@ -309,6 +322,7 @@ func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.O
return &sharedIndexInformer{
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
processor: processor,
synced: make(chan struct{}),
listerWatcher: lw,
objectType: exampleObject,
objectDescription: options.ObjectDescription,
@ -414,6 +428,107 @@ func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
return true
}
// WaitFor waits for a set of activities to complete, like cache syncing.
// It returns true if it was successful, false if the context was canceled
// before all activities are completed.
//
// If a non-nil "what" is provided, then progress information is logged
// while waiting ("Waiting", for="<what>").
//
// In contrast to the other WaitForCacheSync alternatives, this one doesn't
// need polling, which makes it react immediately. When used in a synctest unit
// test, waiting completes without moving time forward randomly, which
// makes tests more predictable.
func WaitFor(ctx context.Context, what string, checkers ...DoneChecker) bool {
logger := klog.FromContext(ctx)
if what != "" {
helper, l := logger.WithCallStackHelper()
logger = l
helper()
logger.Info("Waiting", "for", what)
}
// Check in parallel to ensure that we log "Done waiting" as soon
// as possible for each checker. The timing may be useful to know.
// We cannot log inside the goroutines because the stack unwinding wouldn't
// work, so instead each goroutine just notifies the parent
// goroutine when its checker is done and the parent goroutine then
// logs it.
var wg sync.WaitGroup
type result struct {
checker DoneChecker
done bool
}
doneChecker := make(chan result)
for _, checker := range checkers {
wg.Go(func() {
select {
case <-checker.Done():
doneChecker <- result{checker, true}
case <-ctx.Done():
// We can end up here even when the checker is already done,
// select is not deterministic. Check once more without blocking
// before finally giving up.
select {
case <-checker.Done():
doneChecker <- result{checker, true}
default:
doneChecker <- result{checker, false}
}
}
})
}
instances := make([]string, 0, len(checkers))
for range len(checkers) {
// We are guaranteed to get exactly one result from each goroutine, so this won't block forever.
result := <-doneChecker
if result.done {
if what != "" {
logger.Info("Done waiting", "for", what, "instance", result.checker.Name())
}
} else {
// We don't need this information unless we are a) logging or b) debugging interactively.
instances = append(instances, result.checker.Name())
}
}
wg.Wait()
if what != "" && len(instances) > 0 {
slices.Sort(instances)
logger.Info("Timed out waiting", "for", what, "cause", context.Cause(ctx), "instances", instances)
}
done := len(instances) == 0
return done
}
// DoneChecker, in contrast to [InformerSynced], supports waiting
// for some activity to finish without polling and has a name
// that describes itself.
//
// To check for completion without blocking, use [IsDone].
type DoneChecker interface {
// Name returns a string describing the entity that is being waited for.
//
// Note that this name might be computed, so callers should only
// get the name outside of a hot code path.
Name() string
// Done returns a channel that will be closed on completion
// of the activity.
Done() <-chan struct{}
}
// IsDone returns true if the activity is done, false otherwise.
func IsDone(checker DoneChecker) bool {
select {
case <-checker.Done():
return true
default:
return false
}
}
// `*sharedIndexInformer` implements SharedIndexInformer and has three
// main components. One is an indexed local cache, `indexer Indexer`.
// The second main component is a Controller that pulls
@ -431,6 +546,10 @@ type sharedIndexInformer struct {
indexer Indexer
controller Controller
// synced gets created when creating the sharedIndexInformer.
// It gets closed when Run detects that the controller it created has synced.
synced chan struct{}
processor *sharedProcessor
cacheMutationDetector MutationDetector
@ -494,6 +613,10 @@ func (v *dummyController) HasSynced() bool {
return v.informer.HasSynced()
}
func (v *dummyController) HasSyncedChecker() DoneChecker {
return v.informer.HasSyncedChecker()
}
func (v *dummyController) LastSyncResourceVersion() string {
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) {
return v.informer.LastSyncResourceVersion()
@ -563,7 +686,7 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
fifo := newQueueFIFO(logger, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider)
logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, s.identifier, s.fifoMetricsProvider)
cfg := &Config{
Queue: fifo,
@ -573,8 +696,12 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
FullResyncPeriod: s.resyncCheckPeriod,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
ProcessBatch: s.HandleBatchDeltas,
Process: func(obj interface{}, isInInitialList bool) error {
return s.handleDeltas(logger, obj, isInInitialList)
},
ProcessBatch: func(deltas []Delta, isInInitialList bool) error {
return s.handleBatchDeltas(logger, deltas, isInInitialList)
},
WatchErrorHandlerWithContext: s.watchErrorHandler,
}
@ -594,6 +721,15 @@ func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
// has a RunWithContext method that we can use here.
wg.StartWithChannel(processorStopCtx.Done(), s.cacheMutationDetector.Run)
wg.StartWithContext(processorStopCtx, s.processor.run)
wg.Start(func() {
select {
case <-ctx.Done():
// We were stopped without completing the sync.
case <-s.controller.HasSyncedChecker().Done():
// Controller has synced and thus so have we.
close(s.synced)
}
})
defer func() {
s.startedLock.Lock()
@ -610,13 +746,31 @@ func (s *sharedIndexInformer) HasStarted() bool {
}
func (s *sharedIndexInformer) HasSynced() bool {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.controller == nil {
select {
case <-s.synced:
return true
default:
return false
}
return s.controller.HasSynced()
}
func (s *sharedIndexInformer) HasSyncedChecker() DoneChecker {
return &sharedIndexInformerDone{
s: s,
}
}
// sharedIndexInformerDone implements [DoneChecker] for a [sharedIndexInformer].
type sharedIndexInformerDone struct {
s *sharedIndexInformer
}
func (sd *sharedIndexInformerDone) Name() string {
return fmt.Sprintf("SharedIndexInformer %T", sd.s.objectType)
}
func (sd *sharedIndexInformerDone) Done() <-chan struct{} {
return sd.s.synced
}
func (s *sharedIndexInformer) LastSyncResourceVersion() string {
@ -708,7 +862,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa
}
}
listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
listener := newProcessListener(logger, handler, resyncPeriod, determineResyncPeriod(logger, resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSyncedChecker())
if !s.started {
return s.processor.addListener(listener), nil
@ -737,20 +891,20 @@ func (s *sharedIndexInformer) AddEventHandlerWithOptions(handler ResourceEventHa
return handle, nil
}
func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
func (s *sharedIndexInformer) handleDeltas(logger klog.Logger, obj interface{}, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
if deltas, ok := obj.(Deltas); ok {
return processDeltas(s, s.indexer, deltas, isInInitialList, s.keyFunc)
return processDeltas(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc)
}
return errors.New("object given as Process argument is not Deltas")
}
func (s *sharedIndexInformer) HandleBatchDeltas(deltas []Delta, isInInitialList bool) error {
func (s *sharedIndexInformer) handleBatchDeltas(logger klog.Logger, deltas []Delta, isInInitialList bool) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
return processDeltasInBatch(s, s.indexer, deltas, isInInitialList, s.keyFunc)
return processDeltasInBatch(logger, s, s.indexer, deltas, isInInitialList, s.keyFunc)
}
// Conforms to ResourceEventHandler
@ -854,6 +1008,7 @@ func (p *sharedProcessor) addListener(listener *processorListener) ResourceEvent
p.listeners[listener] = true
if p.listenersStarted {
p.wg.Start(listener.watchSynced)
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
@ -925,6 +1080,7 @@ func (p *sharedProcessor) run(ctx context.Context) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
for listener := range p.listeners {
p.wg.Start(listener.watchSynced)
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
@ -986,7 +1142,7 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe
}
// processorListener relays notifications from a sharedProcessor to
// one ResourceEventHandler --- using two goroutines, two unbuffered
// one ResourceEventHandler --- using three goroutines, two unbuffered
// channels, and an unbounded ring buffer. The `add(notification)`
// function sends the given notification to `addCh`. One goroutine
// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
@ -994,16 +1150,23 @@ func (p *sharedProcessor) resyncCheckPeriodChanged(logger klog.Logger, resyncChe
// Another goroutine runs `run()`, which receives notifications from
// `nextCh` and synchronously invokes the appropriate handler method.
//
// The third goroutine watches the upstream "has synced" channel
// and notifies a SingleFileTracker instance. That instance then
// combines the upstream state and the processListener state to
// implement the overall "event handler has synced".
//
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
logger klog.Logger
nextCh chan interface{}
addCh chan interface{}
done chan struct{}
handler ResourceEventHandler
syncTracker *synctrack.SingleFileTracker
syncTracker *synctrack.SingleFileTracker
upstreamHasSynced DoneChecker
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
@ -1041,13 +1204,21 @@ func (p *processorListener) HasSynced() bool {
return p.syncTracker.HasSynced()
}
func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
// HasSyncedChecker is done if the source informer has synced, and all
// corresponding events have been delivered.
func (p *processorListener) HasSyncedChecker() DoneChecker {
return p.syncTracker
}
func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced DoneChecker) *processorListener {
ret := &processorListener{
logger: logger,
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
done: make(chan struct{}),
upstreamHasSynced: hasSynced,
handler: handler,
syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
syncTracker: synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), nameForHandler(handler))),
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
@ -1068,6 +1239,7 @@ func (p *processorListener) add(notification interface{}) {
func (p *processorListener) pop() {
defer utilruntime.HandleCrashWithLogger(p.logger)
defer close(p.nextCh) // Tell .run() to stop
defer close(p.done) // Tell .watchSynced() to stop
var nextCh chan<- interface{}
var notification interface{}
@ -1131,6 +1303,16 @@ func (p *processorListener) run() {
}
}
func (p *processorListener) watchSynced() {
select {
case <-p.upstreamHasSynced.Done():
// Notify tracker that the upstream has synced.
p.syncTracker.UpstreamHasSynced()
case <-p.done:
// Give up waiting for sync.
}
}
// shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
// this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool {

View file

@ -1134,8 +1134,13 @@ func TestAddWhileActive(t *testing.T) {
return
}
if !handle1.HasSynced() {
t.Error("Not synced after Run??")
select {
case <-handle1.HasSyncedChecker().Done():
if !handle1.HasSynced() {
t.Error("Not synced after channel said we are synced??")
}
case <-time.After(10 * time.Second):
t.Error("Not synced 10 seconds after Run??")
}
listener2.lock.Lock() // ensure we observe it before it has synced

View file

@ -20,6 +20,7 @@ limitations under the License.
package synctrack
import (
"context"
"sync"
"sync/atomic"
@ -27,11 +28,32 @@ import (
)
// AsyncTracker helps propagate HasSynced in the face of multiple worker threads.
// The user has to monitor the upstream "has synced"
// and notify the tracker when that changes from false to true.
type AsyncTracker[T comparable] struct {
UpstreamHasSynced func() bool
// name describes the instance.
name string
// upstreamHasSynced is changed from false (initial value) to true
// when UpstreamHasSynced is called.
upstreamHasSynced atomic.Bool
lock sync.Mutex
waiting sets.Set[T]
// synced gets canceled once both the tracker and upstream are synced.
// A context is convenient for this because it gives us a channel
// and handles thread-safety.
synced context.Context
cancel func()
}
func NewAsyncTracker[T comparable](name string) *AsyncTracker[T] {
t := &AsyncTracker[T]{
name: name,
}
t.synced, t.cancel = context.WithCancel(context.Background())
return t
}
// Start should be called prior to processing each key which is part of the
@ -57,6 +79,28 @@ func (t *AsyncTracker[T]) Finished(key T) {
if t.waiting != nil {
t.waiting.Delete(key)
}
// Maybe synced now?
if t.upstreamHasSynced.Load() && len(t.waiting) == 0 {
// Mark as synced.
t.cancel()
}
}
// UpstreamHasSynced needs to be called at least once as soon as
// the upstream "has synced" becomes true. It tells AsyncTracker
// that the source is synced.
//
// Must be called after handing over the initial list to Start.
func (t *AsyncTracker[T]) UpstreamHasSynced() {
// Upstream is done, but we might not be yet.
t.upstreamHasSynced.Store(true)
t.lock.Lock()
defer t.lock.Unlock()
if len(t.waiting) == 0 {
// Mark as synced.
t.cancel()
}
}
// HasSynced returns true if the source is synced and every key present in the
@ -64,27 +108,51 @@ func (t *AsyncTracker[T]) Finished(key T) {
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *AsyncTracker[T]) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we can't hold our lock while
// waiting on that or a user is likely to get a deadlock.
if !t.UpstreamHasSynced() {
return false
}
t.lock.Lock()
defer t.lock.Unlock()
return t.waiting.Len() == 0
return t.synced.Err() != nil
}
// Done returns a channel that is closed if the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *AsyncTracker[T]) Done() <-chan struct{} {
return t.synced.Done()
}
func (t *AsyncTracker[T]) Name() string {
return t.name
}
// SingleFileTracker helps propagate HasSynced when events are processed in
// order (i.e. via a queue).
// order (i.e. via a queue). The user has to monitor the upstream "has synced"
// and notify the tracker when that changes from false to true.
type SingleFileTracker struct {
// name describes the instance.
name string
// Important: count is used with atomic operations so it must be 64-bit
// aligned, otherwise atomic operations will panic. Having it at the top of
// the struct will guarantee that, even on 32-bit arches.
// See https://pkg.go.dev/sync/atomic#pkg-note-BUG for more information.
count int64
UpstreamHasSynced func() bool
// upstreamHasSynced is changed from false (initial value) to true
// when UpstreamHasSynced is called.
upstreamHasSynced atomic.Bool
// synced gets canceled once both the tracker and upstream are synced.
// A context is convenient for this because it gives us a channel
// and handles thread-safety.
synced context.Context
cancel func()
}
func NewSingleFileTracker(name string) *SingleFileTracker {
t := &SingleFileTracker{
name: name,
}
t.synced, t.cancel = context.WithCancel(context.Background())
return t
}
// Start should be called prior to processing each key which is part of the
@ -103,6 +171,26 @@ func (t *SingleFileTracker) Finished() {
if result < 0 {
panic("synctrack: negative counter; this logic error means HasSynced may return incorrect value")
}
// Maybe synced now?
if result == 0 && t.upstreamHasSynced.Load() {
// Mark as synced.
t.cancel()
}
}
// UpstreamHasSynced needs to be called at least once as soon as
// the upstream "has synced" becomes true. It tells SingleFileTracker
// that the source is synced.
//
// Must be called after handing over the initial list to Start.
func (t *SingleFileTracker) UpstreamHasSynced() {
// Upstream is done, but we might not be yet.
t.upstreamHasSynced.Store(true)
if atomic.LoadInt64(&t.count) == 0 {
// Mark as synced.
t.cancel()
}
}
// HasSynced returns true if the source is synced and every key present in the
@ -110,11 +198,17 @@ func (t *SingleFileTracker) Finished() {
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *SingleFileTracker) HasSynced() bool {
// Call UpstreamHasSynced first: it might take a lock, which might take
// a significant amount of time, and we don't want to then act on a
// stale count value.
if !t.UpstreamHasSynced() {
return false
}
return atomic.LoadInt64(&t.count) <= 0
return t.synced.Err() != nil
}
// Done returns a channel that is closed if the source is synced and every key present in the
// initial list has been processed. This relies on the source not considering
// itself synced until *after* it has delivered the notification for the last
// key, and that notification handler must have called Start.
func (t *SingleFileTracker) Done() <-chan struct{} {
return t.synced.Done()
}
func (t *SingleFileTracker) Name() string {
return t.name
}

View file

@ -19,29 +19,24 @@ package synctrack
import (
"strings"
"sync"
"time"
"testing"
)
func testSingleFileFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) {
tracker := SingleFileTracker{
UpstreamHasSynced: upstreamHasSynced,
}
return tracker.Start, tracker.Finished, tracker.HasSynced
func testSingleFileFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) {
tracker := NewSingleFileTracker("")
return tracker.UpstreamHasSynced, tracker.Start, tracker.Finished, tracker.HasSynced, tracker.Done()
}
func testAsyncFuncs(upstreamHasSynced func() bool) (start func(), finished func(), hasSynced func() bool) {
tracker := AsyncTracker[string]{
UpstreamHasSynced: upstreamHasSynced,
}
return func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced
func testAsyncFuncs() (upstreamHasSynced func(), start func(), finished func(), hasSynced func() bool, synced <-chan struct{}) {
tracker := NewAsyncTracker[string]("")
return tracker.UpstreamHasSynced, func() { tracker.Start("key") }, func() { tracker.Finished("key") }, tracker.HasSynced, tracker.Done()
}
func TestBasicLogic(t *testing.T) {
table := []struct {
name string
construct func(func() bool) (func(), func(), func() bool)
construct func() (func(), func(), func(), func() bool, <-chan struct{})
}{
{"SingleFile", testSingleFileFuncs},
{"Async", testAsyncFuncs},
@ -50,27 +45,87 @@ func TestBasicLogic(t *testing.T) {
for _, entry := range table {
t.Run(entry.name, func(t *testing.T) {
table := []struct {
synced bool
synced bool
syncedBeforeFinish bool
start bool
finish bool
expectSynced bool
}{
{false, true, true, false},
{true, true, false, false},
{false, true, false, false},
{true, true, true, true},
{false, false, true, true, false},
{true, false, true, false, false},
{true, true, true, false, false},
{false, false, true, false, false},
{true, false, true, true, true},
{true, true, true, true, true},
}
for _, tt := range table {
Start, Finished, HasSynced := entry.construct(func() bool { return tt.synced })
upstreamHasSynced, start, finished, hasSynced, synced := entry.construct()
syncedDone := func() bool {
select {
case <-synced:
return true
default:
return false
}
}
if hasSynced() {
t.Errorf("for %#v got HasSynced() true before start (wanted false)", tt)
}
if syncedDone() {
t.Errorf("for %#v got Done() true before start (wanted false)", tt)
}
if tt.start {
Start()
start()
}
if hasSynced() {
t.Errorf("for %#v got HasSynced() true after start (wanted false)", tt)
}
if syncedDone() {
t.Errorf("for %#v got Done() true after start (wanted false)", tt)
}
// "upstream has synced" may occur before or after finished, but not before start.
if tt.synced && tt.syncedBeforeFinish {
upstreamHasSynced()
if hasSynced() {
t.Errorf("for %#v got HasSynced() true after upstreamHasSynced and before finish (wanted false)", tt)
}
if syncedDone() {
t.Errorf("for %#v got Done() true after upstreamHasSynced and before finish (wanted false)", tt)
}
}
if tt.finish {
Finished()
finished()
}
got := HasSynced()
if e, a := tt.expectSynced, got; e != a {
t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
if tt.synced && !tt.syncedBeforeFinish {
if hasSynced() {
t.Errorf("for %#v got HasSynced() true after finish and before upstreamHasSynced (wanted false)", tt)
}
if syncedDone() {
t.Errorf("for %#v got Done() true after finish and before upstreamHasSynced (wanted false)", tt)
}
upstreamHasSynced()
}
if e, a := tt.expectSynced, hasSynced(); e != a {
t.Errorf("for %#v got HasSynced() %v (wanted %v)", tt, a, e)
}
if e, a := tt.expectSynced, syncedDone(); e != a {
t.Errorf("for %#v got Done() %v (wanted %v)", tt, a, e)
}
select {
case <-synced:
if !tt.expectSynced {
t.Errorf("for %#v got done (wanted not done)", tt)
}
default:
if tt.expectSynced {
t.Errorf("for %#v got done (wanted not done)", tt)
}
}
}
})
@ -78,7 +133,7 @@ func TestBasicLogic(t *testing.T) {
}
func TestAsyncLocking(t *testing.T) {
aft := AsyncTracker[int]{UpstreamHasSynced: func() bool { return true }}
aft := NewAsyncTracker[int]("")
var wg sync.WaitGroup
for _, i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
@ -92,6 +147,7 @@ func TestAsyncLocking(t *testing.T) {
}(i)
}
wg.Wait()
aft.UpstreamHasSynced()
if !aft.HasSynced() {
t.Errorf("async tracker must have made a threading error?")
}
@ -99,7 +155,7 @@ func TestAsyncLocking(t *testing.T) {
}
func TestSingleFileCounting(t *testing.T) {
sft := SingleFileTracker{UpstreamHasSynced: func() bool { return true }}
sft := NewSingleFileTracker("")
for i := 0; i < 100; i++ {
sft.Start()
@ -115,6 +171,8 @@ func TestSingleFileCounting(t *testing.T) {
}
sft.Finished()
sft.UpstreamHasSynced()
if !sft.HasSynced() {
t.Fatal("Unexpectedly not synced?")
}
@ -148,24 +206,35 @@ func TestSingleFileCounting(t *testing.T) {
func TestSingleFile(t *testing.T) {
table := []struct {
synced bool
synced bool
syncedBeforeStops bool
starts int
stops int
expectSynced bool
}{
{false, 1, 1, false},
{true, 1, 0, false},
{false, 1, 0, false},
{true, 1, 1, true},
{false, false, 1, 1, false},
{true, false, 1, 0, false},
{true, true, 1, 0, false},
{false, false, 1, 0, false},
{true, false, 1, 1, true},
{true, true, 1, 1, true},
}
for _, tt := range table {
sft := SingleFileTracker{UpstreamHasSynced: func() bool { return tt.synced }}
sft := NewSingleFileTracker("")
for i := 0; i < tt.starts; i++ {
sft.Start()
}
// "upstream has synced" may occur before or after finished, but not before start.
if tt.synced && tt.syncedBeforeStops {
sft.UpstreamHasSynced()
}
for i := 0; i < tt.stops; i++ {
sft.Finished()
}
if tt.synced && !tt.syncedBeforeStops {
sft.UpstreamHasSynced()
}
got := sft.HasSynced()
if e, a := tt.expectSynced, got; e != a {
t.Errorf("for %#v got %v (wanted %v)", tt, a, e)
@ -173,67 +242,3 @@ func TestSingleFile(t *testing.T) {
}
}
func TestNoStaleValue(t *testing.T) {
table := []struct {
name string
construct func(func() bool) (func(), func(), func() bool)
}{
{"SingleFile", testSingleFileFuncs},
{"Async", testAsyncFuncs},
}
for _, entry := range table {
t.Run(entry.name, func(t *testing.T) {
var lock sync.Mutex
upstreamHasSynced := func() bool {
lock.Lock()
defer lock.Unlock()
return true
}
Start, Finished, HasSynced := entry.construct(upstreamHasSynced)
// Ordinarily the corresponding lock would be held and you wouldn't be
// able to call this function at this point.
if !HasSynced() {
t.Fatal("Unexpectedly not synced??")
}
Start()
if HasSynced() {
t.Fatal("Unexpectedly synced??")
}
Finished()
if !HasSynced() {
t.Fatal("Unexpectedly not synced??")
}
// Now we will prove that if the lock is held, you can't get a false
// HasSynced return.
lock.Lock()
// This goroutine calls HasSynced
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if HasSynced() {
t.Error("Unexpectedly synced??")
}
}()
// This goroutine increments + unlocks. The sleep is to bias the
// runtime such that the other goroutine usually wins (it needs to work
// in both orderings, this one is more likely to be buggy).
go func() {
time.Sleep(time.Millisecond)
Start()
lock.Unlock()
}()
wg.Wait()
})
}
}

View file

@ -34,6 +34,7 @@ type RealFIFOOptions struct {
Logger *klog.Logger
// Name can be used to override the default "RealFIFO" name for the new instance.
// Optional. Used only if Identifier.Name returns an empty string.
Name string
// KeyFunction is used to figure out what key an object should have. (It's
@ -98,6 +99,11 @@ type RealFIFO struct {
items []Delta
// synced is initially an open channel. It gets closed (once!) by checkSynced_locked
// as soon as the initial sync is considered complete.
synced chan struct{}
syncedClosed bool
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
@ -160,6 +166,7 @@ type SyncAllInfo struct{}
var (
_ = Queue(&RealFIFO{}) // RealFIFO is a Queue
_ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations
	_ = DoneChecker(&RealFIFO{})        // RealFIFO implements DoneChecker.
)
// Close the queue.
@ -196,11 +203,37 @@ func (f *RealFIFO) HasSynced() bool {
return f.hasSynced_locked()
}
// Ignoring lint to keep the delta against the original small for review. It's okay to adjust later.
//
//lint:file-ignore ST1003: should not use underscores in Go names
// HasSyncedChecker returns a DoneChecker which is done once Add/Update/Delete/AddIfNotPresent has been called first,
// or the first batch of items inserted by Replace() has been popped.
func (f *RealFIFO) HasSyncedChecker() DoneChecker {
return f
}
// Name implements [DoneChecker.Name]
func (f *RealFIFO) Name() string {
return f.name
}
// Done implements [DoneChecker.Done]
func (f *RealFIFO) Done() <-chan struct{} {
return f.synced
}
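
For callers outside the Pop loop, the checker replaces polling HasSynced: they can select on Done() together with a context. A minimal sketch, assuming code in package cache with "context" imported (the helper name is made up for illustration, not part of this change):

    // waitForInitialSync blocks until the FIFO's initial batch has been
    // delivered or ctx is canceled.
    func waitForInitialSync(ctx context.Context, f *RealFIFO) bool {
        select {
        case <-f.HasSyncedChecker().Done():
            return true
        case <-ctx.Done():
            return false
        }
    }
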
// hasSynced_locked returns the result of a prior checkSynced_locked call.
func (f *RealFIFO) hasSynced_locked() bool {
return f.populated && f.initialPopulationCount == 0
return f.syncedClosed
}
// checkSynced_locked checks whether the initial batch of items (set via Replace) has been delivered
// and closes the synced channel as needed. It must be called after changing f.populated and/or
// f.initialPopulationCount while the mutex is still locked.
func (f *RealFIFO) checkSynced_locked() {
synced := f.populated && f.initialPopulationCount == 0
if synced && !f.syncedClosed {
// Initial sync is complete.
f.syncedClosed = true
close(f.synced)
}
}
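
To make the bookkeeping above concrete, here is a package-internal sketch of the lifecycle that ends with the channel being closed. It assumes the two-argument PopProcessFunc signature and two arbitrary objects a and b that the FIFO's key function accepts; the helper itself is hypothetical:

    // demoInitialSync: Replace sets populated and initialPopulationCount,
    // each Pop of an initial item decrements the count and calls
    // checkSynced_locked, and the synced channel closes when it reaches zero.
    func demoInitialSync(f *RealFIFO, a, b interface{}) {
        _ = f.Replace([]interface{}{a, b}, "1") // initialPopulationCount == 2
        _, _ = f.Pop(func(obj interface{}, isInInitialList bool) error { return nil })
        _, _ = f.Pop(func(obj interface{}, isInInitialList bool) error { return nil })
        <-f.HasSyncedChecker().Done() // returns immediately: initial batch fully popped
    }
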
// addToItems_locked appends to the delta list.
@ -291,6 +324,7 @@ func (f *RealFIFO) Add(obj interface{}) error {
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
retErr := f.addToItems_locked(Added, false, obj)
return retErr
@ -302,6 +336,7 @@ func (f *RealFIFO) Update(obj interface{}) error {
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
retErr := f.addToItems_locked(Updated, false, obj)
return retErr
@ -315,6 +350,7 @@ func (f *RealFIFO) Delete(obj interface{}) error {
defer f.lock.Unlock()
f.populated = true
f.checkSynced_locked()
retErr := f.addToItems_locked(Deleted, false, obj)
return retErr
@ -362,6 +398,7 @@ func (f *RealFIFO) Pop(process PopProcessFunc) (interface{}, error) {
defer func() {
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
f.checkSynced_locked()
}
}()
@ -482,7 +519,6 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc
unique.Insert(id)
moveDeltaToProcessList(i)
}
f.items = f.items[len(deltas):]
// Decrement initialPopulationCount if needed.
// This is done in a defer so we only do this *after* processing is complete,
@ -490,6 +526,7 @@ func (f *RealFIFO) PopBatch(processBatch ProcessBatchFunc, processSingle PopProc
defer func() {
if f.initialPopulationCount > 0 {
f.initialPopulationCount -= len(deltas)
f.checkSynced_locked()
}
}()
@ -539,7 +576,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error
if f.emitAtomicEvents {
err = f.addReplaceToItemsLocked(newItems, resourceVersion)
} else {
err = reconcileReplacement(f.items, f.knownObjects, newItems, f.keyOf,
err = reconcileReplacement(f.logger, f.items, f.knownObjects, newItems, f.keyOf,
func(obj DeletedFinalStateUnknown) error {
return f.addToItems_locked(Deleted, true, obj)
},
@ -554,6 +591,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error
if !f.populated {
f.populated = true
f.initialPopulationCount = len(f.items)
f.checkSynced_locked()
}
return nil
@ -563,6 +601,7 @@ func (f *RealFIFO) Replace(newItems []interface{}, resourceVersion string) error
// and based upon the state of the items in the queue and known objects will call onDelete and onReplace
// depending upon whether the item is being deleted or replaced/added.
func reconcileReplacement(
logger klog.Logger,
queuedItems []Delta,
knownObjects KeyListerGetter,
newItems []interface{},
@ -638,10 +677,10 @@ func reconcileReplacement(
deletedObj, exists, err := knownObjects.GetByKey(knownKey)
if err != nil {
deletedObj = nil
utilruntime.HandleErrorWithLogger(klog.TODO(), err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey)
utilruntime.HandleErrorWithLogger(logger, err, "Error during lookup, placing DeleteFinalStateUnknown marker without object", "key", knownKey)
} else if !exists {
deletedObj = nil
utilruntime.HandleErrorWithLogger(klog.TODO(), nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey)
utilruntime.HandleErrorWithLogger(logger, nil, "Key does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", "key", knownKey)
}
retErr := onDelete(DeletedFinalStateUnknown{
Key: knownKey,
@ -757,6 +796,7 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
logger: klog.Background(),
name: "RealFIFO",
items: make([]Delta, 0, 10),
synced: make(chan struct{}),
keyFunc: opts.KeyFunction,
knownObjects: opts.KnownObjects,
transformer: opts.Transformer,
@ -769,8 +809,11 @@ func NewRealFIFOWithOptions(opts RealFIFOOptions) *RealFIFO {
if opts.Logger != nil {
f.logger = *opts.Logger
}
if opts.Name != "" {
f.name = opts.Name
if name := opts.Name; name != "" {
f.name = name
}
if name := opts.Identifier.Name(); name != "" {
f.name = name
}
f.logger = klog.LoggerWithName(f.logger, f.name)
f.cond.L = &f.lock
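
Putting the naming options and the checker together, a minimal construction sketch (same package; the field values and the surrounding function are illustrative only):

    // newNamedFIFO shows how an explicit Name feeds into the per-instance
    // logger and into the DoneChecker's Name() that WaitFor logs.
    func newNamedFIFO(ctx context.Context, knownObjects KeyListerGetter) (*RealFIFO, bool) {
        f := NewRealFIFOWithOptions(RealFIFOOptions{
            Name:         "my-fifo",
            KeyFunction:  MetaNamespaceKeyFunc,
            KnownObjects: knownObjects,
        })
        // WaitFor returns false if ctx is canceled before the checker is done.
        ok := WaitFor(ctx, "initial list", f.HasSyncedChecker())
        return f, ok
    }
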

View file

@ -0,0 +1,200 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"runtime"
"sync"
"testing"
"testing/synctest"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/klog/v2"
"k8s.io/klog/v2/textlogger"
)
func init() {
// The test below is sensitive to the time zone, log output uses time.Local.
time.Local = time.UTC
}
func TestWaitFor(t *testing.T) {
for name, tc := range map[string]struct {
what string
checkers []DoneChecker
timeout time.Duration
timeoutReason string
expectDone bool
// Time is predictable and starts at the synctest epoch.
// %[1]d is the pid, %[2]d the line number of the WaitFor call.
expectOutput string
}{
"empty": {
expectDone: true,
},
"no-caches": {
what: "my-caches",
expectDone: true,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
`,
},
"no-logging": {
checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)},
expectDone: true,
},
"with-logging": {
what: "my-caches",
checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", 5*time.Second), newMockChecker("last", 0*time.Second)},
expectDone: true,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last"
I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="second"
I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first"
`,
},
"some-timeout": {
timeout: time.Minute,
what: "my-caches",
checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)},
expectDone: false,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last"
I0101 00:00:10.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="first"
I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["second"]
`,
},
"some-canceled": {
timeout: -1,
what: "my-caches",
checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)},
expectDone: false,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last"
I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context canceled" instances=["first","second"]
`,
},
"more": {
timeoutReason: "go fish",
timeout: 5 * time.Second,
what: "my-caches",
checkers: []DoneChecker{newMockChecker("first", 10*time.Second), newMockChecker("second", -1), newMockChecker("last", 0*time.Second)},
expectDone: false,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Done waiting" for="my-caches" instance="last"
I0101 00:00:05.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="go fish" instances=["first","second"]
`,
},
"all": {
timeout: time.Minute,
what: "my-caches",
checkers: []DoneChecker{newMockChecker("first", -1), newMockChecker("second", -1), newMockChecker("last", -1)},
expectDone: false,
expectOutput: `I0101 00:00:00.000000 %7[1]d wait_test.go:%[2]d] "Waiting" for="my-caches"
I0101 00:01:00.000000 %7[1]d wait_test.go:%[2]d] "Timed out waiting" for="my-caches" cause="context deadline exceeded" instances=["first","last","second"]
`,
},
} {
t.Run(name, func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
var buffer bytes.Buffer
logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Output(&buffer)))
ctx := klog.NewContext(context.Background(), logger)
var wg sync.WaitGroup
defer wg.Wait()
if tc.timeout != 0 {
switch tc.timeoutReason {
case "":
if tc.timeout > 0 {
c, cancel := context.WithTimeout(ctx, tc.timeout)
defer cancel()
ctx = c
} else {
c, cancel := context.WithCancel(ctx)
cancel()
ctx = c
}
default:
c, cancel := context.WithCancelCause(ctx)
wg.Go(func() {
time.Sleep(tc.timeout)
cancel(errors.New(tc.timeoutReason))
})
ctx = c
}
}
_, _, line, _ := runtime.Caller(0)
done := WaitFor(ctx, tc.what, tc.checkers...)
expectOutput := tc.expectOutput
if expectOutput != "" {
expectOutput = fmt.Sprintf(expectOutput, os.Getpid(), line+1)
}
assert.Equal(t, tc.expectDone, done, "done")
assert.Equal(t, expectOutput, buffer.String(), "output")
})
})
}
}
// newMockChecker can be called outside of a synctest bubble.
// The checker constructs its channel inside the bubble when Done is first called.
func newMockChecker(name string, delay time.Duration) DoneChecker {
return &mockChecker{
name: name,
delay: delay,
}
}
type mockChecker struct {
name string
delay time.Duration
initialized bool
done <-chan struct{}
}
func (m *mockChecker) Name() string { return m.name }
func (m *mockChecker) Done() <-chan struct{} {
if !m.initialized {
switch {
case m.delay > 0:
// In the future.
ctx := context.Background()
// This leaks a cancel, which is hard to avoid (the parent's t.Cleanup cannot be used, and there is no other way to delay calling it). That's acceptable in a unit test.
//nolint:govet
ctx, _ = context.WithTimeout(ctx, m.delay)
m.done = ctx.Done()
case m.delay == 0:
// Immediately.
c := make(chan struct{})
close(c)
m.done = c
default:
// Never.
c := make(chan struct{})
m.done = c
}
m.initialized = true
}
return m.done
}
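
A hypothetical companion test makes that design choice explicit: because the channel (and its timer) is only created on the first Done() call, a mock constructed outside the bubble still gets a timer that runs on the bubble's virtual clock:

    func TestMockCheckerTiming(t *testing.T) {
        checker := newMockChecker("example", 5*time.Second) // created outside the bubble
        synctest.Test(t, func(t *testing.T) {
            start := time.Now()
            // The first Done() call creates the context timer inside the bubble,
            // so virtual time advances to exactly start+5s before it fires.
            <-checker.Done()
            if got := time.Since(start); got != 5*time.Second {
                t.Errorf("expected done after exactly 5s of virtual time, got %v", got)
            }
        })
    }
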

View file

@ -71,8 +71,16 @@ type Tracker struct {
// may be overridden in tests.
handleError func(context.Context, error, string, ...any)
// wg tracks the background goroutines, cancel stops them.
wg sync.WaitGroup
cancel func(error)
// rwMutex synchronizes updates to the event handler related fields below.
rwMutex sync.RWMutex
// synced gets closed once all the tracker's event handlers are synced.
synced chan struct{}
// All registered event handlers.
eventHandlers []cache.ResourceEventHandler
// The eventQueue contains functions which deliver an event to one
@ -155,6 +163,8 @@ func newTracker(ctx context.Context, opts Options) (finalT *Tracker, finalErr er
deviceClasses: opts.ClassInformer.Informer(),
patchedResourceSlices: cache.NewStore(cache.MetaNamespaceKeyFunc),
handleError: utilruntime.HandleErrorWithContext,
synced: make(chan struct{}),
cancel: func(error) {}, // Real function set in initInformers.
eventQueue: *buffer.NewRing[func()](buffer.RingOptions{InitialSize: 0, NormalSize: 4}),
}
defer func() {
@ -212,6 +222,22 @@ func (t *Tracker) initInformers(ctx context.Context) error {
return fmt.Errorf("add event handler for DeviceClasses: %w", err)
}
// This usually short-lived goroutine monitors our upstream event handlers and
// closes our own synced channel once they are all synced.
monitorCtx, cancel := context.WithCancelCause(ctx)
t.cancel = cancel
t.wg.Go(func() {
for _, handle := range []cache.ResourceEventHandlerRegistration{t.resourceSlicesHandle, t.deviceTaintsHandle, t.deviceClassesHandle} {
select {
case <-handle.HasSyncedChecker().Done():
case <-monitorCtx.Done():
// Abort without closing our synced channel.
return
}
}
close(t.synced)
})
return nil
}
@ -220,21 +246,28 @@ func (t *Tracker) initInformers(ctx context.Context) error {
// point is possible and will emit events with up-to-date ResourceSlice
// objects.
func (t *Tracker) HasSynced() bool {
select {
case <-t.HasSyncedChecker().Done():
return true
default:
return false
}
}
func (t *Tracker) HasSyncedChecker() cache.DoneChecker {
if !t.enableDeviceTaintRules {
return t.resourceSlices.HasSynced()
return t.resourceSlices.HasSyncedChecker()
}
if t.resourceSlicesHandle != nil && !t.resourceSlicesHandle.HasSynced() {
return false
}
if t.deviceTaintsHandle != nil && !t.deviceTaintsHandle.HasSynced() {
return false
}
if t.deviceClassesHandle != nil && !t.deviceClassesHandle.HasSynced() {
return false
}
return trackerHasSynced{t}
}
return true
type trackerHasSynced struct{ t *Tracker }
func (s trackerHasSynced) Name() string { return "ResourceSlice tracker" }
func (s trackerHasSynced) Done() <-chan struct{} {
return s.t.synced
}
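
Callers that used to poll HasSynced can hand the checker to cache.WaitFor instead. A hypothetical caller-side sketch (the function and names are illustrative, not part of this change):

    // runAfterTrackerSync blocks until the tracker's informer caches and event
    // handlers are synced, then runs fn. WaitFor logs the wait under the given
    // description and returns false if ctx is canceled first.
    func runAfterTrackerSync(ctx context.Context, tracker *Tracker, fn func()) error {
        if !cache.WaitFor(ctx, "ResourceSlice tracker sync", tracker.HasSyncedChecker()) {
            return fmt.Errorf("waiting for ResourceSlice tracker: %w", context.Cause(ctx))
        }
        fn()
        return nil
    }
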
// Stop ends all background activity and blocks until that shutdown is complete.
@ -243,12 +276,16 @@ func (t *Tracker) Stop() {
return
}
t.cancel(errors.New("stopped"))
if t.broadcaster != nil {
t.broadcaster.Shutdown()
}
_ = t.resourceSlices.RemoveEventHandler(t.resourceSlicesHandle)
_ = t.deviceTaints.RemoveEventHandler(t.deviceTaintsHandle)
_ = t.deviceClasses.RemoveEventHandler(t.deviceClassesHandle)
t.wg.Wait()
}
// ListPatchedResourceSlices returns all ResourceSlices in the cluster with