mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-09 00:22:19 -04:00
perf(tsdb): do not recode single non-overlapping chunks
If an OOO chunk is non-overlapping, do not recode, only clear the counter reset header. Except for XOR chunks, they don't care. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
e9ba0c1047
commit
0a05e1dbd6
3 changed files with 98 additions and 9 deletions
|
|
@ -6169,7 +6169,7 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
|
|||
},
|
||||
expectedChunks: []expectedChunk{
|
||||
{histogram.UnknownCounterReset, 2}, // I1+O2.
|
||||
{histogram.CounterReset, 2}, // O2.I2.
|
||||
{histogram.UnknownCounterReset, 2}, // O2.I2.
|
||||
},
|
||||
},
|
||||
"counter reset in OOO mmapped chunk cleared by in-memory ooo chunk": {
|
||||
|
|
@ -6196,8 +6196,9 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
|
|||
},
|
||||
expectedChunks: []expectedChunk{
|
||||
{histogram.UnknownCounterReset, 3}, // MO1.
|
||||
{histogram.CounterReset, 2}, // O1+MO2.
|
||||
{histogram.CounterReset, 3}, // MO3+I1.
|
||||
{histogram.UnknownCounterReset, 1}, // O1.
|
||||
{histogram.UnknownCounterReset, 1}, // MO2.
|
||||
{histogram.UnknownCounterReset, 3}, // MO3+I1.
|
||||
},
|
||||
},
|
||||
"counter reset in OOO mmapped chunk cleared by another OOO mmaped chunk": {
|
||||
|
|
@ -6224,7 +6225,9 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
|
|||
},
|
||||
expectedChunks: []expectedChunk{
|
||||
{histogram.UnknownCounterReset, 1}, // MO1.
|
||||
{histogram.CounterReset, 7}, // MO3+MO2+O1+I1.
|
||||
{histogram.UnknownCounterReset, 3}, // MO3
|
||||
{histogram.UnknownCounterReset, 2}, // MO2
|
||||
{histogram.UnknownCounterReset, 2}, // O1+I1.
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"github.com/oklog/ulid"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
|
|
@ -146,15 +147,21 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
|||
// In the example 5 overlaps with 7 and 6 overlaps with 8 so we will return
|
||||
// [5,7], [6,8].
|
||||
toBeMerged := tmpChks[0]
|
||||
prevIsOOO := false
|
||||
for i, c := range tmpChks[1:] {
|
||||
prevIsOOO := isOOOChunkID(chunks.HeadChunkID(toBeMerged.Ref))
|
||||
if prevIsOOO {
|
||||
toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged}}
|
||||
}
|
||||
for _, c := range tmpChks[1:] {
|
||||
currIsOOO := isOOOChunkID(chunks.HeadChunkID(c.Ref))
|
||||
if c.MinTime > toBeMerged.MaxTime && (i == 0 || prevIsOOO == currIsOOO) && !currIsOOO {
|
||||
if prevIsOOO == currIsOOO && c.MinTime > toBeMerged.MaxTime {
|
||||
// This chunk doesn't overlap and we are not switching between in-order
|
||||
// and out-of-order chunks. Send current toBeMerged to output and start
|
||||
// a new one.
|
||||
// and out of order. Send current toBeMerged to output and start a new
|
||||
// one.
|
||||
*chks = append(*chks, toBeMerged)
|
||||
toBeMerged = c
|
||||
if currIsOOO {
|
||||
toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged}}
|
||||
}
|
||||
} else {
|
||||
// Merge this chunk with existing toBeMerged.
|
||||
if mm, ok := toBeMerged.Chunk.(*multiMeta); ok {
|
||||
|
|
@ -179,6 +186,60 @@ type multiMeta struct {
|
|||
metas []chunks.Meta
|
||||
}
|
||||
|
||||
// Wrapper to return a chunk where the iterator clears
|
||||
// counter resets.
|
||||
type hintClearedChunk struct {
|
||||
chunkenc.Chunk
|
||||
}
|
||||
|
||||
func (h hintClearedChunk) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
|
||||
if itt, ok := it.(hintClearedIterator); ok {
|
||||
itt.it = h.Chunk.Iterator(itt.it)
|
||||
return itt
|
||||
}
|
||||
return &hintClearedIterator{it: h.Chunk.Iterator(nil)}
|
||||
}
|
||||
|
||||
type hintClearedIterator struct {
|
||||
it chunkenc.Iterator
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) Next() chunkenc.ValueType {
|
||||
return h.it.Next()
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) Seek(t int64) chunkenc.ValueType {
|
||||
return h.it.Seek(t)
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) AtT() int64 {
|
||||
return h.it.AtT()
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) At() (int64, float64) {
|
||||
return h.it.At()
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) AtHistogram(to *histogram.Histogram) (int64, *histogram.Histogram) {
|
||||
t, H := h.it.AtHistogram(to)
|
||||
if H != nil && H.CounterResetHint == histogram.CounterReset {
|
||||
H.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
return t, H
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) AtFloatHistogram(to *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
||||
t, FH := h.it.AtFloatHistogram(to)
|
||||
if FH != nil && FH.CounterResetHint == histogram.CounterReset {
|
||||
FH.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
return t, FH
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) Err() error {
|
||||
return h.it.Err()
|
||||
}
|
||||
|
||||
// LabelValues needs to be overridden from the headIndexReader implementation
|
||||
// so we can return labels within either in-order range or ooo range.
|
||||
func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
|
|
@ -264,6 +325,27 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk
|
|||
if !ok { // Complete chunk was supplied.
|
||||
return meta.Chunk, nil, meta.MaxTime, nil
|
||||
}
|
||||
|
||||
if len(mm.metas) == 1 {
|
||||
// Only one chunk in the multiMeta, no need to iterate, but we have to
|
||||
// clear the counter reset hint for histogram chunks.
|
||||
m := mm.metas[0]
|
||||
chk := m.Chunk
|
||||
if chk == nil {
|
||||
_, cid, isOOO := unpackHeadChunkRef(m.Ref)
|
||||
var err error
|
||||
chk, _, err = cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk)
|
||||
if err != nil {
|
||||
return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err)
|
||||
}
|
||||
}
|
||||
if _, ok := chk.(*chunkenc.XORChunk); ok {
|
||||
return chk, nil, m.MaxTime, nil
|
||||
}
|
||||
hcc := &hintClearedChunk{Chunk: chk}
|
||||
return hcc, nil, m.MaxTime, nil
|
||||
}
|
||||
|
||||
// We have a composite meta: construct a composite iterable.
|
||||
mc := &mergedOOOChunks{}
|
||||
for _, m := range mm.metas {
|
||||
|
|
|
|||
|
|
@ -334,6 +334,10 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
}
|
||||
chunk = mm
|
||||
}
|
||||
// OOO chunks are always wrapped in multiMeta.
|
||||
if len(e.m) == 0 {
|
||||
chunk = &multiMeta{metas: []chunks.Meta{{Ref: findID(e.c.ID), Chunk: chunk, MinTime: e.c.mint, MaxTime: e.c.maxt}}}
|
||||
}
|
||||
meta := chunks.Meta{
|
||||
Chunk: chunk,
|
||||
MinTime: e.c.mint,
|
||||
|
|
|
|||
Loading…
Reference in a new issue