mirror of
https://github.com/prometheus/prometheus.git
synced 2026-05-28 04:02:21 -04:00
feat(tsdb): query of overlapping chunks with ST by source encoding
When querying chunks from storage, take into account the source encoding instead of losing ST. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
e40f988f5c
commit
7c295dbece
15 changed files with 124 additions and 10 deletions
|
|
@ -213,6 +213,10 @@ type histogramIterator struct {
|
|||
histograms []*histogram.Histogram
|
||||
}
|
||||
|
||||
func (*histogramIterator) Encoding() chunkenc.Encoding {
|
||||
return chunkenc.EncFloatHistogram
|
||||
}
|
||||
|
||||
func (h *histogramIterator) Next() chunkenc.ValueType {
|
||||
h.i++
|
||||
if h.i < len(h.histograms) {
|
||||
|
|
|
|||
|
|
@ -442,6 +442,13 @@ func newStorageSeriesIterator(series Series) *storageSeriesIterator {
|
|||
}
|
||||
}
|
||||
|
||||
func (ssi *storageSeriesIterator) Encoding() chunkenc.Encoding {
|
||||
if ssi.currH != nil {
|
||||
return chunkenc.EncFloatHistogram
|
||||
}
|
||||
return chunkenc.EncXOR
|
||||
}
|
||||
|
||||
func (ssi *storageSeriesIterator) reset(series Series) {
|
||||
ssi.floats = series.Floats
|
||||
ssi.histograms = series.Histograms
|
||||
|
|
|
|||
|
|
@ -416,6 +416,7 @@ type mockSeriesIterator struct {
|
|||
err func() error
|
||||
}
|
||||
|
||||
func (*mockSeriesIterator) Encoding() chunkenc.Encoding { return chunkenc.EncXOR }
|
||||
func (m *mockSeriesIterator) Seek(t int64) chunkenc.ValueType { return m.seek(t) }
|
||||
func (m *mockSeriesIterator) At() (int64, float64) { return m.at() }
|
||||
func (m *mockSeriesIterator) Next() chunkenc.ValueType { return m.next() }
|
||||
|
|
@ -467,6 +468,10 @@ func (*fakeSeriesIterator) AtST() int64 {
|
|||
return 0 // No start timestamps in this fake iterator.
|
||||
}
|
||||
|
||||
func (*fakeSeriesIterator) Encoding() chunkenc.Encoding {
|
||||
return chunkenc.EncXOR
|
||||
}
|
||||
|
||||
func (it *fakeSeriesIterator) Next() chunkenc.ValueType {
|
||||
it.idx++
|
||||
if it.idx >= it.nsamples {
|
||||
|
|
|
|||
|
|
@ -551,6 +551,13 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType {
|
|||
return chunkenc.ValNone
|
||||
}
|
||||
|
||||
func (c *chainSampleIterator) Encoding() chunkenc.Encoding {
|
||||
if c.curr == nil {
|
||||
panic("chainSampleIterator.At called before first .Next or after .Next returned false.")
|
||||
}
|
||||
return c.curr.Encoding()
|
||||
}
|
||||
|
||||
func (c *chainSampleIterator) At() (t int64, v float64) {
|
||||
if c.curr == nil {
|
||||
panic("chainSampleIterator.At called before first .Next or after .Next returned false.")
|
||||
|
|
|
|||
|
|
@ -1692,6 +1692,10 @@ type errIterator struct {
|
|||
err error
|
||||
}
|
||||
|
||||
func (errIterator) Encoding() chunkenc.Encoding {
|
||||
return chunkenc.EncNone
|
||||
}
|
||||
|
||||
func (errIterator) Next() chunkenc.ValueType {
|
||||
return chunkenc.ValNone
|
||||
}
|
||||
|
|
|
|||
|
|
@ -564,6 +564,22 @@ func (c *concreteSeriesIterator) AtT() int64 {
|
|||
return c.series.floats[c.floatsCur].Timestamp
|
||||
}
|
||||
|
||||
// TODO(krajorama): implement Encoding() with ST. Maybe. concreteSeriesIterator
|
||||
// is used for turning query results into an iterable, but query results do not
|
||||
// have ST.
|
||||
func (c *concreteSeriesIterator) Encoding() chunkenc.Encoding {
|
||||
switch c.curValType {
|
||||
case chunkenc.ValFloat:
|
||||
return chunkenc.EncXOR
|
||||
case chunkenc.ValHistogram:
|
||||
return chunkenc.EncHistogram
|
||||
case chunkenc.ValFloatHistogram:
|
||||
return chunkenc.EncFloatHistogram
|
||||
default:
|
||||
return chunkenc.EncNone
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(krajorama): implement AtST. Maybe. concreteSeriesIterator is used
|
||||
// for turning query results into an iterable, but query results do not have ST.
|
||||
func (*concreteSeriesIterator) AtST() int64 {
|
||||
|
|
@ -822,6 +838,10 @@ func (it *chunkedSeriesIterator) reset(chunks []prompb.Chunk, mint, maxt int64)
|
|||
}
|
||||
}
|
||||
|
||||
func (it *chunkedSeriesIterator) Encoding() chunkenc.Encoding {
|
||||
return it.cur.Encoding()
|
||||
}
|
||||
|
||||
func (it *chunkedSeriesIterator) At() (ts int64, v float64) {
|
||||
return it.cur.At()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -113,6 +113,15 @@ func NewListSeriesIterator(samples Samples) chunkenc.Iterator {
|
|||
return &listSeriesIterator{samples: samples, idx: -1}
|
||||
}
|
||||
|
||||
func (it *listSeriesIterator) Encoding() chunkenc.Encoding {
|
||||
s := it.samples.Get(it.idx)
|
||||
encoding := s.Type().ChunkEncodingWithST(s.ST())
|
||||
if encoding == chunkenc.EncNone {
|
||||
panic(fmt.Sprintf("unknown sample type %s", s.Type().String()))
|
||||
}
|
||||
return encoding
|
||||
}
|
||||
|
||||
func (it *listSeriesIterator) Reset(samples Samples) {
|
||||
it.samples = samples
|
||||
it.idx = -1
|
||||
|
|
|
|||
|
|
@ -160,6 +160,11 @@ type Iterator interface {
|
|||
// Returns 0 if the start timestamp is not implemented or not set.
|
||||
// Before the iterator has advanced, the behaviour is unspecified.
|
||||
AtST() int64
|
||||
// Encoding returns what encoding to use for storing the current sample.
|
||||
// Only call this as last resort if the encoding is really needed, and
|
||||
// the current chunk isn't accessible otherwise.
|
||||
// Before the iterator has advanced, the behaviour is unspecified.
|
||||
Encoding() Encoding
|
||||
// Err returns the current error. It should be used only after the
|
||||
// iterator is exhausted, i.e. `Next` or `Seek` have returned ValNone.
|
||||
Err() error
|
||||
|
|
@ -258,6 +263,13 @@ type mockSeriesIterator struct {
|
|||
currIndex int
|
||||
}
|
||||
|
||||
func (it *mockSeriesIterator) Encoding() Encoding {
|
||||
if it.AtST() != 0 {
|
||||
return EncXOROptST
|
||||
}
|
||||
return EncXOR
|
||||
}
|
||||
|
||||
func (*mockSeriesIterator) Seek(int64) ValueType { return ValNone }
|
||||
|
||||
func (it *mockSeriesIterator) At() (int64, float64) {
|
||||
|
|
@ -300,6 +312,7 @@ func NewNopIterator() Iterator {
|
|||
|
||||
type nopIterator struct{}
|
||||
|
||||
func (nopIterator) Encoding() Encoding { return EncNone }
|
||||
func (nopIterator) Next() ValueType { return ValNone }
|
||||
func (nopIterator) Seek(int64) ValueType { return ValNone }
|
||||
func (nopIterator) At() (int64, float64) { return math.MinInt64, 0 }
|
||||
|
|
|
|||
|
|
@ -839,6 +839,10 @@ type floatHistogramIterator struct {
|
|||
atFloatHistogramCalled bool
|
||||
}
|
||||
|
||||
func (*floatHistogramIterator) Encoding() Encoding {
|
||||
return EncFloatHistogram
|
||||
}
|
||||
|
||||
func (it *floatHistogramIterator) Seek(t int64) ValueType {
|
||||
if it.err != nil {
|
||||
return ValNone
|
||||
|
|
|
|||
|
|
@ -898,6 +898,10 @@ type histogramIterator struct {
|
|||
err error
|
||||
}
|
||||
|
||||
func (*histogramIterator) Encoding() Encoding {
|
||||
return EncHistogram
|
||||
}
|
||||
|
||||
func (it *histogramIterator) Seek(t int64) ValueType {
|
||||
if it.err != nil {
|
||||
return ValNone
|
||||
|
|
|
|||
|
|
@ -248,6 +248,10 @@ type xorIterator struct {
|
|||
err error
|
||||
}
|
||||
|
||||
func (*xorIterator) Encoding() Encoding {
|
||||
return EncXOR
|
||||
}
|
||||
|
||||
func (it *xorIterator) Seek(t int64) ValueType {
|
||||
if it.err != nil {
|
||||
return ValNone
|
||||
|
|
|
|||
|
|
@ -197,6 +197,10 @@ type xorOptSTtIterator struct {
|
|||
err error
|
||||
}
|
||||
|
||||
func (*xorOptSTtIterator) Encoding() Encoding {
|
||||
return EncXOROptST
|
||||
}
|
||||
|
||||
func (it *xorOptSTtIterator) Seek(t int64) ValueType {
|
||||
if it.err != nil {
|
||||
return ValNone
|
||||
|
|
|
|||
|
|
@ -771,6 +771,10 @@ func (p *populateWithDelSeriesIterator) Seek(t int64) chunkenc.ValueType {
|
|||
return chunkenc.ValNone
|
||||
}
|
||||
|
||||
func (p *populateWithDelSeriesIterator) Encoding() chunkenc.Encoding {
|
||||
return p.curr.Encoding()
|
||||
}
|
||||
|
||||
func (p *populateWithDelSeriesIterator) At() (int64, float64) {
|
||||
return p.curr.At()
|
||||
}
|
||||
|
|
@ -887,6 +891,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
|
|||
)
|
||||
switch valueType {
|
||||
case chunkenc.ValHistogram:
|
||||
// TODO(krajorama): handle ST capable histogram chunks when they are supported.
|
||||
newChunk = chunkenc.NewHistogramChunk()
|
||||
if app, err = newChunk.Appender(); err != nil {
|
||||
break
|
||||
|
|
@ -905,7 +910,11 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
|
|||
}
|
||||
}
|
||||
case chunkenc.ValFloat:
|
||||
newChunk = chunkenc.NewXORChunk()
|
||||
if p.currMeta.Chunk.Encoding() == chunkenc.EncXOROptST {
|
||||
newChunk = chunkenc.NewXOROptSTChunk()
|
||||
} else {
|
||||
newChunk = chunkenc.NewXORChunk()
|
||||
}
|
||||
if app, err = newChunk.Appender(); err != nil {
|
||||
break
|
||||
}
|
||||
|
|
@ -920,6 +929,7 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
|
|||
app.Append(st, t, v)
|
||||
}
|
||||
case chunkenc.ValFloatHistogram:
|
||||
// TODO(krajorama): handle ST capable float histogram chunks when they are supported.
|
||||
newChunk = chunkenc.NewFloatHistogramChunk()
|
||||
if app, err = newChunk.Appender(); err != nil {
|
||||
break
|
||||
|
|
@ -958,7 +968,6 @@ func (p *populateWithDelChunkSeriesIterator) populateCurrForSingleChunk() bool {
|
|||
// populateChunksFromIterable reads the samples from currDelIter to create
|
||||
// chunks for chunksFromIterable. It also sets p.currMetaWithChunk to the first
|
||||
// chunk.
|
||||
// TODO(krajorama): test ST when chunks support it.
|
||||
func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
|
||||
p.chunksFromIterable = p.chunksFromIterable[:0]
|
||||
p.chunksFromIterableIdx = -1
|
||||
|
|
@ -982,25 +991,30 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
|
|||
|
||||
app chunkenc.Appender
|
||||
|
||||
newChunk chunkenc.Chunk
|
||||
recoded bool
|
||||
|
||||
err error
|
||||
)
|
||||
|
||||
prevValueType := chunkenc.ValNone
|
||||
prevEncoding := chunkenc.EncNone
|
||||
|
||||
for currentValueType := firstValueType; currentValueType != chunkenc.ValNone; currentValueType = p.currDelIter.Next() {
|
||||
var (
|
||||
newChunk chunkenc.Chunk
|
||||
recoded bool
|
||||
)
|
||||
// Check if the encoding has changed (i.e. we need to create a new
|
||||
// chunk as chunks can't have multiple encoding types).
|
||||
// For the first sample, the following condition will always be true as
|
||||
// ValNone != ValFloat | ValHistogram | ValFloatHistogram.
|
||||
if currentValueType != prevValueType {
|
||||
encoding := p.currDelIter.Encoding()
|
||||
if currentValueType != prevValueType || (encoding != prevEncoding) {
|
||||
if prevValueType != chunkenc.ValNone {
|
||||
p.chunksFromIterable = append(p.chunksFromIterable, chunks.Meta{Chunk: currentChunk, MinTime: cmint, MaxTime: cmaxt})
|
||||
}
|
||||
cmint = p.currDelIter.AtT()
|
||||
if currentChunk, err = currentValueType.NewChunk(); err != nil {
|
||||
// Note: we're passing false for storeST, because we set the
|
||||
// encoding explicitly.
|
||||
if currentChunk, err = chunkenc.NewEmptyChunk(encoding, false); err != nil {
|
||||
break
|
||||
}
|
||||
if app, err = currentChunk.Appender(); err != nil {
|
||||
|
|
@ -1008,19 +1022,18 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
|
|||
}
|
||||
}
|
||||
|
||||
st = p.currDelIter.AtST()
|
||||
switch currentValueType {
|
||||
case chunkenc.ValFloat:
|
||||
{
|
||||
var v float64
|
||||
t, v = p.currDelIter.At()
|
||||
st = p.currDelIter.AtST()
|
||||
app.Append(st, t, v)
|
||||
}
|
||||
case chunkenc.ValHistogram:
|
||||
{
|
||||
var v *histogram.Histogram
|
||||
t, v = p.currDelIter.AtHistogram(nil)
|
||||
st = p.currDelIter.AtST()
|
||||
// No need to set prevApp as AppendHistogram will set the
|
||||
// counter reset header for the appender that's returned.
|
||||
newChunk, recoded, app, err = app.AppendHistogram(nil, st, t, v, false)
|
||||
|
|
@ -1029,7 +1042,6 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
|
|||
{
|
||||
var v *histogram.FloatHistogram
|
||||
t, v = p.currDelIter.AtFloatHistogram(nil)
|
||||
st = p.currDelIter.AtST()
|
||||
// No need to set prevApp as AppendHistogram will set the
|
||||
// counter reset header for the appender that's returned.
|
||||
newChunk, recoded, app, err = app.AppendFloatHistogram(nil, st, t, v, false)
|
||||
|
|
@ -1050,6 +1062,7 @@ func (p *populateWithDelChunkSeriesIterator) populateChunksFromIterable() bool {
|
|||
|
||||
cmaxt = t
|
||||
prevValueType = currentValueType
|
||||
prevEncoding = encoding
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
|
@ -1196,6 +1209,10 @@ type DeletedIterator struct {
|
|||
Intervals tombstones.Intervals
|
||||
}
|
||||
|
||||
func (it *DeletedIterator) Encoding() chunkenc.Encoding {
|
||||
return it.Iter.Encoding()
|
||||
}
|
||||
|
||||
func (it *DeletedIterator) At() (int64, float64) {
|
||||
return it.Iter.At()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -764,6 +764,10 @@ type mockSampleIterator struct {
|
|||
idx int
|
||||
}
|
||||
|
||||
func (it *mockSampleIterator) Encoding() chunkenc.Encoding {
|
||||
return it.s[it.idx].Type().ChunkEncodingWithST(it.s[it.idx].ST())
|
||||
}
|
||||
|
||||
func (it *mockSampleIterator) Seek(t int64) chunkenc.ValueType {
|
||||
for ; it.idx < len(it.s); it.idx++ {
|
||||
if it.idx != -1 && it.s[it.idx].T() >= t {
|
||||
|
|
|
|||
|
|
@ -245,6 +245,10 @@ type FakeSeriesIterator struct {
|
|||
idx int
|
||||
}
|
||||
|
||||
func (*FakeSeriesIterator) Encoding() chunkenc.Encoding {
|
||||
return chunkenc.EncXOR
|
||||
}
|
||||
|
||||
func (f *FakeSeriesIterator) Next() chunkenc.ValueType {
|
||||
f.idx++
|
||||
if f.idx < len(f.samples) {
|
||||
|
|
@ -308,6 +312,10 @@ type FakeHistogramSeriesIterator struct {
|
|||
idx int
|
||||
}
|
||||
|
||||
func (*FakeHistogramSeriesIterator) Encoding() chunkenc.Encoding {
|
||||
return chunkenc.EncFloatHistogram
|
||||
}
|
||||
|
||||
func (f *FakeHistogramSeriesIterator) Next() chunkenc.ValueType {
|
||||
f.idx++
|
||||
if f.idx < len(f.histograms) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue