diff --git a/pkg/scheduler/schedule_one_podgroup.go b/pkg/scheduler/schedule_one_podgroup.go index c46b711f37e..8e0a27158b0 100644 --- a/pkg/scheduler/schedule_one_podgroup.go +++ b/pkg/scheduler/schedule_one_podgroup.go @@ -229,11 +229,16 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr result := podGroupAlgorithmResult{ status: fwk.AsStatus(err), } + // Ensure podResults has an entry for each pod in the pod group with Error status. + result = completePodGroupAlgorithmResult(ctx, podGroupInfo, podGroupCycleState, runAllPostFilters, result) sched.submitPodGroupAlgorithmResult(ctx, schedFwk, podGroupCycleState, podGroupInfo, result, start) return } result := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, runAllPostFilters) + + // Ensure podResults has an entry for each pod in the pod group with a status. + result = completePodGroupAlgorithmResult(ctx, podGroupInfo, podGroupCycleState, runAllPostFilters, result) metrics.PodGroupSchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) // Run workload aware preemption if required. If the preemption is successful, @@ -490,7 +495,29 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche }, revertFn } +// completePodGroupAlgorithmResult ensures that the podGroupAlgorithmResult contains the same number of podResults as there are pods in QueuedPodInfos. 
+func completePodGroupAlgorithmResult(ctx context.Context, podGroupInfo *framework.QueuedPodGroupInfo, podGroupState *framework.CycleState, postFilterMode podGroupPostFilterMode, podGroupResult podGroupAlgorithmResult) podGroupAlgorithmResult { + numInResult := len(podGroupResult.podResults) + numInQueue := len(podGroupInfo.QueuedPodInfos) + if numInResult == numInQueue { + return podGroupResult + } + newResults := make([]algorithmResult, numInQueue) + copy(newResults, podGroupResult.podResults) + for i := numInResult; i < numInQueue; i++ { + pInfo := podGroupInfo.QueuedPodInfos[i] + newResults[i] = algorithmResult{ + podCtx: initPodSchedulingContext(ctx, pInfo.Pod, podGroupState, postFilterMode), + status: podGroupResult.status.Clone(), + } + } + podGroupResult.podResults = newResults + return podGroupResult +} + // submitPodGroupAlgorithmResult submits the result of the pod group scheduling algorithm. +// It assumes that podGroupResult contains results for all pods from the pod group; +// if it does not, podGroupCondition will be updated to reflect the error. -// If that algorithm succedeed, the schedulable pods proceed to the binding cycle. +// If that algorithm succeeded, the schedulable pods proceed to the binding cycle. // Unschedulable pods are moved back to the scheduling queue and need to wait // for the next pod group scheduling cycle. @@ -499,19 +526,16 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, schedFwk framework.Framework, podGroupState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, podGroupResult podGroupAlgorithmResult, start time.Time) { logger := klog.FromContext(ctx) + if len(podGroupResult.podResults) != len(podGroupInfo.QueuedPodInfos) { + // This should never happen, but if it does, complete the result with the error status. 
+ logger.Error(fmt.Errorf("some pods were not processed"), "scheduling error for pod group", "podGroup", klog.KObj(podGroupInfo)) + podGroupResult.status = fwk.NewStatus(fwk.Error, "scheduling error for pod group, some pods were not processed") + podGroupResult.podResults = nil + podGroupResult = completePodGroupAlgorithmResult(ctx, podGroupInfo, podGroupState, runAllPostFilters, podGroupResult) + } var scheduledPods, unschedulablePods int for i, pInfo := range podGroupInfo.QueuedPodInfos { - var podResult algorithmResult - if len(podGroupResult.podResults) > i { - podResult = podGroupResult.podResults[i] - } else { - // In pod group-level unschedulable or error cases, podResult may not be defined. - // Initialize it now to handle pod failure correctly. - podResult = algorithmResult{ - podCtx: initPodSchedulingContext(ctx, pInfo.Pod, podGroupState, runAllPostFilters), - status: podGroupResult.status.Clone(), - } - } + podResult := podGroupResult.podResults[i] podCtx := podResult.podCtx ctx := klog.NewContext(ctx, podCtx.logger) // To be consistent with pod-by-pod scheduling, construct pod scheduling start time as `now - scheduling duration`. @@ -776,14 +800,13 @@ func makeProposedAssignments(res *podGroupAlgorithmResult) []fwk.ProposedAssignm return proposedAssignments } -// podGroupSchedulingAlgorithm attempts to schedule pods in the pod group according to the policy and constraints and returns the scheduling result for each pod in the pod group. +// podGroupSchedulingAlgorithm attempts to schedule pods in the pod group according to the policy and constraints and returns the scheduling result for all evaluated pods in the pod group, not necessarily all pods in the pod group. 
func (sched *Scheduler) podGroupSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, postFilterMode podGroupPostFilterMode) podGroupAlgorithmResult { podGroupCycleCtx, cancel := context.WithCancel(ctx) defer cancel() if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareWorkloadScheduling) { return sched.podGroupSchedulingPlacementAlgorithm(podGroupCycleCtx, schedFwk, podGroupCycleState, podGroupInfo, postFilterMode) - } else { - return sched.podGroupSchedulingDefaultAlgorithm(podGroupCycleCtx, schedFwk, podGroupCycleState, podGroupInfo, postFilterMode) } + return sched.podGroupSchedulingDefaultAlgorithm(podGroupCycleCtx, schedFwk, podGroupCycleState, podGroupInfo, postFilterMode) } diff --git a/pkg/scheduler/schedule_one_podgroup_test.go b/pkg/scheduler/schedule_one_podgroup_test.go index b23d568d193..bf3235777c9 100644 --- a/pkg/scheduler/schedule_one_podgroup_test.go +++ b/pkg/scheduler/schedule_one_podgroup_test.go @@ -424,6 +424,141 @@ func TestPodGroupCycle_UpdateSnapshotError(t *testing.T) { } } +func TestPodGroupCycle_FillsPodResultsOnFewerResults(t *testing.T) { + testPodGroup := st.MakePodGroup().Name("pg").Namespace("default").Obj() + p1 := st.MakePod().Name("p1").UID("p1").PodGroupName("pg").SchedulerName("test-scheduler").Obj() + p2 := st.MakePod().Name("p2").UID("p2").PodGroupName("pg").SchedulerName("test-scheduler").Obj() + p3 := st.MakePod().Name("p3").UID("p3").PodGroupName("pg").SchedulerName("test-scheduler").Obj() + testNode := st.MakeNode().Name("node1").UID("node1").Obj() + + qInfo1 := &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: p1}} + qInfo2 := &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: p2}} + qInfo3 := &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: p3}} + + podGroupInfo := &framework.QueuedPodGroupInfo{ + QueuedPodInfos: []*framework.QueuedPodInfo{qInfo1, qInfo2, qInfo3}, + PodGroupInfo: 
&framework.PodGroupInfo{ + Name: "pg", + Namespace: "default", + UnscheduledPods: []*v1.Pod{p1, p2, p3}, + }, + } + + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + fakePlugin := &fakePodGroupPlugin{ + filterStatus: map[string]*fwk.Status{ + "p1": nil, + "p2": fwk.NewStatus(fwk.Error, "filter error for p2"), + "p3": nil, + }, + permitStatus: map[string]*fwk.Status{ + "p1": nil, + "p2": nil, + "p3": nil, + }, + } + + registry := []tf.RegisterPluginFunc{ + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + tf.RegisterPostFilterPlugin(fakePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { + return fakePlugin, nil + }), + tf.RegisterPermitPlugin(fakePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { + return fakePlugin, nil + }), + tf.RegisterFilterPlugin(fakePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { + return fakePlugin, nil + }), + } + + client := clientsetfake.NewSimpleClientset(testPodGroup, testNode) + informerFactory := informers.NewSharedInformerFactory(client, 0) + podGroupLister := informerFactory.Scheduling().V1alpha3().PodGroups().Lister() + + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) + queue := internalqueue.NewSchedulingQueue(nil, informerFactory) + snapshot := internalcache.NewEmptySnapshot() + + schedFwk, err := tf.NewFramework(ctx, registry, "test-scheduler", + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithSnapshotSharedLister(snapshot), + frameworkruntime.WithPodNominator(queue), + frameworkruntime.WithClientSet(client), + frameworkruntime.WithEventRecorder(events.NewFakeRecorder(100)), + ) + if err != nil { + t.Fatalf("Failed to create new framework: %v", err) + } + + cache := internalcache.New(ctx, nil, true) + logger, ctx := 
ktesting.NewTestContext(t) + cache.AddNode(logger, testNode) + + handledPods := make(map[string]*fwk.Status) + var lock sync.Mutex + + sched := &Scheduler{ + Profiles: profile.Map{"test-scheduler": schedFwk}, + SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), + Cache: cache, + client: client, + podGroupLister: podGroupLister, + nodeInfoSnapshot: internalcache.NewEmptySnapshot(), + workloadAwarePreemptionEnabled: false, + FailureHandler: func(ctx context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *fwk.Status, ni *fwk.NominatingInfo, start time.Time) { + lock.Lock() + defer lock.Unlock() + handledPods[p.Pod.Name] = status + }, + } + + // Check that the scheduling algorithm returns fewer results than there are pods in the group. + if err := sched.Cache.UpdateSnapshot(logger, sched.nodeInfoSnapshot); err != nil { + t.Fatalf("Failed to update snapshot: %v", err) + } + sched.SchedulePod = sched.schedulePod + schedulePodResult := sched.podGroupSchedulingAlgorithm(ctx, schedFwk, framework.NewCycleState(), podGroupInfo, runAllPostFilters) + if len(schedulePodResult.podResults) != 2 { + t.Errorf("Expected 2 pod results, got %d", len(schedulePodResult.podResults)) + } + + // Run the scheduling cycle and check that all pods are handled. 
+ sched.podGroupCycle(ctx, schedFwk, framework.NewCycleState(), podGroupInfo) + + lock.Lock() + defer lock.Unlock() + + if len(handledPods) != 3 { + t.Errorf("Expected FailureHandler to be called for 3 pods, but got called for %d", len(handledPods)) + } + + expectedGroupErrMsg := "failed to schedule other pod from a pod group: running \"FakePodGroupPlugin\" filter plugin: filter error for p2" + expectedP2ErrMsg := "running \"FakePodGroupPlugin\" filter plugin: filter error for p2" + + if status, ok := handledPods["p1"]; !ok { + t.Errorf("Expected FailureHandler to be called for p1") + } else if status.AsError() == nil || status.AsError().Error() != expectedGroupErrMsg { + t.Errorf("Expected status error for p1 to be %q, got %v", expectedGroupErrMsg, status.AsError()) + } + + if status, ok := handledPods["p2"]; !ok { + t.Errorf("Expected FailureHandler to be called for p2") + } else if status.AsError() == nil || status.AsError().Error() != expectedP2ErrMsg { + t.Errorf("Expected status error for p2 to be %q, got %v", expectedP2ErrMsg, status.AsError()) + } + + if status, ok := handledPods["p3"]; !ok { + t.Errorf("Expected FailureHandler to be called for p3") + } else if status.AsError() == nil || status.AsError().Error() != expectedGroupErrMsg { + t.Errorf("Expected status error for p3 to be %q, got %v", expectedGroupErrMsg, status.AsError()) + } +} + func TestPodGroupCycle_PodGroupPostFilter(t *testing.T) { tests := []struct { name string @@ -1391,8 +1526,17 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { { name: "Unschedulable for the entire pod group", algorithmResult: podGroupAlgorithmResult{ - status: fwk.NewStatus(fwk.Unschedulable, "node affinity mismatch"), - podResults: []algorithmResult{}, + status: fwk.NewStatus(fwk.Unschedulable, "node affinity mismatch"), + podResults: []algorithmResult{{ + scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, + status: fwk.NewStatus(fwk.Unschedulable), + }, { + scheduleResult: 
ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, + status: fwk.NewStatus(fwk.Unschedulable), + }, { + scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, + status: fwk.NewStatus(fwk.Unschedulable), + }}, }, expectBound: sets.New[string](), expectFailed: sets.New("p1", "p2", "p3"), @@ -1414,6 +1558,9 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, { scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, status: fwk.NewStatus(fwk.Error, "plugin returned error"), + }, { + scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, + status: fwk.NewStatus(fwk.Error, "plugin returned error"), }}, }, expectBound: sets.New[string](), @@ -1440,6 +1587,9 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, { scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, status: fwk.NewStatus(fwk.Error, "internal failure"), + }, { + scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, + status: fwk.NewStatus(fwk.Error, "internal failure"), }}, }, expectBound: sets.New[string](), @@ -1546,6 +1696,9 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, { scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, status: fwk.NewStatus(fwk.Error), + }, { + scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, + status: fwk.NewStatus(fwk.Error), }}, }, expectBound: sets.New[string](), @@ -1557,6 +1710,25 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { Message: "All pods scheduled", }, }, + { + name: "Different number of pods in result and queue, should fail all queue pods", + algorithmResult: podGroupAlgorithmResult{ + status: fwk.NewStatus(fwk.Error), + podResults: []algorithmResult{{ + scheduleResult: ScheduleResult{SuggestedHost: "node1"}, + status: nil, + permitStatus: nil, + }}, + }, + expectBound: 
sets.New[string](), + expectFailed: sets.New("p1", "p2", "p3"), + expectCondition: &metav1.Condition{ + Type: schedulingapi.PodGroupScheduled, + Status: metav1.ConditionFalse, + Reason: schedulingapi.PodGroupReasonSchedulerError, + Message: fwk.NewStatus(fwk.Error, "scheduling error for pod group, some pods were not processed").AsError().Error(), + }, + }, } for _, tt := range tests {