From 630e6a3bfdfaa397e1122ab5c51c49bd81f889fc Mon Sep 17 00:00:00 2001 From: vshkrabkov Date: Wed, 20 May 2026 18:27:49 +0000 Subject: [PATCH 1/2] proposal for scheduling library --- pkg/scheduler/framework/cycle_state.go | 13 ++++ pkg/scheduler/framework/cycle_state_test.go | 35 +++++++---- pkg/scheduler/framework/runtime/framework.go | 2 +- .../framework/runtime/framework_test.go | 37 +++++++++-- pkg/scheduler/schedule_one.go | 61 +++++++++++++++++++ pkg/scheduler/schedule_one_podgroup.go | 49 ++++----------- .../kube-scheduler/framework/cycle_state.go | 4 ++ 7 files changed, 144 insertions(+), 57 deletions(-) diff --git a/pkg/scheduler/framework/cycle_state.go b/pkg/scheduler/framework/cycle_state.go index 31b78725356..137e8cd30a2 100644 --- a/pkg/scheduler/framework/cycle_state.go +++ b/pkg/scheduler/framework/cycle_state.go @@ -38,6 +38,8 @@ type CycleState struct { skipPreBindPlugins sets.Set[string] // skipAllPostFilterPlugins indicates whether to skip all plugins in the PostFilter extension point. skipAllPostFilterPlugins bool + // skipAllScorePlugins indicates whether to skip all plugins in the Score extension point. + skipAllScorePlugins bool // GetParallelPreBindPlugins returns plugins that can be run in parallel with other plugins // in the PreBind extension point. parallelPreBindPlugins sets.Set[string] @@ -85,6 +87,16 @@ func (c *CycleState) GetSkipScorePlugins() sets.Set[string] { return c.skipScorePlugins } +// SetSkipAllScorePlugins sets whether to skip all plugins in the Score extension point. +func (c *CycleState) SetSkipAllScorePlugins(flag bool) { + c.skipAllScorePlugins = flag +} + +// ShouldSkipAllScorePlugins returns whether to skip all plugins in the Score extension point. +func (c *CycleState) ShouldSkipAllScorePlugins() bool { + return c.skipAllScorePlugins +} + func (c *CycleState) SetSkipPreBindPlugins(plugins sets.Set[string]) { c.skipPreBindPlugins = plugins } @@ -141,6 +153,7 @@ func (c *CycleState) Clone() fwk.CycleState { copy.parallelPreBindPlugins = c.parallelPreBindPlugins copy.podGroupCycleState = c.podGroupCycleState copy.skipAllPostFilterPlugins = c.skipAllPostFilterPlugins + copy.skipAllScorePlugins = c.skipAllScorePlugins return copy } diff --git a/pkg/scheduler/framework/cycle_state_test.go b/pkg/scheduler/framework/cycle_state_test.go index 70b70085fdf..a93b657c094 100644 --- a/pkg/scheduler/framework/cycle_state_test.go +++ b/pkg/scheduler/framework/cycle_state_test.go @@ -40,7 +40,7 @@ var key fwk.StateKey = "fakedata_key" // createCycleStateWithFakeData creates *CycleState with fakeData. // The given data is used in stored fakeData. -func createCycleStateWithFakeData(data string, recordPluginMetrics bool, skipAllPostFilterPlugins bool, skipPlugins ...[]string) *CycleState { +func createCycleStateWithFakeData(data string, recordPluginMetrics bool, skipAllPostFilterPlugins bool, skipAllScorePlugins bool, skipPlugins ...[]string) *CycleState { c := NewCycleState() c.Write(key, &fakeData{ data: data, @@ -53,6 +53,7 @@ func createCycleStateWithFakeData(data string, recordPluginMetrics bool, skipAll c.SetSkipScorePlugins(sets.New(skipPlugins[1]...)) } c.SetSkipAllPostFilterPlugins(skipAllPostFilterPlugins) + c.SetSkipAllScorePlugins(skipAllScorePlugins) return c } @@ -78,6 +79,9 @@ func isCycleStateEqual(a, b *CycleState) (bool, string) { if diff := cmp.Diff(a.skipAllPostFilterPlugins, b.skipAllPostFilterPlugins); diff != "" { return false, fmt.Sprintf("CycleState A and B have different SkipAllPostFilterPlugins sets. -wanted,+got:\n%s", diff) } + if diff := cmp.Diff(a.skipAllScorePlugins, b.skipAllScorePlugins); diff != "" { + return false, fmt.Sprintf("CycleState A and B have different SkipAllScorePlugins sets. -wanted,+got:\n%s", diff) + } var msg string isEqual := true @@ -133,33 +137,38 @@ func TestCycleStateClone(t *testing.T) { }{ { name: "clone with recordPluginMetrics true", - state: createCycleStateWithFakeData("data", true, false), - wantClonedState: createCycleStateWithFakeData("data", true, false), + state: createCycleStateWithFakeData("data", true, false, false), + wantClonedState: createCycleStateWithFakeData("data", true, false, false), }, { name: "clone with recordPluginMetrics false", - state: createCycleStateWithFakeData("data", false, false), - wantClonedState: createCycleStateWithFakeData("data", false, false), + state: createCycleStateWithFakeData("data", false, false, false), + wantClonedState: createCycleStateWithFakeData("data", false, false, false), }, { name: "clone with SkipFilterPlugins", - state: createCycleStateWithFakeData("data", true, false, []string{"p1", "p2", "p3"}), - wantClonedState: createCycleStateWithFakeData("data", true, false, []string{"p1", "p2", "p3"}), + state: createCycleStateWithFakeData("data", true, false, false, []string{"p1", "p2", "p3"}), + wantClonedState: createCycleStateWithFakeData("data", true, false, false, []string{"p1", "p2", "p3"}), }, { name: "clone with SkipScorePlugins", - state: createCycleStateWithFakeData("data", false, false, []string{}, []string{"p1", "p2", "p3"}), - wantClonedState: createCycleStateWithFakeData("data", false, false, []string{}, []string{"p1", "p2", "p3"}), + state: createCycleStateWithFakeData("data", false, false, false, []string{}, []string{"p1", "p2", "p3"}), + wantClonedState: createCycleStateWithFakeData("data", false, false, false, []string{}, []string{"p1", "p2", "p3"}), }, { name: "clone with SkipScorePlugins and SkipFilterPlugins", - state: createCycleStateWithFakeData("data", true, false, []string{"p0"}, []string{"p1", "p2", "p3"}), - wantClonedState: createCycleStateWithFakeData("data", true, false, []string{"p0"}, []string{"p1", "p2", "p3"}), + state: createCycleStateWithFakeData("data", true, false, false, []string{"p0"}, []string{"p1", "p2", "p3"}), + wantClonedState: createCycleStateWithFakeData("data", true, false, false, []string{"p0"}, []string{"p1", "p2", "p3"}), }, { name: "clone with SkipAllPostFilterPlugins", - state: createCycleStateWithFakeData("data", true, true, []string{"p0"}, []string{"p1", "p2", "p3"}), - wantClonedState: createCycleStateWithFakeData("data", true, true, []string{"p0"}, []string{"p1", "p2", "p3"}), + state: createCycleStateWithFakeData("data", true, true, false, []string{"p0"}, []string{"p1", "p2", "p3"}), + wantClonedState: createCycleStateWithFakeData("data", true, true, false, []string{"p0"}, []string{"p1", "p2", "p3"}), + }, + { + name: "clone with SkipAllScorePlugins", + state: createCycleStateWithFakeData("data", true, false, true, []string{"p0"}, []string{"p1", "p2", "p3"}), + wantClonedState: createCycleStateWithFakeData("data", true, false, true, []string{"p0"}, []string{"p1", "p2", "p3"}), }, { name: "clone with nil CycleState", diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 22f686c7f54..58368da55a9 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -1346,7 +1346,7 @@ func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state fwk.CycleStat plugins := make([]fwk.ScorePlugin, 0, numPlugins) pluginToNodeScores := make(map[string]fwk.NodeScoreList, numPlugins) for _, pl := range f.scorePlugins { - if state.GetSkipScorePlugins().Has(pl.Name()) { + if state.ShouldSkipAllScorePlugins() || state.GetSkipScorePlugins().Has(pl.Name()) { continue } plugins = append(plugins, pl) diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index cb06bed9c39..3300345ee86 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -1381,12 +1381,13 @@ func TestRunPreScorePlugins(t *testing.T) { func TestRunScorePlugins(t *testing.T) { tests := []struct { - name string - registry Registry - plugins *config.Plugins - pluginConfigs []config.PluginConfig - want []fwk.NodePluginScores - skippedPlugins sets.Set[string] + name string + registry Registry + plugins *config.Plugins + pluginConfigs []config.PluginConfig + want []fwk.NodePluginScores + skippedPlugins sets.Set[string] + skipAllScorePlugins bool // If err is true, we expect RunScorePlugin to fail. err bool }{ @@ -1744,6 +1745,29 @@ func TestRunScorePlugins(t *testing.T) { }, }, }, + { + name: "skip all score plugins", + plugins: buildScoreConfigDefaultWeights(scorePlugin1), + skipAllScorePlugins: true, + pluginConfigs: []config.PluginConfig{ + { + Name: scorePlugin1, + Args: &runtime.Unknown{ + Raw: []byte(`{ "scoreStatus": 1 }`), // To make sure this plugin isn't called, set error as an injected result. + }, + }, + }, + want: []fwk.NodePluginScores{ + { + Name: "node1", + Scores: []fwk.PluginScore{}, + }, + { + Name: "node2", + Scores: []fwk.PluginScore{}, + }, + }, + }, } for _, tt := range tests { @@ -1766,6 +1790,7 @@ func TestRunScorePlugins(t *testing.T) { state := framework.NewCycleState() state.SetSkipScorePlugins(tt.skippedPlugins) + state.SetSkipAllScorePlugins(tt.skipAllScorePlugins) res, status := f.RunScorePlugins(ctx, state, pod, BuildNodeInfos(nodes)) if tt.err { diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index 9421ed2eadb..a22a395366d 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -251,6 +251,67 @@ func (sched *Scheduler) prepareForBindingCycle( return assumedPodInfo, nil } +type SimulationResult struct { + Pod *v1.Pod + ScheduleResult ScheduleResult + Status *fwk.Status + RequiresPreemption bool + AssumedPodInfo *framework.QueuedPodInfo +} + +func (sched *Scheduler) SimulateScheduling(ctx context.Context, + state fwk.CycleState, + schedFramework framework.Framework, + podInfo *framework.QueuedPodInfo, +) (*SimulationResult, func()) { + pod := podInfo.GetPod() + + requiresPreemption := false + scheduleResult, status := sched.schedulingAlgorithm(ctx, state, schedFramework, podInfo, time.Now()) + if !status.IsSuccess() { + if scheduleResult.nominatingInfo != nil && scheduleResult.nominatingInfo.NominatedNodeName != "" { + // If the NominatedNodeName is set, the preemption is required. + // Continue with assuming and reserving, because the subsequent pods from this group + // have to see this one as already scheduled on its nominated place. + // Set SuggestedHost to NominatedNodeName to handle the pod similarly to one that is feasible. + scheduleResult.SuggestedHost = scheduleResult.nominatingInfo.NominatedNodeName + requiresPreemption = true + } else { + // In case of pod being just unschedulable or having an error, just return now. + return &SimulationResult{ + Pod: pod, + ScheduleResult: scheduleResult, + Status: status, + }, nil + } + } + + assumedPodInfo, assumeStatus := sched.assumeAndReserve(ctx, state, schedFramework, podInfo, scheduleResult) + if !assumeStatus.IsSuccess() { + return &SimulationResult{ + Pod: pod, + ScheduleResult: ScheduleResult{nominatingInfo: clearNominatedNode}, + Status: assumeStatus, + }, nil + } + + revertFn := func() { + err := sched.unreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost) + if err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "ForgetPod failed") + } + } + + return &SimulationResult{ + Pod: pod, + ScheduleResult: scheduleResult, + Status: status, + RequiresPreemption: requiresPreemption, + AssumedPodInfo: assumedPodInfo, + }, revertFn + +} + // schedulingAlgorithm runs fitering and scoring phases for a single pod, // together with post filter when the pod is unschedulable. func (sched *Scheduler) schedulingAlgorithm( diff --git a/pkg/scheduler/schedule_one_podgroup.go b/pkg/scheduler/schedule_one_podgroup.go index 63a54b99d8e..df0f7cea66b 100644 --- a/pkg/scheduler/schedule_one_podgroup.go +++ b/pkg/scheduler/schedule_one_podgroup.go @@ -416,46 +416,21 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche logger.V(4).Info("Attempting to schedule a pod belonging to a pod group", "podGroup", klog.KObj(podGroupInfo), "pod", klog.KObj(pod)) - requiresPreemption := false - scheduleResult, status := sched.schedulingAlgorithm(ctx, podCtx.state, schedFwk, podInfo, start) - if !status.IsSuccess() { - if scheduleResult.nominatingInfo != nil && scheduleResult.nominatingInfo.NominatedNodeName != "" { - // If the NominatedNodeName is set, the preemption is required. - // Continue with assuming and reserving, because the subsequent pods from this group - // have to see this one as already scheduled on its nominated place. - // Set SuggestedHost to NominatedNodeName to handle the pod similarly to one that is feasible. - scheduleResult.SuggestedHost = scheduleResult.nominatingInfo.NominatedNodeName - requiresPreemption = true - } else { - // In case of pod being just unschedulable or having an error, just return now. - return algorithmResult{ - pod: pod, - scheduleResult: scheduleResult, - podCtx: podCtx, - schedulingDuration: time.Since(start), - status: status, - }, nil - } - } - assumedPodInfo, assumeStatus := sched.assumeAndReserve(ctx, podCtx.state, schedFwk, podInfo, scheduleResult) - if !assumeStatus.IsSuccess() { + simResult, revertFn := sched.SimulateScheduling(ctx, podCtx.state, schedFwk, podInfo) + + if simResult.AssumedPodInfo == nil { return algorithmResult{ - pod: pod, - scheduleResult: ScheduleResult{nominatingInfo: clearNominatedNode}, + pod: simResult.Pod, + scheduleResult: simResult.ScheduleResult, podCtx: podCtx, schedulingDuration: time.Since(start), - status: assumeStatus, + status: simResult.Status, }, nil } - revertFn := func() { - err := sched.unreserveAndForget(ctx, podCtx.state, schedFwk, assumedPodInfo, scheduleResult.SuggestedHost) - if err != nil { - utilruntime.HandleErrorWithContext(ctx, err, "ForgetPod failed") - } - } + assumedPodInfo := simResult.AssumedPodInfo - _, permitStatus := schedFwk.RunPermitPlugins(ctx, podCtx.state, assumedPodInfo.Pod, scheduleResult.SuggestedHost) + _, permitStatus := schedFwk.RunPermitPlugins(ctx, podCtx.state, assumedPodInfo.Pod, simResult.ScheduleResult.SuggestedHost) if !permitStatus.IsWait() && !permitStatus.IsSuccess() { revertFn() if permitStatus.IsRejected() { @@ -466,7 +441,7 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche NodeToStatus: framework.NewDefaultNodeToStatus(), }, } - fitErr.Diagnosis.NodeToStatus.Set(scheduleResult.SuggestedHost, permitStatus) + fitErr.Diagnosis.NodeToStatus.Set(simResult.ScheduleResult.SuggestedHost, permitStatus) fitErr.Diagnosis.AddPluginStatus(permitStatus) permitStatus = fwk.NewStatus(permitStatus.Code()).WithError(fitErr) } @@ -481,12 +456,12 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche return algorithmResult{ pod: pod, - scheduleResult: scheduleResult, + scheduleResult: simResult.ScheduleResult, podCtx: podCtx, schedulingDuration: time.Since(start), - status: status, + status: simResult.Status, permitStatus: permitStatus, - requiresPreemption: requiresPreemption, + requiresPreemption: simResult.RequiresPreemption, }, revertFn } diff --git a/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go b/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go index 4d103b2e39c..e02f208e901 100644 --- a/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go +++ b/staging/src/k8s.io/kube-scheduler/framework/cycle_state.go @@ -76,6 +76,10 @@ type CycleState interface { // ShouldSkipAllPostFilterPlugins returns whether all plugins should be skipped in the PostFilter extension point. // This function is mostly for the scheduling framework runtime, plugins usually don't have to use it. ShouldSkipAllPostFilterPlugins() bool + // SetSkipAllScorePlugins sets whether to skip all plugins in the Score extension point. + SetSkipAllScorePlugins(flag bool) + // ShouldSkipAllScorePlugins returns whether to skip all plugins in the Score extension point. + ShouldSkipAllScorePlugins() bool // Read retrieves data with the given "key" from CycleState. If the key is not // present, ErrNotFound is returned. From ce3cd58b2274c327e5bcd88bf8973cc8094d1426 Mon Sep 17 00:00:00 2001 From: vshkrabkov Date: Thu, 21 May 2026 17:52:25 +0000 Subject: [PATCH 2/2] export assumeAndReserve and unreserveAndForget from scheduler, adds assume all existing pods function for snapshot --- pkg/scheduler/backend/cache/snapshot.go | 25 +++ pkg/scheduler/backend/cache/snapshot_test.go | 65 ++++++ pkg/scheduler/schedule_one.go | 46 ++-- pkg/scheduler/schedule_one_podgroup.go | 24 +- pkg/scheduler/schedule_one_test.go | 219 +++++++++++++++++++ 5 files changed, 349 insertions(+), 30 deletions(-) diff --git a/pkg/scheduler/backend/cache/snapshot.go b/pkg/scheduler/backend/cache/snapshot.go index d4b2b92fee1..a1d6aeacf3a 100644 --- a/pkg/scheduler/backend/cache/snapshot.go +++ b/pkg/scheduler/backend/cache/snapshot.go @@ -480,3 +480,28 @@ func (s *Snapshot) ListNodesInPlacement() ([]fwk.NodeInfo, error) { } return s.placementNodes.nodeInfoList, nil } + +// AssumeAllExistingPods assumes all pods that are already present in the snapshot. +func (s *Snapshot) AssumeAllExistingPods() error { + currentlyAssumed := sets.New[string]() + revertAssumed := func() { + for key := range currentlyAssumed { + delete(s.assumedPods, key) + } + } + + for _, nodeInfo := range s.nodeInfoList { + for _, podInfo := range nodeInfo.GetPods() { + p := podInfo.GetPod() + key, err := framework.GetPodKey(p) + if err != nil { + revertAssumed() + return fmt.Errorf("couldn't create key for pod: %w", err) + } + s.assumedPods[key] = p + currentlyAssumed.Insert(key) + } + } + + return nil +} diff --git a/pkg/scheduler/backend/cache/snapshot_test.go b/pkg/scheduler/backend/cache/snapshot_test.go index 5907e9534be..346544bb9f5 100644 --- a/pkg/scheduler/backend/cache/snapshot_test.go +++ b/pkg/scheduler/backend/cache/snapshot_test.go @@ -798,6 +798,71 @@ func TestSnapshot_Placement(t *testing.T) { } } +func TestSnapshot_AssumeAllExistingPods(t *testing.T) { + node1 := st.MakeNode().Name("node-1").Obj() + node2 := st.MakeNode().Name("node-2").Obj() + + pod1 := st.MakePod().Name("pod-1").Namespace("ns").UID("pod-1").Node("node-1").Obj() + pod2 := st.MakePod().Name("pod-2").Namespace("ns").UID("pod-2").Node("node-1").Obj() + pod3 := st.MakePod().Name("pod-3").Namespace("ns").UID("pod-3").Node("node-2").Obj() + + tests := []struct { + name string + initialPods []*v1.Pod + initialNodes []*v1.Node + preAssumedPods map[string]*v1.Pod + expectedAssumedPods sets.Set[string] + }{ + { + name: "all existing pods assumed successfully", + initialPods: []*v1.Pod{pod1, pod2, pod3}, + initialNodes: []*v1.Node{node1, node2}, + expectedAssumedPods: sets.New("pod-1", "pod-2", "pod-3"), + }, + { + name: "empty snapshot, no pods assumed", + initialPods: nil, + initialNodes: []*v1.Node{node1, node2}, + expectedAssumedPods: sets.New[string](), + }, + { + name: "pods already pre-assumed are kept or overwritten", + initialPods: []*v1.Pod{pod1}, + initialNodes: []*v1.Node{node1}, + preAssumedPods: map[string]*v1.Pod{ + "pod-2": pod2, + }, + expectedAssumedPods: sets.New("pod-1", "pod-2"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + snapshot := NewSnapshot(tt.initialPods, tt.initialNodes) + if tt.preAssumedPods != nil { + for k, v := range tt.preAssumedPods { + snapshot.assumedPods[k] = v + } + } + + err := snapshot.AssumeAllExistingPods() + if err != nil { + t.Fatalf("AssumeAllExistingPods() failed: %v", err) + } + + if len(tt.expectedAssumedPods) != len(snapshot.assumedPods) { + t.Errorf("Unexpected number of assumed pods: want %d, got %d", len(tt.expectedAssumedPods), len(snapshot.assumedPods)) + } + + for key := range tt.expectedAssumedPods { + if _, ok := snapshot.assumedPods[key]; !ok { + t.Errorf("Expected pod %q to be assumed, but it wasn't", key) + } + } + }) + } +} + func TestSnapshot_BackupRestore(t *testing.T) { podWithAffinity := st.MakePod().Name("p-aff").Namespace("ns").UID("p-aff").PodAffinity("key", &metav1.LabelSelector{MatchLabels: map[string]string{"key": "value"}}, st.PodAffinityWithRequiredReq).Node("node-1").Obj() podWithAntiAffinity := st.MakePod().Name("p-anti").Namespace("ns").UID("p-anti").PodAntiAffinity("key", &metav1.LabelSelector{MatchLabels: map[string]string{"key": "value"}}, st.PodAntiAffinityWithRequiredReq).Node("node-1").Obj() diff --git a/pkg/scheduler/schedule_one.go b/pkg/scheduler/schedule_one.go index a22a395366d..76b3de78271 100644 --- a/pkg/scheduler/schedule_one.go +++ b/pkg/scheduler/schedule_one.go @@ -207,7 +207,7 @@ func (sched *Scheduler) prepareForBindingCycle( podsToActivate *framework.PodsToActivate, scheduleResult ScheduleResult, ) (*framework.QueuedPodInfo, *fwk.Status) { - assumedPodInfo, status := sched.assumeAndReserve(ctx, state, schedFramework, podInfo, scheduleResult) + assumedPodInfo, status := sched.AssumeAndReserve(ctx, state, schedFramework, podInfo, scheduleResult) if !status.IsSuccess() { return assumedPodInfo, status } @@ -219,7 +219,7 @@ func (sched *Scheduler) prepareForBindingCycle( schedFramework.AddWaitingPod(assumedPod, pluginsWaitTime) } else if !runPermitStatus.IsSuccess() { // trigger un-reserve plugins to clean up state associated with the reserved Pod - err := sched.unreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost) + err := sched.UnreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost) if err != nil { utilruntime.HandleErrorWithContext(ctx, err, "ForgetPod failed") } @@ -251,19 +251,29 @@ func (sched *Scheduler) prepareForBindingCycle( return assumedPodInfo, nil } -type SimulationResult struct { - Pod *v1.Pod - ScheduleResult ScheduleResult - Status *fwk.Status +// SchedulingTryResult holds the outcome of a TryScheduling operation. +type SchedulingTryResult struct { + // Pod is the pod that was tentatively scheduled. + Pod *v1.Pod + // ScheduleResult contains the details of the scheduling decision (e.g., suggested host). + ScheduleResult ScheduleResult + // Status indicates the success or failure of the scheduling operation. + Status *fwk.Status + // RequiresPreemption is true if the pod was only schedulable after nominating a node for preemption. RequiresPreemption bool - AssumedPodInfo *framework.QueuedPodInfo + // AssumedPodInfo contains the queued pod info after assumption and reservation. + AssumedPodInfo *framework.QueuedPodInfo } -func (sched *Scheduler) SimulateScheduling(ctx context.Context, +// TryScheduling performs a tentative scheduling of a pod by running the scheduling +// algorithm and assuming the pod in memory. +// It returns a revert function that can be used to undo the assumption/reservation. +// This is primarily used in pod group scheduling to check if an entire group can fit. +func (sched *Scheduler) TryScheduling(ctx context.Context, state fwk.CycleState, schedFramework framework.Framework, podInfo *framework.QueuedPodInfo, -) (*SimulationResult, func()) { +) (*SchedulingTryResult, func()) { pod := podInfo.GetPod() requiresPreemption := false @@ -278,7 +288,7 @@ func (sched *Scheduler) SimulateScheduling(ctx context.Context, requiresPreemption = true } else { // In case of pod being just unschedulable or having an error, just return now. - return &SimulationResult{ + return &SchedulingTryResult{ Pod: pod, ScheduleResult: scheduleResult, Status: status, @@ -286,9 +296,9 @@ func (sched *Scheduler) SimulateScheduling(ctx context.Context, } } - assumedPodInfo, assumeStatus := sched.assumeAndReserve(ctx, state, schedFramework, podInfo, scheduleResult) + assumedPodInfo, assumeStatus := sched.AssumeAndReserve(ctx, state, schedFramework, podInfo, scheduleResult) if !assumeStatus.IsSuccess() { - return &SimulationResult{ + return &SchedulingTryResult{ Pod: pod, ScheduleResult: ScheduleResult{nominatingInfo: clearNominatedNode}, Status: assumeStatus, @@ -296,13 +306,13 @@ func (sched *Scheduler) SimulateScheduling(ctx context.Context, } revertFn := func() { - err := sched.unreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost) + err := sched.UnreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost) if err != nil { utilruntime.HandleErrorWithContext(ctx, err, "ForgetPod failed") } } - return &SimulationResult{ + return &SchedulingTryResult{ Pod: pod, ScheduleResult: scheduleResult, Status: status, @@ -371,7 +381,7 @@ func (sched *Scheduler) schedulingAlgorithm( } // assumeAndReserve assumes and reserves the pod in scheduler's memory. -func (sched *Scheduler) assumeAndReserve( +func (sched *Scheduler) AssumeAndReserve( ctx context.Context, state fwk.CycleState, schedFramework framework.Framework, @@ -397,7 +407,7 @@ func (sched *Scheduler) assumeAndReserve( // Run the Reserve method of reserve plugins. if sts := schedFramework.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() { // trigger un-reserve to clean up state associated with the reserved Pod - err := sched.unreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost) + err := sched.UnreserveAndForget(ctx, state, schedFramework, assumedPodInfo, scheduleResult.SuggestedHost) if err != nil { utilruntime.HandleErrorWithContext(ctx, err, "ForgetPod failed") } @@ -422,7 +432,7 @@ func (sched *Scheduler) assumeAndReserve( // unreserveAndForget unreserves and forgets the pod from scheduler's memory. // This function shouldn't be called during binding cycle with a state, where IsPodGroupSchedulingCycle is set to true, // but this shouldn't happen, because such pods with such state cannot reach binding. -func (sched *Scheduler) unreserveAndForget( +func (sched *Scheduler) UnreserveAndForget( ctx context.Context, state fwk.CycleState, schedFramework framework.Framework, @@ -575,7 +585,7 @@ func (sched *Scheduler) handleBindingCycleError( assumedPod := podInfo.Pod // trigger un-reserve plugins to clean up state associated with the reserved Pod - if forgetErr := sched.unreserveAndForget(ctx, state, fwk, podInfo, scheduleResult.SuggestedHost); forgetErr != nil { + if forgetErr := sched.UnreserveAndForget(ctx, state, fwk, podInfo, scheduleResult.SuggestedHost); forgetErr != nil { utilruntime.HandleErrorWithContext(ctx, forgetErr, "ForgetPod failed") } else { // "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event, diff --git a/pkg/scheduler/schedule_one_podgroup.go b/pkg/scheduler/schedule_one_podgroup.go index df0f7cea66b..77b3c241606 100644 --- a/pkg/scheduler/schedule_one_podgroup.go +++ b/pkg/scheduler/schedule_one_podgroup.go @@ -416,21 +416,21 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche logger.V(4).Info("Attempting to schedule a pod belonging to a pod group", "podGroup", klog.KObj(podGroupInfo), "pod", klog.KObj(pod)) - simResult, revertFn := sched.SimulateScheduling(ctx, podCtx.state, schedFwk, podInfo) + tryResult, revertFn := sched.TryScheduling(ctx, podCtx.state, schedFwk, podInfo) + assumedPodInfo := tryResult.AssumedPodInfo + schedRes := tryResult.ScheduleResult - if simResult.AssumedPodInfo == nil { + if assumedPodInfo == nil { return algorithmResult{ - pod: simResult.Pod, - scheduleResult: simResult.ScheduleResult, + pod: tryResult.Pod, + scheduleResult: tryResult.ScheduleResult, podCtx: podCtx, schedulingDuration: time.Since(start), - status: simResult.Status, + status: tryResult.Status, }, nil } - assumedPodInfo := simResult.AssumedPodInfo - - _, permitStatus := schedFwk.RunPermitPlugins(ctx, podCtx.state, assumedPodInfo.Pod, simResult.ScheduleResult.SuggestedHost) + _, permitStatus := schedFwk.RunPermitPlugins(ctx, podCtx.state, assumedPodInfo.Pod, schedRes.SuggestedHost) if !permitStatus.IsWait() && !permitStatus.IsSuccess() { revertFn() if permitStatus.IsRejected() { @@ -441,7 +441,7 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche NodeToStatus: framework.NewDefaultNodeToStatus(), }, } - fitErr.Diagnosis.NodeToStatus.Set(simResult.ScheduleResult.SuggestedHost, permitStatus) + fitErr.Diagnosis.NodeToStatus.Set(schedRes.SuggestedHost, permitStatus) fitErr.Diagnosis.AddPluginStatus(permitStatus) permitStatus = fwk.NewStatus(permitStatus.Code()).WithError(fitErr) } @@ -456,12 +456,12 @@ func (sched *Scheduler) podGroupPodSchedulingAlgorithm(ctx context.Context, sche return algorithmResult{ pod: pod, - scheduleResult: simResult.ScheduleResult, + scheduleResult: schedRes, podCtx: podCtx, schedulingDuration: time.Since(start), - status: simResult.Status, + status: tryResult.Status, permitStatus: permitStatus, - requiresPreemption: simResult.RequiresPreemption, + requiresPreemption: tryResult.RequiresPreemption, }, revertFn } diff --git a/pkg/scheduler/schedule_one_test.go b/pkg/scheduler/schedule_one_test.go index 3b379ffcc2f..da4a2bf66f3 100644 --- a/pkg/scheduler/schedule_one_test.go +++ b/pkg/scheduler/schedule_one_test.go @@ -4902,3 +4902,222 @@ func TestEvaluateNominatedNode(t *testing.T) { }) } } + +func TestTryScheduling(t *testing.T) { + node1 := st.MakeNode().Name("node1").Obj() + pod1 := st.MakePod().Name("pod1").Namespace("default").UID("pod1").Obj() + + tests := []struct { + name string + pod *v1.Pod + filterStatus *fwk.Status + postFilterStatus *fwk.Status + postFilterResult *fwk.PostFilterResult + reserveStatus *fwk.Status + isPodGroupCycle bool + wantSuccess bool + wantSuggestedHost string + wantAssumedInCache bool + wantAssumedInSnap bool + wantRequiresPreempt bool + wantErrorMessage string + }{ + { + name: "success: pod fits on node", + pod: pod1, + filterStatus: fwk.NewStatus(fwk.Success), + wantSuccess: true, + wantSuggestedHost: "node1", + wantAssumedInCache: true, + }, + { + name: "failure: algorithm finds no nodes", + pod: pod1, + filterStatus: fwk.NewStatus(fwk.Unschedulable, "fake failure"), + postFilterStatus: fwk.NewStatus(fwk.Unschedulable), + wantSuccess: false, + wantErrorMessage: "fake failure", + }, + { + name: "preemption: algorithm fails but PostFilter nominates node", + pod: pod1, + filterStatus: fwk.NewStatus(fwk.Unschedulable, "fake failure"), + postFilterStatus: fwk.NewStatus(fwk.Success), + postFilterResult: &fwk.PostFilterResult{NominatingInfo: &fwk.NominatingInfo{NominatedNodeName: "node1", NominatingMode: fwk.ModeOverride}}, + wantSuccess: false, + wantRequiresPreempt: true, + wantSuggestedHost: "node1", + wantAssumedInCache: true, + }, + { + name: "reserve failure: algorithm succeeds but Reserve plugin fails", + pod: pod1, + filterStatus: fwk.NewStatus(fwk.Success), + reserveStatus: fwk.NewStatus(fwk.Error, "reserve fake failure"), + wantSuccess: false, + wantErrorMessage: "reserve fake failure", + wantAssumedInCache: false, + }, + { + name: "pod group cycle: assumed in snapshot only", + pod: pod1, + filterStatus: fwk.NewStatus(fwk.Success), + isPodGroupCycle: true, + wantSuccess: true, + wantSuggestedHost: "node1", + wantAssumedInCache: false, + wantAssumedInSnap: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + client := clientsetfake.NewClientset(node1, tt.pod) + informerFactory := informers.NewSharedInformerFactory(client, 0) + cache := internalcache.New(ctx, nil, true) + cache.AddNode(logger, node1) + snapshot := internalcache.NewEmptySnapshot() + queue := internalqueue.NewTestQueue(ctx, nil) + + podInfo, _ := framework.NewPodInfo(tt.pod) + queuedPodInfo := &framework.QueuedPodInfo{PodInfo: podInfo} + + fakePlugin := &trySchedulingPlugin{ + fakePodGroupPlugin: &fakePodGroupPlugin{ + filterStatus: map[string]*fwk.Status{tt.pod.Name: tt.filterStatus}, + postFilterStatus: map[string]*fwk.Status{tt.pod.Name: tt.postFilterStatus}, + postFilterResult: map[string]*fwk.PostFilterResult{tt.pod.Name: tt.postFilterResult}, + }, + reserveStatus: tt.reserveStatus, + } + + registry := frameworkruntime.Registry{ + queuesort.Name: queuesort.New, + defaultbinder.Name: defaultbinder.New, + "TrySchedulingPlugin": func(ctx context.Context, obj runtime.Object, handle fwk.Handle) (fwk.Plugin, error) { + return fakePlugin, nil + }, + } + profileCfg := schedulerapi.KubeSchedulerProfile{ + SchedulerName: "default-scheduler", + Plugins: &schedulerapi.Plugins{ + QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: queuesort.Name}}}, + Filter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "TrySchedulingPlugin"}}}, + PostFilter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "TrySchedulingPlugin"}}}, + Reserve: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "TrySchedulingPlugin"}}}, + Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: defaultbinder.Name}}}, + }, + } + + schedFwk, err := frameworkruntime.NewFramework(ctx, registry, &profileCfg, + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithSnapshotSharedLister(snapshot), + frameworkruntime.WithPodNominator(queue), + ) + if err != nil { + t.Fatalf("Failed to create framework: %v", err) + } + + sched := &Scheduler{ + Cache: cache, + nodeInfoSnapshot: snapshot, + Profiles: profile.Map{"default-scheduler": schedFwk}, + SchedulingQueue: queue, + } + sched.SchedulePod = sched.schedulePod + + if err := sched.Cache.UpdateSnapshot(logger, sched.nodeInfoSnapshot); err != nil { + t.Fatalf("Failed to update snapshot: %v", err) + } + + state := framework.NewCycleState() + if tt.isPodGroupCycle { + state.SetPodGroupSchedulingCycle(framework.NewCycleState()) + } + + tryResult, revertFn := sched.TryScheduling(ctx, state, schedFwk, queuedPodInfo) + + // Verify Success/Failure + if tryResult.Status.IsSuccess() != tt.wantSuccess { + t.Errorf("tryResult.Status.IsSuccess() = %v, want %v", tryResult.Status.IsSuccess(), tt.wantSuccess) + } + + // Verify Error Message + if tt.wantErrorMessage != "" && !strings.Contains(tryResult.Status.Message(), tt.wantErrorMessage) { + t.Errorf("tryResult.Status.Message() = %q, want it to contain %q", tryResult.Status.Message(), tt.wantErrorMessage) + } + + // Verify Suggested Host + if tryResult.ScheduleResult.SuggestedHost != tt.wantSuggestedHost { + t.Errorf("tryResult.ScheduleResult.SuggestedHost = %q, want %q", tryResult.ScheduleResult.SuggestedHost, tt.wantSuggestedHost) + } + + // Verify RequiresPreemption + if tryResult.RequiresPreemption != tt.wantRequiresPreempt { + t.Errorf("tryResult.RequiresPreemption = %v, want %v", tryResult.RequiresPreemption, tt.wantRequiresPreempt) + } + + // Verify Assumption in Cache + isAssumed, _ := cache.IsAssumedPod(tt.pod) + if isAssumed != tt.wantAssumedInCache { + t.Errorf("cache.IsAssumedPod() = %v, want %v", isAssumed, tt.wantAssumedInCache) + } + + // Verify Assumption in Snapshot + inSnap := false + if nodeInfo, err := snapshot.Get("node1"); err == nil { + for _, p := range nodeInfo.GetPods() { + if p.GetPod().Name == tt.pod.Name { + inSnap = true + break + } + } + } + if inSnap != tt.wantAssumedInSnap { + t.Errorf("pod in snapshot = %v, want %v", inSnap, tt.wantAssumedInSnap) + } + + // Verify Revert Function + if (revertFn != nil) != (tt.wantAssumedInCache || tt.wantAssumedInSnap) { + t.Errorf("revertFn is nil = %v, want %v", revertFn == nil, !(tt.wantAssumedInCache || tt.wantAssumedInSnap)) + } + + if revertFn != nil { + revertFn() + isAssumed, _ = cache.IsAssumedPod(tt.pod) + if isAssumed { + t.Errorf("pod still assumed in cache after revert") + } + inSnap = false + if nodeInfo, err := snapshot.Get("node1"); err == nil { + for _, p := range nodeInfo.GetPods() { + if p.GetPod().Name == tt.pod.Name { + inSnap = true + break + } + } + } + if inSnap { + t.Errorf("pod still in snapshot after revert") + } + } + }) + } +} + +// trySchedulingPlugin is a mock plugin used in TestTryScheduling to control +// the outcome of various extension points. +type trySchedulingPlugin struct { + *fakePodGroupPlugin + reserveStatus *fwk.Status +} + +func (p *trySchedulingPlugin) Reserve(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) *fwk.Status { + if p.reserveStatus != nil { + return p.reserveStatus + } + return fwk.NewStatus(fwk.Success) +} +func (p *trySchedulingPlugin) Unreserve(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) { +}