diff --git a/pkg/scheduler/backend/queue/active_queue.go b/pkg/scheduler/backend/queue/active_queue.go index de722833d44..9581c65d996 100644 --- a/pkg/scheduler/backend/queue/active_queue.go +++ b/pkg/scheduler/backend/queue/active_queue.go @@ -323,14 +323,10 @@ func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo } aq.schedCycle++ - // Update metrics and reset the set of unschedulable plugins for the next attempt. + // Update metrics for unschedulable plugins. for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) { metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec() } - pInfo.UnschedulablePlugins.Clear() - pInfo.PendingPlugins.Clear() - pInfo.GatingPlugin = "" - pInfo.GatingPluginEvents = nil return pInfo, nil } diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index 717b55656f1..ded796af4b3 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -894,6 +894,8 @@ func (p *PriorityQueue) AddUnschedulableIfNotPresent(logger klog.Logger, pInfo * // We changed ConsecutiveErrorsCount or UnschedulableCount plus Timestamp, and now the calculated backoff time should be different, // removing the cached backoff time. pInfo.BackoffExpiration = time.Time{} + // Clear the flush flag since the pod is returning to the queue after a scheduling attempt. + pInfo.WasFlushedFromUnschedulable = false if !p.isSchedulingQueueHintEnabled { // fall back to the old behavior which doesn't depend on the queueing hint. @@ -948,6 +950,8 @@ func (p *PriorityQueue) flushUnschedulablePodsLeftover(logger klog.Logger) { for _, pInfo := range p.unschedulablePods.podInfoMap { lastScheduleTime := pInfo.Timestamp if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration { + // Mark this pod as flushed so we can detect if it schedules soon after + pInfo.WasFlushedFromUnschedulable = true podsToMove = append(podsToMove, pInfo) } } diff --git a/pkg/scheduler/backend/queue/scheduling_queue_test.go b/pkg/scheduler/backend/queue/scheduling_queue_test.go index 505d2bbe8b9..cad1e9373f3 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue_test.go +++ b/pkg/scheduler/backend/queue/scheduling_queue_test.go @@ -649,7 +649,7 @@ func Test_InFlightPods(t *testing.T) { }, }, { - name: "popped pod must have empty UnschedulablePlugins and PendingPlugins", + name: "popped pod preserves UnschedulablePlugins and PendingPlugins", isSchedulingQueueHintEnabled: true, initialPods: []*v1.Pod{pod1}, actions: []action{ @@ -666,8 +666,13 @@ func Test_InFlightPods(t *testing.T) { {eventHappens: &pvAdd}, // Active again. {callback: func(t *testing.T, q *PriorityQueue) { poppedPod = popPod(t, logger, q, pod1) - if len(poppedPod.UnschedulablePlugins) > 0 { - t.Errorf("QueuedPodInfo from Pop should have empty UnschedulablePlugins, got instead: %+v", poppedPod) + // UnschedulablePlugins should be preserved for logging/debugging + if !poppedPod.UnschedulablePlugins.Equal(sets.New("fooPlugin2")) { + t.Errorf("QueuedPodInfo from Pop should preserve UnschedulablePlugins, expected fooPlugin2, got: %+v", poppedPod.UnschedulablePlugins) + } + // PendingPlugins are preserved after Pop() for logging + if !poppedPod.PendingPlugins.Equal(sets.New("fooPlugin1")) { + t.Errorf("QueuedPodInfo from Pop should preserve PendingPlugins, expected fooPlugin1, got: %+v", poppedPod.PendingPlugins) } }}, {callback: func(t *testing.T, q *PriorityQueue) { @@ -935,8 +940,10 @@ func TestPop(t *testing.T) { // Now check result of Pop. poppedPod = popPod(t, logger, q, pod) - if len(poppedPod.PendingPlugins) > 0 { - t.Errorf("QueuedPodInfo from Pop should have empty PendingPlugins, got instead: %+v", poppedPod) + // PendingPlugins are preserved after Pop() so they can be logged if scheduling + // succeeds, or cleared in handleSchedulingFailure() if it fails. + if !poppedPod.PendingPlugins.Equal(sets.New("fooPlugin1")) { + t.Errorf("QueuedPodInfo from Pop should preserve PendingPlugins, expected fooPlugin1, got instead: %+v", poppedPod) } }) } @@ -3023,6 +3030,65 @@ func TestHighPriorityFlushUnschedulablePodsLeftover(t *testing.T) { } } +// TestFlushUnschedulablePodsLeftoverSetsFlag verifies that the WasFlushedFromUnschedulable +// flag is correctly set when pods are flushed and cleared when they return to the queue. +func TestFlushUnschedulablePodsLeftoverSetsFlag(t *testing.T) { + c := testingclock.NewFakeClock(time.Now()) + m := makeEmptyQueueingHintMapPerProfile() + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + q := NewTestQueue(ctx, newDefaultQueueSort(), WithClock(c), WithQueueingHintMapPerProfile(m)) + + pod := st.MakePod().Name("test-pod").Namespace("ns1").UID("tp-1").Priority(midPriority).NominatedNodeName("node1").Obj() + + // Add pod to activeQ and pop it to simulate a scheduling attempt + q.Add(logger, pod) + pInfo, err := q.Pop(logger) + if err != nil { + t.Fatalf("Unexpected error from Pop: %v", err) + } + + // Verify flag is initially false + if pInfo.WasFlushedFromUnschedulable { + t.Errorf("Expected WasFlushedFromUnschedulable to be false initially, but got true") + } + + // Add pod to unschedulablePods (simulating failed scheduling) + err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(pod, "fakePlugin"), q.SchedulingCycle()) + if err != nil { + t.Fatalf("Unexpected error from AddUnschedulableIfNotPresent: %v", err) + } + + // Advance time past the flush duration and flush + c.Step(DefaultPodMaxInUnschedulablePodsDuration + time.Second) + q.flushUnschedulablePodsLeftover(logger) + + // Pop the pod and verify flag is now true + pInfo, err = q.Pop(logger) + if err != nil { + t.Fatalf("Unexpected error from Pop after flush: %v", err) + } + if !pInfo.WasFlushedFromUnschedulable { + t.Errorf("Expected WasFlushedFromUnschedulable to be true after flush, but got false") + } + + // Simulate pod failing to schedule again and returning to queue + err = q.AddUnschedulableIfNotPresent(logger, q.newQueuedPodInfo(pInfo.Pod, "fakePlugin"), q.SchedulingCycle()) + if err != nil { + t.Fatalf("Unexpected error from AddUnschedulableIfNotPresent: %v", err) + } + + // Verify flag is cleared when pod returns to queue + internalPInfo := q.unschedulablePods.get(pod) + if internalPInfo == nil { + t.Fatalf("pod should be in unschedulablePods") + } + if internalPInfo.WasFlushedFromUnschedulable { + t.Errorf("Expected WasFlushedFromUnschedulable to be cleared (false) after returning to queue, but got true") + } +} + func TestPriorityQueue_initPodMaxInUnschedulablePodsDuration(t *testing.T) { pod1 := st.MakePod().Name("test-pod-1").Namespace("ns1").UID("tp-1").NominatedNodeName("node1").Obj() pod2 := st.MakePod().Name("test-pod-2").Namespace("ns2").UID("tp-2").NominatedNodeName("node2").Obj() diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index 9e92527be17..8f264cc1eea 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -526,6 +526,12 @@ type QueuedPodInfo struct { // That's why we need to distinguish ConsecutiveErrorsCount for the error status and UnschedulableCount for the unschedulable status. // See https://github.com/kubernetes/kubernetes/issues/128744 for the discussion. ConsecutiveErrorsCount int + // WasFlushedFromUnschedulable tracks whether this pod was most recently moved to activeQ + // by the periodic flush from unschedulablePods due to timeout (rather than by an event). + // This is used to detect if the pod becomes schedulable soon after flush, which may + // indicate queueing hint misconfigurations or event handling bugs. + // This flag is cleared when the pod returns to the queue for any reason. + WasFlushedFromUnschedulable bool // The time when the pod is added to the queue for the first time. The pod may be added // back to the queue multiple times before it's successfully scheduled. // It shouldn't be updated once initialized. It's used to record the e2e scheduling @@ -613,6 +619,15 @@ func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo { } } +// ClearRejectorPlugins clears the plugin-related fields that track why a pod +// was rejected in a previous scheduling attempt. +func (pqi *QueuedPodInfo) ClearRejectorPlugins() { + pqi.UnschedulablePlugins.Clear() + pqi.PendingPlugins.Clear() + pqi.GatingPlugin = "" + pqi.GatingPluginEvents = nil +} + // PodInfo is a wrapper to a Pod with additional pre-computed information to // accelerate processing. This information is typically immutable (e.g., pre-processed // inter-pod affinity selectors). diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index 8fbdc0d98ab..6c1b9cca555 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -131,6 +131,7 @@ var ( PodSchedulingSLIDuration *metrics.HistogramVec PodSchedulingAttempts *metrics.Histogram + PodScheduledAfterFlush *metrics.Counter FrameworkExtensionPointDuration *metrics.HistogramVec PluginExecutionDuration *metrics.HistogramVec @@ -296,6 +297,14 @@ func InitMetrics() { StabilityLevel: metrics.STABLE, }) + PodScheduledAfterFlush = metrics.NewCounter( + &metrics.CounterOpts{ + Subsystem: SchedulerSubsystem, + Name: "pod_scheduled_after_flush_total", + Help: "Number of pods that were successfully scheduled after being flushed from unschedulablePods due to timeout. This metric helps detect potential queueing hint misconfigurations or event handling issues.", + StabilityLevel: metrics.ALPHA, + }) + FrameworkExtensionPointDuration = metrics.NewHistogramVec( &metrics.HistogramOpts{ Subsystem: SchedulerSubsystem, @@ -463,6 +472,7 @@ func InitMetrics() { pendingPods, PodSchedulingSLIDuration, PodSchedulingAttempts, + PodScheduledAfterFlush, FrameworkExtensionPointDuration, PluginExecutionDuration, SchedulerQueueIncomingPods, diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index 683bfaed690..ba10a112a5d 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -340,6 +340,11 @@ func (sched *Scheduler) bindingCycle( if assumedPodInfo.InitialAttemptTimestamp != nil { metrics.PodSchedulingSLIDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp)) } + // Count pods scheduled after being flushed from unschedulablePods + if assumedPodInfo.WasFlushedFromUnschedulable { + logger.V(4).Info("Pod scheduled after flush from unschedulablePods", "pod", klog.KObj(assumedPodInfo.Pod), "unschedulablePlugins", assumedPodInfo.UnschedulablePlugins, "pendingPlugins", assumedPodInfo.PendingPlugins) + metrics.PodScheduledAfterFlush.Inc() + } // Run "postbind" plugins. schedFramework.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost) @@ -1058,6 +1063,12 @@ func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framewo err := status.AsError() errMsg := status.Message() + // Clear plugin-related fields to avoid stale data from previous scheduling attempts. + // These fields will be repopulated below for FitError cases. + // We clear them here (rather than at Pop) because we sometimes want to use them + // for logging when a pod schedules successfully (e.g., after being flushed). + podInfo.ClearRejectorPlugins() + if err == ErrNoNodesAvailable { logger.V(2).Info("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod)) } else if fitError, ok := err.(*framework.FitError); ok { // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.