From 5ea4532969c3fe6dd04a3f8c9aeba7e44da9e035 Mon Sep 17 00:00:00 2001 From: vshkrabkov <32546211+vshkrabkov@users.noreply.github.com> Date: Wed, 18 Mar 2026 18:50:57 +0100 Subject: [PATCH] Add PodGroupPodsCount placement score scheduler plugin (#137488) * placement score plugin, grounded on count of scheduled pods and pods for placement * align with v1alpha2 api changes * addressing review comments * plugin renaming --- .../apis/config/v1/default_plugins.go | 7 +- .../apis/config/v1/default_plugins_test.go | 35 ++ .../framework/plugins/names/names.go | 1 + .../podgrouppodscount/podgroup_pods_count.go | 87 +++++ .../podgroup_pods_count_test.go | 302 ++++++++++++++++++ pkg/scheduler/framework/plugins/registry.go | 2 + 6 files changed, 433 insertions(+), 1 deletion(-) create mode 100644 pkg/scheduler/framework/plugins/podgrouppodscount/podgroup_pods_count.go create mode 100644 pkg/scheduler/framework/plugins/podgrouppodscount/podgroup_pods_count_test.go diff --git a/pkg/scheduler/apis/config/v1/default_plugins.go b/pkg/scheduler/apis/config/v1/default_plugins.go index 4cd8f652500..9475cdfd7b6 100644 --- a/pkg/scheduler/apis/config/v1/default_plugins.go +++ b/pkg/scheduler/apis/config/v1/default_plugins.go @@ -68,7 +68,7 @@ func applyFeatureGates(config *v1.Plugins) { applyGangScheduling(config) } if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareWorkloadScheduling) { - config.MultiPoint.Enabled = append(config.MultiPoint.Enabled, v1.Plugin{Name: names.TopologyPlacementGenerator}) + applyTopologyAwareWorkloadScheduling(config) } } @@ -94,6 +94,11 @@ func applyGangScheduling(config *v1.Plugins) { config.MultiPoint.Enabled = append(config.MultiPoint.Enabled, v1.Plugin{Name: names.GangScheduling}) } +func applyTopologyAwareWorkloadScheduling(config *v1.Plugins) { + config.MultiPoint.Enabled = append(config.MultiPoint.Enabled, v1.Plugin{Name: names.TopologyPlacementGenerator}) + config.MultiPoint.Enabled = append(config.MultiPoint.Enabled, v1.Plugin{Name: names.PodGroupPodsCount, Weight: ptr.To[int32](1)}) +} + // mergePlugins merges the custom set into the given default one, handling disabled sets. func mergePlugins(logger klog.Logger, defaultPlugins, customPlugins *v1.Plugins) *v1.Plugins { if customPlugins == nil { diff --git a/pkg/scheduler/apis/config/v1/default_plugins_test.go b/pkg/scheduler/apis/config/v1/default_plugins_test.go index 4c23683adaf..35b668fdfba 100644 --- a/pkg/scheduler/apis/config/v1/default_plugins_test.go +++ b/pkg/scheduler/apis/config/v1/default_plugins_test.go @@ -197,6 +197,41 @@ func TestApplyFeatureGates(t *testing.T) { }, }, }, + { + name: "Feature gate TopologyAwareWorkloadScheduling enabled", + features: map[featuregate.Feature]bool{ + features.GenericWorkload: true, + features.TopologyAwareWorkloadScheduling: true, + }, + wantConfig: &v1.Plugins{ + MultiPoint: v1.PluginSet{ + Enabled: []v1.Plugin{ + {Name: names.SchedulingGates}, + {Name: names.PrioritySort}, + {Name: names.NodeUnschedulable}, + {Name: names.NodeName}, + {Name: names.TaintToleration, Weight: ptr.To[int32](3)}, + {Name: names.NodeAffinity, Weight: ptr.To[int32](2)}, + {Name: names.NodePorts}, + {Name: names.NodeResourcesFit, Weight: ptr.To[int32](1)}, + {Name: names.VolumeRestrictions}, + {Name: names.NodeVolumeLimits}, + {Name: names.VolumeBinding}, + {Name: names.VolumeZone}, + {Name: names.PodTopologySpread, Weight: ptr.To[int32](2)}, + {Name: names.InterPodAffinity, Weight: ptr.To[int32](2)}, + {Name: names.DynamicResources, Weight: ptr.To[int32](2)}, + {Name: names.DefaultPreemption}, + {Name: names.NodeResourcesBalancedAllocation, Weight: ptr.To[int32](1)}, + {Name: names.ImageLocality, Weight: ptr.To[int32](1)}, + {Name: names.DefaultBinder}, + {Name: names.NodeDeclaredFeatures}, + {Name: names.TopologyPlacementGenerator}, + {Name: names.PodGroupPodsCount, Weight: ptr.To[int32](1)}, + }, + }, + }, + }, } for _, test := range tests { diff --git a/pkg/scheduler/framework/plugins/names/names.go b/pkg/scheduler/framework/plugins/names/names.go index 86b764c95a2..e63efba7017 100644 --- a/pkg/scheduler/framework/plugins/names/names.go +++ b/pkg/scheduler/framework/plugins/names/names.go @@ -39,4 +39,5 @@ const ( VolumeRestrictions = "VolumeRestrictions" VolumeZone = "VolumeZone" TopologyPlacementGenerator = "TopologyPlacementGenerator" + PodGroupPodsCount = "PodGroupPodsCount" ) diff --git a/pkg/scheduler/framework/plugins/podgrouppodscount/podgroup_pods_count.go b/pkg/scheduler/framework/plugins/podgrouppodscount/podgroup_pods_count.go new file mode 100644 index 00000000000..143e66d7636 --- /dev/null +++ b/pkg/scheduler/framework/plugins/podgrouppodscount/podgroup_pods_count.go @@ -0,0 +1,87 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podgrouppodscount + +import ( + "context" + "errors" + + "k8s.io/apimachinery/pkg/runtime" + fwk "k8s.io/kube-scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" +) + +// PodGroupPodsCount is a placement score plugin that favors placements that can accommodate more pods from the considered PodGroup. +type PodGroupPodsCount struct { + handle fwk.Handle +} + +var _ fwk.PlacementScorePlugin = &PodGroupPodsCount{} + +const Name = names.PodGroupPodsCount + +// New initializes a new plugin and returns it. +func New(_ context.Context, _ runtime.Object, h fwk.Handle, _ feature.Features) (fwk.Plugin, error) { + return &PodGroupPodsCount{handle: h}, nil +} + +// Name returns name of the plugin. It is used in logs, etc. +func (pl *PodGroupPodsCount) Name() string { + return Name +} + +// ScorePlacement calculates a score for a given Placement. +// Both scheduled (assumed/assigned) pods and the proposed assignments are taken into consideration +// when computing the score. This ensures that the relative difference between choices is reduced, +// and small changes to the total count result in small changes to the score. +func (pl *PodGroupPodsCount) ScorePlacement(ctx context.Context, state fwk.PodGroupCycleState, podGroup fwk.PodGroupInfo, placement *fwk.PodGroupAssignments) (int64, *fwk.Status) { + pgState, err := pl.handle.SnapshotSharedLister().PodGroupStates().Get(podGroup.GetNamespace(), podGroup.GetName()) + if err != nil { + return 0, fwk.AsStatus(err) + } + + return int64(pgState.ScheduledPodsCount() + len(placement.ProposedAssignments)), nil +} + +// PlacementScoreExtensions returns a PlacementScoreExtensions interface if it implements one, or nil if does not. +// PodGroupPodsCount implements this interface. +func (pl *PodGroupPodsCount) PlacementScoreExtensions() fwk.PlacementScoreExtensions { + return pl +} + +// NormalizePlacementScore normalizes the scores to a range of [MinScore, MaxScore]. +// The normalization is based on the maximum count among all candidate placements. +// We purposely do not consider MinCount (the minimum pods required for the group) during normalization +// to avoid large gaps in scores when there are minimal differences in pod counts. +func (pl *PodGroupPodsCount) NormalizePlacementScore(ctx context.Context, state fwk.PodGroupCycleState, podGroup fwk.PodGroupInfo, scores []fwk.PlacementScore) *fwk.Status { + maxCount := int64(0) + + for _, score := range scores { + maxCount = max(maxCount, score.Score) + } + + if maxCount == 0 { + return fwk.AsStatus(errors.New("no pods from pod group are assigned to any of the candidate placements")) + } + + for i := range scores { + scores[i].Score = fwk.MinScore + scores[i].Score*(fwk.MaxScore-fwk.MinScore)/maxCount + } + + return nil +} diff --git a/pkg/scheduler/framework/plugins/podgrouppodscount/podgroup_pods_count_test.go b/pkg/scheduler/framework/plugins/podgrouppodscount/podgroup_pods_count_test.go new file mode 100644 index 00000000000..8df2aeebf32 --- /dev/null +++ b/pkg/scheduler/framework/plugins/podgrouppodscount/podgroup_pods_count_test.go @@ -0,0 +1,302 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podgrouppodscount + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2/ktesting" + fwk "k8s.io/kube-scheduler/framework" + "k8s.io/kubernetes/pkg/features" + internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + "k8s.io/kubernetes/pkg/scheduler/metrics" + st "k8s.io/kubernetes/pkg/scheduler/testing" +) + +func init() { + metrics.Register() +} + +type mockProposedAssignment struct { + nodeName string + pod *v1.Pod +} + +var _ fwk.ProposedAssignment = &mockProposedAssignment{} + +func (pa *mockProposedAssignment) GetNodeName() string { + return pa.nodeName +} + +func (pa *mockProposedAssignment) GetPod() *v1.Pod { + return pa.pod +} + +func TestScorePlacement(t *testing.T) { + podGroupName := "pg1" + + createPod := func(podName, podGroupName, nodeName string) *v1.Pod { + return st.MakePod().Name(podName).Namespace("default").UID(podName).PodGroupName(podGroupName).Node(nodeName).Obj() + } + + createPodWithoutNode := func(podName, podGroupName string) *v1.Pod { + return createPod(podName, podGroupName, "") + } + + proposedAssignments := []fwk.ProposedAssignment{ + &mockProposedAssignment{ + nodeName: "node1", + pod: createPodWithoutNode("proposed-pod-1", podGroupName), + }, + &mockProposedAssignment{ + nodeName: "node2", + pod: createPodWithoutNode("proposed-pod-2", podGroupName), + }, + } + + tests := []struct { + name string + pod *v1.Pod + assignedPods []*v1.Pod // Pods to be added to the snapshot + assumedPods []*v1.Pod // Pods to be assumed in the snapshot + placement *fwk.PodGroupAssignments + expectedScore int64 + }{ + { + name: "existing assigned and assumed pods", + pod: createPodWithoutNode("p-new", podGroupName), + assignedPods: []*v1.Pod{ + // Assigned pods + createPod("p2", podGroupName, "node2"), + createPod("p3", podGroupName, "node3"), + }, + assumedPods: []*v1.Pod{ + // Assumed pod + createPod("p1", podGroupName, "node1"), + }, + placement: &fwk.PodGroupAssignments{ + ProposedAssignments: proposedAssignments, + }, + expectedScore: 5, // 1 assumed + 2 assigned + 2 proposed = 5 + }, + { + name: "no assumed pods", + pod: createPodWithoutNode("p-new", podGroupName), + assignedPods: []*v1.Pod{ + createPod("p1", podGroupName, "node1"), + }, + placement: &fwk.PodGroupAssignments{ + ProposedAssignments: proposedAssignments, + }, + expectedScore: 3, // 1 assigned + 2 proposed = 3 + }, + { + name: "no assigned pods", + pod: createPodWithoutNode("p-new", podGroupName), + assumedPods: []*v1.Pod{ + // Assumed pod + createPod("p1", podGroupName, "node1"), + }, + placement: &fwk.PodGroupAssignments{ + ProposedAssignments: proposedAssignments, + }, + expectedScore: 3, // 1 assumed + 2 proposed = 3 + }, + { + name: "no assigned pods, no assumed pods", + pod: createPodWithoutNode("p-new", podGroupName), + placement: &fwk.PodGroupAssignments{ + ProposedAssignments: proposedAssignments, + }, + expectedScore: 2, // 2 proposed + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Enable GenericWorkload feature gate to populate PodGroupState in cache + featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featuregatetesting.FeatureOverrides{ + features.GenericWorkload: true, + features.TopologyAwareWorkloadScheduling: true, + }) + + logger, ctx := ktesting.NewTestContext(t) + + // Setup cache, snapshot and framework + snapshot := internalcache.NewEmptySnapshot() + cache := internalcache.New(ctx, nil, true) + informerFactory := informers.NewSharedInformerFactory(fake.NewClientset(), 0) + + fh, err := frameworkruntime.NewFramework(ctx, nil, nil, + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithSnapshotSharedLister(snapshot), + ) + if err != nil { + t.Fatalf("Failed to create framework: %v", err) + } + + // Add assigned pods to cache + for _, p := range tt.assignedPods { + if err := cache.AddPod(logger, p); err != nil { + t.Fatalf("Failed to add pod %v: %v", p.Name, err) + } + } + + // Add assumed pods to cache + for _, p := range tt.assumedPods { + cache.AddPodGroupMember(p) + if err := cache.AssumePod(logger, p); err != nil { + t.Fatalf("Failed to assume pod %v: %v", p.Name, err) + } + } + // Add proposed pods to cache + for _, assignment := range tt.placement.ProposedAssignments { + cache.AddPodGroupMember(assignment.GetPod()) + } + + // Update snapshot + if err := cache.UpdateSnapshot(logger, snapshot); err != nil { + t.Fatalf("Failed to update snapshot: %v", err) + } + + // Create the plugin + plugin, err := New(ctx, nil, fh, feature.Features{}) + if err != nil { + t.Fatalf("Failed to create plugin: %v", err) + } + pl := plugin.(*PodGroupPodsCount) + + // Construct PodGroupInfo for the test pod + pgInfo := &framework.PodGroupInfo{ + Namespace: tt.pod.Namespace, + Name: *tt.pod.Spec.SchedulingGroup.PodGroupName, + } + + // Run ScorePlacement + score, status := pl.ScorePlacement(ctx, nil, pgInfo, tt.placement) + if !status.IsSuccess() { + t.Errorf("ScorePlacement failed: %v", status.Message()) + } + if score != tt.expectedScore { + t.Errorf("Expected score %d, got %d", tt.expectedScore, score) + } + }) + } +} + +func TestNormalizePlacementScore(t *testing.T) { + tests := []struct { + name string + scores []fwk.PlacementScore + expectedNormalizedScores []fwk.PlacementScore + expectedError string + }{ + { + name: "distinct scores", + scores: []fwk.PlacementScore{ + {Score: 10}, + {Score: 50}, + {Score: 110}, + }, + // Normalized score is calculated as: score * (MaxScore - MinScore) / maxCount. + // With MinScore=0, MaxScore=100, and maxCount=110 (using integer division): + // 10 * 100 / 110 = 9 + // 50 * 100 / 110 = 45 + // 110 * 100 / 110 = 100 + expectedNormalizedScores: []fwk.PlacementScore{ + {Score: 9}, + {Score: 45}, + {Score: 100}, + }, + }, + { + name: "equal scores", + scores: []fwk.PlacementScore{ + {Score: 50}, + {Score: 50}, + }, + expectedNormalizedScores: []fwk.PlacementScore{ + {Score: 100}, + {Score: 100}, + }, + }, + { + name: "single score", + scores: []fwk.PlacementScore{ + {Score: 50}, + }, + expectedNormalizedScores: []fwk.PlacementScore{ + {Score: 100}, + }, + }, + { + name: "some minimal score that is far from a group of scores located closely", // to test that normalization will not distribute it evenly + scores: []fwk.PlacementScore{ + {Score: 11}, + {Score: 100}, + {Score: 101}, + {Score: 102}, + }, + expectedNormalizedScores: []fwk.PlacementScore{ + {Score: 10}, + {Score: 98}, + {Score: 99}, + {Score: 100}, + }, + }, + { + name: "zero scores", + scores: []fwk.PlacementScore{{Score: 0}, {Score: 0}}, + expectedError: `no pods from pod group are assigned to any of the candidate placements`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pl := &PodGroupPodsCount{} + pgInfo := &framework.PodGroupInfo{Name: "pg1"} + status := pl.NormalizePlacementScore(context.Background(), nil, pgInfo, tt.scores) + if tt.expectedError != "" { + if status.IsSuccess() { + t.Fatal("Expected error, but got success") + } + if tt.expectedError != status.Message() { + t.Errorf("Unexpected error message. Want %s\n, got %s", tt.expectedError, status.Message()) + } + return + } + + if !status.IsSuccess() { + t.Errorf("NormalizePlacementScore failed: %v", status.Message()) + } + + if diff := cmp.Diff(tt.expectedNormalizedScores, tt.scores); diff != "" { + t.Errorf("Unexpected scores (-want, +got):\n%s", diff) + } + }) + } +} diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index 175300781e5..c09c34919c8 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podgrouppodscount" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/schedulinggates" @@ -71,6 +72,7 @@ func NewInTreeRegistry() runtime.Registry { schedulinggates.Name: runtime.FactoryAdapter(fts, schedulinggates.New), gangscheduling.Name: runtime.FactoryAdapter(fts, gangscheduling.New), topologyaware.Name: runtime.FactoryAdapter(fts, topologyaware.New), + podgrouppodscount.Name: runtime.FactoryAdapter(fts, podgrouppodscount.New), } return registry