Add performance tests for gang scheduling

This commit is contained in:
iomarsayed 2026-04-28 11:24:28 +00:00
parent 5ce17ed71b
commit 8f018f97a8
7 changed files with 243 additions and 0 deletions

View file

@ -40,6 +40,7 @@ import (
cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/dynamic"
coreinformers "k8s.io/client-go/informers/core/v1"
schedulinginformers "k8s.io/client-go/informers/scheduling/v1alpha2"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/tools/cache"
@ -61,6 +62,7 @@ type WorkloadExecutor struct {
dataItems []DataItem
numPodsScheduledPerNamespace map[string]int
podInformer coreinformers.PodInformer
podGroupInformer schedulinginformers.PodGroupInformer
throughputErrorMargin float64
testCase *testCase
workload *Workload
@ -90,6 +92,8 @@ func (e *WorkloadExecutor) runOp(tCtx ktesting.TContext, op realOp, opIndex int)
return e.runBarrierOp(tCtx, opIndex, concreteOp)
case *sleepOp:
return e.runSleepOp(tCtx, concreteOp)
case *waitForPodGroups:
return e.runWaitForPodGroupsOp(tCtx, concreteOp)
case *startCollectingMetricsOp:
return e.runStartCollectingMetricsOp(tCtx, opIndex, concreteOp)
case *stopCollectingMetricsOp:
@ -184,6 +188,29 @@ func (e *WorkloadExecutor) runSleepOp(tCtx ktesting.TContext, op *sleepOp) error
return nil
}
// runWaitForPodGroupsOp executes the waitForPodGroups operation.
// It polls the PodGroup informer's cache every 100 milliseconds until at least
// op.Count PodGroups are listed in op.Namespace. This ensures that subsequent
// operations (like creating pods that reference these pod groups) won't fail
// due to cache lag.
// It returns an error if listing fails, if tCtx is cancelled, or if the
// condition is not met within 10 seconds. The underlying cause is wrapped, so
// callers can tell a timeout from the other failures with errors.Is.
func (e *WorkloadExecutor) runWaitForPodGroupsOp(tCtx ktesting.TContext, op *waitForPodGroups) error {
	const (
		pollInterval = 100 * time.Millisecond
		pollTimeout  = 10 * time.Second
	)

	tCtx.Logf("waiting for %d PodGroups in namespace %q", op.Count, op.Namespace)

	// The namespaced lister does not change between polls, so look it up once.
	lister := e.podGroupInformer.Lister().PodGroups(op.Namespace)
	// Kept across polls so that a failure can report how far the cache got.
	observed := 0
	err := wait.PollUntilContextTimeout(tCtx, pollInterval, pollTimeout, true, func(context.Context) (bool, error) {
		podGroups, err := lister.List(labels.Everything())
		if err != nil {
			return false, err
		}
		observed = len(podGroups)
		return observed >= op.Count, nil
	})
	if err != nil {
		// Not necessarily a timeout: the lister may have failed or tCtx may
		// have been cancelled. The wrapped error carries the actual reason.
		return fmt.Errorf("waiting for %d PodGroups in namespace %q (last observed %d): %w", op.Count, op.Namespace, observed, err)
	}
	return nil
}
func (e *WorkloadExecutor) runStopCollectingMetrics(tCtx ktesting.TContext, opIndex int) error {
items, err := stopCollectingMetrics(tCtx, e.collectorCancel, &e.collectorWG, e.workload.Threshold.Get(e.topicName), *e.workload.ThresholdMetricSelector, opIndex, e.collectors)
if err != nil {

View file

@ -0,0 +1,43 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gangscheduling
import (
"fmt"
"os"
"testing"
_ "k8s.io/component-base/logs/json/register"
perf "k8s.io/kubernetes/test/integration/scheduler_perf"
)
// TestMain prepares the scheduler_perf framework via perf.InitTests before
// running the tests of this package. If initialization fails, the error is
// printed to stderr and the process exits with status 1 without running
// any test.
func TestMain(m *testing.M) {
	initErr := perf.InitTests()
	if initErr != nil {
		fmt.Fprintf(os.Stderr, "%v\n", initErr)
		os.Exit(1)
	}

	m.Run()
}
// TestSchedulerPerfGangScheduling hands performance-config.yaml to
// perf.RunIntegrationPerfScheduling.
func TestSchedulerPerfGangScheduling(t *testing.T) {
	const configFile = "performance-config.yaml"

	perf.RunIntegrationPerfScheduling(t, configFile)
}
// BenchmarkPerfGangScheduling hands performance-config.yaml to
// perf.RunBenchmarkPerfScheduling.
func BenchmarkPerfGangScheduling(b *testing.B) {
	const (
		configFile = "performance-config.yaml"
		// NOTE(review): presumably the topic name under which results are
		// reported — confirm against RunBenchmarkPerfScheduling.
		topic = "gangscheduling"
	)

	// NOTE(review): the trailing nil presumably means "no extra plugin
	// registry" — confirm against RunBenchmarkPerfScheduling.
	perf.RunBenchmarkPerfScheduling(b, configFile, topic, nil)
}

View file

@ -0,0 +1,93 @@
# The following labels are used in this file:
#
# - integration-test: test cases to run as the integration test.
# - performance: test cases to run in the performance test.
# - short: supplemental label for the above two labels (must not be used alone); marks test cases with a short execution time.
- name: GangScheduling
featureGates:
GenericWorkload: true
GangScheduling: true
workloadTemplate:
- opcode: createNodes
countParam: $initNodes
- opcode: createNamespaces
prefix: gang
count: 1
- opcode: createAny
# Create pod groups (gangs), each has a min count policy specified in pod group template.
# Each pod group is named gang-0, gang-1, ... gang-(n-1).
countParam: $initPodGroups
namespace: gang-0
templatePath: templates/podgroup.yaml
- opcode: waitForPodGroups
# Wait for the scheduler's informer cache to reflect the newly created PodGroup objects.
namespace: gang-0
countParam: $initPodGroups
- opcode: createPods
# Create pods with reference to the pod groups (gangs) according to their indices (e.g., pods 0-2 → gang-0, pods 3-5 → gang-1, etc.).
countParam: $initPodGroups
countMultiplierParam: $podsPerGroup
namespace: gang-0
podTemplatePath: templates/gang-pod.yaml
collectMetrics: true
templateParams:
podsPerGroup: $podsPerGroup
workloads:
- name: 10Nodes_3Gangs
labels: [integration-test, short]
params:
initNodes: 10
initPodGroups: 3
podsPerGroup: 3
- name: 100Nodes_10Gangs
labels: [integration-test]
params:
initNodes: 100
initPodGroups: 10
podsPerGroup: 3
- name: 5000Nodes_1000Gangs_3000Pods
labels: [performance]
# https://perf-dash.k8s.io/#/?jobname=scheduler-perf-benchmark&metriccategoryname=Scheduler&metricname=BenchmarkPerfScheduling&Metric=scheduler_podgroup_scheduling_attempt_duration_seconds&Name=BenchmarkPerfScheduling%2FGangScheduling%2F5000Nodes_1000Gangs_3000Pods%2Ftest&event=not%20applicable&extension_point=not%20applicable&plugin=not%20applicable&result=not%20applicable
# Measured scheduler_podgroup_scheduling_attempt_duration_seconds/Average ~3.7 ms; threshold set conservatively at 8.
threshold: 8
thresholdMetricSelector:
name: scheduler_podgroup_scheduling_attempt_duration_seconds
labels:
result: scheduled
dataBucket: Average
expectLower: true
params:
initNodes: 5000
initPodGroups: 1000
podsPerGroup: 3
- name: 5000Nodes_2000Gangs_6000Pods
labels: [performance]
# https://perf-dash.k8s.io/#/?jobname=scheduler-perf-benchmark&metriccategoryname=Scheduler&metricname=BenchmarkPerfScheduling&Metric=scheduler_podgroup_scheduling_attempt_duration_seconds&Name=BenchmarkPerfScheduling%2FGangScheduling%2F5000Nodes_2000Gangs_6000Pods%2Ftest&event=not%20applicable&extension_point=not%20applicable&plugin=not%20applicable&result=not%20applicable
# Measured scheduler_podgroup_scheduling_attempt_duration_seconds/Average ~5.0 ms; threshold set conservatively at 10.
threshold: 10
thresholdMetricSelector:
name: scheduler_podgroup_scheduling_attempt_duration_seconds
labels:
result: scheduled
dataBucket: Average
expectLower: true
params:
initNodes: 5000
initPodGroups: 2000
podsPerGroup: 3
- name: 5000Nodes_3000Gangs_9000Pods
labels: [performance]
# https://perf-dash.k8s.io/#/?jobname=scheduler-perf-benchmark&metriccategoryname=Scheduler&metricname=BenchmarkPerfScheduling&Metric=scheduler_podgroup_scheduling_attempt_duration_seconds&Name=BenchmarkPerfScheduling%2FGangScheduling%2F5000Nodes_3000Gangs_9000Pods%2Ftest&event=not%20applicable&extension_point=not%20applicable&plugin=not%20applicable&result=not%20applicable
# Measured scheduler_podgroup_scheduling_attempt_duration_seconds/Average ~5.7 ms; threshold set conservatively at 12.
threshold: 12
thresholdMetricSelector:
name: scheduler_podgroup_scheduling_attempt_duration_seconds
labels:
result: scheduled
dataBucket: Average
expectLower: true
params:
initNodes: 5000
initPodGroups: 3000
podsPerGroup: 3

View file

@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
name: test-gang-scheduling-{{.Index}}
spec:
schedulingGroup:
# Consecutive pods share a pod group: podsPerGroup pods per group (three in the provided workloads).
podGroupName: gang-{{DivideInt .Index .podsPerGroup}}
containers:
- image: registry.k8s.io/pause:3.10.1
name: pause
resources:
requests:
cpu: 100m
memory: 100Mi

View file

@ -0,0 +1,8 @@
apiVersion: scheduling.k8s.io/v1alpha2
kind: PodGroup
metadata:
name: gang-{{.Index}}
spec:
schedulingPolicy:
gang:
minCount: 3

View file

@ -299,6 +299,10 @@ type createPodsOp struct {
Count int
// Template parameter for Count.
CountParam string
// Template parameter for multiplying CountParam. It is used when total number of pods
// is defined by number of pods per podgroup for multiple podgroups.
// Optional.
CountMultiplierParam string
// If false, Count pods get created rapidly. This can be used to
// measure how quickly the scheduler can fill up a cluster.
//
@ -351,6 +355,9 @@ func (cpo *createPodsOp) isValid(allowParameterization bool) error {
if !isValidCount(allowParameterization, cpo.Count, cpo.CountParam) {
return fmt.Errorf("invalid Count=%d / CountParam=%q", cpo.Count, cpo.CountParam)
}
if cpo.CountMultiplierParam != "" && !isValidParameterizable(cpo.CountMultiplierParam) {
return fmt.Errorf("invalid CountMultiplierParam=%q", cpo.CountMultiplierParam)
}
if cpo.CollectMetrics && cpo.SkipWaitToCompletion {
// While it's technically possible to achieve this, the additional
// complexity is not worth it, especially given that we don't have any
@ -378,6 +385,13 @@ func (cpo createPodsOp) patchParams(w *Workload) (realOp, error) {
return nil, err
}
}
if cpo.CountMultiplierParam != "" {
multiplier, err := w.Params.get(cpo.CountMultiplierParam[1:])
if err != nil {
return nil, err
}
cpo.Count *= multiplier
}
if cpo.DurationParam != "" {
durationStr, err := getParam[string](w.Params, cpo.DurationParam[1:])
if err != nil {
@ -602,6 +616,40 @@ func (so sleepOp) patchParams(w *Workload) (realOp, error) {
return &so, nil
}
// waitForPodGroups defines an op that waits for a specific number of PodGroup objects to be visible in the scheduler's cache.
// The count is given either literally (Count) or through a workload
// parameter (CountParam); patchParams resolves the latter into Count.
type waitForPodGroups struct {
// Must be waitForPodGroupsOpcode.
Opcode operationCode
// Namespace the objects should be in.
Namespace string
// Count determines how many objects to wait for.
// It is overwritten by patchParams when CountParam is set.
Count int
// CountParam is a reference to the workload parameter that determines the
// count, written with a one-character prefix (e.g. "$initPodGroups");
// patchParams drops that first character before looking the parameter up.
CountParam string
}
// isValid checks the Count/CountParam combination with isValidCount and
// returns a descriptive error when that check rejects it, nil otherwise.
func (w *waitForPodGroups) isValid(allowParameterization bool) error {
	if isValidCount(allowParameterization, w.Count, w.CountParam) {
		return nil
	}
	return fmt.Errorf("invalid Count=%d / CountParam=%q", w.Count, w.CountParam)
}
// collectsMetrics always returns false: this op does not collect metrics.
func (w *waitForPodGroups) collectsMetrics() bool {
return false
}
// patchParams resolves CountParam, if set, against the workload's parameters
// and stores the result in Count. Because of the value receiver, it works on
// a copy of the op: the returned realOp points to that copy and the op from
// the template is left untouched. The copy is validated with
// parameterization disallowed. A failed parameter lookup yields a nil op
// and the lookup error.
func (w waitForPodGroups) patchParams(workload *Workload) (realOp, error) {
	if w.CountParam == "" {
		return &w, w.isValid(false)
	}

	// Drop the leading marker character of the reference to get the name.
	paramName := w.CountParam[1:]
	count, err := workload.Params.get(paramName)
	if err != nil {
		return nil, err
	}
	w.Count = count
	return &w, w.isValid(false)
}
// startCollectingMetricsOp defines an op that starts metrics collectors.
// stopCollectingMetricsOp has to be used after this op to finish collecting.
type startCollectingMetricsOp struct {

View file

@ -76,6 +76,7 @@ const (
sleepOpcode operationCode = "sleep"
startCollectingMetricsOpcode operationCode = "startCollectingMetrics"
stopCollectingMetricsOpcode operationCode = "stopCollectingMetrics"
waitForPodGroupsOpcode operationCode = "waitForPodGroups"
)
const (
@ -159,6 +160,12 @@ var (
Values: schedframework.AllClusterEventLabels(),
},
},
"scheduler_podgroup_scheduling_attempt_duration_seconds": {
{
Label: resultLabelName,
Values: []string{metrics.ScheduledResult, metrics.UnschedulableResult, metrics.ErrorResult},
},
},
},
}
@ -507,6 +514,7 @@ func (op *op) UnmarshalJSON(b []byte) error {
sleepOpcode: &sleepOp{},
startCollectingMetricsOpcode: &startCollectingMetricsOp{},
stopCollectingMetricsOpcode: &stopCollectingMetricsOp{},
waitForPodGroupsOpcode: &waitForPodGroups{},
// TODO(#94601): add a delete nodes op to simulate scaling behaviour?
}
// First determine the opcode using lenient decoding (= ignore extra fields).
@ -1097,6 +1105,7 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *Workload, topicName st
scheduler: scheduler,
numPodsScheduledPerNamespace: make(map[string]int),
podInformer: podInformer,
podGroupInformer: informerFactory.Scheduling().V1alpha2().PodGroups(),
throughputErrorMargin: throughputErrorMargin,
testCase: tc,
workload: w,