From ed74d4cd52fada6c30440f5e19276767bbf5f7a3 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Fri, 25 Jul 2025 18:59:03 +0900 Subject: [PATCH 1/2] Revert "Revert "fix: handle corner cases in the async preemption"" This reverts commit 006d7620a8c10a652bfd9bd570ddd62fd98dc2c7. --- .../framework/preemption/preemption.go | 93 +++++++++++++------ .../framework/preemption/preemption_test.go | 67 +++++++++++-- 2 files changed, 125 insertions(+), 35 deletions(-) diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index ee98910ef99..bd182a8ef76 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -191,10 +191,8 @@ func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsy if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil { if !apierrors.IsNotFound(err) { logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor)) - return err } - logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) - return nil + return err } logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) } @@ -436,7 +434,18 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. logger := klog.FromContext(ctx) errCh := parallelize.NewErrorChannel() fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) { - if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil { + victimPod := c.Victims().Pods[index] + if victimPod.DeletionTimestamp != nil { + // If the victim Pod is already being deleted, we don't have to make another deletion api call. + logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victimPod)) + return + } + + if err := ev.PreemptPod(ctx, c, pod, victimPod, pluginName); err != nil { + if apierrors.IsNotFound(err) { + logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "victim", klog.KObj(victimPod), "node", c.Name()) + return + } errCh.SendErrorWithCancel(err, cancel) } }, ev.PluginName) @@ -497,9 +506,24 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName // Intentionally create a new context, not using a ctx from the scheduling cycle, to create ctx, // because this process could continue even after this scheduling cycle finishes. ctx, cancel := context.WithCancel(context.Background()) + logger := klog.FromContext(ctx) + victimPods := make([]*v1.Pod, 0, len(c.Victims().Pods)) + for _, victim := range c.Victims().Pods { + if victim.DeletionTimestamp != nil { + // If the victim Pod is already being deleted, we don't have to make another deletion api call. + logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victim)) + continue + } + victimPods = append(victimPods, victim) + } + if len(victimPods) == 0 { + cancel() + return + } + errCh := parallelize.NewErrorChannel() preemptPod := func(index int) { - victim := c.Victims().Pods[index] + victim := victimPods[index] if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil { errCh.SendErrorWithCancel(err, cancel) } @@ -509,21 +533,26 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName ev.preempting.Insert(pod.UID) ev.mu.Unlock() - logger := klog.FromContext(ctx) go func() { startTime := time.Now() result := metrics.GoroutineResultSuccess + + // Whether all victim pods are already deleted before making API call. + allPodsAlreadyDeleted := true defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime)) defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc() defer func() { - if result == metrics.GoroutineResultError { - // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case. - // So, we should move the Pod to the activeQ. + // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case. + // So, we should move the Pod to the activeQ. + if result == metrics.GoroutineResultError || + // When all pods are already deleted (which is very rare, but could happen in theory), + // it's safe to activate the preemptor Pod because it might miss Pod/delete event that requeues the pod. + allPodsAlreadyDeleted { ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod}) } }() defer cancel() - logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods)) + logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods), "numVictimsToDelete", len(victimPods)) // Lower priority pods nominated to run on this node, may no longer fit on // this node. So, we should remove their nomination. Removing their @@ -536,33 +565,39 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName // We do not return as this error is not critical. } - if len(c.Victims().Pods) == 0 { - ev.mu.Lock() - delete(ev.preempting, pod.UID) - ev.mu.Unlock() - - return - } - - // We can evict all victims in parallel, but the last one. - // We have to remove the pod from the preempting map before the last one is evicted - // because, otherwise, the pod removal might be notified to the scheduling queue before - // we remove this pod from the preempting map, - // and the pod could end up stucking at the unschedulable pod pool - // by all the pod removal events being ignored. - ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName) - if err := errCh.ReceiveError(); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption") - result = metrics.GoroutineResultError + if len(victimPods) > 1 { + // We can evict all victims in parallel, but the last one. + // We have to remove the pod from the preempting map before the last one is evicted + // because, otherwise, the pod removal might be notified to the scheduling queue before + // we remove this pod from the preempting map, + // and the pod could end up stucking at the unschedulable pod pool + // by all the pod removal events being ignored. + ev.Handler.Parallelizer().Until(ctx, len(victimPods)-1, preemptPod, ev.PluginName) + err := errCh.ReceiveError() + switch { + case apierrors.IsNotFound(err): + logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "node", c.Name(), "err", err) + case err != nil: + utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption") + result = metrics.GoroutineResultError + default: + allPodsAlreadyDeleted = false + } } ev.mu.Lock() delete(ev.preempting, pod.UID) ev.mu.Unlock() - if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil { + err := ev.PreemptPod(ctx, c, pod, victimPods[len(victimPods)-1], pluginName) + switch { + case apierrors.IsNotFound(err): + logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "node", c.Name(), "err", err) + case err != nil: utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption") result = metrics.GoroutineResultError + default: + allPodsAlreadyDeleted = false } logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result) diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index e8b0a80ecad..5a37f0df4f3 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -30,6 +30,7 @@ import ( v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -44,12 +45,12 @@ import ( "k8s.io/klog/v2/ktesting" extenderv1 "k8s.io/kube-scheduler/extender/v1" fwk "k8s.io/kube-scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/backend/api_cache" - "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" + apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache" + apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue" "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" + apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" @@ -420,6 +421,10 @@ func TestPrepareCandidate(t *testing.T) { Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). Obj() + notFoundVictim1 = st.MakePod().Name("not-found-victim").UID("victim1"). + Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() failVictim = st.MakePod().Name("fail-victim").UID("victim1"). Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). @@ -451,6 +456,12 @@ func TestPrepareCandidate(t *testing.T) { errPatchStatusFailed = errors.New("patch pod status failed") ) + victimWithDeletionTimestamp := victim1.DeepCopy() + victimWithDeletionTimestamp.Name = "victim1-with-deletion-timestamp" + victimWithDeletionTimestamp.UID = "victim1-with-deletion-timestamp" + victimWithDeletionTimestamp.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-100 * time.Second)} + victimWithDeletionTimestamp.Finalizers = []string{"test"} + tests := []struct { name string nodeNames []string @@ -479,9 +490,8 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1, }, - nodeNames: []string{node1Name}, - expectedStatus: nil, - expectedPreemptingMap: sets.New(types.UID("preemptor")), + nodeNames: []string{node1Name}, + expectedStatus: nil, }, { name: "one victim without condition", @@ -503,6 +513,42 @@ func TestPrepareCandidate(t *testing.T) { expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), }, + { + name: "one victim, but victim is already being deleted", + + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + victimWithDeletionTimestamp, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{ + victimWithDeletionTimestamp, + }, + nodeNames: []string{node1Name}, + expectedStatus: nil, + }, + { + name: "one victim, but victim is already deleted", + + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + notFoundVictim1, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{}, + nodeNames: []string{node1Name}, + expectedStatus: nil, + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, + expectedPreemptingMap: sets.New(types.UID("preemptor")), + }, { name: "one victim with same condition", @@ -663,6 +709,11 @@ func TestPrepareCandidate(t *testing.T) { deletionFailure = true return true, nil, errDeletePodFailed } + // fake clientset does not return an error for not-found pods, so we simulate it here. + if name == "not-found-victim" { + // Simulate a not-found error. + return true, nil, apierrors.NewNotFound(v1.Resource("pods"), name) + } deletedPods.Insert(name) return true, nil, nil @@ -675,6 +726,10 @@ func TestPrepareCandidate(t *testing.T) { patchFailure = true return true, nil, errPatchStatusFailed } + // fake clientset does not return an error for not-found pods, so we simulate it here. + if action.(clienttesting.PatchAction).GetName() == "not-found-victim" { + return true, nil, apierrors.NewNotFound(v1.Resource("pods"), "not-found-victim") + } return true, nil, nil }) From f3466f8adc18855883336990d94601bccb3a40b6 Mon Sep 17 00:00:00 2001 From: Kensei Nakada Date: Fri, 25 Jul 2025 19:02:52 +0900 Subject: [PATCH 2/2] fix: flake integration test --- .../framework/preemption/preemption.go | 41 +++++++++---------- .../scheduler/preemption/preemption_test.go | 5 ++- 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index bd182a8ef76..9bcd91592fe 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -189,7 +189,9 @@ func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsy } } if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil { - if !apierrors.IsNotFound(err) { + if apierrors.IsNotFound(err) { + logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) + } else { logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor)) } return err @@ -441,11 +443,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. return } - if err := ev.PreemptPod(ctx, c, pod, victimPod, pluginName); err != nil { - if apierrors.IsNotFound(err) { - logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "victim", klog.KObj(victimPod), "node", c.Name()) - return - } + if err := ev.PreemptPod(ctx, c, pod, victimPod, pluginName); err != nil && !apierrors.IsNotFound(err) { errCh.SendErrorWithCancel(err, cancel) } }, ev.PluginName) @@ -522,10 +520,18 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName } errCh := parallelize.NewErrorChannel() + // Whether all victim pods are already deleted before making API call. + var allPodsAlreadyDeleted atomic.Bool + allPodsAlreadyDeleted.Store(true) preemptPod := func(index int) { victim := victimPods[index] - if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil { + err := ev.PreemptPod(ctx, c, pod, victim, pluginName) + switch { + case err != nil && !apierrors.IsNotFound(err): + // We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it. errCh.SendErrorWithCancel(err, cancel) + case err == nil: + allPodsAlreadyDeleted.Store(false) } } @@ -537,8 +543,6 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName startTime := time.Now() result := metrics.GoroutineResultSuccess - // Whether all victim pods are already deleted before making API call. - allPodsAlreadyDeleted := true defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime)) defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc() defer func() { @@ -547,7 +551,7 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName if result == metrics.GoroutineResultError || // When all pods are already deleted (which is very rare, but could happen in theory), // it's safe to activate the preemptor Pod because it might miss Pod/delete event that requeues the pod. - allPodsAlreadyDeleted { + allPodsAlreadyDeleted.Load() { ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod}) } }() @@ -573,15 +577,9 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName // and the pod could end up stucking at the unschedulable pod pool // by all the pod removal events being ignored. ev.Handler.Parallelizer().Until(ctx, len(victimPods)-1, preemptPod, ev.PluginName) - err := errCh.ReceiveError() - switch { - case apierrors.IsNotFound(err): - logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "node", c.Name(), "err", err) - case err != nil: + if err := errCh.ReceiveError(); err != nil { utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption") result = metrics.GoroutineResultError - default: - allPodsAlreadyDeleted = false } } @@ -591,13 +589,12 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName err := ev.PreemptPod(ctx, c, pod, victimPods[len(victimPods)-1], pluginName) switch { - case apierrors.IsNotFound(err): - logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(pod), "node", c.Name(), "err", err) - case err != nil: + case err != nil && !apierrors.IsNotFound(err): + // We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it. utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption") result = metrics.GoroutineResultError - default: - allPodsAlreadyDeleted = false + case err == nil: + allPodsAlreadyDeleted.Store(false) } logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result) diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index 7cd6bd41220..c418b14f012 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ b/test/integration/scheduler/preemption/preemption_test.go @@ -1573,10 +1573,10 @@ func TestNominatedNodeCleanUp(t *testing.T) { st.MakePod().Name("low-4").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), }, { - st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(), + st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(), }, { - st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(), + st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(), }, }, postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{ @@ -1584,6 +1584,7 @@ func TestNominatedNodeCleanUp(t *testing.T) { testutils.WaitForNominatedNodeName, testutils.WaitForNominatedNodeName, }, + podNamesToDelete: []string{"low-1", "low-2", "low-3", "low-4"}, }, { name: "mid-priority pod preempts low-priority pod, followed by a high-priority pod without additional preemption",