feat(tsdb): adopt head append changes from 18026

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2026-02-18 18:51:33 +01:00
parent d2a17e7cb3
commit 7991bcbff9
No known key found for this signature in database
GPG key ID: 47A8F9CE80FD7C7F
15 changed files with 554 additions and 102 deletions

View file

@ -115,7 +115,7 @@ func NewListSeriesIterator(samples Samples) chunkenc.Iterator {
func (it *listSeriesIterator) Encoding() chunkenc.Encoding {
s := it.samples.Get(it.idx)
encoding := s.Type().ChunkEncodingWithST(s.ST())
encoding := s.Type().ChunkEncoding(s.ST() != 0)
if encoding == chunkenc.EncNone {
panic(fmt.Sprintf("unknown sample type %s", s.Type().String()))
}
@ -357,7 +357,7 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator {
if typ != lastType || lastHadST != hasST || i >= seriesToChunkEncoderSplit {
// Create a new chunk if the sample type changed or too many samples in the current one.
chks = appendChunk(chks, mint, maxt, chk)
chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding(), hasST)
chk, err = typ.NewChunk(hasST)
if err != nil {
return errChunksIterator{err: err}
}

View file

@ -196,36 +196,7 @@ func (v ValueType) String() string {
}
}
func (v ValueType) ChunkEncoding() Encoding {
switch v {
case ValFloat:
return EncXOR
case ValHistogram:
return EncHistogram
case ValFloatHistogram:
return EncFloatHistogram
default:
return EncNone
}
}
func (v ValueType) ChunkEncodingWithST(st int64) Encoding {
switch v {
case ValFloat:
if st != 0 {
return EncXOROptST
}
return EncXOR
case ValHistogram:
return EncHistogram
case ValFloatHistogram:
return EncFloatHistogram
default:
return EncNone
}
}
func (v ValueType) ChunkEncodingWithStoreST(storeST bool) Encoding {
func (v ValueType) ChunkEncoding(storeST bool) Encoding {
switch v {
case ValFloat:
if storeST {
@ -242,21 +213,7 @@ func (v ValueType) ChunkEncodingWithStoreST(storeST bool) Encoding {
}
func (v ValueType) NewChunk(storeST bool) (Chunk, error) {
switch v {
case ValFloat:
if storeST {
return NewXOROptSTChunk(), nil
}
return NewXORChunk(), nil
case ValHistogram:
// TODO(krajorama): return a ST capable histogram chunk when they are supported.
return NewHistogramChunk(), nil
case ValFloatHistogram:
// TODO(krajorama): return a ST capable float histogram chunk when they are supported.
return NewFloatHistogramChunk(), nil
default:
return nil, fmt.Errorf("value type %v unsupported", v)
}
return NewEmptyChunk(v.ChunkEncoding(storeST))
}
// MockSeriesIterator returns an iterator for a mock series with custom
@ -451,12 +408,9 @@ func FromData(e Encoding, d []byte) (Chunk, error) {
}
// NewEmptyChunk returns an empty chunk for the given encoding.
func NewEmptyChunk(e Encoding, storeST bool) (Chunk, error) {
func NewEmptyChunk(e Encoding) (Chunk, error) {
switch e {
case EncXOR:
if storeST {
return NewXOROptSTChunk(), nil
}
return NewXORChunk(), nil
case EncHistogram:
return NewHistogramChunk(), nil

View file

@ -166,7 +166,7 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) {
}
// Request storing ST in the chunk if available.
c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding(), hasST)
c, err := sampleType.NewChunk(hasST)
if err != nil {
return Meta{}, err
}

View file

@ -1044,6 +1044,7 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
headOpts.EnableSharding = opts.EnableSharding
headOpts.EnableSTAsZeroSample = opts.EnableSTAsZeroSample
headOpts.EnableSTStorage.Store(opts.EnableSTStorage)
headOpts.EnableMetadataWALRecords = opts.EnableMetadataWALRecords
if opts.WALReplayConcurrency > 0 {
headOpts.WALReplayConcurrency = opts.WALReplayConcurrency

View file

@ -7512,6 +7512,70 @@ func TestAbortBlockCompactions_AppendV2(t *testing.T) {
require.Equal(t, 4, compactions, "expected 4 compactions to be completed")
}
// TestCompactHeadWithSTStorage_AppendV2 ensures that when EnableSTStorage is true,
// compacted blocks contain chunks with EncXOROptST encoding for float samples.
func TestCompactHeadWithSTStorage_AppendV2(t *testing.T) {
t.Parallel()
opts := &Options{
RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond),
NoLockfile: true,
MinBlockDuration: int64(time.Hour * 2 / time.Millisecond),
MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond),
WALCompression: compression.Snappy,
EnableSTStorage: true,
}
db := newTestDB(t, withOpts(opts))
ctx := context.Background()
app := db.AppenderV2(ctx)
maxt := 100
for i := range maxt {
// AppendV2 signature: (ref, labels, st, t, v, h, fh, opts)
// st=0 (start timestamp), t=i (sample timestamp)
// TODO(krajorama): verify with non zero st once the API supports it.
_, err := app.Append(0, labels.FromStrings("a", "b"), 0, int64(i), float64(i), nil, nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Compact the Head to create a new block.
require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1)))
// Check that we have exactly one block.
require.Len(t, db.Blocks(), 1)
b := db.Blocks()[0]
// Open chunk reader and index reader.
chunkr, err := b.Chunks()
require.NoError(t, err)
defer chunkr.Close()
indexr, err := b.Index()
require.NoError(t, err)
defer indexr.Close()
// Get postings for the series.
p, err := indexr.Postings(ctx, "a", "b")
require.NoError(t, err)
chunkCount := 0
for p.Next() {
var builder labels.ScratchBuilder
var chks []chunks.Meta
require.NoError(t, indexr.Series(p.At(), &builder, &chks))
for _, chk := range chks {
c, _, err := chunkr.ChunkOrIterable(chk)
require.NoError(t, err)
require.Equal(t, chunkenc.EncXOROptST, c.Encoding(),
"expected EncXOROptST encoding when EnableSTStorage=true, got %s", c.Encoding())
chunkCount++
}
}
require.NoError(t, p.Err())
require.Positive(t, chunkCount, "expected at least one chunk")
}
func TestNewCompactorFunc_AppendV2(t *testing.T) {
opts := DefaultOptions()
block1 := ulid.MustNew(1, nil)
@ -7543,3 +7607,114 @@ func TestNewCompactorFunc_AppendV2(t *testing.T) {
require.Len(t, ulids, 1)
require.Equal(t, block2, ulids[0])
}
// TestDBAppenderV2_STStorage_OutOfOrder verifies that ST storage works correctly
// when samples are appended out of order and can be queried using ChunkQuerier.
func TestDBAppenderV2_STStorage_OutOfOrder(t *testing.T) {
testHistogram := tsdbutil.GenerateTestHistogram(1)
testHistogram.CounterResetHint = histogram.NotCounterReset
testCases := []struct {
name string
appendSamples []chunks.Sample // Samples in append order (out of order)
expectedSamples []chunks.Sample // Expected samples in time order after query
}{
{
name: "Float samples out of order",
appendSamples: []chunks.Sample{
newSample(20, 200, 2.0, nil, nil), // Append second sample first
newSample(10, 100, 1.0, nil, nil), // Append first sample second (OOO)
newSample(30, 300, 3.0, nil, nil), // Append third sample last
newSample(25, 250, 2.5, nil, nil), // Append middle sample (OOO)
},
expectedSamples: []chunks.Sample{
newSample(10, 100, 1.0, nil, nil),
newSample(20, 200, 2.0, nil, nil),
newSample(25, 250, 2.5, nil, nil),
newSample(30, 300, 3.0, nil, nil),
},
},
{
name: "Histogram samples out of order",
appendSamples: []chunks.Sample{
newSample(30, 300, 0, testHistogram, nil), // Append third sample first
newSample(10, 100, 0, testHistogram, nil), // Append first sample second (OOO)
newSample(20, 200, 0, testHistogram, nil), // Append second sample last (OOO)
},
// Histograms don't support ST storage yet, should return 0 for ST
expectedSamples: []chunks.Sample{
newSample(0, 100, 0, testHistogram, nil),
newSample(0, 200, 0, testHistogram, nil),
newSample(0, 300, 0, testHistogram, nil),
},
},
{
name: "Mixed float samples with same ST",
appendSamples: []chunks.Sample{
newSample(10, 200, 2.0, nil, nil),
newSample(10, 100, 1.0, nil, nil), // OOO with same ST
newSample(10, 300, 3.0, nil, nil),
},
expectedSamples: []chunks.Sample{
newSample(10, 100, 1.0, nil, nil),
newSample(10, 200, 2.0, nil, nil),
newSample(10, 300, 3.0, nil, nil),
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts := DefaultOptions()
opts.OutOfOrderTimeWindow = 300 * time.Minute.Milliseconds()
opts.EnableSTStorage = true
db := newTestDB(t, withOpts(opts))
db.DisableCompactions()
lbls := labels.FromStrings("foo", "bar")
// Append samples in the specified (out of order) sequence
for _, s := range tc.appendSamples {
app := db.AppenderV2(context.Background())
_, err := app.Append(0, lbls, s.ST(), s.T(), s.F(), s.H(), s.FH(), storage.AOptions{})
require.NoError(t, err, "Appending OOO sample with ST should succeed")
require.NoError(t, app.Commit(), "Committing OOO sample with ST should succeed")
}
// Query using ChunkQuerier to verify ST values
querier, err := db.ChunkQuerier(math.MinInt64, math.MaxInt64)
require.NoError(t, err)
defer querier.Close()
ss := querier.Select(context.Background(), false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.True(t, ss.Next(), "Should have series")
series := ss.At()
require.NoError(t, ss.Err())
require.False(t, ss.Next(), "Should have only one series")
// Iterate through chunks and collect samples using storage.ExpandSamples
chunkIt := series.Iterator(nil)
var actualSamples []chunks.Sample
for chunkIt.Next() {
chk := chunkIt.At()
it := chk.Chunk.Iterator(nil)
samples, err := storage.ExpandSamples(it, newSample)
require.NoError(t, err)
actualSamples = append(actualSamples, samples...)
}
require.NoError(t, chunkIt.Err())
// Verify samples are in time order with correct values
// Use requireEqualSamplesIgnoreCounterResets to ignore histogram counter reset hints
requireEqualSamples(t, lbls.String(), tc.expectedSamples, actualSamples, requireEqualSamplesIgnoreCounterResets)
// Additionally verify ST values match expectations
require.Len(t, actualSamples, len(tc.expectedSamples))
for i, expected := range tc.expectedSamples {
actual := actualSamples[i]
require.Equal(t, expected.ST(), actual.ST(), "Sample %d: ST should match", i)
}
})
}
}

View file

@ -9626,3 +9626,66 @@ func TestStaleSeriesCompactionWithZeroSeries(t *testing.T) {
// Should still have no blocks since there was nothing to compact.
require.Empty(t, db.Blocks())
}
// TestCompactHeadWithSTStorage ensures that when EnableSTStorage is true,
// compacted blocks contain chunks with EncXOR encoding for float samples
// when using the original Appender (which does not support start timestamps).
func TestCompactHeadWithSTStorage(t *testing.T) {
t.Parallel()
opts := &Options{
RetentionDuration: int64(time.Hour * 24 * 15 / time.Millisecond),
NoLockfile: true,
MinBlockDuration: int64(time.Hour * 2 / time.Millisecond),
MaxBlockDuration: int64(time.Hour * 2 / time.Millisecond),
WALCompression: compression.Snappy,
EnableSTStorage: true,
}
db := newTestDB(t, withOpts(opts))
ctx := context.Background()
app := db.Appender(ctx)
maxt := 100
for i := range maxt {
// Original Appender signature: (ref, labels, t, v)
_, err := app.Append(0, labels.FromStrings("a", "b"), int64(i), float64(i))
require.NoError(t, err)
}
require.NoError(t, app.Commit())
// Compact the Head to create a new block.
require.NoError(t, db.CompactHead(NewRangeHead(db.Head(), 0, int64(maxt)-1)))
// Check that we have exactly one block.
require.Len(t, db.Blocks(), 1)
b := db.Blocks()[0]
// Open chunk reader and index reader.
chunkr, err := b.Chunks()
require.NoError(t, err)
defer chunkr.Close()
indexr, err := b.Index()
require.NoError(t, err)
defer indexr.Close()
// Get postings for the series.
p, err := indexr.Postings(ctx, "a", "b")
require.NoError(t, err)
chunkCount := 0
for p.Next() {
var builder labels.ScratchBuilder
var chks []chunks.Meta
require.NoError(t, indexr.Series(p.At(), &builder, &chks))
for _, chk := range chks {
c, _, err := chunkr.ChunkOrIterable(chk)
require.NoError(t, err)
require.Equal(t, chunkenc.EncXOROptST, c.Encoding(),
"expected EncXOR encoding when using original Appender, got %s", c.Encoding())
chunkCount++
}
}
require.NoError(t, p.Err())
require.Positive(t, chunkCount, "expected at least one chunk")
}

View file

@ -161,8 +161,8 @@ type HeadOptions struct {
OutOfOrderCapMax atomic.Int64
// EnableSTStorage determines whether databases (WAL/WBL, tsdb,
// agent) should set a Start Time value per sample. Currently not
// user-settable and only set in tests.
// agent) should set a Start Time value per sample.
// Represents 'st-storage' feature flag.
EnableSTStorage atomic.Bool
ChunkRange int64

View file

@ -1389,8 +1389,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRefs []chunks.ChunkDiskMapperRef
// TODO(krajorama,ywwg): Pass ST when available in WAL.
ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
ok, chunkCreated, mmapRefs = series.insert(a.storeST, s.ST, s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger)
if chunkCreated {
r, ok := acc.oooMmapMarkers[series.ref]
if !ok || r != nil {
@ -1434,8 +1433,7 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte
default:
newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V)
staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V)
// TODO(krajorama,ywwg): pass ST when available in WAL.
ok, chunkCreated = series.append(a.storeST, 0, s.T, s.V, a.appendID, acc.appendChunkOpts)
ok, chunkCreated = series.append(a.storeST, s.ST, s.T, s.V, a.appendID, acc.appendChunkOpts)
if ok {
if s.T < acc.inOrderMint {
acc.inOrderMint = s.T
@ -1842,7 +1840,7 @@ type chunkOpts struct {
// isolation for this append.)
// Series lock must be held when calling.
func (s *memSeries) append(storeST bool, st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(storeST, t, chunkenc.ValFloat.ChunkEncodingWithStoreST(storeST), o)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.ValFloat.ChunkEncoding(storeST), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1873,7 +1871,7 @@ func (s *memSeries) appendHistogram(storeST bool, st, t int64, h *histogram.Hist
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.HistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValHistogram.ChunkEncodingWithStoreST(storeST), o)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValHistogram.ChunkEncoding(storeST), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1930,7 +1928,7 @@ func (s *memSeries) appendFloatHistogram(storeST bool, st, t int64, fh *histogra
// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.ValFloatHistogram.ChunkEncodingWithStoreST(storeST), o)
c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.ValFloatHistogram.ChunkEncoding(storeST), o)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1979,7 +1977,7 @@ func (s *memSeries) appendFloatHistogram(storeST bool, st, t int64, fh *histogra
// number of samples they contain with a soft cap in bytes.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data.
func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
// We target chunkenc.MaxBytesPerXORChunk as a hard for the size of an XOR chunk. We must determine whether to cut
// a new head chunk without knowing the size of the next sample, however, so we assume the next sample will be a
// maximally-sized sample (19 bytes).
@ -1993,7 +1991,7 @@ func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encodin
return c, false, false
}
// There is no head chunk in this series yet, create the first chunk for the sample.
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true
}
@ -2004,14 +2002,14 @@ func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encodin
// Check the chunk size, unless we just created it and if the chunk is too large, cut a new one.
if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk {
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true
}
if c.chunk.Encoding() != e {
// The chunk encoding expected by this append is different than the head chunk's
// encoding. So we cut a new chunk with the expected encoding.
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true
}
@ -2036,7 +2034,7 @@ func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encodin
// as we expect more chunks to come.
// Note that next chunk will have its nextAt recalculated for the new rate.
if t >= s.nextAt || numSamples >= o.samplesPerChunk*2 {
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true
}
@ -2047,7 +2045,7 @@ func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encodin
// cut based on their size in bytes.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data.
func (s *memSeries) histogramsAppendPreprocessor(storeST bool, t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) {
c = s.headChunks
if c == nil {
@ -2056,7 +2054,7 @@ func (s *memSeries) histogramsAppendPreprocessor(storeST bool, t int64, e chunke
return c, false, false
}
// There is no head chunk in this series yet, create the first chunk for the sample.
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true
}
@ -2068,7 +2066,7 @@ func (s *memSeries) histogramsAppendPreprocessor(storeST bool, t int64, e chunke
if c.chunk.Encoding() != e {
// The chunk encoding expected by this append is different than the head chunk's
// encoding. So we cut a new chunk with the expected encoding.
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true
}
@ -2109,7 +2107,7 @@ func (s *memSeries) histogramsAppendPreprocessor(storeST bool, t int64, e chunke
// increased or if the bucket/span count has increased.
// Note that next chunk will have its nextAt recalculated for the new rate.
if (t >= s.nextAt || numBytes >= targetBytes*2) && (numSamples >= chunkenc.MinSamplesPerHistogramChunk || t >= nextChunkRangeStart) {
c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange)
c = s.cutNewHeadChunk(t, e, o.chunkRange)
chunkCreated = true
}
@ -2134,7 +2132,7 @@ func computeChunkEndTime(start, cur, maxT int64, ratioToFull float64) int64 {
return int64(float64(start) + float64(maxT-start)/math.Floor(n))
}
func (s *memSeries) cutNewHeadChunk(storeST bool, mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk {
func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk {
// When cutting a new head chunk we create a new memChunk instance with .prev
// pointing at the current .headChunks, so it forms a linked list.
// All but first headChunks list elements will be m-mapped as soon as possible
@ -2147,16 +2145,12 @@ func (s *memSeries) cutNewHeadChunk(storeST bool, mint int64, e chunkenc.Encodin
if chunkenc.IsValidEncoding(e) {
var err error
s.headChunks.chunk, err = chunkenc.NewEmptyChunk(e, storeST)
s.headChunks.chunk, err = chunkenc.NewEmptyChunk(e)
if err != nil {
panic(err) // This should never happen.
}
} else {
var err error
s.headChunks.chunk, err = chunkenc.NewEmptyChunk(chunkenc.EncXOR, storeST)
if err != nil {
panic(err) // This should never happen.
}
s.headChunks.chunk = chunkenc.NewXORChunk()
}
// Set upper bound on when the next chunk must be started. An earlier timestamp

View file

@ -95,6 +95,7 @@ func (h *Head) appenderV2() *headAppenderV2 {
typesInBatch: h.getTypeMap(),
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
storeST: h.opts.EnableSTStorage.Load(),
},
}
}
@ -141,7 +142,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
}
// TODO(bwplotka): Handle ST natively (as per PROM-60).
if a.head.opts.EnableSTAsZeroSample && st != 0 {
if st != 0 && a.head.opts.EnableSTAsZeroSample {
a.bestEffortAppendSTZeroSample(s, ls, st, t, h, fh)
}
@ -177,7 +178,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
// we do not need to check for the difference between "unknown
// series" and "known series with stNone".
}
appErr = a.appendFloat(s, t, v, opts.RejectOutOfOrder)
appErr = a.appendFloat(s, st, t, v, opts.RejectOutOfOrder)
}
// Handle append error, if any.
if appErr != nil {
@ -218,7 +219,7 @@ func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t i
return storage.SeriesRef(s.ref), partialErr
}
func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejectOOO bool) error {
func (a *headAppenderV2) appendFloat(s *memSeries, st, t int64, v float64, fastRejectOOO bool) error {
s.Lock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
@ -239,7 +240,7 @@ func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejec
}
b := a.getCurrentBatch(stFloat, s.ref)
b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: t, V: v})
b.floats = append(b.floats, record.RefSample{Ref: s.ref, ST: st, T: t, V: v})
b.floatSeries = append(b.floatSeries, s)
return nil
}
@ -366,7 +367,7 @@ func (a *headAppenderV2) bestEffortAppendSTZeroSample(s *memSeries, ls labels.La
}
err = a.appendHistogram(s, st, zeroHistogram, true)
default:
err = a.appendFloat(s, st, 0, true)
err = a.appendFloat(s, 0, st, 0, true)
}
if err != nil {

View file

@ -2925,13 +2925,15 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot_AppenderV2(t *testing.T) {
// TestWBLReplay checks the replay at a low level.
func TestWBLReplay_AppenderV2(t *testing.T) {
for name, scenario := range sampleTypeScenarios {
t.Run(name, func(t *testing.T) {
testWBLReplayAppenderV2(t, scenario)
})
for _, enableSTstorage := range []bool{false, true} {
t.Run(fmt.Sprintf("%s/st-storage=%v", name, enableSTstorage), func(t *testing.T) {
testWBLReplayAppenderV2(t, scenario, enableSTstorage)
})
}
}
}
func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) {
func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario, enableSTstorage bool) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
@ -2942,6 +2944,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) {
opts.ChunkRange = 1000
opts.ChunkDirRoot = dir
opts.OutOfOrderTimeWindow.Store(30 * time.Minute.Milliseconds())
opts.EnableSTStorage.Store(enableSTstorage)
h, err := NewHead(nil, nil, wal, oooWlog, opts, nil)
require.NoError(t, err)
@ -2993,7 +2996,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) {
require.False(t, ok)
require.NotNil(t, ms)
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64)
chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(h.opts.EnableSTStorage.Load(), math.MinInt64, math.MaxInt64)
require.NoError(t, err)
require.Len(t, chks, 1)
@ -4754,3 +4757,138 @@ func TestHeadAppenderV2_Append_HistogramStalenessConversionMetrics(t *testing.T)
})
}
}
// TestHeadAppender_STStorage verifies that when EnableSTStorage is true,
// start timestamps are properly stored in chunks and returned by queries.
// This test uses AppenderV2 which has native ST support.
func TestHeadAppenderV2_STStorage(t *testing.T) {
testHistogram := tsdbutil.GenerateTestHistogram(1)
testHistogram.CounterResetHint = histogram.NotCounterReset
type sampleData struct {
st int64
ts int64
fSample float64
h *histogram.Histogram
}
testCases := []struct {
name string
samples []sampleData
expectedSTs []int64 // Expected ST values
isHistogram bool
}{
{
name: "Float samples with ST",
samples: []sampleData{
{st: 10, ts: 100, fSample: 1.0},
{st: 20, ts: 200, fSample: 2.0},
{st: 30, ts: 300, fSample: 3.0},
},
expectedSTs: []int64{10, 20, 30},
isHistogram: false,
},
{
name: "Float samples with varying ST",
samples: []sampleData{
{st: 5, ts: 100, fSample: 1.0},
{st: 5, ts: 200, fSample: 2.0}, // Same ST
{st: 150, ts: 300, fSample: 3.0}, // Different ST
},
expectedSTs: []int64{5, 5, 150},
isHistogram: false,
},
{
name: "Histogram samples",
samples: []sampleData{
{st: 10, ts: 100, h: testHistogram},
{st: 20, ts: 200, h: testHistogram},
{st: 30, ts: 300, h: testHistogram},
},
// Histograms don't support ST storage yet, should return 0
expectedSTs: []int64{0, 0, 0},
isHistogram: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
opts.EnableSTStorage.Store(true)
h, _ := newTestHeadWithOptions(t, compression.None, opts)
lbls := labels.FromStrings("foo", "bar")
// Use AppenderV2 which has native ST support
a := h.AppenderV2(context.Background())
for _, s := range tc.samples {
_, err := a.Append(0, lbls, s.st, s.ts, s.fSample, s.h, nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, a.Commit())
// Verify ST values are stored in chunks
ctx := context.Background()
idxReader, err := h.Index()
require.NoError(t, err)
defer idxReader.Close()
chkReader, err := h.Chunks()
require.NoError(t, err)
defer chkReader.Close()
p, err := idxReader.Postings(ctx, "foo", "bar")
require.NoError(t, err)
var lblBuilder labels.ScratchBuilder
require.True(t, p.Next())
sRef := p.At()
var chkMetas []chunks.Meta
require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas))
// Read chunks and verify ST values
var actualSTs []int64
for _, meta := range chkMetas {
chk, iterable, err := chkReader.ChunkOrIterable(meta)
require.NoError(t, err)
require.Nil(t, iterable)
it := chk.Iterator(nil)
for it.Next() != chunkenc.ValNone {
st := it.AtST()
actualSTs = append(actualSTs, st)
}
require.NoError(t, it.Err())
}
// Verify expected ST values
if tc.isHistogram {
require.Equal(t, tc.expectedSTs, actualSTs, "Histogram samples should return 0 for ST")
} else {
require.Equal(t, tc.expectedSTs, actualSTs, "Float samples should have ST stored")
}
// Also verify via querier
q, err := NewBlockQuerier(h, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
defer q.Close()
ss := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.True(t, ss.Next())
series := ss.At()
require.NoError(t, ss.Err())
seriesIt := series.Iterator(nil)
var queriedSTs []int64
for seriesIt.Next() != chunkenc.ValNone {
st := seriesIt.AtST()
queriedSTs = append(queriedSTs, st)
}
require.NoError(t, seriesIt.Err())
// Verify querier returns same ST values
require.Equal(t, tc.expectedSTs, queriedSTs, "Querier should return same ST values as chunk iterator")
})
}
}

View file

@ -349,7 +349,7 @@ func BenchmarkLoadWLs(b *testing.B) {
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, 0, defaultIsolationDisabled, false)
s.append(c.mmappedChunkT, 42, 0, cOpts)
s.append(false, 0, c.mmappedChunkT, 42, 0, cOpts)
// There's only one head chunk because only a single sample is appended. mmapChunks()
// ignores the latest chunk, so we need to cut a new head chunk to guarantee the chunk with
// the sample at c.mmappedChunkT is mmapped.
@ -7450,3 +7450,137 @@ func TestHeadAppender_WBLEncoder_EnableSTStorage(t *testing.T) {
})
}
}
// TestHeadAppender_STStorage_Disabled verifies that when EnableSTStorage is false,
// start timestamps are NOT stored in chunks (AtST returns 0).
func TestHeadAppender_STStorage_Disabled(t *testing.T) {
type sampleData struct {
st int64
ts int64
fSample float64
}
samples := []sampleData{
{st: 10, ts: 100, fSample: 1.0},
{st: 20, ts: 200, fSample: 2.0},
{st: 30, ts: 300, fSample: 3.0},
}
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
opts.EnableSTStorage.Store(false) // Explicitly disable ST storage
h, _ := newTestHeadWithOptions(t, compression.None, opts)
lbls := labels.FromStrings("foo", "bar")
// Use AppenderV2 to append samples with ST values
a := h.AppenderV2(context.Background())
for _, s := range samples {
_, err := a.Append(0, lbls, s.st, s.ts, s.fSample, nil, nil, storage.AOptions{})
require.NoError(t, err)
}
require.NoError(t, a.Commit())
// Verify ST values are NOT stored (should all be 0)
ctx := context.Background()
idxReader, err := h.Index()
require.NoError(t, err)
defer idxReader.Close()
chkReader, err := h.Chunks()
require.NoError(t, err)
defer chkReader.Close()
p, err := idxReader.Postings(ctx, "foo", "bar")
require.NoError(t, err)
var lblBuilder labels.ScratchBuilder
require.True(t, p.Next())
sRef := p.At()
var chkMetas []chunks.Meta
require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas))
// Read chunks and verify all ST values are 0
for _, meta := range chkMetas {
chk, iterable, err := chkReader.ChunkOrIterable(meta)
require.NoError(t, err)
require.Nil(t, iterable)
it := chk.Iterator(nil)
for it.Next() != chunkenc.ValNone {
st := it.AtST()
require.Equal(t, int64(0), st, "ST should be 0 when EnableSTStorage is false")
}
require.NoError(t, it.Err())
}
}
// TestHeadAppender_STStorage_ChunkEncoding verifies that the correct chunk encoding
// is used based on EnableSTStorage setting.
func TestHeadAppender_STStorage_ChunkEncoding(t *testing.T) {
samples := []struct {
st int64
ts int64
fSample float64
}{
{st: 10, ts: 100, fSample: 1.0},
{st: 20, ts: 200, fSample: 2.0},
}
for _, enableST := range []bool{false, true} {
t.Run(fmt.Sprintf("EnableSTStorage=%t", enableST), func(t *testing.T) {
opts := newTestHeadDefaultOptions(DefaultBlockDuration, false)
opts.EnableSTStorage.Store(enableST)
h, _ := newTestHeadWithOptions(t, compression.None, opts)
lbls := labels.FromStrings("foo", "bar")
a := h.Appender(context.Background())
for _, s := range samples {
_, err := a.AppendSTZeroSample(0, lbls, s.ts, s.st)
require.NoError(t, err)
_, err = a.Append(0, lbls, s.ts, s.fSample)
require.NoError(t, err)
}
require.NoError(t, a.Commit())
// Check chunk encoding
ctx := context.Background()
idxReader, err := h.Index()
require.NoError(t, err)
defer idxReader.Close()
chkReader, err := h.Chunks()
require.NoError(t, err)
defer chkReader.Close()
p, err := idxReader.Postings(ctx, "foo", "bar")
require.NoError(t, err)
var lblBuilder labels.ScratchBuilder
require.True(t, p.Next())
sRef := p.At()
var chkMetas []chunks.Meta
require.NoError(t, idxReader.Series(sRef, &lblBuilder, &chkMetas))
require.NotEmpty(t, chkMetas)
// Verify encoding
for _, meta := range chkMetas {
chk, iterable, err := chkReader.ChunkOrIterable(meta)
require.NoError(t, err)
require.Nil(t, iterable)
encoding := chk.Encoding()
if enableST {
// Should use ST-capable encoding
require.Equal(t, chunkenc.EncXOROptST, encoding,
"Expected ST-capable encoding when EnableSTStorage is true")
} else {
// Should use regular XOR encoding
require.Equal(t, chunkenc.EncXOR, encoding,
"Expected regular XOR encoding when EnableSTStorage is false")
}
}
})
}
}

View file

@ -669,8 +669,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
h.numStaleSeries.Dec()
}
// TODO(krajorama,ywwg): Pass ST when available in WBL.
if _, chunkCreated := ms.append(wp.storeST, 0, s.T, s.V, 0, appendChunkOpts); chunkCreated {
if _, chunkCreated := ms.append(wp.storeST, s.ST, s.T, s.V, 0, appendChunkOpts); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
_ = ms.mmapChunks(h.chunkDiskMapper)
@ -1105,8 +1104,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR
missingSeries[s.Ref] = struct{}{}
continue
}
// TODO(krajorama,ywwg): Pass ST when available in WBL.
ok, chunkCreated, _ := ms.insert(wp.storeST, 0, s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
ok, chunkCreated, _ := ms.insert(wp.storeST, s.ST, s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger)
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()

View file

@ -93,7 +93,7 @@ func (o *OOOChunk) ToEncodedChunks(storeST bool, mint, maxt int64) (chks []memCh
if s.t > maxt {
break
}
encoding := chunkenc.EncXOR
encoding := chunkenc.ValFloat.ChunkEncoding(storeST)
switch {
case s.h != nil:
// TODO(krajorama): use ST capable histogram chunk.
@ -101,10 +101,6 @@ func (o *OOOChunk) ToEncodedChunks(storeST bool, mint, maxt int64) (chks []memCh
case s.fh != nil:
// TODO(krajorama): use ST capable float histogram chunk.
encoding = chunkenc.EncFloatHistogram
default:
if storeST {
encoding = chunkenc.EncXOROptST
}
}
// prevApp is the appender for the previous sample.
@ -115,7 +111,7 @@ func (o *OOOChunk) ToEncodedChunks(storeST bool, mint, maxt int64) (chks []memCh
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
}
cmint = s.t
chunk, err = chunkenc.NewEmptyChunk(encoding, storeST)
chunk, err = chunkenc.NewEmptyChunk(encoding)
if err != nil {
// This should never happen. No point using a default type as
// calling the wrong append function would panic.

View file

@ -1012,9 +1012,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
}
cmint = p.currDelIter.AtT()
// Note: we're passing false for storeST, because we set the
// encoding explicitly.
if currentChunk, err = chunkenc.NewEmptyChunk(encoding, false); err != nil {
if currentChunk, err = chunkenc.NewEmptyChunk(encoding); err != nil {
break
}
if app, err = currentChunk.Appender(); err != nil {

View file

@ -765,7 +765,7 @@ type mockSampleIterator struct {
}
func (it *mockSampleIterator) Encoding() chunkenc.Encoding {
return it.s[it.idx].Type().ChunkEncodingWithST(it.s[it.idx].ST())
return it.s[it.idx].Type().ChunkEncoding(it.s[it.idx].ST() != 0)
}
func (it *mockSampleIterator) Seek(t int64) chunkenc.ValueType {