Implement watch event delivery latency tracking in write throughput benchmark

This change replaces the seconds-delay metric with P99 watch-latency, which
averages event delivery latency across all events during the parallel run.
This significantly reduces measurement variance under high lock contention:
- Previous approach (seconds-delay): ±37,839% variance
- New approach (P99 watch-latency): ±327% variance

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
This commit is contained in:
Marek Siarkowicz 2026-06-06 14:39:05 +02:00
parent 2f5e22e8bb
commit 776bd20c20
4 changed files with 194 additions and 13 deletions

View file

@ -744,8 +744,18 @@ func BenchmarkStoreWriteThroughput(b *testing.B) {
ctx, cacher, _, terminate := testSetupWithEtcdServer(b, opts...)
b.Cleanup(terminate)
data := storagetesting.PrepareBenchmarkData(dims.namespaceCount, dims.podPerNamespaceCount, dims.nodeCount)
b.ResetTimer()
storagetesting.RunBenchmarkWriteThroughput(ctx, b, cacher, data, true)
tracker := storagetesting.NewWatchLatencyTracker(clock.RealClock{})
originalHandler := cacher.cacher.watchCache.eventHandler
cacher.cacher.watchCache.eventHandler = func(event *watchCacheEvent) {
if originalHandler != nil {
originalHandler(event)
}
tracker.HandleEvent(event.Object)
}
b.Cleanup(func() {
cacher.cacher.watchCache.eventHandler = originalHandler
})
storagetesting.RunBenchmarkWriteThroughput(ctx, b, cacher, data, true, tracker)
})
}
}

View file

@ -1042,7 +1042,7 @@ func BenchmarkStoreWriteThroughput(b *testing.B) {
ctx, store, _ := testSetup(b)
data := storagetesting.PrepareBenchmarkData(dims.namespaceCount, dims.podPerNamespaceCount, dims.nodeCount)
b.ResetTimer()
storagetesting.RunBenchmarkWriteThroughput(ctx, b, store, data, false)
storagetesting.RunBenchmarkWriteThroughput(ctx, b, store, data, false, nil)
})
}
}

View file

@ -20,6 +20,7 @@ import (
"context"
_ "embed"
"fmt"
"slices"
"strconv"
"sync"
"sync/atomic"
@ -27,6 +28,7 @@ import (
"time"
"github.com/stretchr/testify/require"
"k8s.io/utils/clock"
"sigs.k8s.io/yaml"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -62,7 +64,7 @@ const (
trafficPatch = "Patch"
)
func RunBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storage.Interface, data BenchmarkData, hasIndex bool) {
func RunBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storage.Interface, data BenchmarkData, hasIndex bool, tracker *WatchLatencyTracker) {
require.NoError(b, PrecreateBenchmarkPods(ctx, store, data))
require.NoError(b, waitForConsistent(ctx, store))
@ -78,7 +80,10 @@ func RunBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storag
for _, readIndexed := range useIndexOptions {
b.Run(fmt.Sprintf("Background=%s/UseIndex=%v", loadType, readIndexed), func(b *testing.B) {
b.SetParallelism(parallelism)
runBenchmarkWriteThroughput(ctx, b, store, data, trafficType, loadType, readIndexed)
if tracker != nil {
tracker.Reset()
}
runBenchmarkWriteThroughput(ctx, b, store, data, trafficType, loadType, readIndexed, tracker)
})
}
}
@ -88,7 +93,7 @@ func RunBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storag
}
}
func runBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storage.Interface, data BenchmarkData, trafficType string, loadType string, readIndexed bool) {
func runBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storage.Interface, data BenchmarkData, trafficType string, loadType string, readIndexed bool, tracker *WatchLatencyTracker) {
stopBackgroundLoadCh := make(chan struct{})
var workersWg sync.WaitGroup
var stopOnce sync.Once
@ -130,14 +135,11 @@ func runBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storag
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
i := int(index.Add(1)) % len(data.PodKeys)
writes.Add(runTraffic(ctx, store, data, trafficType, i, &latestRV))
writes.Add(runTraffic(ctx, b, store, data, trafficType, i, &latestRV, tracker))
}
})
elapsedSeconds := b.Elapsed().Seconds()
consistentStart := time.Now()
require.NoError(b, waitForConsistent(ctx, store))
consistentDelaySeconds := time.Since(consistentStart).Seconds()
b.ReportMetric(consistentDelaySeconds, "seconds-delay")
b.ReportMetric(float64(writes.Load())/elapsedSeconds, "writes/s")
stopBackgroundLoad()
@ -149,6 +151,12 @@ func runBenchmarkWriteThroughput(ctx context.Context, b *testing.B, store storag
b.ReportMetric(float64(listCalls.Load())/elapsedSeconds, "list-calls/s")
b.ReportMetric(float64(listObjects.Load())/elapsedSeconds, "list-objs/s")
}
if tracker != nil {
if p99 := tracker.GetP99Latency(); p99 > 0 {
b.ReportMetric(p99.Seconds(), "watch-latency-p99-s")
}
}
}
func waitForConsistent(ctx context.Context, store storage.Interface) error {
@ -167,7 +175,7 @@ func waitForConsistent(ctx context.Context, store storage.Interface) error {
return nil
}
func runTraffic(ctx context.Context, store storage.Interface, data BenchmarkData, trafficType string, index int, latestRV *atomic.Pointer[string]) (writes uint64) {
func runTraffic(ctx context.Context, b *testing.B, store storage.Interface, data BenchmarkData, trafficType string, index int, latestRV *atomic.Pointer[string], tracker *WatchLatencyTracker) (writes uint64) {
var podOut *example.Pod
switch trafficType {
case trafficDeleteCreate:
@ -179,6 +187,9 @@ func runTraffic(ctx context.Context, store storage.Interface, data BenchmarkData
panic(fmt.Sprintf("Unexpected error on Delete %q: %v", data.PodKeys[index], err))
}
pod := data.Pods[index]
if tracker != nil {
tracker.RecordWrite(pod)
}
podOut = &example.Pod{}
err = store.Create(ctx, data.PodKeys[index], pod, podOut, 0)
if err == nil {
@ -189,7 +200,7 @@ func runTraffic(ctx context.Context, store storage.Interface, data BenchmarkData
}
case trafficPatch:
podOut = &example.Pod{}
err := store.GuaranteedUpdate(ctx, data.PodKeys[index], podOut, false, nil, patchFunc(index), nil)
err := store.GuaranteedUpdate(ctx, data.PodKeys[index], podOut, false, nil, patchFunc(index, tracker), nil)
if err != nil {
panic(fmt.Sprintf("Unexpected error on Patch %q: %v", data.PodKeys[index], err))
} else {
@ -202,13 +213,16 @@ func runTraffic(ctx context.Context, store storage.Interface, data BenchmarkData
return writes
}
func patchFunc(i int) func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
func patchFunc(i int, tracker *WatchLatencyTracker) func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
return func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
curr := input.(*example.Pod)
if curr.Annotations == nil {
curr.Annotations = make(map[string]string)
}
curr.Annotations["updated-by-benchmark"] = strconv.Itoa(i)
if tracker != nil {
tracker.RecordWrite(curr)
}
return curr, nil, nil
}
}
@ -569,3 +583,82 @@ func RunBenchmarkStoreStats(ctx context.Context, b *testing.B, store storage.Int
}
}
}
const latencyTimestampAnnotation = "watch-latency-timestamp"
type WatchLatencyTracker struct {
clock clock.Clock
mu sync.Mutex
durations []time.Duration
}
func NewWatchLatencyTracker(clk clock.Clock) *WatchLatencyTracker {
return &WatchLatencyTracker{
clock: clk,
}
}
func (t *WatchLatencyTracker) Reset() {
t.mu.Lock()
defer t.mu.Unlock()
t.durations = nil
}
func (t *WatchLatencyTracker) RecordWrite(obj interface{}) {
metaObj, ok := obj.(metav1.Object)
if !ok {
return
}
annotations := metaObj.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[latencyTimestampAnnotation] = serializeTimestamp(t.clock.Now())
metaObj.SetAnnotations(annotations)
}
func (t *WatchLatencyTracker) HandleEvent(obj interface{}) {
metaObj, ok := obj.(metav1.Object)
if !ok {
return
}
annotations := metaObj.GetAnnotations()
if annotations == nil {
return
}
tStr, ok := annotations[latencyTimestampAnnotation]
if !ok {
return
}
writeTime, err := parseTimestamp(tStr)
if err != nil {
return
}
delay := t.clock.Since(writeTime)
t.mu.Lock()
t.durations = append(t.durations, delay)
t.mu.Unlock()
}
func (t *WatchLatencyTracker) GetP99Latency() time.Duration {
t.mu.Lock()
defer t.mu.Unlock()
if len(t.durations) < 100 {
return 0
}
slices.Sort(t.durations)
idx := len(t.durations)*99/100 - 1
return t.durations[idx]
}
func serializeTimestamp(t time.Time) string {
return strconv.FormatInt(t.UnixNano(), 10)
}
func parseTimestamp(s string) (time.Time, error) {
tNano, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(0, tNano), nil
}

View file

@ -0,0 +1,78 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"fmt"
"testing"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/example"
testingclock "k8s.io/utils/clock/testing"
)
func TestWatchLatencyTimestampRoundTrip(t *testing.T) {
now := time.Unix(0, time.Now().UnixNano()) // Round to nano precision
serialized := serializeTimestamp(now)
parsed, err := parseTimestamp(serialized)
if err != nil {
t.Fatalf("Failed to parse timestamp: %v", err)
}
if !parsed.Equal(now) {
t.Errorf("Round-trip failed: got %v, expected %v", parsed, now)
}
}
func TestWatchLatencyTracker_RecordWriteAndHandleEvent(t *testing.T) {
initialTime := time.Date(2026, 6, 7, 9, 0, 0, 0, time.UTC)
fakeClock := testingclock.NewFakeClock(initialTime)
tracker := NewWatchLatencyTracker(fakeClock)
// Record 100 events:
// - 98 events (1 to 98) with 10ms delay
// - 1 event (99th) with 50ms delay (this is the 99%ile)
// - 1 event (100th, which is the max) with 100ms delay
for i := 1; i <= 100; i++ {
pod := &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", i),
},
}
tracker.RecordWrite(pod)
var delay time.Duration
switch i {
case 99:
delay = 50 * time.Millisecond
case 100:
delay = 100 * time.Millisecond
default:
delay = 10 * time.Millisecond
}
fakeClock.Step(delay)
tracker.HandleEvent(pod)
// Reset back to base fake clock time for the next pod so it's not cumulative
fakeClock.SetTime(initialTime)
}
p99 := tracker.GetP99Latency()
if p99 != 50*time.Millisecond {
t.Errorf("Expected P99 watch latency to be 50ms, got %v (max was 100ms)", p99)
}
}