From 68acdf68c19059ca651ed09f162ebff45067e672 Mon Sep 17 00:00:00 2001 From: Bartosz Date: Wed, 13 May 2026 14:03:09 +0000 Subject: [PATCH] Use PlacementFeasible instead of Permit in PodGroup scheduling cycle --- pkg/scheduler/schedule_one_podgroup.go | 105 ++--- pkg/scheduler/schedule_one_podgroup_test.go | 467 ++++++++++---------- 2 files changed, 287 insertions(+), 285 deletions(-) diff --git a/pkg/scheduler/schedule_one_podgroup.go b/pkg/scheduler/schedule_one_podgroup.go index 63a54b99d8e..886c8441e1a 100644 --- a/pkg/scheduler/schedule_one_podgroup.go +++ b/pkg/scheduler/schedule_one_podgroup.go @@ -68,7 +68,7 @@ func (sched *Scheduler) scheduleOnePodGroup(ctx context.Context, podGroupInfo *f } sched.skipPodGroupPodSchedule(ctx, schedFwk, podGroupInfo) // skipPodGroupPodSchedule could remove some pods from the pod group. - // Pod group constraints will be re-evaluated on a Permit phase. + // Pod group constraints will be re-evaluated on a PlacementFeasible phase. // Now, verify if it has any pods left. if len(podGroupInfo.QueuedPodInfos) == 0 { return @@ -304,9 +304,6 @@ type algorithmResult struct { requiresPreemption bool // status is a scheduling algorithm status. status *fwk.Status - // permitStatus is a status of the permit check. - // This is only set when the `status` is success or the `requiresPreemption` is true. - permitStatus *fwk.Status } // podGroupPostFilterMode defines how the pod group algorithm should run post filters plugins. @@ -364,41 +361,62 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context, waitingOnPreemption: false, } + placementCycleState := framework.NewCycleState() + placementCycleState.SetRecordPluginMetrics(true) + placementCycleState.SetPodGroupSchedulingCycle(podGroupCycleState) + logger := klog.FromContext(ctx) logger.V(5).Info("Running a pod group scheduling algorithm", "podGroup", klog.KObj(podGroupInfo), "unscheduledPodsCount", len(podGroupInfo.QueuedPodInfos)) requiresPreemption := false + anyScheduled := false for _, podInfo := range podGroupInfo.QueuedPodInfos { podResult, revertFn := sched.podGroupPodSchedulingAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, podInfo, postFilterMode) result.podResults = append(result.podResults, podResult) - if !podResult.status.IsSuccess() && !podResult.requiresPreemption { - // When a pod is not feasible and doesn't require preemption, it means that it failed scheduling. - if podResult.status.IsRejected() { - // If the pod is rejected, the pod group can still be schedulable as long as the permit check can succeed. - continue - } + if revertFn != nil { + // We unreserve the pod at the end of the whole algorithm (via defer) because it should be ultimately returned to the queue, + // without binding it yet. We only assumed the pod to check feasibility of subsequent pods in the group. + defer revertFn() + } + + if !podResult.status.IsSuccess() && !podResult.status.IsRejected() { // When the algorithm returns error or unexpected status, stop evaluating the rest of the pods. result.status = fwk.AsStatus(fmt.Errorf("failed to schedule other pod from a pod group: %w", podResult.status.AsError())) - // Clear the waiting on preemption flag that could have been set by previous pods. - result.waitingOnPreemption = false break } - // At this point, the pod has passed the scheduling algorithm with the Permit status being either Success or Wait. - // We unreserve the pod at the end of the whole algorithm (via defer) because it should be ultimately returned to the queue, - // without binding it yet. We only assumed the pod to check feasibility of subsequent pods in the group. - defer revertFn() + // PlacementFeasible plugins check if the pod group can meet its constraints. + // Those plugins need to be run after each pod is scheduled. + placementFeasibleStatus := schedFwk.RunPlacementFeasiblePlugins(ctx, placementCycleState, podGroupInfo) + + if placementFeasibleStatus.IsError() { + // When the algorithm returns error or unexpected status, stop evaluating the rest of the pods. + result.status = fwk.AsStatus(fmt.Errorf("failed to evaluate placement feasibility: %w", placementFeasibleStatus.AsError())) + break + } + + // UnschedulableAndUnresolvable from PlacementFeasible plugins indicates that the pod group + // cannot meet its constraints regardless of how many more pods we check. + // We can stop the scheduling loop early. + if placementFeasibleStatus.Code() == fwk.UnschedulableAndUnresolvable { + // We need to change the code to Unschedulable to make sure preemption can be fired. + result.status = fwk.NewStatus(fwk.Unschedulable).WithError(placementFeasibleStatus.AsError()) + break + } + + result.status = placementFeasibleStatus requiresPreemption = requiresPreemption || podResult.requiresPreemption - if podResult.permitStatus.IsSuccess() { - // When the permit returns success for any pod, the pod group is schedulable. - if requiresPreemption { - // If any preemption is required, the whole pod group requires it to be feasible. - result.status = fwk.NewStatus(fwk.Unschedulable, "pod group is waiting for preemption to complete").WithError(errPodGroupUnschedulable) - // Set the waitingOnPreemption to true iff the pod group is feasible (Permit returned Success) and requires preemption. - result.waitingOnPreemption = true - } else { - result.status = nil // Success - } + anyScheduled = anyScheduled || podResult.status.IsSuccess() + } + + if result.status.IsSuccess() { + if requiresPreemption { + // If any preemption is required, the whole pod group requires it to be feasible. + result.status = fwk.NewStatus(fwk.Unschedulable, "pod group is waiting for preemption to complete").WithError(errPodGroupUnschedulable) + result.waitingOnPreemption = true + } else if !anyScheduled { + // The framework requires at least 1 pod to be scheduled in order to return a success status. + result.status = fwk.NewStatus(fwk.Unschedulable).WithError(errPodGroupUnschedulable) } } @@ -406,7 +424,7 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context, } // podGroupPodSchedulingAlgorithm runs a scheduling algorithm for individual pod from a pod group. -// It returns the algorithm result and, if successful or the preemption is required, the permit status together with the revert function. +// It returns the algorithm result together with the revert function. func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, podInfo *framework.QueuedPodInfo, postFilterMode podGroupPostFilterMode) (algorithmResult, func()) { pod := podInfo.Pod podCtx := initPodSchedulingContext(ctx, pod, podGroupCycleState, postFilterMode) @@ -455,37 +473,12 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche } } - _, permitStatus := schedFwk.RunPermitPlugins(ctx, podCtx.state, assumedPodInfo.Pod, scheduleResult.SuggestedHost) - if !permitStatus.IsWait() && !permitStatus.IsSuccess() { - revertFn() - if permitStatus.IsRejected() { - fitErr := &framework.FitError{ - NumAllNodes: 1, - Pod: assumedPodInfo.Pod, - Diagnosis: framework.Diagnosis{ - NodeToStatus: framework.NewDefaultNodeToStatus(), - }, - } - fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, permitStatus) - fitErr.Diagnosis.AddPluginStatus(permitStatus) - permitStatus = fwk.NewStatus(permitStatus.Code()).WithError(fitErr) - } - return algorithmResult{ - pod: pod, - scheduleResult: ScheduleResult{nominatingInfo: clearNominatedNode}, - podCtx: podCtx, - schedulingDuration: time.Since(start), - status: permitStatus, - }, nil - } - return algorithmResult{ pod: pod, scheduleResult: scheduleResult, podCtx: podCtx, schedulingDuration: time.Since(start), status: status, - permitStatus: permitStatus, requiresPreemption: requiresPreemption, }, revertFn } @@ -552,8 +545,8 @@ func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, sched sched.FailureHandler(ctx, schedFwk, pInfo, podGroupResult.status, nominatingInfo, podSchedulingStart) } else { // Pod group is unschedulable, so the pod has to be marked as unschedulable. - // Its rejection status is set to its permit status message. - status := fwk.NewStatus(fwk.Unschedulable, podResult.permitStatus.Message()).WithError(errPodGroupUnschedulable) + // Its rejection status is set to the pod group's status message. + status := fwk.NewStatus(fwk.Unschedulable, podGroupResult.status.Message()).WithError(errPodGroupUnschedulable) sched.FailureHandler(ctx, schedFwk, pInfo, status, clearNominatedNode, podSchedulingStart) } unschedulablePods++ @@ -567,8 +560,8 @@ func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, sched // such as heterogeneous pod group or using inter-pod dependencies. if podResult.requiresPreemption && !podGroupResult.waitingOnPreemption { // Pod group is unschedulable, so the pod has to be marked as unschedulable, even if it just required preemption. - // Its rejection status is set to its permit status message, as the preemption message is no longer relevant. - status := fwk.NewStatus(fwk.Unschedulable, podResult.permitStatus.Message()).WithError(errPodGroupUnschedulable) + // Its rejection status is set to the pod group's status message, as the preemption message is no longer relevant. + status := fwk.NewStatus(fwk.Unschedulable, podGroupResult.status.Message()).WithError(errPodGroupUnschedulable) sched.FailureHandler(ctx, schedFwk, pInfo, status, clearNominatedNode, podSchedulingStart) } else { // When a pod is unschedulable or preemption is required, just call the FailureHandler. diff --git a/pkg/scheduler/schedule_one_podgroup_test.go b/pkg/scheduler/schedule_one_podgroup_test.go index cfa34514a84..53c74a84913 100644 --- a/pkg/scheduler/schedule_one_podgroup_test.go +++ b/pkg/scheduler/schedule_one_podgroup_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "sort" "sync" "testing" "time" @@ -66,14 +67,12 @@ type fakePodGroupPlugin struct { postFilterResult map[string]*fwk.PostFilterResult postFilterStatus map[string]*fwk.Status postFilterCalled bool - permitStatus map[string]*fwk.Status podGroupPostFilterStatus *fwk.Status podGroupPostFilterCalled bool } var _ fwk.FilterPlugin = &fakePodGroupPlugin{} var _ fwk.PostFilterPlugin = &fakePodGroupPlugin{} -var _ fwk.PermitPlugin = &fakePodGroupPlugin{} var _ framework.PodGroupPostFilterPlugin = &fakePodGroupPlugin{} func (mp *fakePodGroupPlugin) Name() string { return "FakePodGroupPlugin" } @@ -93,13 +92,6 @@ func (mp *fakePodGroupPlugin) PostFilter(ctx context.Context, state fwk.CycleSta return &fwk.PostFilterResult{NominatingInfo: clearNominatedNode}, fwk.NewStatus(fwk.Unschedulable, "default fake postfilter failure") } -func (mp *fakePodGroupPlugin) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) { - if status, ok := mp.permitStatus[pod.Name]; ok { - return status, 0 - } - return fwk.NewStatus(fwk.Unschedulable, "default fake permit failure"), 0 -} - func (mp *fakePodGroupPlugin) PodGroupPostFilter(ctx context.Context, pg *schedulingv1alpha2.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) *fwk.Status { mp.podGroupPostFilterCalled = true if mp.podGroupPostFilterStatus != nil { @@ -108,6 +100,79 @@ func (mp *fakePodGroupPlugin) PodGroupPostFilter(ctx context.Context, pg *schedu return fwk.NewStatus(fwk.Unschedulable, "default fake podgroup postfilter failure") } +type fakePlacementFeasibleState struct { + podCount int +} + +func (s *fakePlacementFeasibleState) Clone() fwk.StateData { + return &fakePlacementFeasibleState{podCount: s.podCount} +} + +const fakePlacementFeasibleStateKey fwk.StateKey = "fakePlacementFeasibleState" + +type fakePlacementFeasiblePlugin struct { + placementFeasibleStatuses [][]fwk.Code + placementCount int +} + +var _ framework.PlacementFeasiblePlugin = &fakePlacementFeasiblePlugin{} +var _ fwk.PermitPlugin = &fakePlacementFeasiblePlugin{} + +func (mp *fakePlacementFeasiblePlugin) Name() string { + // Name has to be GangScheduling for the PlacementFeasible plugin to be used. + // TODO: Remove this once the restriction is taken off. + return names.GangScheduling +} + +// PlacementFeasible simulates the evaluation of pod group placement constraints. +// The mock uses a 2D slice (placementFeasibleStatuses) where: +// - The outer slice represents distinct placements (e.g., when evaluating multiple topology placements). +// - The inner slice represents the pod-by-pod evaluation within a single placement. +// It uses placementCycleState to track how many pods have been evaluated in the current placement. +func (mp *fakePlacementFeasiblePlugin) PlacementFeasible(ctx context.Context, placementCycleState fwk.PodGroupCycleState, podGroupInfo fwk.PodGroupInfo) *fwk.Status { + // If no mock statuses are configured, always succeed. + if len(mp.placementFeasibleStatuses) == 0 { + return nil + } + + // Each placement gets a new placementCycleState. Check if this state has been initialized. + stateData, err := placementCycleState.Read(fakePlacementFeasibleStateKey) + if err != nil { + // We haven't considered this placement before (this is the first pod evaluated in this placement). + // Initialize the state and increment the placement count. + stateData = &fakePlacementFeasibleState{podCount: 0} + placementCycleState.Write(fakePlacementFeasibleStateKey, stateData) + mp.placementCount++ + } + + // Increment the count of pods evaluated for the current placement attempt. + state := stateData.(*fakePlacementFeasibleState) + state.podCount++ + + placementIndex := mp.placementCount - 1 + podIndex := state.podCount - 1 + + // Ensure the indices are within the bounds of the injected statuses. + if placementIndex < len(mp.placementFeasibleStatuses) { + // If the specific placement has no pod statuses configured, treat it as always successful. + if len(mp.placementFeasibleStatuses[placementIndex]) == 0 { + return nil + } + if podIndex < len(mp.placementFeasibleStatuses[placementIndex]) { + code := mp.placementFeasibleStatuses[placementIndex][podIndex] + if code == fwk.Success { + return nil + } + return fwk.NewStatus(code, "injected placementFeasible status") + } + } + return fwk.AsStatus(fmt.Errorf("exceeded the expected call count")) +} + +func (mp *fakePlacementFeasiblePlugin) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) { + return fwk.NewStatus(fwk.Error, "unexpected call to permit"), 0 +} + func TestPodGroupInfoForPod(t *testing.T) { groupName := "pg" p1 := st.MakePod().Name("p1").Namespace("ns1").UID("p1").PodGroupName(groupName).Priority(100).Obj() @@ -590,6 +655,7 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { tests := []struct { name string plugin *fakePodGroupPlugin + podGroupFeasibleStatuses []fwk.Code expectedGroupStatusCode fwk.Code expectedGroupWaitingOnPreemption bool expectedPodStatus map[string]*fwk.Status @@ -605,11 +671,6 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { "p2": nil, "p3": nil, }, - permitStatus: map[string]*fwk.Status{ - "p1": nil, - "p2": nil, - "p3": nil, - }, }, expectedGroupStatusCode: fwk.Success, expectedPodStatus: map[string]*fwk.Status{ @@ -619,18 +680,18 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { }, }, { - name: "All pods feasible, two waiting", + name: "All pods feasible, podGroup schedulable with 3 schedulable pods", plugin: &fakePodGroupPlugin{ filterStatus: map[string]*fwk.Status{ "p1": nil, "p2": nil, "p3": nil, }, - permitStatus: map[string]*fwk.Status{ - "p1": fwk.NewStatus(fwk.Wait), - "p2": fwk.NewStatus(fwk.Wait), - "p3": nil, - }, + }, + podGroupFeasibleStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.Unschedulable, + fwk.Success, }, expectedGroupStatusCode: fwk.Success, expectedPodStatus: map[string]*fwk.Status{ @@ -640,18 +701,18 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { }, }, { - name: "All pods feasible, but all waiting", + name: "All pods feasible, podGroup unschedulable", plugin: &fakePodGroupPlugin{ filterStatus: map[string]*fwk.Status{ "p1": nil, "p2": nil, "p3": nil, }, - permitStatus: map[string]*fwk.Status{ - "p1": fwk.NewStatus(fwk.Wait), - "p2": fwk.NewStatus(fwk.Wait), - "p3": fwk.NewStatus(fwk.Wait), - }, + }, + podGroupFeasibleStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.Unschedulable, + fwk.Unschedulable, }, expectedGroupStatusCode: fwk.Unschedulable, expectedPodStatus: map[string]*fwk.Status{ @@ -661,45 +722,41 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { }, }, { - name: "All pods feasible, but last waiting", + name: "All pods feasible, podGroup UnschedulableAndUnresolvable", plugin: &fakePodGroupPlugin{ filterStatus: map[string]*fwk.Status{ "p1": nil, "p2": nil, "p3": nil, }, - permitStatus: map[string]*fwk.Status{ - "p1": nil, - "p2": nil, - "p3": fwk.NewStatus(fwk.Wait), - }, }, - expectedGroupStatusCode: fwk.Success, + podGroupFeasibleStatuses: []fwk.Code{ + fwk.UnschedulableAndUnresolvable, + }, + expectedGroupStatusCode: fwk.Unschedulable, expectedPodStatus: map[string]*fwk.Status{ "p1": nil, - "p2": nil, - "p3": nil, + // The algorithm stopped evaluating the pods after UnschedulableAndUnresolvable was received from PlacementFeasible. }, }, { - name: "All pods feasible, one waiting, one unschedulable", + name: "All pods feasible, podGroup UnschedulableAndUnresolvable with 2 pods", plugin: &fakePodGroupPlugin{ filterStatus: map[string]*fwk.Status{ "p1": nil, "p2": nil, "p3": nil, }, - permitStatus: map[string]*fwk.Status{ - "p1": fwk.NewStatus(fwk.Wait), - "p2": fwk.NewStatus(fwk.Unschedulable), - "p3": nil, - }, }, - expectedGroupStatusCode: fwk.Success, + podGroupFeasibleStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.UnschedulableAndUnresolvable, + }, + expectedGroupStatusCode: fwk.Unschedulable, expectedPodStatus: map[string]*fwk.Status{ "p1": nil, - "p2": fwk.NewStatus(fwk.Unschedulable), - "p3": nil, + "p2": nil, + // The algorithm stopped evaluating the pods after UnschedulableAndUnresolvable was received from PlacementFeasible. }, }, { @@ -720,11 +777,6 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { "p2": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}}, "p3": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}}, }, - permitStatus: map[string]*fwk.Status{ - "p1": nil, - "p2": nil, - "p3": nil, - }, }, expectedGroupStatusCode: fwk.Unschedulable, expectedGroupWaitingOnPreemption: true, @@ -741,45 +793,7 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { skipForTAS: true, }, { - name: "All pods require preemption, but waiting", - plugin: &fakePodGroupPlugin{ - filterStatus: map[string]*fwk.Status{ - "p1": fwk.NewStatus(fwk.Unschedulable), - "p2": fwk.NewStatus(fwk.Unschedulable), - "p3": fwk.NewStatus(fwk.Unschedulable), - }, - postFilterStatus: map[string]*fwk.Status{ - "p1": nil, - "p2": nil, - "p3": nil, - }, - postFilterResult: map[string]*fwk.PostFilterResult{ - "p1": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}}, - "p2": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}}, - "p3": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}}, - }, - permitStatus: map[string]*fwk.Status{ - "p1": fwk.NewStatus(fwk.Wait), - "p2": fwk.NewStatus(fwk.Wait), - "p3": fwk.NewStatus(fwk.Wait), - }, - }, - expectedGroupStatusCode: fwk.Unschedulable, - expectedPodStatus: map[string]*fwk.Status{ - "p1": fwk.NewStatus(fwk.Unschedulable), - "p2": fwk.NewStatus(fwk.Unschedulable), - "p3": fwk.NewStatus(fwk.Unschedulable), - }, - expectedPreemption: map[string]bool{ - "p1": true, - "p2": true, - "p3": true, - }, - // preemption is not yet implemented for TAS - skipForTAS: true, - }, - { - name: "One pod requires preemption, but waiting, two are feasible", + name: "One pod requires preemption, podGroup schedulable with 2 schedulable pods", plugin: &fakePodGroupPlugin{ filterStatus: map[string]*fwk.Status{ "p1": fwk.NewStatus(fwk.Unschedulable), @@ -792,11 +806,11 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { postFilterResult: map[string]*fwk.PostFilterResult{ "p1": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}}, }, - permitStatus: map[string]*fwk.Status{ - "p1": fwk.NewStatus(fwk.Wait), - "p2": fwk.NewStatus(fwk.Wait), - "p3": nil, - }, + }, + podGroupFeasibleStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.Unschedulable, + fwk.Success, }, expectedGroupStatusCode: fwk.Unschedulable, expectedGroupWaitingOnPreemption: true, @@ -829,10 +843,6 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { "p1": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}}, "p2": {NominatingInfo: clearNominatedNode}, }, - permitStatus: map[string]*fwk.Status{ - "p1": nil, - "p3": nil, - }, }, expectedGroupStatusCode: fwk.Unschedulable, expectedGroupWaitingOnPreemption: true, @@ -858,6 +868,11 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { "p3": fwk.NewStatus(fwk.Unschedulable), }, }, + podGroupFeasibleStatuses: []fwk.Code{ + fwk.Success, + fwk.Success, + fwk.Success, + }, expectedGroupStatusCode: fwk.Unschedulable, expectedPodStatus: map[string]*fwk.Status{ "p1": fwk.NewStatus(fwk.Unschedulable), @@ -873,11 +888,6 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { "p2": fwk.NewStatus(fwk.Error), "p3": nil, }, - permitStatus: map[string]*fwk.Status{ - "p1": nil, - "p2": nil, - "p3": nil, - }, }, expectedGroupStatusCode: fwk.Error, expectedPodStatus: map[string]*fwk.Status{ @@ -887,23 +897,22 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { }, }, { - name: "Any permit returned Error", + name: "Any placementFeasible returned Error", plugin: &fakePodGroupPlugin{ filterStatus: map[string]*fwk.Status{ "p1": nil, "p2": nil, "p3": nil, }, - permitStatus: map[string]*fwk.Status{ - "p1": nil, - "p2": fwk.NewStatus(fwk.Error), - "p3": nil, - }, + }, + podGroupFeasibleStatuses: []fwk.Code{ + fwk.Success, + fwk.Error, }, expectedGroupStatusCode: fwk.Error, expectedPodStatus: map[string]*fwk.Status{ "p1": nil, - "p2": fwk.NewStatus(fwk.Error), + "p2": nil, // The algorithm stopped evaluating the pods after an error occurred, so a "p3" status is not expected. }, }, @@ -915,11 +924,6 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { "p2": fwk.NewStatus(fwk.Error), "p3": nil, }, - permitStatus: map[string]*fwk.Status{ - "p1": nil, - "p2": nil, - "p3": nil, - }, postFilterResult: map[string]*fwk.PostFilterResult{ "p1": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}}, }, @@ -932,26 +936,25 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { }, }, { - name: "Any permit returned Error while waiting on preemption", + name: "Any placementFeasible returned Error while waiting on preemption", plugin: &fakePodGroupPlugin{ filterStatus: map[string]*fwk.Status{ "p1": fwk.NewStatus(fwk.Unschedulable), "p2": nil, "p3": nil, }, - permitStatus: map[string]*fwk.Status{ - "p1": nil, - "p2": fwk.NewStatus(fwk.Error), - "p3": nil, - }, postFilterResult: map[string]*fwk.PostFilterResult{ "p1": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}}, }, }, + podGroupFeasibleStatuses: []fwk.Code{ + fwk.Success, + fwk.Error, + }, expectedGroupStatusCode: fwk.Error, expectedPodStatus: map[string]*fwk.Status{ "p1": fwk.NewStatus(fwk.Unschedulable), - "p2": fwk.NewStatus(fwk.Error), + "p2": nil, // The algorithm stopped evaluating the pods after an error occurred, so a "p3" status is not expected. }, }, @@ -964,12 +967,10 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { } name := fmt.Sprintf("%s (TopologyAwareWorkloadScheduling=%v)", tt.name, tasEnabled) t.Run(name, func(t *testing.T) { - if tasEnabled { - featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{ - features.TopologyAwareWorkloadScheduling: true, - features.GenericWorkload: true, - }) - } + featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{ + features.TopologyAwareWorkloadScheduling: tasEnabled, + features.GenericWorkload: true, + }) logger, ctx := ktesting.NewTestContext(t) @@ -978,6 +979,10 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { queue := internalqueue.NewSchedulingQueue(nil, informerFactory) snapshot := internalcache.NewEmptySnapshot() + placementFeasiblePlugin := &fakePlacementFeasiblePlugin{ + placementFeasibleStatuses: [][]fwk.Code{tt.podGroupFeasibleStatuses}, + } + registry := []tf.RegisterPluginFunc{ tf.RegisterFilterPlugin(tt.plugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { return tt.plugin, nil @@ -985,8 +990,8 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { tf.RegisterPostFilterPlugin(tt.plugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { return tt.plugin, nil }), - tf.RegisterPermitPlugin(tt.plugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { - return tt.plugin, nil + tf.RegisterPermitPlugin(placementFeasiblePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { + return placementFeasiblePlugin, nil }), } schedFwk, err := tf.NewFramework(ctx, @@ -1044,18 +1049,10 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { if podResult.scheduleResult.SuggestedHost != "node1" { t.Errorf("Expected pod %s suggested host: node1, got: %v", podName, podResult.scheduleResult.SuggestedHost) } - if expected, ok := tt.plugin.permitStatus[podName]; ok { - if podResult.permitStatus.Code() != expected.Code() { - t.Errorf("Expected pod %s permit status code: %v, got: %v", podName, expected.Code(), podResult.permitStatus.Code()) - } - } } else { if podResult.scheduleResult.SuggestedHost != "" { t.Errorf("Expected pod %s empty suggested host, got: %v", podName, podResult.scheduleResult.SuggestedHost) } - if podResult.permitStatus != nil { - t.Errorf("Expected pod %s nil permit status, got: %v", podName, podResult.permitStatus) - } } if expected, ok := tt.expectedPreemption[podName]; ok { if podResult.requiresPreemption != expected { @@ -1068,6 +1065,26 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) { } } +// This is only needed because PlacementFeasiblePlugin mock doesn't know which placement it processes and has to assume the order of placements. +// TODO: Remove this once the PlacementFeasiblePlugin becomes order-independent or another way of ordering placements is introduced. +type orderedPlacementPlugin struct { + fwk.PlacementGeneratePlugin +} + +func (p *orderedPlacementPlugin) Name() string { + return p.PlacementGeneratePlugin.Name() + "_Ordered" +} + +func (p *orderedPlacementPlugin) GeneratePlacements(ctx context.Context, state fwk.PodGroupCycleState, podGroup fwk.PodGroupInfo, parentPlacement *fwk.Placement) (*fwk.GeneratePlacementsResult, *fwk.Status) { + result, status := p.PlacementGeneratePlugin.GeneratePlacements(ctx, state, podGroup, parentPlacement) + if status.IsSuccess() && result != nil { + sort.Slice(result.Placements, func(i, j int) bool { + return result.Placements[i].Name < result.Placements[j].Name + }) + } + return result, status +} + func TestSubmitPodGroupAlgorithmResult(t *testing.T) { testNode := st.MakeNode().Name("node1").UID("node1").Obj() @@ -1108,15 +1125,12 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { podResults: []algorithmResult{{ scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: nil, }, { scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: nil, }, { scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: nil, }}, }, expectBound: sets.New("p1", "p2", "p3"), @@ -1127,21 +1141,18 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, }, { - name: "All pods feasible, but all waiting", + name: "All pods feasible, but podGroup unschedulable", algorithmResult: podGroupAlgorithmResult{ status: fwk.NewStatus(fwk.Unschedulable, "not enough capacity for the gang"), podResults: []algorithmResult{{ scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: fwk.NewStatus(fwk.Wait), }, { scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: fwk.NewStatus(fwk.Wait), }, { scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: fwk.NewStatus(fwk.Wait), }}, }, expectFailed: sets.New("p1", "p2", "p3"), @@ -1153,45 +1164,18 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, }, { - name: "All pods feasible, but last waiting", + name: "One unschedulable", algorithmResult: podGroupAlgorithmResult{ status: nil, podResults: []algorithmResult{{ scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: nil, - }, { - scheduleResult: ScheduleResult{SuggestedHost: "node1"}, - status: nil, - permitStatus: nil, - }, { - scheduleResult: ScheduleResult{SuggestedHost: "node1"}, - status: nil, - permitStatus: fwk.NewStatus(fwk.Wait), - }}, - }, - expectBound: sets.New("p1", "p2", "p3"), - expectCondition: &metav1.Condition{ - Type: schedulingapi.PodGroupScheduled, - Status: metav1.ConditionTrue, - Reason: "Scheduled", - }, - }, - { - name: "All pods feasible, one waiting, one unschedulable", - algorithmResult: podGroupAlgorithmResult{ - status: nil, - podResults: []algorithmResult{{ - scheduleResult: ScheduleResult{SuggestedHost: "node1"}, - status: nil, - permitStatus: fwk.NewStatus(fwk.Wait), }, { scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, status: fwk.NewStatus(fwk.Unschedulable), }, { scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: nil, }}, }, expectBound: sets.New("p1", "p3"), @@ -1214,7 +1198,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, status: fwk.NewStatus(fwk.Unschedulable), requiresPreemption: true, - permitStatus: nil, }, { scheduleResult: ScheduleResult{ SuggestedHost: "node1", @@ -1222,7 +1205,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, status: fwk.NewStatus(fwk.Unschedulable), requiresPreemption: true, - permitStatus: nil, }, { scheduleResult: ScheduleResult{ SuggestedHost: "node1", @@ -1230,7 +1212,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, status: fwk.NewStatus(fwk.Unschedulable), requiresPreemption: true, - permitStatus: nil, }}, }, expectPreempting: sets.New("p1", "p2", "p3"), @@ -1242,45 +1223,7 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, }, { - name: "All pods require preemption, but waiting", - algorithmResult: podGroupAlgorithmResult{ - status: fwk.NewStatus(fwk.Unschedulable, "preemption required but not feasible"), - podResults: []algorithmResult{{ - scheduleResult: ScheduleResult{ - SuggestedHost: "node1", - nominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}, - }, - status: fwk.NewStatus(fwk.Unschedulable), - requiresPreemption: true, - permitStatus: fwk.NewStatus(fwk.Wait), - }, { - scheduleResult: ScheduleResult{ - SuggestedHost: "node1", - nominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}, - }, - status: fwk.NewStatus(fwk.Unschedulable), - requiresPreemption: true, - permitStatus: fwk.NewStatus(fwk.Wait), - }, { - scheduleResult: ScheduleResult{ - SuggestedHost: "node1", - nominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}, - }, - status: fwk.NewStatus(fwk.Unschedulable), - requiresPreemption: true, - permitStatus: fwk.NewStatus(fwk.Wait), - }}, - }, - expectFailed: sets.New("p1", "p2", "p3"), - expectCondition: &metav1.Condition{ - Type: schedulingapi.PodGroupScheduled, - Status: metav1.ConditionFalse, - Reason: schedulingapi.PodGroupReasonUnschedulable, - Message: "preemption required but not feasible", - }, - }, - { - name: "One pod requires preemption, but waiting, two are feasible", + name: "One pod requires preemption, two are feasible", algorithmResult: podGroupAlgorithmResult{ status: fwk.NewStatus(fwk.Unschedulable, "waiting for preemption to complete"), waitingOnPreemption: true, @@ -1291,15 +1234,12 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, status: fwk.NewStatus(fwk.Unschedulable), requiresPreemption: true, - permitStatus: fwk.NewStatus(fwk.Wait), }, { scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: fwk.NewStatus(fwk.Wait), }, { scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: nil, }}, }, expectPreempting: sets.New("p1", "p2", "p3"), @@ -1322,7 +1262,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, status: fwk.NewStatus(fwk.Unschedulable), requiresPreemption: true, - permitStatus: nil, }, { scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, status: fwk.NewStatus(fwk.Unschedulable), @@ -1410,7 +1349,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { podResults: []algorithmResult{{ scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: nil, }, { scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, status: fwk.NewStatus(fwk.Error, "plugin returned error"), @@ -1436,7 +1374,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { }, status: fwk.NewStatus(fwk.Unschedulable), requiresPreemption: true, - permitStatus: nil, }, { scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, status: fwk.NewStatus(fwk.Error, "internal failure"), @@ -1470,15 +1407,12 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { podResults: []algorithmResult{{ scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: nil, }, { scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: nil, }, { scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: nil, }}, }, expectBound: sets.New("p1", "p2", "p3"), @@ -1542,7 +1476,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) { podResults: []algorithmResult{{ scheduleResult: ScheduleResult{SuggestedHost: "node1"}, status: nil, - permitStatus: nil, }, { scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode}, status: fwk.NewStatus(fwk.Error), @@ -2133,8 +2066,9 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) { podGroupPod := st.MakePod().Name("foo").UID("foo").PodGroupName("pg").Obj() tests := map[string]struct { - placementPlugin fakePlacementPlugin - expectedResult podGroupAlgorithmResult + placementPlugin fakePlacementPlugin + placementFeasibleStatuses [][]fwk.Code + expectedResult podGroupAlgorithmResult }{ "respects higher score of placement1": { placementPlugin: fakePlacementPlugin{ @@ -2191,7 +2125,7 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) { generatePlacementsResult: map[string][]string{}, }, expectedResult: podGroupAlgorithmResult{ - status: fwk.NewStatus(fwk.Unschedulable, "no feasible placements found").WithPlugin("FakePlacementPlugin"), + status: fwk.NewStatus(fwk.Unschedulable, "no feasible placements found").WithPlugin("FakePlacementPlugin_Ordered"), }, }, "when all placements are infeasible, returns unschedulable": { @@ -2224,6 +2158,40 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) { status: fwk.NewStatus(fwk.Unschedulable, "0/2 placements are available, first placement status: pod group is unschedulable"), }, }, + "when all placements are infeasible, but pods are feasible, returns unschedulable": { + placementPlugin: fakePlacementPlugin{ + generatePlacementsResult: map[string][]string{ + "placement1": {nodes[0].Name}, + "placement2": {nodes[1].Name}, + }, + scorePlacementsResult: map[string]int64{ + "placement1": 1, + "placement2": 2, + }, + filterStatus: map[string]*fwk.Status{ + nodes[0].Name: nil, + nodes[1].Name: nil, + }, + }, + placementFeasibleStatuses: [][]fwk.Code{ + {fwk.Unschedulable}, + {fwk.Unschedulable}, + }, + expectedResult: podGroupAlgorithmResult{ + podResults: []algorithmResult{ + { + pod: podGroupPod, + scheduleResult: ScheduleResult{ + SuggestedHost: "node1", + EvaluatedNodes: 1, + FeasibleNodes: 1, + }, + status: nil, + }, + }, + status: fwk.NewStatus(fwk.Unschedulable, "0/2 placements are available, first placement status: injected placementFeasible status"), + }, + }, "filters out infeasible placements": { placementPlugin: fakePlacementPlugin{ generatePlacementsResult: map[string][]string{ @@ -2251,6 +2219,38 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) { status: nil, }, }, + "filters out infeasible placements with feasible pods": { + placementPlugin: fakePlacementPlugin{ + generatePlacementsResult: map[string][]string{ + "placement1": {nodes[0].Name}, + "placement2": {nodes[1].Name}, + }, + scorePlacementsResult: map[string]int64{ + "placement1": 1, + "placement2": 2, + }, + filterStatus: map[string]*fwk.Status{ + nodes[1].Name: nil, + }, + }, + placementFeasibleStatuses: [][]fwk.Code{ + {fwk.Success}, // placement1 + {fwk.Unschedulable}, // placement2 + }, + expectedResult: podGroupAlgorithmResult{ + podResults: []algorithmResult{ + { + pod: podGroupPod, + scheduleResult: ScheduleResult{ + SuggestedHost: nodes[0].Name, + EvaluatedNodes: 1, + FeasibleNodes: 1, + }, + }, + }, + status: nil, + }, + }, "when generate plugin fails, returns error": { placementPlugin: fakePlacementPlugin{ generatePlacementsStatus: fwk.NewStatus(fwk.Error, "error for test"), @@ -2286,9 +2286,15 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) { tt.placementPlugin.name = "FakePlacementPlugin" + orderedPlacementGeneratePlugin := &orderedPlacementPlugin{&tt.placementPlugin} + + placementFeasiblePlugin := &fakePlacementFeasiblePlugin{ + placementFeasibleStatuses: tt.placementFeasibleStatuses, + } + registry := []tf.RegisterPluginFunc{ - tf.RegisterPlacementGeneratePlugin(tt.placementPlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { - return &tt.placementPlugin, nil + tf.RegisterPlacementGeneratePlugin(orderedPlacementGeneratePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { + return orderedPlacementGeneratePlugin, nil }), tf.RegisterPlacementScorePlugin(tt.placementPlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { return &tt.placementPlugin, nil @@ -2296,6 +2302,9 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) { tf.RegisterFilterPlugin(tt.placementPlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { return &tt.placementPlugin, nil }), + tf.RegisterPermitPlugin(placementFeasiblePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) { + return placementFeasiblePlugin, nil + }), } snapshot := internalcache.NewEmptySnapshot()