Merge pull request #133213 from sanposhiho/second-trial-conor

fix: handle corner cases in the async preemption
This commit is contained in:
Kubernetes Prow Robot 2025-07-28 10:06:36 -07:00 committed by GitHub
commit a2bf45b081
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 127 additions and 39 deletions

View file

@ -189,12 +189,12 @@ func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsy
}
}
if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
if !apierrors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
} else {
logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
return err
}
logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
return nil
return err
}
logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
}
@ -436,7 +436,14 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
logger := klog.FromContext(ctx)
errCh := parallelize.NewErrorChannel()
fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) {
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil {
victimPod := c.Victims().Pods[index]
if victimPod.DeletionTimestamp != nil {
// If the victim Pod is already being deleted, we don't have to make another deletion api call.
logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victimPod))
return
}
if err := ev.PreemptPod(ctx, c, pod, victimPod, pluginName); err != nil && !apierrors.IsNotFound(err) {
errCh.SendErrorWithCancel(err, cancel)
}
}, ev.PluginName)
@ -497,11 +504,34 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
// Intentionally create a new context instead of deriving one from the scheduling cycle's ctx,
// because this process could continue even after this scheduling cycle finishes.
ctx, cancel := context.WithCancel(context.Background())
logger := klog.FromContext(ctx)
victimPods := make([]*v1.Pod, 0, len(c.Victims().Pods))
for _, victim := range c.Victims().Pods {
if victim.DeletionTimestamp != nil {
// If the victim Pod is already being deleted, we don't have to make another deletion api call.
logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victim))
continue
}
victimPods = append(victimPods, victim)
}
if len(victimPods) == 0 {
cancel()
return
}
errCh := parallelize.NewErrorChannel()
// Tracks whether every victim pod turned out to be already deleted (i.e., no deletion API call succeeded).
var allPodsAlreadyDeleted atomic.Bool
allPodsAlreadyDeleted.Store(true)
preemptPod := func(index int) {
victim := c.Victims().Pods[index]
if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil {
victim := victimPods[index]
err := ev.PreemptPod(ctx, c, pod, victim, pluginName)
switch {
case err != nil && !apierrors.IsNotFound(err):
// We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it.
errCh.SendErrorWithCancel(err, cancel)
case err == nil:
allPodsAlreadyDeleted.Store(false)
}
}
@ -509,21 +539,24 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
ev.preempting.Insert(pod.UID)
ev.mu.Unlock()
logger := klog.FromContext(ctx)
go func() {
startTime := time.Now()
result := metrics.GoroutineResultSuccess
defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
defer func() {
if result == metrics.GoroutineResultError {
// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
// So, we should move the Pod to the activeQ.
// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
// So, we should move the Pod to the activeQ.
if result == metrics.GoroutineResultError ||
// When all pods are already deleted (which is very rare, but could happen in theory),
// it's safe to activate the preemptor Pod, and we do so because it might have missed the Pod/delete event that would requeue it.
allPodsAlreadyDeleted.Load() {
ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
}
}()
defer cancel()
logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods), "numVictimsToDelete", len(victimPods))
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
@ -536,33 +569,32 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
// We do not return as this error is not critical.
}
if len(c.Victims().Pods) == 0 {
ev.mu.Lock()
delete(ev.preempting, pod.UID)
ev.mu.Unlock()
return
}
// We can evict all victims in parallel, but the last one.
// We have to remove the pod from the preempting map before the last one is evicted
// because, otherwise, the pod removal might be notified to the scheduling queue before
// we remove this pod from the preempting map,
// and the pod could end up stucking at the unschedulable pod pool
// by all the pod removal events being ignored.
ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
if err := errCh.ReceiveError(); err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
result = metrics.GoroutineResultError
if len(victimPods) > 1 {
// We can evict all the victims in parallel, except for the last one.
// We have to remove the pod from the preempting map before the last one is evicted
// because, otherwise, the pod removal might be notified to the scheduling queue before
// we remove this pod from the preempting map,
// and the pod could end up stuck in the unschedulable pod pool
// because all the pod removal events would be ignored.
ev.Handler.Parallelizer().Until(ctx, len(victimPods)-1, preemptPod, ev.PluginName)
if err := errCh.ReceiveError(); err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
result = metrics.GoroutineResultError
}
}
ev.mu.Lock()
delete(ev.preempting, pod.UID)
ev.mu.Unlock()
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
err := ev.PreemptPod(ctx, c, pod, victimPods[len(victimPods)-1], pluginName)
switch {
case err != nil && !apierrors.IsNotFound(err):
// We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it.
utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
result = metrics.GoroutineResultError
case err == nil:
allPodsAlreadyDeleted.Store(false)
}
logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result)

View file

@ -30,6 +30,7 @@ import (
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@ -44,12 +45,12 @@ import (
"k8s.io/klog/v2/ktesting"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
"k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
@ -420,6 +421,10 @@ func TestPrepareCandidate(t *testing.T) {
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
notFoundVictim1 = st.MakePod().Name("not-found-victim").UID("victim1").
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
failVictim = st.MakePod().Name("fail-victim").UID("victim1").
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
@ -451,6 +456,12 @@ func TestPrepareCandidate(t *testing.T) {
errPatchStatusFailed = errors.New("patch pod status failed")
)
victimWithDeletionTimestamp := victim1.DeepCopy()
victimWithDeletionTimestamp.Name = "victim1-with-deletion-timestamp"
victimWithDeletionTimestamp.UID = "victim1-with-deletion-timestamp"
victimWithDeletionTimestamp.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-100 * time.Second)}
victimWithDeletionTimestamp.Finalizers = []string{"test"}
tests := []struct {
name string
nodeNames []string
@ -479,9 +490,8 @@ func TestPrepareCandidate(t *testing.T) {
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
nodeNames: []string{node1Name},
expectedStatus: nil,
},
{
name: "one victim without condition",
@ -503,6 +513,42 @@ func TestPrepareCandidate(t *testing.T) {
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim, but victim is already being deleted",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
victimWithDeletionTimestamp,
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{
victimWithDeletionTimestamp,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
},
{
name: "one victim, but victim is already deleted",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
notFoundVictim1,
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedStatus: nil,
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim with same condition",
@ -663,6 +709,11 @@ func TestPrepareCandidate(t *testing.T) {
deletionFailure = true
return true, nil, errDeletePodFailed
}
// fake clientset does not return an error for not-found pods, so we simulate it here.
if name == "not-found-victim" {
// Simulate a not-found error.
return true, nil, apierrors.NewNotFound(v1.Resource("pods"), name)
}
deletedPods.Insert(name)
return true, nil, nil
@ -675,6 +726,10 @@ func TestPrepareCandidate(t *testing.T) {
patchFailure = true
return true, nil, errPatchStatusFailed
}
// fake clientset does not return an error for not-found pods, so we simulate it here.
if action.(clienttesting.PatchAction).GetName() == "not-found-victim" {
return true, nil, apierrors.NewNotFound(v1.Resource("pods"), "not-found-victim")
}
return true, nil, nil
})

View file

@ -1573,10 +1573,10 @@ func TestNominatedNodeCleanUp(t *testing.T) {
st.MakePod().Name("low-4").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
},
{
st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(),
},
{
st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(),
st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
},
},
postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{
@ -1584,6 +1584,7 @@ func TestNominatedNodeCleanUp(t *testing.T) {
testutils.WaitForNominatedNodeName,
testutils.WaitForNominatedNodeName,
},
podNamesToDelete: []string{"low-1", "low-2", "low-3", "low-4"},
},
{
name: "mid-priority pod preempts low-priority pod, followed by a high-priority pod without additional preemption",