mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-28 04:04:39 -04:00
Merge pull request #137358 from xigang/processlister_log
client-go/cache: add slow-handler tracing in processorListener
This commit is contained in:
commit
1f5701a46d
1 changed files with 21 additions and 3 deletions
|
|
@ -35,6 +35,7 @@ import (
|
|||
"k8s.io/utils/buffer"
|
||||
"k8s.io/utils/clock"
|
||||
"k8s.io/utils/ptr"
|
||||
utiltrace "k8s.io/utils/trace"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
|
|
@ -1213,7 +1214,8 @@ type processorListener struct {
|
|||
addCh chan interface{}
|
||||
done chan struct{}
|
||||
|
||||
handler ResourceEventHandler
|
||||
handler ResourceEventHandler
|
||||
handlerName string
|
||||
|
||||
syncTracker *synctrack.SingleFileTracker
|
||||
upstreamHasSynced DoneChecker
|
||||
|
|
@ -1224,6 +1226,9 @@ type processorListener struct {
|
|||
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
|
||||
// we should try to do something better.
|
||||
pendingNotifications buffer.RingGrowing
|
||||
// pendingNotificationsLength tracks pendingNotifications size and is only mutated by pop().
|
||||
// run() reads this to decide when to enable expensive time tracing.
|
||||
pendingNotificationsLength atomic.Int64
|
||||
|
||||
// requestedResyncPeriod is how frequently the listener wants a
|
||||
// full resync from the shared informer, but modified by two
|
||||
|
|
@ -1261,6 +1266,7 @@ func (p *processorListener) HasSyncedChecker() DoneChecker {
|
|||
}
|
||||
|
||||
func newProcessListener(logger klog.Logger, handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced DoneChecker) *processorListener {
|
||||
handlerName := nameForHandler(handler)
|
||||
ret := &processorListener{
|
||||
logger: logger,
|
||||
nextCh: make(chan interface{}),
|
||||
|
|
@ -1268,7 +1274,8 @@ func newProcessListener(logger klog.Logger, handler ResourceEventHandler, reques
|
|||
done: make(chan struct{}),
|
||||
upstreamHasSynced: hasSynced,
|
||||
handler: handler,
|
||||
syncTracker: synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), nameForHandler(handler))),
|
||||
handlerName: handlerName,
|
||||
syncTracker: synctrack.NewSingleFileTracker(fmt.Sprintf("%s + event handler %s", hasSynced.Name(), handlerName)),
|
||||
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
|
||||
requestedResyncPeriod: requestedResyncPeriod,
|
||||
resyncPeriod: resyncPeriod,
|
||||
|
|
@ -1299,7 +1306,9 @@ func (p *processorListener) pop() {
|
|||
// Notification dispatched
|
||||
var ok bool
|
||||
notification, ok = p.pendingNotifications.ReadOne()
|
||||
if !ok { // Nothing to pop
|
||||
if ok {
|
||||
p.pendingNotificationsLength.Add(-1)
|
||||
} else { // Nothing to pop
|
||||
nextCh = nil // Disable this select case
|
||||
}
|
||||
case notificationToAdd, ok := <-p.addCh:
|
||||
|
|
@ -1312,6 +1321,7 @@ func (p *processorListener) pop() {
|
|||
nextCh = p.nextCh
|
||||
} else { // There is already a notification waiting to be dispatched
|
||||
p.pendingNotifications.WriteOne(notificationToAdd)
|
||||
p.pendingNotificationsLength.Add(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1334,6 +1344,14 @@ func (p *processorListener) run() {
|
|||
// Gets reset below, but only if we get that far.
|
||||
sleepAfterCrash = true
|
||||
defer utilruntime.HandleCrashWithLogger(p.logger)
|
||||
pendingNotifications := p.pendingNotificationsLength.Load()
|
||||
if pendingNotifications > initialBufferSize {
|
||||
trace := utiltrace.New("processorListener handler",
|
||||
utiltrace.Field{Key: "handler", Value: p.handlerName},
|
||||
utiltrace.Field{Key: "pendingNotifications", Value: pendingNotifications},
|
||||
)
|
||||
defer trace.LogIfLong(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
switch notification := next.(type) {
|
||||
case updateNotification:
|
||||
|
|
|
|||
Loading…
Reference in a new issue