mirror of
https://github.com/kubernetes/kubernetes.git
synced 2026-05-28 04:04:39 -04:00
Merge pull request #137340 from vshkrabkov/perf-test/init-opportunistic-batching
Opportunistic batching performance tests
This commit is contained in:
commit
d7e0dc363e
9 changed files with 354 additions and 21 deletions
42
test/integration/scheduler_perf/batching/batching_test.go
Normal file
42
test/integration/scheduler_perf/batching/batching_test.go
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
Copyright The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package batching
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
_ "k8s.io/component-base/logs/json/register"
|
||||
perf "k8s.io/kubernetes/test/integration/scheduler_perf"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
if err := perf.InitTests(); err != nil {
|
||||
fmt.Fprintf(os.Stderr, "%v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
m.Run()
|
||||
}
|
||||
|
||||
func TestSchedulerPerf(t *testing.T) {
|
||||
perf.RunIntegrationPerfScheduling(t, "performance-config.yaml")
|
||||
}
|
||||
|
||||
func BenchmarkPerfScheduling(b *testing.B) {
|
||||
perf.RunBenchmarkPerfScheduling(b, "performance-config.yaml", "batching", nil)
|
||||
}
|
||||
146
test/integration/scheduler_perf/batching/performance-config.yaml
Normal file
146
test/integration/scheduler_perf/batching/performance-config.yaml
Normal file
|
|
@ -0,0 +1,146 @@
|
|||
# HostPortConflict Scenario
|
||||
# Best-case scenario for opportunistic batching.
|
||||
# The `HostPort` constraint (port 80) strictly limits scheduling to 1 pod per node.
|
||||
# Since 20000 Pods need port 80 and we have 20000 Nodes, once Pod N is scheduled on Node X, Node X is immediately invalid for Pod N+1.
|
||||
# Opportunistic batching should detect this and skip Node X for Pod N+1 without running plugins, resulting in a significant performance improvement (O(N) -> O(1) node scans).
|
||||
- name: HostPortConflict
|
||||
schedulerConfigPath: scheduler-config-no-topology.yaml
|
||||
defaultPodTemplatePath: ../templates/pod-hostport-80.yaml
|
||||
workloadTemplate:
|
||||
- opcode: createNodes
|
||||
countParam: $initNodes
|
||||
nodeTemplatePath: ../templates/node-default.yaml
|
||||
- opcode: createPods
|
||||
signatureBatchSizeParam: $signatureBatchSize
|
||||
countParam: $measurePods
|
||||
collectMetrics: true
|
||||
workloads:
|
||||
- name: 10Nodes_10Pods_10BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [integration-test, short]
|
||||
params:
|
||||
signatureBatchSize: 10
|
||||
initNodes: 10
|
||||
measurePods: 10
|
||||
- name: 20000Nodes_20000Pods_10BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 10
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
- name: 20000Nodes_20000Pods_100BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 100
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
- name: 20000Nodes_20000Pods_1000BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 1000
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
- name: 20000Nodes_20000Pods_5000BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 5000
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
- name: 20000Nodes_20000Pods_20000BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 20000
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
- name: 20000Nodes_20000Pods_OpportunisticBatchingDisabled
|
||||
featureGates:
|
||||
OpportunisticBatching: false
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 1
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
|
||||
# ResourceSaturation Scenario
|
||||
# Simulates a common "bin-packing" scenario where pods fill up nodes based on CPU/Memory.
|
||||
# We use pods requesting 3 CPU on 4 CPU nodes, which forces a 1-pod-per-node placement (since 3+3 > 4).
|
||||
# This validates that the batching logic correctly handles resource availability constraints, which are more complex than simple boolean predicates like HostPort.
|
||||
- name: ResourceSaturation
|
||||
schedulerConfigPath: scheduler-config-no-topology.yaml
|
||||
defaultPodTemplatePath: ../templates/pod-saturation.yaml
|
||||
workloadTemplate:
|
||||
- opcode: createNodes
|
||||
countParam: $initNodes
|
||||
nodeTemplatePath: ../templates/node-default.yaml
|
||||
- opcode: createPods
|
||||
signatureBatchSizeParam: $signatureBatchSize
|
||||
countParam: $measurePods
|
||||
collectMetrics: true
|
||||
workloads:
|
||||
- name: 10Nodes_10Pods_10BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [integration-test, short]
|
||||
params:
|
||||
signatureBatchSize: 10
|
||||
initNodes: 10
|
||||
measurePods: 10
|
||||
- name: 20000Nodes_20000Pods_10BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 10
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
- name: 20000Nodes_20000Pods_100BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 100
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
- name: 20000Nodes_20000Pods_1000BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 1000
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
- name: 20000Nodes_20000Pods_5000BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 5000
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
- name: 20000Nodes_20000Pods_20000BatchSize
|
||||
featureGates:
|
||||
OpportunisticBatching: true
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 20000
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
- name: 20000Nodes_20000Pods_OpportunisticBatchingDisabled
|
||||
featureGates:
|
||||
OpportunisticBatching: false
|
||||
labels: [performance]
|
||||
params:
|
||||
signatureBatchSize: 1
|
||||
initNodes: 20000
|
||||
measurePods: 20000
|
||||
|
|
@ -0,0 +1,9 @@
|
|||
apiVersion: kubescheduler.config.k8s.io/v1
|
||||
kind: KubeSchedulerConfiguration
|
||||
profiles:
|
||||
- schedulerName: default-scheduler
|
||||
pluginConfig:
|
||||
- name: PodTopologySpread
|
||||
args:
|
||||
defaultingType: List
|
||||
defaultConstraints: []
|
||||
|
|
@ -939,7 +939,7 @@ func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer corei
|
|||
func getPodStrategy(cpo *createPodsOp) (testutils.TestPodCreateStrategy, error) {
|
||||
podTemplate := testutils.StaticPodTemplate(makeBasePod())
|
||||
if cpo.PodTemplatePath != nil {
|
||||
podTemplate = podTemplateWithParams{path: *cpo.PodTemplatePath, params: cpo.TemplateParams}
|
||||
podTemplate = podTemplateWithParams{path: *cpo.PodTemplatePath, params: cpo.TemplateParams, batchSize: cpo.SignatureBatchSize}
|
||||
}
|
||||
if cpo.PersistentVolumeClaimTemplatePath == nil {
|
||||
return testutils.NewCustomCreatePodStrategy(podTemplate), nil
|
||||
|
|
@ -974,8 +974,9 @@ func (n nodeTemplateWithParams) GetNodeTemplate(index, count int) (*v1.Node, err
|
|||
}
|
||||
|
||||
// podTemplateWithParams describes a pod template file together with the
// values needed to render it and to label the resulting pods.
type podTemplateWithParams struct {
	// path is the location of the pod template file; getPodStrategy fills it
	// from createPodsOp.PodTemplatePath.
	path string
	// params holds the values passed to the template; getPodStrategy fills it
	// from createPodsOp.TemplateParams.
	params map[string]any
	// batchSize is the number of consecutive pods that share one "signature"
	// label value; getPodStrategy fills it from createPodsOp.SignatureBatchSize.
	// GetPodTemplate divides the pod index by it, so it must not be zero.
	batchSize int
}
|
||||
|
||||
func (p podTemplateWithParams) GetPodTemplate(index, count int) (*v1.Pod, error) {
|
||||
|
|
@ -987,6 +988,12 @@ func (p podTemplateWithParams) GetPodTemplate(index, count int) (*v1.Pod, error)
|
|||
if err := getSpecFromTextTemplateFile(p.path, env, podSpec); err != nil {
|
||||
return nil, fmt.Errorf("parsing Pod: %w", err)
|
||||
}
|
||||
|
||||
if podSpec.Labels == nil {
|
||||
podSpec.Labels = make(map[string]string)
|
||||
}
|
||||
podSpec.Labels["signature"] = fmt.Sprintf("signature-label-%d", index/p.batchSize)
|
||||
|
||||
return podSpec, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -396,6 +396,69 @@ func TestRunOp(t *testing.T) {
|
|||
}),
|
||||
},
|
||||
},
|
||||
{
|
||||
|
||||
name: "Create Pods with Signature Labels from File",
|
||||
op: &createPodsOp{
|
||||
Opcode: createPodsOpcode,
|
||||
Count: 3,
|
||||
PodTemplatePath: createObjTemplateFile(t,
|
||||
&v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "custom-pod-{{.Index}}",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "container",
|
||||
Image: "pause",
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
SkipWaitToCompletion: true,
|
||||
SignatureBatchSize: 2,
|
||||
},
|
||||
verifyFuncs: []verifyFunc{
|
||||
verifyCount(3),
|
||||
verifyPodSignature(2),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Create Pods with Signature Labels from File (unset SignatureBatchSize)",
|
||||
op: &createPodsOp{
|
||||
Opcode: createPodsOpcode,
|
||||
Count: 3,
|
||||
PodTemplatePath: createObjTemplateFile(t,
|
||||
&v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "custom-pod-{{.Index}}",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "container",
|
||||
Image: "pause",
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
SkipWaitToCompletion: true,
|
||||
},
|
||||
verifyFuncs: []verifyFunc{
|
||||
verifyCount(3),
|
||||
verifyPodSignature(1),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Create Pods with Invalid SignatureBatchSize",
|
||||
op: &createPodsOp{
|
||||
Opcode: createPodsOpcode,
|
||||
Count: 1,
|
||||
SignatureBatchSize: -1,
|
||||
},
|
||||
expectedFailure: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
|
|
@ -421,19 +484,21 @@ func TestRunOp(t *testing.T) {
|
|||
|
||||
opToRun := tt.op
|
||||
opIndex := 0
|
||||
if tt.workload != nil {
|
||||
if patchable, ok := tt.op.(interface {
|
||||
patchParams(w *Workload) (realOp, error)
|
||||
}); ok {
|
||||
patchedOp, err := patchable.patchParams(tt.workload)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to patch params: %v", err)
|
||||
}
|
||||
opToRun = patchedOp
|
||||
}
|
||||
w := tt.workload
|
||||
if w == nil {
|
||||
w = &Workload{}
|
||||
}
|
||||
|
||||
err := exec.runOp(tCtx, opToRun, opIndex)
|
||||
var err error
|
||||
if patchable, ok := tt.op.(interface {
|
||||
patchParams(w *Workload) (realOp, error)
|
||||
}); ok {
|
||||
opToRun, err = patchable.patchParams(w)
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
err = exec.runOp(tCtx, opToRun, opIndex)
|
||||
}
|
||||
|
||||
if tt.expectedFailure {
|
||||
if err == nil {
|
||||
|
|
@ -489,6 +554,29 @@ func verifyCount(expectedCount int) verifyFunc {
|
|||
}
|
||||
}
|
||||
|
||||
func verifyPodSignature(batchSize int) verifyFunc {
|
||||
return func(t *testing.T, tCtx ktesting.TContext, op realOp, opIndex int) error {
|
||||
pods, err := tCtx.Client().CoreV1().Pods(metav1.NamespaceAll).List(tCtx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(pods.Items) == 0 {
|
||||
return fmt.Errorf("no pods found")
|
||||
}
|
||||
for i, pod := range pods.Items {
|
||||
val, ok := pod.Labels["signature"]
|
||||
if !ok {
|
||||
return fmt.Errorf("pod %s missing signature label", pod.Name)
|
||||
}
|
||||
expected := fmt.Sprintf("signature-label-%d", i/batchSize)
|
||||
if val != expected {
|
||||
return fmt.Errorf("pod %d: unexpected signature: got %q, want %q", i, val, expected)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// verifyLabelValuesAllowed returns a verification function that checks the label values for a given key against allowValues.
|
||||
func verifyLabelValuesAllowed(key string, allowValues sets.Set[string]) verifyFunc {
|
||||
return func(t *testing.T, tCtx ktesting.TContext, op realOp, opIndex int) error {
|
||||
|
|
|
|||
|
|
@ -349,9 +349,19 @@ type createPodsOp struct {
|
|||
// Params to be passed to the template.
|
||||
// Values with `$` prefix will be resolved to the workload parameters.
|
||||
TemplateParams map[string]any
|
||||
// SignatureBatchSize defines how many subsequent pods have the same "signature" label.
|
||||
// If positive, every SignatureBatchSize pods will have a "signature" label with value "signature-label-<index/batchSize>".
|
||||
// If not specified, it defaults to 1 (each pod has a unique signature).
|
||||
// Optional
|
||||
SignatureBatchSize int
|
||||
// Template parameter for SignatureBatchSize.
|
||||
SignatureBatchSizeParam string
|
||||
}
|
||||
|
||||
func (cpo *createPodsOp) isValid(allowParameterization bool) error {
|
||||
if !isValidCount(allowParameterization, cpo.SignatureBatchSize, cpo.SignatureBatchSizeParam) {
|
||||
return fmt.Errorf("invalid SignatureBatchSize=%d / SignatureBatchSizeParam=%q", cpo.SignatureBatchSize, cpo.SignatureBatchSizeParam)
|
||||
}
|
||||
if !isValidCount(allowParameterization, cpo.Count, cpo.CountParam) {
|
||||
return fmt.Errorf("invalid Count=%d / CountParam=%q", cpo.Count, cpo.CountParam)
|
||||
}
|
||||
|
|
@ -392,6 +402,17 @@ func (cpo createPodsOp) patchParams(w *Workload) (realOp, error) {
|
|||
}
|
||||
cpo.Count *= multiplier
|
||||
}
|
||||
if cpo.SignatureBatchSizeParam != "" {
|
||||
paramKey := cpo.SignatureBatchSizeParam[1:]
|
||||
signatureBatchSize, err := w.Params.get(paramKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cpo.SignatureBatchSize = signatureBatchSize
|
||||
}
|
||||
if cpo.SignatureBatchSize == 0 {
|
||||
cpo.SignatureBatchSize = 1
|
||||
}
|
||||
if cpo.DurationParam != "" {
|
||||
durationStr, err := getParam[string](w.Params, cpo.DurationParam[1:])
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -43,7 +43,6 @@ import (
|
|||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/component-base/metrics/testutil"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/scheduler"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
|
||||
|
|
@ -688,12 +687,6 @@ func setupTestCase(t testing.TB, tc *testCase, featureGates map[featuregate.Feat
|
|||
// quit *before* restoring klog settings.
|
||||
framework.GoleakCheck(t)
|
||||
|
||||
if _, found := featureGates[features.OpportunisticBatching]; !found {
|
||||
if featureGates == nil {
|
||||
featureGates = map[featuregate.Feature]bool{}
|
||||
}
|
||||
featureGates[features.OpportunisticBatching] = false
|
||||
}
|
||||
featuregatetesting.SetFeatureGatesDuringTest(t, utilfeature.DefaultFeatureGate, featureGates)
|
||||
|
||||
if opts.preRunFn != nil {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,15 @@
|
|||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
generateName: pod-hostport-80-
|
||||
spec:
|
||||
containers:
|
||||
- image: registry.k8s.io/pause:3.10.1
|
||||
name: pause
|
||||
ports:
|
||||
- containerPort: 80
|
||||
hostPort: 80
|
||||
resources:
|
||||
requests:
|
||||
cpu: 100m
|
||||
memory: 100Mi
|
||||
|
|
@ -0,0 +1,12 @@
|
|||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
generateName: pod-saturation-
|
||||
spec:
|
||||
containers:
|
||||
- image: registry.k8s.io/pause:3.10.1
|
||||
name: pause
|
||||
resources:
|
||||
requests:
|
||||
cpu: 3
|
||||
memory: 1Gi
|
||||
Loading…
Reference in a new issue