mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-28 04:04:39 -04:00
client-go informers: fix potential deadlock
In the unlikely situation that sharedProcessor.distribute was triggered by a
resync before sharedProcessor.run had a chance to start the listeners, the
sharedProcessor deadlocked: sharedProcessor.distribute held a read/write lock
on listenersLock while being blocked on the write to the listener's
channel. The listeners that would have read from those channels were never started
because sharedProcessor.run was blocked trying to get a read lock for
listenersLock.
This gets fixed by releasing the read/write lock in sharedProcessor.distribute
while waiting for all listeners to be started. Because either all or no
listeners are started, the existing global listenersStarted boolean is
sufficient.
The TestListenerResyncPeriods test now runs twice, with and without the
artificial delay. It gets converted to a synctest, so it executes quickly
despite the time.Sleep calls and timing is deterministic. The enhanced log
output confirms that with the delay, the initial sync completes later:
=== RUN TestListenerResyncPeriods
shared_informer_test.go:236: 0s: listener3: handle: pod1
shared_informer_test.go:236: 0s: listener3: handle: pod2
shared_informer_test.go:236: 0s: listener1: handle: pod1
shared_informer_test.go:236: 0s: listener1: handle: pod2
shared_informer_test.go:236: 0s: listener2: handle: pod1
shared_informer_test.go:236: 0s: listener2: handle: pod2
shared_informer_test.go:236: 2s: listener2: handle: pod1
shared_informer_test.go:236: 2s: listener2: handle: pod2
shared_informer_test.go:236: 3s: listener3: handle: pod1
shared_informer_test.go:236: 3s: listener3: handle: pod2
--- PASS: TestListenerResyncPeriods (0.00s)
=== RUN TestListenerResyncPeriodsDelayed
shared_informer_test.go:236: 1s: listener1: handle: pod1
shared_informer_test.go:236: 1s: listener1: handle: pod2
shared_informer_test.go:236: 1s: listener2: handle: pod1
shared_informer_test.go:236: 1s: listener2: handle: pod2
shared_informer_test.go:236: 1s: listener3: handle: pod1
shared_informer_test.go:236: 1s: listener3: handle: pod2
shared_informer_test.go:236: 2s: listener2: handle: pod1
shared_informer_test.go:236: 2s: listener2: handle: pod2
shared_informer_test.go:236: 3s: listener3: handle: pod1
shared_informer_test.go:236: 3s: listener3: handle: pod2
--- PASS: TestListenerResyncPeriodsDelayed (0.00s)
This commit is contained in:
parent
2274f69d9a
commit
e6ef79b2f6
2 changed files with 103 additions and 32 deletions
|
|
@ -21,6 +21,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
|
|
@ -302,9 +303,12 @@ func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defa
|
|||
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
|
||||
realClock := &clock.RealClock{}
|
||||
|
||||
processor := &sharedProcessor{clock: realClock}
|
||||
processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker())
|
||||
|
||||
return &sharedIndexInformer{
|
||||
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
|
||||
processor: &sharedProcessor{clock: realClock},
|
||||
processor: processor,
|
||||
listerWatcher: lw,
|
||||
objectType: exampleObject,
|
||||
objectDescription: options.ObjectDescription,
|
||||
|
|
@ -799,6 +803,7 @@ func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegi
|
|||
type sharedProcessor struct {
|
||||
listenersStarted bool
|
||||
listenersLock sync.RWMutex
|
||||
listenersRCond *sync.Cond // Caller of Wait must hold a read lock on listenersLock.
|
||||
// Map from listeners to whether or not they are currently syncing
|
||||
listeners map[*processorListener]bool
|
||||
clock clock.Clock
|
||||
|
|
@ -868,6 +873,14 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
|
|||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
|
||||
// Before we start blocking on writes to the listeners' channels,
|
||||
// ensure that they all have been started. If the processor stops,
|
||||
// p.listeners gets cleared, in which case we also continue here
|
||||
// and return without doing anything.
|
||||
for !p.listenersStarted && len(p.listeners) > 0 {
|
||||
p.listenersRCond.Wait()
|
||||
}
|
||||
|
||||
for listener, isSyncing := range p.listeners {
|
||||
switch {
|
||||
case !sync:
|
||||
|
|
@ -882,15 +895,25 @@ func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// sharedProcessorRunDelay is used in a synctest bubble to achieve a certain ordering of
|
||||
// steps in different goroutines.
|
||||
var sharedProcessorRunDelay atomic.Pointer[time.Duration]
|
||||
|
||||
func (p *sharedProcessor) run(ctx context.Context) {
|
||||
func() {
|
||||
p.listenersLock.RLock()
|
||||
defer p.listenersLock.RUnlock()
|
||||
delay := sharedProcessorRunDelay.Load()
|
||||
if delay != nil {
|
||||
time.Sleep(*delay)
|
||||
}
|
||||
// Changing listenersStarted needs a write lock.
|
||||
p.listenersLock.Lock()
|
||||
defer p.listenersLock.Unlock()
|
||||
for listener := range p.listeners {
|
||||
p.wg.Start(listener.run)
|
||||
p.wg.Start(listener.pop)
|
||||
}
|
||||
p.listenersStarted = true
|
||||
p.listenersRCond.Signal()
|
||||
}()
|
||||
<-ctx.Done()
|
||||
|
||||
|
|
@ -907,6 +930,9 @@ func (p *sharedProcessor) run(ctx context.Context) {
|
|||
// Reset to false since no listeners are running
|
||||
p.listenersStarted = false
|
||||
|
||||
// Wake up sharedProcessor.distribute.
|
||||
p.listenersRCond.Signal()
|
||||
|
||||
p.wg.Wait() // Wait for all .pop() and .run() to stop
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"testing/synctest"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
|
@ -46,6 +47,7 @@ import (
|
|||
)
|
||||
|
||||
type testListener struct {
|
||||
printlnFunc func(string) // Some, but not all tests use this for per-test output.
|
||||
lock sync.RWMutex
|
||||
resyncPeriod time.Duration
|
||||
expectedItemNames sets.Set[string]
|
||||
|
|
@ -73,9 +75,18 @@ func (l *testListener) OnUpdate(old, new interface{}) {
|
|||
func (l *testListener) OnDelete(obj interface{}) {
|
||||
}
|
||||
|
||||
func (l *testListener) println(msg string) {
|
||||
msg = l.name + ": " + msg
|
||||
if l.printlnFunc != nil {
|
||||
l.printlnFunc(msg)
|
||||
} else {
|
||||
fmt.Println(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *testListener) handle(obj interface{}) {
|
||||
key, _ := MetaNamespaceKeyFunc(obj)
|
||||
fmt.Printf("%s: handle: %v\n", l.name, key)
|
||||
l.println(fmt.Sprintf("handle: %v", key))
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
objectMeta, _ := meta.Accessor(obj)
|
||||
|
|
@ -83,7 +94,7 @@ func (l *testListener) handle(obj interface{}) {
|
|||
}
|
||||
|
||||
func (l *testListener) ok() bool {
|
||||
fmt.Println("polling")
|
||||
l.println("polling")
|
||||
err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
|
||||
if l.satisfiedExpectations() {
|
||||
return true, nil
|
||||
|
|
@ -95,9 +106,9 @@ func (l *testListener) ok() bool {
|
|||
}
|
||||
|
||||
// wait just a bit to allow any unexpected stragglers to come in
|
||||
fmt.Println("sleeping")
|
||||
l.println("sleeping")
|
||||
time.Sleep(1 * time.Second)
|
||||
fmt.Println("final check")
|
||||
l.println("final check")
|
||||
return l.satisfiedExpectations()
|
||||
}
|
||||
|
||||
|
|
@ -108,6 +119,12 @@ func (l *testListener) satisfiedExpectations() bool {
|
|||
return sets.New(l.receivedItemNames...).Equal(l.expectedItemNames)
|
||||
}
|
||||
|
||||
func (l *testListener) reset() {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
l.receivedItemNames = nil
|
||||
}
|
||||
|
||||
func eventHandlerCount(i SharedInformer) int {
|
||||
s := i.(*sharedIndexInformer)
|
||||
s.startedLock.Lock()
|
||||
|
|
@ -208,6 +225,29 @@ func TestIndexer(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestListenerResyncPeriods(t *testing.T) {
|
||||
synctest.Test(t, func(t *testing.T) { testListenerResyncPeriods(t, 0) })
|
||||
}
|
||||
|
||||
func TestListenerResyncPeriodsDelayed(t *testing.T) {
|
||||
// sharedProcessor used to deadlock when sharedProcessor.run happened
|
||||
// to be reached after the first sharedProcessor.distribute.
|
||||
// This artificial delay simulates that situation.
|
||||
//
|
||||
// Must not run in parallel to other tests, but it doesn't need to:
|
||||
// it doesn't sleep in real-world time and therefore completes quickly.
|
||||
synctest.Test(t, func(t *testing.T) { testListenerResyncPeriods(t, time.Second) })
|
||||
}
|
||||
|
||||
func testListenerResyncPeriods(t *testing.T, startupDelay time.Duration) {
|
||||
start := time.Now()
|
||||
println := func(msg string) {
|
||||
delta := time.Since(start)
|
||||
t.Logf("%s: %s", delta, msg)
|
||||
}
|
||||
|
||||
sharedProcessorRunDelay.Store(&startupDelay)
|
||||
defer sharedProcessorRunDelay.Store(nil)
|
||||
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := newFakeControllerSource(t)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
|
@ -216,20 +256,19 @@ func TestListenerResyncPeriods(t *testing.T) {
|
|||
// create the shared informer and resync every 1s
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
|
||||
clock := testingclock.NewFakeClock(time.Now())
|
||||
informer.clock = clock
|
||||
informer.processor.clock = clock
|
||||
|
||||
// listener 1, never resync
|
||||
listener1 := newTestListener("listener1", 0, "pod1", "pod2")
|
||||
listener1.printlnFunc = println
|
||||
informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
|
||||
|
||||
// listener 2, resync every 2s
|
||||
listener2 := newTestListener("listener2", 2*time.Second, "pod1", "pod2")
|
||||
listener2.printlnFunc = println
|
||||
informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
|
||||
|
||||
// listener 3, resync every 3s
|
||||
listener3 := newTestListener("listener3", 3*time.Second, "pod1", "pod2")
|
||||
listener3.printlnFunc = println
|
||||
informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
|
||||
listeners := []*testListener{listener1, listener2, listener3}
|
||||
|
||||
|
|
@ -241,59 +280,65 @@ func TestListenerResyncPeriods(t *testing.T) {
|
|||
wg.Wait()
|
||||
}()
|
||||
|
||||
// ensure all listeners got the initial List
|
||||
// We must potentially advance time to unblock sharedProcessor.run,
|
||||
// otherwise synctest.Wait() below returns without anything
|
||||
// being done.
|
||||
time.Sleep(startupDelay)
|
||||
|
||||
// Ensure all listeners got the initial List after the initial processing, at the initial start time.
|
||||
//
|
||||
// synctest.Wait doesn't detect deadlocks involving mutexes: they are considered not durably
|
||||
// blocking, so in case of such a deadlock it'll just keep waiting until the entire unit test
|
||||
// times out. The backtraces then show the deadlock.
|
||||
synctest.Wait()
|
||||
for _, listener := range listeners {
|
||||
if !listener.ok() {
|
||||
t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
|
||||
if !listener.satisfiedExpectations() {
|
||||
t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener.name, listener.expectedItemNames, listener.receivedItemNames)
|
||||
}
|
||||
}
|
||||
|
||||
// reset
|
||||
for _, listener := range listeners {
|
||||
listener.receivedItemNames = []string{}
|
||||
listener.reset()
|
||||
}
|
||||
|
||||
// advance so listener2 gets a resync
|
||||
clock.Step(2 * time.Second)
|
||||
time.Sleep(2*time.Second - startupDelay)
|
||||
synctest.Wait()
|
||||
|
||||
// make sure listener2 got the resync
|
||||
if !listener2.ok() {
|
||||
t.Errorf("%s: expected %v, got %v", listener2.name, listener2.expectedItemNames, listener2.receivedItemNames)
|
||||
if !listener2.satisfiedExpectations() {
|
||||
t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener2.name, listener2.expectedItemNames, listener2.receivedItemNames)
|
||||
}
|
||||
|
||||
// wait a bit to give errant items a chance to go to 1 and 3
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// make sure listeners 1 and 3 got nothing
|
||||
if len(listener1.receivedItemNames) != 0 {
|
||||
t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
|
||||
t.Errorf("%s: listener1: should not have resynced (got %d)", time.Since(start), len(listener1.receivedItemNames))
|
||||
}
|
||||
if len(listener3.receivedItemNames) != 0 {
|
||||
t.Errorf("listener3: should not have resynced (got %d)", len(listener3.receivedItemNames))
|
||||
t.Errorf("%s: listener3: should not have resynced (got %d)", time.Since(start), len(listener3.receivedItemNames))
|
||||
}
|
||||
|
||||
// reset
|
||||
for _, listener := range listeners {
|
||||
listener.receivedItemNames = []string{}
|
||||
listener.reset()
|
||||
}
|
||||
|
||||
// advance so listener3 gets a resync
|
||||
clock.Step(1 * time.Second)
|
||||
time.Sleep(1 * time.Second)
|
||||
synctest.Wait()
|
||||
|
||||
// make sure listener3 got the resync
|
||||
if !listener3.ok() {
|
||||
t.Errorf("%s: expected %v, got %v", listener3.name, listener3.expectedItemNames, listener3.receivedItemNames)
|
||||
if !listener3.satisfiedExpectations() {
|
||||
t.Errorf("%s: %s: expected %v, got %v", time.Since(start), listener3.name, listener3.expectedItemNames, listener3.receivedItemNames)
|
||||
}
|
||||
|
||||
// wait a bit to give errant items a chance to go to 1 and 2
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// make sure listeners 1 and 2 got nothing
|
||||
if len(listener1.receivedItemNames) != 0 {
|
||||
t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
|
||||
t.Errorf("%s: listener1: should not have resynced (got %d)", time.Since(start), len(listener1.receivedItemNames))
|
||||
}
|
||||
if len(listener2.receivedItemNames) != 0 {
|
||||
t.Errorf("listener2: should not have resynced (got %d)", len(listener2.receivedItemNames))
|
||||
t.Errorf("%s: listener2: should not have resynced (got %d)", time.Since(start), len(listener2.receivedItemNames))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue