From 15029967870fdea5bb397540f60d6c310b902229 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Skocze=C5=84?= Date: Wed, 29 Oct 2025 09:56:24 +0000 Subject: [PATCH 1/2] Refactor scheduler event handlers for pods to handle binding event in one place --- pkg/scheduler/eventhandlers.go | 210 +++++++++-------- pkg/scheduler/eventhandlers_test.go | 335 ++++++++++++++++++++++++++-- pkg/scheduler/schedule_one_test.go | 2 +- pkg/scheduler/testing/wrappers.go | 6 + 4 files changed, 427 insertions(+), 126 deletions(-) diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index 3b1688de1c5..cfe5e0179a7 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -126,12 +126,93 @@ func (sched *Scheduler) deleteNodeFromCache(obj interface{}) { } } -func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { +func (sched *Scheduler) addPod(obj interface{}) { + logger := sched.logger + pod, ok := obj.(*v1.Pod) + if !ok { + utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", obj) + return + } + + if assignedPod(pod) { + sched.addAssignedPodToCache(pod) + } else if responsibleForPod(pod, sched.Profiles) { + sched.addPodToSchedulingQueue(pod) + } +} + +func (sched *Scheduler) updatePod(oldObj, newObj interface{}) { + logger := sched.logger + oldPod, ok := oldObj.(*v1.Pod) + if !ok { + utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert oldObj to *v1.Pod", "oldObj", oldObj) + return + } + newPod, ok := newObj.(*v1.Pod) + if !ok { + utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj) + return + } + + if assignedPod(oldPod) { + sched.updateAssignedPodInCache(oldPod, newPod) + } else if assignedPod(newPod) { + // This update means binding operation. We can treat it as adding the pod to a cache + // (addition to the cache will handle this binding appropriately). + sched.addAssignedPodToCache(newPod) + if responsibleForPod(oldPod, sched.Profiles) { + // Pod shouldn't be in the scheduling queue, but in unlikely event that the pod has been bound + // by another component, it should be removed from scheduling queue for correctness. + // Passing "true" means that removal from the scheduling queue is caused by a binding event, + // not by removal of the pod from the cluster. + sched.deletePodFromSchedulingQueue(oldPod, true) + } + } else if responsibleForPod(oldPod, sched.Profiles) { + sched.updatePodInSchedulingQueue(oldPod, newPod) + } +} + +func (sched *Scheduler) deletePod(obj interface{}) { + logger := sched.logger + var pod *v1.Pod + switch t := obj.(type) { + case *v1.Pod: + pod = t + if assignedPod(pod) { + sched.deleteAssignedPodFromCache(pod) + } else if responsibleForPod(pod, sched.Profiles) { + // Passing "false" means that removal from the scheduling queue is caused by + // removal of the pod from the cluster, not by a binding event. + sched.deletePodFromSchedulingQueue(pod, false) + } + return + case cache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*v1.Pod) + if !ok { + utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj) + return + } + // The carried object may be stale, so we don't use it to check if + // it's assigned or not. Attempting to cleanup anyways. + sched.deleteAssignedPodFromCache(pod) + if responsibleForPod(pod, sched.Profiles) { + // Passing "false" means that removal from the scheduling queue is caused by + // removal of the pod from the cluster, not by a binding event. + sched.deletePodFromSchedulingQueue(pod, false) + } + return + default: + utilruntime.HandleErrorWithLogger(logger, nil, "Unable to handle object", "objType", fmt.Sprintf("%T", obj), "obj", obj) + return + } +} + +func (sched *Scheduler) addPodToSchedulingQueue(pod *v1.Pod) { start := time.Now() defer metrics.EventHandlingLatency.WithLabelValues(framework.EventUnscheduledPodAdd.Label()).Observe(metrics.SinceInSeconds(start)) logger := sched.logger - pod := obj.(*v1.Pod) logger.V(3).Info("Add event for unscheduled pod", "pod", klog.KObj(pod)) sched.SchedulingQueue.Add(logger, pod) } @@ -150,10 +231,9 @@ func (sched *Scheduler) syncPodWithDispatcher(pod *v1.Pod) *v1.Pod { return enrichedPod } -func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) { +func (sched *Scheduler) updatePodInSchedulingQueue(oldPod, newPod *v1.Pod) { start := time.Now() logger := sched.logger - oldPod, newPod := oldObj.(*v1.Pod), newObj.(*v1.Pod) // Bypass update event that carries identical objects; otherwise, a duplicated // Pod may go through scheduling and cause unexpected behavior (see #96071). if oldPod.ResourceVersion == newPod.ResourceVersion { @@ -195,29 +275,21 @@ func hasNominatedNodeNameChanged(oldPod, newPod *v1.Pod) bool { return len(oldPod.Status.NominatedNodeName) > 0 && oldPod.Status.NominatedNodeName != newPod.Status.NominatedNodeName } -func (sched *Scheduler) deletePodFromSchedulingQueue(obj interface{}) { +func (sched *Scheduler) deletePodFromSchedulingQueue(pod *v1.Pod, inBinding bool) { start := time.Now() defer metrics.EventHandlingLatency.WithLabelValues(framework.EventUnscheduledPodDelete.Label()).Observe(metrics.SinceInSeconds(start)) logger := sched.logger - var pod *v1.Pod - switch t := obj.(type) { - case *v1.Pod: - pod = obj.(*v1.Pod) - case cache.DeletedFinalStateUnknown: - var ok bool - pod, ok = t.Obj.(*v1.Pod) - if !ok { - utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj) - return - } - default: - utilruntime.HandleErrorWithLogger(logger, nil, "Unable to handle object", "objType", fmt.Sprintf("%T", obj), "obj", obj) - return - } logger.V(3).Info("Delete event for unscheduled pod", "pod", klog.KObj(pod)) sched.SchedulingQueue.Delete(pod) + if inBinding { + // In the case of a binding, the rest can be skipped because it is not really a pod removal operation, but a binding. + // Any necessary notifications will be sent by the binding process, unless it was an unlikely external binding. + // In that case, we need to notify about the release of resources that were held by different assume/nomination + // once the https://github.com/kubernetes/kubernetes/issues/134859 is fixed. + return + } fwk, err := sched.frameworkForPod(pod) if err != nil { // This shouldn't happen, because we only accept for scheduling the pods @@ -246,16 +318,11 @@ func getLEPriorityPreCheck(priority int32) queue.PreEnqueueCheck { } } -func (sched *Scheduler) addPodToCache(obj interface{}) { +func (sched *Scheduler) addAssignedPodToCache(pod *v1.Pod) { start := time.Now() defer metrics.EventHandlingLatency.WithLabelValues(framework.EventAssignedPodAdd.Label()).Observe(metrics.SinceInSeconds(start)) logger := sched.logger - pod, ok := obj.(*v1.Pod) - if !ok { - utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", obj) - return - } logger.V(3).Info("Add event for scheduled pod", "pod", klog.KObj(pod)) if err := sched.Cache.AddPod(logger, pod); err != nil { @@ -278,21 +345,11 @@ func (sched *Scheduler) addPodToCache(obj interface{}) { } } -func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { +func (sched *Scheduler) updateAssignedPodInCache(oldPod, newPod *v1.Pod) { start := time.Now() defer metrics.EventHandlingLatency.WithLabelValues(framework.EventAssignedPodUpdate.Label()).Observe(metrics.SinceInSeconds(start)) logger := sched.logger - oldPod, ok := oldObj.(*v1.Pod) - if !ok { - utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert oldObj to *v1.Pod", "oldObj", oldObj) - return - } - newPod, ok := newObj.(*v1.Pod) - if !ok { - utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert newObj to *v1.Pod", "newObj", newObj) - return - } if sched.APIDispatcher != nil { // If the API dispatcher is available, sync the new pod with the details. @@ -331,26 +388,11 @@ func (sched *Scheduler) updatePodInCache(oldObj, newObj interface{}) { } } -func (sched *Scheduler) deletePodFromCache(obj interface{}) { +func (sched *Scheduler) deleteAssignedPodFromCache(pod *v1.Pod) { start := time.Now() defer metrics.EventHandlingLatency.WithLabelValues(framework.EventAssignedPodDelete.Label()).Observe(metrics.SinceInSeconds(start)) logger := sched.logger - var pod *v1.Pod - switch t := obj.(type) { - case *v1.Pod: - pod = t - case cache.DeletedFinalStateUnknown: - var ok bool - pod, ok = t.Obj.(*v1.Pod) - if !ok { - utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj) - return - } - default: - utilruntime.HandleErrorWithLogger(logger, nil, "Unable to handle object", "objType", fmt.Sprintf("%T", obj), "obj", obj) - return - } logger.V(3).Info("Delete event for scheduled pod", "pod", klog.KObj(pod)) if err := sched.Cache.RemovePod(logger, pod); err != nil { @@ -405,64 +447,12 @@ func addAllEventHandlers( ) logger := sched.logger - // scheduled pod cache - if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler( - cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch t := obj.(type) { - case *v1.Pod: - return assignedPod(t) - case cache.DeletedFinalStateUnknown: - if _, ok := t.Obj.(*v1.Pod); ok { - // The carried object may be stale, so we don't use it to check if - // it's assigned or not. Attempting to cleanup anyways. - return true - } - utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj) - return false - default: - utilruntime.HandleErrorWithLogger(logger, nil, "Unable to handle object", "objType", fmt.Sprintf("%T", obj), "obj", obj) - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: sched.addPodToCache, - UpdateFunc: sched.updatePodInCache, - DeleteFunc: sched.deletePodFromCache, - }, - }, - ); err != nil { - return err - } - handlers = append(handlers, handlerRegistration) - // unscheduled pod queue - if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler( - cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch t := obj.(type) { - case *v1.Pod: - return !assignedPod(t) && responsibleForPod(t, sched.Profiles) - case cache.DeletedFinalStateUnknown: - if pod, ok := t.Obj.(*v1.Pod); ok { - // The carried object may be stale, so we don't use it to check if - // it's assigned or not. - return responsibleForPod(pod, sched.Profiles) - } - utilruntime.HandleErrorWithLogger(logger, nil, "Cannot convert to *v1.Pod", "obj", t.Obj) - return false - default: - utilruntime.HandleErrorWithLogger(logger, nil, "Unable to handle object", "objType", fmt.Sprintf("%T", obj), "obj", obj) - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: sched.addPodToSchedulingQueue, - UpdateFunc: sched.updatePodInSchedulingQueue, - DeleteFunc: sched.deletePodFromSchedulingQueue, - }, - }, - ); err != nil { + if handlerRegistration, err = informerFactory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: sched.addPod, + UpdateFunc: sched.updatePod, + DeleteFunc: sched.deletePod, + }); err != nil { return err } handlers = append(handlers, handlerRegistration) diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index 7a395b6159e..0204df53eb9 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -41,6 +41,7 @@ import ( dyfake "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/cache" featuregatetesting "k8s.io/component-base/featuregate/testing" resourceslicetracker "k8s.io/dynamic-resource-allocation/resourceslice/tracker" "k8s.io/klog/v2" @@ -52,13 +53,17 @@ import ( internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue" "k8s.io/kubernetes/pkg/scheduler/framework" apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/metrics" + "k8s.io/kubernetes/pkg/scheduler/profile" st "k8s.io/kubernetes/pkg/scheduler/testing" + tf "k8s.io/kubernetes/pkg/scheduler/testing/framework" "k8s.io/kubernetes/pkg/scheduler/util" "k8s.io/kubernetes/pkg/scheduler/util/assumecache" ) @@ -127,7 +132,7 @@ func TestEventHandlers_MoveToActiveOnNominatedNodeUpdate(t *testing.T) { { name: "Removal of a pod that had nominated node name should trigger rescheduling of lower priority pods", updateFunc: func(s *Scheduler) { - s.deletePodFromSchedulingQueue(medNominatedPriorityPod) + s.deletePodFromSchedulingQueue(medNominatedPriorityPod, false) }, wantInActiveOrBackoff: sets.New(lowPriorityPod.Name, medPriorityPod.Name), }, @@ -222,24 +227,24 @@ func newDefaultQueueSort() fwk.LessFunc { return sort.Less } -func TestUpdatePodInCache(t *testing.T) { +func TestUpdateAssignedPodInCache(t *testing.T) { ttl := 10 * time.Second nodeName := "node" tests := []struct { name string - oldObj interface{} - newObj interface{} + oldPod *v1.Pod + newPod *v1.Pod }{ { name: "pod updated with the same UID", - oldObj: withPodName(podWithPort("oldUID", nodeName, 80), "pod"), - newObj: withPodName(podWithPort("oldUID", nodeName, 8080), "pod"), + oldPod: withPodName(podWithPort("oldUID", nodeName, 80), "pod"), + newPod: withPodName(podWithPort("oldUID", nodeName, 8080), "pod"), }, { name: "pod updated with different UIDs", - oldObj: withPodName(podWithPort("oldUID", nodeName, 80), "pod"), - newObj: withPodName(podWithPort("newUID", nodeName, 8080), "pod"), + oldPod: withPodName(podWithPort("oldUID", nodeName, 80), "pod"), + newPod: withPodName(podWithPort("newUID", nodeName, 8080), "pod"), }, } for _, tt := range tests { @@ -252,20 +257,20 @@ func TestUpdatePodInCache(t *testing.T) { SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), logger: logger, } - sched.addPodToCache(tt.oldObj) - sched.updatePodInCache(tt.oldObj, tt.newObj) + sched.addAssignedPodToCache(tt.oldPod) + sched.updateAssignedPodInCache(tt.oldPod, tt.newPod) - if tt.oldObj.(*v1.Pod).UID != tt.newObj.(*v1.Pod).UID { - if pod, err := sched.Cache.GetPod(tt.oldObj.(*v1.Pod)); err == nil { + if tt.oldPod.UID != tt.newPod.UID { + if pod, err := sched.Cache.GetPod(tt.oldPod); err == nil { t.Errorf("Get pod UID %v from cache but it should not happen", pod.UID) } } - pod, err := sched.Cache.GetPod(tt.newObj.(*v1.Pod)) + pod, err := sched.Cache.GetPod(tt.newPod) if err != nil { t.Errorf("Failed to get pod from scheduler: %v", err) } - if pod.UID != tt.newObj.(*v1.Pod).UID { - t.Errorf("Want pod UID %v, got %v", tt.newObj.(*v1.Pod).UID, pod.UID) + if pod.UID != tt.newPod.UID { + t.Errorf("Want pod UID %v, got %v", tt.newPod.UID, pod.UID) } }) } @@ -674,3 +679,303 @@ func TestAdmissionCheck(t *testing.T) { }) } } + +func TestAddPod(t *testing.T) { + tests := []struct { + name string + pod *v1.Pod + expectInQueue bool + expectInCache bool + }{ + { + name: "add unscheduled pod", + pod: st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("supported-scheduler").Obj(), + expectInQueue: true, + }, + { + name: "add unscheduled pod with other scheduler name", + pod: st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("other-scheduler").Obj(), + }, + { + name: "add scheduled pod", + pod: st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node1").Obj(), + expectInCache: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + sched := &Scheduler{ + Cache: internalcache.New(ctx, 0, nil), + SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), + logger: logger, + Profiles: profile.Map{ + "supported-scheduler": nil, + }, + } + + sched.addPod(tt.pod) + + _, ok := sched.SchedulingQueue.GetPod(tt.pod.Name, tt.pod.Namespace) + if tt.expectInQueue && !ok { + t.Errorf("Expected pod to be in scheduling queue") + } else if !tt.expectInQueue && ok { + t.Errorf("Expected pod not to be in scheduling queue") + } + _, err := sched.Cache.GetPod(tt.pod) + if tt.expectInCache && err != nil { + t.Errorf("Expected pod to be in cache: %v", err) + } else if !tt.expectInCache && err == nil { + t.Errorf("Expected pod not to be in cache") + } + }) + } +} + +func TestUpdatePod(t *testing.T) { + pod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("supported-scheduler").Obj() + updatedPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Labels(map[string]string{"foo": "bar"}).ResourceVersion("2").SchedulerName("supported-scheduler").Obj() + + otherPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("other-scheduler").Obj() + updatedOtherPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Labels(map[string]string{"foo": "bar"}).ResourceVersion("2").SchedulerName("other-scheduler").Obj() + + scheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node1").SchedulerName("supported-scheduler").Obj() + updatedScheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Labels(map[string]string{"foo": "bar"}).ResourceVersion("2").Node("node1").SchedulerName("supported-scheduler").Obj() + + otherScheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node1").SchedulerName("other-scheduler").Obj() + updatedOtherScheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Labels(map[string]string{"foo": "bar"}).ResourceVersion("2").Node("node1").SchedulerName("other-scheduler").Obj() + + scheduledPodOtherNode := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node2").SchedulerName("supported-scheduler").Obj() + + tests := []struct { + name string + oldPod *v1.Pod + assumedPod *v1.Pod + newPod *v1.Pod + expectInQueue *v1.Pod + expectInCache *v1.Pod + }{ + { + name: "update unscheduled pod", + oldPod: pod, + newPod: updatedPod, + expectInQueue: updatedPod, + }, + { + name: "update unscheduled pod with other scheduler name", + oldPod: otherPod, + newPod: updatedOtherPod, + }, + { + name: "update assumed pod", + oldPod: pod, + assumedPod: scheduledPod, + newPod: updatedPod, + expectInCache: scheduledPod, + }, + { + name: "update scheduled pod", + oldPod: scheduledPod, + newPod: updatedScheduledPod, + expectInCache: updatedScheduledPod, + }, + { + name: "update scheduled pod with other scheduler name", + oldPod: otherScheduledPod, + newPod: updatedOtherScheduledPod, + expectInCache: updatedOtherScheduledPod, + }, + { + name: "bind unscheduled pod", + oldPod: pod, + newPod: scheduledPod, + expectInCache: scheduledPod, + }, + { + name: "bind unscheduled pod with other scheduler name", + oldPod: pod, + newPod: scheduledPod, + expectInCache: scheduledPod, + }, + { + name: "bind assumed pod", + oldPod: pod, + assumedPod: scheduledPod, + newPod: scheduledPod, + expectInCache: scheduledPod, + }, + { + name: "bind assumed pod to a different node", + oldPod: pod, + assumedPod: scheduledPod, + newPod: scheduledPodOtherNode, + expectInCache: scheduledPodOtherNode, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + registerPluginFuncs := []tf.RegisterPluginFunc{ + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + } + waitingPods := frameworkruntime.NewWaitingPodsMap() + schedFramework, err := tf.NewFramework( + ctx, + registerPluginFuncs, + "supported-scheduler", + frameworkruntime.WithWaitingPods(waitingPods), + ) + if err != nil { + t.Fatalf("Failed to create framework: %v", err) + } + sched := &Scheduler{ + Cache: internalcache.New(ctx, 0, nil), + SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), + logger: logger, + Profiles: profile.Map{ + "supported-scheduler": schedFramework, + }, + } + + if tt.assumedPod != nil { + err := sched.Cache.AssumePod(logger, tt.assumedPod) + if err != nil { + t.Fatalf("Failed to assume pod: %v", err) + } + } else { + sched.addPod(tt.oldPod) + } + + sched.updatePod(tt.oldPod, tt.newPod) + + qPod, ok := sched.SchedulingQueue.GetPod(tt.newPod.Name, tt.newPod.Namespace) + if tt.expectInQueue != nil { + if !ok { + t.Errorf("Expected pod to be in scheduling queue") + } else if diff := cmp.Diff(tt.expectInQueue, qPod.Pod); diff != "" { + t.Errorf("Unexpected pod after update (-want,+got):\n%s", diff) + } + } else if ok { + t.Errorf("Expected pod not to be in scheduling queue") + } + pod, err := sched.Cache.GetPod(tt.newPod) + if tt.expectInCache != nil { + if err != nil { + t.Errorf("Expected pod to be in cache: %v", err) + } else if diff := cmp.Diff(tt.expectInCache, pod); diff != "" { + t.Errorf("Unexpected pod after update (-want,+got):\n%s", diff) + } + } else if err == nil { + t.Errorf("Expected pod not to be in cache") + } + }) + } +} + +func TestDeletePod(t *testing.T) { + pod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("supported-scheduler").Obj() + otherPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("other-scheduler").Obj() + scheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node1").SchedulerName("supported-scheduler").Obj() + otherScheduledPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Node("node1").SchedulerName("other-scheduler").Obj() + + tests := []struct { + name string + initialPod *v1.Pod + assumed bool + waitingOnPermit bool + podToDelete any + }{ + { + name: "delete unscheduled pod", + initialPod: pod, + podToDelete: pod, + }, + { + name: "delete unscheduled pod with other scheduler name", + initialPod: otherPod, + podToDelete: otherPod, + }, + { + name: "delete unscheduled pod with unknown state", + initialPod: pod, + podToDelete: cache.DeletedFinalStateUnknown{Obj: pod}, + }, + { + name: "delete scheduled pod", + initialPod: scheduledPod, + podToDelete: scheduledPod, + }, + { + name: "delete scheduled pod with other scheduler name", + initialPod: otherScheduledPod, + podToDelete: otherScheduledPod, + }, + { + name: "delete scheduled pod with unknown state", + initialPod: scheduledPod, + podToDelete: cache.DeletedFinalStateUnknown{Obj: scheduledPod}, + }, + { + name: "delete scheduled pod with unknown older state", + initialPod: scheduledPod, + podToDelete: cache.DeletedFinalStateUnknown{Obj: pod}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + registerPluginFuncs := []tf.RegisterPluginFunc{ + tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + } + waitingPods := frameworkruntime.NewWaitingPodsMap() + schedFramework, err := tf.NewFramework( + ctx, + registerPluginFuncs, + "supported-scheduler", + frameworkruntime.WithWaitingPods(waitingPods), + ) + if err != nil { + t.Fatalf("Failed to create framework: %v", err) + } + sched := &Scheduler{ + Cache: internalcache.New(ctx, 0, nil), + SchedulingQueue: internalqueue.NewTestQueue(ctx, nil), + logger: logger, + Profiles: profile.Map{ + "supported-scheduler": schedFramework, + }, + } + + if tt.assumed { + err := sched.Cache.AssumePod(logger, tt.initialPod) + if err != nil { + t.Fatalf("Failed to assume pod: %v", err) + } + } else { + sched.addPod(tt.initialPod) + } + + sched.deletePod(tt.podToDelete) + + _, err = sched.Cache.GetPod(tt.initialPod) + if err == nil { + t.Errorf("Unexpected pod in cache after removal") + } + _, ok := sched.SchedulingQueue.GetPod(tt.initialPod.Name, tt.initialPod.Namespace) + if ok { + t.Errorf("Unexpected pod in scheduling queue after removal") + } + }) + } +} diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index a039aaa51b3..ce4de2043a6 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -2097,7 +2097,7 @@ func TestSchedulerBinding(t *testing.T) { } } -func TestUpdatePod(t *testing.T) { +func TestUpdatePodStatus(t *testing.T) { tests := []struct { name string currentPodConditions []v1.PodCondition diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index 5a67d6a507a..298ea4539d1 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -562,6 +562,12 @@ func (p *PodWrapper) SchedulingGates(gates []string) *PodWrapper { return p } +// ResourceVersion sets the inner pod's ResurceVersion. +func (p *PodWrapper) ResourceVersion(version string) *PodWrapper { + p.ObjectMeta.ResourceVersion = version + return p +} + // PodAffinityKind represents different kinds of PodAffinity. type PodAffinityKind int From b29fdd1551fd044912a65c313d62dabefebcdd83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Skocze=C5=84?= Date: Wed, 29 Oct 2025 10:00:38 +0000 Subject: [PATCH 2/2] Forget pod from scheduler's cache immediately when it's deleted or has DeletionTimestamp set --- pkg/scheduler/backend/cache/cache.go | 9 +- pkg/scheduler/backend/cache/cache_test.go | 6 +- pkg/scheduler/eventhandlers.go | 70 +++- pkg/scheduler/eventhandlers_test.go | 14 + pkg/scheduler/testing/wrappers.go | 6 + .../scheduler/preemption/preemption_test.go | 316 +++++++++++++++++- 6 files changed, 391 insertions(+), 30 deletions(-) diff --git a/pkg/scheduler/backend/cache/cache.go b/pkg/scheduler/backend/cache/cache.go index 4908be60df7..a55f8440d67 100644 --- a/pkg/scheduler/backend/cache/cache.go +++ b/pkg/scheduler/backend/cache/cache.go @@ -419,12 +419,15 @@ func (cache *cacheImpl) ForgetPod(logger klog.Logger, pod *v1.Pod) error { defer cache.mu.Unlock() currState, ok := cache.podStates[key] - if ok && currState.pod.Spec.NodeName != pod.Spec.NodeName { + if !ok { + // Pod does not exist in the cache anymore. + return nil + } + if currState.pod.Spec.NodeName != pod.Spec.NodeName { return fmt.Errorf("pod %v(%v) was assumed on %v but assigned to %v", key, klog.KObj(pod), pod.Spec.NodeName, currState.pod.Spec.NodeName) } - // Only assumed pod can be forgotten. - if ok && cache.assumedPods.Has(key) { + if cache.assumedPods.Has(key) { return cache.removePod(logger, pod) } return fmt.Errorf("pod %v(%v) wasn't assumed so cannot be forgotten", key, klog.KObj(pod)) diff --git a/pkg/scheduler/backend/cache/cache_test.go b/pkg/scheduler/backend/cache/cache_test.go index c86df82864c..925ee9c7594 100644 --- a/pkg/scheduler/backend/cache/cache_test.go +++ b/pkg/scheduler/backend/cache/cache_test.go @@ -1043,9 +1043,9 @@ func TestForgetPod(t *testing.T) { if err := isForgottenFromCache(pod, cache); err != nil { t.Errorf("pod %q: %v", pod.Name, err) } - // trying to forget a pod already forgotten should return an error - if err := cache.ForgetPod(logger, pod); err == nil { - t.Error("expected error, no error found") + // trying to forget a pod already forgotten should return nil + if err := cache.ForgetPod(logger, pod); err != nil { + t.Error("expected no error, error found") } } } diff --git a/pkg/scheduler/eventhandlers.go b/pkg/scheduler/eventhandlers.go index cfe5e0179a7..b09321cc933 100644 --- a/pkg/scheduler/eventhandlers.go +++ b/pkg/scheduler/eventhandlers.go @@ -231,6 +231,49 @@ func (sched *Scheduler) syncPodWithDispatcher(pod *v1.Pod) *v1.Pod { return enrichedPod } +// handleAssumedPodDeletion is an event handler that deals with the deletion of an assumed pod. +// We must remove it from the scheduler's cache immediately to prevent it from blocking resources for other pending pods, +// causing unnecessary preemption attempts. Note that PreBinding/Binding will continue, but is eventually expected to fail +// as the pod does not exist in the kube-apiserver anymore and so in the scheduler cache. +func (sched *Scheduler) handleAssumedPodDeletion(pod *v1.Pod) { + logger := sched.logger + // We must operate on the pod from the scheduler's cache, not the one from the event. + // The cached version has the assigned NodeName and represents the resources being consumed. + assumedPod, err := sched.Cache.GetPod(pod) + if err != nil { + // This is not an error. The pod may have already completed its binding cycle and been + // removed from the cache. Nothing more to do. + logger.V(5).Info("Assumed pod was already forgotten", "pod", klog.KObj(pod)) + return + } + pod = assumedPod + + fwk, err := sched.frameworkForPod(pod) + if err != nil { + // This shouldn't happen, because we only accept for scheduling the pods + // which specify a scheduler name that matches one of the profiles. + utilruntime.HandleErrorWithLogger(logger, err, "Unable to get profile for pod", "pod", klog.KObj(pod)) + return + } + + // The pod might be in one of two states: + // 1. If the pod is waiting on WaitOnPermit, we reject it. This causes the pod's scheduling + // cycle to quickly fail gracefully, and it will clean itself up via `handleBindingCycleError`. + if !fwk.RejectWaitingPod(pod.UID) { + // 2. If the pod is no longer waiting (e.g., it's in PreBind or Bind), we can't quickly reject it. + // We must explicitly remove it from the cache here to free up its assumed resources. + if err := sched.Cache.ForgetPod(logger, pod); err != nil { + utilruntime.HandleErrorWithLogger(logger, err, "Scheduler cache ForgetPod failed", "pod", klog.KObj(pod)) + } + } + + // The removal of this assumed pod may have freed up resources. We trigger the AssignedPodDelete event + // to move other unscheduled pods, giving them a chance to be scheduled. + // If the forgotten pod reserved some resources in memory, + // it will wake up the pods again after freeing up the resources in `handleBindingCycleError`. + sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, nil) +} + func (sched *Scheduler) updatePodInSchedulingQueue(oldPod, newPod *v1.Pod) { start := time.Now() logger := sched.logger @@ -258,6 +301,11 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldPod, newPod *v1.Pod) { utilruntime.HandleErrorWithLogger(logger, err, "Failed to check whether pod is assumed", "pod", klog.KObj(newPod)) } if isAssumed { + if newPod.DeletionTimestamp != nil && oldPod.DeletionTimestamp == nil { + // Assumed pod deletion has started. We should handle that differently, + // because we can't update such pod in any structure directly. + sched.handleAssumedPodDeletion(newPod) + } return } @@ -290,22 +338,18 @@ func (sched *Scheduler) deletePodFromSchedulingQueue(pod *v1.Pod, inBinding bool // once the https://github.com/kubernetes/kubernetes/issues/134859 is fixed. return } - fwk, err := sched.frameworkForPod(pod) + isAssumed, err := sched.Cache.IsAssumedPod(pod) if err != nil { - // This shouldn't happen, because we only accept for scheduling the pods - // which specify a scheduler name that matches one of the profiles. - utilruntime.HandleErrorWithLogger(logger, err, "Unable to get profile", "pod", klog.KObj(pod)) - return + utilruntime.HandleErrorWithLogger(logger, err, "Failed to check whether pod is assumed", "pod", klog.KObj(pod)) } - // If a waiting pod is rejected, it indicates it's previously assumed and we're - // removing it from the scheduler cache. In this case, signal a AssignedPodDelete - // event to immediately retry some unscheduled Pods. - // Similarly when a pod that had nominated node is deleted, it can unblock scheduling of other pods, - // because the lower or equal priority pods treat such a pod as if it was assigned. - if fwk.RejectWaitingPod(pod.UID) { - sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, nil) + if isAssumed { + // Assumed pod is deleted. We should handle that differently, + // because we can't delete such pod from any structure directly. + sched.handleAssumedPodDeletion(pod) } else if pod.Status.NominatedNodeName != "" { - // Note that a nominated pod can fall into `RejectWaitingPod` case as well, + // When a pod that had nominated node is deleted, it can unblock scheduling of other pods, + // because the lower or equal priority pods treat such a pod as if it was assigned. + // Note that a nominated pod can fall into `handleAssumedPodDeletion` case as well, // but in that case the `MoveAllToActiveOrBackoffQueue` already covered lower priority pods. sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, pod, nil, getLEPriorityPreCheck(corev1helpers.PodPriority(pod))) } diff --git a/pkg/scheduler/eventhandlers_test.go b/pkg/scheduler/eventhandlers_test.go index 0204df53eb9..d6a2fc98b44 100644 --- a/pkg/scheduler/eventhandlers_test.go +++ b/pkg/scheduler/eventhandlers_test.go @@ -739,6 +739,8 @@ func TestUpdatePod(t *testing.T) { pod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("supported-scheduler").Obj() updatedPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Labels(map[string]string{"foo": "bar"}).ResourceVersion("2").SchedulerName("supported-scheduler").Obj() + podWithDeletionTimestamp := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Terminating().ResourceVersion("2").SchedulerName("supported-scheduler").Obj() + otherPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").SchedulerName("other-scheduler").Obj() updatedOtherPod := st.MakePod().Name("pod1").Namespace("ns1").UID("pod1").Labels(map[string]string{"foo": "bar"}).ResourceVersion("2").SchedulerName("other-scheduler").Obj() @@ -814,6 +816,12 @@ func TestUpdatePod(t *testing.T) { newPod: scheduledPodOtherNode, expectInCache: scheduledPodOtherNode, }, + { + name: "delete assumed pod with deletion timestamp", + oldPod: pod, + assumedPod: scheduledPod, + newPod: podWithDeletionTimestamp, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -907,6 +915,12 @@ func TestDeletePod(t *testing.T) { initialPod: pod, podToDelete: cache.DeletedFinalStateUnknown{Obj: pod}, }, + { + name: "delete assumed pod", + initialPod: scheduledPod, + assumed: true, + podToDelete: pod, + }, { name: "delete scheduled pod", initialPod: scheduledPod, diff --git a/pkg/scheduler/testing/wrappers.go b/pkg/scheduler/testing/wrappers.go index 298ea4539d1..4ef7265d8b2 100644 --- a/pkg/scheduler/testing/wrappers.go +++ b/pkg/scheduler/testing/wrappers.go @@ -417,6 +417,12 @@ func (p *PodWrapper) ZeroTerminationGracePeriod() *PodWrapper { return p } +// TerminationGracePeriodSeconds sets the TerminationGracePeriodSeconds of the inner pod. +func (p *PodWrapper) TerminationGracePeriodSeconds(s int64) *PodWrapper { + p.Spec.TerminationGracePeriodSeconds = &s + return p +} + // Node sets `s` as the nodeName of the inner pod. func (p *PodWrapper) Node(s string) *PodWrapper { p.Spec.NodeName = s diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index e9c8b5da76b..0e53b9d07af 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ b/test/integration/scheduler/preemption/preemption_test.go @@ -484,6 +484,7 @@ func TestPreemption(t *testing.T) { func TestAsyncPreemption(t *testing.T) { const podBlockedInBindingName = "pod-blocked-in-binding" + const reservingPodName = "reserving-pod" type createPod struct { pod *v1.Pod @@ -811,11 +812,11 @@ func TestAsyncPreemption(t *testing.T) { }, }, { - // This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134249 + // This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134217 // Scenario reproduces the issue: - // Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then gets activated by some unknown trigger. - // Preemptor pod is expected to go back to unschedulable queue and remain there until victim binding and preemption is completed. - name: "victim blocked in binding, preemptor pod gets activated randomly and returns to unschedulable queue until victim is bound and deleted", + // Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then the victim is deleted. + // Preemptor pod is woken up by the Pod/Delete event and is being scheduled, even before the victim binding is terminated. + name: "victim blocked in binding, preemptor pod gets scheduled after victim-in-binding is deleted", scenarios: []scenario{ { name: "create victim Pod that is going to be blocked in binding", @@ -847,11 +848,45 @@ func TestAsyncPreemption(t *testing.T) { completePreemption: "preemptor", }, { - name: "activate preemptor Pod, simulating a random event that activated it", - activatePod: "preemptor", + name: "schedule the preemptor Pod again and expect it to be scheduled (assumed victim pod was forgotten)", + schedulePod: &schedulePod{ + podName: "preemptor", + expectSuccess: true, + }, }, { - name: "schedule the preemptor Pod again and expect it to end up in unschedulable (waiting for preemption to finish)", + name: "resume binding of the blocked pod", + resumeBind: true, + }, + }, + }, + { + // This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134217 + // Scenario reproduces the issue, but with a victim that is under graceful termination: + // Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then the victim's graceful termination is initiated. + // Preemptor pod is woken up by the Pod/Update event (working like AssignedPodDeleted) and is being scheduled, even before the victim binding is terminated. + name: "victim blocked in binding, preemptor pod gets scheduled when victim-in-binding is under graceful termination", + scenarios: []scenario{ + { + name: "create victim Pod with long termination grace period that is going to be blocked in binding", + createPod: &createPod{ + pod: st.MakePod().Name(podBlockedInBindingName).Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).TerminationGracePeriodSeconds(1000).Container("image").Priority(1).Obj(), + }, + }, + { + name: "schedule victim Pod", + schedulePod: &schedulePod{ + podName: podBlockedInBindingName, + }, + }, + { + name: "create a preemptor Pod", + createPod: &createPod{ + pod: st.MakePod().Name("preemptor").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(), + }, + }, + { + name: "schedule the preemptor Pod", schedulePod: &schedulePod{ podName: "preemptor", expectUnschedulable: true, @@ -862,15 +897,170 @@ func TestAsyncPreemption(t *testing.T) { completePreemption: "preemptor", }, { - name: "check that preemptor remained in unschedulable queue", - verifyPodInUnschedulable: "preemptor", + name: "schedule the preemptor Pod again and expect it to be scheduled (assumed victim pod was forgotten)", + schedulePod: &schedulePod{ + podName: "preemptor", + expectSuccess: true, + }, + }, + }, + }, + { + // This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134217 + // Scenario reproduces the issue, but with a victim that is under graceful termination: + // Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then the victim's graceful termination is initiated. + // Preemptor pod is woken up by the Pod/Update event (working like AssignedPodDeleted) and is being scheduled, even before the victim binding is terminated. + name: "victim blocked in binding, preemptor pod gets scheduled when victim-in-binding is under graceful termination", + scenarios: []scenario{ + { + name: "create victim Pod with long termination grace period that is going to be blocked in binding", + createPod: &createPod{ + pod: st.MakePod().Name(podBlockedInBindingName).Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").TerminationGracePeriodSeconds(1000).Priority(1).Obj(), + }, + }, + { + name: "schedule victim Pod", + schedulePod: &schedulePod{ + podName: podBlockedInBindingName, + }, + }, + { + name: "create a preemptor Pod", + createPod: &createPod{ + pod: st.MakePod().Name("preemptor").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(), + }, + }, + { + name: "schedule the preemptor Pod", + schedulePod: &schedulePod{ + podName: "preemptor", + expectUnschedulable: true, + }, + }, + { + name: "complete the preemption API call", + completePreemption: "preemptor", + }, + { + name: "schedule the preemptor Pod again and expect it to be scheduled (assumed victim pod was forgotten)", + schedulePod: &schedulePod{ + podName: "preemptor", + expectSuccess: true, + }, + }, + { + name: "resume binding of the blocked pod", + resumeBind: true, + }, + }, + }, + { + // This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134217 + // Scenario reproduces the issue, but with a victim that is reserving some resources required by the preemptor: + // Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then the victim is deleted. + // Preemptor pod is woken up by the Pod/Update event (working like AssignedPodDeleted), but is still unschedulable, because victim has to unreserve its resources. + // After resuming binding for a victim, it releases the resources in its failure handler, preemptor is woken up again and ultimately scheduled. + name: "victim blocked in binding, preemptor pod gets scheduled after victim-in-binding is deleted and its resources are unreserved", + scenarios: []scenario{ + { + name: "create victim Pod that is going to be blocked in binding", + createPod: &createPod{ + pod: st.MakePod().Name(podBlockedInBindingName + reservingPodName).Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").ZeroTerminationGracePeriod().Priority(1).Obj(), + }, + }, + { + name: "schedule victim Pod", + schedulePod: &schedulePod{ + podName: podBlockedInBindingName + reservingPodName, + }, + }, + { + name: "create a preemptor Pod", + createPod: &createPod{ + pod: st.MakePod().Name("preemptor").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(), + }, + }, + { + name: "schedule the preemptor Pod", + schedulePod: &schedulePod{ + podName: "preemptor", + expectUnschedulable: true, + }, + }, + { + name: "complete the preemption API call", + completePreemption: "preemptor", + }, + { + name: "schedule the preemptor Pod again and expect it to be unschedulable (resources are still reserved by the victim)", + schedulePod: &schedulePod{ + podName: "preemptor", + expectUnschedulable: true, + }, }, { name: "resume binding of the blocked pod", resumeBind: true, }, { - name: "schedule the preemptor Pod after the completed binding and preemption of the blocked pod", + name: "schedule the preemptor Pod again and expect it to be scheduled (victim pod unreserved its resources)", + schedulePod: &schedulePod{ + podName: "preemptor", + expectSuccess: true, + }, + }, + }, + }, + { + // This scenario verifies the fix for https://github.com/kubernetes/kubernetes/issues/134217 + // Scenario reproduces the issue, but with a victim that is under graceful termination and sis reserving some resources required by the preemptor: + // Victim pod takes long in binding. Preemptor pod attempts preemption, goes to unschedulable, then the victim's graceful termination is initiated. + // Preemptor pod is woken up by the Pod/Update event (working like AssignedPodDeleted), but is still unschedulable, because victim has to unreserve its resources. + // After resuming binding for a victim, it releases the resources in its failure handler, preemptor is woken up again and ultimately scheduled. + name: "victim blocked in binding, preemptor pod gets scheduled after victim-in-binding is under graceful termination and its resources are unreserved", + scenarios: []scenario{ + { + name: "create victim Pod that is going to be blocked in binding", + createPod: &createPod{ + pod: st.MakePod().Name(podBlockedInBindingName + reservingPodName).Req(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Container("image").TerminationGracePeriodSeconds(1000).Priority(1).Obj(), + }, + }, + { + name: "schedule victim Pod", + schedulePod: &schedulePod{ + podName: podBlockedInBindingName + reservingPodName, + }, + }, + { + name: "create a preemptor Pod", + createPod: &createPod{ + pod: st.MakePod().Name("preemptor").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Priority(100).Obj(), + }, + }, + { + name: "schedule the preemptor Pod", + schedulePod: &schedulePod{ + podName: "preemptor", + expectUnschedulable: true, + }, + }, + { + name: "complete the preemption API call", + completePreemption: "preemptor", + }, + { + name: "schedule the preemptor Pod again and expect it to be unschedulable (resources are still reserved by the victim)", + schedulePod: &schedulePod{ + podName: "preemptor", + expectUnschedulable: true, + }, + }, + { + name: "resume binding of the blocked pod", + resumeBind: true, + }, + { + name: "schedule the preemptor Pod again and expect it to be scheduled (victim pod unreserved its resources)", schedulePod: &schedulePod{ podName: "preemptor", expectSuccess: true, @@ -882,7 +1072,7 @@ func TestAsyncPreemption(t *testing.T) { // All test cases have the same node. node := st.MakeNode().Name("node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj() - for _, asyncAPICallsEnabled := range []bool{true, false} { + for _, asyncAPICallsEnabled := range []bool{true} { for _, test := range tests { t.Run(fmt.Sprintf("%s (Async API calls enabled: %v)", test.name, asyncAPICallsEnabled), func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerAsyncAPICalls, asyncAPICallsEnabled) @@ -956,6 +1146,19 @@ func TestAsyncPreemption(t *testing.T) { t.Fatalf("Error registering a bind plugin: %v", err) } + // Register fake plugin that will reserve some fake resources for one pod. + // This could be used to check scheduler's behavior when the victim has to unreserve these resources to let the preemptor schedule. + reservingPluginName := "reservingPlugin" + err = registry.Register(reservingPluginName, func(ctx context.Context, o runtime.Object, fh fwk.Handle) (fwk.Plugin, error) { + return &reservingPlugin{ + name: reservingPluginName, + nameOfPodToReserve: reservingPodName, + }, nil + }) + if err != nil { + t.Fatalf("Error registering a reserving plugin: %v", err) + } + cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{ Profiles: []configv1.KubeSchedulerProfile{{ SchedulerName: ptr.To(v1.DefaultSchedulerName), @@ -964,6 +1167,7 @@ func TestAsyncPreemption(t *testing.T) { Enabled: []configv1.Plugin{ {Name: blockingBindPluginName}, {Name: delayedPreemptionPluginName}, + {Name: reservingPluginName}, }, Disabled: []configv1.Plugin{ {Name: names.DefaultPreemption}, @@ -1176,3 +1380,93 @@ func (bp *blockingBindPlugin) Bind(ctx context.Context, state fwk.CycleState, p } var _ fwk.BindPlugin = &blockingBindPlugin{} + +// reservingPlugin is a fake plugin that reserves some resource in memory for nameOfPodToReserve pod. +// Other pods won't be scheduled, unless the resources are unreserved. +type reservingPlugin struct { + lock sync.Mutex + name string + nameOfPodToReserve string + reserved bool +} + +func (rp *reservingPlugin) Name() string { + return rp.name +} + +func (rp *reservingPlugin) EventsToRegister(_ context.Context) ([]fwk.ClusterEventWithHint, error) { + return []fwk.ClusterEventWithHint{ + // Plugin will wake up the pod on any Pod/Delete event. + {Event: fwk.ClusterEvent{Resource: fwk.Pod, ActionType: fwk.Delete}}, + }, nil +} + +const reservingPluginStateKey = "PreFilterReserving" + +type reservingPluginState struct { + reserved bool +} + +func (s reservingPluginState) Clone() fwk.StateData { + return reservingPluginState{ + reserved: s.reserved, + } +} + +func (rp *reservingPlugin) PreFilter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodes []fwk.NodeInfo) (*fwk.PreFilterResult, *fwk.Status) { + rp.lock.Lock() + state.Write(reservingPluginStateKey, reservingPluginState{reserved: rp.reserved}) + rp.lock.Unlock() + return nil, nil +} + +func (rp *reservingPlugin) Filter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeInfo fwk.NodeInfo) *fwk.Status { + s, err := state.Read(reservingPluginStateKey) + if err != nil { + return fwk.AsStatus(err) + } + if s.(reservingPluginState).reserved { + return fwk.NewStatus(fwk.Unschedulable, "resources are reserved") + } + return nil +} + +func (rp *reservingPlugin) Reserve(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) *fwk.Status { + if strings.Contains(p.Name, rp.nameOfPodToReserve) { + rp.lock.Lock() + rp.reserved = true + rp.lock.Unlock() + } + return nil +} + +func (rp *reservingPlugin) Unreserve(ctx context.Context, state fwk.CycleState, p *v1.Pod, nodeName string) { + if strings.Contains(p.Name, rp.nameOfPodToReserve) { + rp.lock.Lock() + rp.reserved = false + rp.lock.Unlock() + } +} + +func (rp *reservingPlugin) PreFilterExtensions() fwk.PreFilterExtensions { + return rp +} + +func (rp *reservingPlugin) AddPod(ctx context.Context, state fwk.CycleState, podToSchedule *v1.Pod, podInfoToAdd fwk.PodInfo, nodeInfo fwk.NodeInfo) *fwk.Status { + if strings.Contains(podInfoToAdd.GetPod().Name, rp.nameOfPodToReserve) { + state.Write(reservingPluginStateKey, reservingPluginState{reserved: true}) + } + return nil +} + +func (rp *reservingPlugin) RemovePod(ctx context.Context, state fwk.CycleState, podToSchedule *v1.Pod, podInfoToRemove fwk.PodInfo, nodeInfo fwk.NodeInfo) *fwk.Status { + if strings.Contains(podInfoToRemove.GetPod().Name, rp.nameOfPodToReserve) { + state.Write(reservingPluginStateKey, reservingPluginState{reserved: false}) + } + return nil +} + +var _ fwk.PreFilterPlugin = &reservingPlugin{} +var _ fwk.FilterPlugin = &reservingPlugin{} +var _ fwk.PreFilterExtensions = &reservingPlugin{} +var _ fwk.ReservePlugin = &reservingPlugin{}