Merge pull request #135126 from mrvarmazyar/add-pod-flush-metric

scheduler: add metric for pods scheduled after flush
This commit is contained in:
Kubernetes Prow Robot 2025-12-17 19:59:16 -08:00 committed by GitHub
commit 05ae5a310c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 112 additions and 10 deletions

View file

@ -323,14 +323,10 @@ func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo
}
aq.schedCycle++
// Update metrics and reset the set of unschedulable plugins for the next attempt.
// Update metrics for unschedulable plugins.
for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) {
metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec()
}
pInfo.UnschedulablePlugins.Clear()
pInfo.PendingPlugins.Clear()
pInfo.GatingPlugin = ""
pInfo.GatingPluginEvents = nil
return pInfo, nil
}

View file

@ -894,6 +894,8 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo *
// We changed ConsecutiveErrorsCount or UnschedulableCount plus Timestamp, and now the calculated backoff time should be different,
// removing the cached backoff time.
pInfo.BackoffExpiration = time.Time{}
// Clear the flush flag since the pod is returning to the queue after a scheduling attempt.
pInfo.WasFlushedFromUnschedulable = false
if !p.isSchedulingQueueHintEnabled {
// fall back to the old behavior which doesn't depend on the queueing hint.
@ -948,6 +950,8 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) {
for _, pInfo := range p.unschedulablePods.podInfoMap {
lastScheduleTime := pInfo.Timestamp
if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
// Mark this pod as flushed so we can detect if it schedules soon after
pInfo.WasFlushedFromUnschedulable = true
podsToMove = append(podsToMove, pInfo)
}
}

View file

@ -649,7 +649,7 @@ func Test_InFlightPods(t *testing.T) {
},
},
{
name: "popped pod must have empty UnschedulablePlugins and PendingPlugins",
name: "popped pod preserves UnschedulablePlugins and PendingPlugins",
isSchedulingQueueHintEnabled: true,
initialPods: []*v1.Pod{pod1},
actions: []action{
@ -666,8 +666,13 @@ func Test_InFlightPods(t *testing.T) {
{eventHappens: &pvAdd}, // Active again.
{callback: func(t *testing.T, q *PriorityQueue) {
poppedPod = popPod(t, logger, q, pod1)
if len(poppedPod.UnschedulablePlugins) > 0 {
t.Errorf("QueuedPodInfo from Pop should have empty UnschedulablePlugins, got instead: %+v", poppedPod)
// UnschedulablePlugins should be preserved for logging/debugging
if !poppedPod.UnschedulablePlugins.Equal(sets.New("fooPlugin2")) {
t.Errorf("QueuedPodInfo from Pop should preserve UnschedulablePlugins, expected fooPlugin2, got: %+v", poppedPod.UnschedulablePlugins)
}
// PendingPlugins are preserved after Pop() for logging
if !poppedPod.PendingPlugins.Equal(sets.New("fooPlugin1")) {
t.Errorf("QueuedPodInfo from Pop should preserve PendingPlugins, expected fooPlugin1, got: %+v", poppedPod.PendingPlugins)
}
}},
{callback: func(t *testing.T, q *PriorityQueue) {
@ -935,8 +940,10 @@ func TestPop(t *testing.T) {
// Now check result of Pop.
poppedPod = popPod(t, logger, q, pod)
if len(poppedPod.PendingPlugins) > 0 {
t.Errorf("QueuedPodInfo from Pop should have empty PendingPlugins, got instead: %+v", poppedPod)
// PendingPlugins are preserved after Pop() so they can be logged if scheduling
// succeeds, or cleared in handleSchedulingFailure() if it fails.
if !poppedPod.PendingPlugins.Equal(sets.New("fooPlugin1")) {
t.Errorf("QueuedPodInfo from Pop should preserve PendingPlugins, expected fooPlugin1, got instead: %+v", poppedPod)
}
})
}
@ -3023,6 +3030,65 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) {
}
}
// TestFlushUnschedulablePodsLeftoverSetsFlag verifies that the WasFlushedFromUnschedulable
// flag is correctly set when pods are flushed and cleared when they return to the queue.
func TestFlushUnschedulablePodsLeftoverSetsFlag(t *testing.T) {
c := testingclock.NewFakeClock(time.Now())
m := makeEmptyQueueingHintMapPerProfile()
logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m))
pod := st.MakePod().Name("test-pod").Namespace("ns1").UID("tp-1").Priority(midPriority).NominatedNodeName("node1").Obj()
// Add pod to activeQ and pop it to simulate a scheduling attempt
q.Add(logger, pod)
pInfo, err := q.Pop(logger)
if err != nil {
t.Fatalf("Unexpected error from Pop: %v", err)
}
// Verify flag is initially false
if pInfo.WasFlushedFromUnschedulable {
t.Errorf("Expected WasFlushedFromUnschedulable to be false initially, but got true")
}
// Add pod to unschedulablePods (simulating failed scheduling)
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(pod, "fakePlugin"), q.SchedulingCycle())
if err != nil {
t.Fatalf("Unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
// Advance time past the flush duration and flush
c.Step(DefaultPodMaxInUnschedulablePodsDuration + time.Second)
q.flushUnschedulablePodsLeftover(logger)
// Pop the pod and verify flag is now true
pInfo, err = q.Pop(logger)
if err != nil {
t.Fatalf("Unexpected error from Pop after flush: %v", err)
}
if !pInfo.WasFlushedFromUnschedulable {
t.Errorf("Expected WasFlushedFromUnschedulable to be true after flush, but got false")
}
// Simulate pod failing to schedule again and returning to queue
err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(pInfo.Pod, "fakePlugin"), q.SchedulingCycle())
if err != nil {
t.Fatalf("Unexpected error from AddUnschedulableIfNotPresent: %v", err)
}
// Verify flag is cleared when pod returns to queue
internalPInfo := q.unschedulablePods.get(pod)
if internalPInfo == nil {
t.Fatalf("pod should be in unschedulablePods")
}
if internalPInfo.WasFlushedFromUnschedulable {
t.Errorf("Expected WasFlushedFromUnschedulable to be cleared (false) after returning to queue, but got true")
}
}
func TestPriorityQueue_initPodMaxInUnschedulablePodsDuration(t *testing.T) {
pod1 := st.MakePod().Name("test-pod-1").Namespace("ns1").UID("tp-1").NominatedNodeName("node1").Obj()
pod2 := st.MakePod().Name("test-pod-2").Namespace("ns2").UID("tp-2").NominatedNodeName("node2").Obj()

View file

@ -526,6 +526,12 @@ type QueuedPodInfo struct {
// That's why we need to distinguish ConsecutiveErrorsCount for the error status and UnschedulableCount for the unschedulable status.
// See https://github.com/kubernetes/kubernetes/issues/128744 for the discussion.
ConsecutiveErrorsCount int
// WasFlushedFromUnschedulable tracks whether this pod was most recently moved to activeQ
// by the periodic flush from unschedulablePods due to timeout (rather than by an event).
// This is used to detect if the pod becomes schedulable soon after flush, which may
// indicate queueing hint misconfigurations or event handling bugs.
// This flag is cleared when the pod returns to the queue for any reason.
WasFlushedFromUnschedulable bool
// The time when the pod is added to the queue for the first time. The pod may be added
// back to the queue multiple times before it's successfully scheduled.
// It shouldn't be updated once initialized. It's used to record the e2e scheduling
@ -613,6 +619,15 @@ func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo {
}
}
// ClearRejectorPlugins clears the plugin-related fields that track why a pod
// was rejected in a previous scheduling attempt.
func (pqi *QueuedPodInfo) ClearRejectorPlugins() {
pqi.UnschedulablePlugins.Clear()
pqi.PendingPlugins.Clear()
pqi.GatingPlugin = ""
pqi.GatingPluginEvents = nil
}
// PodInfo is a wrapper to a Pod with additional pre-computed information to
// accelerate processing. This information is typically immutable (e.g., pre-processed
// inter-pod affinity selectors).

View file

@ -131,6 +131,7 @@ var (
PodSchedulingSLIDuration *metrics.HistogramVec
PodSchedulingAttempts *metrics.Histogram
PodScheduledAfterFlush *metrics.Counter
FrameworkExtensionPointDuration *metrics.HistogramVec
PluginExecutionDuration *metrics.HistogramVec
@ -296,6 +297,14 @@ func InitMetrics() {
StabilityLevel: metrics.STABLE,
})
PodScheduledAfterFlush = metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: SchedulerSubsystem,
Name: "pod_scheduled_after_flush_total",
Help: "Number of pods that were successfully scheduled after being flushed from unschedulablePods due to timeout. This metric helps detect potential queueing hint misconfigurations or event handling issues.",
StabilityLevel: metrics.ALPHA,
})
FrameworkExtensionPointDuration = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: SchedulerSubsystem,
@ -463,6 +472,7 @@ func InitMetrics() {
pendingPods,
PodSchedulingSLIDuration,
PodSchedulingAttempts,
PodScheduledAfterFlush,
FrameworkExtensionPointDuration,
PluginExecutionDuration,
SchedulerQueueIncomingPods,

View file

@ -340,6 +340,11 @@ func (sched *Scheduler) bindingCycle(
if assumedPodInfo.InitialAttemptTimestamp != nil {
metrics.PodSchedulingSLIDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
}
// Count pods scheduled after being flushed from unschedulablePods
if assumedPodInfo.WasFlushedFromUnschedulable {
logger.V(4).Info("Pod scheduled after flush from unschedulablePods", "pod", klog.KObj(assumedPodInfo.Pod), "unschedulablePlugins", assumedPodInfo.UnschedulablePlugins, "pendingPlugins", assumedPodInfo.PendingPlugins)
metrics.PodScheduledAfterFlush.Inc()
}
// Run "postbind" plugins.
schedFramework.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
@ -1058,6 +1063,12 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo
err := status.AsError()
errMsg := status.Message()
// Clear plugin-related fields to avoid stale data from previous scheduling attempts.
// These fields will be repopulated below for FitError cases.
// We clear them here (rather than at Pop) because we sometimes want to use them
// for logging when a pod schedules successfully (e.g., after being flushed).
podInfo.ClearRejectorPlugins()
if err == ErrNoNodesAvailable {
logger.V(2).Info("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
} else if fitError, ok := err.(*framework.FitError); ok { // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.