mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-09 08:32:26 -04:00
tsdb: Skip clean series during periodic head chunk mmap (#18272)
tsdb: Skip clean series during periodic head chunk mmap

The periodic mmapHeadChunks cycle previously acquired a per-series lock on every series, even though typically >99% have nothing to mmap. This was identified as a CPU bottleneck in Grafana Mimir.

Add a headChunkCount field (sync/atomic.Uint32) to memSeries that tracks the number of head chunks. It is incremented in cutNewHeadChunk and the histogram new-chunk paths, and reset by mmapChunks and truncateChunksBefore.

mmapHeadChunks uses a lock-free Load to skip series with fewer than 2 head chunks, avoiding the per-series lock for clean series.

sync/atomic.Uint32 (4 bytes) is used instead of go.uber.org/atomic (8 bytes) to fit in existing struct padding without growing memSeries. Chunk counts are bounded by the 3-byte field in HeadChunkRef, so cannot overflow uint32.

Also fix pre-existing comment inaccuracies in the touched code: headChunks.next -> headChunks.prev, mmapHeadChunks() -> mmapChunks() in the doc comment, and a grammar error.

---------

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
parent
1c449737e1
commit
98809e40c6
5 changed files with 250 additions and 15 deletions
37
tsdb/head.go
37
tsdb/head.go
|
|
@ -25,6 +25,7 @@ import (
|
|||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
stdatomic "sync/atomic" //nolint:depguard
|
||||
"time"
|
||||
|
||||
"github.com/oklog/ulid/v2"
|
||||
|
|
@ -1027,6 +1028,7 @@ func (h *Head) loadMmappedChunks(refSeries map[chunks.HeadSeriesRef]*memSeries)
|
|||
// Hence remove this chunk.
|
||||
ms.nextAt = 0
|
||||
ms.headChunks = nil
|
||||
ms.headChunkCount.Store(0)
|
||||
ms.app = nil
|
||||
}
|
||||
return nil
|
||||
|
|
@ -1930,26 +1932,25 @@ func (h *Head) getOrCreateWithOptionalID(id chunks.HeadSeriesRef, hash uint64, l
|
|||
return s, true, nil
|
||||
}
|
||||
|
||||
// mmapHeadChunks will iterate all memSeries stored on Head and call mmapHeadChunks() on each of them.
|
||||
// mmapHeadChunks iterates all memSeries stored on Head ready for m-mapping and calls
|
||||
// mmapChunks() on each of them.
|
||||
//
|
||||
// There are two types of chunks that store samples for each memSeries:
|
||||
// A) Head chunk - stored on Go heap, when new samples are appended they go there.
|
||||
// B) M-mapped chunks - memory mapped chunks, kernel manages the memory for us on-demand, these chunks
|
||||
// are read-only.
|
||||
//
|
||||
// are read-only.
|
||||
//
|
||||
// Calling mmapHeadChunks() will iterate all memSeries and m-mmap all chunks that should be m-mapped.
|
||||
// The m-mapping operation is needs to be serialised and so it goes via central lock.
|
||||
// If there are multiple concurrent memSeries that need to m-map some chunk then they can block each-other.
|
||||
//
|
||||
// To minimise the effect of locking on TSDB operations m-mapping is serialised and done away from
|
||||
// sample append path, since waiting on a lock inside an append would lock the entire memSeries for
|
||||
// (potentially) a long time, since that could eventually delay next scrape and/or cause query timeouts.
|
||||
// M-mapping is serialised via the per-series lock and done away from the sample append path,
|
||||
// since holding the lock during an append could delay the next scrape or cause query timeouts.
|
||||
func (h *Head) mmapHeadChunks() {
|
||||
var count int
|
||||
for i := 0; i < h.series.size; i++ {
|
||||
for i := range h.series.size {
|
||||
h.series.locks[i].RLock()
|
||||
for _, series := range h.series.series[i] {
|
||||
if series.headChunkCount.Load() < 2 { // < 2 means 0 or 1 head chunks, nothing to mmap.
|
||||
continue
|
||||
}
|
||||
|
||||
series.Lock()
|
||||
count += series.mmapChunks(h.chunkDiskMapper)
|
||||
series.Unlock()
|
||||
|
|
@ -2468,8 +2469,9 @@ type memSeries struct {
|
|||
// pN is the pointer to the mmappedChunk referred to by HeadChunkID=N
|
||||
mmappedChunks []*mmappedChunk
|
||||
// Most recent chunks in memory that are still being built or waiting to be mmapped.
|
||||
// This is a linked list, headChunks points to the most recent chunk, headChunks.next points
|
||||
// This is a linked list, headChunks points to the most recent chunk, headChunks.prev points
|
||||
// to older chunk and so on.
|
||||
// Please note the headChunkCount field tracking the number of headChunks.
|
||||
headChunks *memChunk
|
||||
firstChunkID chunks.HeadChunkID // HeadChunkID for mmappedChunks[0]
|
||||
|
||||
|
|
@ -2480,6 +2482,13 @@ type memSeries struct {
|
|||
nextAt int64 // Timestamp at which to cut the next chunk.
|
||||
histogramChunkHasComputedEndTime bool // True if nextAt has been predicted for the current histograms chunk; false otherwise.
|
||||
pendingCommit bool // Whether there are samples waiting to be committed to this series.
|
||||
// headChunkCount tracks the number of head chunks.
|
||||
// It is incremented in cutNewHeadChunk and the histogram new-chunk paths,
|
||||
// and reset by mmapChunks and truncateChunksBefore.
|
||||
// Chunk counts are bounded by the 3-byte field in HeadChunkRef, so cannot overflow uint32.
|
||||
// Explicitly uses sync/atomic.Uint32 (4 bytes) to fit in the existing padding
|
||||
// between two bools and a float64.
|
||||
headChunkCount stdatomic.Uint32
|
||||
|
||||
// We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates.
|
||||
lastValue float64
|
||||
|
|
@ -2546,7 +2555,7 @@ func (s *memSeries) maxTime() int64 {
|
|||
func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) int {
|
||||
var removedInOrder int
|
||||
if s.headChunks != nil {
|
||||
var i int
|
||||
var i uint32
|
||||
var nextChk *memChunk
|
||||
chk := s.headChunks
|
||||
for chk != nil {
|
||||
|
|
@ -2557,9 +2566,11 @@ func (s *memSeries) truncateChunksBefore(mint int64, minOOOMmapRef chunks.ChunkD
|
|||
if i == 0 {
|
||||
// This is the first chunk on the list so we need to remove the entire list.
|
||||
s.headChunks = nil
|
||||
s.headChunkCount.Store(0)
|
||||
} else {
|
||||
// This is NOT the first chunk, unlink it from parent.
|
||||
nextChk.prev = nil
|
||||
s.headChunkCount.Store(i)
|
||||
}
|
||||
s.mmappedChunks = nil
|
||||
break
|
||||
|
|
|
|||
|
|
@ -1916,6 +1916,7 @@ func (s *memSeries) appendHistogram(st, t int64, h *histogram.Histogram, appendI
|
|||
maxTime: t,
|
||||
prev: s.headChunks,
|
||||
}
|
||||
s.headChunkCount.Add(1)
|
||||
s.nextAt = rangeForTimestamp(t, o.chunkRange)
|
||||
return true, true
|
||||
}
|
||||
|
|
@ -1973,6 +1974,7 @@ func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogr
|
|||
maxTime: t,
|
||||
prev: s.headChunks,
|
||||
}
|
||||
s.headChunkCount.Add(1)
|
||||
s.nextAt = rangeForTimestamp(t, o.chunkRange)
|
||||
return true, true
|
||||
}
|
||||
|
|
@ -2146,6 +2148,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange
|
|||
maxTime: math.MinInt64,
|
||||
prev: s.headChunks,
|
||||
}
|
||||
s.headChunkCount.Add(1)
|
||||
|
||||
if chunkenc.IsValidEncoding(e) {
|
||||
var err error
|
||||
|
|
@ -2213,7 +2216,7 @@ func (s *memSeries) mmapCurrentOOOHeadChunk(o chunkOpts, logger *slog.Logger) []
|
|||
return chunkRefs
|
||||
}
|
||||
|
||||
// mmapChunks will m-map all but first chunk on s.headChunks list.
|
||||
// mmapChunks will m-map all but first chunk on s.headChunks list and update headChunkCount.
|
||||
func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count int) {
|
||||
if s.headChunks == nil || s.headChunks.prev == nil {
|
||||
// There is none or only one head chunk, so nothing to m-map here.
|
||||
|
|
@ -2235,8 +2238,9 @@ func (s *memSeries) mmapChunks(chunkDiskMapper *chunks.ChunkDiskMapper) (count i
|
|||
count++
|
||||
}
|
||||
|
||||
// Once we've written out all chunks except s.headChunks we need to unlink these from s.headChunk.
|
||||
// Remove the tail of the list, leaving only the most recent head chunk.
|
||||
s.headChunks.prev = nil
|
||||
s.headChunkCount.Store(1)
|
||||
|
||||
return count
|
||||
}
|
||||
|
|
|
|||
|
|
@ -311,3 +311,75 @@ type failingSeriesLifecycleCallback struct{}
|
|||
// PreCreation always fails: it returns the error "failed" regardless of the labels it is given.
func (failingSeriesLifecycleCallback) PreCreation(labels.Labels) error { return errors.New("failed") }
|
||||
// PostCreation is a no-op; the labels argument is ignored.
func (failingSeriesLifecycleCallback) PostCreation(labels.Labels) {}
|
||||
// PostDeletion is a no-op; the map of deleted series is ignored.
func (failingSeriesLifecycleCallback) PostDeletion(map[chunks.HeadSeriesRef]labels.Labels) {}
|
||||
|
||||
func BenchmarkMmapHeadChunks(b *testing.B) {
|
||||
for _, seriesCount := range []int{1000, 10000, 100000} {
|
||||
for _, readyPct := range []float64{0.01, 0.1, 1.0} {
|
||||
readyCount := max(int(float64(seriesCount)*readyPct), 1)
|
||||
b.Run(fmt.Sprintf("series=%d/ready=%d", seriesCount, readyCount), func(b *testing.B) {
|
||||
db := newTestDB(b)
|
||||
db.DisableCompactions()
|
||||
chunkRange := DefaultBlockDuration
|
||||
|
||||
// Create all series in batches.
|
||||
refs := make([]storage.SeriesRef, seriesCount)
|
||||
lblsPerSeries := make([]labels.Labels, seriesCount)
|
||||
ts := int64(0)
|
||||
const batchSize = 1000
|
||||
for batchStart := 0; batchStart < seriesCount; batchStart += batchSize {
|
||||
app := db.Appender(b.Context())
|
||||
batchEnd := min(batchStart+batchSize, seriesCount)
|
||||
for i := batchStart; i < batchEnd; i++ {
|
||||
lbls := labels.FromStrings("__name__", "bench", "i", strconv.Itoa(i))
|
||||
lblsPerSeries[i] = lbls
|
||||
ref, err := app.Append(0, lbls, ts, float64(i))
|
||||
require.NoError(b, err)
|
||||
refs[i] = ref
|
||||
}
|
||||
require.NoError(b, app.Commit())
|
||||
}
|
||||
|
||||
// Pick a random, spread-out set of series to make ready each iteration.
|
||||
// Using a Fisher-Yates shuffle prefix ensures coverage across stripes.
|
||||
rng := rand.New(rand.NewSource(42))
|
||||
perm := rng.Perm(seriesCount)
|
||||
readyIndices := perm[:readyCount]
|
||||
|
||||
// makeReady appends a sample past the chunk range boundary to
|
||||
// trigger a new head chunk on each ready series. Two appends
|
||||
// suffice: one near the current ts, one past nextAt.
|
||||
makeReady := func() {
|
||||
ts++ // small increment for first sample
|
||||
app := db.Appender(b.Context())
|
||||
for _, idx := range readyIndices {
|
||||
var err error
|
||||
refs[idx], err = app.Append(refs[idx], lblsPerSeries[idx], ts, float64(ts))
|
||||
require.NoError(b, err)
|
||||
}
|
||||
require.NoError(b, app.Commit())
|
||||
|
||||
ts += chunkRange // jump past nextAt to force chunk cut
|
||||
app = db.Appender(b.Context())
|
||||
for _, idx := range readyIndices {
|
||||
var err error
|
||||
refs[idx], err = app.Append(refs[idx], lblsPerSeries[idx], ts, float64(ts))
|
||||
require.NoError(b, err)
|
||||
}
|
||||
require.NoError(b, app.Commit())
|
||||
}
|
||||
|
||||
makeReady()
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for range b.N {
|
||||
db.ForceHeadMMap()
|
||||
b.StopTimer()
|
||||
makeReady()
|
||||
b.StartTimer()
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1734,8 +1734,12 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) {
|
|||
}
|
||||
require.Len(t, series.mmappedChunks, tc.mmappedChunks, "wrong number of mmapped chunks")
|
||||
|
||||
// Set headChunkCount before truncation (series.append bypasses observeChunkCreated).
|
||||
series.headChunkCount.Store(uint32(tc.headChunks))
|
||||
|
||||
truncated := series.truncateChunksBefore(tc.truncateBefore, 0)
|
||||
require.Equal(t, tc.expectedTruncated, truncated, "wrong number of truncated chunks returned")
|
||||
require.Equal(t, uint32(tc.expectedHead), series.headChunkCount.Load(), "wrong headChunkCount after truncation")
|
||||
|
||||
require.Len(t, series.mmappedChunks, tc.expectedMmap, "wrong number of mmappedChunks after truncation")
|
||||
|
||||
|
|
@ -8066,3 +8070,145 @@ func TestHead_FindLastSeriesID(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(2), id, "Should find ID 2 even when state file's last segment is the newest segment")
|
||||
}
|
||||
|
||||
func TestHead_mmapHeadChunks(t *testing.T) {
|
||||
h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
interval := DefaultBlockDuration / (4 * 120) // Same interval as other mmap tests.
|
||||
|
||||
countReady := func() int {
|
||||
n := 0
|
||||
for i := range h.series.size {
|
||||
h.series.locks[i].RLock()
|
||||
for _, s := range h.series.series[i] {
|
||||
if s.headChunkCount.Load() >= 2 {
|
||||
n++
|
||||
}
|
||||
}
|
||||
h.series.locks[i].RUnlock()
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
getCount := func(lbls labels.Labels) uint32 {
|
||||
s := h.series.getByHash(lbls.Hash(), lbls)
|
||||
require.NotNil(t, s, "series %s not found", lbls)
|
||||
return s.headChunkCount.Load()
|
||||
}
|
||||
|
||||
lblsA := labels.FromStrings("__name__", "seriesA")
|
||||
lblsB := labels.FromStrings("__name__", "seriesB")
|
||||
lblsC := labels.FromStrings("__name__", "seriesC")
|
||||
|
||||
ts := int64(0)
|
||||
|
||||
// First chunk creation should set headChunkCount to 1.
|
||||
app := h.Appender(t.Context())
|
||||
_, err := app.Append(0, lblsA, ts, 1.0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
ts += interval
|
||||
|
||||
require.Equal(t, uint32(1), getCount(lblsA), "first chunk should set headChunkCount to 1")
|
||||
require.Equal(t, 0, countReady(), "series with only a first chunk should not be ready")
|
||||
|
||||
const chunkCutIterations = 2*DefaultSamplesPerChunk + 10
|
||||
|
||||
// Appending enough samples to trigger chunk cuts should update headChunkCount.
|
||||
var refB, refC storage.SeriesRef
|
||||
app = h.Appender(t.Context())
|
||||
for range chunkCutIterations {
|
||||
var err error
|
||||
refB, err = app.Append(refB, lblsB, ts, float64(ts))
|
||||
require.NoError(t, err)
|
||||
refC, err = app.Append(refC, lblsC, ts, float64(ts))
|
||||
require.NoError(t, err)
|
||||
ts += interval
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
require.Equal(t, 2, countReady(), "expected both series to be marked ready")
|
||||
// With ~2*DefaultSamplesPerChunk samples, we expect 3 head chunks (the count tracks total head chunks).
|
||||
require.Equal(t, uint32(3), getCount(lblsB), "series B headChunkCount should reflect actual head chunk count")
|
||||
require.Equal(t, uint32(3), getCount(lblsC), "series C headChunkCount should reflect actual head chunk count")
|
||||
|
||||
// mmapHeadChunks should reset headChunkCount to 1 (one head chunk remains).
|
||||
h.mmapHeadChunks()
|
||||
|
||||
for _, lbls := range []labels.Labels{lblsB, lblsC} {
|
||||
s := h.series.getByHash(lbls.Hash(), lbls)
|
||||
require.NotNil(t, s, "series %s not found", lbls)
|
||||
s.Lock()
|
||||
require.NotNil(t, s.headChunks, "series %s should have head chunks", lbls)
|
||||
require.Nil(t, s.headChunks.prev, "series %s should not have prev mmapped", lbls)
|
||||
require.NotEmpty(t, s.mmappedChunks, "series %s should have mmapped chunks", lbls)
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
require.Equal(t, uint32(1), getCount(lblsB), "headChunkCount should be 1 after mmap")
|
||||
require.Equal(t, uint32(1), getCount(lblsC), "headChunkCount should be 1 after mmap")
|
||||
require.Equal(t, 0, countReady(), "ready set should be empty after mmapHeadChunks")
|
||||
|
||||
// A second call should be a no-op.
|
||||
beforeMetric := prom_testutil.ToFloat64(h.metrics.mmapChunksTotal)
|
||||
h.mmapHeadChunks()
|
||||
afterMetric := prom_testutil.ToFloat64(h.metrics.mmapChunksTotal)
|
||||
require.Equal(t, beforeMetric, afterMetric, "second call should mmap 0 chunks")
|
||||
|
||||
// Only newly ready series should be processed.
|
||||
app = h.Appender(t.Context())
|
||||
for range chunkCutIterations {
|
||||
var err error
|
||||
refB, err = app.Append(refB, lblsB, ts, float64(ts))
|
||||
require.NoError(t, err)
|
||||
ts += interval
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
require.Equal(t, 1, countReady(), "only series B should be ready")
|
||||
require.Equal(t, uint32(3), getCount(lblsB), "series B headChunkCount should reflect new chunks")
|
||||
require.Equal(t, uint32(1), getCount(lblsC), "series C headChunkCount should be unchanged")
|
||||
|
||||
beforeMetric = prom_testutil.ToFloat64(h.metrics.mmapChunksTotal)
|
||||
h.mmapHeadChunks()
|
||||
afterMetric = prom_testutil.ToFloat64(h.metrics.mmapChunksTotal)
|
||||
require.Greater(t, afterMetric, beforeMetric, "third call should mmap chunks from series B")
|
||||
require.Equal(t, uint32(1), getCount(lblsB), "series B headChunkCount should be 1 after mmap")
|
||||
}
|
||||
|
||||
func TestHead_mmapHeadChunks_oooDoesNotInflateCount(t *testing.T) {
|
||||
h, _ := newTestHead(t, DefaultBlockDuration, compression.None, true /* oooEnabled */)
|
||||
require.NoError(t, h.Init(0))
|
||||
|
||||
interval := DefaultBlockDuration / (4 * 120)
|
||||
lbls := labels.FromStrings("__name__", "test")
|
||||
ts := int64(0)
|
||||
|
||||
// Create enough in-order samples to get headChunkCount >= 2.
|
||||
const chunkCutIterations = 2*DefaultSamplesPerChunk + 10
|
||||
var ref storage.SeriesRef
|
||||
app := h.Appender(t.Context())
|
||||
for range chunkCutIterations {
|
||||
var err error
|
||||
ref, err = app.Append(ref, lbls, ts, float64(ts))
|
||||
require.NoError(t, err)
|
||||
ts += interval
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
s := h.series.getByHash(lbls.Hash(), lbls)
|
||||
require.NotNil(t, s)
|
||||
countBefore := s.headChunkCount.Load()
|
||||
require.GreaterOrEqual(t, countBefore, uint32(2), "need multiple head chunks for this test")
|
||||
|
||||
// Append an OOO sample that creates an OOO chunk.
|
||||
oooTs := ts - 5*time.Minute.Milliseconds() // Within the 10m OOO window.
|
||||
app = h.Appender(t.Context())
|
||||
_, err := app.Append(0, lbls, oooTs, 999.0)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// headChunkCount must not change from an OOO insert.
|
||||
require.Equal(t, countBefore, s.headChunkCount.Load(), "OOO insert should not inflate headChunkCount")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -579,6 +579,7 @@ func (h *Head) resetSeriesWithMMappedChunks(mSeries *memSeries, mmc, oooMmc []*m
|
|||
// Any samples replayed till now would already be compacted. Resetting the head chunk.
|
||||
mSeries.nextAt = 0
|
||||
mSeries.headChunks = nil
|
||||
mSeries.headChunkCount.Store(0)
|
||||
mSeries.app = nil
|
||||
return overlapped
|
||||
}
|
||||
|
|
@ -1660,6 +1661,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
|
|||
}
|
||||
series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
|
||||
series.headChunks = csr.mc
|
||||
series.headChunkCount.Store(uint32(csr.mc.len()))
|
||||
series.lastValue = csr.lastValue
|
||||
series.lastHistogramValue = csr.lastHistogramValue
|
||||
series.lastFloatHistogramValue = csr.lastFloatHistogramValue
|
||||
|
|
|
|||
Loading…
Reference in a new issue