From 8b39544d60da53d01337e811687c331e598201fa Mon Sep 17 00:00:00 2001 From: Antoni Zawodny Date: Fri, 23 Jan 2026 11:11:26 +0100 Subject: [PATCH] Extract helper methods from gang scheduling plugin --- .../plugins/gangscheduling/gangscheduling.go | 22 +-- .../framework/plugins/helper/workload.go | 37 ++++ .../framework/plugins/helper/workload_test.go | 163 ++++++++++++++++++ 3 files changed, 204 insertions(+), 18 deletions(-) create mode 100644 pkg/scheduler/framework/plugins/helper/workload.go create mode 100644 pkg/scheduler/framework/plugins/helper/workload_test.go diff --git a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go index df9f0dd32dc..37ace204928 100644 --- a/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go +++ b/pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go @@ -30,6 +30,7 @@ import ( "k8s.io/klog/v2" fwk "k8s.io/kube-scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" "k8s.io/kubernetes/pkg/scheduler/util" ) @@ -76,18 +77,13 @@ func (pl *GangScheduling) EventsToRegister(_ context.Context) ([]fwk.ClusterEven }, nil } -// matchingWorkloadReference returns true if two pods belong to the same workload, including their pod group and replica key. -func matchingWorkloadReference(pod1, pod2 *v1.Pod) bool { - return pod1.Spec.WorkloadRef != nil && pod2.Spec.WorkloadRef != nil && pod1.Namespace == pod2.Namespace && *pod1.Spec.WorkloadRef == *pod2.Spec.WorkloadRef -} - func (pl *GangScheduling) isSchedulableAfterPodAdded(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (fwk.QueueingHint, error) { _, addedPod, err := util.As[*v1.Pod](oldObj, newObj) if err != nil { return fwk.Queue, err } - if !matchingWorkloadReference(pod, addedPod) { + if !helper.MatchingWorkloadReference(pod, addedPod) { logger.V(5).Info("another pod was added but it doesn't match the target pod's workload", "pod", klog.KObj(pod), "workloadRef", pod.Spec.WorkloadRef, "addedPod", klog.KObj(addedPod), "addedWorkloadRef", pod.Spec.WorkloadRef) return fwk.QueueSkip, nil @@ -135,7 +131,7 @@ func (pl *GangScheduling) PreEnqueue(ctx context.Context, pod *v1.Pod) *fwk.Stat return fwk.AsStatus(fmt.Errorf("failed to get workload %s/%s", namespace, workloadRef.Name)) } - policy, ok := podGroupPolicy(workload, workloadRef.PodGroup) + policy, ok := helper.PodGroupPolicy(workload, workloadRef.PodGroup) if !ok { return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("pod group %q doesn't exist for a workload %q", workloadRef.PodGroup, workload.Name)) } @@ -186,16 +182,6 @@ func (pl *GangScheduling) Unreserve(ctx context.Context, cs fwk.CycleState, pod podGroupInfo.ForgetPod(pod.UID) } -// podGroupPolicy is a helper to find the policy for a specific pod group name in a workload. -func podGroupPolicy(workload *schedulingapi.Workload, podGroupName string) (schedulingapi.PodGroupPolicy, bool) { - for _, podGroup := range workload.Spec.PodGroups { - if podGroup.Name == podGroupName { - return podGroup.Policy, true - } - } - return schedulingapi.PodGroupPolicy{}, false -} - // Permit forces all pods in a gang to wait at this stage. Once the number of waiting (assumed) pods // reaches the gang's MinCount, all pods in the gang are permitted to proceed to binding simultaneously. func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) { @@ -214,7 +200,7 @@ func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod return fwk.AsStatus(fmt.Errorf("failed to get workload %s/%s: %w", namespace, workloadRef.Name, err)), 0 } - policy, ok := podGroupPolicy(workload, workloadRef.PodGroup) + policy, ok := helper.PodGroupPolicy(workload, workloadRef.PodGroup) if !ok { return fwk.AsStatus(fmt.Errorf("pod group %q doesn't exist for a workload %q", workloadRef.PodGroup, workload.Name)), 0 } diff --git a/pkg/scheduler/framework/plugins/helper/workload.go b/pkg/scheduler/framework/plugins/helper/workload.go new file mode 100644 index 00000000000..2e490d66244 --- /dev/null +++ b/pkg/scheduler/framework/plugins/helper/workload.go @@ -0,0 +1,37 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package helper + +import ( + v1 "k8s.io/api/core/v1" + schedulingapi "k8s.io/api/scheduling/v1alpha1" +) + +// MatchingWorkloadReference returns true if two pods belong to the same workload, including their pod group and replica key. +func MatchingWorkloadReference(pod1, pod2 *v1.Pod) bool { + return pod1.Spec.WorkloadRef != nil && pod2.Spec.WorkloadRef != nil && pod1.Namespace == pod2.Namespace && *pod1.Spec.WorkloadRef == *pod2.Spec.WorkloadRef +} + +// PodGroupPolicy is a helper to find the policy for a specific pod group name in a workload. +func PodGroupPolicy(workload *schedulingapi.Workload, podGroupName string) (schedulingapi.PodGroupPolicy, bool) { + for _, podGroup := range workload.Spec.PodGroups { + if podGroup.Name == podGroupName { + return podGroup.Policy, true + } + } + return schedulingapi.PodGroupPolicy{}, false +} diff --git a/pkg/scheduler/framework/plugins/helper/workload_test.go b/pkg/scheduler/framework/plugins/helper/workload_test.go new file mode 100644 index 00000000000..89181b56f50 --- /dev/null +++ b/pkg/scheduler/framework/plugins/helper/workload_test.go @@ -0,0 +1,163 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package helper + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" + schedulingapi "k8s.io/api/scheduling/v1alpha1" + st "k8s.io/kubernetes/pkg/scheduler/testing" +) + +func TestMatchingWorkloadReference(t *testing.T) { + workloadRef := func(name, podGroup, podGroupReplicaKey string) *v1.WorkloadReference { + return &v1.WorkloadReference{ + Name: name, + PodGroup: podGroup, + PodGroupReplicaKey: podGroupReplicaKey, + } + } + testCases := []struct { + name string + pod1 *v1.Pod + pod2 *v1.Pod + expected bool + }{ + { + name: "same pod with workloadRef", + pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(), + pod2: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(), + expected: true, + }, + { + name: "different pods, same workloadRef", + pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(), + pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(), + expected: true, + }, + { + name: "same pod but no workloadRef", + pod1: st.MakePod().Name("pod1").Namespace("test").Obj(), + pod2: st.MakePod().Name("pod1").Namespace("test").Obj(), + expected: false, + }, + { + name: "different pods, only one with workloadRef", + pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(), + pod2: st.MakePod().Name("pod2").Namespace("test").Obj(), + expected: false, + }, + { + name: "same workloadRef but different namespaces", + pod1: st.MakePod().Name("pod1").Namespace("test1").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(), + pod2: st.MakePod().Name("pod2").Namespace("test2").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(), + expected: false, + }, + { + name: "same workload but different pod group", + pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName1", "pgKey")).Obj(), + pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name", "pgName2", "pgKey")).Obj(), + expected: false, + }, + { + name: "same workload but different pod group replica key", + pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey1")).Obj(), + pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey2")).Obj(), + expected: false, + }, + { + name: "same pod group but different workload name", + pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name1", "pgName", "pgKey")).Obj(), + pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name2", "pgName", "pgKey")).Obj(), + expected: false, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + if got := MatchingWorkloadReference(tt.pod1, tt.pod2); got != tt.expected { + t.Errorf("MatchingWorkloadReference() = %v, want %v", got, tt.expected) + } + }) + } +} + +func TestPodGroupPolicy(t *testing.T) { + workload := &schedulingapi.Workload{ + Spec: schedulingapi.WorkloadSpec{ + PodGroups: []schedulingapi.PodGroup{ + { + Name: "pg1", + Policy: schedulingapi.PodGroupPolicy{ + Gang: &schedulingapi.GangSchedulingPolicy{ + MinCount: 10, + }, + }, + }, + { + Name: "pg2", + Policy: schedulingapi.PodGroupPolicy{ + Basic: &schedulingapi.BasicSchedulingPolicy{}, + }, + }, + }, + }, + } + testCases := []struct { + name string + podGroupName string + expectedPolicy schedulingapi.PodGroupPolicy + expectedOk bool + }{ + { + name: "gang policy", + podGroupName: "pg1", + expectedPolicy: schedulingapi.PodGroupPolicy{ + Gang: &schedulingapi.GangSchedulingPolicy{ + MinCount: 10, + }, + }, + expectedOk: true, + }, + { + name: "basic policy", + podGroupName: "pg2", + expectedPolicy: schedulingapi.PodGroupPolicy{ + Basic: &schedulingapi.BasicSchedulingPolicy{}, + }, + expectedOk: true, + }, + { + name: "pod group not found - return empty policy and false", + podGroupName: "pg3", + expectedPolicy: schedulingapi.PodGroupPolicy{}, + expectedOk: false, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + got, ok := PodGroupPolicy(workload, tt.podGroupName) + if ok != tt.expectedOk { + t.Errorf("PodGroupPolicy() ok: %v, want: %v", ok, tt.expectedOk) + } + if diff := cmp.Diff(got, tt.expectedPolicy); diff != "" { + t.Errorf("PodGroupPolicy() policy (-want,+got):\n%s", diff) + } + }) + } +}