From e40f988f5c417e18c5dac1b5ee19e8599cb3488f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Tue, 17 Feb 2026 17:25:14 +0100 Subject: [PATCH] feat(tsdb): allow appending to ST capable XOR chunk optionally MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See PR description for uptodate info on details. Signed-off-by: György Krajcsovits --- storage/series.go | 19 +++++---- tsdb/chunkenc/chunk.go | 30 +++++++++++++- tsdb/chunks/chunks.go | 16 ++++++- tsdb/chunks/chunks_test.go | 33 +++++++++++++++ tsdb/head_append.go | 83 ++++++++++++++++++++----------------- tsdb/head_append_v2_test.go | 2 +- tsdb/head_read_test.go | 2 +- tsdb/head_test.go | 58 +++++++++++++------------- tsdb/head_wal.go | 32 +++++++++----- tsdb/ooo_head.go | 51 +++++++++++------------ tsdb/ooo_head_read.go | 10 ++--- tsdb/ooo_head_test.go | 12 +++--- 12 files changed, 220 insertions(+), 128 deletions(-) diff --git a/storage/series.go b/storage/series.go index bf6df7db3e..2d7f643826 100644 --- a/storage/series.go +++ b/storage/series.go @@ -341,11 +341,14 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { i := 0 seriesIter := s.Series.Iterator(nil) lastType := chunkenc.ValNone + lastHadST := false for typ := seriesIter.Next(); typ != chunkenc.ValNone; typ = seriesIter.Next() { - if typ != lastType || i >= seriesToChunkEncoderSplit { + st := seriesIter.AtST() + hasST := st != 0 + if typ != lastType || lastHadST != hasST || i >= seriesToChunkEncoderSplit { // Create a new chunk if the sample type changed or too many samples in the current one. chks = appendChunk(chks, mint, maxt, chk) - chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding()) + chk, err = chunkenc.NewEmptyChunk(typ.ChunkEncoding(), hasST) if err != nil { return errChunksIterator{err: err} } @@ -358,21 +361,20 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { i = 0 } lastType = typ + lastHadST = hasST var ( - st, t int64 - v float64 - h *histogram.Histogram - fh *histogram.FloatHistogram + t int64 + v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram ) switch typ { case chunkenc.ValFloat: t, v = seriesIter.At() - st = seriesIter.AtST() app.Append(st, t, v) case chunkenc.ValHistogram: t, h = seriesIter.AtHistogram(nil) - st = seriesIter.AtST() newChk, recoded, app, err = app.AppendHistogram(nil, st, t, h, false) if err != nil { return errChunksIterator{err: err} @@ -388,7 +390,6 @@ func (s *seriesToChunkEncoder) Iterator(it chunks.Iterator) chunks.Iterator { } case chunkenc.ValFloatHistogram: t, fh = seriesIter.AtFloatHistogram(nil) - st = seriesIter.AtST() newChk, recoded, app, err = app.AppendFloatHistogram(nil, st, t, fh, false) if err != nil { return errChunksIterator{err: err} diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 6fb8de2a77..023356593f 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -76,6 +76,8 @@ type Chunk interface { Bytes() []byte // Encoding returns the encoding type of the chunk. + // If the chunk is capable of storing ST (start timestamps), it should + // return the appropriate encoding type (e.g., EncXOROptST). Encoding() Encoding // Appender returns an appender to append samples to the chunk. @@ -202,13 +204,34 @@ func (v ValueType) ChunkEncoding() Encoding { } } -func (v ValueType) NewChunk() (Chunk, error) { +func (v ValueType) ChunkEncodingWithST(st int64) Encoding { switch v { case ValFloat: + if st != 0 { + return EncXOROptST + } + return EncXOR + case ValHistogram: + return EncHistogram + case ValFloatHistogram: + return EncFloatHistogram + default: + return EncNone + } +} + +func (v ValueType) NewChunk(storeST bool) (Chunk, error) { + switch v { + case ValFloat: + if storeST { + return NewXOROptSTChunk(), nil + } return NewXORChunk(), nil case ValHistogram: + // TODO(krajorama): return a ST capable histogram chunk when they are supported. return NewHistogramChunk(), nil case ValFloatHistogram: + // TODO(krajorama): return a ST capable float histogram chunk when they are supported. return NewFloatHistogramChunk(), nil default: return nil, fmt.Errorf("value type %v unsupported", v) @@ -399,9 +422,12 @@ func FromData(e Encoding, d []byte) (Chunk, error) { } // NewEmptyChunk returns an empty chunk for the given encoding. -func NewEmptyChunk(e Encoding) (Chunk, error) { +func NewEmptyChunk(e Encoding, storeST bool) (Chunk, error) { switch e { case EncXOR: + if storeST { + return NewXOROptSTChunk(), nil + } return NewXORChunk(), nil case EncHistogram: return NewHistogramChunk(), nil diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 9b4e011562..88835b382a 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -135,7 +135,9 @@ type Meta struct { } // ChunkFromSamples requires all samples to have the same type. -// TODO(krajorama): test with ST when chunk formats support it. +// It is not efficient and meant for testing purposes only. +// It scans the samples to determine whether any sample has ST set and +// creates a chunk accordingly. func ChunkFromSamples(s []Sample) (Meta, error) { return ChunkFromSamplesGeneric(SampleSlice(s)) } @@ -154,7 +156,17 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) { } sampleType := s.Get(0).Type() - c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding()) + + hasST := false + for i := range s.Len() { + if s.Get(i).ST() != 0 { + hasST = true + break + } + } + + // Request storing ST in the chunk if available. + c, err := chunkenc.NewEmptyChunk(sampleType.ChunkEncoding(), hasST) if err != nil { return Meta{}, err } diff --git a/tsdb/chunks/chunks_test.go b/tsdb/chunks/chunks_test.go index f40f996fde..e3b0454e37 100644 --- a/tsdb/chunks/chunks_test.go +++ b/tsdb/chunks/chunks_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/tsdbutil" ) @@ -58,3 +59,35 @@ func TestWriterWithDefaultSegmentSize(t *testing.T) { require.NoError(t, err) require.Len(t, d, 1, "expected only one segment to be created to hold both chunks") } + +func TestChunkFromSamplesWithST(t *testing.T) { + // Create samples with explicit ST (source timestamp) values + samples := []Sample{ + sample{t: 10, f: 11, st: 5}, + sample{t: 20, f: 12, st: 15}, + sample{t: 30, f: 13, st: 25}, + } + + chk, err := ChunkFromSamples(samples) + require.NoError(t, err) + require.NotNil(t, chk.Chunk) + + // Verify MinTime and MaxTime + require.Equal(t, int64(10), chk.MinTime) + require.Equal(t, int64(30), chk.MaxTime) + + // Iterate over the chunk and verify ST values are preserved + it := chk.Chunk.Iterator(nil) + idx := 0 + for vt := it.Next(); vt != chunkenc.ValNone; vt = it.Next() { + require.Equal(t, chunkenc.ValFloat, vt) + ts, v := it.At() + st := it.AtST() + require.Equal(t, samples[idx].ST(), st, "ST mismatch at index %d", idx) + require.Equal(t, samples[idx].T(), ts, "T mismatch at index %d", idx) + require.Equal(t, samples[idx].F(), v, "F mismatch at index %d", idx) + idx++ + } + require.NoError(t, it.Err()) + require.Equal(t, len(samples), idx, "expected all samples to be iterated") +} diff --git a/tsdb/head_append.go b/tsdb/head_append.go index e673466ccc..de449e3401 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -185,6 +185,7 @@ func (h *Head) appender() *headAppender { typesInBatch: h.getTypeMap(), appendID: appendID, cleanupAppendIDsBelow: cleanupAppendIDsBelow, + storeST: h.opts.EnableSTStorage.Load(), }, } } @@ -412,6 +413,7 @@ type headAppenderBase struct { appendID, cleanupAppendIDsBelow uint64 closed bool + storeST bool } type headAppender struct { headAppenderBase @@ -1387,7 +1389,8 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + // TODO(krajorama,ywwg): Pass ST when available in WAL. + ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, s.V, nil, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) if chunkCreated { r, ok := acc.oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1431,7 +1434,8 @@ func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitConte default: newlyStale := !value.IsStaleNaN(series.lastValue) && value.IsStaleNaN(s.V) staleToNonStale := value.IsStaleNaN(series.lastValue) && !value.IsStaleNaN(s.V) - ok, chunkCreated = series.append(s.T, s.V, a.appendID, acc.appendChunkOpts) + // TODO(krajorama,ywwg): pass ST when available in WAL. + ok, chunkCreated = series.append(a.storeST, 0, s.T, s.V, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1492,7 +1496,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + // TODO(krajorama,ywwg): Pass ST when available in WAL. + ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, 0, s.H, nil, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) if chunkCreated { r, ok := acc.oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1540,7 +1545,8 @@ func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitC newlyStale = newlyStale && !value.IsStaleNaN(series.lastHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(series.lastHistogramValue.Sum) && !value.IsStaleNaN(s.H.Sum) } - ok, chunkCreated = series.appendHistogram(s.T, s.H, a.appendID, acc.appendChunkOpts) + // TODO(krajorama,ywwg): pass ST when available in WAL. + ok, chunkCreated = series.appendHistogram(a.storeST, 0, s.T, s.H, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1601,7 +1607,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRefs []chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRefs = series.insert(s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) + // TODO(krajorama,ywwg): Pass ST when available in WAL. + ok, chunkCreated, mmapRefs = series.insert(a.storeST, 0, s.T, 0, nil, s.FH, a.head.chunkDiskMapper, acc.oooCapMax, a.head.logger) if chunkCreated { r, ok := acc.oooMmapMarkers[series.ref] if !ok || r != nil { @@ -1649,7 +1656,8 @@ func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCo newlyStale = newlyStale && !value.IsStaleNaN(series.lastFloatHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(series.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.FH.Sum) } - ok, chunkCreated = series.appendFloatHistogram(s.T, s.FH, a.appendID, acc.appendChunkOpts) + // TODO(krajorama,ywwg): pass ST when available in WAL. + ok, chunkCreated = series.appendFloatHistogram(a.storeST, 0, s.T, s.FH, a.appendID, acc.appendChunkOpts) if ok { if s.T < acc.inOrderMint { acc.inOrderMint = s.T @@ -1799,18 +1807,18 @@ func (a *headAppenderBase) Commit() (err error) { } // insert is like append, except it inserts. Used for OOO samples. -func (s *memSeries) insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { +func (s *memSeries) insert(storeST bool, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64, logger *slog.Logger) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) { if s.ooo == nil { s.ooo = &memSeriesOOOFields{} } c := s.ooo.oooHeadChunk if c == nil || c.chunk.NumSamples() == int(oooCapMax) { // Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks. - c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper, logger) + c, mmapRefs = s.cutNewOOOHeadChunk(storeST, t, chunkDiskMapper, logger) chunkCreated = true } - ok := c.chunk.Insert(t, v, h, fh) + ok := c.chunk.Insert(st, t, v, h, fh) if ok { if chunkCreated || t < c.minTime { c.minTime = t @@ -1833,13 +1841,12 @@ type chunkOpts struct { // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // Series lock must be held when calling. -func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o) +func (s *memSeries) append(storeST bool, st, t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { + c, sampleInOrder, chunkCreated := s.appendPreprocessor(storeST, t, chunkenc.EncXOR, o) if !sampleInOrder { return sampleInOrder, chunkCreated } - // TODO(krajorama): pass ST. - s.app.Append(0, t, v) + s.app.Append(st, t, v) c.maxTime = t @@ -1859,14 +1866,14 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sa // In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped. // To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total // consistent, we return chunkCreated=false in this case. -func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendHistogram(storeST bool, st, t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards and mmap used up chunks. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.HistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncHistogram, o) + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.EncHistogram, o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1881,8 +1888,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui prevApp = nil } - // TODO(krajorama): pass ST. - newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, 0, t, h, false) // false=request a new chunk if needed + newChunk, recoded, s.app, _ = s.app.AppendHistogram(prevApp, st, t, h, false) // false=request a new chunk if needed s.lastHistogramValue = h s.lastFloatHistogramValue = nil @@ -1917,14 +1923,14 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // In case of recoding the existing chunk, a new chunk is allocated and the old chunk is dropped. // To keep the meaning of prometheus_tsdb_head_chunks and prometheus_tsdb_head_chunks_created_total // consistent, we return chunkCreated=false in this case. -func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendFloatHistogram(storeST bool, st, t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards and mmap used up chunks. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevApp, _ := s.app.(*chunkenc.FloatHistogramAppender) - c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(t, chunkenc.EncFloatHistogram, o) + c, sampleInOrder, chunkCreated := s.histogramsAppendPreprocessor(storeST, t, chunkenc.EncFloatHistogram, o) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1939,8 +1945,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, prevApp = nil } - // TODO(krajorama): pass ST. - newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, 0, t, fh, false) // False means request a new chunk if needed. + newChunk, recoded, s.app, _ = s.app.AppendFloatHistogram(prevApp, st, t, fh, false) // False means request a new chunk if needed. s.lastHistogramValue = nil s.lastFloatHistogramValue = fh @@ -1974,7 +1979,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, // number of samples they contain with a soft cap in bytes. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. -func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendPreprocessor(storeST bool, t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { // We target chunkenc.MaxBytesPerXORChunk as a hard for the size of an XOR chunk. We must determine whether to cut // a new head chunk without knowing the size of the next sample, however, so we assume the next sample will be a // maximally-sized sample (19 bytes). @@ -1988,7 +1993,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts return c, false, false } // There is no head chunk in this series yet, create the first chunk for the sample. - c = s.cutNewHeadChunk(t, e, o.chunkRange) + c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) chunkCreated = true } @@ -1999,14 +2004,14 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts // Check the chunk size, unless we just created it and if the chunk is too large, cut a new one. if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk { - c = s.cutNewHeadChunk(t, e, o.chunkRange) + c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) chunkCreated = true } if c.chunk.Encoding() != e { // The chunk encoding expected by this append is different than the head chunk's // encoding. So we cut a new chunk with the expected encoding. - c = s.cutNewHeadChunk(t, e, o.chunkRange) + c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) chunkCreated = true } @@ -2031,7 +2036,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts // as we expect more chunks to come. // Note that next chunk will have its nextAt recalculated for the new rate. if t >= s.nextAt || numSamples >= o.samplesPerChunk*2 { - c = s.cutNewHeadChunk(t, e, o.chunkRange) + c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) chunkCreated = true } @@ -2042,7 +2047,7 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts // cut based on their size in bytes. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. -func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { +func (s *memSeries) histogramsAppendPreprocessor(storeST bool, t int64, e chunkenc.Encoding, o chunkOpts) (c *memChunk, sampleInOrder, chunkCreated bool) { c = s.headChunks if c == nil { @@ -2051,7 +2056,7 @@ func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o return c, false, false } // There is no head chunk in this series yet, create the first chunk for the sample. - c = s.cutNewHeadChunk(t, e, o.chunkRange) + c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) chunkCreated = true } @@ -2063,7 +2068,7 @@ func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o if c.chunk.Encoding() != e { // The chunk encoding expected by this append is different than the head chunk's // encoding. So we cut a new chunk with the expected encoding. - c = s.cutNewHeadChunk(t, e, o.chunkRange) + c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) chunkCreated = true } @@ -2104,7 +2109,7 @@ func (s *memSeries) histogramsAppendPreprocessor(t int64, e chunkenc.Encoding, o // increased or if the bucket/span count has increased. // Note that next chunk will have its nextAt recalculated for the new rate. if (t >= s.nextAt || numBytes >= targetBytes*2) && (numSamples >= chunkenc.MinSamplesPerHistogramChunk || t >= nextChunkRangeStart) { - c = s.cutNewHeadChunk(t, e, o.chunkRange) + c = s.cutNewHeadChunk(storeST, t, e, o.chunkRange) chunkCreated = true } @@ -2129,7 +2134,7 @@ func computeChunkEndTime(start, cur, maxT int64, ratioToFull float64) int64 { return int64(float64(start) + float64(maxT-start)/math.Floor(n)) } -func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk { +func (s *memSeries) cutNewHeadChunk(storeST bool, mint int64, e chunkenc.Encoding, chunkRange int64) *memChunk { // When cutting a new head chunk we create a new memChunk instance with .prev // pointing at the current .headChunks, so it forms a linked list. // All but first headChunks list elements will be m-mapped as soon as possible @@ -2142,12 +2147,16 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange if chunkenc.IsValidEncoding(e) { var err error - s.headChunks.chunk, err = chunkenc.NewEmptyChunk(e) + s.headChunks.chunk, err = chunkenc.NewEmptyChunk(e, storeST) if err != nil { panic(err) // This should never happen. } } else { - s.headChunks.chunk = chunkenc.NewXORChunk() + var err error + s.headChunks.chunk, err = chunkenc.NewEmptyChunk(chunkenc.EncXOR, storeST) + if err != nil { + panic(err) // This should never happen. + } } // Set upper bound on when the next chunk must be started. An earlier timestamp @@ -2164,8 +2173,8 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange // cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk. // The caller must ensure that s is locked and s.ooo is not nil. -func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { - ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper, logger) +func (s *memSeries) cutNewOOOHeadChunk(storeST bool, mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) { + ref := s.mmapCurrentOOOHeadChunk(storeST, chunkDiskMapper, logger) s.ooo.oooHeadChunk = &oooHeadChunk{ chunk: NewOOOChunk(), @@ -2177,12 +2186,12 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk } // s must be locked when calling. -func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) []chunks.ChunkDiskMapperRef { +func (s *memSeries) mmapCurrentOOOHeadChunk(storeST bool, chunkDiskMapper *chunks.ChunkDiskMapper, logger *slog.Logger) []chunks.ChunkDiskMapperRef { if s.ooo == nil || s.ooo.oooHeadChunk == nil { // OOO is not enabled or there is no head chunk, so nothing to m-map here. return nil } - chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(storeST, math.MinInt64, math.MaxInt64) if err != nil { handleChunkWriteError(err) return nil diff --git a/tsdb/head_append_v2_test.go b/tsdb/head_append_v2_test.go index 539ac22fd7..cbac061f1a 100644 --- a/tsdb/head_append_v2_test.go +++ b/tsdb/head_append_v2_test.go @@ -2993,7 +2993,7 @@ func testWBLReplayAppenderV2(t *testing.T, scenario sampleTypeScenario) { require.False(t, ok) require.NotNil(t, ms) - chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64) require.NoError(t, err) require.Len(t, chks, 1) diff --git a/tsdb/head_read_test.go b/tsdb/head_read_test.go index cf55973a01..46d129510e 100644 --- a/tsdb/head_read_test.go +++ b/tsdb/head_read_test.go @@ -33,7 +33,7 @@ func TestMemSeries_chunk(t *testing.T) { appendSamples := func(t *testing.T, s *memSeries, start, end int64, cdm *chunks.ChunkDiskMapper) { for i := start; i < end; i += chunkStep { - ok, _ := s.append(i, float64(i), 0, chunkOpts{ + ok, _ := s.append(false, 0, i, float64(i), 0, chunkOpts{ chunkDiskMapper: cdm, chunkRange: chunkRange, samplesPerChunk: DefaultSamplesPerChunk, diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 91cd742330..34f7637b1c 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1492,7 +1492,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { s := newMemSeries(labels.FromStrings("a", "b"), 1, 0, defaultIsolationDisabled, false) for i := 0; i < 4000; i += 5 { - ok, _ := s.append(int64(i), float64(i), 0, cOpts) + ok, _ := s.append(false, 0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } s.mmapChunks(chunkDiskMapper) @@ -1642,7 +1642,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { if tc.mmappedChunks > 0 { headStart = (tc.mmappedChunks + 1) * chunkRange for i := 0; i < (tc.mmappedChunks+1)*chunkRange; i += chunkStep { - ok, _ := series.append(int64(i), float64(i), 0, cOpts) + ok, _ := series.append(false, 0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } series.mmapChunks(chunkDiskMapper) @@ -1652,7 +1652,7 @@ func TestMemSeries_truncateChunks_scenarios(t *testing.T) { series.headChunks = nil } else { for i := headStart; i < chunkRange*(tc.mmappedChunks+tc.headChunks); i += chunkStep { - ok, _ := series.append(int64(i), float64(i), 0, cOpts) + ok, _ := series.append(false, 0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed: %d", i) } } @@ -2202,20 +2202,20 @@ func TestMemSeries_append(t *testing.T) { // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. - ok, chunkCreated := s.append(998, 1, 0, cOpts) + ok, chunkCreated := s.append(false, 0, 998, 1, 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.append(999, 2, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 999, 2, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") s.mmapChunks(chunkDiskMapper) - ok, chunkCreated = s.append(1000, 3, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 1000, 3, 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.append(1001, 4, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 1001, 4, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -2229,7 +2229,7 @@ func TestMemSeries_append(t *testing.T) { // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. for i := 1; i < 1000; i++ { - ok, _ := s.append(1001+int64(i), float64(i), 0, cOpts) + ok, _ := s.append(false, 0, 1001+int64(i), float64(i), 0, cOpts) require.True(t, ok, "append failed") } s.mmapChunks(chunkDiskMapper) @@ -2270,19 +2270,19 @@ func TestMemSeries_appendHistogram(t *testing.T) { // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. - ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, cOpts) + ok, chunkCreated := s.appendHistogram(false, 0, 998, histograms[0], 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, cOpts) + ok, chunkCreated = s.appendHistogram(false, 0, 999, histograms[1], 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, cOpts) + ok, chunkCreated = s.appendHistogram(false, 0, 1000, histograms[2], 0, cOpts) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, cOpts) + ok, chunkCreated = s.appendHistogram(false, 0, 1001, histograms[3], 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -2293,7 +2293,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { require.Equal(t, int64(1000), s.headChunks.minTime, "wrong chunk range") require.Equal(t, int64(1001), s.headChunks.maxTime, "wrong chunk range") - ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, cOpts) + ok, chunkCreated = s.appendHistogram(false, 0, 1002, histogramWithOneMoreBucket, 0, cOpts) require.True(t, ok, "append failed") require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk") @@ -2328,7 +2328,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { var nextTs int64 var totalAppendedSamples int for i := range samplesPerChunk / 4 { - ok, _ := s.append(nextTs, float64(i), 0, cOpts) + ok, _ := s.append(false, 0, nextTs, float64(i), 0, cOpts) require.Truef(t, ok, "slow sample %d was not appended", i) nextTs += slowRate totalAppendedSamples++ @@ -2337,12 +2337,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { // Suddenly, the rate increases and we receive a sample every millisecond. for i := range math.MaxUint16 { - ok, _ := s.append(nextTs, float64(i), 0, cOpts) + ok, _ := s.append(false, 0, nextTs, float64(i), 0, cOpts) require.Truef(t, ok, "quick sample %d was not appended", i) nextTs++ totalAppendedSamples++ } - ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, cOpts) + ok, chunkCreated := s.append(false, 0, DefaultBlockDuration, float64(0), 0, cOpts) require.True(t, ok, "new chunk sample was not appended") require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk") @@ -2371,18 +2371,18 @@ func TestGCChunkAccess(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0, cOpts) + ok, chunkCreated := s.append(false, 0, 0, 0, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 999, 999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 1000, 1000, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 1999, 1999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -2427,18 +2427,18 @@ func TestGCSeriesAccess(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0, cOpts) + ok, chunkCreated := s.append(false, 0, 0, 0, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 999, 999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 1000, 1000, 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, cOpts) + ok, chunkCreated = s.append(false, 0, 1999, 1999, 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -2775,10 +2775,10 @@ func TestHeadReadWriterRepair(t *testing.T) { require.True(t, created, "series was not created") for i := range 7 { - ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, cOpts) + ok, chunkCreated := s.append(false, 0, int64(i*chunkRange), float64(i*chunkRange), 0, cOpts) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunk was not created") - ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts) + ok, chunkCreated = s.append(false, 0, int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunk was created") h.chunkDiskMapper.CutNewFile() @@ -3118,7 +3118,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"), false) - ok, _ := s.append(0, 0, 0, cOpts) + ok, _ := s.append(false, 0, 0, 0, 0, cOpts) require.True(t, ok, "Series append failed.") require.Equal(t, 0, int(s.txs.txIDCount), "Series should not have an appendID after append with appendID=0.") } @@ -3678,7 +3678,7 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { s := newMemSeries(labels.Labels{}, 1, 0, defaultIsolationDisabled, false) for i := range 7 { - ok, _ := s.append(int64(i), float64(i), 0, cOpts) + ok, _ := s.append(false, 0, int64(i), float64(i), 0, cOpts) require.True(t, ok, "sample append failed") } @@ -5569,7 +5569,7 @@ func testWBLReplay(t *testing.T, scenario sampleTypeScenario) { require.False(t, ok) require.NotNil(t, ms) - chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64) require.NoError(t, err) require.Len(t, chks, 1) diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 0a54ae3878..cb9916067f 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -115,8 +115,9 @@ func (h *Head) loadWAL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch }() wg.Add(concurrency) + storeST := h.opts.EnableSTStorage.Load() for i := range concurrency { - processors[i].setup() + processors[i].setup(storeST) go func(wp *walSubsetProcessor) { missingSeries, unknownSamples, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks) @@ -576,6 +577,7 @@ type walSubsetProcessor struct { input chan walSubsetProcessorInputItem output chan []record.RefSample histogramsOutput chan []histogramRecord + storeST bool } type walSubsetProcessorInputItem struct { @@ -586,10 +588,11 @@ type walSubsetProcessorInputItem struct { deletedSeriesRefs []chunks.HeadSeriesRef } -func (wp *walSubsetProcessor) setup() { +func (wp *walSubsetProcessor) setup(storeST bool) { wp.input = make(chan walSubsetProcessorInputItem, 300) wp.output = make(chan []record.RefSample, 300) wp.histogramsOutput = make(chan []histogramRecord, 300) + wp.storeST = storeST } func (wp *walSubsetProcessor) closeAndDrain() { @@ -666,7 +669,8 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp h.numStaleSeries.Dec() } - if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated { + // TODO(krajorama,ywwg): Pass ST when available in WBL. + if _, chunkCreated := ms.append(wp.storeST, 0, s.T, s.V, 0, appendChunkOpts); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() _ = ms.mmapChunks(h.chunkDiskMapper) @@ -703,14 +707,16 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp newlyStale = newlyStale && !value.IsStaleNaN(ms.lastHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(ms.lastHistogramValue.Sum) && !value.IsStaleNaN(s.h.Sum) } - _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts) + // TODO(krajorama,ywwg): Pass ST when available in WBL. + _, chunkCreated = ms.appendHistogram(wp.storeST, 0, s.t, s.h, 0, appendChunkOpts) } else { newlyStale = value.IsStaleNaN(s.fh.Sum) if ms.lastFloatHistogramValue != nil { newlyStale = newlyStale && !value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) staleToNonStale = value.IsStaleNaN(ms.lastFloatHistogramValue.Sum) && !value.IsStaleNaN(s.fh.Sum) } - _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts) + // TODO(krajorama,ywwg): Pass ST when available in WBL. + _, chunkCreated = ms.appendFloatHistogram(wp.storeST, 0, s.t, s.fh, 0, appendChunkOpts) } if newlyStale { h.numStaleSeries.Inc() @@ -779,8 +785,9 @@ func (h *Head) loadWBL(r *wlog.Reader, syms *labels.SymbolTable, multiRef map[ch }() wg.Add(concurrency) + storeST := h.opts.EnableSTStorage.Load() for i := range concurrency { - processors[i].setup() + processors[i].setup(storeST) go func(wp *wblSubsetProcessor) { missingSeries, unknownSamples, unknownHistograms := wp.processWBLSamples(h) @@ -1025,6 +1032,7 @@ type wblSubsetProcessor struct { input chan wblSubsetProcessorInputItem output chan []record.RefSample histogramsOutput chan []histogramRecord + storeST bool } type wblSubsetProcessorInputItem struct { @@ -1033,10 +1041,11 @@ type wblSubsetProcessorInputItem struct { histogramSamples []histogramRecord } -func (wp *wblSubsetProcessor) setup() { +func (wp *wblSubsetProcessor) setup(storeST bool) { wp.output = make(chan []record.RefSample, 300) wp.histogramsOutput = make(chan []histogramRecord, 300) wp.input = make(chan wblSubsetProcessorInputItem, 300) + wp.storeST = storeST } func (wp *wblSubsetProcessor) closeAndDrain() { @@ -1096,7 +1105,8 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR missingSeries[s.Ref] = struct{}{} continue } - ok, chunkCreated, _ := ms.insert(s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) + // TODO(krajorama,ywwg): Pass ST when available in WBL. + ok, chunkCreated, _ := ms.insert(wp.storeST, 0, s.T, s.V, nil, nil, h.chunkDiskMapper, oooCapMax, h.logger) if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() @@ -1124,9 +1134,11 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (map[chunks.HeadSeriesR var chunkCreated bool var ok bool if s.h != nil { - ok, chunkCreated, _ = ms.insert(s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger) + // TODO(krajorama,ywwg): Pass ST when available in WBL. + ok, chunkCreated, _ = ms.insert(wp.storeST, 0, s.t, 0, s.h, nil, h.chunkDiskMapper, oooCapMax, h.logger) } else { - ok, chunkCreated, _ = ms.insert(s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger) + // TODO(krajorama,ywwg): Pass ST when available in WBL. + ok, chunkCreated, _ = ms.insert(wp.storeST, 0, s.t, 0, nil, s.fh, h.chunkDiskMapper, oooCapMax, h.logger) } if chunkCreated { h.metrics.chunksCreated.Inc() diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index f9746c4c61..5a8252144b 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -34,14 +34,13 @@ func NewOOOChunk() *OOOChunk { // Insert inserts the sample such that order is maintained. // Returns false if insert was not possible due to the same timestamp already existing. -func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool { +func (o *OOOChunk) Insert(st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram) bool { // Although out-of-order samples can be out-of-order amongst themselves, we // are opinionated and expect them to be usually in-order meaning we could // try to append at the end first if the new timestamp is higher than the // last known timestamp. if len(o.samples) == 0 || t > o.samples[len(o.samples)-1].t { - // TODO(krajorama): pass ST. - o.samples = append(o.samples, sample{0, t, v, h, fh}) + o.samples = append(o.samples, sample{st, t, v, h, fh}) return true } @@ -50,8 +49,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog if i >= len(o.samples) { // none found. append it at the end - // TODO(krajorama): pass ST. - o.samples = append(o.samples, sample{0, t, v, h, fh}) + o.samples = append(o.samples, sample{st, t, v, h, fh}) return true } @@ -63,8 +61,7 @@ func (o *OOOChunk) Insert(t int64, v float64, h *histogram.Histogram, fh *histog // Expand length by 1 to make room. use a zero sample, we will overwrite it anyway. o.samples = append(o.samples, sample{}) copy(o.samples[i+1:], o.samples[i:]) - // TODO(krajorama): pass ST. - o.samples[i] = sample{0, t, v, h, fh} + o.samples[i] = sample{st, t, v, h, fh} return true } @@ -76,7 +73,7 @@ func (o *OOOChunk) NumSamples() int { // ToEncodedChunks returns chunks with the samples in the OOOChunk. // //nolint:revive -func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error) { +func (o *OOOChunk) ToEncodedChunks(storeST bool, mint, maxt int64) (chks []memChunk, err error) { if len(o.samples) == 0 { return nil, nil } @@ -97,10 +94,17 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error break } encoding := chunkenc.EncXOR - if s.h != nil { + switch { + case s.h != nil: + // TODO(krajorama): use ST capable histogram chunk. encoding = chunkenc.EncHistogram - } else if s.fh != nil { + case s.fh != nil: + // TODO(krajorama): use ST capable float histogram chunk. encoding = chunkenc.EncFloatHistogram + default: + if storeST { + encoding = chunkenc.EncXOROptST + } } // prevApp is the appender for the previous sample. @@ -111,15 +115,11 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) } cmint = s.t - switch encoding { - case chunkenc.EncXOR: - chunk = chunkenc.NewXORChunk() - case chunkenc.EncHistogram: - chunk = chunkenc.NewHistogramChunk() - case chunkenc.EncFloatHistogram: - chunk = chunkenc.NewFloatHistogramChunk() - default: - chunk = chunkenc.NewXORChunk() + chunk, err = chunkenc.NewEmptyChunk(encoding, storeST) + if err != nil { + // This should never happen. No point using a default type as + // calling the wrong append function would panic. + return chks, err } app, err = chunk.Appender() if err != nil { @@ -127,18 +127,17 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error } } switch encoding { - case chunkenc.EncXOR: - // TODO(krajorama): pass ST. - app.Append(0, s.t, s.f) + case chunkenc.EncXOR, chunkenc.EncXOROptST: + app.Append(s.st, s.t, s.f) case chunkenc.EncHistogram: + // TODO(krajorama): handle ST capable histogram chunk. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevHApp, _ := prevApp.(*chunkenc.HistogramAppender) var ( newChunk chunkenc.Chunk recoded bool ) - // TODO(krajorama): pass ST. - newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, 0, s.t, s.h, false) + newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.st, s.t, s.h, false) if newChunk != nil { // A new chunk was allocated. if !recoded { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) @@ -147,14 +146,14 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error chunk = newChunk } case chunkenc.EncFloatHistogram: + // TODO(krajorama): handle ST capable float histogram chunk. // Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway. prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender) var ( newChunk chunkenc.Chunk recoded bool ) - // TODO(krajorama): pass ST. - newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, 0, s.t, s.fh, false) + newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.st, s.t, s.fh, false) if newChunk != nil { // A new chunk was allocated. if !recoded { chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 5d2347c2d7..b83f101642 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -77,7 +77,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S *chks = (*chks)[:0] if s.ooo != nil { - return getOOOSeriesChunks(s, oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks) + return getOOOSeriesChunks(s, oh.head.opts.EnableSTStorage.Load(), oh.mint, oh.maxt, oh.lastGarbageCollectedMmapRef, 0, true, oh.inoMint, chks) } *chks = appendSeriesChunks(s, oh.inoMint, oh.maxt, *chks) return nil @@ -88,7 +88,7 @@ func (oh *HeadAndOOOIndexReader) Series(ref storage.SeriesRef, builder *labels.S // // maxMmapRef tells upto what max m-map chunk that we can consider. If it is non-0, then // the oooHeadChunk will not be considered. -func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error { +func getOOOSeriesChunks(s *memSeries, storeST bool, mint, maxt int64, lastGarbageCollectedMmapRef, maxMmapRef chunks.ChunkDiskMapperRef, includeInOrder bool, inoMint int64, chks *[]chunks.Meta) error { tmpChks := make([]chunks.Meta, 0, len(s.ooo.oooMmappedChunks)) addChunk := func(minT, maxT int64, ref chunks.ChunkRef, chunk chunkenc.Chunk) { @@ -106,7 +106,7 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap if c.OverlapsClosedInterval(mint, maxt) && maxMmapRef == 0 { ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks)))) if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least. - chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime) + chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(storeST, c.minTime, c.maxTime) if err != nil { handleChunkWriteError(err) return nil @@ -347,7 +347,7 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead, } var lastMmapRef chunks.ChunkDiskMapperRef - mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper, head.logger) + mmapRefs := ms.mmapCurrentOOOHeadChunk(head.opts.EnableSTStorage.Load(), head.chunkDiskMapper, head.logger) if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 { // Nothing was m-mapped. So take the mmapRef from the existing slice if it exists. mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref} @@ -481,7 +481,7 @@ func (ir *OOOCompactionHeadIndexReader) Series(ref storage.SeriesRef, builder *l return nil } - return getOOOSeriesChunks(s, ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks) + return getOOOSeriesChunks(s, ir.ch.head.opts.EnableSTStorage.Load(), ir.ch.mint, ir.ch.maxt, 0, ir.ch.lastMmapRef, false, 0, chks) } func (*OOOCompactionHeadIndexReader) SortedLabelValues(_ context.Context, _ string, _ *storage.LabelHints, _ ...*labels.Matcher) ([]string, error) { diff --git a/tsdb/ooo_head_test.go b/tsdb/ooo_head_test.go index 99cd357a30..f15382229b 100644 --- a/tsdb/ooo_head_test.go +++ b/tsdb/ooo_head_test.go @@ -85,7 +85,7 @@ func testOOOInsert(t *testing.T, chunk.samples = make([]sample, numPreExisting) chunk.samples = makeEvenSampleSlice(numPreExisting, sampleFunc) newSample := sampleFunc(valOdd(insertPos)) - chunk.Insert(newSample.t, newSample.f, newSample.h, newSample.fh) + chunk.Insert(newSample.st, newSample.t, newSample.f, newSample.h, newSample.fh) var expSamples []sample // Our expected new samples slice, will be first the original samples. @@ -145,7 +145,7 @@ func testOOOInsertDuplicate(t *testing.T, dupSample := chunk.samples[dupPos] dupSample.f = 0.123 - ok := chunk.Insert(dupSample.t, dupSample.f, dupSample.h, dupSample.fh) + ok := chunk.Insert(dupSample.st, dupSample.t, dupSample.f, dupSample.h, dupSample.fh) expSamples := makeEvenSampleSlice(num, sampleFunc) // We expect no change. require.False(t, ok) @@ -252,17 +252,17 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) { for _, s := range tc.samples { switch s.Type() { case chunkenc.ValFloat: - oooChunk.Insert(s.t, s.f, nil, nil) + oooChunk.Insert(s.st, s.t, s.f, nil, nil) case chunkenc.ValHistogram: - oooChunk.Insert(s.t, 0, s.h.Copy(), nil) + oooChunk.Insert(s.st, s.t, 0, s.h.Copy(), nil) case chunkenc.ValFloatHistogram: - oooChunk.Insert(s.t, 0, nil, s.fh.Copy()) + oooChunk.Insert(s.st, s.t, 0, nil, s.fh.Copy()) default: t.Fatalf("unexpected sample type %d", s.Type()) } } - chunks, err := oooChunk.ToEncodedChunks(math.MinInt64, math.MaxInt64) + chunks, err := oooChunk.ToEncodedChunks(false, math.MinInt64, math.MaxInt64) require.NoError(t, err) require.Len(t, chunks, len(tc.expectedChunks), "number of chunks") sampleIndex := 0