diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index 3f8218e42dc..ff70630fa8e 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -165,6 +165,11 @@ type SortedScoredNodes interface { // scheduling during workload-aware preemption algorithm. type PodGroupSchedulingFunc func(ctx context.Context) (*fwk.PodGroupAssignments, *fwk.Status) +// PodGroupPostFilterResult stores information about nominated nodes for a pod group. +type PodGroupPostFilterResult struct { + NominatedNodeNames map[*v1.Pod]*fwk.NominatingInfo +} + // PodGroupPostFilterPlugin is an interface for plugins that are called // after a PodGroup cannot be scheduled. // It should not be used by any other plugin but DefaultPreemption. @@ -172,7 +177,7 @@ type PodGroupPostFilterPlugin interface { fwk.Plugin // PodGroupPostFilter is called after a PodGroup cannot be scheduled. - PodGroupPostFilter(ctx context.Context, pg *v1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc PodGroupSchedulingFunc) *fwk.Status + PodGroupPostFilter(ctx context.Context, pg *v1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc PodGroupSchedulingFunc) (*PodGroupPostFilterResult, *fwk.Status) } // Framework manages the set of plugins in use by the scheduling framework. diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go index b65a9f86d2f..b008efb821a 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go @@ -452,6 +452,6 @@ func filterPodsWithPDBViolation(podInfos []fwk.PodInfo, pdbs []*policy.PodDisrup } // PodGroupPostFilter runs a default preemption for the pod group. 
-func (pl *DefaultPreemption) PodGroupPostFilter(ctx context.Context, pg *schedulingapi.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) *fwk.Status { +func (pl *DefaultPreemption) PodGroupPostFilter(ctx context.Context, pg *schedulingapi.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) (*framework.PodGroupPostFilterResult, *fwk.Status) { return pl.podGroupEvaluator.Preempt(ctx, pg, pods, pgSchedulingFunc) } diff --git a/pkg/scheduler/framework/preemption/podgrouppreemption.go b/pkg/scheduler/framework/preemption/podgrouppreemption.go index 88ddb0dae30..f252359360a 100644 --- a/pkg/scheduler/framework/preemption/podgrouppreemption.go +++ b/pkg/scheduler/framework/preemption/podgrouppreemption.go @@ -61,32 +61,39 @@ func NewPodGroupEvaluator(fh fwk.Handle, executor *Executor) *PodGroupEvaluator // Preempt implements the preemption logic where the preemptor is a pod group // and the domain is the whole cluster. It preempts pod from the cluster // in order to make enough room for the pod group to be scheduled. -// It returns a status of the whole preemption process. +// It returns a PodGroupPostFilterResult which contains the mapping of nominated nodes +// for each pod in the pod group, and a status of the whole preemption process. // The preemption logic modifies the NodeInfo provided by a Handle // podGroupSchedulingFunc is a function that will be run to check feasibility of a pod group // scheduling after modifying the node state. // The caller is expected to backup the NodeInfo before calling this function // And rollback the state to the backup after function is finished. 
-func (ev *PodGroupEvaluator) Preempt(ctx context.Context, pg *schedulingapi.PodGroup, pods []*v1.Pod, podGroupSchedulingFunc framework.PodGroupSchedulingFunc) *fwk.Status { +func (ev *PodGroupEvaluator) Preempt(ctx context.Context, pg *schedulingapi.PodGroup, pods []*v1.Pod, podGroupSchedulingFunc framework.PodGroupSchedulingFunc) (*framework.PodGroupPostFilterResult, *fwk.Status) { // In case of workload-aware preemption, the domain is whole cluster. // We do not make a snapshot of node info. Those nodes will be shared // with the PodGroup scheduling algorithm passed as podGroupSchedulingFunc. allNodes, err := ev.Handle.SnapshotSharedLister().NodeInfos().List() if err != nil { - return fwk.AsStatus(fmt.Errorf("failed to list node infos: %w", err)) + return nil, fwk.AsStatus(fmt.Errorf("failed to list node infos: %w", err)) } domain := newDomainForWorkloadPreemption(allNodes, ev.podGroupLister, "cluster-domain") preemptor := newPodGroupPreemptor(pg, pods) pdbs, err := getPodDisruptionBudgets(ev.pdbLister) if err != nil { - return fwk.AsStatus(fmt.Errorf("failed to get pod disruption budgets: %w", err)) + return nil, fwk.AsStatus(fmt.Errorf("failed to get pod disruption budgets: %w", err)) } - victims, status := ev.selectVictimsOnDomain(ctx, preemptor, domain, pdbs, podGroupSchedulingFunc) + res, status := ev.selectVictimsOnDomain(ctx, preemptor, domain, pdbs, podGroupSchedulingFunc) if !status.IsSuccess() { - return status + return nil, status } - return ev.Executor.actuatePodGroupPreemption(ctx, victims, preemptor.pods, preemptor.podGroup, names.DefaultPreemption) + status = ev.Executor.actuatePodGroupPreemption(ctx, res.victims, preemptor.pods, preemptor.podGroup, names.DefaultPreemption) + return &framework.PodGroupPostFilterResult{NominatedNodeNames: res.nominatedNodeNames}, status +} + +type selectVictimsResult struct { + nominatedNodeNames map[*v1.Pod]*fwk.NominatingInfo + victims *extenderv1.Victims } // selectVictimsOnDomain selects a set of victims that 
can be removed from the @@ -97,7 +104,7 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain( preemptor *podGroupPreemptor, domain *domain, pdbs []*policy.PodDisruptionBudget, - podGroupSchedulingFunc framework.PodGroupSchedulingFunc) (*extenderv1.Victims, *fwk.Status) { + podGroupSchedulingFunc framework.PodGroupSchedulingFunc) (*selectVictimsResult, *fwk.Status) { logger := klog.FromContext(ctx) nameToNode := make(map[string]fwk.NodeInfo) @@ -162,11 +169,11 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain( } // If the scheduling failed after removing all potential victims, return the status. - assignments, status := podGroupSchedulingFunc(ctx) + podGroupAssignments, status := podGroupSchedulingFunc(ctx) if !status.IsSuccess() { return nil, status } - maxScheduledCount := len(assignments.ProposedAssignments) + maxScheduledCount := len(podGroupAssignments.ProposedAssignments) sort.Slice(potentialVictims, func(i, j int) bool { return moreImportantVictim(potentialVictims[i], potentialVictims[j]) @@ -175,9 +182,9 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain( violatingVictims, nonViolatingVictims := filterVictimsWithPDBViolation(potentialVictims, pdbs) numViolatingVictim := 0 - reprieveVictim := func(v *victim) (bool, error) { + reprieveVictim := func(v *victim) (bool, *fwk.PodGroupAssignments, error) { if err := addPods(v); err != nil { - return false, err + return false, nil, err } assignments, status := podGroupSchedulingFunc(ctx) @@ -198,7 +205,7 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain( if !fits { if err := removePods(v); err != nil { - return false, err + return false, nil, err } var names []string for _, p := range v.Pods() { @@ -208,7 +215,7 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain( logger.V(6).Info("Pods are potential preemption victims on domain", "pods", pods, "domain", domain.GetName()) } - return fits, nil + return fits, assignments, nil } // Try to reprieve as many pods as possible. 
We first try to reprieve the PDB @@ -216,18 +223,22 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain( // from the highest importance victims. var victimsToPreempt []*victim for _, v := range violatingVictims { - if fits, err := reprieveVictim(v); err != nil { + if fits, assignments, err := reprieveVictim(v); err != nil { return nil, fwk.AsStatus(err) - } else if !fits { + } else if fits { + podGroupAssignments = assignments + } else { victimsToPreempt = append(victimsToPreempt, v) numViolatingVictim++ } } for _, v := range nonViolatingVictims { - if fits, err := reprieveVictim(v); err != nil { + if fits, assignments, err := reprieveVictim(v); err != nil { return nil, fwk.AsStatus(err) - } else if !fits { + } else if fits { + podGroupAssignments = assignments + } else { victimsToPreempt = append(victimsToPreempt, v) } } @@ -245,7 +256,17 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain( v := &extenderv1.Victims{ Pods: podsToPreempt, } - return v, nil + n := make(map[*v1.Pod]*fwk.NominatingInfo) + for _, p := range podGroupAssignments.ProposedAssignments { + if p.GetNodeName() != "" { + n[p.GetPod()] = &fwk.NominatingInfo{ + NominatingMode: fwk.ModeOverride, + NominatedNodeName: p.GetNodeName(), + } + } + } + + return &selectVictimsResult{nominatedNodeNames: n, victims: v}, nil } // isPreemptionAllowed returns whether the victim residing on nodeInfo can be preempted by the preemptor diff --git a/pkg/scheduler/framework/preemption/podgrouppreemption_test.go b/pkg/scheduler/framework/preemption/podgrouppreemption_test.go index 8384251fae3..57e827debe3 100644 --- a/pkg/scheduler/framework/preemption/podgrouppreemption_test.go +++ b/pkg/scheduler/framework/preemption/podgrouppreemption_test.go @@ -958,8 +958,13 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) { scheduledCount = 3 // v1 reprieved: this increases scheduledCount! 
} + proposedAssignments := make([]fwk.ProposedAssignment, scheduledCount) + for i := range scheduledCount { + pod := st.MakePod().Name(fmt.Sprintf("p%d", i+1)).UID(fmt.Sprintf("p%d", i+1)).Obj() + proposedAssignments[i] = &testProposedAssignment{pod: pod, nodeName: "node1"} + } return &fwk.PodGroupAssignments{ - ProposedAssignments: make([]fwk.ProposedAssignment, scheduledCount), + ProposedAssignments: proposedAssignments, }, fwk.NewStatus(fwk.Success) }, expectedPods: []string{"v2"}, @@ -999,8 +1004,14 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) { scheduledCount = 4 // v1 reprieved: this increases scheduledCount! } + proposedAssignments := make([]fwk.ProposedAssignment, scheduledCount) + for i := range scheduledCount { + pod := st.MakePod().Name(fmt.Sprintf("p%d", i+1)).UID(fmt.Sprintf("p%d", i+1)).Obj() + proposedAssignments[i] = &testProposedAssignment{pod: pod, nodeName: "node1"} + } + return &fwk.PodGroupAssignments{ - ProposedAssignments: make([]fwk.ProposedAssignment, scheduledCount), + ProposedAssignments: proposedAssignments, }, fwk.NewStatus(fwk.Success) }, expectedPods: []string{"v2"}, @@ -1143,8 +1154,12 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) { if availableSlots >= neededSlots { assignmentsCount := min(availableSlots, len(tt.preemptor.Members())) + proposedAssignments := make([]fwk.ProposedAssignment, assignmentsCount) + for i := range assignmentsCount { + proposedAssignments[i] = &testProposedAssignment{pod: tt.preemptor.Members()[i], nodeName: "node1"} + } return &fwk.PodGroupAssignments{ - ProposedAssignments: make([]fwk.ProposedAssignment, assignmentsCount), + ProposedAssignments: proposedAssignments, }, fwk.NewStatus(fwk.Success) } return nil, fwk.NewStatus(fwk.Unschedulable) @@ -1154,7 +1169,7 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) { podGroupLister: pgLister, } - victims, gotStatus := pl.selectVictimsOnDomain(ctx, tt.preemptor, domain, tt.pdbs, mockSchedulingFunc) + res, 
gotStatus := pl.selectVictimsOnDomain(ctx, tt.preemptor, domain, tt.pdbs, mockSchedulingFunc) if !gotStatus.IsSuccess() { t.Logf("SelectVictimsOnDomain failed: %v", gotStatus.Message()) } @@ -1167,12 +1182,12 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) { if wantCode != fwk.Success { return } - if victims == nil { + if res == nil { t.Fatalf("expected non-nil victims on success") } gotNames := sets.Set[string]{} - for _, p := range victims.Pods { + for _, p := range res.victims.Pods { gotNames.Insert(p.Name) } wantNames := sets.New(tt.expectedPods...) @@ -1319,3 +1334,69 @@ func TestMoreImportantVictim(t *testing.T) { }) } } + +type testProposedAssignment struct { + pod *v1.Pod + nodeName string +} + +func (a *testProposedAssignment) GetPod() *v1.Pod { + return a.pod +} + +func (a *testProposedAssignment) GetNodeName() string { + return a.nodeName +} + +func TestPodGroupEvaluator_SelectVictimsOnDomain_NominatedNodes(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + + p1 := st.MakePod().Name("p1").UID("p1").Obj() + p2 := st.MakePod().Name("p2").UID("p2").Obj() + + preemptor := newPodGroupPreemptor( + st.MakePodGroup().Name("preemptor-pg").Priority(highPriority).Obj(), + []*v1.Pod{p1, p2}, + ) + + domainNodes := []fwk.NodeInfo{ + framework.NewNodeInfo(), + } + domainNodes[0].SetNode(st.MakeNode().Name("node1").Obj()) + + // Add a low priority pod as a potential victim to satisfy the check + p3 := st.MakePod().Name("p3").UID("p3").Node("node1").Priority(lowPriority).Obj() + podInfo, _ := framework.NewPodInfo(p3) + domainNodes[0].AddPodInfo(podInfo) + + pgLister := &mockPodGroupLister{podGroups: make(map[string]*schedulingapi.PodGroup)} + domain := newDomainForWorkloadPreemption(domainNodes, pgLister, "test-domain") + + mockSchedulingFunc := func(ctx context.Context) (*fwk.PodGroupAssignments, *fwk.Status) { + return &fwk.PodGroupAssignments{ + ProposedAssignments: []fwk.ProposedAssignment{ + &testProposedAssignment{pod: p1, nodeName: 
"node1"}, + &testProposedAssignment{pod: p2, nodeName: ""}, // No node assigned + }, + }, fwk.NewStatus(fwk.Success) + } + + pl := &PodGroupEvaluator{} + + result, gotStatus := pl.selectVictimsOnDomain(ctx, preemptor, domain, nil, mockSchedulingFunc) + if !gotStatus.IsSuccess() { + t.Fatalf("SelectVictimsOnDomain failed: %v", gotStatus.Message()) + } + + if result == nil { + t.Fatalf("expected non-nil result") + } + + if len(result.nominatedNodeNames) != 1 { + t.Errorf("Expected 1 nominated node name, got %d", len(result.nominatedNodeNames)) + } + + if info, ok := result.nominatedNodeNames[p1]; !ok || info.NominatedNodeName != "node1" { + t.Errorf("Expected p1 to be nominated for node1, got %v", info) + } +} diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index c1d8ac80e2a..a709cb30a1f 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -282,8 +282,8 @@ func (pl *TestPlugin) PlacementScoreExtensions() fwk.PlacementScoreExtensions { return nil } -func (pl *TestPlugin) PodGroupPostFilter(ctx context.Context, pg *v1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) *fwk.Status { - return nil +func (pl *TestPlugin) PodGroupPostFilter(ctx context.Context, pg *v1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) (*framework.PodGroupPostFilterResult, *fwk.Status) { + return nil, nil } func newTestCloseErrorPlugin(_ context.Context, injArgs runtime.Object, f fwk.Handle) (fwk.Plugin, error) { diff --git a/pkg/scheduler/schedule_one_podgroup.go b/pkg/scheduler/schedule_one_podgroup.go index 8e0a27158b0..dd4e436fc92 100644 --- a/pkg/scheduler/schedule_one_podgroup.go +++ b/pkg/scheduler/schedule_one_podgroup.go @@ -244,9 +244,14 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr // Run workload aware preemption if required. 
If the preemption is successful, // we need to put the pods from pod group back into the scheduling queue. if sched.workloadAwarePreemptionEnabled && result.status.Code() == fwk.Unschedulable { - status := sched.runWorkloadAwarePreemption(ctx, schedFwk, podGroupCycleState, podGroupInfo) + pgPostFilterResult, status := sched.runWorkloadAwarePreemption(ctx, schedFwk, podGroupCycleState, podGroupInfo) if status.IsSuccess() { result.waitingOnPreemption = true + for i := range result.podResults { + if nodeNameInfo, ok := pgPostFilterResult.NominatedNodeNames[result.podResults[i].pod]; ok { + result.podResults[i].scheduleResult.nominatingInfo = nodeNameInfo + } + } } else if status.IsError() { result.status = status } else { @@ -264,24 +269,24 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr // original state. // The function used for evaluating feasibility of pod group scheduling is // scheduler.podGroupSchedulingAlgorithm run without any post filters. -func (sched *Scheduler) runWorkloadAwarePreemption(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo) *fwk.Status { +func (sched *Scheduler) runWorkloadAwarePreemption(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo) (*framework.PodGroupPostFilterResult, *fwk.Status) { // Default preemption should be the only pod group post filter registered plugin. 
plugins := schedFwk.PodGroupPostFilterPlugins() if len(plugins) == 0 { - return fwk.NewStatus(fwk.Unschedulable, "default preemption plugin is not registered, workload aware preemption is disabled") + return nil, fwk.NewStatus(fwk.Unschedulable, "default preemption plugin is not registered, workload aware preemption is disabled") } pg, err := schedFwk.SharedInformerFactory().Scheduling().V1alpha3().PodGroups().Lister().PodGroups(podGroupInfo.Namespace).Get(podGroupInfo.Name) if err != nil { - return fwk.AsStatus(fmt.Errorf("failed to get pod group object: %w", err)) + return nil, fwk.AsStatus(fmt.Errorf("failed to get pod group object: %w", err)) } if pg.Spec.SchedulingConstraints != nil && len(pg.Spec.SchedulingConstraints.Topology) > 0 { - return fwk.NewStatus(fwk.Unschedulable, "workload aware preemption is not supported for pod groups with scheduling constraints") + return nil, fwk.NewStatus(fwk.Unschedulable, "workload aware preemption is not supported for pod groups with scheduling constraints") } restoreFn, err := sched.nodeInfoSnapshot.BackupSnapshot() if err != nil { - return fwk.AsStatus(fmt.Errorf("failed to backup snapshot: %w", err)) + return nil, fwk.AsStatus(fmt.Errorf("failed to backup snapshot: %w", err)) } defer restoreFn() diff --git a/pkg/scheduler/schedule_one_podgroup_test.go b/pkg/scheduler/schedule_one_podgroup_test.go index bf3235777c9..c0043e060c2 100644 --- a/pkg/scheduler/schedule_one_podgroup_test.go +++ b/pkg/scheduler/schedule_one_podgroup_test.go @@ -69,6 +69,7 @@ type fakePodGroupPlugin struct { permitStatus map[string]*fwk.Status podGroupPostFilterStatus *fwk.Status podGroupPostFilterCalled bool + podGroupPostFilterResult map[string]*fwk.NominatingInfo } var _ fwk.FilterPlugin = &fakePodGroupPlugin{} @@ -100,12 +101,19 @@ func (mp *fakePodGroupPlugin) Permit(ctx context.Context, state fwk.CycleState, return fwk.NewStatus(fwk.Unschedulable, "default fake permit failure"), 0 } -func (mp *fakePodGroupPlugin) PodGroupPostFilter(ctx 
context.Context, pg *schedulingv1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) *fwk.Status { +func (mp *fakePodGroupPlugin) PodGroupPostFilter(ctx context.Context, pg *schedulingv1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) (*framework.PodGroupPostFilterResult, *fwk.Status) { mp.podGroupPostFilterCalled = true - if mp.podGroupPostFilterStatus != nil { - return mp.podGroupPostFilterStatus + if mp.podGroupPostFilterStatus == nil { + return nil, fwk.NewStatus(fwk.Unschedulable, "default fake podgroup postfilter failure") } - return fwk.NewStatus(fwk.Unschedulable, "default fake podgroup postfilter failure") + if mp.podGroupPostFilterResult == nil { + return nil, mp.podGroupPostFilterStatus + } + n := make(map[*v1.Pod]*fwk.NominatingInfo, len(pods)) + for _, passedPod := range pods { + n[passedPod] = mp.podGroupPostFilterResult[passedPod.Name] + } + return &framework.PodGroupPostFilterResult{NominatedNodeNames: n}, mp.podGroupPostFilterStatus } func TestPodGroupInfoForPod(t *testing.T) { @@ -2697,12 +2705,13 @@ func (f *fakeDefaultPreemption) Name() string { func TestRunWorkloadAwarePreemption(t *testing.T) { tests := []struct { - name string - podGroupInfo *framework.QueuedPodGroupInfo - existingPodGroups []*schedulingv1alpha3.PodGroup - pluginsRegistered bool - pluginReturnStatus *fwk.Status - expectedStatus *fwk.Status + name string + podGroupInfo *framework.QueuedPodGroupInfo + existingPodGroups []*schedulingv1alpha3.PodGroup + pluginsRegistered bool + pluginReturnStatus *fwk.Status + pluginNominatedNodes map[string]*fwk.NominatingInfo + expectedStatus *fwk.Status }{ { name: "error when no PodGroupPostFilter plugin is registered", @@ -2759,7 +2768,10 @@ func TestRunWorkloadAwarePreemption(t *testing.T) { }, pluginsRegistered: true, pluginReturnStatus: fwk.NewStatus(fwk.Success), - expectedStatus: fwk.NewStatus(fwk.Success), + pluginNominatedNodes: map[string]*fwk.NominatingInfo{ + "p1": 
{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}, + }, + expectedStatus: fwk.NewStatus(fwk.Success), }, { name: "failure when plugin returns unschedulable status", @@ -2810,12 +2822,12 @@ func TestRunWorkloadAwarePreemption(t *testing.T) { queuesort.Name: queuesort.New, defaultbinder.Name: defaultbinder.New, } - if tt.pluginsRegistered { registry["DefaultPreemption"] = func(ctx context.Context, obj runtime.Object, handle fwk.Handle) (fwk.Plugin, error) { return &fakeDefaultPreemption{ fakePodGroupPlugin: &fakePodGroupPlugin{ podGroupPostFilterStatus: tt.pluginReturnStatus, + podGroupPostFilterResult: tt.pluginNominatedNodes, }, }, nil } @@ -2871,13 +2883,145 @@ func TestRunWorkloadAwarePreemption(t *testing.T) { // Just inject logger explicitly in context to avoid panic ctx = klog.NewContext(ctx, logger) - status := sched.runWorkloadAwarePreemption(ctx, schedFwk, framework.NewCycleState(), tt.podGroupInfo) + res, status := sched.runWorkloadAwarePreemption(ctx, schedFwk, framework.NewCycleState(), tt.podGroupInfo) if tt.expectedStatus.Code() != status.Code() || tt.expectedStatus.Message() != status.Message() { t.Errorf("Unexpected status, want code %v message %q, got code %v message %q", tt.expectedStatus.Code(), tt.expectedStatus.Message(), status.Code(), status.Message()) } + + if len(tt.pluginNominatedNodes) > 0 { + for pod, nni := range res.NominatedNodeNames { + if !cmp.Equal(nni, tt.pluginNominatedNodes[pod.Name]) { + t.Errorf("Unexpected result, want %v, got %v", tt.pluginNominatedNodes, res.NominatedNodeNames) + } + } + } }) } } + +func TestPodGroupCycle_NominatedNodes(t *testing.T) { + featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{ + features.GenericWorkload: true, + features.WorkloadAwarePreemption: true, + features.GangScheduling: true, + }) + + testPodGroup := st.MakePodGroup().Name("pg").Namespace("default").Obj() + p1 := 
st.MakePod().Name("p1").UID("p1").PodGroupName("pg").SchedulerName("test-scheduler").Obj() + p2 := st.MakePod().Name("p2").UID("p2").PodGroupName("pg").SchedulerName("test-scheduler").Obj() + + qInfo1 := &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: p1}} + qInfo2 := &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: p2}} + + podGroupInfo := &framework.QueuedPodGroupInfo{ + QueuedPodInfos: []*framework.QueuedPodInfo{qInfo1, qInfo2}, + PodGroupInfo: &framework.PodGroupInfo{ + Name: "pg", + Namespace: "default", + UnscheduledPods: []*v1.Pod{p1, p2}, + }, + } + + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Mock PodGroupPostFilter to return NominatedNodeNames + nominatedNodes := map[string]*fwk.NominatingInfo{ + p1.Name: {NominatingMode: fwk.ModeOverride, NominatedNodeName: "node1"}, + } + fakePlugin := &fakePodGroupPlugin{ + podGroupPostFilterStatus: fwk.NewStatus(fwk.Success), + podGroupPostFilterResult: nominatedNodes, + } + + registry := frameworkruntime.Registry{ + queuesort.Name: queuesort.New, + defaultbinder.Name: defaultbinder.New, + "DefaultPreemption": func(ctx context.Context, obj runtime.Object, handle fwk.Handle) (fwk.Plugin, error) { + return &fakeDefaultPreemption{fakePodGroupPlugin: fakePlugin}, nil + }, + } + + profileCfg := config.KubeSchedulerProfile{ + SchedulerName: "test-scheduler", + Plugins: &config.Plugins{ + QueueSort: config.PluginSet{ + Enabled: []config.Plugin{{Name: queuesort.Name}}, + }, + Bind: config.PluginSet{ + Enabled: []config.Plugin{{Name: defaultbinder.Name}}, + }, + PostFilter: config.PluginSet{ + Enabled: []config.Plugin{{Name: "DefaultPreemption"}}, + }, + }, + } + + client := clientsetfake.NewSimpleClientset(testPodGroup) + informerFactory := informers.NewSharedInformerFactory(client, 0) + podGroupLister := informerFactory.Scheduling().V1alpha3().PodGroups().Lister() + + informerFactory.Start(ctx.Done()) + informerFactory.WaitForCacheSync(ctx.Done()) 
+ + schedFwk, err := frameworkruntime.NewFramework(ctx, registry, &profileCfg, + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithClientSet(client), + frameworkruntime.WithEventRecorder(events.NewFakeRecorder(100)), + ) + if err != nil { + t.Fatalf("Failed to create framework: %v", err) + } + + cache := internalcache.New(ctx, nil, true) + sched := &Scheduler{ + Profiles: profile.Map{"test-scheduler": schedFwk}, + Cache: cache, + nodeInfoSnapshot: internalcache.NewEmptySnapshot(), + podGroupLister: podGroupLister, + workloadAwarePreemptionEnabled: true, + client: client, + } + + // Mock SchedulePod to return Unschedulable initially, and success on subsequent calls + callCount := 0 + sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (ScheduleResult, error) { + callCount++ + if callCount <= 2 { + return ScheduleResult{}, &framework.FitError{Pod: podInfo.Pod, NumAllNodes: 1} + } + if podInfo.Pod.Name == "p1" { + return ScheduleResult{SuggestedHost: "node1"}, nil + } + if podInfo.Pod.Name == "p2" { + return ScheduleResult{SuggestedHost: "node2"}, nil + } + return ScheduleResult{}, fmt.Errorf("unexpected pod") + } + capturedFailureHandler := make(map[string]*fwk.NominatingInfo) + sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *fwk.Status, nominatingInfo *fwk.NominatingInfo, start time.Time) { + capturedFailureHandler[podInfo.Pod.Name] = nominatingInfo + } + + // Just inject logger explicitly in context to avoid panic + logger, _ := ktesting.NewTestContext(t) + ctx = klog.NewContext(ctx, logger) + + sched.podGroupCycle(ctx, schedFwk, framework.NewCycleState(), podGroupInfo) + + if len(capturedFailureHandler) == 0 { + t.Fatalf("expected FailureHandler to be called") + } + + if capturedFailureHandler[p1.Name].NominatedNodeName != "node1" { + t.Errorf("Expected p1 to be nominated for node1, got %s", 
capturedFailureHandler[p1.Name].NominatedNodeName) + } + + if capturedFailureHandler[p2.Name] != nil { + t.Errorf("Expected p2 to not be nominated, got %v", capturedFailureHandler[p2.Name]) + } +} diff --git a/test/integration/scheduler/preemption/podgrouppreemption_test.go b/test/integration/scheduler/preemption/podgrouppreemption_test.go index aefcdc8bc35..c9c8c8f9440 100644 --- a/test/integration/scheduler/preemption/podgrouppreemption_test.go +++ b/test/integration/scheduler/preemption/podgrouppreemption_test.go @@ -41,9 +41,10 @@ import ( // TestPodGroupPreemption tests preemption scenarios involving pod groups. func TestPodGroupPreemption(t *testing.T) { featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{ - features.GenericWorkload: true, - features.GangScheduling: true, - features.WorkloadAwarePreemption: true, + features.GenericWorkload: true, + features.GangScheduling: true, + features.WorkloadAwarePreemption: true, + features.ClearingNominatedNodeNameAfterBinding: false, }) tests := []struct { @@ -56,6 +57,7 @@ func TestPodGroupPreemption(t *testing.T) { expectedScheduled []string expectedPreempted []string expectedUnschedulable []string + expectedToHaveNNNInfo []string expectedPodsPreemptedByWAP int }{ { @@ -78,6 +80,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"high-1", "high-2", "high-3"}, expectedPreempted: []string{"low-1", "low-2", "low-3"}, + expectedToHaveNNNInfo: []string{"high-1", "high-2", "high-3"}, expectedPodsPreemptedByWAP: 3, }, { @@ -100,6 +103,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"high-1", "high-2", "high-3"}, expectedPreempted: []string{"low-1", "low-2", "low-3"}, + expectedToHaveNNNInfo: []string{"high-1", "high-2", "high-3"}, expectedPodsPreemptedByWAP: 3, }, { @@ -126,6 +130,7 @@ func TestPodGroupPreemption(t *testing.T) { // high-1 and high-2 will fit on node1 if low-1 is preempted. 
expectedScheduled: []string{"high-1", "high-2", "high-3", "low-2"}, expectedPreempted: []string{"low-1"}, + expectedToHaveNNNInfo: []string{"high-1", "high-2", "high-3"}, expectedPodsPreemptedByWAP: 1, }, { @@ -150,8 +155,10 @@ func TestPodGroupPreemption(t *testing.T) { }, // high-1 will fit on node1 (it has 1 CPU free). // high-2 and high-3 will fit on node2 if very-low-1 is preempted. + // high-2 will preempt very-low-1 and high-3 will schedule in next cycle to free space. expectedScheduled: []string{"high-1", "high-2", "high-3", "low-1"}, expectedPreempted: []string{"very-low-1"}, + expectedToHaveNNNInfo: []string{"high-2"}, expectedPodsPreemptedByWAP: 1, }, { @@ -180,6 +187,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"high-1", "high-2"}, expectedPreempted: []string{"low-3"}, + expectedToHaveNNNInfo: []string{"high-1", "high-2"}, expectedPodsPreemptedByWAP: 1, }, { @@ -205,6 +213,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"high-1", "high-2", "high-3", "high-4"}, expectedPreempted: []string{"low-1", "low-2", "low-3", "low-4"}, + expectedToHaveNNNInfo: []string{"high-1", "high-2", "high-3", "high-4"}, expectedPodsPreemptedByWAP: 4, }, { @@ -228,6 +237,7 @@ func TestPodGroupPreemption(t *testing.T) { expectedScheduled: []string{"mid-1", "low-1", "low-2"}, expectedPreempted: []string{}, expectedUnschedulable: []string{"high-1", "high-2", "high-3"}, + expectedToHaveNNNInfo: []string{}, expectedPodsPreemptedByWAP: 0, }, { @@ -249,6 +259,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"high-1", "high-2", "mid-1"}, expectedPreempted: []string{"low-1", "low-2"}, + expectedToHaveNNNInfo: []string{"high-1", "high-2"}, expectedPodsPreemptedByWAP: 2, }, { @@ -270,6 +281,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"high-1"}, expectedPreempted: []string{"low-1", "low-2", "low-3"}, + expectedToHaveNNNInfo: []string{"high-1"}, 
expectedPodsPreemptedByWAP: 3, }, { @@ -291,6 +303,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"high-1"}, expectedPreempted: []string{"low-1", "low-2", "low-3"}, + expectedToHaveNNNInfo: []string{"high-1"}, expectedPodsPreemptedByWAP: 3, }, { @@ -312,6 +325,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"low-1"}, expectedPreempted: []string{"high-1", "high-2", "high-3"}, + expectedToHaveNNNInfo: []string{"low-1"}, expectedPodsPreemptedByWAP: 3, }, { @@ -334,6 +348,7 @@ func TestPodGroupPreemption(t *testing.T) { expectedScheduled: []string{"low-1", "low-2", "low-3"}, expectedPreempted: []string{}, expectedUnschedulable: []string{"high-1"}, + expectedToHaveNNNInfo: []string{}, expectedPodsPreemptedByWAP: 0, }, { @@ -354,6 +369,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"high-1", "low-1", "low-2"}, expectedPreempted: []string{"low-3"}, + expectedToHaveNNNInfo: []string{"high-1"}, expectedPodsPreemptedByWAP: 0, }, { @@ -378,6 +394,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"p-a", "p-b", "p-c"}, expectedPreempted: []string{"p1", "p2", "p3"}, + expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"}, expectedPodsPreemptedByWAP: 3, }, { @@ -404,6 +421,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"p-a", "p-b", "p-c", "p4"}, expectedPreempted: []string{"p1", "p2", "p3"}, + expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"}, expectedPodsPreemptedByWAP: 3, }, { @@ -431,6 +449,7 @@ func TestPodGroupPreemption(t *testing.T) { expectedScheduled: []string{"p-a", "p-b", "p3", "p4"}, expectedPreempted: []string{"p1", "p2"}, expectedUnschedulable: []string{"p-c"}, + expectedToHaveNNNInfo: []string{"p-a", "p-b"}, expectedPodsPreemptedByWAP: 2, }, { @@ -456,6 +475,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"p-a", "p-b", "p-c"}, expectedPreempted: []string{"v1", "v2", "v3"}, 
+ expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"}, expectedPodsPreemptedByWAP: 3, }, { @@ -482,9 +502,11 @@ func TestPodGroupPreemption(t *testing.T) { st.MakePod().Name("p-b").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").PodGroupName("preemptor-pg").ZeroTerminationGracePeriod().Priority(101).Obj(), st.MakePod().Name("p-c").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").PodGroupName("preemptor-pg").ZeroTerminationGracePeriod().Priority(100).Obj(), }, + // p-a will preempt victim-pg, p-b will schedule to empty space, so only p-a will have NNN info. expectedScheduled: []string{"p-a", "p-b", "v3", "v4"}, expectedPreempted: []string{"v1", "v2"}, expectedUnschedulable: []string{"p-c"}, + expectedToHaveNNNInfo: []string{"p-a"}, expectedPodsPreemptedByWAP: 2, }, { @@ -509,6 +531,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"p-a", "p-b", "p-c"}, expectedPreempted: []string{"p1", "p2", "p3"}, + expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"}, expectedPodsPreemptedByWAP: 3, }, { @@ -535,6 +558,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"p-a", "p-b", "p-c", "p4"}, expectedPreempted: []string{"p1", "p2", "p3"}, + expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"}, expectedPodsPreemptedByWAP: 3, }, { @@ -562,6 +586,7 @@ func TestPodGroupPreemption(t *testing.T) { expectedScheduled: []string{"p-a", "p-b", "p3", "p4"}, expectedPreempted: []string{"p1", "p2"}, expectedUnschedulable: []string{"p-c"}, + expectedToHaveNNNInfo: []string{"p-a", "p-b"}, expectedPodsPreemptedByWAP: 2, }, { @@ -587,6 +612,7 @@ func TestPodGroupPreemption(t *testing.T) { }, expectedScheduled: []string{"p-a", "p-b", "p-c"}, expectedPreempted: []string{"v1", "v2", "v3"}, + expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"}, expectedPodsPreemptedByWAP: 3, }, { @@ -613,9 +639,11 @@ func TestPodGroupPreemption(t *testing.T) { 
st.MakePod().Name("p-b").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").PodGroupName("preemptor-pg").ZeroTerminationGracePeriod().Priority(101).Obj(), st.MakePod().Name("p-c").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").PodGroupName("preemptor-pg").ZeroTerminationGracePeriod().Priority(100).Obj(), }, + // p-a will preempt "victim-pg" and p-b will schedule to empty space, so only p-a will have NNN info. expectedScheduled: []string{"p-a", "p-b", "v3", "v4"}, expectedPreempted: []string{"v1", "v2"}, expectedUnschedulable: []string{"p-c"}, + expectedToHaveNNNInfo: []string{"p-a"}, expectedPodsPreemptedByWAP: 2, }, { @@ -812,6 +840,18 @@ func TestPodGroupPreemption(t *testing.T) { t.Errorf("Pod %s was expected to be preempted but wasn't", podName) } } + + // 9. Verify preemptor pods have nominated node name + for _, podName := range tt.expectedToHaveNNNInfo { + pod, err := cs.CoreV1().Pods(ns).Get(testCtx.Ctx, podName, metav1.GetOptions{}) + if err != nil { + t.Errorf("Error getting pod %s: %v", podName, err) + continue + } + if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, pod); err != nil { + t.Errorf("Error waiting for nominated node name for pod %s: %v", podName, err) + } + } }) } }