Extract helper methods from gang scheduling plugin

This commit is contained in:
Antoni Zawodny 2026-01-23 11:11:26 +01:00
parent c2618d48c0
commit 8b39544d60
3 changed files with 204 additions and 18 deletions

View file

@ -30,6 +30,7 @@ import (
"k8s.io/klog/v2"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -76,18 +77,13 @@ func (pl *GangScheduling) EventsToRegister(_ context.Context) ([]fwk.ClusterEven
}, nil
}
// matchingWorkloadReference returns true if two pods belong to the same workload, including their pod group and replica key.
func matchingWorkloadReference(pod1, pod2 *v1.Pod) bool {
return pod1.Spec.WorkloadRef != nil && pod2.Spec.WorkloadRef != nil && pod1.Namespace == pod2.Namespace && *pod1.Spec.WorkloadRef == *pod2.Spec.WorkloadRef
}
func (pl *GangScheduling) isSchedulableAfterPodAdded(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (fwk.QueueingHint, error) {
_, addedPod, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
return fwk.Queue, err
}
if !matchingWorkloadReference(pod, addedPod) {
if !helper.MatchingWorkloadReference(pod, addedPod) {
logger.V(5).Info("another pod was added but it doesn't match the target pod's workload",
"pod", klog.KObj(pod), "workloadRef", pod.Spec.WorkloadRef, "addedPod", klog.KObj(addedPod), "addedWorkloadRef", pod.Spec.WorkloadRef)
return fwk.QueueSkip, nil
@ -135,7 +131,7 @@ func (pl *GangScheduling) PreEnqueue(ctx context.Context, pod *v1.Pod) *fwk.Stat
return fwk.AsStatus(fmt.Errorf("failed to get workload %s/%s", namespace, workloadRef.Name))
}
policy, ok := podGroupPolicy(workload, workloadRef.PodGroup)
policy, ok := helper.PodGroupPolicy(workload, workloadRef.PodGroup)
if !ok {
return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("pod group %q doesn't exist for a workload %q", workloadRef.PodGroup, workload.Name))
}
@ -186,16 +182,6 @@ func (pl *GangScheduling) Unreserve(ctx context.Context, cs fwk.CycleState, pod
podGroupInfo.ForgetPod(pod.UID)
}
// podGroupPolicy is a helper to find the policy for a specific pod group name in a workload.
func podGroupPolicy(workload *schedulingapi.Workload, podGroupName string) (schedulingapi.PodGroupPolicy, bool) {
for _, podGroup := range workload.Spec.PodGroups {
if podGroup.Name == podGroupName {
return podGroup.Policy, true
}
}
return schedulingapi.PodGroupPolicy{}, false
}
// Permit forces all pods in a gang to wait at this stage. Once the number of waiting (assumed) pods
// reaches the gang's MinCount, all pods in the gang are permitted to proceed to binding simultaneously.
func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
@ -214,7 +200,7 @@ func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod
return fwk.AsStatus(fmt.Errorf("failed to get workload %s/%s: %w", namespace, workloadRef.Name, err)), 0
}
policy, ok := podGroupPolicy(workload, workloadRef.PodGroup)
policy, ok := helper.PodGroupPolicy(workload, workloadRef.PodGroup)
if !ok {
return fwk.AsStatus(fmt.Errorf("pod group %q doesn't exist for a workload %q", workloadRef.PodGroup, workload.Name)), 0
}

View file

@ -0,0 +1,37 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helper
import (
v1 "k8s.io/api/core/v1"
schedulingapi "k8s.io/api/scheduling/v1alpha1"
)
// MatchingWorkloadReference returns true if two pods belong to the same workload, including their pod group and replica key.
func MatchingWorkloadReference(pod1, pod2 *v1.Pod) bool {
return pod1.Spec.WorkloadRef != nil && pod2.Spec.WorkloadRef != nil && pod1.Namespace == pod2.Namespace && *pod1.Spec.WorkloadRef == *pod2.Spec.WorkloadRef
}
// PodGroupPolicy is a helper to find the policy for a specific pod group name in a workload.
func PodGroupPolicy(workload *schedulingapi.Workload, podGroupName string) (schedulingapi.PodGroupPolicy, bool) {
for _, podGroup := range workload.Spec.PodGroups {
if podGroup.Name == podGroupName {
return podGroup.Policy, true
}
}
return schedulingapi.PodGroupPolicy{}, false
}

View file

@ -0,0 +1,163 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helper
import (
"testing"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
schedulingapi "k8s.io/api/scheduling/v1alpha1"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
func TestMatchingWorkloadReference(t *testing.T) {
workloadRef := func(name, podGroup, podGroupReplicaKey string) *v1.WorkloadReference {
return &v1.WorkloadReference{
Name: name,
PodGroup: podGroup,
PodGroupReplicaKey: podGroupReplicaKey,
}
}
testCases := []struct {
name string
pod1 *v1.Pod
pod2 *v1.Pod
expected bool
}{
{
name: "same pod with workloadRef",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
expected: true,
},
{
name: "different pods, same workloadRef",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
expected: true,
},
{
name: "same pod but no workloadRef",
pod1: st.MakePod().Name("pod1").Namespace("test").Obj(),
pod2: st.MakePod().Name("pod1").Namespace("test").Obj(),
expected: false,
},
{
name: "different pods, only one with workloadRef",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test").Obj(),
expected: false,
},
{
name: "same workloadRef but different namespaces",
pod1: st.MakePod().Name("pod1").Namespace("test1").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test2").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
expected: false,
},
{
name: "same workload but different pod group",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName1", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name", "pgName2", "pgKey")).Obj(),
expected: false,
},
{
name: "same workload but different pod group replica key",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey1")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey2")).Obj(),
expected: false,
},
{
name: "same pod group but different workload name",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name1", "pgName", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name2", "pgName", "pgKey")).Obj(),
expected: false,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
if got := MatchingWorkloadReference(tt.pod1, tt.pod2); got != tt.expected {
t.Errorf("MatchingWorkloadReference() = %v, want %v", got, tt.expected)
}
})
}
}
func TestPodGroupPolicy(t *testing.T) {
workload := &schedulingapi.Workload{
Spec: schedulingapi.WorkloadSpec{
PodGroups: []schedulingapi.PodGroup{
{
Name: "pg1",
Policy: schedulingapi.PodGroupPolicy{
Gang: &schedulingapi.GangSchedulingPolicy{
MinCount: 10,
},
},
},
{
Name: "pg2",
Policy: schedulingapi.PodGroupPolicy{
Basic: &schedulingapi.BasicSchedulingPolicy{},
},
},
},
},
}
testCases := []struct {
name string
podGroupName string
expectedPolicy schedulingapi.PodGroupPolicy
expectedOk bool
}{
{
name: "gang policy",
podGroupName: "pg1",
expectedPolicy: schedulingapi.PodGroupPolicy{
Gang: &schedulingapi.GangSchedulingPolicy{
MinCount: 10,
},
},
expectedOk: true,
},
{
name: "basic policy",
podGroupName: "pg2",
expectedPolicy: schedulingapi.PodGroupPolicy{
Basic: &schedulingapi.BasicSchedulingPolicy{},
},
expectedOk: true,
},
{
name: "pod group not found - return empty policy and false",
podGroupName: "pg3",
expectedPolicy: schedulingapi.PodGroupPolicy{},
expectedOk: false,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
got, ok := PodGroupPolicy(workload, tt.podGroupName)
if ok != tt.expectedOk {
t.Errorf("PodGroupPolicy() ok: %v, want: %v", ok, tt.expectedOk)
}
if diff := cmp.Diff(got, tt.expectedPolicy); diff != "" {
t.Errorf("PodGroupPolicy() policy (-want,+got):\n%s", diff)
}
})
}
}