diff --git a/tsdb/db.go b/tsdb/db.go index 1d73628bfd..5e969ec73d 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -236,7 +236,6 @@ type Options struct { // EnableSTStorage determines whether TSDB should write a Start Timestamp (ST) // per sample to WAL. - // TODO(bwplotka): Implement this option as per PROM-60, currently it's noop. EnableSTStorage bool // EnableMetadataWALRecords represents 'metadata-wal-records' feature flag. diff --git a/tsdb/head.go b/tsdb/head.go index 19c2538b12..a42735375e 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -203,7 +203,6 @@ type HeadOptions struct { // EnableSTStorage determines whether agent DB should write a Start Timestamp (ST) // per sample to WAL. - // TODO(bwplotka): Implement this option as per PROM-60, currently it's noop. EnableSTStorage bool } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 846ad476e3..6cac1185a3 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1058,8 +1058,10 @@ func (a *headAppenderBase) log() error { buf := a.head.getBytesBuffer() defer func() { a.head.putBytesBuffer(buf) }() - var rec []byte - var enc record.Encoder + var ( + rec []byte + enc = record.Encoder{EnableSTStorage: a.head.opts.EnableSTStorage} + ) if len(a.seriesRefs) > 0 { rec = enc.Series(a.seriesRefs, buf) @@ -1178,7 +1180,7 @@ type appenderCommitContext struct { oooRecords [][]byte oooCapMax int64 appendChunkOpts chunkOpts - enc record.Encoder + oooEnc record.Encoder } // commitExemplars adds all exemplars from the provided batch to the head's exemplar storage. @@ -1228,31 +1230,31 @@ func (acc *appenderCommitContext) collectOOORecords(a *headAppenderBase) { }) } } - r := acc.enc.MmapMarkers(markers, a.head.getBytesBuffer()) + r := acc.oooEnc.MmapMarkers(markers, a.head.getBytesBuffer()) acc.oooRecords = append(acc.oooRecords, r) } if len(acc.wblSamples) > 0 { - r := acc.enc.Samples(acc.wblSamples, a.head.getBytesBuffer()) + r := acc.oooEnc.Samples(acc.wblSamples, a.head.getBytesBuffer()) acc.oooRecords = append(acc.oooRecords, r) } if len(acc.wblHistograms) > 0 { - r, customBucketsHistograms := acc.enc.HistogramSamples(acc.wblHistograms, a.head.getBytesBuffer()) + r, customBucketsHistograms := acc.oooEnc.HistogramSamples(acc.wblHistograms, a.head.getBytesBuffer()) if len(r) > 0 { acc.oooRecords = append(acc.oooRecords, r) } if len(customBucketsHistograms) > 0 { - r := acc.enc.CustomBucketsHistogramSamples(customBucketsHistograms, a.head.getBytesBuffer()) + r := acc.oooEnc.CustomBucketsHistogramSamples(customBucketsHistograms, a.head.getBytesBuffer()) acc.oooRecords = append(acc.oooRecords, r) } } if len(acc.wblFloatHistograms) > 0 { - r, customBucketsFloatHistograms := acc.enc.FloatHistogramSamples(acc.wblFloatHistograms, a.head.getBytesBuffer()) + r, customBucketsFloatHistograms := acc.oooEnc.FloatHistogramSamples(acc.wblFloatHistograms, a.head.getBytesBuffer()) if len(r) > 0 { acc.oooRecords = append(acc.oooRecords, r) } if len(customBucketsFloatHistograms) > 0 { - r := acc.enc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, a.head.getBytesBuffer()) + r := acc.oooEnc.CustomBucketsFloatHistogramSamples(customBucketsFloatHistograms, a.head.getBytesBuffer()) acc.oooRecords = append(acc.oooRecords, r) } } @@ -1431,7 +1433,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte default: newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V) staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V) - ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts) + ok, chunkCreated = series.append(s.ST, s.T, s.V, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1540,7 +1542,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum) } - ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts) + // TODO(bwplotka): Add support for ST for Histograms. + ok, chunkCreated = series.appendHistogram(0, s.T, s.H, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1649,7 +1652,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum) } - ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts) + // TODO(bwplotka): Add support for ST for FloatHistograms. + ok, chunkCreated = series.appendFloatHistogram(0, s.T, s.FH, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1741,9 +1745,10 @@ func (a *headAppenderBase) Commit() (err error) { chunkDiskMapper: h.chunkDiskMapper, chunkRange: h.chunkRange.Load(), samplesPerChunk: h.opts.SamplesPerChunk, + enableSTStorage: h.opts.EnableSTStorage, }, - enc: record.Encoder{ - EnableSTStorage: false, + oooEnc: record.Encoder{ + EnableSTStorage: a.head.opts.EnableSharding, }, } @@ -1827,19 +1832,25 @@ type chunkOpts struct { chunkDiskMapper *chunks.ChunkDiskMapper chunkRange int64 samplesPerChunk int + + enableSTStorage bool } // append adds the sample (t, v) to the series. The caller also has to provide // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // Series lock must be held when calling. -func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o) +func (s *memSeries) append(st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { + enc := chunkenc.EncXOR + if o.enableSTStorage { + enc = chunkenc.EncXOROptST + } + + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, enc, o) if !sampleInOrder { return sampleInOrder, chunkCreated } - // TODO(krajorama): pass ST. - s.app.Append(0, t, v) + s.app.Append(st, t, v) c.maxTime = t @@ -1859,14 +1870,16 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa // In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped. // To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total // consistent, we return chunkCreated=false in this case. -func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendHistogram(st, t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards and mmap used up chunks. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.HistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncHistogram, o) + enc := chunkenc.EncHistogram + // TODO(bwplotka): Implement histogram ST encoding and switch on o.enableSTStorage. + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, enc, o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1882,7 +1895,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui } // TODO(krajorama): pass ST. - newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, 0, t, h, false) // false=request a new chunk if needed + newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, st, t, h, false) // false=request a new chunk if needed s.lastHistogramValue = h s.lastFloatHistogramValue = nil @@ -1917,14 +1930,16 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped. // To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total // consistent, we return chunkCreated=false in this case. -func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendFloatHistogram(st, t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards and mmap used up chunks. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncFloatHistogram, o) + enc := chunkenc.EncFloatHistogram + // TODO(bwplotka): Implement histogram ST encoding and switch on o.enableSTStorage. + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, enc, o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1939,8 +1954,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, prevApp = nil } - // TODO(krajorama): pass ST. - newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, 0, t, fh, false) // False means request a new chunk if needed. + newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, st, t, fh, false) // False means request a new chunk if needed. s.lastHistogramValue = nil s.lastFloatHistogramValue = fh diff --git a/tsdb/head_append_v2.go b/tsdb/head_append_v2.go index 87b62df536..68fc339177 100644 --- a/tsdb/head_append_v2.go +++ b/tsdb/head_append_v2.go @@ -148,10 +148,10 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i switch { case fh != nil: isStale = value.IsStaleNaN(fh.Sum) - appErr = a.appendFloatHistogram(s, t, fh, opts.RejectOutOfOrder) + appErr = a.appendFloatHistogram(s, st, t, fh, opts.RejectOutOfOrder) case h != nil: isStale = value.IsStaleNaN(h.Sum) - appErr = a.appendHistogram(s, t, h, opts.RejectOutOfOrder) + appErr = a.appendHistogram(s, st, t, h, opts.RejectOutOfOrder) default: isStale = value.IsStaleNaN(v) if isStale { @@ -177,7 +177,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i // we do not need to check for the difference between "unknown // series" and "known series with stNone". } - appErr = a.appendFloat(s, t, v, opts.RejectOutOfOrder) + appErr = a.appendFloat(s, st, t, v, opts.RejectOutOfOrder) } // Handle append error, if any. if appErr != nil { @@ -218,7 +218,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i return storage.SeriesRef(s.ref), partialErr } -func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejectOOO bool) error { +func (a *headAppenderV2) appendFloat(s *memSeries, st, t int64, v float64, fastRejectOOO bool) error { s.Lock() // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. @@ -239,12 +239,12 @@ func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejec } b := a.getCurrentBatch(stFloat, s.ref) - b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: t, V: v}) + b.floats = append(b.floats, record.RefSample{Ref: s.ref, ST: st, T: t, V: v}) b.floatSeries = append(b.floatSeries, s) return nil } -func (a *headAppenderV2) appendHistogram(s *memSeries, t int64, h *histogram.Histogram, fastRejectOOO bool) error { +func (a *headAppenderV2) appendHistogram(s *memSeries, _, t int64, h *histogram.Histogram, fastRejectOOO bool) error { s.Lock() // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. @@ -263,17 +263,18 @@ func (a *headAppenderV2) appendHistogram(s *memSeries, t int64, h *histogram.His if err != nil { return err } - st := stHistogram + sTyp := stHistogram if h.UsesCustomBuckets() { - st = stCustomBucketHistogram + sTyp = stCustomBucketHistogram } - b := a.getCurrentBatch(st, s.ref) + b := a.getCurrentBatch(sTyp, s.ref) + // TODO(bwplotka): Add ST support for RefHistogramSample. b.histograms = append(b.histograms, record.RefHistogramSample{Ref: s.ref, T: t, H: h}) b.histogramSeries = append(b.histogramSeries, s) return nil } -func (a *headAppenderV2) appendFloatHistogram(s *memSeries, t int64, fh *histogram.FloatHistogram, fastRejectOOO bool) error { +func (a *headAppenderV2) appendFloatHistogram(s *memSeries, _, t int64, fh *histogram.FloatHistogram, fastRejectOOO bool) error { s.Lock() // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. @@ -292,11 +293,12 @@ func (a *headAppenderV2) appendFloatHistogram(s *memSeries, t int64, fh *histogr if err != nil { return err } - st := stFloatHistogram + sTyp := stFloatHistogram if fh.UsesCustomBuckets() { - st = stCustomBucketFloatHistogram + sTyp = stCustomBucketFloatHistogram } - b := a.getCurrentBatch(st, s.ref) + b := a.getCurrentBatch(sTyp, s.ref) + // TODO(bwplotka): Add ST support for RefFloatHistogramSample. b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{Ref: s.ref, T: t, FH: fh}) b.floatHistogramSeries = append(b.floatHistogramSeries, s) return nil @@ -354,7 +356,7 @@ func (a *headAppenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.La ZeroThreshold: fh.ZeroThreshold, CustomValues: fh.CustomValues, } - err = a.appendFloatHistogram(s, st, zeroFloatHistogram, true) + err = a.appendFloatHistogram(s, 0, st, zeroFloatHistogram, true) case h != nil: zeroHistogram := &histogram.Histogram{ // The STZeroSample represents a counter reset by definition. @@ -364,9 +366,9 @@ func (a *headAppenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.La ZeroThreshold: h.ZeroThreshold, CustomValues: h.CustomValues, } - err = a.appendHistogram(s, st, zeroHistogram, true) + err = a.appendHistogram(s, 0, st, zeroHistogram, true) default: - err = a.appendFloat(s, st, 0, true) + err = a.appendFloat(s, 0, st, 0, true) } if err != nil { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 81cb236801..75cdfaa4d7 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -61,19 +61,20 @@ import ( ) // newTestHeadDefaultOptions returns the HeadOptions that should be used by default in unit tests. -func newTestHeadDefaultOptions(chunkRange int64, oooEnabled bool) *HeadOptions { +func newTestHeadDefaultOptions(chunkRange int64, enabledOOO, enabledSTStorage bool) *HeadOptions { opts := DefaultHeadOptions() opts.ChunkRange = chunkRange opts.EnableExemplarStorage = true + opts.EnableSTStorage = enabledSTStorage opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars) - if oooEnabled { + if enabledOOO { opts.OutOfOrderTimeWindow.Store(10 * time.Minute.Milliseconds()) } return opts } -func newTestHead(t testing.TB, chunkRange int64, compressWAL compression.Type, oooEnabled bool) (*Head, *wlog.WL) { - return newTestHeadWithOptions(t, compressWAL, newTestHeadDefaultOptions(chunkRange, oooEnabled)) +func newTestHead(t testing.TB, chunkRange int64, compressWAL compression.Type, enabledOOO, enabledSTStorage bool) (*Head, *wlog.WL) { + return newTestHeadWithOptions(t, compressWAL, newTestHeadDefaultOptions(chunkRange, enabledOOO, enabledSTStorage)) } func newTestHeadWithOptions(t testing.TB, compressWAL compression.Type, opts *HeadOptions) (*Head, *wlog.WL) { @@ -102,7 +103,7 @@ func newTestHeadWithOptions(t testing.TB, compressWAL compression.Type, opts *He func BenchmarkCreateSeries(b *testing.B) { series := genSeries(b.N, 10, 0, 0) - h, _ := newTestHead(b, 10000, compression.None, false) + h, _ := newTestHead(b, 10000, compression.None, false, false) b.ReportAllocs() b.ResetTimer() @@ -256,7 +257,7 @@ func BenchmarkLoadWLs(b *testing.B) { // Rough estimates of most common % of samples that have an exemplar for each scrape. exemplarsPercentages := []float64{0, 0.5, 1, 5} lastExemplarsPerSeries := -1 - for _, enableStStorage := range []bool{false, true} { + for _, enableSTStorage := range []bool{false, true} { for _, c := range cases { missingSeriesPercentages := []float64{0, 0.1} for _, missingSeriesPct := range missingSeriesPercentages { @@ -268,7 +269,7 @@ func BenchmarkLoadWLs(b *testing.B) { continue } lastExemplarsPerSeries = exemplarsPerSeries - b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d,oooSeriesPct=%.3f,oooSamplesPct=%.3f,oooCapMax=%d,missingSeriesPct=%.3f,stStorage=%v", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax, missingSeriesPct, enableStStorage), + b.Run(fmt.Sprintf("batches=%d/seriesPerBatch=%d/samplesPerSeries=%d/exemplarsPerSeries=%d/mmappedChunkT=%d/oooSeriesPct=%.3f/oooSamplesPct=%.3f/oooCapMax=%d/missingSeriesPct=%.3f/stStorage=%v", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT, c.oooSeriesPct, c.oooSamplesPct, c.oooCapMax, missingSeriesPct, enableSTStorage), func(b *testing.B) { dir := b.TempDir() @@ -307,7 +308,7 @@ func BenchmarkLoadWLs(b *testing.B) { writeSeries = newWriteSeries } - buf = populateTestWL(b, wal, []any{writeSeries}, buf, enableStStorage) + buf = populateTestWL(b, wal, []any{writeSeries}, buf, enableSTStorage) } // Write samples. @@ -333,7 +334,7 @@ func BenchmarkLoadWLs(b *testing.B) { V: float64(i) * 100, }) } - buf = populateTestWL(b, wal, []any{refSamples}, buf, enableStStorage) + buf = populateTestWL(b, wal, []any{refSamples}, buf, enableSTStorage) } } @@ -345,11 +346,12 @@ func BenchmarkLoadWLs(b *testing.B) { chunkDiskMapper: chunkDiskMapper, chunkRange: c.mmappedChunkT, samplesPerChunk: DefaultSamplesPerChunk, + enableSTStorage: enableSTStorage, } for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled, false) - s.append(c.mmappedChunkT, 42, 0, cOpts) + s.append(c.mmappedChunkT, 10, 42, 0, cOpts) // There's only one head chunk because only a single sample is appended. mmapChunks() // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with // the sample at c.mmappedChunkT is mmapped. @@ -372,7 +374,7 @@ func BenchmarkLoadWLs(b *testing.B) { Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)), }) } - buf = populateTestWL(b, wal, []any{refExemplars}, buf, enableStStorage) + buf = populateTestWL(b, wal, []any{refExemplars}, buf, enableSTStorage) } } @@ -396,15 +398,16 @@ func BenchmarkLoadWLs(b *testing.B) { } refSamples = append(refSamples, record.RefSample{ Ref: ref, + ST: int64(i)*10 - 9, T: int64(i) * 10, V: float64(i) * 100, }) } if shouldAddMarkers { - populateTestWL(b, wbl, []any{refMarkers}, buf, enableStStorage) + populateTestWL(b, wbl, []any{refMarkers}, buf, enableSTStorage) } - buf = populateTestWL(b, wal, []any{refSamples}, buf, enableStStorage) - buf = populateTestWL(b, wbl, []any{refSamples}, buf, enableStStorage) + buf = populateTestWL(b, wal, []any{refSamples}, buf, enableSTStorage) + buf = populateTestWL(b, wbl, []any{refSamples}, buf, enableSTStorage) } } @@ -415,6 +418,7 @@ func BenchmarkLoadWLs(b *testing.B) { opts := DefaultHeadOptions() opts.ChunkRange = 1000 opts.ChunkDirRoot = dir + opts.EnableSTStorage = enableSTStorage if c.oooCapMax > 0 { opts.OutOfOrderCapMax.Store(c.oooCapMax) } @@ -477,7 +481,7 @@ func BenchmarkLoadRealWLs(b *testing.B) { // TestHead_InitAppenderRace_ErrOutOfBounds tests against init races with maxTime vs minTime on empty head concurrent appends. // See: https://github.com/prometheus/prometheus/pull/17963 func TestHead_InitAppenderRace_ErrOutOfBounds(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) + head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false, false) require.NoError(t, head.Init(0)) ts := timestamp.FromTime(time.Now()) @@ -515,7 +519,7 @@ func TestHead_InitAppenderRace_ErrOutOfBounds(t *testing.T) { func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { for _, appV2 := range []bool{false, true} { t.Run(fmt.Sprintf("appV2=%v", appV2), func(t *testing.T) { - head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false) + head, _ := newTestHead(t, DefaultBlockDuration, compression.None, false, false) seriesCnt := 1000 readConcurrency := 2 @@ -713,9 +717,9 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { } func TestHead_ReadWAL(t *testing.T) { - for _, enableStStorage := range []bool{false, true} { + for _, enableSTStorage := range []bool{false, true} { for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { - t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) { + t.Run(fmt.Sprintf("compress=%s/stStorage=%v", compress, enableSTStorage), func(t *testing.T) { entries := []any{ []record.RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, @@ -723,9 +727,9 @@ func TestHead_ReadWAL(t *testing.T) { {Ref: 100, Labels: labels.FromStrings("a", "3")}, }, []record.RefSample{ - {Ref: 0, T: 99, V: 1}, - {Ref: 10, T: 100, V: 2}, - {Ref: 100, T: 100, V: 3}, + {Ref: 0, ST: 1, T: 99, V: 1}, + {Ref: 10, ST: 1, T: 100, V: 2}, + {Ref: 100, ST: 1, T: 100, V: 3}, }, []record.RefSeries{ {Ref: 50, Labels: labels.FromStrings("a", "4")}, @@ -733,10 +737,10 @@ func TestHead_ReadWAL(t *testing.T) { {Ref: 101, Labels: labels.FromStrings("a", "3")}, }, []record.RefSample{ - {Ref: 10, T: 101, V: 5}, - {Ref: 50, T: 101, V: 6}, + {Ref: 10, ST: 100, T: 101, V: 5}, + {Ref: 50, ST: 100, T: 101, V: 6}, // Sample for duplicate series record. - {Ref: 101, T: 101, V: 7}, + {Ref: 101, ST: 100, T: 101, V: 7}, }, []tombstones.Stone{ {Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}}, @@ -754,9 +758,9 @@ func TestHead_ReadWAL(t *testing.T) { }, } - head, w := newTestHead(t, 1000, compress, false) + head, w := newTestHead(t, 1000, compress, false, enableSTStorage) - populateTestWL(t, w, entries, nil, enableStStorage) + populateTestWL(t, w, entries, nil, enableSTStorage) require.NoError(t, head.Init(math.MinInt64)) require.Equal(t, uint64(101), head.lastSeriesID.Load()) @@ -794,7 +798,10 @@ func TestHead_ReadWAL(t *testing.T) { // Verify samples and exemplar for series 10. c, _, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool) require.NoError(t, err) - require.Equal(t, []sample{{0, 100, 2, nil, nil}, {0, 101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil))) + require.Equal(t, []sample{ + {1, 100, 2, nil, nil}, + {100, 101, 5, nil, nil}, + }, expandChunk(c.chunk.Iterator(nil))) q, err := head.ExemplarQuerier(context.Background()) require.NoError(t, err) @@ -837,7 +844,7 @@ func TestHead_ReadWAL(t *testing.T) { } func TestHead_WALMultiRef(t *testing.T) { - head, w := newTestHead(t, 1000, compression.None, false) + head, w := newTestHead(t, 1000, compression.None, false, false) require.NoError(t, head.Init(0)) @@ -912,9 +919,9 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) { {Ref: 2, Labels: labels.FromStrings("a", "1")}, }, []record.RefSample{ - {Ref: 1, T: 100, V: 1}, - {Ref: 2, T: 200, V: 2}, - {Ref: 2, T: 500, V: 3}, + {Ref: 1, ST: 1, T: 100, V: 1}, + {Ref: 2, ST: 101, T: 200, V: 2}, + {Ref: 2, ST: 101, T: 500, V: 3}, }, }, expectedWalExpiry: 500, @@ -925,7 +932,7 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) { {Ref: 2, Labels: labels.FromStrings("a", "1")}, }, []record.RefSample{ - {Ref: 2, T: 500, V: 3}, + {Ref: 2, ST: 101, T: 500, V: 3}, }, }, }, @@ -1038,7 +1045,7 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) { {Ref: 2, Labels: labels.FromStrings("a", "1")}, }, []record.RefSample{ - {Ref: 2, T: 500, V: 3}, + {Ref: 2, ST: 101, T: 500, V: 3}, }, []tombstones.Stone{ {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 500}}}, @@ -1074,8 +1081,8 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) { {Ref: 2, Labels: labels.FromStrings("a", "1")}, }, []record.RefSample{ - {Ref: 2, T: 500, V: 2}, - {Ref: 1, T: 900, V: 3}, + {Ref: 2, ST: 101, T: 500, V: 2}, + {Ref: 1, ST: 501, T: 900, V: 3}, }, []tombstones.Stone{ {Ref: 2, Intervals: []tombstones.Interval{{Mint: 0, Maxt: 750}}}, @@ -1097,17 +1104,17 @@ func TestHead_WALCheckpointMultiRef(t *testing.T) { {Ref: 1, Labels: labels.FromStrings("a", "1")}, }, []record.RefSample{ - {Ref: 1, T: 900, V: 3}, + {Ref: 1, ST: 501, T: 900, V: 3}, }, }, }, } - for _, enableStStorage := range []bool{false, true} { + for _, enableSTStorage := range []bool{false, true} { for _, tc := range cases { - t.Run(tc.name+",stStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) { - h, w := newTestHead(t, 1000, compression.None, false) - populateTestWL(t, w, tc.walEntries, nil, enableStStorage) + t.Run(fmt.Sprintf("case=%v/stStorage=%v", tc.name, enableSTStorage), func(t *testing.T) { + h, w := newTestHead(t, 1000, compression.None, false, enableSTStorage) + populateTestWL(t, w, tc.walEntries, nil, enableSTStorage) first, _, err := wlog.Segments(w.Dir()) require.NoError(t, err) @@ -1181,7 +1188,7 @@ func TestHead_KeepSeriesInWALCheckpoint(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - h, _ := newTestHead(t, 1000, compression.None, false) + h, _ := newTestHead(t, 1000, compression.None, false, false) if tc.prepare != nil { tc.prepare(t, h) @@ -1690,9 +1697,9 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { } func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { - for _, enableStStorage := range []bool{false, true} { + for _, enableSTStorage := range []bool{false, true} { for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { - t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableStStorage), func(t *testing.T) { + t.Run(fmt.Sprintf("compress=%s,stStorage=%v", compress, enableSTStorage), func(t *testing.T) { entries := []any{ []record.RefSeries{ {Ref: 10, Labels: labels.FromStrings("a", "1")}, @@ -1708,7 +1715,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { } head, w := newTestHead(t, 1000, compress, false) - populateTestWL(t, w, entries, nil, enableStStorage) + populateTestWL(t, w, entries, nil, enableSTStorage) require.NoError(t, head.Init(math.MinInt64)) @@ -2575,8 +2582,8 @@ func TestHead_ReturnsSortedLabelValues(t *testing.T) { // TestWalRepair_DecodingError ensures that a repair is run for an error // when decoding a record. func TestWalRepair_DecodingError(t *testing.T) { - for _, enableStStorage := range []bool{false, true} { - enc := record.Encoder{EnableSTStorage: enableStStorage} + for _, enableSTStorage := range []bool{false, true} { + enc := record.Encoder{EnableSTStorage: enableSTStorage} for name, test := range map[string]struct { corrFunc func(rec []byte) []byte // Func that applies the corruption to a record. rec []byte @@ -2609,7 +2616,7 @@ func TestWalRepair_DecodingError(t *testing.T) { }, } { for _, compress := range []compression.Type{compression.None, compression.Snappy, compression.Zstd} { - t.Run(fmt.Sprintf("%s,compress=%s,stStorage=%v", name, compress, enableStStorage), func(t *testing.T) { + t.Run(fmt.Sprintf("%s,compress=%s,stStorage=%v", name, compress, enableSTStorage), func(t *testing.T) { dir := t.TempDir() // Fill the wal and corrupt it. @@ -2672,9 +2679,9 @@ func TestWalRepair_DecodingError(t *testing.T) { // TestWblRepair_DecodingError ensures that a repair is run for an error // when decoding a record. func TestWblRepair_DecodingError(t *testing.T) { - for _, enableStStorage := range []bool{false, true} { - t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) { - enc := record.Encoder{EnableSTStorage: enableStStorage} + for _, enableSTStorage := range []bool{false, true} { + t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) { + enc := record.Encoder{EnableSTStorage: enableSTStorage} corrFunc := func(rec []byte) []byte { return rec[:3] } @@ -4378,8 +4385,8 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { } func TestChunkSnapshot(t *testing.T) { - for _, enableStStorage := range []bool{false, true} { - t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) { + for _, enableSTStorage := range []bool{false, true} { + t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) { head, _ := newTestHead(t, 120*4, compression.None, false) defer func() { head.opts.EnableMemorySnapshotOnShutdown = false @@ -4525,7 +4532,7 @@ func TestChunkSnapshot(t *testing.T) { require.NoError(t, app.Commit()) // Add some tombstones. - enc := record.Encoder{EnableSTStorage: enableStStorage} + enc := record.Encoder{EnableSTStorage: enableSTStorage} for i := 1; i <= numSeries; i++ { ref := storage.SeriesRef(i) itvs := tombstones.Intervals{ @@ -4599,7 +4606,7 @@ func TestChunkSnapshot(t *testing.T) { require.NoError(t, app.Commit()) // Add more tombstones. - enc := record.Encoder{EnableSTStorage: enableStStorage} + enc := record.Encoder{EnableSTStorage: enableSTStorage} for i := 1; i <= numSeries; i++ { ref := storage.SeriesRef(i) itvs := tombstones.Intervals{ @@ -5392,8 +5399,8 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { // Tests https://github.com/prometheus/prometheus/issues/9725. func TestChunkSnapshotReplayBug(t *testing.T) { - for _, enableStStorage := range []bool{false, true} { - t.Run("enableStStorage="+strconv.FormatBool(enableStStorage), func(t *testing.T) { + for _, enableSTStorage := range []bool{false, true} { + t.Run("enableSTStorage="+strconv.FormatBool(enableSTStorage), func(t *testing.T) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) @@ -5418,7 +5425,7 @@ func TestChunkSnapshotReplayBug(t *testing.T) { } // Add a sample so that the series is not garbage collected. samplesRec := record.RefSample{Ref: ref, T: 1000, V: 1000} - enc := record.Encoder{EnableSTStorage: enableStStorage} + enc := record.Encoder{EnableSTStorage: enableSTStorage} rec := enc.Series([]record.RefSeries{seriesRec}, buf) buf = rec[:0]