diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index bf58fe67829..5c4413aa006 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -302,9 +303,12 @@ func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defa func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer { realClock := &clock.RealClock{} + processor := &sharedProcessor{clock: realClock} + processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker()) + return &sharedIndexInformer{ indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers), - processor: &sharedProcessor{clock: realClock}, + processor: processor, listerWatcher: lw, objectType: exampleObject, objectDescription: options.ObjectDescription, @@ -799,6 +803,7 @@ func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegi type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex + listenersRCond *sync.Cond // Caller of Wait must hold a read lock on listenersLock. // Map from listeners to whether or not they are currently syncing listeners map[*processorListener]bool clock clock.Clock @@ -868,6 +873,14 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() + // Before we start blocking on writes to the listeners' channels, + // ensure that they all have been started. If the processor stops, + // p.listeners gets cleared, in which case we also continue here + // and return without doing anything. + for !p.listenersStarted && len(p.listeners) > 0 { + p.listenersRCond.Wait() + } + for listener, isSyncing := range p.listeners { switch { case !sync: @@ -882,15 +895,25 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) { } } +// sharedProcessorRunDelay is used in a synctest bubble to achieve a certain ordering of +// steps in different goroutines. +var sharedProcessorRunDelay atomic.Pointer[time.Duration] + func (p *sharedProcessor) run(ctx context.Context) { func() { - p.listenersLock.RLock() - defer p.listenersLock.RUnlock() + delay := sharedProcessorRunDelay.Load() + if delay != nil { + time.Sleep(*delay) + } + // Changing listenersStarted needs a write lock. + p.listenersLock.Lock() + defer p.listenersLock.Unlock() for listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true + p.listenersRCond.Signal() }() <-ctx.Done() @@ -907,6 +930,9 @@ func (p *sharedProcessor) run(ctx context.Context) { // Reset to false since no listeners are running p.listenersStarted = false + // Wake up sharedProcessor.distribute. + p.listenersRCond.Signal() + p.wg.Wait() // Wait for all .pop() and .run() to stop } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 88713ff6f8c..ee6ba21b359 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -25,6 +25,7 @@ import ( "strings" "sync" "testing" + "testing/synctest" "time" "github.com/google/go-cmp/cmp" @@ -46,6 +47,7 @@ import ( ) type testListener struct { + printlnFunc func(string) // Some, but not all tests use this for per-test output. lock sync.RWMutex resyncPeriod time.Duration expectedItemNames sets.Set[string] @@ -73,9 +75,18 @@ func (l *testListener) OnUpdate(old, new interface{}) { func (l *testListener) OnDelete(obj interface{}) { } +func (l *testListener) println(msg string) { + msg = l.name + ": " + msg + if l.printlnFunc != nil { + l.printlnFunc(msg) + } else { + fmt.Println(msg) + } +} + func (l *testListener) handle(obj interface{}) { key, _ := MetaNamespaceKeyFunc(obj) - fmt.Printf("%s: handle: %v\n", l.name, key) + l.println(fmt.Sprintf("handle: %v", key)) l.lock.Lock() defer l.lock.Unlock() objectMeta, _ := meta.Accessor(obj) @@ -83,7 +94,7 @@ func (l *testListener) handle(obj interface{}) { } func (l *testListener) ok() bool { - fmt.Println("polling") + l.println("polling") err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { if l.satisfiedExpectations() { return true, nil @@ -95,9 +106,9 @@ func (l *testListener) ok() bool { } // wait just a bit to allow any unexpected stragglers to come in - fmt.Println("sleeping") + l.println("sleeping") time.Sleep(1 * time.Second) - fmt.Println("final check") + l.println("final check") return l.satisfiedExpectations() } @@ -108,6 +119,12 @@ func (l *testListener) satisfiedExpectations() bool { return sets.New(l.receivedItemNames...).Equal(l.expectedItemNames) } +func (l *testListener) reset() { + l.lock.Lock() + defer l.lock.Unlock() + l.receivedItemNames = nil +} + func eventHandlerCount(i SharedInformer) int { s := i.(*sharedIndexInformer) s.startedLock.Lock() @@ -208,6 +225,29 @@ func TestIndexer(t *testing.T) { } func TestListenerResyncPeriods(t *testing.T) { + synctest.Test(t, func(t *testing.T) { testListenerResyncPeriods(t, 0) }) +} + +func TestListenerResyncPeriodsDelayed(t *testing.T) { + // sharedProcessor used to deadlock when sharedProcessor.run happened + // to be reached after the first sharedProcessor.distribute. + // This artififical delay simulates that situation. + // + // Must not run in parallel to other tests, but it doesn't need to: + // it doesn't sleep in real-world time and therefore completes quickly. + synctest.Test(t, func(t *testing.T) { testListenerResyncPeriods(t, time.Second) }) +} + +func testListenerResyncPeriods(t *testing.T, startupDelay time.Duration) { + start := time.Now() + println := func(msg string) { + delta := time.Since(start) + t.Logf("%s: %s", delta, msg) + } + + sharedProcessorRunDelay.Store(&startupDelay) + defer sharedProcessorRunDelay.Store(nil) + // source simulates an apiserver object endpoint. source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) @@ -216,20 +256,19 @@ func TestListenerResyncPeriods(t *testing.T) { // create the shared informer and resync every 1s informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) - clock := testingclock.NewFakeClock(time.Now()) - informer.clock = clock - informer.processor.clock = clock - // listener 1, never resync listener1 := newTestListener("listener1", 0, "pod1", "pod2") + listener1.printlnFunc = println informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) // listener 2, resync every 2s listener2 := newTestListener("listener2", 2*time.Second, "pod1", "pod2") + listener2.printlnFunc = println informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) // listener 3, resync every 3s listener3 := newTestListener("listener3", 3*time.Second, "pod1", "pod2") + listener3.printlnFunc = println informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) listeners := []*testListener{listener1, listener2, listener3} @@ -241,59 +280,65 @@ func TestListenerResyncPeriods(t *testing.T) { wg.Wait() }() - // ensure all listeners got the initial List + // We must potentially advance time to unblock sharedProcess.run, + // otherwise synctest.Wait() below returns without anything + // being done. + time.Sleep(startupDelay) + + // Ensure all listeners got the initial after the initial processing, at the initial start time. + // + // synctest.Wait doesn't detect deadlocks involving mutexes: they are considered not durably + // blocking, so in case of such a deadlock it'll just keep waiting until the entire unit test + // times out. The backtraces then show the deadlock. + synctest.Wait() for _, listener := range listeners { - if !listener.ok() { - t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames) + if !listener.satisfiedExpectations() { + t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener.name, listener.expectedItemNames, listener.receivedItemNames) } } // reset for _, listener := range listeners { - listener.receivedItemNames = []string{} + listener.reset() } // advance so listener2 gets a resync - clock.Step(2 * time.Second) + time.Sleep(2*time.Second - startupDelay) + synctest.Wait() // make sure listener2 got the resync - if !listener2.ok() { - t.Errorf("%s: expected %v, got %v", listener2.name, listener2.expectedItemNames, listener2.receivedItemNames) + if !listener2.satisfiedExpectations() { + t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener2.name, listener2.expectedItemNames, listener2.receivedItemNames) } - // wait a bit to give errant items a chance to go to 1 and 3 - time.Sleep(1 * time.Second) - // make sure listeners 1 and 3 got nothing if len(listener1.receivedItemNames) != 0 { - t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames)) + t.Errorf("%s: listener1: should not have resynced (got %d)", time.Since(start), len(listener1.receivedItemNames)) } if len(listener3.receivedItemNames) != 0 { - t.Errorf("listener3: should not have resynced (got %d)", len(listener3.receivedItemNames)) + t.Errorf("%s: listener3: should not have resynced (got %d)", time.Since(start), len(listener3.receivedItemNames)) } // reset for _, listener := range listeners { - listener.receivedItemNames = []string{} + listener.reset() } // advance so listener3 gets a resync - clock.Step(1 * time.Second) + time.Sleep(1 * time.Second) + synctest.Wait() // make sure listener3 got the resync - if !listener3.ok() { - t.Errorf("%s: expected %v, got %v", listener3.name, listener3.expectedItemNames, listener3.receivedItemNames) + if !listener3.satisfiedExpectations() { + t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener3.name, listener3.expectedItemNames, listener3.receivedItemNames) } - // wait a bit to give errant items a chance to go to 1 and 2 - time.Sleep(1 * time.Second) - // make sure listeners 1 and 2 got nothing if len(listener1.receivedItemNames) != 0 { - t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames)) + t.Errorf("%s: listener1: should not have resynced (got %d)", time.Since(start), len(listener1.receivedItemNames)) } if len(listener2.receivedItemNames) != 0 { - t.Errorf("listener2: should not have resynced (got %d)", len(listener2.receivedItemNames)) + t.Errorf("%s: listener2: should not have resynced (got %d)", time.Since(start), len(listener2.receivedItemNames)) } }