From 776bd20c20d1e359f3f2ab9bd87d513283f87b79 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sat, 6 Jun 2026 14:39:05 +0200 Subject: [PATCH] Implement watch event delivery latency tracking in write throughput benchmark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change replaces the seconds-delay metric with P99 watch-latency, which averages event delivery latency across all events during the parallel run. This significantly reduces measurement variance under high lock contention: - Previous approach (seconds-delay): ±37,839% variance - New approach (P99 watch-latency): ±327% variance Signed-off-by: Marek Siarkowicz --- .../pkg/storage/cacher/cacher_test.go | 14 ++- .../apiserver/pkg/storage/etcd3/store_test.go | 2 +- .../pkg/storage/testing/store_benchmarks.go | 113 ++++++++++++++++-- .../storage/testing/store_benchmarks_test.go | 78 ++++++++++++ 4 files changed, 194 insertions(+), 13 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/testing/store_benchmarks_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index ef7e3ee39df..d61a7cbd9b2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -744,8 +744,18 @@ func BenchmarkStoreWriteThroughput(b *testing.B) { ctx, cacher, _, terminate := testSetupWithEtcdServer(b, opts...) b.Cleanup(terminate) data := storagetesting.PrepareBenchmarkData(dims.namespaceCount, dims.podPerNamespaceCount, dims.nodeCount) - b.ResetTimer() - storagetesting.RunBenchmarkWriteThroughput(ctx, b, cacher, data, true) + tracker := storagetesting.NewWatchLatencyTracker(clock.RealClock{}) + originalHandler := cacher.cacher.watchCache.eventHandler + cacher.cacher.watchCache.eventHandler = func(event *watchCacheEvent) { + if originalHandler != nil { + originalHandler(event) + } + tracker.HandleEvent(event.Object) + } + b.Cleanup(func() { + cacher.cacher.watchCache.eventHandler = originalHandler + }) + storagetesting.RunBenchmarkWriteThroughput(ctx, b, cacher, data, true, tracker) }) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index f636dfbda1e..e1d29cca6e5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -1042,7 +1042,7 @@ func BenchmarkStoreWriteThroughput(b *testing.B) { ctx, store, _ := testSetup(b) data := storagetesting.PrepareBenchmarkData(dims.namespaceCount, dims.podPerNamespaceCount, dims.nodeCount) b.ResetTimer() - storagetesting.RunBenchmarkWriteThroughput(ctx, b, store, data, false) + storagetesting.RunBenchmarkWriteThroughput(ctx, b, store, data, false, nil) }) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_benchmarks.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_benchmarks.go index b37ad27a214..e01bea35aad 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_benchmarks.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_benchmarks.go @@ -20,6 +20,7 @@ import ( "context" _ "embed" "fmt" + "slices" "strconv" "sync" "sync/atomic" @@ -27,6 +28,7 @@ import ( "time" "github.com/stretchr/testify/require" + "k8s.io/utils/clock" "sigs.k8s.io/yaml" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -62,7 +64,7 @@ const ( trafficPatch = "Patch" ) -func RunBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storage.Interface, data BenchmarkData, hasIndex bool) { +func RunBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storage.Interface, data BenchmarkData, hasIndex bool, tracker *WatchLatencyTracker) { require.NoError(b, PrecreateBenchmarkPods(ctx, store, data)) require.NoError(b, waitForConsistent(ctx, store)) @@ -78,7 +80,10 @@ func RunBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storag for _, readIndexed := range useIndexOptions { b.Run(fmt.Sprintf("Background=%s/UseIndex=%v", loadType, readIndexed), func(b *testing.B) { b.SetParallelism(parallelism) - runBenchmarkWriteThroughput(ctx, b, store, data, trafficType, loadType, readIndexed) + if tracker != nil { + tracker.Reset() + } + runBenchmarkWriteThroughput(ctx, b, store, data, trafficType, loadType, readIndexed, tracker) }) } } @@ -88,7 +93,7 @@ func RunBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storag } } -func runBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storage.Interface, data BenchmarkData, trafficType string, loadType string, readIndexed bool) { +func runBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storage.Interface, data BenchmarkData, trafficType string, loadType string, readIndexed bool, tracker *WatchLatencyTracker) { stopBackgroundLoadCh := make(chan struct{}) var workersWg sync.WaitGroup var stopOnce sync.Once @@ -130,14 +135,11 @@ func runBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storag b.RunParallel(func(pb *testing.PB) { for pb.Next() { i := int(index.Add(1)) % len(data.PodKeys) - writes.Add(runTraffic(ctx, store, data, trafficType, i, &latestRV)) + writes.Add(runTraffic(ctx, b, store, data, trafficType, i, &latestRV, tracker)) } }) elapsedSeconds := b.Elapsed().Seconds() - consistentStart := time.Now() require.NoError(b, waitForConsistent(ctx, store)) - consistentDelaySeconds := time.Since(consistentStart).Seconds() - b.ReportMetric(consistentDelaySeconds, "seconds-delay") b.ReportMetric(float64(writes.Load())/elapsedSeconds, "writes/s") stopBackgroundLoad() @@ -149,6 +151,12 @@ func runBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storag b.ReportMetric(float64(listCalls.Load())/elapsedSeconds, "list-calls/s") b.ReportMetric(float64(listObjects.Load())/elapsedSeconds, "list-objs/s") } + + if tracker != nil { + if p99 := tracker.GetP99Latency(); p99 > 0 { + b.ReportMetric(p99.Seconds(), "watch-latency-p99-s") + } + } } func waitForConsistent(ctx context.Context, store storage.Interface) error { @@ -167,7 +175,7 @@ func waitForConsistent(ctx context.Context, store storage.Interface) error { return nil } -func runTraffic(ctx context.Context, store storage.Interface, data BenchmarkData, trafficType string, index int, latestRV *atomic.Pointer[string]) (writes uint64) { +func runTraffic(ctx context.Context, b *testing.B, store storage.Interface, data BenchmarkData, trafficType string, index int, latestRV *atomic.Pointer[string], tracker *WatchLatencyTracker) (writes uint64) { var podOut *example.Pod switch trafficType { case trafficDeleteCreate: @@ -179,6 +187,9 @@ func runTraffic(ctx context.Context, store storage.Interface, data BenchmarkData panic(fmt.Sprintf("Unexpected error on Delete %q: %v", data.PodKeys[index], err)) } pod := data.Pods[index] + if tracker != nil { + tracker.RecordWrite(pod) + } podOut = &example.Pod{} err = store.Create(ctx, data.PodKeys[index], pod, podOut, 0) if err == nil { @@ -189,7 +200,7 @@ func runTraffic(ctx context.Context, store storage.Interface, data BenchmarkData } case trafficPatch: podOut = &example.Pod{} - err := store.GuaranteedUpdate(ctx, data.PodKeys[index], podOut, false, nil, patchFunc(index), nil) + err := store.GuaranteedUpdate(ctx, data.PodKeys[index], podOut, false, nil, patchFunc(index, tracker), nil) if err != nil { panic(fmt.Sprintf("Unexpected error on Patch %q: %v", data.PodKeys[index], err)) } else { @@ -202,13 +213,16 @@ func runTraffic(ctx context.Context, store storage.Interface, data BenchmarkData return writes } -func patchFunc(i int) func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { +func patchFunc(i int, tracker *WatchLatencyTracker) func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { return func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { curr := input.(*example.Pod) if curr.Annotations == nil { curr.Annotations = make(map[string]string) } curr.Annotations["updated-by-benchmark"] = strconv.Itoa(i) + if tracker != nil { + tracker.RecordWrite(curr) + } return curr, nil, nil } } @@ -569,3 +583,82 @@ func RunBenchmarkStoreStats(ctx context.Context, b *testing.B, store storage.Int } } } + +const latencyTimestampAnnotation = "watch-latency-timestamp" + +type WatchLatencyTracker struct { + clock clock.Clock + mu sync.Mutex + durations []time.Duration +} + +func NewWatchLatencyTracker(clk clock.Clock) *WatchLatencyTracker { + return &WatchLatencyTracker{ + clock: clk, + } +} + +func (t *WatchLatencyTracker) Reset() { + t.mu.Lock() + defer t.mu.Unlock() + t.durations = nil +} + +func (t *WatchLatencyTracker) RecordWrite(obj interface{}) { + metaObj, ok := obj.(metav1.Object) + if !ok { + return + } + annotations := metaObj.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + annotations[latencyTimestampAnnotation] = serializeTimestamp(t.clock.Now()) + metaObj.SetAnnotations(annotations) +} + +func (t *WatchLatencyTracker) HandleEvent(obj interface{}) { + metaObj, ok := obj.(metav1.Object) + if !ok { + return + } + annotations := metaObj.GetAnnotations() + if annotations == nil { + return + } + tStr, ok := annotations[latencyTimestampAnnotation] + if !ok { + return + } + writeTime, err := parseTimestamp(tStr) + if err != nil { + return + } + delay := t.clock.Since(writeTime) + t.mu.Lock() + t.durations = append(t.durations, delay) + t.mu.Unlock() +} + +func (t *WatchLatencyTracker) GetP99Latency() time.Duration { + t.mu.Lock() + defer t.mu.Unlock() + if len(t.durations) < 100 { + return 0 + } + slices.Sort(t.durations) + idx := len(t.durations)*99/100 - 1 + return t.durations[idx] +} + +func serializeTimestamp(t time.Time) string { + return strconv.FormatInt(t.UnixNano(), 10) +} + +func parseTimestamp(s string) (time.Time, error) { + tNano, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return time.Time{}, err + } + return time.Unix(0, tNano), nil +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_benchmarks_test.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_benchmarks_test.go new file mode 100644 index 00000000000..b67565ab80d --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_benchmarks_test.go @@ -0,0 +1,78 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/apis/example" + testingclock "k8s.io/utils/clock/testing" +) + +func TestWatchLatencyTimestampRoundTrip(t *testing.T) { + now := time.Unix(0, time.Now().UnixNano()) // Round to nano precision + serialized := serializeTimestamp(now) + parsed, err := parseTimestamp(serialized) + if err != nil { + t.Fatalf("Failed to parse timestamp: %v", err) + } + if !parsed.Equal(now) { + t.Errorf("Round-trip failed: got %v, expected %v", parsed, now) + } +} + +func TestWatchLatencyTracker_RecordWriteAndHandleEvent(t *testing.T) { + initialTime := time.Date(2026, 6, 7, 9, 0, 0, 0, time.UTC) + fakeClock := testingclock.NewFakeClock(initialTime) + tracker := NewWatchLatencyTracker(fakeClock) + + // Record 100 events: + // - 98 events (1 to 98) with 10ms delay + // - 1 event (99th) with 50ms delay (this is the 99%ile) + // - 1 event (100th, which is the max) with 100ms delay + for i := 1; i <= 100; i++ { + pod := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", i), + }, + } + tracker.RecordWrite(pod) + + var delay time.Duration + switch i { + case 99: + delay = 50 * time.Millisecond + case 100: + delay = 100 * time.Millisecond + default: + delay = 10 * time.Millisecond + } + fakeClock.Step(delay) + tracker.HandleEvent(pod) + + // Reset back to base fake clock time for the next pod so it's not cumulative + fakeClock.SetTime(initialTime) + } + + p99 := tracker.GetP99Latency() + if p99 != 50*time.Millisecond { + t.Errorf("Expected P99 watch latency to be 50ms, got %v (max was 100ms)", p99) + } +}