diff --git a/tsdb/head.go b/tsdb/head.go index de66987f6b..2637e905b0 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1437,6 +1437,7 @@ type memSeries struct { ref uint64 lset labels.Labels mmappedChunks []*mmappedChunk + mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay. headChunk *memChunk chunkRange int64 firstChunkID int diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 5513ca6450..911cf28201 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -137,6 +137,7 @@ func BenchmarkLoadWAL(b *testing.B) { batches int seriesPerBatch int samplesPerSeries int + mmappedChunkT int64 }{ { // Less series and more samples. 2 hour WAL with 1 second scrape interval. batches: 10, @@ -153,6 +154,12 @@ func BenchmarkLoadWAL(b *testing.B) { seriesPerBatch: 1000, samplesPerSeries: 480, }, + { // 2 hour WAL with 15 second scrape interval, and mmapped chunks up to last 100 samples. + batches: 100, + seriesPerBatch: 1000, + samplesPerSeries: 480, + mmappedChunkT: 3800, + }, } labelsPerSeries := 5 @@ -169,7 +176,7 @@ func BenchmarkLoadWAL(b *testing.B) { } lastExemplarsPerSeries = exemplarsPerSeries // fmt.Println("exemplars per series: ", exemplarsPerSeries) - b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries), + b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT), func(b *testing.B) { dir, err := ioutil.TempDir("", "test_load_wal") require.NoError(b, err) @@ -190,7 +197,7 @@ func BenchmarkLoadWAL(b *testing.B) { for j := 1; len(lbls) < labelsPerSeries; j++ { lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) } - refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 100, Labels: labels.FromMap(lbls)}) + refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 101, Labels: labels.FromMap(lbls)}) } populateTestWAL(b, w, []interface{}{refSeries}) } @@ -202,7 +209,7 @@ func BenchmarkLoadWAL(b *testing.B) { refSamples = refSamples[:0] for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { refSamples = append(refSamples, record.RefSample{ - Ref: uint64(k) * 100, + Ref: uint64(k) * 101, T: int64(i) * 10, V: float64(i) * 100, }) @@ -211,14 +218,27 @@ func BenchmarkLoadWAL(b *testing.B) { } } - // Write samples. + // Write mmapped chunks. + if c.mmappedChunkT != 0 { + chunkDiskMapper, err := chunks.NewChunkDiskMapper(mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize) + require.NoError(b, err) + for k := 0; k < c.batches*c.seriesPerBatch; k++ { + // Create one mmapped chunk per series, with one sample at the given time. + s := newMemSeries(labels.Labels{}, uint64(k)*101, c.mmappedChunkT, nil) + s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper) + s.mmapCurrentHeadChunk(chunkDiskMapper) + } + require.NoError(b, chunkDiskMapper.Close()) + } + + // Write exemplars. refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch) for i := 0; i < exemplarsPerSeries; i++ { for j := 0; j < c.batches; j++ { refExemplars = refExemplars[:0] for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { refExemplars = append(refExemplars, record.RefExemplar{ - Ref: uint64(k) * 100, + Ref: uint64(k) * 101, T: int64(i) * 10, V: float64(i) * 100, Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)), @@ -239,6 +259,8 @@ func BenchmarkLoadWAL(b *testing.B) { require.NoError(b, err) h.Init(0) } + b.StopTimer() + w.Close() }) } } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 08cce7e406..506a64de97 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -212,9 +212,7 @@ Outer: if created { // This is the first WAL series record for this series. - h.metrics.chunksCreated.Add(float64(len(mmc))) - h.metrics.chunks.Add(float64(len(mmc))) - mSeries.mmappedChunks = mmc + h.setMMappedChunks(mSeries, mmc) continue } @@ -258,16 +256,12 @@ Outer: } // Replacing m-mapped chunks with the new ones (could be empty). - h.metrics.chunksCreated.Add(float64(len(mmc))) - h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks))) - h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks))) - mSeries.mmappedChunks = mmc + h.setMMappedChunks(mSeries, mmc) // Any samples replayed till now would already be compacted. Resetting the head chunk. mSeries.nextAt = 0 mSeries.headChunk = nil mSeries.app = nil - h.updateMinMaxTime(mSeries.minTime(), mSeries.maxTime()) } //nolint:staticcheck // Ignore SA6002 relax staticcheck verification. seriesPool.Put(v) @@ -359,6 +353,20 @@ Outer: return nil } +func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) { + h.metrics.chunksCreated.Add(float64(len(mmc))) + h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks))) + h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks))) + mSeries.mmappedChunks = mmc + // Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject. + if len(mmc) == 0 { + mSeries.mmMaxTime = math.MinInt64 + } else { + mSeries.mmMaxTime = mmc[len(mmc)-1].maxTime + h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime) + } +} + // processWALSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. // Samples before the minValidTime timestamp are discarded. @@ -368,9 +376,6 @@ func (h *Head) processWALSamples( ) (unknownRefs uint64) { defer close(output) - // Mitigate lock contention in getByID. - refSeries := map[uint64]*memSeries{} - mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for samples := range input { @@ -378,14 +383,13 @@ func (h *Head) processWALSamples( if s.T < minValidTime { continue } - ms := refSeries[s.Ref] + ms := h.series.getByID(s.Ref) if ms == nil { - ms = h.series.getByID(s.Ref) - if ms == nil { - unknownRefs++ - continue - } - refSeries[s.Ref] = ms + unknownRefs++ + continue + } + if s.T <= ms.mmMaxTime { + continue } if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated { h.metrics.chunksCreated.Inc()