From 062910e0b8ce61772c245a968366f4f99f77448f Mon Sep 17 00:00:00 2001 From: bwplotka Date: Thu, 5 Mar 2026 10:58:50 +0000 Subject: [PATCH] yolo Signed-off-by: bwplotka --- tsdb/record/record.go | 37 +++++ tsdb/wlog/watcher_test.go | 304 +++++++++++++++++++++++++++----------- tsdb/wlog/wlog.go | 1 + 3 files changed, 257 insertions(+), 85 deletions(-) diff --git a/tsdb/record/record.go b/tsdb/record/record.go index bf0e41b66b..b0f5d173c0 100644 --- a/tsdb/record/record.go +++ b/tsdb/record/record.go @@ -310,8 +310,45 @@ func (d *Decoder) DecodeLabels(dec *encoding.Decbuf) labels.Labels { return d.builder.Labels() } +type Stats struct { + EmptyRecordsCount int + SampleDecodeSizeMax int + SampleDecodeSizeMin int + SampleDecodeSizeSum int + SampleDecodeCount int + SamplesCount int +} + +func (s *Stats) String() string { + panic("uncomment Decoder.Samples!") + if s.SampleDecodeCount == 0 { + return "no samples decoded" + } + return fmt.Sprintf("EmptyRecordsCount: %d, SampleDecodeSizeMax: %d, SampleDecodeSizeMin: %d, SampleDecodeSizeSum: %d, SampleDecodeCount: %d, SamplesCount: %d, AvgSizePerRecord: %f, AvgSamplesPerRecord: %f", + s.EmptyRecordsCount, s.SampleDecodeSizeMax, s.SampleDecodeSizeMin, s.SampleDecodeSizeSum, s.SampleDecodeCount, s.SamplesCount, float64(s.SampleDecodeSizeSum)/float64(s.SampleDecodeCount), float64(s.SamplesCount)/float64(s.SampleDecodeCount)) +} + +// Create string function that prints Stats and calculate average size per record + +var YoloStats = &Stats{} + // Samples appends samples in rec to the given slice. func (*Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) { + //defer func() { + // if YoloStats.SampleDecodeSizeMax == 0 || YoloStats.SampleDecodeSizeMax < len(rec) { + // YoloStats.SampleDecodeSizeMax = len(rec) + // } + // if YoloStats.SampleDecodeSizeMin == 0 || YoloStats.SampleDecodeSizeMin > len(rec) { + // YoloStats.SampleDecodeSizeMin = len(rec) + // } + // if len(samples) == 0 { + // YoloStats.EmptyRecordsCount++ + // } + // YoloStats.SampleDecodeSizeSum += len(rec) + // YoloStats.SampleDecodeCount++ + // YoloStats.SamplesCount += len(samples) + //}() + dec := encoding.Decbuf{B: rec} if Type(dec.Byte()) != Samples { diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index 4d2622cbff..7f5a34ae66 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -14,6 +14,7 @@ package wlog import ( "fmt" + "io" "math" "math/rand" "os" @@ -92,6 +93,36 @@ type writeToMock struct { delay time.Duration } +// PrintStats prints some statistics to os.Stdout e.g. +// * *Stores, *Appends counters +// *len of *Stored and *Appended data +func (wtm *writeToMock) PrintStats() { + wtm.mu.Lock() + defer wtm.mu.Unlock() + + fmt.Printf("seriesStores: %d (data: %d)\n", wtm.seriesStores, len(wtm.seriesStored)) + fmt.Printf("metadataStores: %d (data: %d)\n", wtm.metadataStores, len(wtm.metadataStored)) + fmt.Printf("sampleAppends: %d (data: %d)\n", wtm.sampleAppends, len(wtm.samplesAppended)) + fmt.Printf("exemplarAppends: %d (data: %d)\n", wtm.exemplarAppends, len(wtm.exemplarsAppended)) + fmt.Printf("histogramAppends: %d (data: %d)\n", wtm.histogramAppends, len(wtm.histogramsAppended)) + fmt.Printf("floatHistogramsAppends: %d (data: %d)\n", wtm.floatHistogramsAppends, len(wtm.floatHistogramsAppended)) + + minT := int64(math.MaxInt64) + maxT := int64(math.MinInt64) + samplesPerRef := map[chunks.HeadSeriesRef]int{} + for _, s := range wtm.samplesAppended { + minT = min(minT, s.T) + maxT = max(maxT, s.T) + samplesPerRef[s.Ref]++ + } + var avgSamplesPerRef float64 + if len(wtm.samplesAppended) > 0 { + avgSamplesPerRef = float64(len(wtm.samplesAppended)) / float64(len(samplesPerRef)) + } + fmt.Printf("activeSeries from segment: %d, activeSeries calculated: %d, samplesPerRef: %v\n", len(wtm.seriesSegmentIndexes), len(samplesPerRef), avgSamplesPerRef) + fmt.Println("time gap", timestamp.Time(minT), "-", timestamp.Time(maxT), timestamp.Time(maxT).Sub(timestamp.Time(minT))) +} + func (wtm *writeToMock) Reset() { wtm.mu.Lock() defer wtm.mu.Unlock() @@ -389,117 +420,220 @@ func TestWatcher_Tail(t *testing.T) { } } +const ( + // Captured from https://github.com/prometheus/prometheus/pull/18062 run. + prombenchSegment = "../../../test-infra/x.bwplotka/testdata/pr18062/00000916" + prombenchSegment2 = "../../../test-infra/x.bwplotka/testdata/main/00000916" +) + +func TestInspect(t *testing.T) { + inspectSeg(t, prombenchSegment) +} + +func inspectSeg(t testing.TB, segPath string) { + fmt.Println("-------", segPath, "-----") + wt := newWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "test", wt, t.TempDir(), true, true, true) + watcher.SetMetrics() + watcher.SetStartTime(timestamp.Time(math.MinInt64)) + + segment, err := OpenReadSegment(segPath) + require.NoError(t, err) + t.Cleanup(func() { + _ = segment.Close() + }) + segReader := NewLiveReader(watcher.logger, watcher.readerMetrics, segment) + + info, err := segment.Stat() + require.NoError(t, err) + fmt.Println("SegmentSize:", info.Size()) + + // Read segment. + require.NoError(t, watcher.readAndHandleError(segReader, 0, true, int64(math.MaxInt64))) + wt.PrintStats() + // fmt.Println(record.YoloStats.String()) + record.YoloStats = &record.Stats{} // Reset. +} + +func TestGrowth(t *testing.T) { + sizes := [4]int{99470455, 102152938, 104982892, 107704969} + fmt.Println(float64(sizes[3]-sizes[0]) / 15.0) + fmt.Println(float64(sizes[1]-sizes[0]) / 5.0) + fmt.Println(float64(sizes[2]-sizes[1]) / 5.0) + fmt.Println(float64(sizes[3]-sizes[2]) / 5.0) +} + +func syntheticSegment(t testing.TB, appendCase testwal.RecordsCase, appends int, compress compression.Type) (string, []int) { + var ( + now = time.Now() + dir = t.TempDir() + wdir = path.Join(dir, "wal") + ) + require.NoError(t, os.Mkdir(wdir, 0o777)) + + appendCase.TsFn = func(_, _ int) int64 { + return timestamp.FromTime(now.Add(1 * time.Second)) + } + appendRecords := testwal.GenerateRecords(appendCase).Combine() + + // Create WAL for the segment and populate a single segment. + w, err := NewSize(nil, nil, wdir, DefaultSegmentSize, compress) + require.NoError(t, err) + t.Cleanup(func() { + _ = w.Close() + }) + + offsets := make([]int, 0, appends) + for range appends { + _ = PopulateTest(t, w, appendRecords, nil) + _, off, err := w.LastSegmentAndOffset() + require.NoError(t, err) + offsets = append(offsets, off) + } + require.NoError(t, w.Close()) + + first, last, err := Segments(w.Dir()) + require.NoError(t, err) + require.Equal(t, 0, last, "expected a single segments, got %v", last+1) + + return SegmentName(w.Dir(), first), offsets +} + // Recommended CLI invocation: /* export bench=watcherRead && go test ./tsdb/wlog/... \ -run '^$' -bench '^BenchmarkWatcher_ReadSegment' \ - -benchtime 50x -count 6 -cpu 2 -timeout 999m \ + -benchtime 10x -count 6 -cpu 2 -timeout 999m \ | tee ${bench}.txt - export bench=watcherReadp && go test ./tsdb/wlog/... \ - -run '^$' -bench '^BenchmarkWatcher_ReadSegment/compress=snappy/case=1000samples$' \ - -benchtime 100x -count 1 -cpu 2 -timeout 999m -cpuprofile=${bench}.cpu.pprof -memprofile=${bench}.mem.pprof \ + export bench=watcherReadp1 && go test ./tsdb/wlog/... \ + -run '^$' -bench '^BenchmarkWatcher_ReadSegment/case=pr18062/compression=snappy$' \ + -benchtime 30x -count 1 -cpu 2 -timeout 999m -cpuprofile=${bench}.cpu.pprof -memprofile=${bench}.mem.pprof \ + | tee ${bench}.txt + + export bench=watcherReadp2 && go test ./tsdb/wlog/... \ + -run '^$' -bench '^BenchmarkWatcher_ReadSegment/case=main18062/compression=snappy$' \ + -benchtime 30x -count 1 -cpu 2 -timeout 999m -cpuprofile=${bench}.cpu.pprof -memprofile=${bench}.mem.pprof \ | tee ${bench}.txt */ func BenchmarkWatcher_ReadSegment(b *testing.B) { - for _, compress := range compression.Types() { - for _, recCase := range []testwal.RecordsCase{ - // TODO(bwplotka) Improve testwal.RecordsCase, so it allows variety of scrape-like data - // and test here. - { - Name: "1000samples", - Series: 1000, - SamplesPerSeries: 1, - ExemplarsPerSeries: 1, - }, - { - Name: "1000histograms", - Series: 1000, - HistogramsPerSeries: 1, - ExemplarsPerSeries: 1, - }, - } { - b.Run(fmt.Sprintf("compress=%s/case=%v", compress, recCase.Name), func(b *testing.B) { - var ( - now = time.Now() - dir = b.TempDir() - wdir = path.Join(dir, "wal") - ) - require.NoError(b, os.Mkdir(wdir, 0o777)) - - // Generate and pre-encode a single segment that watcher will be reading. - recCase.TsFn = func(_, _ int) int64 { - return timestamp.FromTime(now.Add(1 * time.Second)) - } - // A recs represents records from a single "batch", so - // a set of record slices that might have come from a single scrape or RW/OTLP receive. - records := testwal.GenerateRecords(recCase).Combine() - - // Create WAL for the segment and populate a single segment. - w, err := NewSize(nil, nil, wdir, DefaultSegmentSize, compress) - require.NoError(b, err) - b.Cleanup(func() { - _ = w.Close() - }) - - // Log a few batches for the larger segment. In a large Prometheus instance, every Appender Commit - // notifies WAL watchers which - // Enough so it's almost full, but small enough so it fits - // the DefaultSegmentSize without compression. - const batches = 100 - for range batches { - _ = PopulateTest(b, w, records, nil) - } - require.NoError(b, w.Close()) - - first, last, err := Segments(w.Dir()) - require.NoError(b, err) - require.Equal(b, 0, last, "expected a single segments, got %v", last+1) - - seg := SegmentName(w.Dir(), first) - info, err := os.Stat(seg) - require.NoError(b, err) + synthSeg, synthOffsets := syntheticSegment(b, testwal.RecordsCase{ + Name: "1000samples", + // The exact shape is wrong. + Series: 100, // 100 series, 100 metadata + SamplesPerSeries: 2, // 200 samples overall + HistogramsPerSeries: 2, // 200 histograms + ExemplarsPerSeries: 1, // 100 exemplars + }, 1000, compression.Snappy) + offsetsBuf := make([]int64, 0, 60e3) + for _, tc := range []struct { + name string + segmentPath string + offsets []int // If empty, a single record is a single offset. + }{ + { + name: "data=pr18062/compression=snappy", + segmentPath: prombenchSegment, + }, + { + name: "data=main18062/compression=snappy", + segmentPath: prombenchSegment2, + }, + { + name: "data=synth5Rec/compression=snappy", + segmentPath: synthSeg, + offsets: synthOffsets, + }, + } { + b.Run(tc.name, func(b *testing.B) { + b.Run("case=one-go", func(b *testing.B) { // Start watcher to that reads into a bench mock that only records sampleAppends. wt := newBenchWriteToMock(0) - watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true) + watcher := NewWatcher(wMetrics, nil, nil, "test", wt, b.TempDir(), true, true, true) watcher.SetMetrics() - // Update the time because we just created samples around "now" time and watcher - // only starts watching after that time. - watcher.SetStartTime(now) + // Update the time because by default, watchers starts with start time=="now" time and watcher + // only starts reading data after that time. + watcher.SetStartTime(timestamp.Time(math.MinInt64)) + + segment, err := OpenReadSegment(tc.segmentPath) + require.NoError(b, err) + b.Cleanup(func() { + _ = segment.Close() + }) + + // Warm any caches. + segReader := NewLiveReader(watcher.logger, watcher.readerMetrics, segment) + require.NoError(b, watcher.readAndHandleError(segReader, 0, true, int64(math.MaxInt64))) b.ReportAllocs() b.ResetTimer() for b.Loop() { b.StopTimer() - wt.Reset() - segment, err := OpenReadSegment(seg) + _, err = segment.SegmentFile.Seek(0, io.SeekStart) require.NoError(b, err) - b.Cleanup(func() { - _ = segment.Close() - }) - segReader := NewLiveReader(watcher.logger, watcher.readerMetrics, segment) + segReader = NewLiveReader(watcher.logger, watcher.readerMetrics, segment) b.StartTimer() - b.ReportMetric(float64(info.Size()), "segmentBytesRead/op") - // Benchmarked code. - // Simulate what Start -> Run -> watch would do on a set of TSDB commits in similar time. require.NoError(b, watcher.readAndHandleError(segReader, 0, true, int64(math.MaxInt64))) - - // Close segment file. - _ = segment.Close() - // Quick check if data was actually read. - require.Equal(b, batches, wt.seriesStores) - require.Equal(b, batches, wt.metadataStores) - require.Equal(b, recCase.SamplesPerSeries*batches, wt.sampleAppends) - require.Equal(b, recCase.HistogramsPerSeries*batches, wt.histogramAppends) - require.Equal(b, batches, wt.exemplarAppends) - b.StartTimer() + b.ReportMetric(float64(segReader.Offset()), "readBytes/op") + b.ReportMetric(float64(wt.sampleAppends), "sampleAppends/op") } }) - } + b.Run("case=per-scrape", func(b *testing.B) { + // Start watcher to that reads into a bench mock that only records sampleAppends. + wt := newBenchWriteToMock(0) + watcher := NewWatcher(wMetrics, nil, nil, "test", wt, b.TempDir(), true, true, true) + watcher.SetMetrics() + // Update the time because by default, watchers starts with start time=="now" time and watcher + // only starts reading data after that time. + watcher.SetStartTime(timestamp.Time(math.MinInt64)) + + segment, err := OpenReadSegment(tc.segmentPath) + require.NoError(b, err) + b.Cleanup(func() { + _ = segment.Close() + }) + + segReader := NewLiveReader(watcher.logger, watcher.readerMetrics, segment) + + // As per https://docs.google.com/document/d/1efVAMcEw7-R_KatHHcobcFBlNsre-DoThVHI8AO2SDQ/edit?tab=t.0 + // We could assume in a worst case watcher will tail segment scrape by scrape (appender commit Notifies watcher). + if len(tc.offsets) == 0 { + // Find offsets of all records (samples for now as we know input has 100% samples). + // Pack all offsets to read and we will iterate through all. It's naive, but effective (~50k elems). + offsetsBuf = offsetsBuf[:0] + for segReader.Next() { + offsetsBuf = append(offsetsBuf, segReader.Offset()) + } + if err := segReader.Err(); err != nil && err != io.EOF { + b.Fatal(err) + } + } + limitReader := &io.LimitedReader{R: segment} + + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + b.StopTimer() + wt.Reset() + _, err = segment.SegmentFile.Seek(0, io.SeekStart) + require.NoError(b, err) + // TODO Pass IO reader that allows to control how much io.Reader reads are offered via segment io.Reader. + segReader = NewLiveReader(watcher.logger, watcher.readerMetrics, limitReader) + b.StartTimer() + + for i := range offsetsBuf { + limitReader.N = offsetsBuf[i] + require.NoError(b, watcher.readAndHandleError(segReader, 0, true, int64(math.MaxInt64))) + } + b.ReportMetric(float64(segReader.Offset()), "readBytes/op") + b.ReportMetric(float64(wt.sampleAppends), "sampleAppends/op") + } + }) + }) } } diff --git a/tsdb/wlog/wlog.go b/tsdb/wlog/wlog.go index 4aed4ab875..92739059a4 100644 --- a/tsdb/wlog/wlog.go +++ b/tsdb/wlog/wlog.go @@ -85,6 +85,7 @@ type SegmentFile interface { io.Writer io.Reader io.Closer + io.Seeker } // Segment represents a segment file.