Add support for NNN in podgrouppreemption

This commit is contained in:
Antoni Basista 2026-05-08 14:43:51 +00:00
parent 122e9166ae
commit 8b8aa9c52b
8 changed files with 347 additions and 51 deletions

View file

@ -165,6 +165,11 @@ type SortedScoredNodes interface {
// scheduling during workload-aware preemption algorithm.
type PodGroupSchedulingFunc func(ctx context.Context) (*fwk.PodGroupAssignments, *fwk.Status)
// PodGroupPostFilterResult stores information about nominated nodes for a pod group.
type PodGroupPostFilterResult struct {
NominatedNodeNames map[*v1.Pod]*fwk.NominatingInfo
}
// PodGroupPostFilterPlugin is an interface for plugins that are called
// after a PodGroup cannot be scheduled.
// It should not be used by any other plugin but DefaultPreemption.
@ -172,7 +177,7 @@ type PodGroupPostFilterPlugin interface {
fwk.Plugin
// PodGroupPostFilter is called after a PodGroup cannot be scheduled.
PodGroupPostFilter(ctx context.Context, pg *v1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc PodGroupSchedulingFunc) *fwk.Status
PodGroupPostFilter(ctx context.Context, pg *v1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc PodGroupSchedulingFunc) (*PodGroupPostFilterResult, *fwk.Status)
}
// Framework manages the set of plugins in use by the scheduling framework.

View file

@ -452,6 +452,6 @@ func filterPodsWithPDBViolation(podInfos []fwk.PodInfo, pdbs []*policy.PodDisrup
}
// PodGroupPostFilter runs a default preemption for the pod group.
func (pl *DefaultPreemption) PodGroupPostFilter(ctx context.Context, pg *schedulingapi.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) *fwk.Status {
func (pl *DefaultPreemption) PodGroupPostFilter(ctx context.Context, pg *schedulingapi.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) (*framework.PodGroupPostFilterResult, *fwk.Status) {
return pl.podGroupEvaluator.Preempt(ctx, pg, pods, pgSchedulingFunc)
}

View file

@ -61,32 +61,39 @@ func NewPodGroupEvaluator(fh fwk.Handle, executor *Executor) *PodGroupEvaluator
// Preempt implements the preemption logic where the preemptor is a pod group
// and the domain is the whole cluster. It preempts pod from the cluster
// in order to make enough room for the pod group to be scheduled.
// It returns a status of the whole preemption process.
// It returns PodGroupPreemptorResult which contains the mapping of nominated nodes
// for each pod in the pod group, and a status of the whole preemption process.
// The preemption logic modifies the NodeInfo provided by a Handle
// podGroupSchedulingFunc is a function that will be run to check feasibility of a pod group
// scheduling after modifying the node state.
// The caller is expected to backup the NodeInfo before calling this function
// And rollback the state to the backup after function is finished.
func (ev *PodGroupEvaluator) Preempt(ctx context.Context, pg *schedulingapi.PodGroup, pods []*v1.Pod, podGroupSchedulingFunc framework.PodGroupSchedulingFunc) *fwk.Status {
func (ev *PodGroupEvaluator) Preempt(ctx context.Context, pg *schedulingapi.PodGroup, pods []*v1.Pod, podGroupSchedulingFunc framework.PodGroupSchedulingFunc) (*framework.PodGroupPostFilterResult, *fwk.Status) {
// In case of workload-aware preemption, the domain is whole cluster.
// We do not make a snapshot of node info. Those nodes will be shared
// with the PodGroup scheduling algorithm passed as podGroupSchedulingFunc.
allNodes, err := ev.Handle.SnapshotSharedLister().NodeInfos().List()
if err != nil {
return fwk.AsStatus(fmt.Errorf("failed to list node infos: %w", err))
return nil, fwk.AsStatus(fmt.Errorf("failed to list node infos: %w", err))
}
domain := newDomainForWorkloadPreemption(allNodes, ev.podGroupLister, "cluster-domain")
preemptor := newPodGroupPreemptor(pg, pods)
pdbs, err := getPodDisruptionBudgets(ev.pdbLister)
if err != nil {
return fwk.AsStatus(fmt.Errorf("failed to get pod disruption budgets: %w", err))
return nil, fwk.AsStatus(fmt.Errorf("failed to get pod disruption budgets: %w", err))
}
victims, status := ev.selectVictimsOnDomain(ctx, preemptor, domain, pdbs, podGroupSchedulingFunc)
res, status := ev.selectVictimsOnDomain(ctx, preemptor, domain, pdbs, podGroupSchedulingFunc)
if !status.IsSuccess() {
return status
return nil, status
}
return ev.Executor.actuatePodGroupPreemption(ctx, victims, preemptor.pods, preemptor.podGroup, names.DefaultPreemption)
status = ev.Executor.actuatePodGroupPreemption(ctx, res.victims, preemptor.pods, preemptor.podGroup, names.DefaultPreemption)
return &framework.PodGroupPostFilterResult{NominatedNodeNames: res.nominatedNodeNames}, status
}
type selectVictimsResult struct {
nominatedNodeNames map[*v1.Pod]*fwk.NominatingInfo
victims *extenderv1.Victims
}
// selectVictimsOnDomain selects a set of victims that can be removed from the
@ -97,7 +104,7 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain(
preemptor *podGroupPreemptor,
domain *domain,
pdbs []*policy.PodDisruptionBudget,
podGroupSchedulingFunc framework.PodGroupSchedulingFunc) (*extenderv1.Victims, *fwk.Status) {
podGroupSchedulingFunc framework.PodGroupSchedulingFunc) (*selectVictimsResult, *fwk.Status) {
logger := klog.FromContext(ctx)
nameToNode := make(map[string]fwk.NodeInfo)
@ -162,11 +169,11 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain(
}
// If the scheduling failed after removing all potential victims, return the status.
assignments, status := podGroupSchedulingFunc(ctx)
podGroupAssignments, status := podGroupSchedulingFunc(ctx)
if !status.IsSuccess() {
return nil, status
}
maxScheduledCount := len(assignments.ProposedAssignments)
maxScheduledCount := len(podGroupAssignments.ProposedAssignments)
sort.Slice(potentialVictims, func(i, j int) bool {
return moreImportantVictim(potentialVictims[i], potentialVictims[j])
@ -175,9 +182,9 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain(
violatingVictims, nonViolatingVictims := filterVictimsWithPDBViolation(potentialVictims, pdbs)
numViolatingVictim := 0
reprieveVictim := func(v *victim) (bool, error) {
reprieveVictim := func(v *victim) (bool, *fwk.PodGroupAssignments, error) {
if err := addPods(v); err != nil {
return false, err
return false, nil, err
}
assignments, status := podGroupSchedulingFunc(ctx)
@ -198,7 +205,7 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain(
if !fits {
if err := removePods(v); err != nil {
return false, err
return false, nil, err
}
var names []string
for _, p := range v.Pods() {
@ -208,7 +215,7 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain(
logger.V(6).Info("Pods are potential preemption victims on domain", "pods", pods, "domain", domain.GetName())
}
return fits, nil
return fits, assignments, nil
}
// Try to reprieve as many pods as possible. We first try to reprieve the PDB
@ -216,18 +223,22 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain(
// from the highest importance victims.
var victimsToPreempt []*victim
for _, v := range violatingVictims {
if fits, err := reprieveVictim(v); err != nil {
if fits, assignments, err := reprieveVictim(v); err != nil {
return nil, fwk.AsStatus(err)
} else if !fits {
} else if fits {
podGroupAssignments = assignments
} else {
victimsToPreempt = append(victimsToPreempt, v)
numViolatingVictim++
}
}
for _, v := range nonViolatingVictims {
if fits, err := reprieveVictim(v); err != nil {
if fits, assignments, err := reprieveVictim(v); err != nil {
return nil, fwk.AsStatus(err)
} else if !fits {
} else if fits {
podGroupAssignments = assignments
} else {
victimsToPreempt = append(victimsToPreempt, v)
}
}
@ -245,7 +256,17 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain(
v := &extenderv1.Victims{
Pods: podsToPreempt,
}
return v, nil
n := make(map[*v1.Pod]*fwk.NominatingInfo)
for _, p := range podGroupAssignments.ProposedAssignments {
if p.GetNodeName() != "" {
n[p.GetPod()] = &fwk.NominatingInfo{
NominatingMode: fwk.ModeOverride,
NominatedNodeName: p.GetNodeName(),
}
}
}
return &selectVictimsResult{nominatedNodeNames: n, victims: v}, nil
}
// isPreemptionAllowed returns whether the victim residing on nodeInfo can be preempted by the preemptor

View file

@ -958,8 +958,13 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) {
scheduledCount = 3 // v1 reprieved: this increases scheduledCount!
}
proposedAssignments := make([]fwk.ProposedAssignment, scheduledCount)
for i := range scheduledCount {
pod := st.MakePod().Name(fmt.Sprintf("p%d", i+1)).UID(fmt.Sprintf("p%d", i+1)).Obj()
proposedAssignments[i] = &testProposedAssignment{pod: pod, nodeName: "node1"}
}
return &fwk.PodGroupAssignments{
ProposedAssignments: make([]fwk.ProposedAssignment, scheduledCount),
ProposedAssignments: proposedAssignments,
}, fwk.NewStatus(fwk.Success)
},
expectedPods: []string{"v2"},
@ -999,8 +1004,14 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) {
scheduledCount = 4 // v1 reprieved: this increases scheduledCount!
}
proposedAssignments := make([]fwk.ProposedAssignment, scheduledCount)
for i := range scheduledCount {
pod := st.MakePod().Name(fmt.Sprintf("p%d", i+1)).UID(fmt.Sprintf("p%d", i+1)).Obj()
proposedAssignments[i] = &testProposedAssignment{pod: pod, nodeName: "node1"}
}
return &fwk.PodGroupAssignments{
ProposedAssignments: make([]fwk.ProposedAssignment, scheduledCount),
ProposedAssignments: proposedAssignments,
}, fwk.NewStatus(fwk.Success)
},
expectedPods: []string{"v2"},
@ -1143,8 +1154,12 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) {
if availableSlots >= neededSlots {
assignmentsCount := min(availableSlots, len(tt.preemptor.Members()))
proposedAssignments := make([]fwk.ProposedAssignment, assignmentsCount)
for i := range assignmentsCount {
proposedAssignments[i] = &testProposedAssignment{pod: tt.preemptor.Members()[i], nodeName: "node1"}
}
return &fwk.PodGroupAssignments{
ProposedAssignments: make([]fwk.ProposedAssignment, assignmentsCount),
ProposedAssignments: proposedAssignments,
}, fwk.NewStatus(fwk.Success)
}
return nil, fwk.NewStatus(fwk.Unschedulable)
@ -1154,7 +1169,7 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) {
podGroupLister: pgLister,
}
victims, gotStatus := pl.selectVictimsOnDomain(ctx, tt.preemptor, domain, tt.pdbs, mockSchedulingFunc)
res, gotStatus := pl.selectVictimsOnDomain(ctx, tt.preemptor, domain, tt.pdbs, mockSchedulingFunc)
if !gotStatus.IsSuccess() {
t.Logf("SelectVictimsOnDomain failed: %v", gotStatus.Message())
}
@ -1167,12 +1182,12 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) {
if wantCode != fwk.Success {
return
}
if victims == nil {
if res == nil {
t.Fatalf("expected non-nil victims on success")
}
gotNames := sets.Set[string]{}
for _, p := range victims.Pods {
for _, p := range res.victims.Pods {
gotNames.Insert(p.Name)
}
wantNames := sets.New(tt.expectedPods...)
@ -1319,3 +1334,69 @@ func TestMoreImportantVictim(t *testing.T) {
})
}
}
type testProposedAssignment struct {
pod *v1.Pod
nodeName string
}
func (a *testProposedAssignment) GetPod() *v1.Pod {
return a.pod
}
func (a *testProposedAssignment) GetNodeName() string {
return a.nodeName
}
func TestPodGroupEvaluator_SelectVictimsOnDomain_NominatedNodes(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
p1 := st.MakePod().Name("p1").UID("p1").Obj()
p2 := st.MakePod().Name("p2").UID("p2").Obj()
preemptor := newPodGroupPreemptor(
st.MakePodGroup().Name("preemptor-pg").Priority(highPriority).Obj(),
[]*v1.Pod{p1, p2},
)
domainNodes := []fwk.NodeInfo{
framework.NewNodeInfo(),
}
domainNodes[0].SetNode(st.MakeNode().Name("node1").Obj())
// Add a low priority pod as a potential victim to satisfy the check
p3 := st.MakePod().Name("p3").UID("p3").Node("node1").Priority(lowPriority).Obj()
podInfo, _ := framework.NewPodInfo(p3)
domainNodes[0].AddPodInfo(podInfo)
pgLister := &mockPodGroupLister{podGroups: make(map[string]*schedulingapi.PodGroup)}
domain := newDomainForWorkloadPreemption(domainNodes, pgLister, "test-domain")
mockSchedulingFunc := func(ctx context.Context) (*fwk.PodGroupAssignments, *fwk.Status) {
return &fwk.PodGroupAssignments{
ProposedAssignments: []fwk.ProposedAssignment{
&testProposedAssignment{pod: p1, nodeName: "node1"},
&testProposedAssignment{pod: p2, nodeName: ""}, // No node assigned
},
}, fwk.NewStatus(fwk.Success)
}
pl := &PodGroupEvaluator{}
result, gotStatus := pl.selectVictimsOnDomain(ctx, preemptor, domain, nil, mockSchedulingFunc)
if !gotStatus.IsSuccess() {
t.Fatalf("SelectVictimsOnDomain failed: %v", gotStatus.Message())
}
if result == nil {
t.Fatalf("expected non-nil result")
}
if len(result.nominatedNodeNames) != 1 {
t.Errorf("Expected 1 nominated node name, got %d", len(result.nominatedNodeNames))
}
if info, ok := result.nominatedNodeNames[p1]; !ok || info.NominatedNodeName != "node1" {
t.Errorf("Expected p1 to be nominated for node1, got %v", info)
}
}

View file

@ -282,8 +282,8 @@ func (pl *TestPlugin) PlacementScoreExtensions() fwk.PlacementScoreExtensions {
return nil
}
func (pl *TestPlugin) PodGroupPostFilter(ctx context.Context, pg *v1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) *fwk.Status {
return nil
func (pl *TestPlugin) PodGroupPostFilter(ctx context.Context, pg *v1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) (*framework.PodGroupPostFilterResult, *fwk.Status) {
return nil, nil
}
func newTestCloseErrorPlugin(_ context.Context, injArgs runtime.Object, f fwk.Handle) (fwk.Plugin, error) {

View file

@ -244,9 +244,14 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr
// Run workload aware preemption if required. If the preemption is successful,
// we need to put the pods from pod group back into the scheduling queue.
if sched.workloadAwarePreemptionEnabled && result.status.Code() == fwk.Unschedulable {
status := sched.runWorkloadAwarePreemption(ctx, schedFwk, podGroupCycleState, podGroupInfo)
pgPostFilterResult, status := sched.runWorkloadAwarePreemption(ctx, schedFwk, podGroupCycleState, podGroupInfo)
if status.IsSuccess() {
result.waitingOnPreemption = true
for i := range result.podResults {
if nodeNameInfo, ok := pgPostFilterResult.NominatedNodeNames[result.podResults[i].pod]; ok {
result.podResults[i].scheduleResult.nominatingInfo = nodeNameInfo
}
}
} else if status.IsError() {
result.status = status
} else {
@ -264,24 +269,24 @@ func (sched *Scheduler) podGroupCycle(ctx context.Context, schedFwk framework.Fr
// original state.
// The function used for evaluating feasibility of pod group scheduling is
// scheduler.podGroupSchedulingAlgorithm run without any post filters.
func (sched *Scheduler) runWorkloadAwarePreemption(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo) *fwk.Status {
func (sched *Scheduler) runWorkloadAwarePreemption(ctx context.Context, schedFwk framework.Framework, podGroupCycleState *framework.CycleState, podGroupInfo *framework.QueuedPodGroupInfo) (*framework.PodGroupPostFilterResult, *fwk.Status) {
// Default preemption should be the only pod group post filter registered plugin.
plugins := schedFwk.PodGroupPostFilterPlugins()
if len(plugins) == 0 {
return fwk.NewStatus(fwk.Unschedulable, "default preemption plugin is not registered, workload aware preemption is disabled")
return nil, fwk.NewStatus(fwk.Unschedulable, "default preemption plugin is not registered, workload aware preemption is disabled")
}
pg, err := schedFwk.SharedInformerFactory().Scheduling().V1alpha3().PodGroups().Lister().PodGroups(podGroupInfo.Namespace).Get(podGroupInfo.Name)
if err != nil {
return fwk.AsStatus(fmt.Errorf("failed to get pod group object: %w", err))
return nil, fwk.AsStatus(fmt.Errorf("failed to get pod group object: %w", err))
}
if pg.Spec.SchedulingConstraints != nil && len(pg.Spec.SchedulingConstraints.Topology) > 0 {
return fwk.NewStatus(fwk.Unschedulable, "workload aware preemption is not supported for pod groups with scheduling constraints")
return nil, fwk.NewStatus(fwk.Unschedulable, "workload aware preemption is not supported for pod groups with scheduling constraints")
}
restoreFn, err := sched.nodeInfoSnapshot.BackupSnapshot()
if err != nil {
return fwk.AsStatus(fmt.Errorf("failed to backup snapshot: %w", err))
return nil, fwk.AsStatus(fmt.Errorf("failed to backup snapshot: %w", err))
}
defer restoreFn()

View file

@ -69,6 +69,7 @@ type fakePodGroupPlugin struct {
permitStatus map[string]*fwk.Status
podGroupPostFilterStatus *fwk.Status
podGroupPostFilterCalled bool
podGroupPostFilterResult map[string]*fwk.NominatingInfo
}
var _ fwk.FilterPlugin = &fakePodGroupPlugin{}
@ -100,12 +101,19 @@ func (mp *fakePodGroupPlugin) Permit(ctx context.Context, state fwk.CycleState,
return fwk.NewStatus(fwk.Unschedulable, "default fake permit failure"), 0
}
func (mp *fakePodGroupPlugin) PodGroupPostFilter(ctx context.Context, pg *schedulingv1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) *fwk.Status {
func (mp *fakePodGroupPlugin) PodGroupPostFilter(ctx context.Context, pg *schedulingv1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc framework.PodGroupSchedulingFunc) (*framework.PodGroupPostFilterResult, *fwk.Status) {
mp.podGroupPostFilterCalled = true
if mp.podGroupPostFilterStatus != nil {
return mp.podGroupPostFilterStatus
if mp.podGroupPostFilterStatus == nil {
return nil, fwk.NewStatus(fwk.Unschedulable, "default fake podgroup postfilter failure")
}
return fwk.NewStatus(fwk.Unschedulable, "default fake podgroup postfilter failure")
if mp.podGroupPostFilterResult == nil {
return nil, mp.podGroupPostFilterStatus
}
n := make(map[*v1.Pod]*fwk.NominatingInfo, len(pods))
for _, passedPod := range pods {
n[passedPod] = mp.podGroupPostFilterResult[passedPod.Name]
}
return &framework.PodGroupPostFilterResult{NominatedNodeNames: n}, mp.podGroupPostFilterStatus
}
func TestPodGroupInfoForPod(t *testing.T) {
@ -2697,12 +2705,13 @@ func (f *fakeDefaultPreemption) Name() string {
func TestRunWorkloadAwarePreemption(t *testing.T) {
tests := []struct {
name string
podGroupInfo *framework.QueuedPodGroupInfo
existingPodGroups []*schedulingv1alpha3.PodGroup
pluginsRegistered bool
pluginReturnStatus *fwk.Status
expectedStatus *fwk.Status
name string
podGroupInfo *framework.QueuedPodGroupInfo
existingPodGroups []*schedulingv1alpha3.PodGroup
pluginsRegistered bool
pluginReturnStatus *fwk.Status
pluginNominatedNodes map[string]*fwk.NominatingInfo
expectedStatus *fwk.Status
}{
{
name: "error when no PodGroupPostFilter plugin is registered",
@ -2759,7 +2768,10 @@ func TestRunWorkloadAwarePreemption(t *testing.T) {
},
pluginsRegistered: true,
pluginReturnStatus: fwk.NewStatus(fwk.Success),
expectedStatus: fwk.NewStatus(fwk.Success),
pluginNominatedNodes: map[string]*fwk.NominatingInfo{
"p1": {NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride},
},
expectedStatus: fwk.NewStatus(fwk.Success),
},
{
name: "failure when plugin returns unschedulable status",
@ -2810,12 +2822,12 @@ func TestRunWorkloadAwarePreemption(t *testing.T) {
queuesort.Name: queuesort.New,
defaultbinder.Name: defaultbinder.New,
}
if tt.pluginsRegistered {
registry["DefaultPreemption"] = func(ctx context.Context, obj runtime.Object, handle fwk.Handle) (fwk.Plugin, error) {
return &fakeDefaultPreemption{
fakePodGroupPlugin: &fakePodGroupPlugin{
podGroupPostFilterStatus: tt.pluginReturnStatus,
podGroupPostFilterResult: tt.pluginNominatedNodes,
},
}, nil
}
@ -2871,13 +2883,145 @@ func TestRunWorkloadAwarePreemption(t *testing.T) {
// Just inject logger explicitly in context to avoid panic
ctx = klog.NewContext(ctx, logger)
status := sched.runWorkloadAwarePreemption(ctx, schedFwk, framework.NewCycleState(), tt.podGroupInfo)
res, status := sched.runWorkloadAwarePreemption(ctx, schedFwk, framework.NewCycleState(), tt.podGroupInfo)
if tt.expectedStatus.Code() != status.Code() || tt.expectedStatus.Message() != status.Message() {
t.Errorf("Unexpected status, want code %v message %q, got code %v message %q",
tt.expectedStatus.Code(), tt.expectedStatus.Message(),
status.Code(), status.Message())
}
if len(tt.pluginNominatedNodes) > 0 {
for pod, nni := range res.NominatedNodeNames {
if !cmp.Equal(nni, tt.pluginNominatedNodes[pod.Name]) {
t.Errorf("Unexpected result, want %v, got %v", tt.pluginNominatedNodes, res.NominatedNodeNames)
}
}
}
})
}
}
func TestPodGroupCycle_NominatedNodes(t *testing.T) {
featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{
features.GenericWorkload: true,
features.WorkloadAwarePreemption: true,
features.GangScheduling: true,
})
testPodGroup := st.MakePodGroup().Name("pg").Namespace("default").Obj()
p1 := st.MakePod().Name("p1").UID("p1").PodGroupName("pg").SchedulerName("test-scheduler").Obj()
p2 := st.MakePod().Name("p2").UID("p2").PodGroupName("pg").SchedulerName("test-scheduler").Obj()
qInfo1 := &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: p1}}
qInfo2 := &framework.QueuedPodInfo{PodInfo: &framework.PodInfo{Pod: p2}}
podGroupInfo := &framework.QueuedPodGroupInfo{
QueuedPodInfos: []*framework.QueuedPodInfo{qInfo1, qInfo2},
PodGroupInfo: &framework.PodGroupInfo{
Name: "pg",
Namespace: "default",
UnscheduledPods: []*v1.Pod{p1, p2},
},
}
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Mock PodGroupPostFilter to return NominatedNodeNames
nominatedNodes := map[string]*fwk.NominatingInfo{
p1.Name: {NominatingMode: fwk.ModeOverride, NominatedNodeName: "node1"},
}
fakePlugin := &fakePodGroupPlugin{
podGroupPostFilterStatus: fwk.NewStatus(fwk.Success),
podGroupPostFilterResult: nominatedNodes,
}
registry := frameworkruntime.Registry{
queuesort.Name: queuesort.New,
defaultbinder.Name: defaultbinder.New,
"DefaultPreemption": func(ctx context.Context, obj runtime.Object, handle fwk.Handle) (fwk.Plugin, error) {
return &fakeDefaultPreemption{fakePodGroupPlugin: fakePlugin}, nil
},
}
profileCfg := config.KubeSchedulerProfile{
SchedulerName: "test-scheduler",
Plugins: &config.Plugins{
QueueSort: config.PluginSet{
Enabled: []config.Plugin{{Name: queuesort.Name}},
},
Bind: config.PluginSet{
Enabled: []config.Plugin{{Name: defaultbinder.Name}},
},
PostFilter: config.PluginSet{
Enabled: []config.Plugin{{Name: "DefaultPreemption"}},
},
},
}
client := clientsetfake.NewSimpleClientset(testPodGroup)
informerFactory := informers.NewSharedInformerFactory(client, 0)
podGroupLister := informerFactory.Scheduling().V1alpha3().PodGroups().Lister()
informerFactory.Start(ctx.Done())
informerFactory.WaitForCacheSync(ctx.Done())
schedFwk, err := frameworkruntime.NewFramework(ctx, registry, &profileCfg,
frameworkruntime.WithInformerFactory(informerFactory),
frameworkruntime.WithClientSet(client),
frameworkruntime.WithEventRecorder(events.NewFakeRecorder(100)),
)
if err != nil {
t.Fatalf("Failed to create framework: %v", err)
}
cache := internalcache.New(ctx, nil, true)
sched := &Scheduler{
Profiles: profile.Map{"test-scheduler": schedFwk},
Cache: cache,
nodeInfoSnapshot: internalcache.NewEmptySnapshot(),
podGroupLister: podGroupLister,
workloadAwarePreemptionEnabled: true,
client: client,
}
// Mock SchedulePod to return Unschedulable initially, and success on subsequent calls
callCount := 0
sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state fwk.CycleState, podInfo *framework.QueuedPodInfo) (ScheduleResult, error) {
callCount++
if callCount <= 2 {
return ScheduleResult{}, &framework.FitError{Pod: podInfo.Pod, NumAllNodes: 1}
}
if podInfo.Pod.Name == "p1" {
return ScheduleResult{SuggestedHost: "node1"}, nil
}
if podInfo.Pod.Name == "p2" {
return ScheduleResult{SuggestedHost: "node2"}, nil
}
return ScheduleResult{}, fmt.Errorf("unexpected pod")
}
capturedFailureHandler := make(map[string]*fwk.NominatingInfo)
sched.FailureHandler = func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *fwk.Status, nominatingInfo *fwk.NominatingInfo, start time.Time) {
capturedFailureHandler[podInfo.Pod.Name] = nominatingInfo
}
// Just inject logger explicitly in context to avoid panic
logger, _ := ktesting.NewTestContext(t)
ctx = klog.NewContext(ctx, logger)
sched.podGroupCycle(ctx, schedFwk, framework.NewCycleState(), podGroupInfo)
if len(capturedFailureHandler) == 0 {
t.Fatalf("expected FailureHandler to be called")
}
if capturedFailureHandler[p1.Name].NominatedNodeName != "node1" {
t.Errorf("Expected p1 to be nominated for node1, got %s", capturedFailureHandler[p1.Name].NominatedNodeName)
}
if capturedFailureHandler[p2.Name] != nil {
t.Errorf("Expected p2 to not be nominated, got %v", capturedFailureHandler[p2.Name])
}
}

View file

@ -41,9 +41,10 @@ import (
// TestPodGroupPreemption tests preemption scenarios involving pod groups.
func TestPodGroupPreemption(t *testing.T) {
featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{
features.GenericWorkload: true,
features.GangScheduling: true,
features.WorkloadAwarePreemption: true,
features.GenericWorkload: true,
features.GangScheduling: true,
features.WorkloadAwarePreemption: true,
features.ClearingNominatedNodeNameAfterBinding: false,
})
tests := []struct {
@ -56,6 +57,7 @@ func TestPodGroupPreemption(t *testing.T) {
expectedScheduled []string
expectedPreempted []string
expectedUnschedulable []string
expectedToHaveNNNInfo []string
expectedPodsPreemptedByWAP int
}{
{
@ -78,6 +80,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"high-1", "high-2", "high-3"},
expectedPreempted: []string{"low-1", "low-2", "low-3"},
expectedToHaveNNNInfo: []string{"high-1", "high-2", "high-3"},
expectedPodsPreemptedByWAP: 3,
},
{
@ -100,6 +103,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"high-1", "high-2", "high-3"},
expectedPreempted: []string{"low-1", "low-2", "low-3"},
expectedToHaveNNNInfo: []string{"high-1", "high-2", "high-3"},
expectedPodsPreemptedByWAP: 3,
},
{
@ -126,6 +130,7 @@ func TestPodGroupPreemption(t *testing.T) {
// high-1 and high-2 will fit on node1 if low-1 is preempted.
expectedScheduled: []string{"high-1", "high-2", "high-3", "low-2"},
expectedPreempted: []string{"low-1"},
expectedToHaveNNNInfo: []string{"high-1", "high-2", "high-3"},
expectedPodsPreemptedByWAP: 1,
},
{
@ -150,8 +155,10 @@ func TestPodGroupPreemption(t *testing.T) {
},
// high-1 will fit on node1 (it has 1 CPU free).
// high-2 and high-3 will fit on node2 if very-low-1 is preempted.
// high-2 will preempt very-low-1 and high-3 will schedule in next cycle to free space.
expectedScheduled: []string{"high-1", "high-2", "high-3", "low-1"},
expectedPreempted: []string{"very-low-1"},
expectedToHaveNNNInfo: []string{"high-2"},
expectedPodsPreemptedByWAP: 1,
},
{
@ -180,6 +187,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"high-1", "high-2"},
expectedPreempted: []string{"low-3"},
expectedToHaveNNNInfo: []string{"high-1", "high-2"},
expectedPodsPreemptedByWAP: 1,
},
{
@ -205,6 +213,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"high-1", "high-2", "high-3", "high-4"},
expectedPreempted: []string{"low-1", "low-2", "low-3", "low-4"},
expectedToHaveNNNInfo: []string{"high-1", "high-2", "high-3", "high-4"},
expectedPodsPreemptedByWAP: 4,
},
{
@ -228,6 +237,7 @@ func TestPodGroupPreemption(t *testing.T) {
expectedScheduled: []string{"mid-1", "low-1", "low-2"},
expectedPreempted: []string{},
expectedUnschedulable: []string{"high-1", "high-2", "high-3"},
expectedToHaveNNNInfo: []string{},
expectedPodsPreemptedByWAP: 0,
},
{
@ -249,6 +259,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"high-1", "high-2", "mid-1"},
expectedPreempted: []string{"low-1", "low-2"},
expectedToHaveNNNInfo: []string{"high-1", "high-2"},
expectedPodsPreemptedByWAP: 2,
},
{
@ -270,6 +281,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"high-1"},
expectedPreempted: []string{"low-1", "low-2", "low-3"},
expectedToHaveNNNInfo: []string{"high-1"},
expectedPodsPreemptedByWAP: 3,
},
{
@ -291,6 +303,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"high-1"},
expectedPreempted: []string{"low-1", "low-2", "low-3"},
expectedToHaveNNNInfo: []string{"high-1"},
expectedPodsPreemptedByWAP: 3,
},
{
@ -312,6 +325,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"low-1"},
expectedPreempted: []string{"high-1", "high-2", "high-3"},
expectedToHaveNNNInfo: []string{"low-1"},
expectedPodsPreemptedByWAP: 3,
},
{
@ -334,6 +348,7 @@ func TestPodGroupPreemption(t *testing.T) {
expectedScheduled: []string{"low-1", "low-2", "low-3"},
expectedPreempted: []string{},
expectedUnschedulable: []string{"high-1"},
expectedToHaveNNNInfo: []string{},
expectedPodsPreemptedByWAP: 0,
},
{
@ -354,6 +369,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"high-1", "low-1", "low-2"},
expectedPreempted: []string{"low-3"},
expectedToHaveNNNInfo: []string{"high-1"},
expectedPodsPreemptedByWAP: 0,
},
{
@ -378,6 +394,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"p-a", "p-b", "p-c"},
expectedPreempted: []string{"p1", "p2", "p3"},
expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"},
expectedPodsPreemptedByWAP: 3,
},
{
@ -404,6 +421,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"p-a", "p-b", "p-c", "p4"},
expectedPreempted: []string{"p1", "p2", "p3"},
expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"},
expectedPodsPreemptedByWAP: 3,
},
{
@ -431,6 +449,7 @@ func TestPodGroupPreemption(t *testing.T) {
expectedScheduled: []string{"p-a", "p-b", "p3", "p4"},
expectedPreempted: []string{"p1", "p2"},
expectedUnschedulable: []string{"p-c"},
expectedToHaveNNNInfo: []string{"p-a", "p-b"},
expectedPodsPreemptedByWAP: 2,
},
{
@ -456,6 +475,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"p-a", "p-b", "p-c"},
expectedPreempted: []string{"v1", "v2", "v3"},
expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"},
expectedPodsPreemptedByWAP: 3,
},
{
@ -482,9 +502,11 @@ func TestPodGroupPreemption(t *testing.T) {
st.MakePod().Name("p-b").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").PodGroupName("preemptor-pg").ZeroTerminationGracePeriod().Priority(101).Obj(),
st.MakePod().Name("p-c").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").PodGroupName("preemptor-pg").ZeroTerminationGracePeriod().Priority(100).Obj(),
},
// p-a will preempt victim-pg, p-b will schedule to empty space, so only p-a will have NNN info.
expectedScheduled: []string{"p-a", "p-b", "v3", "v4"},
expectedPreempted: []string{"v1", "v2"},
expectedUnschedulable: []string{"p-c"},
expectedToHaveNNNInfo: []string{"p-a"},
expectedPodsPreemptedByWAP: 2,
},
{
@ -509,6 +531,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"p-a", "p-b", "p-c"},
expectedPreempted: []string{"p1", "p2", "p3"},
expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"},
expectedPodsPreemptedByWAP: 3,
},
{
@ -535,6 +558,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"p-a", "p-b", "p-c", "p4"},
expectedPreempted: []string{"p1", "p2", "p3"},
expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"},
expectedPodsPreemptedByWAP: 3,
},
{
@ -562,6 +586,7 @@ func TestPodGroupPreemption(t *testing.T) {
expectedScheduled: []string{"p-a", "p-b", "p3", "p4"},
expectedPreempted: []string{"p1", "p2"},
expectedUnschedulable: []string{"p-c"},
expectedToHaveNNNInfo: []string{"p-a", "p-b"},
expectedPodsPreemptedByWAP: 2,
},
{
@ -587,6 +612,7 @@ func TestPodGroupPreemption(t *testing.T) {
},
expectedScheduled: []string{"p-a", "p-b", "p-c"},
expectedPreempted: []string{"v1", "v2", "v3"},
expectedToHaveNNNInfo: []string{"p-a", "p-b", "p-c"},
expectedPodsPreemptedByWAP: 3,
},
{
@ -613,9 +639,11 @@ func TestPodGroupPreemption(t *testing.T) {
st.MakePod().Name("p-b").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").PodGroupName("preemptor-pg").ZeroTerminationGracePeriod().Priority(101).Obj(),
st.MakePod().Name("p-c").Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Container("image").PodGroupName("preemptor-pg").ZeroTerminationGracePeriod().Priority(100).Obj(),
},
// p-a will preempt "victim-pg" and p-b will schedule to empty space, so only p-a will have NNN info.
expectedScheduled: []string{"p-a", "p-b", "v3", "v4"},
expectedPreempted: []string{"v1", "v2"},
expectedUnschedulable: []string{"p-c"},
expectedToHaveNNNInfo: []string{"p-a"},
expectedPodsPreemptedByWAP: 2,
},
{
@ -812,6 +840,18 @@ func TestPodGroupPreemption(t *testing.T) {
t.Errorf("Pod %s was expected to be preempted but wasn't", podName)
}
}
// 9. Verify preemptor pods have nominated node name
for _, podName := range tt.expectedToHaveNNNInfo {
pod, err := cs.CoreV1().Pods(ns).Get(testCtx.Ctx, podName, metav1.GetOptions{})
if err != nil {
t.Errorf("Error getting pod %s: %v", podName, err)
continue
}
if err := testutils.WaitForNominatedNodeName(testCtx.Ctx, cs, pod); err != nil {
t.Errorf("Error waiting for nominated node name for pod %s: %v", podName, err)
}
}
})
}
}