scheduler: match preemptor eligibility behavior in pod group preemption

This commit is contained in:
Matt Matejczyk 2026-04-30 11:46:59 +00:00
parent b151ef23a5
commit 259b504c3b
4 changed files with 201 additions and 24 deletions

View file

@ -377,7 +377,7 @@ func (pl *DefaultPreemption) PodEligibleToPreemptOthers(_ context.Context, pod *
if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
for _, p := range nodeInfo.GetPods() {
if pl.isPreemptionAllowed(nodeInfo, p, pod) && podTerminatingByPreemption(p.GetPod()) {
if pl.isPreemptionAllowed(nodeInfo, p, pod) && preemption.PodTerminatingByPreemption(p.GetPod()) {
// There is a terminating pod on the nominated node.
return false, "not eligible due to a terminating pod on the nominated node."
}
@ -398,20 +398,6 @@ func (pl *DefaultPreemption) isPreemptionAllowed(nodeInfo fwk.NodeInfo, victim f
return corev1helpers.PodPriority(victim.GetPod()) < corev1helpers.PodPriority(preemptor) && pl.IsEligiblePod(nodeInfo, victim, preemptor)
}
// podTerminatingByPreemption reports whether p is being deleted as a result of
// scheduler preemption: it must carry a deletion timestamp, and its
// DisruptionTarget condition must be true with the PreemptionByScheduler reason.
// Only the first DisruptionTarget condition found is consulted.
func podTerminatingByPreemption(p *v1.Pod) bool {
	// A pod without a deletion timestamp is not terminating at all.
	if p.DeletionTimestamp == nil {
		return false
	}
	for _, cond := range p.Status.Conditions {
		if cond.Type != v1.DisruptionTarget {
			continue
		}
		isTrue := cond.Status == v1.ConditionTrue
		byScheduler := cond.Reason == v1.PodReasonPreemptionByScheduler
		return isTrue && byScheduler
	}
	// No DisruptionTarget condition is present on the pod.
	return false
}
// filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
// and "nonViolatingPods" based on whether their PDBs will be violated if they are
// preempted.

View file

@ -25,10 +25,13 @@ import (
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
schedulingapi "k8s.io/api/scheduling/v1alpha2"
"k8s.io/apimachinery/pkg/util/sets"
policylisters "k8s.io/client-go/listers/policy/v1"
schedulinglisters "k8s.io/client-go/listers/scheduling/v1alpha2"
corev1helpers "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"
"k8s.io/klog/v2"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
@ -97,17 +100,17 @@ func (ev *PodGroupEvaluator) selectVictimsOnDomain(
podGroupSchedulingFunc framework.PodGroupSchedulingFunc) (*extenderv1.Victims, *fwk.Status) {
logger := klog.FromContext(ctx)
// Ensure the preemptor is eligible to preempt other pods.
if ok, msg := ev.preemptorEligibleToPreemptOthers(ctx, preemptor); !ok {
logger.V(5).Info("Preemptor is not eligible for preemption", "preemptor", klog.KObj(preemptor.podGroup), "reason", msg)
return nil, fwk.NewStatus(fwk.Unschedulable, msg)
}
nameToNode := make(map[string]fwk.NodeInfo)
for _, nodeInfo := range domain.Nodes() {
nameToNode[nodeInfo.Node().Name] = nodeInfo
}
// Ensure the preemptor is eligible to preempt other pods.
if ok, msg := ev.preemptorEligibleToPreemptOthers(ctx, preemptor, nameToNode); !ok {
logger.V(5).Info("Preemptor is not eligible for preemption", "preemptor", klog.KObj(preemptor.podGroup), "reason", msg)
return nil, fwk.NewStatus(fwk.Unschedulable, msg)
}
// Unlike the default preemption algorithm, do not run runPreFilterExtensionRemovePod
// or runPreFilterExtensionAddPod, because pod group scheduling does prefilter anyway.
removePods := func(v *victim) error {
@ -254,13 +257,41 @@ func (ev *PodGroupEvaluator) isPreemptionAllowed(victim *victim, preemptor *podG
// preemptorEligibleToPreemptOthers returns one bool and one string. The bool
// indicates whether this preemptor should be considered for preempting other pods or
// not. The string includes the reason if this preemptor isn't eligible.
func (ev *PodGroupEvaluator) preemptorEligibleToPreemptOthers(_ context.Context, preemptor *podGroupPreemptor) (bool, string) {
func (ev *PodGroupEvaluator) preemptorEligibleToPreemptOthers(_ context.Context, preemptor *podGroupPreemptor, nameToNode map[string]fwk.NodeInfo) (bool, string) {
	// The preemptor is ineligible in two cases:
	//   1. its preemption policy is Never, or
	//   2. a node nominated for any of its members (and present in nameToNode)
	//      hosts a lower-priority pod that is already terminating because of
	//      scheduler preemption.
	// nameToNode is used as a node-name -> NodeInfo lookup; nominated nodes that
	// are missing from it are ignored.
	if preemptor.PreemptionPolicy() == v1.PreemptNever {
		return false, "not eligible due to preemptionPolicy=Never."
	}

	// Deduplicate nominated node names so that a node shared by several members
	// is scanned only once.
	nominatedNodes := sets.New[string]()
	for _, member := range preemptor.Members() {
		if name := member.Status.NominatedNodeName; name != "" {
			nominatedNodes.Insert(name)
		}
	}

	for nomNodeName := range nominatedNodes {
		nodeInfo, exists := nameToNode[nomNodeName]
		if !exists {
			continue
		}
		for _, p := range nodeInfo.GetPods() {
			pod := p.GetPod()
			// Evaluate the cheap terminating check first: the priority lookup
			// goes through getPodPriority (which consults the pod group lister)
			// and is only needed for pods that are terminating by preemption.
			if PodTerminatingByPreemption(pod) && ev.getPodPriority(pod) < preemptor.Priority() {
				return false, "not eligible due to a terminating pod on the nominated node."
			}
		}
	}
	return true, ""
}
// getPodPriority yields the priority to use for p when comparing it against a
// preemptor. When getPodGroup finds a pod group for p, the group's priority
// (util.PodGroupPriority) is used; otherwise the pod's own priority
// (corev1helpers.PodPriority) is returned.
func (ev *PodGroupEvaluator) getPodPriority(p *v1.Pod) int32 {
	pg := getPodGroup(p, ev.podGroupLister)
	if pg == nil {
		// No pod group found for this pod: fall back to the pod's own priority.
		return corev1helpers.PodPriority(p)
	}
	return util.PodGroupPriority(pg)
}
// moreImportantVictim decides which of two preemption units is considered more critical.
// This function is dedicated only to PodGroup preemption.
//

View file

@ -221,13 +221,136 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) {
),
blockingRules: []blockingRule{
{nodeName: "node1", capacity: 1, blockingVictims: sets.New("p1")},
{nodeName: "node2", capacity: 1, blockingVictims: sets.New("p2")},
{nodeName: "node3", capacity: 1, blockingVictims: sets.New("p3")},
},
expectedPods: []string{},
expectedStatus: fwk.NewStatus(fwk.Unschedulable),
},
{
name: "Preemptor group is not eligible if any member has nominated node with terminating pods",
nodeNames: []string{"node1"},
initPods: []*v1.Pod{
st.MakePod().Name("victim").UID("v1").Node("node1").Priority(lowPriority).Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(),
},
preemptor: newPodGroupPreemptor(
st.MakePodGroup().Name("preemptor-pg").Priority(highPriority).Obj(),
[]*v1.Pod{
st.MakePod().Name("p1").UID("p1").Priority(highPriority).Obj(),
st.MakePod().Name("p2").UID("p2").Priority(highPriority).NominatedNodeName("node1").Obj(),
},
),
blockingRules: []blockingRule{},
expectedPods: []string{},
expectedStatus: fwk.NewStatus(fwk.Unschedulable, "not eligible due to a terminating pod on the nominated node."),
},
{
name: "Preemptor group is eligible if terminating pods are on non-nominated nodes",
nodeNames: []string{"node1", "node2"},
initPods: []*v1.Pod{
st.MakePod().Name("victim").UID("v1").Node("node2").Priority(lowPriority).Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(),
st.MakePod().Name("other-victim").UID("v2").Node("node1").Priority(lowPriority).Obj(),
},
initPodGroups: []*schedulingapi.PodGroup{},
preemptor: newPodGroupPreemptor(
st.MakePodGroup().Name("preemptor-pg").Priority(highPriority).Obj(),
[]*v1.Pod{
st.MakePod().Name("p1").UID("p1").Priority(highPriority).Obj(),
st.MakePod().Name("p2").UID("p2").Priority(highPriority).NominatedNodeName("node1").Obj(),
},
),
blockingRules: []blockingRule{
{nodeName: "node1", blockingVictims: sets.New("other-victim"), capacity: 1},
},
expectedPods: []string{"other-victim"},
expectedStatus: fwk.NewStatus(fwk.Success),
},
{
name: "Preemptor group is not eligible if nominated node has terminating pod belonging to a pod group of lower priority",
nodeNames: []string{"node1"},
initPods: []*v1.Pod{
st.MakePod().Name("victim").UID("v1").Node("node1").Priority(highPriority).PodGroupName("victim-pg").Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(),
},
initPodGroups: []*schedulingapi.PodGroup{
st.MakePodGroup().Name("victim-pg").UID("victim-pg").Priority(lowPriority).DisruptionMode(schedulingapi.DisruptionModePodGroup).Obj(),
},
preemptor: newPodGroupPreemptor(
st.MakePodGroup().Name("preemptor-pg").Priority(highPriority).Obj(),
[]*v1.Pod{
st.MakePod().Name("p1").UID("p1").Priority(highPriority).Obj(),
st.MakePod().Name("p2").UID("p2").Priority(highPriority).NominatedNodeName("node1").Obj(),
},
),
blockingRules: []blockingRule{},
expectedPods: []string{},
expectedStatus: fwk.NewStatus(fwk.Unschedulable, "not eligible due to a terminating pod on the nominated node."),
},
{
name: "Preemptor group is eligible if nominated node has terminating pod belonging to a pod group of higher priority",
nodeNames: []string{"node1"},
initPods: []*v1.Pod{
st.MakePod().Name("victim").UID("v1").Node("node1").Priority(lowPriority).PodGroupName("victim-pg").Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(),
st.MakePod().Name("other-victim").UID("v2").Node("node1").Priority(lowPriority).Obj(),
},
initPodGroups: []*schedulingapi.PodGroup{
st.MakePodGroup().Name("victim-pg").UID("victim-pg").Priority(highPriority).DisruptionMode(schedulingapi.DisruptionModePodGroup).Obj(),
},
preemptor: newPodGroupPreemptor(
st.MakePodGroup().Name("preemptor-pg").Priority(midPriority).Obj(),
[]*v1.Pod{
st.MakePod().Name("p1").UID("p1").Priority(midPriority).Obj(),
st.MakePod().Name("p2").UID("p2").Priority(midPriority).NominatedNodeName("node1").Obj(),
},
),
blockingRules: []blockingRule{
{nodeName: "node1", blockingVictims: sets.New("other-victim"), capacity: 1},
},
expectedPods: []string{"other-victim"},
expectedStatus: fwk.NewStatus(fwk.Success),
},
{
name: "Preemptor group is not eligible if nominated node has terminating pod belonging to a pod group of lower priority with DisruptionModePod",
nodeNames: []string{"node1"},
initPods: []*v1.Pod{
st.MakePod().Name("victim").UID("v1").Node("node1").Priority(highPriority).PodGroupName("victim-pg").Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(),
},
initPodGroups: []*schedulingapi.PodGroup{
st.MakePodGroup().Name("victim-pg").UID("victim-pg").Priority(lowPriority).DisruptionMode(schedulingapi.DisruptionModePod).Obj(),
},
preemptor: newPodGroupPreemptor(
st.MakePodGroup().Name("preemptor-pg").Priority(highPriority).Obj(),
[]*v1.Pod{
st.MakePod().Name("p1").UID("p1").Priority(highPriority).Obj(),
st.MakePod().Name("p2").UID("p2").Priority(highPriority).NominatedNodeName("node1").Obj(),
},
),
blockingRules: []blockingRule{},
expectedPods: []string{},
expectedStatus: fwk.NewStatus(fwk.Unschedulable, "not eligible due to a terminating pod on the nominated node."),
},
{
name: "Preemptor group is eligible if nominated node has terminating pod belonging to a pod group of higher priority with nil DisruptionMode",
nodeNames: []string{"node1"},
initPods: []*v1.Pod{
st.MakePod().Name("victim").UID("v1").Node("node1").Priority(lowPriority).PodGroupName("victim-pg").Condition(v1.DisruptionTarget, v1.ConditionTrue, v1.PodReasonPreemptionByScheduler).Terminating().Obj(),
st.MakePod().Name("other-victim").UID("v2").Node("node1").Priority(lowPriority).Obj(),
},
initPodGroups: []*schedulingapi.PodGroup{
st.MakePodGroup().Name("victim-pg").UID("victim-pg").Priority(highPriority).Obj(),
},
preemptor: newPodGroupPreemptor(
st.MakePodGroup().Name("preemptor-pg").Priority(midPriority).Obj(),
[]*v1.Pod{
st.MakePod().Name("p1").UID("p1").Priority(midPriority).Obj(),
st.MakePod().Name("p2").UID("p2").Priority(midPriority).NominatedNodeName("node1").Obj(),
},
),
blockingRules: []blockingRule{
{nodeName: "node1", blockingVictims: sets.New("other-victim"), capacity: 1},
},
expectedPods: []string{"other-victim"},
expectedStatus: fwk.NewStatus(fwk.Success),
},
{
name: "Preempt single lower priority pod",
nodeNames: []string{"node1"},
@ -1027,7 +1150,9 @@ func TestPodGroupEvaluator_SelectVictimsOnDomain(t *testing.T) {
return nil, fwk.NewStatus(fwk.Unschedulable)
}
pl := &PodGroupEvaluator{}
pl := &PodGroupEvaluator{
podGroupLister: pgLister,
}
victims, gotStatus := pl.selectVictimsOnDomain(ctx, tt.preemptor, domain, tt.pdbs, mockSchedulingFunc)
if !gotStatus.IsSuccess() {

View file

@ -0,0 +1,35 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package preemption
import (
v1 "k8s.io/api/core/v1"
)
// PodTerminatingByPreemption reports whether p is terminating because the
// scheduler preempted it. That is the case when p has a deletion timestamp and
// its DisruptionTarget condition has status True and reason
// PodReasonPreemptionByScheduler. Only the first DisruptionTarget condition
// encountered is evaluated.
func PodTerminatingByPreemption(p *v1.Pod) bool {
	if p.DeletionTimestamp != nil {
		conditions := p.Status.Conditions
		for i := range conditions {
			c := &conditions[i]
			if c.Type != v1.DisruptionTarget {
				continue
			}
			return c.Status == v1.ConditionTrue && c.Reason == v1.PodReasonPreemptionByScheduler
		}
	}
	// Either the pod is not being deleted or it has no DisruptionTarget condition.
	return false
}