mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-06-09 00:34:10 -04:00
Merge pull request #134245 from macsko/revert_pr_133213
Revert "fix: handle corner cases in the async preemption"
This commit is contained in:
commit
c0b9ab3351
3 changed files with 39 additions and 127 deletions
|
|
@ -189,12 +189,12 @@ func NewEvaluator(pluginName string, fh fwk.Handle, i Interface, enableAsyncPree
|
|||
}
|
||||
}
|
||||
if err := util.DeletePod(ctx, ev.Handler.ClientSet(), victim); err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
|
||||
} else {
|
||||
if !apierrors.IsNotFound(err) {
|
||||
logger.Error(err, "Tried to preempted pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(preemptor))
|
||||
return err
|
||||
}
|
||||
return err
|
||||
logger.V(2).Info("Victim Pod is already deleted", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
|
||||
return nil
|
||||
}
|
||||
logger.V(2).Info("Preemptor Pod preempted victim Pod", "preemptor", klog.KObj(preemptor), "victim", klog.KObj(victim), "node", c.Name())
|
||||
}
|
||||
|
|
@ -436,14 +436,7 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1.
|
|||
logger := klog.FromContext(ctx)
|
||||
errCh := parallelize.NewErrorChannel()
|
||||
fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) {
|
||||
victimPod := c.Victims().Pods[index]
|
||||
if victimPod.DeletionTimestamp != nil {
|
||||
// If the victim Pod is already being deleted, we don't have to make another deletion api call.
|
||||
logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victimPod))
|
||||
return
|
||||
}
|
||||
|
||||
if err := ev.PreemptPod(ctx, c, pod, victimPod, pluginName); err != nil && !apierrors.IsNotFound(err) {
|
||||
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[index], pluginName); err != nil {
|
||||
errCh.SendErrorWithCancel(err, cancel)
|
||||
}
|
||||
}, ev.PluginName)
|
||||
|
|
@ -504,34 +497,11 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
|
|||
// Intentionally create a new context, not using a ctx from the scheduling cycle, to create ctx,
|
||||
// because this process could continue even after this scheduling cycle finishes.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
logger := klog.FromContext(ctx)
|
||||
victimPods := make([]*v1.Pod, 0, len(c.Victims().Pods))
|
||||
for _, victim := range c.Victims().Pods {
|
||||
if victim.DeletionTimestamp != nil {
|
||||
// If the victim Pod is already being deleted, we don't have to make another deletion api call.
|
||||
logger.V(2).Info("Victim Pod is already deleted, skipping the API call for it", "preemptor", klog.KObj(pod), "node", c.Name(), "victim", klog.KObj(victim))
|
||||
continue
|
||||
}
|
||||
victimPods = append(victimPods, victim)
|
||||
}
|
||||
if len(victimPods) == 0 {
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
|
||||
errCh := parallelize.NewErrorChannel()
|
||||
// Whether all victim pods are already deleted before making API call.
|
||||
var allPodsAlreadyDeleted atomic.Bool
|
||||
allPodsAlreadyDeleted.Store(true)
|
||||
preemptPod := func(index int) {
|
||||
victim := victimPods[index]
|
||||
err := ev.PreemptPod(ctx, c, pod, victim, pluginName)
|
||||
switch {
|
||||
case err != nil && !apierrors.IsNotFound(err):
|
||||
// We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it.
|
||||
victim := c.Victims().Pods[index]
|
||||
if err := ev.PreemptPod(ctx, c, pod, victim, pluginName); err != nil {
|
||||
errCh.SendErrorWithCancel(err, cancel)
|
||||
case err == nil:
|
||||
allPodsAlreadyDeleted.Store(false)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -539,24 +509,21 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
|
|||
ev.preempting.Insert(pod.UID)
|
||||
ev.mu.Unlock()
|
||||
|
||||
logger := klog.FromContext(ctx)
|
||||
go func() {
|
||||
startTime := time.Now()
|
||||
result := metrics.GoroutineResultSuccess
|
||||
|
||||
defer metrics.PreemptionGoroutinesDuration.WithLabelValues(result).Observe(metrics.SinceInSeconds(startTime))
|
||||
defer metrics.PreemptionGoroutinesExecutionTotal.WithLabelValues(result).Inc()
|
||||
defer func() {
|
||||
// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
|
||||
// So, we should move the Pod to the activeQ.
|
||||
if result == metrics.GoroutineResultError ||
|
||||
// When all pods are already deleted (which is very rare, but could happen in theory),
|
||||
// it's safe to activate the preemptor Pod because it might miss Pod/delete event that requeues the pod.
|
||||
allPodsAlreadyDeleted.Load() {
|
||||
if result == metrics.GoroutineResultError {
|
||||
// When API call isn't successful, the Pod may get stuck in the unschedulable pod pool in the worst case.
|
||||
// So, we should move the Pod to the activeQ.
|
||||
ev.Handler.Activate(logger, map[string]*v1.Pod{pod.Name: pod})
|
||||
}
|
||||
}()
|
||||
defer cancel()
|
||||
logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods), "numVictimsToDelete", len(victimPods))
|
||||
logger.V(2).Info("Start the preemption asynchronously", "preemptor", klog.KObj(pod), "node", c.Name(), "numVictims", len(c.Victims().Pods))
|
||||
|
||||
// Lower priority pods nominated to run on this node, may no longer fit on
|
||||
// this node. So, we should remove their nomination. Removing their
|
||||
|
|
@ -569,32 +536,33 @@ func (ev *Evaluator) prepareCandidateAsync(c Candidate, pod *v1.Pod, pluginName
|
|||
// We do not return as this error is not critical.
|
||||
}
|
||||
|
||||
if len(victimPods) > 1 {
|
||||
// We can evict all victims in parallel, but the last one.
|
||||
// We have to remove the pod from the preempting map before the last one is evicted
|
||||
// because, otherwise, the pod removal might be notified to the scheduling queue before
|
||||
// we remove this pod from the preempting map,
|
||||
// and the pod could end up stucking at the unschedulable pod pool
|
||||
// by all the pod removal events being ignored.
|
||||
ev.Handler.Parallelizer().Until(ctx, len(victimPods)-1, preemptPod, ev.PluginName)
|
||||
if err := errCh.ReceiveError(); err != nil {
|
||||
utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
|
||||
result = metrics.GoroutineResultError
|
||||
}
|
||||
if len(c.Victims().Pods) == 0 {
|
||||
ev.mu.Lock()
|
||||
delete(ev.preempting, pod.UID)
|
||||
ev.mu.Unlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// We can evict all victims in parallel, but the last one.
|
||||
// We have to remove the pod from the preempting map before the last one is evicted
|
||||
// because, otherwise, the pod removal might be notified to the scheduling queue before
|
||||
// we remove this pod from the preempting map,
|
||||
// and the pod could end up stucking at the unschedulable pod pool
|
||||
// by all the pod removal events being ignored.
|
||||
ev.Handler.Parallelizer().Until(ctx, len(c.Victims().Pods)-1, preemptPod, ev.PluginName)
|
||||
if err := errCh.ReceiveError(); err != nil {
|
||||
utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
|
||||
result = metrics.GoroutineResultError
|
||||
}
|
||||
|
||||
ev.mu.Lock()
|
||||
delete(ev.preempting, pod.UID)
|
||||
ev.mu.Unlock()
|
||||
|
||||
err := ev.PreemptPod(ctx, c, pod, victimPods[len(victimPods)-1], pluginName)
|
||||
switch {
|
||||
case err != nil && !apierrors.IsNotFound(err):
|
||||
// We don't have to handle NotFound error here, because it means the victim Pod is already deleted, and the preemption didn't have to remove it.
|
||||
if err := ev.PreemptPod(ctx, c, pod, c.Victims().Pods[len(c.Victims().Pods)-1], pluginName); err != nil {
|
||||
utilruntime.HandleErrorWithContext(ctx, err, "Error occurred during async preemption")
|
||||
result = metrics.GoroutineResultError
|
||||
case err == nil:
|
||||
allPodsAlreadyDeleted.Store(false)
|
||||
}
|
||||
|
||||
logger.V(2).Info("Async Preemption finished completely", "preemptor", klog.KObj(pod), "node", c.Name(), "result", result)
|
||||
|
|
|
|||
|
|
@ -30,7 +30,6 @@ import (
|
|||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
policy "k8s.io/api/policy/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
|
|
@ -45,12 +44,12 @@ import (
|
|||
"k8s.io/klog/v2/ktesting"
|
||||
extenderv1 "k8s.io/kube-scheduler/extender/v1"
|
||||
fwk "k8s.io/kube-scheduler/framework"
|
||||
apicache "k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
|
||||
apidispatcher "k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
|
||||
"k8s.io/kubernetes/pkg/scheduler/backend/api_cache"
|
||||
"k8s.io/kubernetes/pkg/scheduler/backend/api_dispatcher"
|
||||
internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
|
||||
internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework"
|
||||
apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
|
||||
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
|
||||
|
|
@ -421,10 +420,6 @@ func TestPrepareCandidate(t *testing.T) {
|
|||
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
|
||||
Obj()
|
||||
|
||||
notFoundVictim1 = st.MakePod().Name("not-found-victim").UID("victim1").
|
||||
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
|
||||
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
|
||||
Obj()
|
||||
failVictim = st.MakePod().Name("fail-victim").UID("victim1").
|
||||
Node(node1Name).SchedulerName(defaultSchedulerName).Priority(midPriority).
|
||||
Containers([]v1.Container{st.MakeContainer().Name("container1").Obj()}).
|
||||
|
|
@ -456,12 +451,6 @@ func TestPrepareCandidate(t *testing.T) {
|
|||
errPatchStatusFailed = errors.New("patch pod status failed")
|
||||
)
|
||||
|
||||
victimWithDeletionTimestamp := victim1.DeepCopy()
|
||||
victimWithDeletionTimestamp.Name = "victim1-with-deletion-timestamp"
|
||||
victimWithDeletionTimestamp.UID = "victim1-with-deletion-timestamp"
|
||||
victimWithDeletionTimestamp.DeletionTimestamp = &metav1.Time{Time: time.Now().Add(-100 * time.Second)}
|
||||
victimWithDeletionTimestamp.Finalizers = []string{"test"}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
nodeNames []string
|
||||
|
|
@ -490,8 +479,9 @@ func TestPrepareCandidate(t *testing.T) {
|
|||
testPods: []*v1.Pod{
|
||||
victim1,
|
||||
},
|
||||
nodeNames: []string{node1Name},
|
||||
expectedStatus: nil,
|
||||
nodeNames: []string{node1Name},
|
||||
expectedStatus: nil,
|
||||
expectedPreemptingMap: sets.New(types.UID("preemptor")),
|
||||
},
|
||||
{
|
||||
name: "one victim without condition",
|
||||
|
|
@ -513,42 +503,6 @@ func TestPrepareCandidate(t *testing.T) {
|
|||
expectedStatus: nil,
|
||||
expectedPreemptingMap: sets.New(types.UID("preemptor")),
|
||||
},
|
||||
{
|
||||
name: "one victim, but victim is already being deleted",
|
||||
|
||||
candidate: &fakeCandidate{
|
||||
name: node1Name,
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{
|
||||
victimWithDeletionTimestamp,
|
||||
},
|
||||
},
|
||||
},
|
||||
preemptor: preemptor,
|
||||
testPods: []*v1.Pod{
|
||||
victimWithDeletionTimestamp,
|
||||
},
|
||||
nodeNames: []string{node1Name},
|
||||
expectedStatus: nil,
|
||||
},
|
||||
{
|
||||
name: "one victim, but victim is already deleted",
|
||||
|
||||
candidate: &fakeCandidate{
|
||||
name: node1Name,
|
||||
victims: &extenderv1.Victims{
|
||||
Pods: []*v1.Pod{
|
||||
notFoundVictim1,
|
||||
},
|
||||
},
|
||||
},
|
||||
preemptor: preemptor,
|
||||
testPods: []*v1.Pod{},
|
||||
nodeNames: []string{node1Name},
|
||||
expectedStatus: nil,
|
||||
expectedActivatedPods: map[string]*v1.Pod{preemptor.Name: preemptor},
|
||||
expectedPreemptingMap: sets.New(types.UID("preemptor")),
|
||||
},
|
||||
{
|
||||
name: "one victim with same condition",
|
||||
|
||||
|
|
@ -709,11 +663,6 @@ func TestPrepareCandidate(t *testing.T) {
|
|||
deletionFailure = true
|
||||
return true, nil, errDeletePodFailed
|
||||
}
|
||||
// fake clientset does not return an error for not-found pods, so we simulate it here.
|
||||
if name == "not-found-victim" {
|
||||
// Simulate a not-found error.
|
||||
return true, nil, apierrors.NewNotFound(v1.Resource("pods"), name)
|
||||
}
|
||||
|
||||
deletedPods.Insert(name)
|
||||
return true, nil, nil
|
||||
|
|
@ -726,10 +675,6 @@ func TestPrepareCandidate(t *testing.T) {
|
|||
patchFailure = true
|
||||
return true, nil, errPatchStatusFailed
|
||||
}
|
||||
// fake clientset does not return an error for not-found pods, so we simulate it here.
|
||||
if action.(clienttesting.PatchAction).GetName() == "not-found-victim" {
|
||||
return true, nil, apierrors.NewNotFound(v1.Resource("pods"), "not-found-victim")
|
||||
}
|
||||
return true, nil, nil
|
||||
})
|
||||
|
||||
|
|
|
|||
|
|
@ -119,10 +119,10 @@ func TestNominatedNode(t *testing.T) {
|
|||
st.MakePod().Name("low-4").Priority(lowPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj(),
|
||||
},
|
||||
{
|
||||
st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(),
|
||||
st.MakePod().Name("medium").Priority(mediumPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
|
||||
},
|
||||
{
|
||||
st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj(),
|
||||
st.MakePod().Name("high").Priority(highPriority).Req(map[v1.ResourceName]string{v1.ResourceCPU: "3"}).Obj(),
|
||||
},
|
||||
},
|
||||
postChecks: []func(ctx context.Context, cs clientset.Interface, pod *v1.Pod) error{
|
||||
|
|
@ -130,7 +130,6 @@ func TestNominatedNode(t *testing.T) {
|
|||
testutils.WaitForNominatedNodeName,
|
||||
testutils.WaitForNominatedNodeName,
|
||||
},
|
||||
podNamesToDelete: []string{"low-1", "low-2", "low-3", "low-4"},
|
||||
expectNilNominatedNodeName: true,
|
||||
},
|
||||
{
|
||||
|
|
|
|||
Loading…
Reference in a new issue