From 259b504c3b4e5278fc59ad94e5d9ae2fd67b2bca Mon Sep 17 00:00:00 2001 From: Matt Matejczyk Date: Thu, 30 Apr 2026 11:46:59 +0000 Subject: [PATCH] scheduler: match preemptor eligibility behavior in pod group preemption --- .../defaultpreemption/default_preemption.go | 16 +-- .../preemption/podgrouppreemption.go | 45 +++++- .../preemption/podgrouppreemption_test.go | 129 +++++++++++++++++- pkg/scheduler/framework/preemption/util.go | 35 +++++ 4 files changed, 201 insertions(+), 24 deletions(-) create mode 100644 pkg/scheduler/framework/preemption/util.go diff --git a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go index feddc3c87df..a4cb5cd3d14 100644 --- a/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go +++ b/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go @@ -377,7 +377,7 @@ func (pl *DefaultPreemption) PodEligibleToPreemptOthers(_ context.Context, pod * if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil { for _, p := range nodeInfo.GetPods() { - if pl.isPreemptionAllowed(nodeInfo, p, pod) && podTerminatingByPreemption(p.GetPod()) { + if pl.isPreemptionAllowed(nodeInfo, p, pod) && preemption.PodTerminatingByPreemption(p.GetPod()) { // There is a terminating pod on the nominated node. return false, "not eligible due to a terminating pod on the nominated node." } @@ -398,20 +398,6 @@ func (pl *DefaultPreemption) isPreemptionAllowed(nodeInfo fwk.NodeInfo, victim f return corev1helpers.PodPriority(victim.GetPod()) < corev1helpers.PodPriority(preemptor) && pl.IsEligiblePod(nodeInfo, victim, preemptor) } -// podTerminatingByPreemption returns true if the pod is in the termination state caused by scheduler preemption. -func podTerminatingByPreemption(p *v1.Pod) bool { - if p.DeletionTimestamp == nil { - return false - } - - for _, condition := range p.Status.Conditions { - if condition.Type == v1.DisruptionTarget { - return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByScheduler - } - } - return false -} - // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods" // and "nonViolatingPods" based on whether their PDBs will be violated if they are // preempted. diff --git a/pkg/scheduler/framework/preemption/podgrouppreemption.go b/pkg/scheduler/framework/preemption/podgrouppreemption.go index ab8b6aa0ccf..c5b7e8cdec2 100644 --- a/pkg/scheduler/framework/preemption/podgrouppreemption.go +++ b/pkg/scheduler/framework/preemption/podgrouppreemption.go @@ -25,10 +25,13 @@ import ( v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" schedulingapi "k8s.io/api/scheduling/v1alpha2" + "k8s.io/apimachinery/pkg/util/sets" policylisters "k8s.io/client-go/listers/policy/v1" schedulinglisters "k8s.io/client-go/listers/scheduling/v1alpha2" + corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/klog/v2" extenderv1 "k8s.io/kube-scheduler/extender/v1" @@ -97,17 +100,17 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain( podGroupSchedulingFunc framework.PodGroupSchedulingFunc) (*extenderv1.Victims, *fwk.Status) { logger := klog.FromContext(ctx) - // Ensure the preemptor is eligible to preempt other pods. - if ok, msg := ev.preemptorEligibleToPreemptOthers(ctx, preemptor); !ok { - logger.V(5).Info("Preemptor is not eligible for preemption", "preemptor", klog.KObj(preemptor.podGroup), "reason", msg) - return nil, fwk.NewStatus(fwk.Unschedulable, msg) - } - nameToNode := make(map[string]fwk.NodeInfo) for _, nodeInfo := range domain.Nodes() { nameToNode[nodeInfo.Node().Name] = nodeInfo } + // Ensure the preemptor is eligible to preempt other pods. + if ok, msg := ev.preemptorEligibleToPreemptOthers(ctx, preemptor, nameToNode); !ok { + logger.V(5).Info("Preemptor is not eligible for preemption", "preemptor", klog.KObj(preemptor.podGroup), "reason", msg) + return nil, fwk.NewStatus(fwk.Unschedulable, msg) + } + // Compared to the default preemption algorithm do not run the runPreFilterExtensionRemovePod // or runPreFilterExtensionAddPod as pod group scheduling does prefilter anyway. removePods := func(v *victim) error { @@ -254,13 +257,41 @@ func (ev *PodGroupEvaluator) isPreemptionAllowed(victim *victim, preemptor *podG // preemptorEligibleToPreemptOthers returns one bool and one string. The bool // indicates whether this preemptor should be considered for preempting other pods or // not. The string includes the reason if this preemptor isn't eligible. -func (ev *PodGroupEvaluator) preemptorEligibleToPreemptOthers(_ context.Context, preemptor *podGroupPreemptor) (bool, string) { +func (ev *PodGroupEvaluator) preemptorEligibleToPreemptOthers(_ context.Context, preemptor *podGroupPreemptor, nameToNode map[string]fwk.NodeInfo) (bool, string) { if preemptor.PreemptionPolicy() == v1.PreemptNever { return false, "not eligible due to preemptionPolicy=Never." } + + nominatedNodes := sets.New[string]() + for _, pod := range preemptor.Members() { + if len(pod.Status.NominatedNodeName) > 0 { + nominatedNodes.Insert(pod.Status.NominatedNodeName) + } + } + + for nomNodeName := range nominatedNodes { + if nodeInfo, exists := nameToNode[nomNodeName]; exists { + for _, p := range nodeInfo.GetPods() { + if ev.getPodPriority(p.GetPod()) < preemptor.Priority() && PodTerminatingByPreemption(p.GetPod()) { + return false, "not eligible due to a terminating pod on the nominated node." + } + } + } + } + return true, "" } +// getPodPriority returns the effective preemption priority of a pod. If the pod belongs to +// a pod group, it returns the priority of the pod group. +// Otherwise, it returns the pod's own priority. +func (ev *PodGroupEvaluator) getPodPriority(p *v1.Pod) int32 { + if pg := getPodGroup(p, ev.podGroupLister); pg != nil { + return util.PodGroupPriority(pg) + } + return corev1helpers.PodPriority(p) +} + // moreImportantVictim decides which of two preemption units is considered more critical. // This function is dedidcated only for PodGroup preemption. // diff --git a/pkg/scheduler/framework/preemption/podgrouppreemption_test.go b/pkg/scheduler/framework/preemption/podgrouppreemption_test.go index be8cb58874c..788355efb9a 100644 --- a/pkg/scheduler/framework/preemption/podgrouppreemption_test.go +++ b/pkg/scheduler/framework/preemption/podgrouppreemption_test.go @@ -221,13 +221,136 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) { ), blockingRules: []blockingRule{ {nodeName: "node1", capacity: 1, blockingVictims: sets.New("p1")}, - {nodeName: "node2", capacity: 1, blockingVictims: sets.New("p2")}, {nodeName: "node3", capacity: 1, blockingVictims: sets.New("p3")}, }, expectedPods: []string{}, expectedStatus: fwk.NewStatus(fwk.Unschedulable), }, + { + name: "Preemptor group is not eligible if any member has nominated node with terminating pods", + nodeNames: []string{"node1"}, + initPods: []*v1.Pod{ + st.MakePod().Name("victim").UID("v1").Node("node1").Priority(lowPriority).Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(), + }, + preemptor: newPodGroupPreemptor( + st.MakePodGroup().Name("preemptor-pg").Priority(highPriority).Obj(), + []*v1.Pod{ + st.MakePod().Name("p1").UID("p1").Priority(highPriority).Obj(), + st.MakePod().Name("p2").UID("p2").Priority(highPriority).NominatedNodeName("node1").Obj(), + }, + ), + blockingRules: []blockingRule{}, + expectedPods: []string{}, + expectedStatus: fwk.NewStatus(fwk.Unschedulable, "not eligible due to a terminating pod on the nominated node."), + }, + { + name: "Preemptor group is eligible if terminating pods are on non-nominated nodes", + nodeNames: []string{"node1", "node2"}, + initPods: []*v1.Pod{ + st.MakePod().Name("victim").UID("v1").Node("node2").Priority(lowPriority).Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(), + st.MakePod().Name("other-victim").UID("v2").Node("node1").Priority(lowPriority).Obj(), + }, + initPodGroups: []*schedulingapi.PodGroup{}, + preemptor: newPodGroupPreemptor( + st.MakePodGroup().Name("preemptor-pg").Priority(highPriority).Obj(), + []*v1.Pod{ + st.MakePod().Name("p1").UID("p1").Priority(highPriority).Obj(), + st.MakePod().Name("p2").UID("p2").Priority(highPriority).NominatedNodeName("node1").Obj(), + }, + ), + blockingRules: []blockingRule{ + {nodeName: "node1", blockingVictims: sets.New("other-victim"), capacity: 1}, + }, + expectedPods: []string{"other-victim"}, + expectedStatus: fwk.NewStatus(fwk.Success), + }, + { + name: "Preemptor group is not eligible if nominated node has terminating pod belonging to a pod group of lower priority", + nodeNames: []string{"node1"}, + initPods: []*v1.Pod{ + st.MakePod().Name("victim").UID("v1").Node("node1").Priority(highPriority).PodGroupName("victim-pg").Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(), + }, + initPodGroups: []*schedulingapi.PodGroup{ + st.MakePodGroup().Name("victim-pg").UID("victim-pg").Priority(lowPriority).DisruptionMode(schedulingapi.DisruptionModePodGroup).Obj(), + }, + preemptor: newPodGroupPreemptor( + st.MakePodGroup().Name("preemptor-pg").Priority(highPriority).Obj(), + []*v1.Pod{ + st.MakePod().Name("p1").UID("p1").Priority(highPriority).Obj(), + st.MakePod().Name("p2").UID("p2").Priority(highPriority).NominatedNodeName("node1").Obj(), + }, + ), + blockingRules: []blockingRule{}, + expectedPods: []string{}, + expectedStatus: fwk.NewStatus(fwk.Unschedulable, "not eligible due to a terminating pod on the nominated node."), + }, + { + name: "Preemptor group is eligible if nominated node has terminating pod belonging to a pod group of higher priority", + nodeNames: []string{"node1"}, + initPods: []*v1.Pod{ + st.MakePod().Name("victim").UID("v1").Node("node1").Priority(lowPriority).PodGroupName("victim-pg").Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(), + st.MakePod().Name("other-victim").UID("v2").Node("node1").Priority(lowPriority).Obj(), + }, + initPodGroups: []*schedulingapi.PodGroup{ + st.MakePodGroup().Name("victim-pg").UID("victim-pg").Priority(highPriority).DisruptionMode(schedulingapi.DisruptionModePodGroup).Obj(), + }, + preemptor: newPodGroupPreemptor( + st.MakePodGroup().Name("preemptor-pg").Priority(midPriority).Obj(), + []*v1.Pod{ + st.MakePod().Name("p1").UID("p1").Priority(midPriority).Obj(), + st.MakePod().Name("p2").UID("p2").Priority(midPriority).NominatedNodeName("node1").Obj(), + }, + ), + blockingRules: []blockingRule{ + {nodeName: "node1", blockingVictims: sets.New("other-victim"), capacity: 1}, + }, + expectedPods: []string{"other-victim"}, + expectedStatus: fwk.NewStatus(fwk.Success), + }, + { + name: "Preemptor group is not eligible if nominated node has terminating pod belonging to a pod group of lower priority with DisruptionModePod", + nodeNames: []string{"node1"}, + initPods: []*v1.Pod{ + st.MakePod().Name("victim").UID("v1").Node("node1").Priority(highPriority).PodGroupName("victim-pg").Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(), + }, + initPodGroups: []*schedulingapi.PodGroup{ + st.MakePodGroup().Name("victim-pg").UID("victim-pg").Priority(lowPriority).DisruptionMode(schedulingapi.DisruptionModePod).Obj(), + }, + preemptor: newPodGroupPreemptor( + st.MakePodGroup().Name("preemptor-pg").Priority(highPriority).Obj(), + []*v1.Pod{ + st.MakePod().Name("p1").UID("p1").Priority(highPriority).Obj(), + st.MakePod().Name("p2").UID("p2").Priority(highPriority).NominatedNodeName("node1").Obj(), + }, + ), + blockingRules: []blockingRule{}, + expectedPods: []string{}, + expectedStatus: fwk.NewStatus(fwk.Unschedulable, "not eligible due to a terminating pod on the nominated node."), + }, + { + name: "Preemptor group is eligible if nominated node has terminating pod belonging to a pod group of higher priority with nil DisruptionMode", + nodeNames: []string{"node1"}, + initPods: []*v1.Pod{ + st.MakePod().Name("victim").UID("v1").Node("node1").Priority(lowPriority).PodGroupName("victim-pg").Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(), + st.MakePod().Name("other-victim").UID("v2").Node("node1").Priority(lowPriority).Obj(), + }, + initPodGroups: []*schedulingapi.PodGroup{ + st.MakePodGroup().Name("victim-pg").UID("victim-pg").Priority(highPriority).Obj(), + }, + preemptor: newPodGroupPreemptor( + st.MakePodGroup().Name("preemptor-pg").Priority(midPriority).Obj(), + []*v1.Pod{ + st.MakePod().Name("p1").UID("p1").Priority(midPriority).Obj(), + st.MakePod().Name("p2").UID("p2").Priority(midPriority).NominatedNodeName("node1").Obj(), + }, + ), + blockingRules: []blockingRule{ + {nodeName: "node1", blockingVictims: sets.New("other-victim"), capacity: 1}, + }, + expectedPods: []string{"other-victim"}, + expectedStatus: fwk.NewStatus(fwk.Success), + }, { name: "Preempt single lower priority pod", nodeNames: []string{"node1"}, @@ -1027,7 +1150,9 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) { return nil, fwk.NewStatus(fwk.Unschedulable) } - pl := &PodGroupEvaluator{} + pl := &PodGroupEvaluator{ + podGroupLister: pgLister, + } victims, gotStatus := pl.selectVictimsOnDomain(ctx, tt.preemptor, domain, tt.pdbs, mockSchedulingFunc) if !gotStatus.IsSuccess() { diff --git a/pkg/scheduler/framework/preemption/util.go b/pkg/scheduler/framework/preemption/util.go new file mode 100644 index 00000000000..f73d4b62945 --- /dev/null +++ b/pkg/scheduler/framework/preemption/util.go @@ -0,0 +1,35 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package preemption + +import ( + v1 "k8s.io/api/core/v1" +) + +// PodTerminatingByPreemption returns true if the pod is in the termination state caused by scheduler preemption. +func PodTerminatingByPreemption(p *v1.Pod) bool { + if p.DeletionTimestamp == nil { + return false + } + + for _, condition := range p.Status.Conditions { + if condition.Type == v1.DisruptionTarget { + return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByScheduler + } + } + return false +}