From 98809e40c64bd25d3dc4b01c1d4780713e1b3347 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 14 Apr 2026 17:11:35 +0200 Subject: [PATCH] tsdb: Skip clean series during periodic head chunk mmap (#18272) tsdb: Skip clean series during periodic head chunk mmap The periodic mmapHeadChunks cycle previously acquired a per-series lock on every series, even though typically >99% have nothing to mmap. This was identified as a CPU bottleneck in Grafana Mimir. Add a headChunkCount field (sync/atomic.Uint32) to memSeries that tracks the number of head chunks. It is incremented in cutNewHeadChunk and the histogram new-chunk paths, and reset by mmapChunks and truncateChunksBefore. mmapHeadChunks uses a lock-free Load to skip series with fewer than 2 head chunks, avoiding the per-series lock for clean series. sync/atomic.Uint32 (4 bytes) is used instead of go.uber.org/atomic (8 bytes) to fit in existing struct padding without growing memSeries. Chunk counts are bounded by the 3-byte field in HeadChunkRef, so cannot overflow uint32. Also fix pre-existing comment inaccuracies in the touched code: headChunks.next -> headChunks.prev, mmapHeadChunks() -> mmapChunks() in the doc comment, and a grammar error. --------- Signed-off-by: Arve Knudsen --- tsdb/head.go | 37 ++++++---- tsdb/head_append.go | 8 ++- tsdb/head_bench_test.go | 72 ++++++++++++++++++++ tsdb/head_test.go | 146 ++++++++++++++++++++++++++++++++++++++++ tsdb/head_wal.go | 2 + 5 files changed, 250 insertions(+), 15 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index 73f2c999b2..e26e788aa7 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -25,6 +25,7 @@ import ( "runtime" "strconv" "sync" + stdatomic "sync/atomic" //nolint:depguard "time" "github.com/oklog/ulid/v2" @@ -1027,6 +1028,7 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries) // Hence remove this chunk. ms.nextAt = 0 ms.headChunks = nil + ms.headChunkCount.Store(0) ms.app = nil } return nil @@ -1930,26 +1932,25 @@ func (h *Head) getOrCreateWithOptionalID(id chunks.HeadSeriesRef, hash uint64, l return s, true, nil } -// mmapHeadChunks will iterate all memSeries stored on Head and call mmapHeadChunks() on each of them. +// mmapHeadChunks iterates all memSeries stored on Head ready for m-mapping and calls +// mmapChunks() on each of them. // // There are two types of chunks that store samples for each memSeries: // A) Head chunk - stored on Go heap, when new samples are appended they go there. // B) M-mapped chunks - memory mapped chunks, kernel manages the memory for us on-demand, these chunks +// are read-only. // -// are read-only. -// -// Calling mmapHeadChunks() will iterate all memSeries and m-mmap all chunks that should be m-mapped. -// The m-mapping operation is needs to be serialised and so it goes via central lock. -// If there are multiple concurrent memSeries that need to m-map some chunk then they can block each-other. -// -// To minimise the effect of locking on TSDB operations m-mapping is serialised and done away from -// sample append path, since waiting on a lock inside an append would lock the entire memSeries for -// (potentially) a long time, since that could eventually delay next scrape and/or cause query timeouts. +// M-mapping is serialised via the per-series lock and done away from the sample append path, +// since holding the lock during an append could delay the next scrape or cause query timeouts. func (h *Head) mmapHeadChunks() { var count int - for i := 0; i < h.series.size; i++ { + for i := range h.series.size { h.series.locks[i].RLock() for _, series := range h.series.series[i] { + if series.headChunkCount.Load() < 2 { // < 2 means 0 or 1 head chunks, nothing to mmap. + continue + } + series.Lock() count += series.mmapChunks(h.chunkDiskMapper) series.Unlock() @@ -2468,8 +2469,9 @@ type memSeries struct { // pN is the pointer to the mmappedChunk referred to by HeadChunkID=N mmappedChunks []*mmappedChunk // Most recent chunks in memory that are still being built or waiting to be mmapped. - // This is a linked list, headChunks points to the most recent chunk, headChunks.next points + // This is a linked list, headChunks points to the most recent chunk, headChunks.prev points // to older chunk and so on. + // Please note the headChunkCount field tracking the number of headChunks. headChunks *memChunk firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0] @@ -2480,6 +2482,13 @@ type memSeries struct { nextAt int64 // Timestamp at which to cut the next chunk. histogramChunkHasComputedEndTime bool // True if nextAt has been predicted for the current histograms chunk; false otherwise. pendingCommit bool // Whether there are samples waiting to be committed to this series. + // headChunkCount tracks the number of head chunks. + // It is incremented in cutNewHeadChunk and the histogram new-chunk paths, + // and reset by mmapChunks and truncateChunksBefore. + // Chunk counts are bounded by the 3-byte field in HeadChunkRef, so cannot overflow uint32. + // Explicitly uses sync/atomic.Uint32 (4 bytes) to fit in the existing padding + // between two bools and a float64. + headChunkCount stdatomic.Uint32 // We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates. lastValue float64 @@ -2546,7 +2555,7 @@ func (s *memSeries) maxTime() int64 { func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) int { var removedInOrder int if s.headChunks != nil { - var i int + var i uint32 var nextChk *memChunk chk := s.headChunks for chk != nil { @@ -2557,9 +2566,11 @@ func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkD if i == 0 { // This is the first chunk on the list so we need to remove the entire list. s.headChunks = nil + s.headChunkCount.Store(0) } else { // This is NOT the first chunk, unlink it from parent. nextChk.prev = nil + s.headChunkCount.Store(i) } s.mmappedChunks = nil break diff --git a/tsdb/head_append.go b/tsdb/head_append.go index c7143d8d96..558c39292c 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1916,6 +1916,7 @@ func (s *memSeries) appendHistogram(st, t int64, h *histogram.Histogram, appendI maxTime: t, prev: s.headChunks, } + s.headChunkCount.Add(1) s.nextAt = rangeForTimestamp(t, o.chunkRange) return true, true } @@ -1973,6 +1974,7 @@ func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogr maxTime: t, prev: s.headChunks, } + s.headChunkCount.Add(1) s.nextAt = rangeForTimestamp(t, o.chunkRange) return true, true } @@ -2146,6 +2148,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange maxTime: math.MinInt64, prev: s.headChunks, } + s.headChunkCount.Add(1) if chunkenc.IsValidEncoding(e) { var err error @@ -2213,7 +2216,7 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(o chunkOpts, logger *slog.Logger) [] return chunkRefs } -// mmapChunks will m-map all but first chunk on s.headChunks list. +// mmapChunks will m-map all but first chunk on s.headChunks list and update headChunkCount. func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count int) { if s.headChunks == nil || s.headChunks.prev == nil { // There is none or only one head chunk, so nothing to m-map here. @@ -2235,8 +2238,9 @@ func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count i count++ } - // Once we've written out all chunks except s.headChunks we need to unlink these from s.headChunk. + // Remove the tail of the list, leaving only the most recent head chunk. s.headChunks.prev = nil + s.headChunkCount.Store(1) return count } diff --git a/tsdb/head_bench_test.go b/tsdb/head_bench_test.go index d15f6cc310..5b21e4e0d6 100644 --- a/tsdb/head_bench_test.go +++ b/tsdb/head_bench_test.go @@ -311,3 +311,75 @@ type failingSeriesLifecycleCallback struct{} func (failingSeriesLifecycleCallback) PreCreation(labels.Labels) error { return errors.New("failed") } func (failingSeriesLifecycleCallback) PostCreation(labels.Labels) {} func (failingSeriesLifecycleCallback) PostDeletion(map[chunks.HeadSeriesRef]labels.Labels) {} + +func BenchmarkMmapHeadChunks(b *testing.B) { + for _, seriesCount := range []int{1000, 10000, 100000} { + for _, readyPct := range []float64{0.01, 0.1, 1.0} { + readyCount := max(int(float64(seriesCount)*readyPct), 1) + b.Run(fmt.Sprintf("series=%d/ready=%d", seriesCount, readyCount), func(b *testing.B) { + db := newTestDB(b) + db.DisableCompactions() + chunkRange := DefaultBlockDuration + + // Create all series in batches. + refs := make([]storage.SeriesRef, seriesCount) + lblsPerSeries := make([]labels.Labels, seriesCount) + ts := int64(0) + const batchSize = 1000 + for batchStart := 0; batchStart < seriesCount; batchStart += batchSize { + app := db.Appender(b.Context()) + batchEnd := min(batchStart+batchSize, seriesCount) + for i := batchStart; i < batchEnd; i++ { + lbls := labels.FromStrings("__name__", "bench", "i", strconv.Itoa(i)) + lblsPerSeries[i] = lbls + ref, err := app.Append(0, lbls, ts, float64(i)) + require.NoError(b, err) + refs[i] = ref + } + require.NoError(b, app.Commit()) + } + + // Pick a random, spread-out set of series to make ready each iteration. + // Using a Fisher-Yates shuffle prefix ensures coverage across stripes. + rng := rand.New(rand.NewSource(42)) + perm := rng.Perm(seriesCount) + readyIndices := perm[:readyCount] + + // makeReady appends a sample past the chunk range boundary to + // trigger a new head chunk on each ready series. Two appends + // suffice: one near the current ts, one past nextAt. + makeReady := func() { + ts++ // small increment for first sample + app := db.Appender(b.Context()) + for _, idx := range readyIndices { + var err error + refs[idx], err = app.Append(refs[idx], lblsPerSeries[idx], ts, float64(ts)) + require.NoError(b, err) + } + require.NoError(b, app.Commit()) + + ts += chunkRange // jump past nextAt to force chunk cut + app = db.Appender(b.Context()) + for _, idx := range readyIndices { + var err error + refs[idx], err = app.Append(refs[idx], lblsPerSeries[idx], ts, float64(ts)) + require.NoError(b, err) + } + require.NoError(b, app.Commit()) + } + + makeReady() + + b.ResetTimer() + b.ReportAllocs() + + for range b.N { + db.ForceHeadMMap() + b.StopTimer() + makeReady() + b.StartTimer() + } + }) + } + } +} diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 0b1bf6a53d..ac1c339f09 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1734,8 +1734,12 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { } require.Len(t, series.mmappedChunks, tc.mmappedChunks, "wrong number of mmapped chunks") + // Set headChunkCount before truncation (series.append bypasses observeChunkCreated). + series.headChunkCount.Store(uint32(tc.headChunks)) + truncated := series.truncateChunksBefore(tc.truncateBefore, 0) require.Equal(t, tc.expectedTruncated, truncated, "wrong number of truncated chunks returned") + require.Equal(t, uint32(tc.expectedHead), series.headChunkCount.Load(), "wrong headChunkCount after truncation") require.Len(t, series.mmappedChunks, tc.expectedMmap, "wrong number of mmappedChunks after truncation") @@ -8066,3 +8070,145 @@ func TestHead_FindLastSeriesID(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(2), id, "Should find ID 2 even when state file's last segment is the newest segment") } + +func TestHead_mmapHeadChunks(t *testing.T) { + h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) + require.NoError(t, h.Init(0)) + + interval := DefaultBlockDuration / (4 * 120) // Same interval as other mmap tests. + + countReady := func() int { + n := 0 + for i := range h.series.size { + h.series.locks[i].RLock() + for _, s := range h.series.series[i] { + if s.headChunkCount.Load() >= 2 { + n++ + } + } + h.series.locks[i].RUnlock() + } + return n + } + + getCount := func(lbls labels.Labels) uint32 { + s := h.series.getByHash(lbls.Hash(), lbls) + require.NotNil(t, s, "series %s not found", lbls) + return s.headChunkCount.Load() + } + + lblsA := labels.FromStrings("__name__", "seriesA") + lblsB := labels.FromStrings("__name__", "seriesB") + lblsC := labels.FromStrings("__name__", "seriesC") + + ts := int64(0) + + // First chunk creation should set headChunkCount to 1. + app := h.Appender(t.Context()) + _, err := app.Append(0, lblsA, ts, 1.0) + require.NoError(t, err) + require.NoError(t, app.Commit()) + ts += interval + + require.Equal(t, uint32(1), getCount(lblsA), "first chunk should set headChunkCount to 1") + require.Equal(t, 0, countReady(), "series with only a first chunk should not be ready") + + const chunkCutIterations = 2*DefaultSamplesPerChunk + 10 + + // Appending enough samples to trigger chunk cuts should update headChunkCount. + var refB, refC storage.SeriesRef + app = h.Appender(t.Context()) + for range chunkCutIterations { + var err error + refB, err = app.Append(refB, lblsB, ts, float64(ts)) + require.NoError(t, err) + refC, err = app.Append(refC, lblsC, ts, float64(ts)) + require.NoError(t, err) + ts += interval + } + require.NoError(t, app.Commit()) + + require.Equal(t, 2, countReady(), "expected both series to be marked ready") + // With ~2*DefaultSamplesPerChunk samples, we expect 3 head chunks (the count tracks total head chunks). + require.Equal(t, uint32(3), getCount(lblsB), "series B headChunkCount should reflect actual head chunk count") + require.Equal(t, uint32(3), getCount(lblsC), "series C headChunkCount should reflect actual head chunk count") + + // mmapHeadChunks should reset headChunkCount to 1 (one head chunk remains). + h.mmapHeadChunks() + + for _, lbls := range []labels.Labels{lblsB, lblsC} { + s := h.series.getByHash(lbls.Hash(), lbls) + require.NotNil(t, s, "series %s not found", lbls) + s.Lock() + require.NotNil(t, s.headChunks, "series %s should have head chunks", lbls) + require.Nil(t, s.headChunks.prev, "series %s should not have prev mmapped", lbls) + require.NotEmpty(t, s.mmappedChunks, "series %s should have mmapped chunks", lbls) + s.Unlock() + } + + require.Equal(t, uint32(1), getCount(lblsB), "headChunkCount should be 1 after mmap") + require.Equal(t, uint32(1), getCount(lblsC), "headChunkCount should be 1 after mmap") + require.Equal(t, 0, countReady(), "ready set should be empty after mmapHeadChunks") + + // A second call should be a no-op. + beforeMetric := prom_testutil.ToFloat64(h.metrics.mmapChunksTotal) + h.mmapHeadChunks() + afterMetric := prom_testutil.ToFloat64(h.metrics.mmapChunksTotal) + require.Equal(t, beforeMetric, afterMetric, "second call should mmap 0 chunks") + + // Only newly ready series should be processed. + app = h.Appender(t.Context()) + for range chunkCutIterations { + var err error + refB, err = app.Append(refB, lblsB, ts, float64(ts)) + require.NoError(t, err) + ts += interval + } + require.NoError(t, app.Commit()) + + require.Equal(t, 1, countReady(), "only series B should be ready") + require.Equal(t, uint32(3), getCount(lblsB), "series B headChunkCount should reflect new chunks") + require.Equal(t, uint32(1), getCount(lblsC), "series C headChunkCount should be unchanged") + + beforeMetric = prom_testutil.ToFloat64(h.metrics.mmapChunksTotal) + h.mmapHeadChunks() + afterMetric = prom_testutil.ToFloat64(h.metrics.mmapChunksTotal) + require.Greater(t, afterMetric, beforeMetric, "third call should mmap chunks from series B") + require.Equal(t, uint32(1), getCount(lblsB), "series B headChunkCount should be 1 after mmap") +} + +func TestHead_mmapHeadChunks_oooDoesNotInflateCount(t *testing.T) { + h, _ := newTestHead(t, DefaultBlockDuration, compression.None, true /* oooEnabled */) + require.NoError(t, h.Init(0)) + + interval := DefaultBlockDuration / (4 * 120) + lbls := labels.FromStrings("__name__", "test") + ts := int64(0) + + // Create enough in-order samples to get headChunkCount >= 2. + const chunkCutIterations = 2*DefaultSamplesPerChunk + 10 + var ref storage.SeriesRef + app := h.Appender(t.Context()) + for range chunkCutIterations { + var err error + ref, err = app.Append(ref, lbls, ts, float64(ts)) + require.NoError(t, err) + ts += interval + } + require.NoError(t, app.Commit()) + + s := h.series.getByHash(lbls.Hash(), lbls) + require.NotNil(t, s) + countBefore := s.headChunkCount.Load() + require.GreaterOrEqual(t, countBefore, uint32(2), "need multiple head chunks for this test") + + // Append an OOO sample that creates an OOO chunk. + oooTs := ts - 5*time.Minute.Milliseconds() // Within the 10m OOO window. + app = h.Appender(t.Context()) + _, err := app.Append(0, lbls, oooTs, 999.0) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // headChunkCount must not change from an OOO insert. + require.Equal(t, countBefore, s.headChunkCount.Load(), "OOO insert should not inflate headChunkCount") +} diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index e9700dd82e..d4fff5561f 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -579,6 +579,7 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m // Any samples replayed till now would already be compacted. Resetting the head chunk. mSeries.nextAt = 0 mSeries.headChunks = nil + mSeries.headChunkCount.Store(0) mSeries.app = nil return overlapped } @@ -1660,6 +1661,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie } series.nextAt = csr.mc.maxTime // This will create a new chunk on append. series.headChunks = csr.mc + series.headChunkCount.Store(uint32(csr.mc.len())) series.lastValue = csr.lastValue series.lastHistogramValue = csr.lastHistogramValue series.lastFloatHistogramValue = csr.lastFloatHistogramValue