diff --git a/test/integration/scheduler_perf/executor.go b/test/integration/scheduler_perf/executor.go index a679ecb7cad..2a71de4d3cb 100644 --- a/test/integration/scheduler_perf/executor.go +++ b/test/integration/scheduler_perf/executor.go @@ -40,6 +40,7 @@ import ( cacheddiscovery "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" coreinformers "k8s.io/client-go/informers/core/v1" + schedulinginformers "k8s.io/client-go/informers/scheduling/v1alpha2" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" "k8s.io/client-go/tools/cache" @@ -61,6 +62,7 @@ type WorkloadExecutor struct { dataItems []DataItem numPodsScheduledPerNamespace map[string]int podInformer coreinformers.PodInformer + podGroupInformer schedulinginformers.PodGroupInformer throughputErrorMargin float64 testCase *testCase workload *Workload @@ -90,6 +92,8 @@ func (e *WorkloadExecutor) runOp(tCtx ktesting.TContext, op realOp, opIndex int) return e.runBarrierOp(tCtx, opIndex, concreteOp) case *sleepOp: return e.runSleepOp(tCtx, concreteOp) + case *waitForPodGroups: + return e.runWaitForPodGroupsOp(tCtx, concreteOp) case *startCollectingMetricsOp: return e.runStartCollectingMetricsOp(tCtx, opIndex, concreteOp) case *stopCollectingMetricsOp: @@ -184,6 +188,29 @@ func (e *WorkloadExecutor) runSleepOp(tCtx ktesting.TContext, op *sleepOp) error return nil } +// runWaitForPodGroupsOp executes the waitForPodGroups operation. +// It polls the scheduler's informer cache until the expected number of pod groups +// are visible in the given namespace. This ensures that subsequent operations +// (like creating pods that reference these pod groups) won't fail due to cache lag. +// It timeouts after 10 seconds if the condition is not met. +func (e *WorkloadExecutor) runWaitForPodGroupsOp(tCtx ktesting.TContext, op *waitForPodGroups) error { + tCtx.Logf("waiting for %d PodGroups in namespace %q", op.Count, op.Namespace) + err := wait.PollUntilContextTimeout(tCtx, 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) { + podGroups, err := e.podGroupInformer.Lister().PodGroups(op.Namespace).List(labels.Everything()) + if err != nil { + return false, err + } + if len(podGroups) >= op.Count { + return true, nil + } + return false, nil + }) + if err != nil { + return fmt.Errorf("timed out waiting for PodGroups: %w", err) + } + return nil +} + func (e *WorkloadExecutor) runStopCollectingMetrics(tCtx ktesting.TContext, opIndex int) error { items, err := stopCollectingMetrics(tCtx, e.collectorCancel, &e.collectorWG, e.workload.Threshold.Get(e.topicName), *e.workload.ThresholdMetricSelector, opIndex, e.collectors) if err != nil { diff --git a/test/integration/scheduler_perf/gangscheduling/gangscheduling_test.go b/test/integration/scheduler_perf/gangscheduling/gangscheduling_test.go new file mode 100644 index 00000000000..a064b08970d --- /dev/null +++ b/test/integration/scheduler_perf/gangscheduling/gangscheduling_test.go @@ -0,0 +1,43 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gangscheduling + +import ( + "fmt" + "os" + "testing" + + _ "k8s.io/component-base/logs/json/register" + perf "k8s.io/kubernetes/test/integration/scheduler_perf" +) + +func TestMain(m *testing.M) { + if err := perf.InitTests(); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } + + m.Run() +} + +func TestSchedulerPerfGangScheduling(t *testing.T) { + perf.RunIntegrationPerfScheduling(t, "performance-config.yaml") +} + +func BenchmarkPerfGangScheduling(b *testing.B) { + perf.RunBenchmarkPerfScheduling(b, "performance-config.yaml", "gangscheduling", nil) +} diff --git a/test/integration/scheduler_perf/gangscheduling/performance-config.yaml b/test/integration/scheduler_perf/gangscheduling/performance-config.yaml new file mode 100644 index 00000000000..2f696ad1f18 --- /dev/null +++ b/test/integration/scheduler_perf/gangscheduling/performance-config.yaml @@ -0,0 +1,93 @@ +# The following labels are used in this file: +# +# - integration-test: test cases to run as the integration test. +# - performance: test cases to run in the performance test. +# - short: supplemental label for the above two labels (must not used alone), which literally means short execution time test cases. + +- name: GangScheduling + featureGates: + GenericWorkload: true + GangScheduling: true + workloadTemplate: + - opcode: createNodes + countParam: $initNodes + - opcode: createNamespaces + prefix: gang + count: 1 + - opcode: createAny + # Create pod groups (gangs), each has a min count policy specified in pod group template. + # Each pod group is named gang-0, gang-1, ... gang-(n-1). + countParam: $initPodGroups + namespace: gang-0 + templatePath: templates/podgroup.yaml + - opcode: waitForPodGroups + # Wait for the scheduler's informer cache to reflect the newly created PodGroup objects. + namespace: gang-0 + countParam: $initPodGroups + - opcode: createPods + # Create pods with reference to the pod groups (gangs) according to their indices (e.g., pods 0-2 → gang-0, pods 3-5 → gang-1, etc.). + countParam: $initPodGroups + countMultiplierParam: $podsPerGroup + namespace: gang-0 + podTemplatePath: templates/gang-pod.yaml + collectMetrics: true + templateParams: + podsPerGroup: $podsPerGroup + workloads: + - name: 10Nodes_3Gangs + labels: [integration-test, short] + params: + initNodes: 10 + initPodGroups: 3 + podsPerGroup: 3 + - name: 100Nodes_10Gangs + labels: [integration-test] + params: + initNodes: 100 + initPodGroups: 10 + podsPerGroup: 3 + - name: 5000Nodes_1000Gangs_3000Pods + labels: [performance] + # https://perf-dash.k8s.io/#/?jobname=scheduler-perf-benchmark&metriccategoryname=Scheduler&metricname=BenchmarkPerfScheduling&Metric=scheduler_podgroup_scheduling_attempt_duration_seconds&Name=BenchmarkPerfScheduling%2FGangScheduling%2F5000Nodes_1000Gangs_3000Pods%2Ftest&event=not%20applicable&extension_point=not%20applicable&plugin=not%20applicable&result=not%20applicable + # Measured scheduler_podgroup_scheduling_attempt_duration_seconds/Average ~3.7 ms; threshold set conservatively at 8. + threshold: 8 + thresholdMetricSelector: + name: scheduler_podgroup_scheduling_attempt_duration_seconds + labels: + result: scheduled + dataBucket: Average + expectLower: true + params: + initNodes: 5000 + initPodGroups: 1000 + podsPerGroup: 3 + - name: 5000Nodes_2000Gangs_6000Pods + labels: [performance] + # https://perf-dash.k8s.io/#/?jobname=scheduler-perf-benchmark&metriccategoryname=Scheduler&metricname=BenchmarkPerfScheduling&Metric=scheduler_podgroup_scheduling_attempt_duration_seconds&Name=BenchmarkPerfScheduling%2FGangScheduling%2F5000Nodes_2000Gangs_6000Pods%2Ftest&event=not%20applicable&extension_point=not%20applicable&plugin=not%20applicable&result=not%20applicable + # Measured scheduler_podgroup_scheduling_attempt_duration_seconds/Average ~5.0 ms; threshold set conservatively at 10. + threshold: 10 + thresholdMetricSelector: + name: scheduler_podgroup_scheduling_attempt_duration_seconds + labels: + result: scheduled + dataBucket: Average + expectLower: true + params: + initNodes: 5000 + initPodGroups: 2000 + podsPerGroup: 3 + - name: 5000Nodes_3000Gangs_9000Pods + labels: [performance] + # https://perf-dash.k8s.io/#/?jobname=scheduler-perf-benchmark&metriccategoryname=Scheduler&metricname=BenchmarkPerfScheduling&Metric=scheduler_podgroup_scheduling_attempt_duration_seconds&Name=BenchmarkPerfScheduling%2FGangScheduling%2F5000Nodes_3000Gangs_9000Pods%2Ftest&event=not%20applicable&extension_point=not%20applicable&plugin=not%20applicable&result=not%20applicable + # Measured scheduler_podgroup_scheduling_attempt_duration_seconds/Average ~5.7 ms; threshold set conservatively at 12. + threshold: 12 + thresholdMetricSelector: + name: scheduler_podgroup_scheduling_attempt_duration_seconds + labels: + result: scheduled + dataBucket: Average + expectLower: true + params: + initNodes: 5000 + initPodGroups: 3000 + podsPerGroup: 3 diff --git a/test/integration/scheduler_perf/gangscheduling/templates/gang-pod.yaml b/test/integration/scheduler_perf/gangscheduling/templates/gang-pod.yaml new file mode 100644 index 00000000000..3f87739ff07 --- /dev/null +++ b/test/integration/scheduler_perf/gangscheduling/templates/gang-pod.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Pod +metadata: + name: test-gang-scheduling-{{.Index}} +spec: + schedulingGroup: + # Three pods share the same pod group. + podGroupName: gang-{{DivideInt .Index .podsPerGroup}} + containers: + - image: registry.k8s.io/pause:3.10.1 + name: pause + resources: + requests: + cpu: 100m + memory: 100Mi diff --git a/test/integration/scheduler_perf/gangscheduling/templates/podgroup.yaml b/test/integration/scheduler_perf/gangscheduling/templates/podgroup.yaml new file mode 100644 index 00000000000..a26cf435b0f --- /dev/null +++ b/test/integration/scheduler_perf/gangscheduling/templates/podgroup.yaml @@ -0,0 +1,8 @@ +apiVersion: scheduling.k8s.io/v1alpha2 +kind: PodGroup +metadata: + name: gang-{{.Index}} +spec: + schedulingPolicy: + gang: + minCount: 3 diff --git a/test/integration/scheduler_perf/operations.go b/test/integration/scheduler_perf/operations.go index 952717511b5..d8ed4788f53 100644 --- a/test/integration/scheduler_perf/operations.go +++ b/test/integration/scheduler_perf/operations.go @@ -299,6 +299,10 @@ type createPodsOp struct { Count int // Template parameter for Count. CountParam string + // Template parameter for multiplying CountParam. It is used when total number of pods + // is defined by number of pods per podgroup for multiple podgroups. + // Optional. + CountMultiplierParam string // If false, Count pods get created rapidly. This can be used to // measure how quickly the scheduler can fill up a cluster. // @@ -351,6 +355,9 @@ func (cpo *createPodsOp) isValid(allowParameterization bool) error { if !isValidCount(allowParameterization, cpo.Count, cpo.CountParam) { return fmt.Errorf("invalid Count=%d / CountParam=%q", cpo.Count, cpo.CountParam) } + if cpo.CountMultiplierParam != "" && !isValidParameterizable(cpo.CountMultiplierParam) { + return fmt.Errorf("invalid CountMultiplierParam=%q", cpo.CountMultiplierParam) + } if cpo.CollectMetrics && cpo.SkipWaitToCompletion { // While it's technically possible to achieve this, the additional // complexity is not worth it, especially given that we don't have any @@ -378,6 +385,13 @@ func (cpo createPodsOp) patchParams(w *Workload) (realOp, error) { return nil, err } } + if cpo.CountMultiplierParam != "" { + multiplier, err := w.Params.get(cpo.CountMultiplierParam[1:]) + if err != nil { + return nil, err + } + cpo.Count *= multiplier + } if cpo.DurationParam != "" { durationStr, err := getParam[string](w.Params, cpo.DurationParam[1:]) if err != nil { @@ -602,6 +616,40 @@ func (so sleepOp) patchParams(w *Workload) (realOp, error) { return &so, nil } +// waitForPodGroups defines an op that waits for a specific number of PodGroup objects to be visible in the scheduler's cache. +type waitForPodGroups struct { + // Must be waitForPodGroupsOpcode. + Opcode operationCode + // Namespace the objects should be in. + Namespace string + // Count determines how many objects to wait for. + Count int + // CountParam is the name of the parameter that determines the count. + CountParam string +} + +func (w *waitForPodGroups) isValid(allowParameterization bool) error { + if !isValidCount(allowParameterization, w.Count, w.CountParam) { + return fmt.Errorf("invalid Count=%d / CountParam=%q", w.Count, w.CountParam) + } + return nil +} + +func (w *waitForPodGroups) collectsMetrics() bool { + return false +} + +func (w waitForPodGroups) patchParams(workload *Workload) (realOp, error) { + if w.CountParam != "" { + var err error + w.Count, err = workload.Params.get(w.CountParam[1:]) + if err != nil { + return nil, err + } + } + return &w, (&w).isValid(false) +} + // startCollectingMetricsOp defines an op that starts metrics collectors. // stopCollectingMetricsOp has to be used after this op to finish collecting. type startCollectingMetricsOp struct { diff --git a/test/integration/scheduler_perf/scheduler_perf.go b/test/integration/scheduler_perf/scheduler_perf.go index 4ef9fa69590..5b884742815 100644 --- a/test/integration/scheduler_perf/scheduler_perf.go +++ b/test/integration/scheduler_perf/scheduler_perf.go @@ -76,6 +76,7 @@ const ( sleepOpcode operationCode = "sleep" startCollectingMetricsOpcode operationCode = "startCollectingMetrics" stopCollectingMetricsOpcode operationCode = "stopCollectingMetrics" + waitForPodGroupsOpcode operationCode = "waitForPodGroups" ) const ( @@ -159,6 +160,12 @@ var ( Values: schedframework.AllClusterEventLabels(), }, }, + "scheduler_podgroup_scheduling_attempt_duration_seconds": { + { + Label: resultLabelName, + Values: []string{metrics.ScheduledResult, metrics.UnschedulableResult, metrics.ErrorResult}, + }, + }, }, } @@ -507,6 +514,7 @@ func (op *op) UnmarshalJSON(b []byte) error { sleepOpcode: &sleepOp{}, startCollectingMetricsOpcode: &startCollectingMetricsOp{}, stopCollectingMetricsOpcode: &stopCollectingMetricsOp{}, + waitForPodGroupsOpcode: &waitForPodGroups{}, // TODO(#94601): add a delete nodes op to simulate scaling behaviour? } // First determine the opcode using lenient decoding (= ignore extra fields). @@ -1097,6 +1105,7 @@ func runWorkload(tCtx ktesting.TContext, tc *testCase, w *Workload, topicName st scheduler: scheduler, numPodsScheduledPerNamespace: make(map[string]int), podInformer: podInformer, + podGroupInformer: informerFactory.Scheduling().V1alpha2().PodGroups(), throughputErrorMargin: throughputErrorMargin, testCase: tc, workload: w,