diff --git a/tsdb/db_test.go b/tsdb/db_test.go index d6e5d39230..fcc5def894 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -6169,7 +6169,7 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario }, expectedChunks: []expectedChunk{ {histogram.UnknownCounterReset, 2}, // I1+O2. - {histogram.CounterReset, 2}, // O2.I2. + {histogram.UnknownCounterReset, 2}, // O2.I2. }, }, "counter reset in OOO mmapped chunk cleared by in-memory ooo chunk": { @@ -6196,8 +6196,9 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario }, expectedChunks: []expectedChunk{ {histogram.UnknownCounterReset, 3}, // MO1. - {histogram.CounterReset, 2}, // O1+MO2. - {histogram.CounterReset, 3}, // MO3+I1. + {histogram.UnknownCounterReset, 1}, // O1. + {histogram.UnknownCounterReset, 1}, // MO2. + {histogram.UnknownCounterReset, 3}, // MO3+I1. }, }, "counter reset in OOO mmapped chunk cleared by another OOO mmaped chunk": { @@ -6224,7 +6225,9 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario }, expectedChunks: []expectedChunk{ {histogram.UnknownCounterReset, 1}, // MO1. - {histogram.CounterReset, 7}, // MO3+MO2+O1+I1. + {histogram.UnknownCounterReset, 3}, // MO3 + {histogram.UnknownCounterReset, 2}, // MO2 + {histogram.UnknownCounterReset, 2}, // O1+I1. }, }, } diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 0f2a6d14eb..21a1fd79b7 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -22,6 +22,7 @@ import ( "github.com/oklog/ulid" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -146,15 +147,21 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap // In the example 5 overlaps with 7 and 6 overlaps with 8 so we will return // [5,7], [6,8]. toBeMerged := tmpChks[0] - prevIsOOO := false - for i, c := range tmpChks[1:] { + prevIsOOO := isOOOChunkID(chunks.HeadChunkID(toBeMerged.Ref)) + if prevIsOOO { + toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged}} + } + for _, c := range tmpChks[1:] { currIsOOO := isOOOChunkID(chunks.HeadChunkID(c.Ref)) - if c.MinTime > toBeMerged.MaxTime && (i == 0 || prevIsOOO == currIsOOO) && !currIsOOO { + if prevIsOOO == currIsOOO && c.MinTime > toBeMerged.MaxTime { // This chunk doesn't overlap and we are not switching between in-order - // and out-of-order chunks. Send current toBeMerged to output and start - // a new one. + // and out of order. Send current toBeMerged to output and start a new + // one. *chks = append(*chks, toBeMerged) toBeMerged = c + if currIsOOO { + toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged}} + } } else { // Merge this chunk with existing toBeMerged. if mm, ok := toBeMerged.Chunk.(*multiMeta); ok { @@ -179,6 +186,60 @@ type multiMeta struct { metas []chunks.Meta } +// Wrapper to return a chunk where the iterator clears +// counter resets. +type hintClearedChunk struct { + chunkenc.Chunk +} + +func (h hintClearedChunk) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + if itt, ok := it.(hintClearedIterator); ok { + itt.it = h.Chunk.Iterator(itt.it) + return itt + } + return &hintClearedIterator{it: h.Chunk.Iterator(nil)} +} + +type hintClearedIterator struct { + it chunkenc.Iterator +} + +func (h hintClearedIterator) Next() chunkenc.ValueType { + return h.it.Next() +} + +func (h hintClearedIterator) Seek(t int64) chunkenc.ValueType { + return h.it.Seek(t) +} + +func (h hintClearedIterator) AtT() int64 { + return h.it.AtT() +} + +func (h hintClearedIterator) At() (int64, float64) { + return h.it.At() +} + +func (h hintClearedIterator) AtHistogram(to *histogram.Histogram) (int64, *histogram.Histogram) { + t, H := h.it.AtHistogram(to) + if H != nil && H.CounterResetHint == histogram.CounterReset { + H.CounterResetHint = histogram.UnknownCounterReset + } + return t, H +} + +func (h hintClearedIterator) AtFloatHistogram(to *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + t, FH := h.it.AtFloatHistogram(to) + if FH != nil && FH.CounterResetHint == histogram.CounterReset { + FH.CounterResetHint = histogram.UnknownCounterReset + } + return t, FH +} + +func (h hintClearedIterator) Err() error { + return h.it.Err() +} + // LabelValues needs to be overridden from the headIndexReader implementation // so we can return labels within either in-order range or ooo range. func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { @@ -264,6 +325,27 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk if !ok { // Complete chunk was supplied. return meta.Chunk, nil, meta.MaxTime, nil } + + if len(mm.metas) == 1 { + // Only one chunk in the multiMeta, no need to iterate, but we have to + // clear the counter reset hint for histogram chunks. + m := mm.metas[0] + chk := m.Chunk + if chk == nil { + _, cid, isOOO := unpackHeadChunkRef(m.Ref) + var err error + chk, _, err = cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk) + if err != nil { + return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err) + } + } + if _, ok := chk.(*chunkenc.XORChunk); ok { + return chk, nil, m.MaxTime, nil + } + hcc := &hintClearedChunk{Chunk: chk} + return hcc, nil, m.MaxTime, nil + } + // We have a composite meta: construct a composite iterable. mc := &mergedOOOChunks{} for _, m := range mm.metas { diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index 17f551dd7d..cd7ee877cc 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -334,6 +334,10 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { } chunk = mm } + // OOO chunks are always wrapped in multiMeta. + if len(e.m) == 0 { + chunk = &multiMeta{metas: []chunks.Meta{{Ref: findID(e.c.ID), Chunk: chunk, MinTime: e.c.mint, MaxTime: e.c.maxt}}} + } meta := chunks.Meta{ Chunk: chunk, MinTime: e.c.mint,