From 7991bcbff95265b011d58e69fd78329bb45b560a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Wed, 18 Feb 2026 18:51:33 +0100 Subject: [PATCH] feat(tsdb): adopt head append changes from 18026 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: György Krajcsovits --- storage/series.go | 4 +- tsdb/chunkenc/chunk.go | 52 +---------- tsdb/chunks/chunks.go | 2 +- tsdb/db.go | 1 + tsdb/db_append_v2_test.go | 175 ++++++++++++++++++++++++++++++++++++ tsdb/db_test.go | 63 +++++++++++++ tsdb/head.go | 4 +- tsdb/head_append.go | 40 ++++----- tsdb/head_append_v2.go | 11 +-- tsdb/head_append_v2_test.go | 148 ++++++++++++++++++++++++++++-- tsdb/head_test.go | 136 +++++++++++++++++++++++++++- tsdb/head_wal.go | 6 +- tsdb/ooo_head.go | 8 +- tsdb/querier.go | 4 +- tsdb/querier_test.go | 2 +- 15 files changed, 554 insertions(+), 102 deletions(-) diff --git a/storage/series.go b/storage/series.go index 59f6685793..d7b21a0dc2 100644 --- a/storage/series.go +++ b/storage/series.go @@ -115,7 +115,7 @@ func NewListSeriesIterator(samples Samples) chunkenc.Iterator { func (it *listSeriesIterator) Encoding() chunkenc.Encoding { s := it.samples.Get(it.idx) - encoding := s.Type().ChunkEncodingWithST(s.ST()) + encoding := s.Type().ChunkEncoding(s.ST() != 0) if encoding == chunkenc.EncNone { panic(fmt.Sprintf("unknown sample type %s", s.Type().String())) } @@ -357,7 +357,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { if typ != lastType || lastHadST != hasST || i >= seriesToChunkEncoderSplit { // Create a new chunk if the sample type changed or too many samples in the current one. chks = appendChunk(chks, mint, maxt, chk) - chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding(), hasST) + chk, err = typ.NewChunk(hasST) if err != nil { return errChunksIterator{err: err} } diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index cd69c25093..f06a5bf208 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -196,36 +196,7 @@ func (v ValueType) String() string { } } -func (v ValueType) ChunkEncoding() Encoding { - switch v { - case ValFloat: - return EncXOR - case ValHistogram: - return EncHistogram - case ValFloatHistogram: - return EncFloatHistogram - default: - return EncNone - } -} - -func (v ValueType) ChunkEncodingWithST(st int64) Encoding { - switch v { - case ValFloat: - if st != 0 { - return EncXOROptST - } - return EncXOR - case ValHistogram: - return EncHistogram - case ValFloatHistogram: - return EncFloatHistogram - default: - return EncNone - } -} - -func (v ValueType) ChunkEncodingWithStoreST(storeST bool) Encoding { +func (v ValueType) ChunkEncoding(storeST bool) Encoding { switch v { case ValFloat: if storeST { @@ -242,21 +213,7 @@ func (v ValueType) ChunkEncodingWithStoreST(storeST bool) Encoding { } func (v ValueType) NewChunk(storeST bool) (Chunk, error) { - switch v { - case ValFloat: - if storeST { - return NewXOROptSTChunk(), nil - } - return NewXORChunk(), nil - case ValHistogram: - // TODO(krajorama): return a ST capable histogram chunk when they are supported. - return NewHistogramChunk(), nil - case ValFloatHistogram: - // TODO(krajorama): return a ST capable float histogram chunk when they are supported. - return NewFloatHistogramChunk(), nil - default: - return nil, fmt.Errorf("value type %v unsupported", v) - } + return NewEmptyChunk(v.ChunkEncoding(storeST)) } // MockSeriesIterator returns an iterator for a mock series with custom @@ -451,12 +408,9 @@ func FromData(e Encoding, d []byte) (Chunk, error) { } // NewEmptyChunk returns an empty chunk for the given encoding. -func NewEmptyChunk(e Encoding, storeST bool) (Chunk, error) { +func NewEmptyChunk(e Encoding) (Chunk, error) { switch e { case EncXOR: - if storeST { - return NewXOROptSTChunk(), nil - } return NewXORChunk(), nil case EncHistogram: return NewHistogramChunk(), nil diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 88835b382a..6084f7148e 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -166,7 +166,7 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) { } // Request storing ST in the chunk if available. - c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding(), hasST) + c, err := sampleType.NewChunk(hasST) if err != nil { return Meta{}, err } diff --git a/tsdb/db.go b/tsdb/db.go index 1d73628bfd..81c7a6c460 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1044,6 +1044,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax) headOpts.EnableSharding = opts.EnableSharding headOpts.EnableSTAsZeroSample = opts.EnableSTAsZeroSample + headOpts.EnableSTStorage.Store(opts.EnableSTStorage) headOpts.EnableMetadataWALRecords = opts.EnableMetadataWALRecords if opts.WALReplayConcurrency > 0 { headOpts.WALReplayConcurrency = opts.WALReplayConcurrency diff --git a/tsdb/db_append_v2_test.go b/tsdb/db_append_v2_test.go index 08e97d1113..9fb5ea07ac 100644 --- a/tsdb/db_append_v2_test.go +++ b/tsdb/db_append_v2_test.go @@ -7512,6 +7512,70 @@ func TestAbortBlockCompactions_AppendV2(t *testing.T) { require.Equal(t, 4, compactions, "expected 4 compactions to be completed") } +// TestCompactHeadWithSTStorage_AppendV2 ensures that when EnableSTStorage is true, +// compacted blocks contain chunks with EncXOROptST encoding for float samples. +func TestCompactHeadWithSTStorage_AppendV2(t *testing.T) { + t.Parallel() + + opts := &Options{ + RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond), + NoLockfile: true, + MinBlockDuration: int64(time.Hour * 2 / time.Millisecond), + MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond), + WALCompression: compression.Snappy, + EnableSTStorage: true, + } + db := newTestDB(t, withOpts(opts)) + ctx := context.Background() + app := db.AppenderV2(ctx) + + maxt := 100 + for i := range maxt { + // AppendV2 signature: (ref, labels, st, t, v, h, fh, opts) + // st=0 (start timestamp), t=i (sample timestamp) + // TODO(krajorama): verify with non zero st once the API supports it. + _, err := app.Append(0, labels.FromStrings("a", "b"), 0, int64(i), float64(i), nil, nil, storage.AOptions{}) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + // Compact the Head to create a new block. + require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1))) + // Check that we have exactly one block. + require.Len(t, db.Blocks(), 1) + b := db.Blocks()[0] + + // Open chunk reader and index reader. + chunkr, err := b.Chunks() + require.NoError(t, err) + defer chunkr.Close() + + indexr, err := b.Index() + require.NoError(t, err) + defer indexr.Close() + + // Get postings for the series. + p, err := indexr.Postings(ctx, "a", "b") + require.NoError(t, err) + + chunkCount := 0 + for p.Next() { + var builder labels.ScratchBuilder + var chks []chunks.Meta + require.NoError(t, indexr.Series(p.At(), &builder, &chks)) + + for _, chk := range chks { + c, _, err := chunkr.ChunkOrIterable(chk) + require.NoError(t, err) + require.Equal(t, chunkenc.EncXOROptST, c.Encoding(), + "expected EncXOROptST encoding when EnableSTStorage=true, got %s", c.Encoding()) + chunkCount++ + } + } + require.NoError(t, p.Err()) + require.Positive(t, chunkCount, "expected at least one chunk") +} + func TestNewCompactorFunc_AppendV2(t *testing.T) { opts := DefaultOptions() block1 := ulid.MustNew(1, nil) @@ -7543,3 +7607,114 @@ func TestNewCompactorFunc_AppendV2(t *testing.T) { require.Len(t, ulids, 1) require.Equal(t, block2, ulids[0]) } + +// TestDBAppenderV2_STStorage_OutOfOrder verifies that ST storage works correctly +// when samples are appended out of order and can be queried using ChunkQuerier. +func TestDBAppenderV2_STStorage_OutOfOrder(t *testing.T) { + testHistogram := tsdbutil.GenerateTestHistogram(1) + testHistogram.CounterResetHint = histogram.NotCounterReset + + testCases := []struct { + name string + appendSamples []chunks.Sample // Samples in append order (out of order) + expectedSamples []chunks.Sample // Expected samples in time order after query + }{ + { + name: "Float samples out of order", + appendSamples: []chunks.Sample{ + newSample(20, 200, 2.0, nil, nil), // Append second sample first + newSample(10, 100, 1.0, nil, nil), // Append first sample second (OOO) + newSample(30, 300, 3.0, nil, nil), // Append third sample last + newSample(25, 250, 2.5, nil, nil), // Append middle sample (OOO) + }, + expectedSamples: []chunks.Sample{ + newSample(10, 100, 1.0, nil, nil), + newSample(20, 200, 2.0, nil, nil), + newSample(25, 250, 2.5, nil, nil), + newSample(30, 300, 3.0, nil, nil), + }, + }, + { + name: "Histogram samples out of order", + appendSamples: []chunks.Sample{ + newSample(30, 300, 0, testHistogram, nil), // Append third sample first + newSample(10, 100, 0, testHistogram, nil), // Append first sample second (OOO) + newSample(20, 200, 0, testHistogram, nil), // Append second sample last (OOO) + }, + // Histograms don't support ST storage yet, should return 0 for ST + expectedSamples: []chunks.Sample{ + newSample(0, 100, 0, testHistogram, nil), + newSample(0, 200, 0, testHistogram, nil), + newSample(0, 300, 0, testHistogram, nil), + }, + }, + { + name: "Mixed float samples with same ST", + appendSamples: []chunks.Sample{ + newSample(10, 200, 2.0, nil, nil), + newSample(10, 100, 1.0, nil, nil), // OOO with same ST + newSample(10, 300, 3.0, nil, nil), + }, + expectedSamples: []chunks.Sample{ + newSample(10, 100, 1.0, nil, nil), + newSample(10, 200, 2.0, nil, nil), + newSample(10, 300, 3.0, nil, nil), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds() + opts.EnableSTStorage = true + db := newTestDB(t, withOpts(opts)) + db.DisableCompactions() + + lbls := labels.FromStrings("foo", "bar") + + // Append samples in the specified (out of order) sequence + for _, s := range tc.appendSamples { + app := db.AppenderV2(context.Background()) + _, err := app.Append(0, lbls, s.ST(), s.T(), s.F(), s.H(), s.FH(), storage.AOptions{}) + require.NoError(t, err, "Appending OOO sample with ST should succeed") + require.NoError(t, app.Commit(), "Committing OOO sample with ST should succeed") + } + + // Query using ChunkQuerier to verify ST values + querier, err := db.ChunkQuerier(math.MinInt64, math.MaxInt64) + require.NoError(t, err) + defer querier.Close() + + ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.True(t, ss.Next(), "Should have series") + series := ss.At() + require.NoError(t, ss.Err()) + require.False(t, ss.Next(), "Should have only one series") + + // Iterate through chunks and collect samples using storage.ExpandSamples + chunkIt := series.Iterator(nil) + var actualSamples []chunks.Sample + + for chunkIt.Next() { + chk := chunkIt.At() + it := chk.Chunk.Iterator(nil) + samples, err := storage.ExpandSamples(it, newSample) + require.NoError(t, err) + actualSamples = append(actualSamples, samples...) + } + require.NoError(t, chunkIt.Err()) + + // Verify samples are in time order with correct values + // Use requireEqualSamplesIgnoreCounterResets to ignore histogram counter reset hints + requireEqualSamples(t, lbls.String(), tc.expectedSamples, actualSamples, requireEqualSamplesIgnoreCounterResets) + + // Additionally verify ST values match expectations + require.Len(t, actualSamples, len(tc.expectedSamples)) + for i, expected := range tc.expectedSamples { + actual := actualSamples[i] + require.Equal(t, expected.ST(), actual.ST(), "Sample %d: ST should match", i) + } + }) + } +} diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 13464c26e5..a9db077228 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -9626,3 +9626,66 @@ func TestStaleSeriesCompactionWithZeroSeries(t *testing.T) { // Should still have no blocks since there was nothing to compact. require.Empty(t, db.Blocks()) } + +// TestCompactHeadWithSTStorage ensures that when EnableSTStorage is true, +// compacted blocks contain chunks with EncXOR encoding for float samples +// when using the original Appender (which does not support start timestamps). +func TestCompactHeadWithSTStorage(t *testing.T) { + t.Parallel() + + opts := &Options{ + RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond), + NoLockfile: true, + MinBlockDuration: int64(time.Hour * 2 / time.Millisecond), + MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond), + WALCompression: compression.Snappy, + EnableSTStorage: true, + } + db := newTestDB(t, withOpts(opts)) + ctx := context.Background() + app := db.Appender(ctx) + + maxt := 100 + for i := range maxt { + // Original Appender signature: (ref, labels, t, v) + _, err := app.Append(0, labels.FromStrings("a", "b"), int64(i), float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + // Compact the Head to create a new block. + require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1))) + // Check that we have exactly one block. + require.Len(t, db.Blocks(), 1) + b := db.Blocks()[0] + + // Open chunk reader and index reader. + chunkr, err := b.Chunks() + require.NoError(t, err) + defer chunkr.Close() + + indexr, err := b.Index() + require.NoError(t, err) + defer indexr.Close() + + // Get postings for the series. + p, err := indexr.Postings(ctx, "a", "b") + require.NoError(t, err) + + chunkCount := 0 + for p.Next() { + var builder labels.ScratchBuilder + var chks []chunks.Meta + require.NoError(t, indexr.Series(p.At(), &builder, &chks)) + + for _, chk := range chks { + c, _, err := chunkr.ChunkOrIterable(chk) + require.NoError(t, err) + require.Equal(t, chunkenc.EncXOROptST, c.Encoding(), + "expected EncXOR encoding when using original Appender, got %s", c.Encoding()) + chunkCount++ + } + } + require.NoError(t, p.Err()) + require.Positive(t, chunkCount, "expected at least one chunk") +} diff --git a/tsdb/head.go b/tsdb/head.go index e88a5e0803..917bd666d3 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -161,8 +161,8 @@ type HeadOptions struct { OutOfOrderCapMax atomic.Int64 // EnableSTStorage determines whether databases (WAL/WBL, tsdb, - // agent) should set a Start Time value per sample. Currently not - // user-settable and only set in tests. + // agent) should set a Start Time value per sample. + // Represents 'st-storage' feature flag. EnableSTStorage atomic.Bool ChunkRange int64 diff --git a/tsdb/head_append.go b/tsdb/head_append.go index ebadb1010a..9f0e7b4142 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1389,8 +1389,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - // TODO(krajorama,ywwg): Pass ST when available in WAL. - ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + ok, chunkCreated, mmapRefs = series.insert(a.storeST, s.ST, s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) if chunkCreated { r, ok := acc.oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1434,8 +1433,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte default: newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V) staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V) - // TODO(krajorama,ywwg): pass ST when available in WAL. - ok, chunkCreated = series.append(a.storeST, 0, s.T, s.V, a.appendID, acc.appendChunkOpts) + ok, chunkCreated = series.append(a.storeST, s.ST, s.T, s.V, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1842,7 +1840,7 @@ type chunkOpts struct { // isolation for this append.) // Series lock must be held when calling. func (s *memSeries) append(storeST bool, st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { - c, sampleInOrder, chunkCreated := s.appendPreprocessor(storeST, t, chunkenc.ValFloat.ChunkEncodingWithStoreST(storeST), o) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.ValFloat.ChunkEncoding(storeST), o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1873,7 +1871,7 @@ func (s *memSeries) appendHistogram(storeST bool, st, t int64, h *histogram.Hist // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.HistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValHistogram.ChunkEncodingWithStoreST(storeST), o) + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValHistogram.ChunkEncoding(storeST), o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1930,7 +1928,7 @@ func (s *memSeries) appendFloatHistogram(storeST bool, st, t int64, fh *histogra // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValFloatHistogram.ChunkEncodingWithStoreST(storeST), o) + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValFloatHistogram.ChunkEncoding(storeST), o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1979,7 +1977,7 @@ func (s *memSeries) appendFloatHistogram(storeST bool, st, t int64, fh *histogra // number of samples they contain with a soft cap in bytes. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. -func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { // We target chunkenc.MaxBytesPerXORChunk as a hard for the size of an XOR chunk. We must determine whether to cut // a new head chunk without knowing the size of the next sample, however, so we assume the next sample will be a // maximally-sized sample (19 bytes). @@ -1993,7 +1991,7 @@ func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encodin return c, false, false } // There is no head chunk in this series yet, create the first chunk for the sample. - c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } @@ -2004,14 +2002,14 @@ func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encodin // Check the chunk size, unless we just created it and if the chunk is too large, cut a new one. if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk { - c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } if c.chunk.Encoding() != e { // The chunk encoding expected by this append is different than the head chunk's // encoding. So we cut a new chunk with the expected encoding. - c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } @@ -2036,7 +2034,7 @@ func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encodin // as we expect more chunks to come. // Note that next chunk will have its nextAt recalculated for the new rate. if t >= s.nextAt || numSamples >= o.samplesPerChunk*2 { - c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } @@ -2047,7 +2045,7 @@ func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encodin // cut based on their size in bytes. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. -func (s *memSeries) histogramsAppendPreprocessor(storeST bool, t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { +func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { c = s.headChunks if c == nil { @@ -2056,7 +2054,7 @@ func (s *memSeries) histogramsAppendPreprocessor(storeST bool, t int64, e chunke return c, false, false } // There is no head chunk in this series yet, create the first chunk for the sample. - c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } @@ -2068,7 +2066,7 @@ func (s *memSeries) histogramsAppendPreprocessor(storeST bool, t int64, e chunke if c.chunk.Encoding() != e { // The chunk encoding expected by this append is different than the head chunk's // encoding. So we cut a new chunk with the expected encoding. - c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } @@ -2109,7 +2107,7 @@ func (s *memSeries) histogramsAppendPreprocessor(storeST bool, t int64, e chunke // increased or if the bucket/span count has increased. // Note that next chunk will have its nextAt recalculated for the new rate. if (t >= s.nextAt || numBytes >= targetBytes*2) && (numSamples >= chunkenc.MinSamplesPerHistogramChunk || t >= nextChunkRangeStart) { - c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) + c = s.cutNewHeadChunk(t, e, o.chunkRange) chunkCreated = true } @@ -2134,7 +2132,7 @@ func computeChunkEndTime(start, cur, maxT int64, ratioToFull float64) int64 { return int64(float64(start) + float64(maxT-start)/math.Floor(n)) } -func (s *memSeries) cutNewHeadChunk(storeST bool, mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk { +func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk { // When cutting a new head chunk we create a new memChunk instance with .prev // pointing at the current .headChunks, so it forms a linked list. // All but first headChunks list elements will be m-mapped as soon as possible @@ -2147,16 +2145,12 @@ func (s *memSeries) cutNewHeadChunk(storeST bool, mint int64, e chunkenc.Encodin if chunkenc.IsValidEncoding(e) { var err error - s.headChunks.chunk, err = chunkenc.NewEmptyChunk(e, storeST) + s.headChunks.chunk, err = chunkenc.NewEmptyChunk(e) if err != nil { panic(err) // This should never happen. } } else { - var err error - s.headChunks.chunk, err = chunkenc.NewEmptyChunk(chunkenc.EncXOR, storeST) - if err != nil { - panic(err) // This should never happen. - } + s.headChunks.chunk = chunkenc.NewXORChunk() } // Set upper bound on when the next chunk must be started. An earlier timestamp diff --git a/tsdb/head_append_v2.go b/tsdb/head_append_v2.go index 87b62df536..769b55c262 100644 --- a/tsdb/head_append_v2.go +++ b/tsdb/head_append_v2.go @@ -95,6 +95,7 @@ func (h *Head) appenderV2() *headAppenderV2 { typesInBatch: h.getTypeMap(), appendID: appendID, cleanupAppendIDsBelow: cleanupAppendIDsBelow, + storeST: h.opts.EnableSTStorage.Load(), }, } } @@ -141,7 +142,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i } // TODO(bwplotka): Handle ST natively (as per PROM-60). - if a.head.opts.EnableSTAsZeroSample && st != 0 { + if st != 0 && a.head.opts.EnableSTAsZeroSample { a.bestEffortAppendSTZeroSample(s, ls, st, t, h, fh) } @@ -177,7 +178,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i // we do not need to check for the difference between "unknown // series" and "known series with stNone". } - appErr = a.appendFloat(s, t, v, opts.RejectOutOfOrder) + appErr = a.appendFloat(s, st, t, v, opts.RejectOutOfOrder) } // Handle append error, if any. if appErr != nil { @@ -218,7 +219,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i return storage.SeriesRef(s.ref), partialErr } -func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejectOOO bool) error { +func (a *headAppenderV2) appendFloat(s *memSeries, st, t int64, v float64, fastRejectOOO bool) error { s.Lock() // TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise // to skip that sample from the WAL and write only in the WBL. @@ -239,7 +240,7 @@ func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejec } b := a.getCurrentBatch(stFloat, s.ref) - b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: t, V: v}) + b.floats = append(b.floats, record.RefSample{Ref: s.ref, ST: st, T: t, V: v}) b.floatSeries = append(b.floatSeries, s) return nil } @@ -366,7 +367,7 @@ func (a *headAppenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.La } err = a.appendHistogram(s, st, zeroHistogram, true) default: - err = a.appendFloat(s, st, 0, true) + err = a.appendFloat(s, 0, st, 0, true) } if err != nil { diff --git a/tsdb/head_append_v2_test.go b/tsdb/head_append_v2_test.go index cbac061f1a..4ab3144aef 100644 --- a/tsdb/head_append_v2_test.go +++ b/tsdb/head_append_v2_test.go @@ -2925,13 +2925,15 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot_AppenderV2(t *testing.T) { // TestWBLReplay checks the replay at a low level. func TestWBLReplay_AppenderV2(t *testing.T) { for name, scenario := range sampleTypeScenarios { - t.Run(name, func(t *testing.T) { - testWBLReplayAppenderV2(t, scenario) - }) + for _, enableSTstorage := range []bool{false, true} { + t.Run(fmt.Sprintf("%s/st-storage=%v", name, enableSTstorage), func(t *testing.T) { + testWBLReplayAppenderV2(t, scenario, enableSTstorage) + }) + } } } -func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) { +func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario, enableSTstorage bool) { dir := t.TempDir() wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy) require.NoError(t, err) @@ -2942,6 +2944,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) { opts.ChunkRange = 1000 opts.ChunkDirRoot = dir opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds()) + opts.EnableSTStorage.Store(enableSTstorage) h, err := NewHead(nil, nil, wal, oooWlog, opts, nil) require.NoError(t, err) @@ -2993,7 +2996,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) { require.False(t, ok) require.NotNil(t, ms) - chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64) + chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(h.opts.EnableSTStorage.Load(), math.MinInt64, math.MaxInt64) require.NoError(t, err) require.Len(t, chks, 1) @@ -4754,3 +4757,138 @@ func TestHeadAppenderV2_Append_HistogramStalenessConversionMetrics(t *testing.T) }) } } + +// TestHeadAppender_STStorage verifies that when EnableSTStorage is true, +// start timestamps are properly stored in chunks and returned by queries. +// This test uses AppenderV2 which has native ST support. +func TestHeadAppenderV2_STStorage(t *testing.T) { + testHistogram := tsdbutil.GenerateTestHistogram(1) + testHistogram.CounterResetHint = histogram.NotCounterReset + + type sampleData struct { + st int64 + ts int64 + fSample float64 + h *histogram.Histogram + } + + testCases := []struct { + name string + samples []sampleData + expectedSTs []int64 // Expected ST values + isHistogram bool + }{ + { + name: "Float samples with ST", + samples: []sampleData{ + {st: 10, ts: 100, fSample: 1.0}, + {st: 20, ts: 200, fSample: 2.0}, + {st: 30, ts: 300, fSample: 3.0}, + }, + expectedSTs: []int64{10, 20, 30}, + isHistogram: false, + }, + { + name: "Float samples with varying ST", + samples: []sampleData{ + {st: 5, ts: 100, fSample: 1.0}, + {st: 5, ts: 200, fSample: 2.0}, // Same ST + {st: 150, ts: 300, fSample: 3.0}, // Different ST + }, + expectedSTs: []int64{5, 5, 150}, + isHistogram: false, + }, + { + name: "Histogram samples", + samples: []sampleData{ + {st: 10, ts: 100, h: testHistogram}, + {st: 20, ts: 200, h: testHistogram}, + {st: 30, ts: 300, h: testHistogram}, + }, + // Histograms don't support ST storage yet, should return 0 + expectedSTs: []int64{0, 0, 0}, + isHistogram: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + opts := newTestHeadDefaultOptions(DefaultBlockDuration, false) + opts.EnableSTStorage.Store(true) + h, _ := newTestHeadWithOptions(t, compression.None, opts) + + lbls := labels.FromStrings("foo", "bar") + + // Use AppenderV2 which has native ST support + a := h.AppenderV2(context.Background()) + for _, s := range tc.samples { + _, err := a.Append(0, lbls, s.st, s.ts, s.fSample, s.h, nil, storage.AOptions{}) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + + // Verify ST values are stored in chunks + ctx := context.Background() + idxReader, err := h.Index() + require.NoError(t, err) + defer idxReader.Close() + + chkReader, err := h.Chunks() + require.NoError(t, err) + defer chkReader.Close() + + p, err := idxReader.Postings(ctx, "foo", "bar") + require.NoError(t, err) + + var lblBuilder labels.ScratchBuilder + require.True(t, p.Next()) + sRef := p.At() + + var chkMetas []chunks.Meta + require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas)) + + // Read chunks and verify ST values + var actualSTs []int64 + for _, meta := range chkMetas { + chk, iterable, err := chkReader.ChunkOrIterable(meta) + require.NoError(t, err) + require.Nil(t, iterable) + + it := chk.Iterator(nil) + for it.Next() != chunkenc.ValNone { + st := it.AtST() + actualSTs = append(actualSTs, st) + } + require.NoError(t, it.Err()) + } + + // Verify expected ST values + if tc.isHistogram { + require.Equal(t, tc.expectedSTs, actualSTs, "Histogram samples should return 0 for ST") + } else { + require.Equal(t, tc.expectedSTs, actualSTs, "Float samples should have ST stored") + } + + // Also verify via querier + q, err := NewBlockQuerier(h, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + defer q.Close() + + ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.True(t, ss.Next()) + series := ss.At() + require.NoError(t, ss.Err()) + + seriesIt := series.Iterator(nil) + var queriedSTs []int64 + for seriesIt.Next() != chunkenc.ValNone { + st := seriesIt.AtST() + queriedSTs = append(queriedSTs, st) + } + require.NoError(t, seriesIt.Err()) + + // Verify querier returns same ST values + require.Equal(t, tc.expectedSTs, queriedSTs, "Querier should return same ST values as chunk iterator") + }) + } +} diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 211a2fec08..ed447c5d50 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -349,7 +349,7 @@ func BenchmarkLoadWLs(b *testing.B) { for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled, false) - s.append(c.mmappedChunkT, 42, 0, cOpts) + s.append(false, 0, c.mmappedChunkT, 42, 0, cOpts) // There's only one head chunk because only a single sample is appended. mmapChunks() // ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with // the sample at c.mmappedChunkT is mmapped. @@ -7450,3 +7450,137 @@ func TestHeadAppender_WBLEncoder_EnableSTStorage(t *testing.T) { }) } } + +// TestHeadAppender_STStorage_Disabled verifies that when EnableSTStorage is false, +// start timestamps are NOT stored in chunks (AtST returns 0). +func TestHeadAppender_STStorage_Disabled(t *testing.T) { + type sampleData struct { + st int64 + ts int64 + fSample float64 + } + + samples := []sampleData{ + {st: 10, ts: 100, fSample: 1.0}, + {st: 20, ts: 200, fSample: 2.0}, + {st: 30, ts: 300, fSample: 3.0}, + } + + opts := newTestHeadDefaultOptions(DefaultBlockDuration, false) + opts.EnableSTStorage.Store(false) // Explicitly disable ST storage + h, _ := newTestHeadWithOptions(t, compression.None, opts) + + lbls := labels.FromStrings("foo", "bar") + + // Use AppenderV2 to append samples with ST values + a := h.AppenderV2(context.Background()) + for _, s := range samples { + _, err := a.Append(0, lbls, s.st, s.ts, s.fSample, nil, nil, storage.AOptions{}) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + + // Verify ST values are NOT stored (should all be 0) + ctx := context.Background() + idxReader, err := h.Index() + require.NoError(t, err) + defer idxReader.Close() + + chkReader, err := h.Chunks() + require.NoError(t, err) + defer chkReader.Close() + + p, err := idxReader.Postings(ctx, "foo", "bar") + require.NoError(t, err) + + var lblBuilder labels.ScratchBuilder + require.True(t, p.Next()) + sRef := p.At() + + var chkMetas []chunks.Meta + require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas)) + + // Read chunks and verify all ST values are 0 + for _, meta := range chkMetas { + chk, iterable, err := chkReader.ChunkOrIterable(meta) + require.NoError(t, err) + require.Nil(t, iterable) + + it := chk.Iterator(nil) + for it.Next() != chunkenc.ValNone { + st := it.AtST() + require.Equal(t, int64(0), st, "ST should be 0 when EnableSTStorage is false") + } + require.NoError(t, it.Err()) + } +} + +// TestHeadAppender_STStorage_ChunkEncoding verifies that the correct chunk encoding +// is used based on EnableSTStorage setting. +func TestHeadAppender_STStorage_ChunkEncoding(t *testing.T) { + samples := []struct { + st int64 + ts int64 + fSample float64 + }{ + {st: 10, ts: 100, fSample: 1.0}, + {st: 20, ts: 200, fSample: 2.0}, + } + + for _, enableST := range []bool{false, true} { + t.Run(fmt.Sprintf("EnableSTStorage=%t", enableST), func(t *testing.T) { + opts := newTestHeadDefaultOptions(DefaultBlockDuration, false) + opts.EnableSTStorage.Store(enableST) + h, _ := newTestHeadWithOptions(t, compression.None, opts) + + lbls := labels.FromStrings("foo", "bar") + a := h.Appender(context.Background()) + for _, s := range samples { + _, err := a.AppendSTZeroSample(0, lbls, s.ts, s.st) + require.NoError(t, err) + _, err = a.Append(0, lbls, s.ts, s.fSample) + require.NoError(t, err) + } + require.NoError(t, a.Commit()) + + // Check chunk encoding + ctx := context.Background() + idxReader, err := h.Index() + require.NoError(t, err) + defer idxReader.Close() + + chkReader, err := h.Chunks() + require.NoError(t, err) + defer chkReader.Close() + + p, err := idxReader.Postings(ctx, "foo", "bar") + require.NoError(t, err) + + var lblBuilder labels.ScratchBuilder + require.True(t, p.Next()) + sRef := p.At() + + var chkMetas []chunks.Meta + require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas)) + require.NotEmpty(t, chkMetas) + + // Verify encoding + for _, meta := range chkMetas { + chk, iterable, err := chkReader.ChunkOrIterable(meta) + require.NoError(t, err) + require.Nil(t, iterable) + + encoding := chk.Encoding() + if enableST { + // Should use ST-capable encoding + require.Equal(t, chunkenc.EncXOROptST, encoding, + "Expected ST-capable encoding when EnableSTStorage is true") + } else { + // Should use regular XOR encoding + require.Equal(t, chunkenc.EncXOR, encoding, + "Expected regular XOR encoding when EnableSTStorage is false") + } + } + }) + } +} diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index cb9916067f..174d428b58 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -669,8 +669,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp h.numStaleSeries.Dec() } - // TODO(krajorama,ywwg): Pass ST when available in WBL. - if _, chunkCreated := ms.append(wp.storeST, 0, s.T, s.V, 0, appendChunkOpts); chunkCreated { + if _, chunkCreated := ms.append(wp.storeST, s.ST, s.T, s.V, 0, appendChunkOpts); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() _ = ms.mmapChunks(h.chunkDiskMapper) @@ -1105,8 +1104,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR missingSeries[s.Ref] = struct{}{} continue } - // TODO(krajorama,ywwg): Pass ST when available in WBL. - ok, chunkCreated, _ := ms.insert(wp.storeST, 0, s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) + ok, chunkCreated, _ := ms.insert(wp.storeST, s.ST, s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index 5a8252144b..40c4210abc 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -93,7 +93,7 @@ func (o *OOOChunk) ToEncodedChunks(storeST bool, mint, maxt int64) (chks []memCh if s.t > maxt { break } - encoding := chunkenc.EncXOR + encoding := chunkenc.ValFloat.ChunkEncoding(storeST) switch { case s.h != nil: // TODO(krajorama): use ST capable histogram chunk. @@ -101,10 +101,6 @@ func (o *OOOChunk) ToEncodedChunks(storeST bool, mint, maxt int64) (chks []memCh case s.fh != nil: // TODO(krajorama): use ST capable float histogram chunk. encoding = chunkenc.EncFloatHistogram - default: - if storeST { - encoding = chunkenc.EncXOROptST - } } // prevApp is the appender for the previous sample. @@ -115,7 +111,7 @@ func (o *OOOChunk) ToEncodedChunks(storeST bool, mint, maxt int64) (chks []memCh chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) } cmint = s.t - chunk, err = chunkenc.NewEmptyChunk(encoding, storeST) + chunk, err = chunkenc.NewEmptyChunk(encoding) if err != nil { // This should never happen. No point using a default type as // calling the wrong append function would panic. diff --git a/tsdb/querier.go b/tsdb/querier.go index 7f2492f846..461b09a7f2 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -1012,9 +1012,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt}) } cmint = p.currDelIter.AtT() - // Note: we're passing false for storeST, because we set the - // encoding explicitly. - if currentChunk, err = chunkenc.NewEmptyChunk(encoding, false); err != nil { + if currentChunk, err = chunkenc.NewEmptyChunk(encoding); err != nil { break } if app, err = currentChunk.Appender(); err != nil { diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index dc443bfc5e..46a8bc6127 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -765,7 +765,7 @@ type mockSampleIterator struct { } func (it *mockSampleIterator) Encoding() chunkenc.Encoding { - return it.s[it.idx].Type().ChunkEncodingWithST(it.s[it.idx].ST()) + return it.s[it.idx].Type().ChunkEncoding(it.s[it.idx].ST() != 0) } func (it *mockSampleIterator) Seek(t int64) chunkenc.ValueType {