diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index 71766273dff..296a2c50517 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -189,12 +189,12 @@ func NewEvaluator(pluginName string, fh fwk.Handle, i Interface, enableAsyncPree } } if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil { - if apierrors.IsNotFound(err) { - logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) - } else { + if !apierrors.IsNotFound(err) { logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor)) + return err } - return err + logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) + return nil } logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name()) } @@ -436,14 +436,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. logger := klog.FromContext(ctx) errCh := parallelize.NewErrorChannel() fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) { - victimPod := c.Victims().Pods[index] - if victimPod.DeletionTimestamp != nil { - // If the victim Pod is already being deleted, we don't have to make another deletion api call. - logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victimPod)) - return - } - - if err := ev.PreemptPod(ctx, c, pod, victimPod, pluginName); err != nil && !apierrors.IsNotFound(err) { + if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil { errCh.SendErrorWithCancel(err, cancel) } }, ev.PluginName) @@ -504,34 +497,11 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName // Intentionally create a new context, not using a ctx from the scheduling cycle, to create ctx, // because this process could continue even after this scheduling cycle finishes. ctx, cancel := context.WithCancel(context.Background()) - logger := klog.FromContext(ctx) - victimPods := make([]*v1.Pod, 0, len(c.Victims().Pods)) - for _, victim := range c.Victims().Pods { - if victim.DeletionTimestamp != nil { - // If the victim Pod is already being deleted, we don't have to make another deletion api call. - logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victim)) - continue - } - victimPods = append(victimPods, victim) - } - if len(victimPods) == 0 { - cancel() - return - } - errCh := parallelize.NewErrorChannel() - // Whether all victim pods are already deleted before making API call. - var allPodsAlreadyDeleted atomic.Bool - allPodsAlreadyDeleted.Store(true) preemptPod := func(index int) { - victim := victimPods[index] - err := ev.PreemptPod(ctx, c, pod, victim, pluginName) - switch { - case err != nil && !apierrors.IsNotFound(err): - // We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it. + victim := c.Victims().Pods[index] + if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil { errCh.SendErrorWithCancel(err, cancel) - case err == nil: - allPodsAlreadyDeleted.Store(false) } } @@ -539,24 +509,21 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName ev.preempting.Insert(pod.UID) ev.mu.Unlock() + logger := klog.FromContext(ctx) go func() { startTime := time.Now() result := metrics.GoroutineResultSuccess - defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime)) defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc() defer func() { - // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case. - // So, we should move the Pod to the activeQ. - if result == metrics.GoroutineResultError || - // When all pods are already deleted (which is very rare, but could happen in theory), - // it's safe to activate the preemptor Pod because it might miss Pod/delete event that requeues the pod. - allPodsAlreadyDeleted.Load() { + if result == metrics.GoroutineResultError { + // When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case. + // So, we should move the Pod to the activeQ. ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod}) } }() defer cancel() - logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods), "numVictimsToDelete", len(victimPods)) + logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods)) // Lower priority pods nominated to run on this node, may no longer fit on // this node. So, we should remove their nomination. Removing their @@ -569,32 +536,33 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName // We do not return as this error is not critical. } - if len(victimPods) > 1 { - // We can evict all victims in parallel, but the last one. - // We have to remove the pod from the preempting map before the last one is evicted - // because, otherwise, the pod removal might be notified to the scheduling queue before - // we remove this pod from the preempting map, - // and the pod could end up stucking at the unschedulable pod pool - // by all the pod removal events being ignored. - ev.Handler.Parallelizer().Until(ctx, len(victimPods)-1, preemptPod, ev.PluginName) - if err := errCh.ReceiveError(); err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption") - result = metrics.GoroutineResultError - } + if len(c.Victims().Pods) == 0 { + ev.mu.Lock() + delete(ev.preempting, pod.UID) + ev.mu.Unlock() + + return + } + + // We can evict all victims in parallel, but the last one. + // We have to remove the pod from the preempting map before the last one is evicted + // because, otherwise, the pod removal might be notified to the scheduling queue before + // we remove this pod from the preempting map, + // and the pod could end up stucking at the unschedulable pod pool + // by all the pod removal events being ignored. + ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName) + if err := errCh.ReceiveError(); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption") + result = metrics.GoroutineResultError } ev.mu.Lock() delete(ev.preempting, pod.UID) ev.mu.Unlock() - err := ev.PreemptPod(ctx, c, pod, victimPods[len(victimPods)-1], pluginName) - switch { - case err != nil && !apierrors.IsNotFound(err): - // We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it. + if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil { utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption") result = metrics.GoroutineResultError - case err == nil: - allPodsAlreadyDeleted.Store(false) } logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result) diff --git a/pkg/scheduler/framework/preemption/preemption_test.go b/pkg/scheduler/framework/preemption/preemption_test.go index da1c060aa64..2e4e49f47d7 100644 --- a/pkg/scheduler/framework/preemption/preemption_test.go +++ b/pkg/scheduler/framework/preemption/preemption_test.go @@ -30,7 +30,6 @@ import ( v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -45,12 +44,12 @@ import ( "k8s.io/klog/v2/ktesting" extenderv1 "k8s.io/kube-scheduler/extender/v1" fwk "k8s.io/kube-scheduler/framework" - apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache" - apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" + "k8s.io/kubernetes/pkg/scheduler/backend/api_cache" + "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher" internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue" "k8s.io/kubernetes/pkg/scheduler/framework" - apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" + "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" "k8s.io/kubernetes/pkg/scheduler/framework/parallelize" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" @@ -421,10 +420,6 @@ func TestPrepareCandidate(t *testing.T) { Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). Obj() - notFoundVictim1 = st.MakePod().Name("not-found-victim").UID("victim1"). - Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). - Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). - Obj() failVictim = st.MakePod().Name("fail-victim").UID("victim1"). Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority). Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}). @@ -456,12 +451,6 @@ func TestPrepareCandidate(t *testing.T) { errPatchStatusFailed = errors.New("patch pod status failed") ) - victimWithDeletionTimestamp := victim1.DeepCopy() - victimWithDeletionTimestamp.Name = "victim1-with-deletion-timestamp" - victimWithDeletionTimestamp.UID = "victim1-with-deletion-timestamp" - victimWithDeletionTimestamp.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-100 * time.Second)} - victimWithDeletionTimestamp.Finalizers = []string{"test"} - tests := []struct { name string nodeNames []string @@ -490,8 +479,9 @@ func TestPrepareCandidate(t *testing.T) { testPods: []*v1.Pod{ victim1, }, - nodeNames: []string{node1Name}, - expectedStatus: nil, + nodeNames: []string{node1Name}, + expectedStatus: nil, + expectedPreemptingMap: sets.New(types.UID("preemptor")), }, { name: "one victim without condition", @@ -513,42 +503,6 @@ func TestPrepareCandidate(t *testing.T) { expectedStatus: nil, expectedPreemptingMap: sets.New(types.UID("preemptor")), }, - { - name: "one victim, but victim is already being deleted", - - candidate: &fakeCandidate{ - name: node1Name, - victims: &extenderv1.Victims{ - Pods: []*v1.Pod{ - victimWithDeletionTimestamp, - }, - }, - }, - preemptor: preemptor, - testPods: []*v1.Pod{ - victimWithDeletionTimestamp, - }, - nodeNames: []string{node1Name}, - expectedStatus: nil, - }, - { - name: "one victim, but victim is already deleted", - - candidate: &fakeCandidate{ - name: node1Name, - victims: &extenderv1.Victims{ - Pods: []*v1.Pod{ - notFoundVictim1, - }, - }, - }, - preemptor: preemptor, - testPods: []*v1.Pod{}, - nodeNames: []string{node1Name}, - expectedStatus: nil, - expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor}, - expectedPreemptingMap: sets.New(types.UID("preemptor")), - }, { name: "one victim with same condition", @@ -709,11 +663,6 @@ func TestPrepareCandidate(t *testing.T) { deletionFailure = true return true, nil, errDeletePodFailed } - // fake clientset does not return an error for not-found pods, so we simulate it here. - if name == "not-found-victim" { - // Simulate a not-found error. - return true, nil, apierrors.NewNotFound(v1.Resource("pods"), name) - } deletedPods.Insert(name) return true, nil, nil @@ -726,10 +675,6 @@ func TestPrepareCandidate(t *testing.T) { patchFailure = true return true, nil, errPatchStatusFailed } - // fake clientset does not return an error for not-found pods, so we simulate it here. - if action.(clienttesting.PatchAction).GetName() == "not-found-victim" { - return true, nil, apierrors.NewNotFound(v1.Resource("pods"), "not-found-victim") - } return true, nil, nil }) diff --git a/test/integration/scheduler/preemption/nominatednodename/nominatednodename_test.go b/test/integration/scheduler/preemption/nominatednodename/nominatednodename_test.go index f70fdbc80df..9220f7b5ee6 100644 --- a/test/integration/scheduler/preemption/nominatednodename/nominatednodename_test.go +++ b/test/integration/scheduler/preemption/nominatednodename/nominatednodename_test.go @@ -119,10 +119,10 @@ func TestNominatedNode(t *testing.T) { st.MakePod().Name("low-4").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(), }, { - st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(), + st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(), }, { - st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(), + st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(), }, }, postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{ @@ -130,7 +130,6 @@ func TestNominatedNode(t *testing.T) { testutils.WaitForNominatedNodeName, testutils.WaitForNominatedNodeName, }, - podNamesToDelete: []string{"low-1", "low-2", "low-3", "low-4"}, expectNilNominatedNodeName: true, }, {