Use PlacementFeasible instead of Permit in PodGroup scheduling cycle

This commit is contained in:
Bartosz 2026-05-13 14:03:09 +00:00
parent 6b22e1ccc7
commit 68acdf68c1
No known key found for this signature in database
2 changed files with 287 additions and 285 deletions

View file

@ -68,7 +68,7 @@ func (sched *Scheduler) scheduleOnePodGroup(ctx context.Context, podGroupInfo *f
}
sched.skipPodGroupPodSchedule(ctx, schedFwk, podGroupInfo)
// skipPodGroupPodSchedule could remove some pods from the pod group.
// Pod group constraints will be re-evaluated on a Permit phase.
// Pod group constraints will be re-evaluated in the PlacementFeasible phase.
// Now, verify if it has any pods left.
if len(podGroupInfo.QueuedPodInfos) == 0 {
return
@ -304,9 +304,6 @@ type algorithmResult struct {
requiresPreemption bool
// status is a scheduling algorithm status.
status *fwk.Status
// permitStatus is a status of the permit check.
// This is only set when the `status` is success or the `requiresPreemption` is true.
permitStatus *fwk.Status
}
// podGroupPostFilterMode defines how the pod group algorithm should run post filters plugins.
@ -364,41 +361,62 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context,
waitingOnPreemption: false,
}
placementCycleState := framework.NewCycleState()
placementCycleState.SetRecordPluginMetrics(true)
placementCycleState.SetPodGroupSchedulingCycle(podGroupCycleState)
logger := klog.FromContext(ctx)
logger.V(5).Info("Running a pod group scheduling algorithm", "podGroup", klog.KObj(podGroupInfo), "unscheduledPodsCount", len(podGroupInfo.QueuedPodInfos))
requiresPreemption := false
anyScheduled := false
for _, podInfo := range podGroupInfo.QueuedPodInfos {
podResult, revertFn := sched.podGroupPodSchedulingAlgorithm(ctx, schedFwk, podGroupCycleState, podGroupInfo, podInfo, postFilterMode)
result.podResults = append(result.podResults, podResult)
if !podResult.status.IsSuccess() && !podResult.requiresPreemption {
// When a pod is not feasible and doesn't require preemption, it means that it failed scheduling.
if podResult.status.IsRejected() {
// If the pod is rejected, the pod group can still be schedulable as long as the permit check can succeed.
continue
}
if revertFn != nil {
// We unreserve the pod at the end of the whole algorithm (via defer) because it should be ultimately returned to the queue,
// without binding it yet. We only assumed the pod to check feasibility of subsequent pods in the group.
defer revertFn()
}
if !podResult.status.IsSuccess() && !podResult.status.IsRejected() {
// When the algorithm returns error or unexpected status, stop evaluating the rest of the pods.
result.status = fwk.AsStatus(fmt.Errorf("failed to schedule other pod from a pod group: %w", podResult.status.AsError()))
// Clear the waiting on preemption flag that could have been set by previous pods.
result.waitingOnPreemption = false
break
}
// At this point, the pod has passed the scheduling algorithm with the Permit status being either Success or Wait.
// We unreserve the pod at the end of the whole algorithm (via defer) because it should be ultimately returned to the queue,
// without binding it yet. We only assumed the pod to check feasibility of subsequent pods in the group.
defer revertFn()
// PlacementFeasible plugins check if the pod group can meet its constraints.
// Those plugins need to be run after each pod is scheduled.
placementFeasibleStatus := schedFwk.RunPlacementFeasiblePlugins(ctx, placementCycleState, podGroupInfo)
if placementFeasibleStatus.IsError() {
// When the algorithm returns error or unexpected status, stop evaluating the rest of the pods.
result.status = fwk.AsStatus(fmt.Errorf("failed to evaluate placement feasibility: %w", placementFeasibleStatus.AsError()))
break
}
// UnschedulableAndUnresolvable from PlacementFeasible plugins indicates that the pod group
// cannot meet its constraints regardless of how many more pods we check.
// We can stop the scheduling loop early.
if placementFeasibleStatus.Code() == fwk.UnschedulableAndUnresolvable {
// We need to change the code to Unschedulable to make sure preemption can be fired.
result.status = fwk.NewStatus(fwk.Unschedulable).WithError(placementFeasibleStatus.AsError())
break
}
result.status = placementFeasibleStatus
requiresPreemption = requiresPreemption || podResult.requiresPreemption
if podResult.permitStatus.IsSuccess() {
// When the permit returns success for any pod, the pod group is schedulable.
if requiresPreemption {
// If any preemption is required, the whole pod group requires it to be feasible.
result.status = fwk.NewStatus(fwk.Unschedulable, "pod group is waiting for preemption to complete").WithError(errPodGroupUnschedulable)
// Set the waitingOnPreemption to true iff the pod group is feasible (Permit returned Success) and requires preemption.
result.waitingOnPreemption = true
} else {
result.status = nil // Success
}
anyScheduled = anyScheduled || podResult.status.IsSuccess()
}
if result.status.IsSuccess() {
if requiresPreemption {
// If any preemption is required, the whole pod group requires it to be feasible.
result.status = fwk.NewStatus(fwk.Unschedulable, "pod group is waiting for preemption to complete").WithError(errPodGroupUnschedulable)
result.waitingOnPreemption = true
} else if !anyScheduled {
// The framework requires at least 1 pod to be scheduled in order to return a success status.
result.status = fwk.NewStatus(fwk.Unschedulable).WithError(errPodGroupUnschedulable)
}
}
@ -406,7 +424,7 @@ func (sched *Scheduler) podGroupSchedulingDefaultAlgorithm(ctx context.Context,
}
// podGroupPodSchedulingAlgorithm runs a scheduling algorithm for an individual pod from a pod group.
// It returns the algorithm result and, if successful or the preemption is required, the permit status together with the revert function.
// It returns the algorithm result together with the revert function.
func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo, podInfo *framework.QueuedPodInfo, postFilterMode podGroupPostFilterMode) (algorithmResult, func()) {
pod := podInfo.Pod
podCtx := initPodSchedulingContext(ctx, pod, podGroupCycleState, postFilterMode)
@ -455,37 +473,12 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche
}
}
_, permitStatus := schedFwk.RunPermitPlugins(ctx, podCtx.state, assumedPodInfo.Pod, scheduleResult.SuggestedHost)
if !permitStatus.IsWait() && !permitStatus.IsSuccess() {
revertFn()
if permitStatus.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: assumedPodInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatus: framework.NewDefaultNodeToStatus(),
},
}
fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, permitStatus)
fitErr.Diagnosis.AddPluginStatus(permitStatus)
permitStatus = fwk.NewStatus(permitStatus.Code()).WithError(fitErr)
}
return algorithmResult{
pod: pod,
scheduleResult: ScheduleResult{nominatingInfo: clearNominatedNode},
podCtx: podCtx,
schedulingDuration: time.Since(start),
status: permitStatus,
}, nil
}
return algorithmResult{
pod: pod,
scheduleResult: scheduleResult,
podCtx: podCtx,
schedulingDuration: time.Since(start),
status: status,
permitStatus: permitStatus,
requiresPreemption: requiresPreemption,
}, revertFn
}
@ -552,8 +545,8 @@ func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, sched
sched.FailureHandler(ctx, schedFwk, pInfo, podGroupResult.status, nominatingInfo, podSchedulingStart)
} else {
// Pod group is unschedulable, so the pod has to be marked as unschedulable.
// Its rejection status is set to its permit status message.
status := fwk.NewStatus(fwk.Unschedulable, podResult.permitStatus.Message()).WithError(errPodGroupUnschedulable)
// Its rejection status is set to the pod group's status message.
status := fwk.NewStatus(fwk.Unschedulable, podGroupResult.status.Message()).WithError(errPodGroupUnschedulable)
sched.FailureHandler(ctx, schedFwk, pInfo, status, clearNominatedNode, podSchedulingStart)
}
unschedulablePods++
@ -567,8 +560,8 @@ func (sched *Scheduler) submitPodGroupAlgorithmResult(ctx context.Context, sched
// such as heterogeneous pod group or using inter-pod dependencies.
if podResult.requiresPreemption && !podGroupResult.waitingOnPreemption {
// Pod group is unschedulable, so the pod has to be marked as unschedulable, even if it just required preemption.
// Its rejection status is set to its permit status message, as the preemption message is no longer relevant.
status := fwk.NewStatus(fwk.Unschedulable, podResult.permitStatus.Message()).WithError(errPodGroupUnschedulable)
// Its rejection status is set to the pod group's status message, as the preemption message is no longer relevant.
status := fwk.NewStatus(fwk.Unschedulable, podGroupResult.status.Message()).WithError(errPodGroupUnschedulable)
sched.FailureHandler(ctx, schedFwk, pInfo, status, clearNominatedNode, podSchedulingStart)
} else {
// When a pod is unschedulable or preemption is required, just call the FailureHandler.

View file

@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"sync"
"testing"
"time"
@ -66,14 +67,12 @@ type fakePodGroupPlugin struct {
postFilterResult map[string]*fwk.PostFilterResult
postFilterStatus map[string]*fwk.Status
postFilterCalled bool
permitStatus map[string]*fwk.Status
podGroupPostFilterStatus *fwk.Status
podGroupPostFilterCalled bool
}
var _ fwk.FilterPlugin = &fakePodGroupPlugin{}
var _ fwk.PostFilterPlugin = &fakePodGroupPlugin{}
var _ fwk.PermitPlugin = &fakePodGroupPlugin{}
var _ framework.PodGroupPostFilterPlugin = &fakePodGroupPlugin{}
func (mp *fakePodGroupPlugin) Name() string { return "FakePodGroupPlugin" }
@ -93,13 +92,6 @@ func (mp *fakePodGroupPlugin) PostFilter(ctx context.Context, state fwk.CycleSta
return &fwk.PostFilterResult{NominatingInfo: clearNominatedNode}, fwk.NewStatus(fwk.Unschedulable, "default fake postfilter failure")
}
// Permit returns the status injected for the pod's name via permitStatus.
// Pods without an injected entry are rejected with a default Unschedulable
// status. The returned wait duration is always zero.
func (mp *fakePodGroupPlugin) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
	injected, found := mp.permitStatus[pod.Name]
	if !found {
		return fwk.NewStatus(fwk.Unschedulable, "default fake permit failure"), 0
	}
	return injected, 0
}
func (mp *fakePodGroupPlugin) PodGroupPostFilter(ctx context.Context, pg *schedulingv1alpha2.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) *fwk.Status {
mp.podGroupPostFilterCalled = true
if mp.podGroupPostFilterStatus != nil {
@ -108,6 +100,79 @@ func (mp *fakePodGroupPlugin) PodGroupPostFilter(ctx context.Context, pg *schedu
return fwk.NewStatus(fwk.Unschedulable, "default fake podgroup postfilter failure")
}
// fakePlacementFeasibleState is the per-placement bookkeeping that
// fakePlacementFeasiblePlugin stores in the placement cycle state.
type fakePlacementFeasibleState struct {
	// podCount is the number of PlacementFeasible calls observed so far
	// for the placement this state belongs to.
	podCount int
}

// Clone returns an independent copy of the state.
func (s *fakePlacementFeasibleState) Clone() fwk.StateData {
	copied := *s
	return &copied
}

// fakePlacementFeasibleStateKey is the cycle-state key under which
// fakePlacementFeasibleState is read and written.
const fakePlacementFeasibleStateKey fwk.StateKey = "fakePlacementFeasibleState"
// fakePlacementFeasiblePlugin is a test double that replays pre-configured
// PlacementFeasible status codes.
type fakePlacementFeasiblePlugin struct {
	// placementFeasibleStatuses holds the codes to return: the outer index
	// selects the placement, the inner index selects the call (one per
	// evaluated pod) within that placement.
	placementFeasibleStatuses [][]fwk.Code
	// placementCount is the number of distinct placements seen so far.
	placementCount int
}

// Compile-time checks that the fake implements the interfaces it is registered for.
var (
	_ framework.PlacementFeasiblePlugin = (*fakePlacementFeasiblePlugin)(nil)
	_ fwk.PermitPlugin                  = (*fakePlacementFeasiblePlugin)(nil)
)
// Name returns the name this fake is registered under.
//
// The fake borrows the GangScheduling name because PlacementFeasible is
// only used for a plugin with that name.
// TODO: Remove this once the restriction is taken off.
func (mp *fakePlacementFeasiblePlugin) Name() string {
	var pluginName string = names.GangScheduling
	return pluginName
}
// PlacementFeasible replays the injected status codes to simulate the
// evaluation of pod group placement constraints.
//
// placementFeasibleStatuses is interpreted as a 2D table:
//   - the outer slice has one entry per distinct placement (e.g. when several
//     topology placements are evaluated),
//   - the inner slice has one entry per pod-by-pod call within that placement.
//
// The fake cannot tell which placement it is called for, so it assumes that
// every placement comes with a fresh placementCycleState: the first call that
// finds no stored state starts a new placement, and the stored state counts
// the calls made within it.
//
// It returns nil (success) when no statuses are configured at all, when the
// current placement has an empty status list, or when the injected code is
// Success. Any other injected code is returned as a status with a fixed
// message. Calls beyond the configured table produce an error status.
func (mp *fakePlacementFeasiblePlugin) PlacementFeasible(ctx context.Context, placementCycleState fwk.PodGroupCycleState, podGroupInfo fwk.PodGroupInfo) *fwk.Status {
	// Nothing injected: behave as an always-feasible plugin.
	if len(mp.placementFeasibleStatuses) == 0 {
		return nil
	}

	data, err := placementCycleState.Read(fakePlacementFeasibleStateKey)
	if err != nil {
		// No state yet means this is the first call for a new placement.
		data = &fakePlacementFeasibleState{}
		placementCycleState.Write(fakePlacementFeasibleStateKey, data)
		mp.placementCount++
	}

	// Record this call and derive the zero-based indices into the table.
	tracker := data.(*fakePlacementFeasibleState)
	podIdx := tracker.podCount
	tracker.podCount++
	placementIdx := mp.placementCount - 1

	if placementIdx >= len(mp.placementFeasibleStatuses) {
		return fwk.AsStatus(fmt.Errorf("exceeded the expected call count"))
	}

	codes := mp.placementFeasibleStatuses[placementIdx]
	switch {
	case len(codes) == 0:
		// A placement without configured codes always succeeds.
		return nil
	case podIdx >= len(codes):
		return fwk.AsStatus(fmt.Errorf("exceeded the expected call count"))
	case codes[podIdx] == fwk.Success:
		return nil
	default:
		return fwk.NewStatus(codes[podIdx], "injected placementFeasible status")
	}
}
// Permit always returns an Error status with a zero wait duration, so any
// call to it surfaces as a failure.
func (mp *fakePlacementFeasiblePlugin) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
	const noWait time.Duration = 0
	unexpected := fwk.NewStatus(fwk.Error, "unexpected call to permit")
	return unexpected, noWait
}
func TestPodGroupInfoForPod(t *testing.T) {
groupName := "pg"
p1 := st.MakePod().Name("p1").Namespace("ns1").UID("p1").PodGroupName(groupName).Priority(100).Obj()
@ -590,6 +655,7 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
tests := []struct {
name string
plugin *fakePodGroupPlugin
podGroupFeasibleStatuses []fwk.Code
expectedGroupStatusCode fwk.Code
expectedGroupWaitingOnPreemption bool
expectedPodStatus map[string]*fwk.Status
@ -605,11 +671,6 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
"p2": nil,
"p3": nil,
},
permitStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": nil,
},
},
expectedGroupStatusCode: fwk.Success,
expectedPodStatus: map[string]*fwk.Status{
@ -619,18 +680,18 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
},
},
{
name: "All pods feasible, two waiting",
name: "All pods feasible, podGroup schedulable with 3 schedulable pods",
plugin: &fakePodGroupPlugin{
filterStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": nil,
},
permitStatus: map[string]*fwk.Status{
"p1": fwk.NewStatus(fwk.Wait),
"p2": fwk.NewStatus(fwk.Wait),
"p3": nil,
},
},
podGroupFeasibleStatuses: []fwk.Code{
fwk.Unschedulable,
fwk.Unschedulable,
fwk.Success,
},
expectedGroupStatusCode: fwk.Success,
expectedPodStatus: map[string]*fwk.Status{
@ -640,18 +701,18 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
},
},
{
name: "All pods feasible, but all waiting",
name: "All pods feasible, podGroup unschedulable",
plugin: &fakePodGroupPlugin{
filterStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": nil,
},
permitStatus: map[string]*fwk.Status{
"p1": fwk.NewStatus(fwk.Wait),
"p2": fwk.NewStatus(fwk.Wait),
"p3": fwk.NewStatus(fwk.Wait),
},
},
podGroupFeasibleStatuses: []fwk.Code{
fwk.Unschedulable,
fwk.Unschedulable,
fwk.Unschedulable,
},
expectedGroupStatusCode: fwk.Unschedulable,
expectedPodStatus: map[string]*fwk.Status{
@ -661,45 +722,41 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
},
},
{
name: "All pods feasible, but last waiting",
name: "All pods feasible, podGroup UnschedulableAndUnresolvable",
plugin: &fakePodGroupPlugin{
filterStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": nil,
},
permitStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": fwk.NewStatus(fwk.Wait),
},
},
expectedGroupStatusCode: fwk.Success,
podGroupFeasibleStatuses: []fwk.Code{
fwk.UnschedulableAndUnresolvable,
},
expectedGroupStatusCode: fwk.Unschedulable,
expectedPodStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": nil,
// The algorithm stopped evaluating the pods after UnschedulableAndUnresolvable was received from PlacementFeasible.
},
},
{
name: "All pods feasible, one waiting, one unschedulable",
name: "All pods feasible, podGroup UnschedulableAndUnresolvable with 2 pods",
plugin: &fakePodGroupPlugin{
filterStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": nil,
},
permitStatus: map[string]*fwk.Status{
"p1": fwk.NewStatus(fwk.Wait),
"p2": fwk.NewStatus(fwk.Unschedulable),
"p3": nil,
},
},
expectedGroupStatusCode: fwk.Success,
podGroupFeasibleStatuses: []fwk.Code{
fwk.Unschedulable,
fwk.UnschedulableAndUnresolvable,
},
expectedGroupStatusCode: fwk.Unschedulable,
expectedPodStatus: map[string]*fwk.Status{
"p1": nil,
"p2": fwk.NewStatus(fwk.Unschedulable),
"p3": nil,
"p2": nil,
// The algorithm stopped evaluating the pods after UnschedulableAndUnresolvable was received from PlacementFeasible.
},
},
{
@ -720,11 +777,6 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
"p2": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}},
"p3": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}},
},
permitStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": nil,
},
},
expectedGroupStatusCode: fwk.Unschedulable,
expectedGroupWaitingOnPreemption: true,
@ -741,45 +793,7 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
skipForTAS: true,
},
{
name: "All pods require preemption, but waiting",
plugin: &fakePodGroupPlugin{
filterStatus: map[string]*fwk.Status{
"p1": fwk.NewStatus(fwk.Unschedulable),
"p2": fwk.NewStatus(fwk.Unschedulable),
"p3": fwk.NewStatus(fwk.Unschedulable),
},
postFilterStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": nil,
},
postFilterResult: map[string]*fwk.PostFilterResult{
"p1": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}},
"p2": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}},
"p3": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}},
},
permitStatus: map[string]*fwk.Status{
"p1": fwk.NewStatus(fwk.Wait),
"p2": fwk.NewStatus(fwk.Wait),
"p3": fwk.NewStatus(fwk.Wait),
},
},
expectedGroupStatusCode: fwk.Unschedulable,
expectedPodStatus: map[string]*fwk.Status{
"p1": fwk.NewStatus(fwk.Unschedulable),
"p2": fwk.NewStatus(fwk.Unschedulable),
"p3": fwk.NewStatus(fwk.Unschedulable),
},
expectedPreemption: map[string]bool{
"p1": true,
"p2": true,
"p3": true,
},
// preemption is not yet implemented for TAS
skipForTAS: true,
},
{
name: "One pod requires preemption, but waiting, two are feasible",
name: "One pod requires preemption, podGroup schedulable with 2 schedulable pods",
plugin: &fakePodGroupPlugin{
filterStatus: map[string]*fwk.Status{
"p1": fwk.NewStatus(fwk.Unschedulable),
@ -792,11 +806,11 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
postFilterResult: map[string]*fwk.PostFilterResult{
"p1": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}},
},
permitStatus: map[string]*fwk.Status{
"p1": fwk.NewStatus(fwk.Wait),
"p2": fwk.NewStatus(fwk.Wait),
"p3": nil,
},
},
podGroupFeasibleStatuses: []fwk.Code{
fwk.Unschedulable,
fwk.Unschedulable,
fwk.Success,
},
expectedGroupStatusCode: fwk.Unschedulable,
expectedGroupWaitingOnPreemption: true,
@ -829,10 +843,6 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
"p1": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}},
"p2": {NominatingInfo: clearNominatedNode},
},
permitStatus: map[string]*fwk.Status{
"p1": nil,
"p3": nil,
},
},
expectedGroupStatusCode: fwk.Unschedulable,
expectedGroupWaitingOnPreemption: true,
@ -858,6 +868,11 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
"p3": fwk.NewStatus(fwk.Unschedulable),
},
},
podGroupFeasibleStatuses: []fwk.Code{
fwk.Success,
fwk.Success,
fwk.Success,
},
expectedGroupStatusCode: fwk.Unschedulable,
expectedPodStatus: map[string]*fwk.Status{
"p1": fwk.NewStatus(fwk.Unschedulable),
@ -873,11 +888,6 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
"p2": fwk.NewStatus(fwk.Error),
"p3": nil,
},
permitStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": nil,
},
},
expectedGroupStatusCode: fwk.Error,
expectedPodStatus: map[string]*fwk.Status{
@ -887,23 +897,22 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
},
},
{
name: "Any permit returned Error",
name: "Any placementFeasible returned Error",
plugin: &fakePodGroupPlugin{
filterStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": nil,
},
permitStatus: map[string]*fwk.Status{
"p1": nil,
"p2": fwk.NewStatus(fwk.Error),
"p3": nil,
},
},
podGroupFeasibleStatuses: []fwk.Code{
fwk.Success,
fwk.Error,
},
expectedGroupStatusCode: fwk.Error,
expectedPodStatus: map[string]*fwk.Status{
"p1": nil,
"p2": fwk.NewStatus(fwk.Error),
"p2": nil,
// The algorithm stopped evaluating the pods after an error occurred, so a "p3" status is not expected.
},
},
@ -915,11 +924,6 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
"p2": fwk.NewStatus(fwk.Error),
"p3": nil,
},
permitStatus: map[string]*fwk.Status{
"p1": nil,
"p2": nil,
"p3": nil,
},
postFilterResult: map[string]*fwk.PostFilterResult{
"p1": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}},
},
@ -932,26 +936,25 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
},
},
{
name: "Any permit returned Error while waiting on preemption",
name: "Any placementFeasible returned Error while waiting on preemption",
plugin: &fakePodGroupPlugin{
filterStatus: map[string]*fwk.Status{
"p1": fwk.NewStatus(fwk.Unschedulable),
"p2": nil,
"p3": nil,
},
permitStatus: map[string]*fwk.Status{
"p1": nil,
"p2": fwk.NewStatus(fwk.Error),
"p3": nil,
},
postFilterResult: map[string]*fwk.PostFilterResult{
"p1": {NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}},
},
},
podGroupFeasibleStatuses: []fwk.Code{
fwk.Success,
fwk.Error,
},
expectedGroupStatusCode: fwk.Error,
expectedPodStatus: map[string]*fwk.Status{
"p1": fwk.NewStatus(fwk.Unschedulable),
"p2": fwk.NewStatus(fwk.Error),
"p2": nil,
// The algorithm stopped evaluating the pods after an error occurred, so a "p3" status is not expected.
},
},
@ -964,12 +967,10 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
}
name := fmt.Sprintf("%s (TopologyAwareWorkloadScheduling=%v)", tt.name, tasEnabled)
t.Run(name, func(t *testing.T) {
if tasEnabled {
featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{
features.TopologyAwareWorkloadScheduling: true,
features.GenericWorkload: true,
})
}
featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{
features.TopologyAwareWorkloadScheduling: tasEnabled,
features.GenericWorkload: true,
})
logger, ctx := ktesting.NewTestContext(t)
@ -978,6 +979,10 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
queue := internalqueue.NewSchedulingQueue(nil, informerFactory)
snapshot := internalcache.NewEmptySnapshot()
placementFeasiblePlugin := &fakePlacementFeasiblePlugin{
placementFeasibleStatuses: [][]fwk.Code{tt.podGroupFeasibleStatuses},
}
registry := []tf.RegisterPluginFunc{
tf.RegisterFilterPlugin(tt.plugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
return tt.plugin, nil
@ -985,8 +990,8 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
tf.RegisterPostFilterPlugin(tt.plugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
return tt.plugin, nil
}),
tf.RegisterPermitPlugin(tt.plugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
return tt.plugin, nil
tf.RegisterPermitPlugin(placementFeasiblePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
return placementFeasiblePlugin, nil
}),
}
schedFwk, err := tf.NewFramework(ctx,
@ -1044,18 +1049,10 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
if podResult.scheduleResult.SuggestedHost != "node1" {
t.Errorf("Expected pod %s suggested host: node1, got: %v", podName, podResult.scheduleResult.SuggestedHost)
}
if expected, ok := tt.plugin.permitStatus[podName]; ok {
if podResult.permitStatus.Code() != expected.Code() {
t.Errorf("Expected pod %s permit status code: %v, got: %v", podName, expected.Code(), podResult.permitStatus.Code())
}
}
} else {
if podResult.scheduleResult.SuggestedHost != "" {
t.Errorf("Expected pod %s empty suggested host, got: %v", podName, podResult.scheduleResult.SuggestedHost)
}
if podResult.permitStatus != nil {
t.Errorf("Expected pod %s nil permit status, got: %v", podName, podResult.permitStatus)
}
}
if expected, ok := tt.expectedPreemption[podName]; ok {
if podResult.requiresPreemption != expected {
@ -1068,6 +1065,26 @@ func TestPodGroupSchedulingAlgorithm(t *testing.T) {
}
}
// orderedPlacementPlugin wraps a PlacementGeneratePlugin through embedding.
//
// It exists only because the PlacementFeasiblePlugin mock doesn't know which
// placement it processes and therefore has to assume the order of placements.
// TODO: Remove this once the PlacementFeasiblePlugin becomes order-independent or another way of ordering placements is introduced.
type orderedPlacementPlugin struct {
	fwk.PlacementGeneratePlugin
}
// Name returns the wrapped plugin's name with an "_Ordered" suffix.
func (p *orderedPlacementPlugin) Name() string {
	wrapped := p.PlacementGeneratePlugin.Name()
	return wrapped + "_Ordered"
}
// GeneratePlacements delegates to the wrapped plugin and, when the call
// succeeds with a non-nil result, sorts the returned placements in place by
// ascending Name. The wrapped plugin's result and status are returned as-is
// otherwise.
func (p *orderedPlacementPlugin) GeneratePlacements(ctx context.Context, state fwk.PodGroupCycleState, podGroup fwk.PodGroupInfo, parentPlacement *fwk.Placement) (*fwk.GeneratePlacementsResult, *fwk.Status) {
	result, status := p.PlacementGeneratePlugin.GeneratePlacements(ctx, state, podGroup, parentPlacement)
	if !status.IsSuccess() || result == nil {
		// Nothing to order; pass the wrapped plugin's outcome through.
		return result, status
	}

	placements := result.Placements
	sort.Slice(placements, func(a, b int) bool {
		return placements[a].Name < placements[b].Name
	})
	return result, status
}
func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
testNode := st.MakeNode().Name("node1").UID("node1").Obj()
@ -1108,15 +1125,12 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
podResults: []algorithmResult{{
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}},
},
expectBound: sets.New("p1", "p2", "p3"),
@ -1127,21 +1141,18 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
},
},
{
name: "All pods feasible, but all waiting",
name: "All pods feasible, but podGroup unschedulable",
algorithmResult: podGroupAlgorithmResult{
status: fwk.NewStatus(fwk.Unschedulable, "not enough capacity for the gang"),
podResults: []algorithmResult{{
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: fwk.NewStatus(fwk.Wait),
}, {
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: fwk.NewStatus(fwk.Wait),
}, {
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: fwk.NewStatus(fwk.Wait),
}},
},
expectFailed: sets.New("p1", "p2", "p3"),
@ -1153,45 +1164,18 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
},
},
{
name: "All pods feasible, but last waiting",
name: "One unschedulable",
algorithmResult: podGroupAlgorithmResult{
status: nil,
podResults: []algorithmResult{{
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: fwk.NewStatus(fwk.Wait),
}},
},
expectBound: sets.New("p1", "p2", "p3"),
expectCondition: &metav1.Condition{
Type: schedulingapi.PodGroupScheduled,
Status: metav1.ConditionTrue,
Reason: "Scheduled",
},
},
{
name: "All pods feasible, one waiting, one unschedulable",
algorithmResult: podGroupAlgorithmResult{
status: nil,
podResults: []algorithmResult{{
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: fwk.NewStatus(fwk.Wait),
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Unschedulable),
}, {
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}},
},
expectBound: sets.New("p1", "p3"),
@ -1214,7 +1198,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
},
status: fwk.NewStatus(fwk.Unschedulable),
requiresPreemption: true,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{
SuggestedHost: "node1",
@ -1222,7 +1205,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
},
status: fwk.NewStatus(fwk.Unschedulable),
requiresPreemption: true,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{
SuggestedHost: "node1",
@ -1230,7 +1212,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
},
status: fwk.NewStatus(fwk.Unschedulable),
requiresPreemption: true,
permitStatus: nil,
}},
},
expectPreempting: sets.New("p1", "p2", "p3"),
@ -1242,45 +1223,7 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
},
},
{
name: "All pods require preemption, but waiting",
algorithmResult: podGroupAlgorithmResult{
status: fwk.NewStatus(fwk.Unschedulable, "preemption required but not feasible"),
podResults: []algorithmResult{{
scheduleResult: ScheduleResult{
SuggestedHost: "node1",
nominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride},
},
status: fwk.NewStatus(fwk.Unschedulable),
requiresPreemption: true,
permitStatus: fwk.NewStatus(fwk.Wait),
}, {
scheduleResult: ScheduleResult{
SuggestedHost: "node1",
nominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride},
},
status: fwk.NewStatus(fwk.Unschedulable),
requiresPreemption: true,
permitStatus: fwk.NewStatus(fwk.Wait),
}, {
scheduleResult: ScheduleResult{
SuggestedHost: "node1",
nominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride},
},
status: fwk.NewStatus(fwk.Unschedulable),
requiresPreemption: true,
permitStatus: fwk.NewStatus(fwk.Wait),
}},
},
expectFailed: sets.New("p1", "p2", "p3"),
expectCondition: &metav1.Condition{
Type: schedulingapi.PodGroupScheduled,
Status: metav1.ConditionFalse,
Reason: schedulingapi.PodGroupReasonUnschedulable,
Message: "preemption required but not feasible",
},
},
{
name: "One pod requires preemption, but waiting, two are feasible",
name: "One pod requires preemption, two are feasible",
algorithmResult: podGroupAlgorithmResult{
status: fwk.NewStatus(fwk.Unschedulable, "waiting for preemption to complete"),
waitingOnPreemption: true,
@ -1291,15 +1234,12 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
},
status: fwk.NewStatus(fwk.Unschedulable),
requiresPreemption: true,
permitStatus: fwk.NewStatus(fwk.Wait),
}, {
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: fwk.NewStatus(fwk.Wait),
}, {
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}},
},
expectPreempting: sets.New("p1", "p2", "p3"),
@ -1322,7 +1262,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
},
status: fwk.NewStatus(fwk.Unschedulable),
requiresPreemption: true,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Unschedulable),
@ -1410,7 +1349,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
podResults: []algorithmResult{{
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Error, "plugin returned error"),
@ -1436,7 +1374,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
},
status: fwk.NewStatus(fwk.Unschedulable),
requiresPreemption: true,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Error, "internal failure"),
@ -1470,15 +1407,12 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
podResults: []algorithmResult{{
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}},
},
expectBound: sets.New("p1", "p2", "p3"),
@ -1542,7 +1476,6 @@ func TestSubmitPodGroupAlgorithmResult(t *testing.T) {
podResults: []algorithmResult{{
scheduleResult: ScheduleResult{SuggestedHost: "node1"},
status: nil,
permitStatus: nil,
}, {
scheduleResult: ScheduleResult{SuggestedHost: "", nominatingInfo: clearNominatedNode},
status: fwk.NewStatus(fwk.Error),
@ -2133,8 +2066,9 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) {
podGroupPod := st.MakePod().Name("foo").UID("foo").PodGroupName("pg").Obj()
tests := map[string]struct {
placementPlugin fakePlacementPlugin
expectedResult podGroupAlgorithmResult
placementPlugin fakePlacementPlugin
placementFeasibleStatuses [][]fwk.Code
expectedResult podGroupAlgorithmResult
}{
"respects higher score of placement1": {
placementPlugin: fakePlacementPlugin{
@ -2191,7 +2125,7 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) {
generatePlacementsResult: map[string][]string{},
},
expectedResult: podGroupAlgorithmResult{
status: fwk.NewStatus(fwk.Unschedulable, "no feasible placements found").WithPlugin("FakePlacementPlugin"),
status: fwk.NewStatus(fwk.Unschedulable, "no feasible placements found").WithPlugin("FakePlacementPlugin_Ordered"),
},
},
"when all placements are infeasible, returns unschedulable": {
@ -2224,6 +2158,40 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) {
status: fwk.NewStatus(fwk.Unschedulable, "0/2 placements are available, first placement status: pod group is unschedulable"),
},
},
"when all placements are infeasible, but pods are feasible, returns unschedulable": {
placementPlugin: fakePlacementPlugin{
generatePlacementsResult: map[string][]string{
"placement1": {nodes[0].Name},
"placement2": {nodes[1].Name},
},
scorePlacementsResult: map[string]int64{
"placement1": 1,
"placement2": 2,
},
filterStatus: map[string]*fwk.Status{
nodes[0].Name: nil,
nodes[1].Name: nil,
},
},
placementFeasibleStatuses: [][]fwk.Code{
{fwk.Unschedulable},
{fwk.Unschedulable},
},
expectedResult: podGroupAlgorithmResult{
podResults: []algorithmResult{
{
pod: podGroupPod,
scheduleResult: ScheduleResult{
SuggestedHost: "node1",
EvaluatedNodes: 1,
FeasibleNodes: 1,
},
status: nil,
},
},
status: fwk.NewStatus(fwk.Unschedulable, "0/2 placements are available, first placement status: injected placementFeasible status"),
},
},
"filters out infeasible placements": {
placementPlugin: fakePlacementPlugin{
generatePlacementsResult: map[string][]string{
@ -2251,6 +2219,38 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) {
status: nil,
},
},
"filters out infeasible placements with feasible pods": {
placementPlugin: fakePlacementPlugin{
generatePlacementsResult: map[string][]string{
"placement1": {nodes[0].Name},
"placement2": {nodes[1].Name},
},
scorePlacementsResult: map[string]int64{
"placement1": 1,
"placement2": 2,
},
filterStatus: map[string]*fwk.Status{
nodes[1].Name: nil,
},
},
placementFeasibleStatuses: [][]fwk.Code{
{fwk.Success}, // placement1
{fwk.Unschedulable}, // placement2
},
expectedResult: podGroupAlgorithmResult{
podResults: []algorithmResult{
{
pod: podGroupPod,
scheduleResult: ScheduleResult{
SuggestedHost: nodes[0].Name,
EvaluatedNodes: 1,
FeasibleNodes: 1,
},
},
},
status: nil,
},
},
"when generate plugin fails, returns error": {
placementPlugin: fakePlacementPlugin{
generatePlacementsStatus: fwk.NewStatus(fwk.Error, "error for test"),
@ -2286,9 +2286,15 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) {
tt.placementPlugin.name = "FakePlacementPlugin"
orderedPlacementGeneratePlugin := &orderedPlacementPlugin{&tt.placementPlugin}
placementFeasiblePlugin := &fakePlacementFeasiblePlugin{
placementFeasibleStatuses: tt.placementFeasibleStatuses,
}
registry := []tf.RegisterPluginFunc{
tf.RegisterPlacementGeneratePlugin(tt.placementPlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
return &tt.placementPlugin, nil
tf.RegisterPlacementGeneratePlugin(orderedPlacementGeneratePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
return orderedPlacementGeneratePlugin, nil
}),
tf.RegisterPlacementScorePlugin(tt.placementPlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
return &tt.placementPlugin, nil
@ -2296,6 +2302,9 @@ func TestPodGroupSchedulingPlacementAlgorithm(t *testing.T) {
tf.RegisterFilterPlugin(tt.placementPlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
return &tt.placementPlugin, nil
}),
tf.RegisterPermitPlugin(placementFeasiblePlugin.Name(), func(_ context.Context, _ runtime.Object, _ fwk.Handle) (fwk.Plugin, error) {
return placementFeasiblePlugin, nil
}),
}
snapshot := internalcache.NewEmptySnapshot()