Revert "Merge pull request #133213 from sanposhiho/second-trial-conor"

This reverts commit a2bf45b081, reversing
changes made to 2b2ea27250.
This commit is contained in:
Maciej Skoczeń 2025-09-24 11:05:16 +00:00
parent 2fc2c0a38d
commit d2e6be440c
3 changed files with 39 additions and 127 deletions

View file

@ -189,12 +189,12 @@ func NewEvaluator(pluginName string, fh fwk.Handle, i Interface, enableAsyncPree
}
}
if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
if apierrors.IsNotFound(err) {
logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
} else {
if !apierrors.IsNotFound(err) {
logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
return err
}
return err
logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
return nil
}
logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
}
@ -436,14 +436,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
logger := klog.FromContext(ctx)
errCh := parallelize.NewErrorChannel()
fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) {
victimPod := c.Victims().Pods[index]
if victimPod.DeletionTimestamp != nil {
// If the victim Pod is already being deleted, we don't have to make another deletion api call.
logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victimPod))
return
}
if err := ev.PreemptPod(ctx, c, pod, victimPod, pluginName); err != nil && !apierrors.IsNotFound(err) {
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil {
errCh.SendErrorWithCancel(err, cancel)
}
}, ev.PluginName)
@ -504,34 +497,11 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
// Intentionally create a new context, not using a ctx from the scheduling cycle, to create ctx,
// because this process could continue even after this scheduling cycle finishes.
ctx, cancel := context.WithCancel(context.Background())
logger := klog.FromContext(ctx)
victimPods := make([]*v1.Pod, 0, len(c.Victims().Pods))
for _, victim := range c.Victims().Pods {
if victim.DeletionTimestamp != nil {
// If the victim Pod is already being deleted, we don't have to make another deletion api call.
logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victim))
continue
}
victimPods = append(victimPods, victim)
}
if len(victimPods) == 0 {
cancel()
return
}
errCh := parallelize.NewErrorChannel()
// Whether all victim pods are already deleted before making API call.
var allPodsAlreadyDeleted atomic.Bool
allPodsAlreadyDeleted.Store(true)
preemptPod := func(index int) {
victim := victimPods[index]
err := ev.PreemptPod(ctx, c, pod, victim, pluginName)
switch {
case err != nil && !apierrors.IsNotFound(err):
// We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it.
victim := c.Victims().Pods[index]
if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil {
errCh.SendErrorWithCancel(err, cancel)
case err == nil:
allPodsAlreadyDeleted.Store(false)
}
}
@ -539,24 +509,21 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
ev.preempting.Insert(pod.UID)
ev.mu.Unlock()
logger := klog.FromContext(ctx)
go func() {
startTime := time.Now()
result := metrics.GoroutineResultSuccess
defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
defer func() {
// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
// So, we should move the Pod to the activeQ.
if result == metrics.GoroutineResultError ||
// When all pods are already deleted (which is very rare, but could happen in theory),
// it's safe to activate the preemptor Pod because it might miss Pod/delete event that requeues the pod.
allPodsAlreadyDeleted.Load() {
if result == metrics.GoroutineResultError {
// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
// So, we should move the Pod to the activeQ.
ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
}
}()
defer cancel()
logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods), "numVictimsToDelete", len(victimPods))
logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
@ -569,32 +536,33 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
// We do not return as this error is not critical.
}
if len(victimPods) > 1 {
// We can evict all victims in parallel, but the last one.
// We have to remove the pod from the preempting map before the last one is evicted
// because, otherwise, the pod removal might be notified to the scheduling queue before
// we remove this pod from the preempting map,
// and the pod could end up stucking at the unschedulable pod pool
// by all the pod removal events being ignored.
ev.Handler.Parallelizer().Until(ctx, len(victimPods)-1, preemptPod, ev.PluginName)
if err := errCh.ReceiveError(); err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
result = metrics.GoroutineResultError
}
if len(c.Victims().Pods) == 0 {
ev.mu.Lock()
delete(ev.preempting, pod.UID)
ev.mu.Unlock()
return
}
// We can evict all victims in parallel, but the last one.
// We have to remove the pod from the preempting map before the last one is evicted
// because, otherwise, the pod removal might be notified to the scheduling queue before
// we remove this pod from the preempting map,
// and the pod could end up stucking at the unschedulable pod pool
// by all the pod removal events being ignored.
ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
if err := errCh.ReceiveError(); err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
result = metrics.GoroutineResultError
}
ev.mu.Lock()
delete(ev.preempting, pod.UID)
ev.mu.Unlock()
err := ev.PreemptPod(ctx, c, pod, victimPods[len(victimPods)-1], pluginName)
switch {
case err != nil && !apierrors.IsNotFound(err):
// We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it.
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
result = metrics.GoroutineResultError
case err == nil:
allPodsAlreadyDeleted.Store(false)
}
logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result)

View file

@ -30,7 +30,6 @@ import (
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@ -45,12 +44,12 @@ import (
"k8s.io/klog/v2/ktesting"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
fwk "k8s.io/kube-scheduler/framework"
apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
"k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
"k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
"k8s.io/kubernetes/pkg/scheduler/framework"
apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
"k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
@ -421,10 +420,6 @@ func TestPrepareCandidate(t *testing.T) {
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
notFoundVictim1 = st.MakePod().Name("not-found-victim").UID("victim1").
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
Obj()
failVictim = st.MakePod().Name("fail-victim").UID("victim1").
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
@ -456,12 +451,6 @@ func TestPrepareCandidate(t *testing.T) {
errPatchStatusFailed = errors.New("patch pod status failed")
)
victimWithDeletionTimestamp := victim1.DeepCopy()
victimWithDeletionTimestamp.Name = "victim1-with-deletion-timestamp"
victimWithDeletionTimestamp.UID = "victim1-with-deletion-timestamp"
victimWithDeletionTimestamp.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-100 * time.Second)}
victimWithDeletionTimestamp.Finalizers = []string{"test"}
tests := []struct {
name string
nodeNames []string
@ -490,8 +479,9 @@ func TestPrepareCandidate(t *testing.T) {
testPods: []*v1.Pod{
victim1,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
nodeNames: []string{node1Name},
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim without condition",
@ -513,42 +503,6 @@ func TestPrepareCandidate(t *testing.T) {
expectedStatus: nil,
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim, but victim is already being deleted",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
victimWithDeletionTimestamp,
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{
victimWithDeletionTimestamp,
},
nodeNames: []string{node1Name},
expectedStatus: nil,
},
{
name: "one victim, but victim is already deleted",
candidate: &fakeCandidate{
name: node1Name,
victims: &extenderv1.Victims{
Pods: []*v1.Pod{
notFoundVictim1,
},
},
},
preemptor: preemptor,
testPods: []*v1.Pod{},
nodeNames: []string{node1Name},
expectedStatus: nil,
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
expectedPreemptingMap: sets.New(types.UID("preemptor")),
},
{
name: "one victim with same condition",
@ -709,11 +663,6 @@ func TestPrepareCandidate(t *testing.T) {
deletionFailure = true
return true, nil, errDeletePodFailed
}
// fake clientset does not return an error for not-found pods, so we simulate it here.
if name == "not-found-victim" {
// Simulate a not-found error.
return true, nil, apierrors.NewNotFound(v1.Resource("pods"), name)
}
deletedPods.Insert(name)
return true, nil, nil
@ -726,10 +675,6 @@ func TestPrepareCandidate(t *testing.T) {
patchFailure = true
return true, nil, errPatchStatusFailed
}
// fake clientset does not return an error for not-found pods, so we simulate it here.
if action.(clienttesting.PatchAction).GetName() == "not-found-victim" {
return true, nil, apierrors.NewNotFound(v1.Resource("pods"), "not-found-victim")
}
return true, nil, nil
})

View file

@ -119,10 +119,10 @@ func TestNominatedNode(t *testing.T) {
st.MakePod().Name("low-4").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
},
{
st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(),
st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
},
{
st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(),
},
},
postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{
@ -130,7 +130,6 @@ func TestNominatedNode(t *testing.T) {
testutils.WaitForNominatedNodeName,
testutils.WaitForNominatedNodeName,
},
podNamesToDelete: []string{"low-1", "low-2", "low-3", "low-4"},
expectNilNominatedNodeName: true,
},
{