Merge pull request #136457 from tosi3k/workload-helper

Extract helper methods from gang scheduling plugin
This commit is contained in:
Kubernetes Prow Robot 2026-01-26 20:01:51 +05:30 committed by GitHub
commit 584add12b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 204 additions and 18 deletions

View file

@ -30,6 +30,7 @@ import (
"k8s.io/klog/v2"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"
)
@ -76,18 +77,13 @@ func (pl *GangScheduling) EventsToRegister(_ context.Context) ([]fwk.ClusterEven
}, nil
}
// matchingWorkloadReference returns true if two pods belong to the same workload, including their pod group and replica key.
func matchingWorkloadReference(pod1, pod2 *v1.Pod) bool {
return pod1.Spec.WorkloadRef != nil && pod2.Spec.WorkloadRef != nil && pod1.Namespace == pod2.Namespace && *pod1.Spec.WorkloadRef == *pod2.Spec.WorkloadRef
}
func (pl *GangScheduling) isSchedulableAfterPodAdded(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (fwk.QueueingHint, error) {
_, addedPod, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
return fwk.Queue, err
}
if !matchingWorkloadReference(pod, addedPod) {
if !helper.MatchingWorkloadReference(pod, addedPod) {
logger.V(5).Info("another pod was added but it doesn't match the target pod's workload",
"pod", klog.KObj(pod), "workloadRef", pod.Spec.WorkloadRef, "addedPod", klog.KObj(addedPod), "addedWorkloadRef", pod.Spec.WorkloadRef)
return fwk.QueueSkip, nil
@ -135,7 +131,7 @@ func (pl *GangScheduling) PreEnqueue(ctx context.Context, pod *v1.Pod) *fwk.Stat
return fwk.AsStatus(fmt.Errorf("failed to get workload %s/%s", namespace, workloadRef.Name))
}
policy, ok := podGroupPolicy(workload, workloadRef.PodGroup)
policy, ok := helper.PodGroupPolicy(workload, workloadRef.PodGroup)
if !ok {
return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, fmt.Sprintf("pod group %q doesn't exist for a workload %q", workloadRef.PodGroup, workload.Name))
}
@ -186,16 +182,6 @@ func (pl *GangScheduling) Unreserve(ctx context.Context, cs fwk.CycleState, pod
podGroupState.ForgetPod(pod.UID)
}
// podGroupPolicy is a helper to find the policy for a specific pod group name in a workload.
func podGroupPolicy(workload *schedulingapi.Workload, podGroupName string) (schedulingapi.PodGroupPolicy, bool) {
for _, podGroup := range workload.Spec.PodGroups {
if podGroup.Name == podGroupName {
return podGroup.Policy, true
}
}
return schedulingapi.PodGroupPolicy{}, false
}
// Permit forces all pods in a gang to wait at this stage. Once the number of waiting (assumed) pods
// reaches the gang's MinCount, all pods in the gang are permitted to proceed to binding simultaneously.
func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
@ -214,7 +200,7 @@ func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod
return fwk.AsStatus(fmt.Errorf("failed to get workload %s/%s: %w", namespace, workloadRef.Name, err)), 0
}
policy, ok := podGroupPolicy(workload, workloadRef.PodGroup)
policy, ok := helper.PodGroupPolicy(workload, workloadRef.PodGroup)
if !ok {
return fwk.AsStatus(fmt.Errorf("pod group %q doesn't exist for a workload %q", workloadRef.PodGroup, workload.Name)), 0
}

View file

@ -0,0 +1,37 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helper
import (
v1 "k8s.io/api/core/v1"
schedulingapi "k8s.io/api/scheduling/v1alpha1"
)
// MatchingWorkloadReference returns true if two pods belong to the same workload, including their pod group and replica key.
func MatchingWorkloadReference(pod1, pod2 *v1.Pod) bool {
return pod1.Spec.WorkloadRef != nil && pod2.Spec.WorkloadRef != nil && pod1.Namespace == pod2.Namespace && *pod1.Spec.WorkloadRef == *pod2.Spec.WorkloadRef
}
// PodGroupPolicy is a helper to find the policy for a specific pod group name in a workload.
func PodGroupPolicy(workload *schedulingapi.Workload, podGroupName string) (schedulingapi.PodGroupPolicy, bool) {
for _, podGroup := range workload.Spec.PodGroups {
if podGroup.Name == podGroupName {
return podGroup.Policy, true
}
}
return schedulingapi.PodGroupPolicy{}, false
}

View file

@ -0,0 +1,163 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helper
import (
"testing"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
schedulingapi "k8s.io/api/scheduling/v1alpha1"
st "k8s.io/kubernetes/pkg/scheduler/testing"
)
func TestMatchingWorkloadReference(t *testing.T) {
workloadRef := func(name, podGroup, podGroupReplicaKey string) *v1.WorkloadReference {
return &v1.WorkloadReference{
Name: name,
PodGroup: podGroup,
PodGroupReplicaKey: podGroupReplicaKey,
}
}
testCases := []struct {
name string
pod1 *v1.Pod
pod2 *v1.Pod
expected bool
}{
{
name: "same pod with workloadRef",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
expected: true,
},
{
name: "different pods, same workloadRef",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
expected: true,
},
{
name: "same pod but no workloadRef",
pod1: st.MakePod().Name("pod1").Namespace("test").Obj(),
pod2: st.MakePod().Name("pod1").Namespace("test").Obj(),
expected: false,
},
{
name: "different pods, only one with workloadRef",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test").Obj(),
expected: false,
},
{
name: "same workloadRef but different namespaces",
pod1: st.MakePod().Name("pod1").Namespace("test1").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test2").WorkloadRef(workloadRef("name", "pgName", "pgKey")).Obj(),
expected: false,
},
{
name: "same workload but different pod group",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName1", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name", "pgName2", "pgKey")).Obj(),
expected: false,
},
{
name: "same workload but different pod group replica key",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey1")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name", "pgName", "pgKey2")).Obj(),
expected: false,
},
{
name: "same pod group but different workload name",
pod1: st.MakePod().Name("pod1").Namespace("test").WorkloadRef(workloadRef("name1", "pgName", "pgKey")).Obj(),
pod2: st.MakePod().Name("pod2").Namespace("test").WorkloadRef(workloadRef("name2", "pgName", "pgKey")).Obj(),
expected: false,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
if got := MatchingWorkloadReference(tt.pod1, tt.pod2); got != tt.expected {
t.Errorf("MatchingWorkloadReference() = %v, want %v", got, tt.expected)
}
})
}
}
func TestPodGroupPolicy(t *testing.T) {
workload := &schedulingapi.Workload{
Spec: schedulingapi.WorkloadSpec{
PodGroups: []schedulingapi.PodGroup{
{
Name: "pg1",
Policy: schedulingapi.PodGroupPolicy{
Gang: &schedulingapi.GangSchedulingPolicy{
MinCount: 10,
},
},
},
{
Name: "pg2",
Policy: schedulingapi.PodGroupPolicy{
Basic: &schedulingapi.BasicSchedulingPolicy{},
},
},
},
},
}
testCases := []struct {
name string
podGroupName string
expectedPolicy schedulingapi.PodGroupPolicy
expectedOk bool
}{
{
name: "gang policy",
podGroupName: "pg1",
expectedPolicy: schedulingapi.PodGroupPolicy{
Gang: &schedulingapi.GangSchedulingPolicy{
MinCount: 10,
},
},
expectedOk: true,
},
{
name: "basic policy",
podGroupName: "pg2",
expectedPolicy: schedulingapi.PodGroupPolicy{
Basic: &schedulingapi.BasicSchedulingPolicy{},
},
expectedOk: true,
},
{
name: "pod group not found - return empty policy and false",
podGroupName: "pg3",
expectedPolicy: schedulingapi.PodGroupPolicy{},
expectedOk: false,
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
got, ok := PodGroupPolicy(workload, tt.podGroupName)
if ok != tt.expectedOk {
t.Errorf("PodGroupPolicy() ok: %v, want: %v", ok, tt.expectedOk)
}
if diff := cmp.Diff(got, tt.expectedPolicy); diff != "" {
t.Errorf("PodGroupPolicy() policy (-want,+got):\n%s", diff)
}
})
}
}