From 1e1bad1ddee6bb65efd298528aafb50d4106d18a Mon Sep 17 00:00:00 2001 From: Bartosz Date: Wed, 13 May 2026 14:02:43 +0000 Subject: [PATCH] Add PlacementFeasible plugin to support early gang termination --- pkg/scheduler/framework/interface.go | 23 ++ .../plugins/gangscheduling/gangscheduling.go | 80 ++++- .../gangscheduling/gangscheduling_test.go | 295 +++++++++++++++++- pkg/scheduler/framework/runtime/framework.go | 55 ++++ .../framework/runtime/framework_test.go | 136 ++++++++ pkg/scheduler/metrics/metrics.go | 2 + .../scheduler/podgroup/podgroup_test.go | 7 +- 7 files changed, 575 insertions(+), 23 deletions(-) diff --git a/pkg/scheduler/framework/interface.go b/pkg/scheduler/framework/interface.go index ff70630fa8e..7fafc2e0ee7 100644 --- a/pkg/scheduler/framework/interface.go +++ b/pkg/scheduler/framework/interface.go @@ -180,6 +180,21 @@ type PodGroupPostFilterPlugin interface { PodGroupPostFilter(ctx context.Context, pg *v1alpha3.PodGroup, pods []*v1.Pod, pgSchedulingFunc PodGroupSchedulingFunc) (*PodGroupPostFilterResult, *fwk.Status) } +// PlacementFeasiblePlugin is an interface for plugins that are called after each pod in a pod group is evaluated. +// It is used to determine if a pod group is schedulable, may become schedulable or will not become schedulable regardless of the scheduling result of the remaining pods in the pod group. +type PlacementFeasiblePlugin interface { + fwk.Plugin + + // PlacementFeasible is called after each pod in a pod group is evaluated. + // Use placementCycleState to accumulate the results from the evaluated pods in current cycle. + // Return Unschedulable status if the pod group cannot be scheduled in the current state, but may become schedulable once more pods are evaluated. + // Return UnschedulableAndUnresolvable status if the pod group cannot be scheduled in the current placement. + // The scheduler will give up this placement and won't even evaluate remaining pods. The placement will remain eligible for preemption. 
+ // Return Success status if the pod group can be scheduled in the current state. + // After returning Success, the plugin should keep returning Success for the remaining pods. + PlacementFeasible(ctx context.Context, placementCycleState fwk.PodGroupCycleState, podGroupInfo fwk.PodGroupInfo) *fwk.Status +} + // Framework manages the set of plugins in use by the scheduling framework. // Configured plugins are called at specified points in a scheduling context. type Framework interface { @@ -262,6 +277,14 @@ type Framework interface { // This function itself will NOT create a waiting pod object and the caller should call AddWaitingPod method to do this. RunPermitPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (pluginsWaitTime map[string]time.Duration, status *fwk.Status) + // RunPlacementFeasiblePlugins runs the set of configured Permit plugins that implement PlacementFeasible interface. + // The result will be Success if all plugins return Success. + // The only other valid statuses are UnschedulableAndUnresolvable and Unschedulable. + // If any plugin returns invalid status, the result will be Error and the remaining plugins won't be invoked. + // Otherwise, if at least 1 plugin returns UnschedulableAndUnresolvable, the remaining plugins won't be invoked and the result will be UnschedulableAndUnresolvable. The placement will remain eligible for preemption. + // Otherwise, if at least 1 plugin returns Unschedulable, the remaining plugins will be invoked and the result will be Unschedulable. + RunPlacementFeasiblePlugins(ctx context.Context, placementCycleState fwk.PodGroupCycleState, podGroupInfo fwk.PodGroupInfo) *fwk.Status + // AddWaitingPod creates a waiting pod instance and adds it to the framework. // It takes the pluginsWaitTime map returned by the RunPermitPlugins. // Pod will remain waiting pod for the minimum duration returned by the Permit plugins. 
diff --git a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go index d18fc55adf7..5799d22f61d 100644 --- a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go +++ b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go @@ -28,6 +28,7 @@ import ( schedulinglisters "k8s.io/client-go/listers/scheduling/v1alpha3" "k8s.io/klog/v2" fwk "k8s.io/kube-scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" @@ -54,6 +55,7 @@ type GangScheduling struct { var _ fwk.EnqueueExtensions = &GangScheduling{} var _ fwk.PreEnqueuePlugin = &GangScheduling{} var _ fwk.PermitPlugin = &GangScheduling{} +var _ framework.PlacementFeasiblePlugin = &GangScheduling{} // New initializes a new plugin and returns it. func New(_ context.Context, _ runtime.Object, fh fwk.Handle, fts feature.Features) (fwk.Plugin, error) { @@ -182,15 +184,7 @@ func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod return nil, 0 } - // Select a lister for the pod group state based on the currently executed scheduling phase. - // In the pod group scheduling cycle, it reads from the snapshot. - // Otherwise, it reads the runtime state of the pod group from the cache. 
- var podGroupStateLister fwk.PodGroupStateLister - if state.IsPodGroupSchedulingCycle() { - podGroupStateLister = pl.snapshotLister.PodGroupStates() - } else { - podGroupStateLister = pl.podGroupManager.PodGroupStates() - } + podGroupStateLister := pl.podGroupManager.PodGroupStates() podGroupState, err := podGroupStateLister.Get(namespace, *schedulingGroup.PodGroupName) if err != nil { return fwk.AsStatus(err), 0 @@ -217,3 +211,71 @@ func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod return nil, 0 } + +const placementFeasibleStateKey = "PlacementFeasible" + Name + +type placementFeasibleState struct { + evaluated, succeeded int +} + +func (s *placementFeasibleState) Clone() fwk.StateData { + return &placementFeasibleState{ + evaluated: s.evaluated, + succeeded: s.succeeded, + } +} + +func getPlacementFeasibleState(placementCycleState fwk.PodGroupCycleState) *placementFeasibleState { + state, err := placementCycleState.Read(placementFeasibleStateKey) + if err != nil { + state = &placementFeasibleState{} + placementCycleState.Write(placementFeasibleStateKey, state) + } + return state.(*placementFeasibleState) +} + +// PlacementFeasible is responsible for enforcing the gang's MinCount constraint. +// The function will only return success once the gang's MinCount is satisfied or if the pod group is not using gang scheduling policy. +// In case there are not enough remaining pods to satisfy the gang's MinCount, it returns UnschedulableAndUnresolvable which will terminate the pod group scheduling cycle early. 
+func (pl *GangScheduling) PlacementFeasible(ctx context.Context, placementCycleState fwk.PodGroupCycleState, podGroupInfo fwk.PodGroupInfo) *fwk.Status { + pg, err := pl.podGroupLister.PodGroups(podGroupInfo.GetNamespace()).Get(podGroupInfo.GetName()) + if err != nil { + return fwk.AsStatus(fmt.Errorf("failed to get podGroup %s to compute gang feasibility: %w", klog.KObj(podGroupInfo), err)) + } + + gangPolicy := pg.Spec.SchedulingPolicy.Gang + // This plugin only cares about pods with a Gang scheduling policy. + if gangPolicy == nil { + return nil + } + + podGroupState, err := pl.snapshotLister.PodGroupStates().Get(podGroupInfo.GetNamespace(), podGroupInfo.GetName()) + if err != nil { + return fwk.AsStatus(fmt.Errorf("failed to get podGroup state for podGroup %s to compute gang feasibility: %w", klog.KObj(pg), err)) + } + + // We need to keep track of how many pods have already been evaluated in the current PodGroup scheduling cycle. + pgState := getPlacementFeasibleState(placementCycleState) + pgState.evaluated++ + + // remaining is the number of unscheduled pods that haven't been evaluated yet in the current PodGroup scheduling cycle. + remaining := len(podGroupInfo.GetUnscheduledPods()) - pgState.evaluated + + // scheduled includes the pods that are assigned or assumed in the current PodGroup scheduling cycle. + scheduled := podGroupState.ScheduledPodsCount() + + minCount := int(gangPolicy.MinCount) + + if remaining+scheduled < minCount { + // minCount can't be satisfied because there are not enough remaining pods. + return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("minCount (%d) cannot be satisfied: %d scheduled, %d remaining", minCount, scheduled, remaining)) + } + + if scheduled < minCount { + // minCount might be satisfied once more remaining pods are evaluated. 
+ return fwk.NewStatus(fwk.Unschedulable, fmt.Sprintf("minCount (%d) is not yet satisfied: %d scheduled, %d remaining", minCount, scheduled, remaining)) + } + + // minCount is satisfied. + return nil +} diff --git a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go index c70b96231cb..c4698f64e60 100644 --- a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go +++ b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling_test.go @@ -167,6 +167,31 @@ func (pam *podActivatorMock) Activate(_ klog.Logger, pods map[string]*v1.Pod) { } } +type mockPodGroupState struct { + fwk.PodGroupState + scheduledPodsCount int +} + +func (m *mockPodGroupState) ScheduledPodsCount() int { return m.scheduledPodsCount } + +type mockPodGroupStateLister struct { + state *mockPodGroupState + err error +} + +func (m *mockPodGroupStateLister) Get(namespace, podGroupName string) (fwk.PodGroupState, error) { + return m.state, m.err +} + +type mockSharedLister struct { + fwk.SharedLister + podGroupStateLister *mockPodGroupStateLister +} + +func (m *mockSharedLister) PodGroupStates() fwk.PodGroupStateLister { + return m.podGroupStateLister +} + func TestGangSchedulingFlow(t *testing.T) { gangPodGroup1 := st.MakePodGroup().Namespace("ns1").Name("pg1").TemplateRef("t1", "gang-wl").MinCount(3).Obj() gangPodGroup2 := st.MakePodGroup().Namespace("ns1").Name("pg2").TemplateRef("t2", "gang-wl").MinCount(4).Obj() @@ -244,17 +269,6 @@ func TestGangSchedulingFlow(t *testing.T) { wantPermitStatus: nil, wantAllowedPods: []types.UID{"p1", "p2", "p3"}, }, - { - name: "final gang pod arrives at Permit during pod group scheduling cycle", - pod: p1, - initialPods: []*v1.Pod{p2, p3, p4, p5}, - initialPodGroups: []*schedulingapi.PodGroup{gangPodGroup1, gangPodGroup2}, - podsWaitingOnPermit: []*v1.Pod{p2, p3, p4, p5}, - isDuringPodGroupSchedulingCycle: true, - wantPreEnqueueStatus: nil, - 
wantPermitStatus: nil, - wantAllowedPods: []types.UID{"p1", "p2", "p3"}, - }, } for _, tt := range tests { @@ -380,3 +394,262 @@ func TestGangSchedulingFlow(t *testing.T) { }) } } + +func TestPlacementFeasible(t *testing.T) { + tests := []struct { + name string + minCount int32 + unscheduledPods []*v1.Pod + podStatuses []fwk.Code + expectedStatuses []fwk.Code + initialScheduledCount int + }{ + { + name: "All pods succeed, minCount met at end", + minCount: 2, + unscheduledPods: []*v1.Pod{ + st.MakePod().Name("p1").Obj(), + st.MakePod().Name("p2").Obj(), + }, + podStatuses: []fwk.Code{ + fwk.Success, + fwk.Success, + }, + expectedStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.Success, + }, + }, + { + name: "First pod fails, minCount not satisfiable", + minCount: 3, + unscheduledPods: []*v1.Pod{ + st.MakePod().Name("p1").Obj(), + st.MakePod().Name("p2").Obj(), + st.MakePod().Name("p3").Obj(), + }, + podStatuses: []fwk.Code{ + fwk.Unschedulable, + }, + expectedStatuses: []fwk.Code{ + fwk.UnschedulableAndUnresolvable, + }, + }, + { + name: "Second pod fails, minCount not satisfiable", + minCount: 2, + unscheduledPods: []*v1.Pod{ + st.MakePod().Name("p1").Obj(), + st.MakePod().Name("p2").Obj(), + }, + podStatuses: []fwk.Code{ + fwk.Success, + fwk.Unschedulable, + }, + expectedStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.UnschedulableAndUnresolvable, + }, + }, + { + name: "Non-gang pod group ignored", + minCount: 0, // No gang policy + unscheduledPods: []*v1.Pod{st.MakePod().Name("p1").Obj()}, + podStatuses: []fwk.Code{ + fwk.Unschedulable, + }, + expectedStatuses: []fwk.Code{ + fwk.Success, + }, + }, + { + name: "More than minCount pods, all succeed", + minCount: 2, + unscheduledPods: []*v1.Pod{ + st.MakePod().Name("p1").Obj(), + st.MakePod().Name("p2").Obj(), + st.MakePod().Name("p3").Obj(), + }, + podStatuses: []fwk.Code{ + fwk.Success, + fwk.Success, + fwk.Success, + }, + expectedStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.Success, + fwk.Success, + }, + }, 
+ { + name: "More than minCount pods, first fails", + minCount: 2, + unscheduledPods: []*v1.Pod{ + st.MakePod().Name("p1").Obj(), + st.MakePod().Name("p2").Obj(), + st.MakePod().Name("p3").Obj(), + }, + podStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.Success, + fwk.Success, + }, + expectedStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.Unschedulable, + fwk.Success, + }, + }, + { + name: "More than minCount pods, minCount not satisfiable", + minCount: 2, + unscheduledPods: []*v1.Pod{ + st.MakePod().Name("p1").Obj(), + st.MakePod().Name("p2").Obj(), + st.MakePod().Name("p3").Obj(), + }, + podStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.Unschedulable, + fwk.Success, + }, + expectedStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.UnschedulableAndUnresolvable, + fwk.UnschedulableAndUnresolvable, + }, + }, + { + name: "1 pod scheduled, 2 unscheduled pods succeed, minCount 3 met", + minCount: 3, + unscheduledPods: []*v1.Pod{ + st.MakePod().Name("p1").Obj(), + st.MakePod().Name("p2").Obj(), + }, + podStatuses: []fwk.Code{ + fwk.Success, + fwk.Success, + }, + expectedStatuses: []fwk.Code{ + fwk.Unschedulable, + fwk.Success, + }, + initialScheduledCount: 1, + }, + { + name: "minCount already met by scheduled pods", + minCount: 2, + unscheduledPods: []*v1.Pod{ + st.MakePod().Name("p1").Obj(), + }, + podStatuses: []fwk.Code{ + fwk.Unschedulable, + }, + expectedStatuses: []fwk.Code{ + fwk.Success, + }, + initialScheduledCount: 2, + }, + { + name: "1 pod scheduled, minCount 3, first unscheduled fails, not enough remaining", + minCount: 3, + unscheduledPods: []*v1.Pod{ + st.MakePod().Name("p1").Obj(), + st.MakePod().Name("p2").Obj(), + }, + podStatuses: []fwk.Code{ + fwk.Unschedulable, + }, + expectedStatuses: []fwk.Code{ + fwk.UnschedulableAndUnresolvable, + }, + initialScheduledCount: 1, + }, + { + name: "1 pod scheduled, minCount 4, first unscheduled succeeds, not enough remaining", + minCount: 4, + unscheduledPods: []*v1.Pod{ + st.MakePod().Name("p1").Obj(), + 
st.MakePod().Name("p2").Obj(), + }, + podStatuses: []fwk.Code{ + fwk.Success, + }, + expectedStatuses: []fwk.Code{ + fwk.UnschedulableAndUnresolvable, + }, + initialScheduledCount: 1, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + + pgName := "test-pg" + namespace := "default" + pg := st.MakePodGroup().Namespace(namespace).Name(pgName).Obj() + if tc.minCount > 0 { + pg.Spec.SchedulingPolicy.Gang = &schedulingapi.GangSchedulingPolicy{MinCount: tc.minCount} + } else { + pg.Spec.SchedulingPolicy.Basic = &schedulingapi.BasicSchedulingPolicy{} + } + + informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(pg), 0) + informerFactory.Scheduling().V1alpha2().PodGroups().Informer() + informerFactory.StartWithContext(ctx) + informerFactory.WaitForCacheSyncWithContext(ctx) + + mockState := &mockPodGroupState{scheduledPodsCount: tc.initialScheduledCount} + mockLister := &mockSharedLister{ + podGroupStateLister: &mockPodGroupStateLister{state: mockState}, + } + + fh, err := frameworkruntime.NewFramework(ctx, nil, nil, + frameworkruntime.WithInformerFactory(informerFactory), + ) + if err != nil { + t.Fatalf("Failed to create framework: %v", err) + } + + p, err := New(ctx, nil, fh, feature.Features{EnableGangScheduling: true}) + if err != nil { + t.Fatalf("Failed to create plugin: %v", err) + } + pl := p.(*GangScheduling) + + // Inject the mock lister + pl.snapshotLister = mockLister + + pgInfo := &testPodGroupInfo{ + namespace: namespace, + name: pgName, + unscheduledPods: tc.unscheduledPods, + } + + cycleState := schedulerframework.NewCycleState() + + for i, code := range tc.podStatuses { + if code == fwk.Success { + mockState.scheduledPodsCount++ + } + + gotStatus := pl.PlacementFeasible(ctx, cycleState, pgInfo) + + if gotCode := gotStatus.Code(); gotCode != tc.expectedStatuses[i] { + t.Errorf("Step %d: expected status %v, got %v", i, tc.expectedStatuses[i], gotCode) + } + } + }) + } +} + 
+type testPodGroupInfo struct { + namespace string + name string + unscheduledPods []*v1.Pod +} + +func (t *testPodGroupInfo) GetNamespace() string { return t.namespace } +func (t *testPodGroupInfo) GetName() string { return t.name } +func (t *testPodGroupInfo) GetUnscheduledPods() []*v1.Pod { return t.unscheduledPods } diff --git a/pkg/scheduler/framework/runtime/framework.go b/pkg/scheduler/framework/runtime/framework.go index 22f686c7f54..7c6b0e6d0f6 100644 --- a/pkg/scheduler/framework/runtime/framework.go +++ b/pkg/scheduler/framework/runtime/framework.go @@ -78,6 +78,7 @@ type frameworkImpl struct { podGroupPostFilterPlugins []framework.PodGroupPostFilterPlugin placementGeneratePlugins []fwk.PlacementGeneratePlugin + placementFeasiblePlugins []framework.PlacementFeasiblePlugin placementScorePlugins []fwk.PlacementScorePlugin placementScorePluginWeight map[string]int @@ -487,6 +488,17 @@ func NewFramework(ctx context.Context, r Registry, profile *config.KubeScheduler } } + // Use GangScheduling plugin as the only PlacementFeasiblePlugin. + if utilfeature.DefaultFeatureGate.Enabled(features.GenericWorkload) { + if gs, ok := f.pluginsMap[names.GangScheduling]; ok { + if p, ok := gs.(framework.PlacementFeasiblePlugin); ok { + f.placementFeasiblePlugins = append(f.placementFeasiblePlugins, p) + } else { + logger.V(2).Info("GenericWorkload is enabled, but GangScheduling plugin does not fulfill PlacementFeasiblePlugin interface.") + } + } + } + if options.captureProfile != nil { if len(outputProfile.PluginConfig) != 0 { sort.Slice(outputProfile.PluginConfig, func(i, j int) bool { @@ -1996,6 +2008,49 @@ func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl fwk.PermitPlugin return status, timeout } +// RunPlacementFeasiblePlugins runs the set of configured Permit plugins that implement PlacementFeasible interface. +// The result will be Success if all plugins return Success. 
+// The only other valid statuses are UnschedulableAndUnresolvable and Unschedulable. +// If any plugin returns invalid status, the result will be Error and the remaining plugins won't be invoked. +// Otherwise, if at least 1 plugin returns UnschedulableAndUnresolvable, the remaining plugins won't be invoked and the result will be UnschedulableAndUnresolvable. +// Otherwise, if at least 1 plugin returns Unschedulable, the remaining plugins will be invoked and the result will be Unschedulable. +func (f *frameworkImpl) RunPlacementFeasiblePlugins(ctx context.Context, placementCycleState fwk.PodGroupCycleState, podGroupInfo fwk.PodGroupInfo) (status *fwk.Status) { + startTime := time.Now() + defer func() { + metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PlacementFeasible, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime)) + }() + + for _, pl := range f.placementFeasiblePlugins { + plStatus := f.runPlacementFeasiblePlugin(ctx, pl, placementCycleState, podGroupInfo) + if plStatus.IsSuccess() { + continue + } + if plStatus.Code() == fwk.Unschedulable { + status = plStatus.WithPlugin(pl.Name()) + continue + } + if plStatus.Code() == fwk.UnschedulableAndUnresolvable { + return plStatus.WithPlugin(pl.Name()) + } + if plStatus.IsError() { + return fwk.AsStatus(fmt.Errorf("running PlacementFeasible plugin: %w", plStatus.AsError())).WithPlugin(pl.Name()) + } + return fwk.AsStatus(fmt.Errorf("unexpected status from PlacementFeasible plugin: %v", plStatus.Code())).WithPlugin(pl.Name()) + } + + return status +} + +func (f *frameworkImpl) runPlacementFeasiblePlugin(ctx context.Context, pl framework.PlacementFeasiblePlugin, state fwk.PodGroupCycleState, podGroup fwk.PodGroupInfo) *fwk.Status { + if !state.ShouldRecordPluginMetrics() { + return pl.PlacementFeasible(ctx, state, podGroup) + } + startTime := time.Now() + status := pl.PlacementFeasible(ctx, state, podGroup) + 
f.metricsRecorder.ObservePluginDurationAsync(metrics.PlacementFeasible, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime)) + return status +} + // AddWaitingPod creates a waiting pod instance and adds it to the framework. // It takes the pluginsWaitTime map returned by the RunPermitPlugins. // Pod will remain waiting pod for the minimum duration returned by the Permit plugins. diff --git a/pkg/scheduler/framework/runtime/framework_test.go b/pkg/scheduler/framework/runtime/framework_test.go index a709cb30a1f..d11ae059b7f 100644 --- a/pkg/scheduler/framework/runtime/framework_test.go +++ b/pkg/scheduler/framework/runtime/framework_test.go @@ -228,6 +228,10 @@ func (pl *TestPlugin) PreFilter(ctx context.Context, state fwk.CycleState, p *v1 return pl.inj.PreFilterResult, fwk.NewStatus(fwk.Code(pl.inj.PreFilterStatus), injectReason) } +func (pl *TestPlugin) PlacementFeasible(ctx context.Context, state fwk.PodGroupCycleState, podGroup fwk.PodGroupInfo) *fwk.Status { + return fwk.NewStatus(fwk.Code(pl.inj.PlacementFeasibleStatus), injectReason) +} + func (pl *TestPlugin) PreFilterExtensions() fwk.PreFilterExtensions { return pl } @@ -783,6 +787,116 @@ func TestPodGroupPostFilterPlugins(t *testing.T) { } +type mockPlacementFeasiblePlugin struct { + name string + status *fwk.Status + called bool +} + +func (p *mockPlacementFeasiblePlugin) Name() string { return p.name } + +func (p *mockPlacementFeasiblePlugin) PlacementFeasible(ctx context.Context, state fwk.PodGroupCycleState, podGroup fwk.PodGroupInfo) *fwk.Status { + p.called = true + return p.status +} + +func TestRunPlacementFeasiblePlugins(t *testing.T) { + tests := []struct { + name string + plugins []*mockPlacementFeasiblePlugin + expectedStatus *fwk.Status + expectedCalled []bool + }{ + { + name: "All plugins succeed", + plugins: []*mockPlacementFeasiblePlugin{ + {name: "p1", status: nil}, + {name: "p2", status: nil}, + }, + expectedStatus: nil, + expectedCalled: []bool{true, true}, + }, + { + 
name: "First plugin returns Unschedulable, continues", + plugins: []*mockPlacementFeasiblePlugin{ + {name: "p1", status: fwk.NewStatus(fwk.Unschedulable, "unschedulable")}, + {name: "p2", status: nil}, + }, + expectedStatus: fwk.NewStatus(fwk.Unschedulable, "unschedulable").WithPlugin("p1"), + expectedCalled: []bool{true, true}, + }, + { + name: "First plugin returns UnschedulableAndUnresolvable, breaks", + plugins: []*mockPlacementFeasiblePlugin{ + {name: "p1", status: fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "unresolvable")}, + {name: "p2", status: nil}, + }, + expectedStatus: fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "unresolvable").WithPlugin("p1"), + expectedCalled: []bool{true, false}, + }, + { + name: "First plugin returns Unschedulable, second returns UnschedulableAndUnresolvable, returns unresolvable", + plugins: []*mockPlacementFeasiblePlugin{ + {name: "p1", status: fwk.NewStatus(fwk.Unschedulable, "unschedulable")}, + {name: "p2", status: fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "unresolvable")}, + }, + expectedStatus: fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "unresolvable").WithPlugin("p2"), + expectedCalled: []bool{true, true}, + }, + { + name: "First plugin returns Unschedulable, second returns Unschedulable, returns last unschedulable", + plugins: []*mockPlacementFeasiblePlugin{ + {name: "p1", status: fwk.NewStatus(fwk.Unschedulable, "unschedulable1")}, + {name: "p2", status: fwk.NewStatus(fwk.Unschedulable, "unschedulable2")}, + }, + expectedStatus: fwk.NewStatus(fwk.Unschedulable, "unschedulable2").WithPlugin("p2"), + expectedCalled: []bool{true, true}, + }, + { + name: "Plugin returns Error, breaks", + plugins: []*mockPlacementFeasiblePlugin{ + {name: "p1", status: fwk.NewStatus(fwk.Error, "error")}, + {name: "p2", status: nil}, + }, + expectedStatus: fwk.AsStatus(fmt.Errorf("running PlacementFeasible plugin: %w", errors.New("error"))).WithPlugin("p1"), + expectedCalled: []bool{true, false}, + }, + { + name: "Plugin 
returns unexpected status, breaks", + plugins: []*mockPlacementFeasiblePlugin{ + {name: "p1", status: fwk.NewStatus(fwk.Skip, "error")}, + {name: "p2", status: nil}, + }, + expectedStatus: fwk.AsStatus(fmt.Errorf("unexpected status from PlacementFeasible plugin: Skip")).WithPlugin("p1"), + expectedCalled: []bool{true, false}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + f := &frameworkImpl{ + placementFeasiblePlugins: make([]framework.PlacementFeasiblePlugin, len(tc.plugins)), + } + for i, p := range tc.plugins { + f.placementFeasiblePlugins[i] = p + } + + status := f.RunPlacementFeasiblePlugins(ctx, framework.NewCycleState(), nil) + + if diff := cmp.Diff(tc.expectedStatus, status, statusCmpOpts...); diff != "" { + t.Errorf("Unexpected status (-want, +got):\n%s", diff) + } + + for i, p := range tc.plugins { + if p.called != tc.expectedCalled[i] { + t.Errorf("Expected plugin %s called=%v, got %v", p.name, tc.expectedCalled[i], p.called) + } + } + }) + } +} + func TestNewFrameworkMultiPointExpansion(t *testing.T) { tests := []struct { name string @@ -3557,6 +3671,14 @@ func TestRecordingMetrics(t *testing.T) { wantExtensionPoint: "PlacementScore", wantStatus: fwk.Success, }, + { + name: "PlacementFeasible - Success", + action: func(ctx context.Context, f framework.Framework) { + f.RunPlacementFeasiblePlugins(ctx, state, nil) + }, + wantExtensionPoint: "PlacementFeasible", + wantStatus: fwk.Success, + }, { name: "PreFilter - Error", @@ -3634,6 +3756,15 @@ func TestRecordingMetrics(t *testing.T) { wantExtensionPoint: "PlacementScore", wantStatus: fwk.Error, }, + { + name: "PlacementFeasible - Error", + action: func(ctx context.Context, f framework.Framework) { + f.RunPlacementFeasiblePlugins(ctx, state, nil) + }, + inject: injectedResult{PlacementFeasibleStatus: int(fwk.Error)}, + wantExtensionPoint: "PlacementFeasible", + wantStatus: fwk.Error, + }, } for _, tt := range tests { @@ -3683,6 
+3814,10 @@ func TestRecordingMetrics(t *testing.T) { _ = f.Close() }() + if tt.wantExtensionPoint == "PlacementFeasible" { + f.(*frameworkImpl).placementFeasiblePlugins = []framework.PlacementFeasiblePlugin{plugin} + } + tt.action(ctx, f) // Stop the goroutine which records metrics and ensure it's stopped. @@ -4074,6 +4209,7 @@ type injectedResult struct { GeneratePlacementsResult []*fwk.Placement `json:"generatePlacementsResult,omitempty"` GeneratePlacementsStatus int `json:"generatePlacementsStatus,omitempty"` PlacementScoreStatus int `json:"placementScoreStatus,omitempty"` + PlacementFeasibleStatus int `json:"placementFeasibleStatus,omitempty"` } func setScoreRes(inj injectedResult) (int64, *fwk.Status) { diff --git a/pkg/scheduler/metrics/metrics.go b/pkg/scheduler/metrics/metrics.go index 57343c2f42e..954f7288839 100644 --- a/pkg/scheduler/metrics/metrics.go +++ b/pkg/scheduler/metrics/metrics.go @@ -65,6 +65,7 @@ var ExtensionPoints = []string{ Permit, Sign, PlacementGenerate, + PlacementFeasible, } const ( @@ -85,6 +86,7 @@ const ( Permit = "Permit" Sign = "Sign" PlacementGenerate = "PlacementGenerate" + PlacementFeasible = "PlacementFeasible" PlacementScore = "PlacementScore" PlacementScoreExtensionNormalize = "PlacementScoreExtensionNormalize" ) diff --git a/test/integration/scheduler/podgroup/podgroup_test.go b/test/integration/scheduler/podgroup/podgroup_test.go index a8af37d44e8..7de2c49a655 100644 --- a/test/integration/scheduler/podgroup/podgroup_test.go +++ b/test/integration/scheduler/podgroup/podgroup_test.go @@ -830,11 +830,12 @@ func TestPostFilterInvocationCount(t *testing.T) { } } - // 5. Verify that MockPostFilter was called exactly 3 times - // It should be called for each pod from pod group in pod group cycle + // 5. Verify that MockPostFilter was called exactly once + // It should be called for each evaluated pod from pod group in pod group cycle // but should not be called in WAP. 
+ // Only one pod is evaluated for pod group because minCount=3 can't be satisfied with the remaining 2 pods. err = wait.PollUntilContextTimeout(testCtx.Ctx, 100*time.Millisecond, 10*time.Second, false, func(ctx context.Context) (bool, error) { - if mockPlugin.count == 3 { + if mockPlugin.count == 1 { return true, nil } return false, nil