Signed-off-by: bwplotka <bwplotka@gmail.com>
This commit is contained in:
bwplotka 2026-03-05 10:58:50 +00:00
parent 9594506888
commit 062910e0b8
3 changed files with 257 additions and 85 deletions

View file

@ -310,8 +310,45 @@ func (d *Decoder) DecodeLabels(dec *encoding.Decbuf) labels.Labels {
return d.builder.Labels()
}
// Stats aggregates counters about the sample records handed to Decoder.Samples.
//
// NOTE(review): the only code that would update these counters is the
// commented-out deferred closure in Decoder.Samples, so unless that is
// re-enabled every field stays at its zero value.
type Stats struct {
	// EmptyRecordsCount counts records for which no samples were observed.
	EmptyRecordsCount int
	// SampleDecodeSizeMax is the largest record size, in bytes, seen so far.
	SampleDecodeSizeMax int
	// SampleDecodeSizeMin is the smallest record size, in bytes, seen so far.
	SampleDecodeSizeMin int
	// SampleDecodeSizeSum is the sum of the sizes, in bytes, of all records seen.
	SampleDecodeSizeSum int
	// SampleDecodeCount is the number of records seen.
	SampleDecodeCount int
	// SamplesCount is the total number of samples across all records seen.
	SamplesCount int
}

// String renders all counters followed by two derived averages: bytes per
// record (AvgSizePerRecord) and samples per record (AvgSamplesPerRecord).
//
// When no record has been counted it returns "no samples decoded" instead of
// formatting NaN averages. It never panics, so a Stats value is safe to pass
// to fmt verbs and loggers.
func (s *Stats) String() string {
	if s.SampleDecodeCount == 0 {
		return "no samples decoded"
	}
	records := float64(s.SampleDecodeCount)
	return fmt.Sprintf("EmptyRecordsCount: %d, SampleDecodeSizeMax: %d, SampleDecodeSizeMin: %d, SampleDecodeSizeSum: %d, SampleDecodeCount: %d, SamplesCount: %d, AvgSizePerRecord: %f, AvgSamplesPerRecord: %f",
		s.EmptyRecordsCount, s.SampleDecodeSizeMax, s.SampleDecodeSizeMin, s.SampleDecodeSizeSum, s.SampleDecodeCount, s.SamplesCount,
		float64(s.SampleDecodeSizeSum)/records, float64(s.SamplesCount)/records)
}
// YoloStats is a package-level Stats instance, initialised to a zero-valued
// Stats. The commented-out instrumentation in Decoder.Samples refers to it.
// NOTE(review): this looks like temporary debugging state; nothing here
// synchronises access to it — confirm before using it from concurrent code.
var YoloStats = &Stats{}
// Samples appends samples in rec to the given slice.
func (*Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
//defer func() {
// if YoloStats.SampleDecodeSizeMax == 0 || YoloStats.SampleDecodeSizeMax < len(rec) {
// YoloStats.SampleDecodeSizeMax = len(rec)
// }
// if YoloStats.SampleDecodeSizeMin == 0 || YoloStats.SampleDecodeSizeMin > len(rec) {
// YoloStats.SampleDecodeSizeMin = len(rec)
// }
// if len(samples) == 0 {
// YoloStats.EmptyRecordsCount++
// }
// YoloStats.SampleDecodeSizeSum += len(rec)
// YoloStats.SampleDecodeCount++
// YoloStats.SamplesCount += len(samples)
//}()
dec := encoding.Decbuf{B: rec}
if Type(dec.Byte()) != Samples {

View file

@ -14,6 +14,7 @@ package wlog
import (
"fmt"
"io"
"math"
"math/rand"
"os"
@ -92,6 +93,36 @@ type writeToMock struct {
delay time.Duration
}
// PrintStats prints statistics about the data recorded by the mock to os.Stdout:
//   - each *Stores / *Appends counter together with the length of the matching
//     *Stored / *Appended data,
//   - len(seriesSegmentIndexes) next to the number of distinct series refs seen
//     in the appended samples, plus the average number of samples per ref,
//   - the time range spanned by the appended samples.
//
// When no samples were appended, the time range line says so instead of being
// derived from the int64 sentinel bounds, which would print meaningless
// timestamps and a meaningless duration.
func (wtm *writeToMock) PrintStats() {
	wtm.mu.Lock()
	defer wtm.mu.Unlock()

	fmt.Printf("seriesStores: %d (data: %d)\n", wtm.seriesStores, len(wtm.seriesStored))
	fmt.Printf("metadataStores: %d (data: %d)\n", wtm.metadataStores, len(wtm.metadataStored))
	fmt.Printf("sampleAppends: %d (data: %d)\n", wtm.sampleAppends, len(wtm.samplesAppended))
	fmt.Printf("exemplarAppends: %d (data: %d)\n", wtm.exemplarAppends, len(wtm.exemplarsAppended))
	fmt.Printf("histogramAppends: %d (data: %d)\n", wtm.histogramAppends, len(wtm.histogramsAppended))
	fmt.Printf("floatHistogramsAppends: %d (data: %d)\n", wtm.floatHistogramsAppends, len(wtm.floatHistogramsAppended))

	// Track the sample time range and how many samples each series ref received.
	minT := int64(math.MaxInt64)
	maxT := int64(math.MinInt64)
	samplesPerRef := map[chunks.HeadSeriesRef]int{}
	for _, s := range wtm.samplesAppended {
		minT = min(minT, s.T)
		maxT = max(maxT, s.T)
		samplesPerRef[s.Ref]++
	}

	// At least one sample implies at least one ref, so the division is safe.
	var avgSamplesPerRef float64
	if len(wtm.samplesAppended) > 0 {
		avgSamplesPerRef = float64(len(wtm.samplesAppended)) / float64(len(samplesPerRef))
	}
	fmt.Printf("activeSeries from segment: %d, activeSeries calculated: %d, samplesPerRef: %v\n", len(wtm.seriesSegmentIndexes), len(samplesPerRef), avgSamplesPerRef)

	if len(wtm.samplesAppended) == 0 {
		// minT/maxT still hold their sentinel values; there is no range to report.
		fmt.Println("time gap: no samples appended")
		return
	}
	fmt.Println("time gap", timestamp.Time(minT), "-", timestamp.Time(maxT), timestamp.Time(maxT).Sub(timestamp.Time(minT)))
}
func (wtm *writeToMock) Reset() {
wtm.mu.Lock()
defer wtm.mu.Unlock()
@ -389,117 +420,220 @@ func TestWatcher_Tail(t *testing.T) {
}
}
// Paths of WAL segment files used as input by TestInspect and
// BenchmarkWatcher_ReadSegment. Both are relative paths that point into a
// "test-infra" directory three levels above this package.
const (
// Captured from https://github.com/prometheus/prometheus/pull/18062 run.
prombenchSegment = "../../../test-infra/x.bwplotka/testdata/pr18062/00000916"
// NOTE(review): judging by the path, this is the same-numbered segment
// captured from a run of the main branch — confirm.
prombenchSegment2 = "../../../test-infra/x.bwplotka/testdata/main/00000916"
)
func TestInspect(t *testing.T) {
inspectSeg(t, prombenchSegment)
}
// inspectSeg reads the whole segment file at segPath through a Watcher that
// writes into a writeToMock, then prints the segment size and the mock's
// statistics to os.Stdout. Any error fails t.
func inspectSeg(t testing.TB, segPath string) {
	fmt.Println("-------", segPath, "-----")

	mock := newWriteToMock(0)
	w := NewWatcher(wMetrics, nil, nil, "test", mock, t.TempDir(), true, true, true)
	w.SetMetrics()
	// Use the earliest representable start time for the watcher.
	w.SetStartTime(timestamp.Time(math.MinInt64))

	seg, openErr := OpenReadSegment(segPath)
	require.NoError(t, openErr)
	t.Cleanup(func() {
		_ = seg.Close()
	})
	reader := NewLiveReader(w.logger, w.readerMetrics, seg)

	fi, statErr := seg.Stat()
	require.NoError(t, statErr)
	fmt.Println("SegmentSize:", fi.Size())

	// Read segment.
	readErr := w.readAndHandleError(reader, 0, true, int64(math.MaxInt64))
	require.NoError(t, readErr)

	mock.PrintStats()
	// fmt.Println(record.YoloStats.String())
	record.YoloStats = &record.Stats{} // Reset.
}
// TestGrowth prints growth rates for four hard-coded size measurements: first
// the overall rate (last minus first, divided by 15), then the rate between
// each pair of consecutive measurements (difference divided by 5).
// It makes no assertions.
func TestGrowth(t *testing.T) {
	sizes := [4]int{99470455, 102152938, 104982892, 107704969}

	overall := sizes[len(sizes)-1] - sizes[0]
	fmt.Println(float64(overall) / 15.0)

	for i := 1; i < len(sizes); i++ {
		step := sizes[i] - sizes[i-1]
		fmt.Println(float64(step) / 5.0)
	}
}
// syntheticSegment writes a WAL into a fresh temporary directory by logging the
// records generated from appendCase `appends` times, and requires that
// everything fits into a single segment.
//
// It returns the path of that segment and, for every append, the offset
// reported by LastSegmentAndOffset right after it. Any error fails t.
func syntheticSegment(t testing.TB, appendCase testwal.RecordsCase, appends int, compress compression.Type) (string, []int) {
	start := time.Now()
	walDir := path.Join(t.TempDir(), "wal")
	require.NoError(t, os.Mkdir(walDir, 0o777))

	// TsFn ignores its arguments and always yields one second after start.
	appendCase.TsFn = func(_, _ int) int64 {
		return timestamp.FromTime(start.Add(1 * time.Second))
	}
	batch := testwal.GenerateRecords(appendCase).Combine()

	// Create WAL for the segment and populate a single segment.
	wl, err := NewSize(nil, nil, walDir, DefaultSegmentSize, compress)
	require.NoError(t, err)
	t.Cleanup(func() {
		_ = wl.Close()
	})

	offsets := make([]int, 0, appends)
	for i := 0; i < appends; i++ {
		_ = PopulateTest(t, wl, batch, nil)
		_, offset, offErr := wl.LastSegmentAndOffset()
		require.NoError(t, offErr)
		offsets = append(offsets, offset)
	}
	require.NoError(t, wl.Close())

	firstSeg, lastSeg, err := Segments(wl.Dir())
	require.NoError(t, err)
	require.Equal(t, 0, lastSeg, "expected a single segments, got %v", lastSeg+1)

	return SegmentName(wl.Dir(), firstSeg), offsets
}
// Recommended CLI invocation:
/*
export bench=watcherRead && go test ./tsdb/wlog/... \
-run '^$' -bench '^BenchmarkWatcher_ReadSegment' \
-benchtime 50x -count 6 -cpu 2 -timeout 999m \
-benchtime 10x -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
export bench=watcherReadp && go test ./tsdb/wlog/... \
-run '^$' -bench '^BenchmarkWatcher_ReadSegment/compress=snappy/case=1000samples$' \
-benchtime 100x -count 1 -cpu 2 -timeout 999m -cpuprofile=${bench}.cpu.pprof -memprofile=${bench}.mem.pprof \
export bench=watcherReadp1 && go test ./tsdb/wlog/... \
-run '^$' -bench '^BenchmarkWatcher_ReadSegment/case=pr18062/compression=snappy$' \
-benchtime 30x -count 1 -cpu 2 -timeout 999m -cpuprofile=${bench}.cpu.pprof -memprofile=${bench}.mem.pprof \
| tee ${bench}.txt
export bench=watcherReadp2 && go test ./tsdb/wlog/... \
-run '^$' -bench '^BenchmarkWatcher_ReadSegment/case=main18062/compression=snappy$' \
-benchtime 30x -count 1 -cpu 2 -timeout 999m -cpuprofile=${bench}.cpu.pprof -memprofile=${bench}.mem.pprof \
| tee ${bench}.txt
*/
func BenchmarkWatcher_ReadSegment(b *testing.B) {
for _, compress := range compression.Types() {
for _, recCase := range []testwal.RecordsCase{
// TODO(bwplotka) Improve testwal.RecordsCase, so it allows variety of scrape-like data
// and test here.
{
Name: "1000samples",
Series: 1000,
SamplesPerSeries: 1,
ExemplarsPerSeries: 1,
},
{
Name: "1000histograms",
Series: 1000,
HistogramsPerSeries: 1,
ExemplarsPerSeries: 1,
},
} {
b.Run(fmt.Sprintf("compress=%s/case=%v", compress, recCase.Name), func(b *testing.B) {
var (
now = time.Now()
dir = b.TempDir()
wdir = path.Join(dir, "wal")
)
require.NoError(b, os.Mkdir(wdir, 0o777))
// Generate and pre-encode a single segment that watcher will be reading.
recCase.TsFn = func(_, _ int) int64 {
return timestamp.FromTime(now.Add(1 * time.Second))
}
// A recs represents records from a single "batch", so
// a set of record slices that might have come from a single scrape or RW/OTLP receive.
records := testwal.GenerateRecords(recCase).Combine()
// Create WAL for the segment and populate a single segment.
w, err := NewSize(nil, nil, wdir, DefaultSegmentSize, compress)
require.NoError(b, err)
b.Cleanup(func() {
_ = w.Close()
})
// Log a few batches for the larger segment. In a large Prometheus instance, every Appender Commit
// notifies WAL watchers which
// Enough so it's almost full, but small enough so it fits
// the DefaultSegmentSize without compression.
const batches = 100
for range batches {
_ = PopulateTest(b, w, records, nil)
}
require.NoError(b, w.Close())
first, last, err := Segments(w.Dir())
require.NoError(b, err)
require.Equal(b, 0, last, "expected a single segments, got %v", last+1)
seg := SegmentName(w.Dir(), first)
info, err := os.Stat(seg)
require.NoError(b, err)
synthSeg, synthOffsets := syntheticSegment(b, testwal.RecordsCase{
Name: "1000samples",
// The exact shape is wrong.
Series: 100, // 100 series, 100 metadata
SamplesPerSeries: 2, // 200 samples overall
HistogramsPerSeries: 2, // 200 histograms
ExemplarsPerSeries: 1, // 100 exemplars
}, 1000, compression.Snappy)
offsetsBuf := make([]int64, 0, 60e3)
for _, tc := range []struct {
name string
segmentPath string
offsets []int // If empty, a single record is a single offset.
}{
{
name: "data=pr18062/compression=snappy",
segmentPath: prombenchSegment,
},
{
name: "data=main18062/compression=snappy",
segmentPath: prombenchSegment2,
},
{
name: "data=synth5Rec/compression=snappy",
segmentPath: synthSeg,
offsets: synthOffsets,
},
} {
b.Run(tc.name, func(b *testing.B) {
b.Run("case=one-go", func(b *testing.B) {
// Start a watcher that reads into a bench mock that only records sampleAppends.
wt := newBenchWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, dir, true, true, true)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, b.TempDir(), true, true, true)
watcher.SetMetrics()
// Update the time because we just created samples around "now" time and watcher
// only starts watching after that time.
watcher.SetStartTime(now)
// Override the start time: by default a watcher's start time is "now", and the
// watcher only starts reading data after that time.
watcher.SetStartTime(timestamp.Time(math.MinInt64))
segment, err := OpenReadSegment(tc.segmentPath)
require.NoError(b, err)
b.Cleanup(func() {
_ = segment.Close()
})
// Warm any caches.
segReader := NewLiveReader(watcher.logger, watcher.readerMetrics, segment)
require.NoError(b, watcher.readAndHandleError(segReader, 0, true, int64(math.MaxInt64)))
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
b.StopTimer()
wt.Reset()
segment, err := OpenReadSegment(seg)
_, err = segment.SegmentFile.Seek(0, io.SeekStart)
require.NoError(b, err)
b.Cleanup(func() {
_ = segment.Close()
})
segReader := NewLiveReader(watcher.logger, watcher.readerMetrics, segment)
segReader = NewLiveReader(watcher.logger, watcher.readerMetrics, segment)
b.StartTimer()
b.ReportMetric(float64(info.Size()), "segmentBytesRead/op")
// Benchmarked code.
// Simulate what Start -> Run -> watch would do on a set of TSDB commits in similar time.
require.NoError(b, watcher.readAndHandleError(segReader, 0, true, int64(math.MaxInt64)))
// Close segment file.
_ = segment.Close()
// Quick check if data was actually read.
require.Equal(b, batches, wt.seriesStores)
require.Equal(b, batches, wt.metadataStores)
require.Equal(b, recCase.SamplesPerSeries*batches, wt.sampleAppends)
require.Equal(b, recCase.HistogramsPerSeries*batches, wt.histogramAppends)
require.Equal(b, batches, wt.exemplarAppends)
b.StartTimer()
b.ReportMetric(float64(segReader.Offset()), "readBytes/op")
b.ReportMetric(float64(wt.sampleAppends), "sampleAppends/op")
}
})
}
b.Run("case=per-scrape", func(b *testing.B) {
// Start a watcher that reads into a bench mock that only records sampleAppends.
wt := newBenchWriteToMock(0)
watcher := NewWatcher(wMetrics, nil, nil, "test", wt, b.TempDir(), true, true, true)
watcher.SetMetrics()
// Override the start time: by default a watcher's start time is "now", and the
// watcher only starts reading data after that time.
watcher.SetStartTime(timestamp.Time(math.MinInt64))
segment, err := OpenReadSegment(tc.segmentPath)
require.NoError(b, err)
b.Cleanup(func() {
_ = segment.Close()
})
segReader := NewLiveReader(watcher.logger, watcher.readerMetrics, segment)
// As per https://docs.google.com/document/d/1efVAMcEw7-R_KatHHcobcFBlNsre-DoThVHI8AO2SDQ/edit?tab=t.0
// We could assume in a worst case watcher will tail segment scrape by scrape (appender commit Notifies watcher).
if len(tc.offsets) == 0 {
// Find offsets of all records (samples for now as we know input has 100% samples).
// Pack all offsets to read and we will iterate through all. It's naive, but effective (~50k elems).
offsetsBuf = offsetsBuf[:0]
for segReader.Next() {
offsetsBuf = append(offsetsBuf, segReader.Offset())
}
if err := segReader.Err(); err != nil && err != io.EOF {
b.Fatal(err)
}
}
limitReader := &io.LimitedReader{R: segment}
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
b.StopTimer()
wt.Reset()
_, err = segment.SegmentFile.Seek(0, io.SeekStart)
require.NoError(b, err)
// TODO Pass IO reader that allows to control how much io.Reader reads are offered via segment io.Reader.
segReader = NewLiveReader(watcher.logger, watcher.readerMetrics, limitReader)
b.StartTimer()
for i := range offsetsBuf {
limitReader.N = offsetsBuf[i]
require.NoError(b, watcher.readAndHandleError(segReader, 0, true, int64(math.MaxInt64)))
}
b.ReportMetric(float64(segReader.Offset()), "readBytes/op")
b.ReportMetric(float64(wt.sampleAppends), "sampleAppends/op")
}
})
})
}
}

View file

@ -85,6 +85,7 @@ type SegmentFile interface {
io.Writer
io.Reader
io.Closer
io.Seeker
}
// Segment represents a segment file.