From e6ef79b2f6bb05205652e4fe48ffa523d9e3a1ec Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 23 Jan 2026 19:38:52 +0100 Subject: [PATCH 1/2] client-go informers: fix potential deadlock In the unlikely situation that sharedProcessor.distribute was triggered by a resync before sharedProcessor.run had a chance to start the listeners, the sharedProcessor deadlocked: sharedProcessor.distribute held a read/write lock on listenersLock while being blocked on the write to the listener's channel. The listeners who would have read from those weren't getting started because sharedProcessor.run was blocked trying to get a read lock for listenersLock. This gets fixed by releasing the read/write lock in sharedProcessor.distribute while waiting for all listeners to be started. Because either all or no listeners are started, the existing global listenersStarted boolean is sufficient. The TestListenerResyncPeriods test now runs twice, with and without the artificial delay. It gets converted to a synctest, so it executes quickly despite the time.Sleep calls and timing is deterministic. 
The enhanced log output confirms that with the delay, the initial sync completes later: === RUN TestListenerResyncPeriods shared_informer_test.go:236: 0s: listener3: handle: pod1 shared_informer_test.go:236: 0s: listener3: handle: pod2 shared_informer_test.go:236: 0s: listener1: handle: pod1 shared_informer_test.go:236: 0s: listener1: handle: pod2 shared_informer_test.go:236: 0s: listener2: handle: pod1 shared_informer_test.go:236: 0s: listener2: handle: pod2 shared_informer_test.go:236: 2s: listener2: handle: pod1 shared_informer_test.go:236: 2s: listener2: handle: pod2 shared_informer_test.go:236: 3s: listener3: handle: pod1 shared_informer_test.go:236: 3s: listener3: handle: pod2 --- PASS: TestListenerResyncPeriods (0.00s) === RUN TestListenerResyncPeriodsDelayed shared_informer_test.go:236: 1s: listener1: handle: pod1 shared_informer_test.go:236: 1s: listener1: handle: pod2 shared_informer_test.go:236: 1s: listener2: handle: pod1 shared_informer_test.go:236: 1s: listener2: handle: pod2 shared_informer_test.go:236: 1s: listener3: handle: pod1 shared_informer_test.go:236: 1s: listener3: handle: pod2 shared_informer_test.go:236: 2s: listener2: handle: pod1 shared_informer_test.go:236: 2s: listener2: handle: pod2 shared_informer_test.go:236: 3s: listener3: handle: pod1 shared_informer_test.go:236: 3s: listener3: handle: pod2 --- PASS: TestListenerResyncPeriodsDelayed (0.00s) --- .../client-go/tools/cache/shared_informer.go | 32 +++++- .../tools/cache/shared_informer_test.go | 103 +++++++++++++----- 2 files changed, 103 insertions(+), 32 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index bf58fe67829..5c4413aa006 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -302,9 
+303,12 @@ func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defa func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer { realClock := &clock.RealClock{} + processor := &sharedProcessor{clock: realClock} + processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker()) + return &sharedIndexInformer{ indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers), - processor: &sharedProcessor{clock: realClock}, + processor: processor, listerWatcher: lw, objectType: exampleObject, objectDescription: options.ObjectDescription, @@ -799,6 +803,7 @@ func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegi type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex + listenersRCond *sync.Cond // Caller of Wait must hold a read lock on listenersLock. // Map from listeners to whether or not they are currently syncing listeners map[*processorListener]bool clock clock.Clock @@ -868,6 +873,14 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() + // Before we start blocking on writes to the listeners' channels, + // ensure that they all have been started. If the processor stops, + // p.listeners gets cleared, in which case we also continue here + // and return without doing anything. + for !p.listenersStarted && len(p.listeners) > 0 { + p.listenersRCond.Wait() + } + for listener, isSyncing := range p.listeners { switch { case !sync: @@ -882,15 +895,25 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) { } } +// sharedProcessorRunDelay is used in a synctest bubble to achieve a certain ordering of +// steps in different goroutines. 
+var sharedProcessorRunDelay atomic.Pointer[time.Duration] + func (p *sharedProcessor) run(ctx context.Context) { func() { - p.listenersLock.RLock() - defer p.listenersLock.RUnlock() + delay := sharedProcessorRunDelay.Load() + if delay != nil { + time.Sleep(*delay) + } + // Changing listenersStarted needs a write lock. + p.listenersLock.Lock() + defer p.listenersLock.Unlock() for listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } p.listenersStarted = true + p.listenersRCond.Signal() }() <-ctx.Done() @@ -907,6 +930,9 @@ func (p *sharedProcessor) run(ctx context.Context) { // Reset to false since no listeners are running p.listenersStarted = false + // Wake up sharedProcessor.distribute. + p.listenersRCond.Signal() + p.wg.Wait() // Wait for all .pop() and .run() to stop } diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index 88713ff6f8c..ee6ba21b359 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -25,6 +25,7 @@ import ( "strings" "sync" "testing" + "testing/synctest" "time" "github.com/google/go-cmp/cmp" @@ -46,6 +47,7 @@ import ( ) type testListener struct { + printlnFunc func(string) // Some, but not all tests use this for per-test output. 
lock sync.RWMutex resyncPeriod time.Duration expectedItemNames sets.Set[string] @@ -73,9 +75,18 @@ func (l *testListener) OnUpdate(old, new interface{}) { func (l *testListener) OnDelete(obj interface{}) { } +func (l *testListener) println(msg string) { + msg = l.name + ": " + msg + if l.printlnFunc != nil { + l.printlnFunc(msg) + } else { + fmt.Println(msg) + } +} + func (l *testListener) handle(obj interface{}) { key, _ := MetaNamespaceKeyFunc(obj) - fmt.Printf("%s: handle: %v\n", l.name, key) + l.println(fmt.Sprintf("handle: %v", key)) l.lock.Lock() defer l.lock.Unlock() objectMeta, _ := meta.Accessor(obj) @@ -83,7 +94,7 @@ func (l *testListener) handle(obj interface{}) { } func (l *testListener) ok() bool { - fmt.Println("polling") + l.println("polling") err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) { if l.satisfiedExpectations() { return true, nil @@ -95,9 +106,9 @@ func (l *testListener) ok() bool { } // wait just a bit to allow any unexpected stragglers to come in - fmt.Println("sleeping") + l.println("sleeping") time.Sleep(1 * time.Second) - fmt.Println("final check") + l.println("final check") return l.satisfiedExpectations() } @@ -108,6 +119,12 @@ func (l *testListener) satisfiedExpectations() bool { return sets.New(l.receivedItemNames...).Equal(l.expectedItemNames) } +func (l *testListener) reset() { + l.lock.Lock() + defer l.lock.Unlock() + l.receivedItemNames = nil +} + func eventHandlerCount(i SharedInformer) int { s := i.(*sharedIndexInformer) s.startedLock.Lock() @@ -208,6 +225,29 @@ func TestIndexer(t *testing.T) { } func TestListenerResyncPeriods(t *testing.T) { + synctest.Test(t, func(t *testing.T) { testListenerResyncPeriods(t, 0) }) +} + +func TestListenerResyncPeriodsDelayed(t *testing.T) { + // sharedProcessor used to deadlock when sharedProcessor.run happened + // to be reached after the first sharedProcessor.distribute. + // This artificial delay simulates that situation. 
+ // + // Must not run in parallel with other tests, but it doesn't need to: + // it doesn't sleep in real-world time and therefore completes quickly. + synctest.Test(t, func(t *testing.T) { testListenerResyncPeriods(t, time.Second) }) +} + +func testListenerResyncPeriods(t *testing.T, startupDelay time.Duration) { + start := time.Now() + println := func(msg string) { + delta := time.Since(start) + t.Logf("%s: %s", delta, msg) + } + + sharedProcessorRunDelay.Store(&startupDelay) + defer sharedProcessorRunDelay.Store(nil) + // source simulates an apiserver object endpoint. source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) @@ -216,20 +256,19 @@ func TestListenerResyncPeriods(t *testing.T) { // create the shared informer and resync every 1s informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) - clock := testingclock.NewFakeClock(time.Now()) - informer.clock = clock - informer.processor.clock = clock - // listener 1, never resync listener1 := newTestListener("listener1", 0, "pod1", "pod2") + listener1.printlnFunc = println informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod) // listener 2, resync every 2s listener2 := newTestListener("listener2", 2*time.Second, "pod1", "pod2") + listener2.printlnFunc = println informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod) // listener 3, resync every 3s listener3 := newTestListener("listener3", 3*time.Second, "pod1", "pod2") + listener3.printlnFunc = println informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod) listeners := []*testListener{listener1, listener2, listener3} @@ -241,59 +280,65 @@ func TestListenerResyncPeriods(t *testing.T) { wg.Wait() }() - // ensure all listeners got the initial List + // We must potentially advance time to unblock sharedProcessor.run, + // otherwise synctest.Wait() below returns without anything + // being done. 
+ time.Sleep(startupDelay) + + // Ensure all listeners got the initial List after the initial processing, at the initial start time. + // + // synctest.Wait doesn't detect deadlocks involving mutexes: they are considered not durably + // blocking, so in case of such a deadlock it'll just keep waiting until the entire unit test + // times out. The backtraces then show the deadlock. + synctest.Wait() for _, listener := range listeners { - if !listener.ok() { - t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames) + if !listener.satisfiedExpectations() { + t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener.name, listener.expectedItemNames, listener.receivedItemNames) } } // reset for _, listener := range listeners { - listener.receivedItemNames = []string{} + listener.reset() } // advance so listener2 gets a resync - clock.Step(2 * time.Second) + time.Sleep(2*time.Second - startupDelay) + synctest.Wait() // make sure listener2 got the resync - if !listener2.ok() { - t.Errorf("%s: expected %v, got %v", listener2.name, listener2.expectedItemNames, listener2.receivedItemNames) + if !listener2.satisfiedExpectations() { + t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener2.name, listener2.expectedItemNames, listener2.receivedItemNames) } - // wait a bit to give errant items a chance to go to 1 and 3 - time.Sleep(1 * time.Second) - // make sure listeners 1 and 3 got nothing if len(listener1.receivedItemNames) != 0 { - t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames)) + t.Errorf("%s: listener1: should not have resynced (got %d)", time.Since(start), len(listener1.receivedItemNames)) } if len(listener3.receivedItemNames) != 0 { - t.Errorf("listener3: should not have resynced (got %d)", len(listener3.receivedItemNames)) + t.Errorf("%s: listener3: should not have resynced (got %d)", time.Since(start), len(listener3.receivedItemNames)) } // reset for _, 
listener := range listeners { - listener.receivedItemNames = []string{} + listener.reset() } // advance so listener3 gets a resync - clock.Step(1 * time.Second) + time.Sleep(1 * time.Second) + synctest.Wait() // make sure listener3 got the resync - if !listener3.ok() { - t.Errorf("%s: expected %v, got %v", listener3.name, listener3.expectedItemNames, listener3.receivedItemNames) + if !listener3.satisfiedExpectations() { + t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener3.name, listener3.expectedItemNames, listener3.receivedItemNames) } - // wait a bit to give errant items a chance to go to 1 and 2 - time.Sleep(1 * time.Second) - // make sure listeners 1 and 2 got nothing if len(listener1.receivedItemNames) != 0 { - t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames)) + t.Errorf("%s: listener1: should not have resynced (got %d)", time.Since(start), len(listener1.receivedItemNames)) } if len(listener2.receivedItemNames) != 0 { - t.Errorf("listener2: should not have resynced (got %d)", len(listener2.receivedItemNames)) + t.Errorf("%s: listener2: should not have resynced (got %d)", time.Since(start), len(listener2.receivedItemNames)) } } From 2ec0305d728bf5ce8f8df314a18e71aa120a00cf Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Tue, 27 Jan 2026 14:47:37 +0100 Subject: [PATCH 2/2] client-go informers: replace time.Sleep with callback While time.Sleep is what the test needs, maybe an arbitrary hook invocation is more acceptable in the production code because it is more general. 
--- .../k8s.io/client-go/tools/cache/shared_informer.go | 12 ++++++------ .../client-go/tools/cache/shared_informer_test.go | 9 +++++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go index 5c4413aa006..aa5c693dba6 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go @@ -895,15 +895,15 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) { } } -// sharedProcessorRunDelay is used in a synctest bubble to achieve a certain ordering of -// steps in different goroutines. -var sharedProcessorRunDelay atomic.Pointer[time.Duration] +// sharedProcessorRunHook can be used inside tests to execute additional code +// at the start of sharedProcessor.run. +var sharedProcessorRunHook atomic.Pointer[func()] func (p *sharedProcessor) run(ctx context.Context) { func() { - delay := sharedProcessorRunDelay.Load() - if delay != nil { - time.Sleep(*delay) + hook := sharedProcessorRunHook.Load() + if hook != nil { + (*hook)() } // Changing listenersStarted needs a write lock. p.listenersLock.Lock() diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go index ee6ba21b359..0b40315a8c2 100644 --- a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go @@ -245,8 +245,13 @@ func testListenerResyncPeriods(t *testing.T, startupDelay time.Duration) { t.Logf("%s: %s", delta, msg) } - sharedProcessorRunDelay.Store(&startupDelay) - defer sharedProcessorRunDelay.Store(nil) + if startupDelay > 0 { + hook := func() { + time.Sleep(startupDelay) + } + sharedProcessorRunHook.Store(&hook) + defer sharedProcessorRunHook.Store(nil) + } // source simulates an apiserver object endpoint. 
source := newFakeControllerSource(t)