Improve concurrent store list benchmarking

This commit is contained in:
Marek Siarkowicz 2026-06-04 12:01:00 +02:00
parent fbcbb81625
commit 216ab1b334

View file

@ -120,16 +120,15 @@ func runBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storag
listCalls.Store(0)
listObjects.Store(0)
b.ResetTimer()
start := time.Now()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
writes.Add(runTraffic(ctx, b, store, data, trafficType))
}
})
end := time.Now()
elapsedSeconds := end.Sub(start).Seconds()
elapsedSeconds := b.Elapsed().Seconds()
consistentStart := time.Now()
require.NoError(b, waitForConsistent(ctx, store))
consistentDelaySeconds := float64(time.Since(end).Nanoseconds()) / float64(time.Second.Nanoseconds())
consistentDelaySeconds := time.Since(consistentStart).Seconds()
b.ReportMetric(consistentDelaySeconds, "seconds-delay")
b.ReportMetric(float64(writes.Load())/elapsedSeconds, "writes/s")
@ -400,19 +399,24 @@ func RunBenchmarkStoreList(ctx context.Context, b *testing.B, store storage.Inte
}
func runBenchmarkStoreList(ctx context.Context, b *testing.B, store storage.Interface, limit int64, match metav1.ResourceVersionMatch, scope scope, data BenchmarkData, useIndex bool) {
wg := sync.WaitGroup{}
objectCount := atomic.Uint64{}
pageCount := atomic.Uint64{}
for i := 0; i < b.N; i++ {
wg.Add(1)
resourceVersion := ""
switch match {
case metav1.ResourceVersionMatchExact, metav1.ResourceVersionMatchNotOlderThan:
maxRevision := 1 + len(data.Pods)
resourceVersion = fmt.Sprintf("%d", maxRevision-99+i%100)
}
go func(resourceVersion, nodeName, namespaceName string) {
defer wg.Done()
listCount := atomic.Uint64{}
var index atomic.Uint64
b.SetParallelism(4)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
i := int(index.Add(1))
resourceVersion := ""
switch match {
case metav1.ResourceVersionMatchExact, metav1.ResourceVersionMatchNotOlderThan:
maxRevision := 1 + len(data.Pods)
resourceVersion = fmt.Sprintf("%d", maxRevision-99+i%100)
}
nodeName := data.NodeNames[i%len(data.NodeNames)]
namespaceName := data.NamespaceNames[i%len(data.NamespaceNames)]
opts := storage.ListOptions{
Recursive: true,
ResourceVersion: resourceVersion,
@ -426,36 +430,36 @@ func runBenchmarkStoreList(ctx context.Context, b *testing.B, store storage.Inte
}
switch scope {
case cluster:
objects, pages := paginateList(ctx, store, "/pods/", opts)
objects, lists := paginateList(ctx, store, "/pods/", opts)
objectCount.Add(uint64(objects))
pageCount.Add(uint64(pages))
listCount.Add(uint64(lists))
case node:
if useIndex {
opts.Predicate.GetAttrs = podAttr
opts.Predicate.IndexFields = []string{"spec.nodeName"}
opts.Predicate.Field = fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})
}
objects, pages := paginateList(ctx, store, "/pods/", opts)
objects, lists := paginateList(ctx, store, "/pods/", opts)
objectCount.Add(uint64(objects))
pageCount.Add(uint64(pages))
listCount.Add(uint64(lists))
case namespace:
ctx := ctx
if useIndex {
opts.Predicate.IndexFields = []string{"metadata.namespace"}
ctx = request.WithRequestInfo(ctx, &request.RequestInfo{Namespace: namespaceName})
}
objects, pages := paginateList(ctx, store, "/pods/"+namespaceName, opts)
objects, lists := paginateList(ctx, store, "/pods/"+namespaceName, opts)
objectCount.Add(uint64(objects))
pageCount.Add(uint64(pages))
listCount.Add(uint64(lists))
}
}(resourceVersion, data.NodeNames[i%len(data.NodeNames)], data.NamespaceNames[i%len(data.NamespaceNames)])
}
wg.Wait()
b.ReportMetric(float64(objectCount.Load())/float64(b.N), "objects/op")
b.ReportMetric(float64(pageCount.Load())/float64(b.N), "pages/op")
}
})
elapsedSeconds := b.Elapsed().Seconds()
b.ReportMetric(float64(objectCount.Load())/elapsedSeconds, "list-objs/s")
b.ReportMetric(float64(listCount.Load())/elapsedSeconds, "list-calls/s")
}
func paginateList(ctx context.Context, store storage.Interface, key string, opts storage.ListOptions) (objectCount int, pageCount int) {
func paginateList(ctx context.Context, store storage.Interface, key string, opts storage.ListOptions) (objectCount int, listCount int) {
listOut := &example.PodList{}
err := store.GetList(ctx, key, opts, listOut)
if err != nil {
@ -464,7 +468,7 @@ func paginateList(ctx context.Context, store storage.Interface, key string, opts
opts.Predicate.Continue = listOut.Continue
opts.ResourceVersion = ""
opts.ResourceVersionMatch = ""
pageCount += 1
listCount += 1
objectCount += len(listOut.Items)
for opts.Predicate.Continue != "" {
listOut := &example.PodList{}
@ -473,10 +477,10 @@ func paginateList(ctx context.Context, store storage.Interface, key string, opts
panic(fmt.Sprintf("Unexpected error %s", err))
}
opts.Predicate.Continue = listOut.Continue
pageCount += 1
listCount += 1
objectCount += len(listOut.Items)
}
return objectCount, pageCount
return objectCount, listCount
}
func podAttr(obj runtime.Object) (labels.Set, fields.Set, error) {