From 7c295dbececa3797e7d41ac9fd5641c4a19e8f13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Thu, 5 Feb 2026 08:33:21 +0100 Subject: [PATCH] feat(tsdb): query of overlapping chunks with ST by source encoding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When querying chunks from storage, take into account the source encoding instead of losing ST. Signed-off-by: György Krajcsovits --- promql/histogram_stats_iterator_test.go | 4 +++ promql/value.go | 7 +++++ storage/buffer_test.go | 5 ++++ storage/merge.go | 7 +++++ storage/merge_test.go | 4 +++ storage/remote/codec.go | 20 +++++++++++++ storage/series.go | 9 ++++++ tsdb/chunkenc/chunk.go | 13 +++++++++ tsdb/chunkenc/float_histogram.go | 4 +++ tsdb/chunkenc/histogram.go | 4 +++ tsdb/chunkenc/xor.go | 4 +++ tsdb/chunkenc/xoroptst.go | 4 +++ tsdb/querier.go | 37 ++++++++++++++++++------- tsdb/querier_test.go | 4 +++ web/api/testhelpers/mocks.go | 8 ++++++ 15 files changed, 124 insertions(+), 10 deletions(-) diff --git a/promql/histogram_stats_iterator_test.go b/promql/histogram_stats_iterator_test.go index d3a76820da..ddaf250c1b 100644 --- a/promql/histogram_stats_iterator_test.go +++ b/promql/histogram_stats_iterator_test.go @@ -213,6 +213,10 @@ type histogramIterator struct { histograms []*histogram.Histogram } +func (*histogramIterator) Encoding() chunkenc.Encoding { + return chunkenc.EncFloatHistogram +} + func (h *histogramIterator) Next() chunkenc.ValueType { h.i++ if h.i < len(h.histograms) { diff --git a/promql/value.go b/promql/value.go index 17afdfc410..499a2ca32b 100644 --- a/promql/value.go +++ b/promql/value.go @@ -442,6 +442,13 @@ func newStorageSeriesIterator(series Series) *storageSeriesIterator { } } +func (ssi *storageSeriesIterator) Encoding() chunkenc.Encoding { + if ssi.currH != nil { + return chunkenc.EncFloatHistogram + } + return chunkenc.EncXOR +} + func (ssi *storageSeriesIterator) reset(series Series) { ssi.floats = series.Floats ssi.histograms = series.Histograms diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 61d1601bc0..9118211d59 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -416,6 +416,7 @@ type mockSeriesIterator struct { err func() error } +func (*mockSeriesIterator) Encoding() chunkenc.Encoding { return chunkenc.EncXOR } func (m *mockSeriesIterator) Seek(t int64) chunkenc.ValueType { return m.seek(t) } func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } func (m *mockSeriesIterator) Next() chunkenc.ValueType { return m.next() } @@ -467,6 +468,10 @@ func (*fakeSeriesIterator) AtST() int64 { return 0 // No start timestamps in this fake iterator. } +func (*fakeSeriesIterator) Encoding() chunkenc.Encoding { + return chunkenc.EncXOR +} + func (it *fakeSeriesIterator) Next() chunkenc.ValueType { it.idx++ if it.idx >= it.nsamples { diff --git a/storage/merge.go b/storage/merge.go index 76bf0994e0..4666338910 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -551,6 +551,13 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType { return chunkenc.ValNone } +func (c *chainSampleIterator) Encoding() chunkenc.Encoding { + if c.curr == nil { + panic("chainSampleIterator.At called before first .Next or after .Next returned false.") + } + return c.curr.Encoding() +} + func (c *chainSampleIterator) At() (t int64, v float64) { if c.curr == nil { panic("chainSampleIterator.At called before first .Next or after .Next returned false.") diff --git a/storage/merge_test.go b/storage/merge_test.go index e42a6a4ce1..04c6580ba4 100644 --- a/storage/merge_test.go +++ b/storage/merge_test.go @@ -1692,6 +1692,10 @@ type errIterator struct { err error } +func (errIterator) Encoding() chunkenc.Encoding { + return chunkenc.EncNone +} + func (errIterator) Next() chunkenc.ValueType { return chunkenc.ValNone } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index c689a51164..f8c759fa6a 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -564,6 +564,22 @@ func (c *concreteSeriesIterator) AtT() int64 { return c.series.floats[c.floatsCur].Timestamp } +// TODO(krajorama): implement Encoding() with ST. Maybe. concreteSeriesIterator +// is used for turning query results into an iterable, but query results do not +// have ST. +func (c *concreteSeriesIterator) Encoding() chunkenc.Encoding { + switch c.curValType { + case chunkenc.ValFloat: + return chunkenc.EncXOR + case chunkenc.ValHistogram: + return chunkenc.EncHistogram + case chunkenc.ValFloatHistogram: + return chunkenc.EncFloatHistogram + default: + return chunkenc.EncNone + } +} + // TODO(krajorama): implement AtST. Maybe. concreteSeriesIterator is used // for turning query results into an iterable, but query results do not have ST. func (*concreteSeriesIterator) AtST() int64 { @@ -822,6 +838,10 @@ func (it *chunkedSeriesIterator) reset(chunks []prompb.Chunk, mint, maxt int64) } } +func (it *chunkedSeriesIterator) Encoding() chunkenc.Encoding { + return it.cur.Encoding() +} + func (it *chunkedSeriesIterator) At() (ts int64, v float64) { return it.cur.At() } diff --git a/storage/series.go b/storage/series.go index 2d7f643826..59f6685793 100644 --- a/storage/series.go +++ b/storage/series.go @@ -113,6 +113,15 @@ func NewListSeriesIterator(samples Samples) chunkenc.Iterator { return &listSeriesIterator{samples: samples, idx: -1} } +func (it *listSeriesIterator) Encoding() chunkenc.Encoding { + s := it.samples.Get(it.idx) + encoding := s.Type().ChunkEncodingWithST(s.ST()) + if encoding == chunkenc.EncNone { + panic(fmt.Sprintf("unknown sample type %s", s.Type().String())) + } + return encoding +} + func (it *listSeriesIterator) Reset(samples Samples) { it.samples = samples it.idx = -1 diff --git a/tsdb/chunkenc/chunk.go b/tsdb/chunkenc/chunk.go index 023356593f..a9f0b1e17f 100644 --- a/tsdb/chunkenc/chunk.go +++ b/tsdb/chunkenc/chunk.go @@ -160,6 +160,11 @@ type Iterator interface { // Returns 0 if the start timestamp is not implemented or not set. // Before the iterator has advanced, the behaviour is unspecified. AtST() int64 + // Encoding returns what encoding to use for storing the current sample. + // Only call this as last resort if the encoding is really needed, and + // the current chunk isn't accessible otherwise. + // Before the iterator has advanced, the behaviour is unspecified. + Encoding() Encoding // Err returns the current error. It should be used only after the // iterator is exhausted, i.e. `Next` or `Seek` have returned ValNone. Err() error @@ -258,6 +263,13 @@ type mockSeriesIterator struct { currIndex int } +func (it *mockSeriesIterator) Encoding() Encoding { + if it.AtST() != 0 { + return EncXOROptST + } + return EncXOR +} + func (*mockSeriesIterator) Seek(int64) ValueType { return ValNone } func (it *mockSeriesIterator) At() (int64, float64) { @@ -300,6 +312,7 @@ func NewNopIterator() Iterator { type nopIterator struct{} +func (nopIterator) Encoding() Encoding { return EncNone } func (nopIterator) Next() ValueType { return ValNone } func (nopIterator) Seek(int64) ValueType { return ValNone } func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 } diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 6af2fa68e2..28908325c0 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -839,6 +839,10 @@ type floatHistogramIterator struct { atFloatHistogramCalled bool } +func (*floatHistogramIterator) Encoding() Encoding { + return EncFloatHistogram +} + func (it *floatHistogramIterator) Seek(t int64) ValueType { if it.err != nil { return ValNone diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 4e77f387d3..27d3318a2b 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -898,6 +898,10 @@ type histogramIterator struct { err error } +func (*histogramIterator) Encoding() Encoding { + return EncHistogram +} + func (it *histogramIterator) Seek(t int64) ValueType { if it.err != nil { return ValNone diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go index 5a9a59dc22..860ed105b1 100644 --- a/tsdb/chunkenc/xor.go +++ b/tsdb/chunkenc/xor.go @@ -248,6 +248,10 @@ type xorIterator struct { err error } +func (*xorIterator) Encoding() Encoding { + return EncXOR +} + func (it *xorIterator) Seek(t int64) ValueType { if it.err != nil { return ValNone diff --git a/tsdb/chunkenc/xoroptst.go b/tsdb/chunkenc/xoroptst.go index b138ddbdf4..90265b4543 100644 --- a/tsdb/chunkenc/xoroptst.go +++ b/tsdb/chunkenc/xoroptst.go @@ -197,6 +197,10 @@ type xorOptSTtIterator struct { err error } +func (*xorOptSTtIterator) Encoding() Encoding { + return EncXOROptST +} + func (it *xorOptSTtIterator) Seek(t int64) ValueType { if it.err != nil { return ValNone diff --git a/tsdb/querier.go b/tsdb/querier.go index ac7a14e1b3..7f2492f846 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -771,6 +771,10 @@ func (p *populateWithDelSeriesIterator) Seek(t int64) chunkenc.ValueType { return chunkenc.ValNone } +func (p *populateWithDelSeriesIterator) Encoding() chunkenc.Encoding { + return p.curr.Encoding() +} + func (p *populateWithDelSeriesIterator) At() (int64, float64) { return p.curr.At() } @@ -887,6 +891,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { ) switch valueType { case chunkenc.ValHistogram: + // TODO(krajorama): handle ST capable histogram chunks when they are supported. newChunk = chunkenc.NewHistogramChunk() if app, err = newChunk.Appender(); err != nil { break @@ -905,7 +910,11 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { } } case chunkenc.ValFloat: - newChunk = chunkenc.NewXORChunk() + if p.currMeta.Chunk.Encoding() == chunkenc.EncXOROptST { + newChunk = chunkenc.NewXOROptSTChunk() + } else { + newChunk = chunkenc.NewXORChunk() + } if app, err = newChunk.Appender(); err != nil { break } @@ -920,6 +929,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { app.Append(st, t, v) } case chunkenc.ValFloatHistogram: + // TODO(krajorama): handle ST capable float histogram chunks when they are supported. newChunk = chunkenc.NewFloatHistogramChunk() if app, err = newChunk.Appender(); err != nil { break @@ -958,7 +968,6 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool { // populateChunksFromIterable reads the samples from currDelIter to create // chunks for chunksFromIterable. It also sets p.currMetaWithChunk to the first // chunk. -// TODO(krajorama): test ST when chunks support it. func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { p.chunksFromIterable = p.chunksFromIterable[:0] p.chunksFromIterableIdx = -1 @@ -982,25 +991,30 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { app chunkenc.Appender - newChunk chunkenc.Chunk - recoded bool - err error ) prevValueType := chunkenc.ValNone + prevEncoding := chunkenc.EncNone for currentValueType := firstValueType; currentValueType != chunkenc.ValNone; currentValueType = p.currDelIter.Next() { + var ( + newChunk chunkenc.Chunk + recoded bool + ) // Check if the encoding has changed (i.e. we need to create a new // chunk as chunks can't have multiple encoding types). // For the first sample, the following condition will always be true as // ValNone != ValFloat | ValHistogram | ValFloatHistogram. - if currentValueType != prevValueType { + encoding := p.currDelIter.Encoding() + if currentValueType != prevValueType || (encoding != prevEncoding) { if prevValueType != chunkenc.ValNone { p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt}) } cmint = p.currDelIter.AtT() - if currentChunk, err = currentValueType.NewChunk(); err != nil { + // Note: we're passing false for storeST, because we set the + // encoding explicitly. + if currentChunk, err = chunkenc.NewEmptyChunk(encoding, false); err != nil { break } if app, err = currentChunk.Appender(); err != nil { @@ -1008,19 +1022,18 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { } } + st = p.currDelIter.AtST() switch currentValueType { case chunkenc.ValFloat: { var v float64 t, v = p.currDelIter.At() - st = p.currDelIter.AtST() app.Append(st, t, v) } case chunkenc.ValHistogram: { var v *histogram.Histogram t, v = p.currDelIter.AtHistogram(nil) - st = p.currDelIter.AtST() // No need to set prevApp as AppendHistogram will set the // counter reset header for the appender that's returned. newChunk, recoded, app, err = app.AppendHistogram(nil, st, t, v, false) @@ -1029,7 +1042,6 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { { var v *histogram.FloatHistogram t, v = p.currDelIter.AtFloatHistogram(nil) - st = p.currDelIter.AtST() // No need to set prevApp as AppendHistogram will set the // counter reset header for the appender that's returned. newChunk, recoded, app, err = app.AppendFloatHistogram(nil, st, t, v, false) @@ -1050,6 +1062,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool { cmaxt = t prevValueType = currentValueType + prevEncoding = encoding } if err != nil { @@ -1196,6 +1209,10 @@ type DeletedIterator struct { Intervals tombstones.Intervals } +func (it *DeletedIterator) Encoding() chunkenc.Encoding { + return it.Iter.Encoding() +} + func (it *DeletedIterator) At() (int64, float64) { return it.Iter.At() } diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index 4387635959..25a56f8b9b 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -764,6 +764,10 @@ type mockSampleIterator struct { idx int } +func (it *mockSampleIterator) Encoding() chunkenc.Encoding { + return it.s[it.idx].Type().ChunkEncodingWithST(it.s[it.idx].ST()) +} + func (it *mockSampleIterator) Seek(t int64) chunkenc.ValueType { for ; it.idx < len(it.s); it.idx++ { if it.idx != -1 && it.s[it.idx].T() >= t { diff --git a/web/api/testhelpers/mocks.go b/web/api/testhelpers/mocks.go index 527febb727..107728e37e 100644 --- a/web/api/testhelpers/mocks.go +++ b/web/api/testhelpers/mocks.go @@ -245,6 +245,10 @@ type FakeSeriesIterator struct { idx int } +func (*FakeSeriesIterator) Encoding() chunkenc.Encoding { + return chunkenc.EncXOR +} + func (f *FakeSeriesIterator) Next() chunkenc.ValueType { f.idx++ if f.idx < len(f.samples) { @@ -308,6 +312,10 @@ type FakeHistogramSeriesIterator struct { idx int } +func (*FakeHistogramSeriesIterator) Encoding() chunkenc.Encoding { + return chunkenc.EncFloatHistogram +} + func (f *FakeHistogramSeriesIterator) Next() chunkenc.ValueType { f.idx++ if f.idx < len(f.histograms) {