diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index ee98910ef99..9bcd91592fe 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -189,12 +189,12 @@ func NewEvaluator(pluginName string, fh framework.Handle, i Interface, enableAsy } } if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil { - if !apierrors.IsNotFound(err) { + if apierrors.IsNotFound(err) { + logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) + } else { logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor)) - return err } - logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) - return nil + return err } logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) } @@ -436,7 +436,14 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. logger := klog.FromContext(ctx) errCh := parallelize.NewErrorChannel() fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) { - if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil { + victimPod := c.Victims().Pods[index] + if victimPod.DeletionTimestamp != nil { + // If the victim Pod is already being deleted, we don't have to make another deletion api call. + logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victimPod)) + return + } + + if err := ev.PreemptPod(ctx, c, pod, victimPod, pluginName); err != nil && !apierrors.IsNotFound(err) { errCh.SendErrorWithCancel(err, cancel) } }, ev.PluginName) @@ -497,11 +504,34 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName // Intentionally create a new context, not using a ctx from the scheduling cycle, to create ctx, // because this process could continue even after this scheduling cycle finishes. ctx, cancel := context.WithCancel(context.Background()) + logger := klog.FromContext(ctx) + victimPods := make([]*v1.Pod, 0, len(c.Victims().Pods)) + for _, victim := range c.Victims().Pods { + if victim.DeletionTimestamp != nil { + // If the victim Pod is already being deleted, we don't have to make another deletion api call. + logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victim)) + continue + } + victimPods = append(victimPods, victim) + } + if len(victimPods) == 0 { + cancel() + return + } + errCh := parallelize.NewErrorChannel() + // Whether all victim pods are already deleted before making API call. + var allPodsAlreadyDeleted atomic.Bool + allPodsAlreadyDeleted.Store(true) preemptPod := func(index int) { - victim := c.Victims().Pods[index] - if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil { + victim := victimPods[index] + err := ev.PreemptPod(ctx, c, pod, victim, pluginName) + switch { + case err != nil && !apierrors.IsNotFound(err): + // We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it. errCh.SendErrorWithCancel(err, cancel) + case err == nil: + allPodsAlreadyDeleted.Store(false) } } @@ -509,21 +539,24 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName ev.preempting.Insert(pod.UID) ev.mu.Unlock() - logger := klog.FromContext(ctx) go func() { startTime := time.Now() result := metrics.GoroutineResultSuccess + defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime)) defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc() defer func() { - if result == metrics.GoroutineResultError { - // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case. - // So, we should move the Pod to the activeQ. + // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case. + // So, we should move the Pod to the activeQ. + if result == metrics.GoroutineResultError || + // When all pods are already deleted (which is very rare, but could happen in theory), + // it's safe to activate the preemptor Pod because it might miss Pod/delete event that requeues the pod. + allPodsAlreadyDeleted.Load() { ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod}) } }() defer cancel() - logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods)) + logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods), "numVictimsToDelete", len(victimPods)) // Lower priority pods nominated to run on this node, may no longer fit on // this node. So, we should remove their nomination. Removing their @@ -536,33 +569,32 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName // We do not return as this error is not critical. } - if len(c.Victims().Pods) == 0 { - ev.mu.Lock() - delete(ev.preempting, pod.UID) - ev.mu.Unlock() - - return - } - - // We can evict all victims in parallel, but the last one. - // We have to remove the pod from the preempting map before the last one is evicted - // because, otherwise, the pod removal might be notified to the scheduling queue before - // we remove this pod from the preempting map, - // and the pod could end up stucking at the unschedulable pod pool - // by all the pod removal events being ignored. - ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName) - if err := errCh.ReceiveError(); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption") - result = metrics.GoroutineResultError + if len(victimPods) > 1 { + // We can evict all victims in parallel, but the last one. + // We have to remove the pod from the preempting map before the last one is evicted + // because, otherwise, the pod removal might be notified to the scheduling queue before + // we remove this pod from the preempting map, + // and the pod could end up stucking at the unschedulable pod pool + // by all the pod removal events being ignored. + ev.Handler.Parallelizer().Until(ctx, len(victimPods)-1, preemptPod, ev.PluginName) + if err := errCh.ReceiveError(); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption") + result = metrics.GoroutineResultError + } } ev.mu.Lock() delete(ev.preempting, pod.UID) ev.mu.Unlock() - if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil { + err := ev.PreemptPod(ctx, c, pod, victimPods[len(victimPods)-1], pluginName) + switch { + case err != nil && !apierrors.IsNotFound(err): + // We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it. utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption") result = metrics.GoroutineResultError + case err == nil: + allPodsAlreadyDeleted.Store(false) } logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result) diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index e8b0a80ecad..5a37f0df4f3 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -30,6 +30,7 @@ import ( v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -44,12 +45,12 @@ import ( "k8s.io/klog/v2/ktesting" extenderv1 "k8s.io/kube-scheduler/extender/v1" fwk "k8s.io/kube-scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/backend/api_cache" - "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" + apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache" + apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue" "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" + apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" @@ -420,6 +421,10 @@ func TestPrepareCandidate(t *testing.T) { Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). Obj() + notFoundVictim1 = st.MakePod().Name("not-found-victim").UID("victim1"). + Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). + Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). + Obj() failVictim = st.MakePod().Name("fail-victim").UID("victim1"). Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). @@ -451,6 +456,12 @@ func TestPrepareCandidate(t *testing.T) { errPatchStatusFailed = errors.New("patch pod status failed") ) + victimWithDeletionTimestamp := victim1.DeepCopy() + victimWithDeletionTimestamp.Name = "victim1-with-deletion-timestamp" + victimWithDeletionTimestamp.UID = "victim1-with-deletion-timestamp" + victimWithDeletionTimestamp.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-100 * time.Second)} + victimWithDeletionTimestamp.Finalizers = []string{"test"} + tests := []struct { name string nodeNames []string @@ -479,9 +490,8 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1, }, - nodeNames: []string{node1Name}, - expectedStatus: nil, - expectedPreemptingMap: sets.New(types.UID("preemptor")), + nodeNames: []string{node1Name}, + expectedStatus: nil, }, { name: "one victim without condition", @@ -503,6 +513,42 @@ func TestPrepareCandidate(t *testing.T) { expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), }, + { + name: "one victim, but victim is already being deleted", + + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + victimWithDeletionTimestamp, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{ + victimWithDeletionTimestamp, + }, + nodeNames: []string{node1Name}, + expectedStatus: nil, + }, + { + name: "one victim, but victim is already deleted", + + candidate: &fakeCandidate{ + name: node1Name, + victims: &extenderv1.Victims{ + Pods: []*v1.Pod{ + notFoundVictim1, + }, + }, + }, + preemptor: preemptor, + testPods: []*v1.Pod{}, + nodeNames: []string{node1Name}, + expectedStatus: nil, + expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, + expectedPreemptingMap: sets.New(types.UID("preemptor")), + }, { name: "one victim with same condition", @@ -663,6 +709,11 @@ func TestPrepareCandidate(t *testing.T) { deletionFailure = true return true, nil, errDeletePodFailed } + // fake clientset does not return an error for not-found pods, so we simulate it here. + if name == "not-found-victim" { + // Simulate a not-found error. + return true, nil, apierrors.NewNotFound(v1.Resource("pods"), name) + } deletedPods.Insert(name) return true, nil, nil @@ -675,6 +726,10 @@ func TestPrepareCandidate(t *testing.T) { patchFailure = true return true, nil, errPatchStatusFailed } + // fake clientset does not return an error for not-found pods, so we simulate it here. + if action.(clienttesting.PatchAction).GetName() == "not-found-victim" { + return true, nil, apierrors.NewNotFound(v1.Resource("pods"), "not-found-victim") + } return true, nil, nil }) diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index 7cd6bd41220..c418b14f012 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ b/test/integration/scheduler/preemption/preemption_test.go @@ -1573,10 +1573,10 @@ func TestNominatedNodeCleanUp(t *testing.T) { st.MakePod().Name("low-4").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), }, { - st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(), + st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(), }, { - st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(), + st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(), }, }, postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{ @@ -1584,6 +1584,7 @@ func TestNominatedNodeCleanUp(t *testing.T) { testutils.WaitForNominatedNodeName, testutils.WaitForNominatedNodeName, }, + podNamesToDelete: []string{"low-1", "low-2", "low-3", "low-4"}, }, { name: "mid-priority pod preempts low-priority pod, followed by a high-priority pod without additional preemption",