From 8bbb08972509b09581a0fd7c7d373f86c1531c30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Tue, 29 Oct 2024 08:12:22 +0100 Subject: [PATCH] fix(tsdb): do not optimize for non-overlapping OOO chunks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During compaction this will trigger populateWithDelChunkSeriesIterator to re-encode the chunks in sequence, creating the correct counter reset hints. Signed-off-by: György Krajcsovits --- tsdb/db_test.go | 23 +++++----- tsdb/ooo_head_read.go | 89 ++------------------------------------ tsdb/ooo_head_read_test.go | 34 +++------------ 3 files changed, 20 insertions(+), 126 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index fcc5def894..c37726a1e2 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -6165,11 +6165,11 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario {1, 40, histogram.UnknownCounterReset}, // I1. {2, 40, histogram.UnknownCounterReset}, // O1. {3, 10, histogram.UnknownCounterReset}, // O2. Counter reset cleared by iterator change. - {4, 30, histogram.UnknownCounterReset}, // I2. Counter reset cleared on merge. + {4, 30, histogram.UnknownCounterReset}, // I2. Counter reset cleared by iterator change. }, expectedChunks: []expectedChunk{ - {histogram.UnknownCounterReset, 2}, // I1+O2. - {histogram.UnknownCounterReset, 2}, // O2.I2. + {histogram.UnknownCounterReset, 2}, // I1+O2. Recoded due to having OOO chunks. + {histogram.CounterReset, 2}, // O2.I2. Recoded due to having OOO chunks. }, }, "counter reset in OOO mmapped chunk cleared by in-memory ooo chunk": { @@ -6189,16 +6189,15 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario {2, 20, histogram.NotCounterReset}, // MO1. {3, 30, histogram.NotCounterReset}, // MO1. {4, 10, histogram.UnknownCounterReset}, // O1. Counter reset cleared by iterator change. - {5, 20, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by merge. + {5, 20, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by iterator change. {6, 10, histogram.UnknownCounterReset}, // MO3. Counter reset cleared by iterator change. {7, 20, histogram.NotCounterReset}, // MO3. {8, 30, histogram.UnknownCounterReset}, // I1. }, expectedChunks: []expectedChunk{ - {histogram.UnknownCounterReset, 3}, // MO1. - {histogram.UnknownCounterReset, 1}, // O1. - {histogram.UnknownCounterReset, 1}, // MO2. - {histogram.UnknownCounterReset, 3}, // MO3+I1. + {histogram.UnknownCounterReset, 3}, // MO1. Recoded due to having OOO chunks. + {histogram.CounterReset, 2}, // O1+MO2. Recoded due to having OOO chunks. + {histogram.CounterReset, 3}, // MO3+I1. Recoded due to having OOO chunks. }, }, "counter reset in OOO mmapped chunk cleared by another OOO mmaped chunk": { @@ -6218,16 +6217,14 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario {2, 10, histogram.UnknownCounterReset}, // MO3. Counter reset cleared by iterator change. {3, 20, histogram.NotCounterReset}, // MO3. {4, 30, histogram.NotCounterReset}, // MO3. - {5, 40, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by merge. + {5, 40, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by iterator change. {6, 50, histogram.NotCounterReset}, // MO2. {7, 60, histogram.UnknownCounterReset}, // O1. {8, 100, histogram.UnknownCounterReset}, // I1. }, expectedChunks: []expectedChunk{ - {histogram.UnknownCounterReset, 1}, // MO1. - {histogram.UnknownCounterReset, 3}, // MO3 - {histogram.UnknownCounterReset, 2}, // MO2 - {histogram.UnknownCounterReset, 2}, // O1+I1. + {histogram.UnknownCounterReset, 1}, // MO1. Recoded due to having OOO chunks. + {histogram.CounterReset, 7}, // MO3+MO2+O1+I1. Recoded due to having OOO chunks. }, }, } diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 21a1fd79b7..1468ff27f6 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -22,7 +22,6 @@ import ( "github.com/oklog/ulid" - "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -148,20 +147,14 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap // [5,7], [6,8]. toBeMerged := tmpChks[0] prevIsOOO := isOOOChunkID(chunks.HeadChunkID(toBeMerged.Ref)) - if prevIsOOO { - toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged}} - } for _, c := range tmpChks[1:] { currIsOOO := isOOOChunkID(chunks.HeadChunkID(c.Ref)) - if prevIsOOO == currIsOOO && c.MinTime > toBeMerged.MaxTime { - // This chunk doesn't overlap and we are not switching between in-order - // and out of order. Send current toBeMerged to output and start a new - // one. + if !prevIsOOO && !currIsOOO && c.MinTime > toBeMerged.MaxTime { + // This in-order chunk doesn't overlap and we are not switching between + // in-order and out of order. Send current toBeMerged to output and start + // a new one. *chks = append(*chks, toBeMerged) toBeMerged = c - if currIsOOO { - toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged}} - } } else { // Merge this chunk with existing toBeMerged. if mm, ok := toBeMerged.Chunk.(*multiMeta); ok { @@ -186,60 +179,6 @@ type multiMeta struct { metas []chunks.Meta } -// Wrapper to return a chunk where the iterator clears -// counter resets. -type hintClearedChunk struct { - chunkenc.Chunk -} - -func (h hintClearedChunk) Iterator(it chunkenc.Iterator) chunkenc.Iterator { - if itt, ok := it.(hintClearedIterator); ok { - itt.it = h.Chunk.Iterator(itt.it) - return itt - } - return &hintClearedIterator{it: h.Chunk.Iterator(nil)} -} - -type hintClearedIterator struct { - it chunkenc.Iterator -} - -func (h hintClearedIterator) Next() chunkenc.ValueType { - return h.it.Next() -} - -func (h hintClearedIterator) Seek(t int64) chunkenc.ValueType { - return h.it.Seek(t) -} - -func (h hintClearedIterator) AtT() int64 { - return h.it.AtT() -} - -func (h hintClearedIterator) At() (int64, float64) { - return h.it.At() -} - -func (h hintClearedIterator) AtHistogram(to *histogram.Histogram) (int64, *histogram.Histogram) { - t, H := h.it.AtHistogram(to) - if H != nil && H.CounterResetHint == histogram.CounterReset { - H.CounterResetHint = histogram.UnknownCounterReset - } - return t, H -} - -func (h hintClearedIterator) AtFloatHistogram(to *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { - t, FH := h.it.AtFloatHistogram(to) - if FH != nil && FH.CounterResetHint == histogram.CounterReset { - FH.CounterResetHint = histogram.UnknownCounterReset - } - return t, FH -} - -func (h hintClearedIterator) Err() error { - return h.it.Err() -} - // LabelValues needs to be overridden from the headIndexReader implementation // so we can return labels within either in-order range or ooo range. func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) { @@ -326,26 +265,6 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk return meta.Chunk, nil, meta.MaxTime, nil } - if len(mm.metas) == 1 { - // Only one chunk in the multiMeta, no need to iterate, but we have to - // clear the counter reset hint for histogram chunks. - m := mm.metas[0] - chk := m.Chunk - if chk == nil { - _, cid, isOOO := unpackHeadChunkRef(m.Ref) - var err error - chk, _, err = cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk) - if err != nil { - return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err) - } - } - if _, ok := chk.(*chunkenc.XORChunk); ok { - return chk, nil, m.MaxTime, nil - } - hcc := &hintClearedChunk{Chunk: chk} - return hcc, nil, m.MaxTime, nil - } - // We have a composite meta: construct a composite iterable. mc := &mergedOOOChunks{} for _, m := range mm.metas { diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index cd7ee877cc..bd9062bd3f 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -148,8 +148,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 3: [-------------------] // Output Graphically [-----------------------------] [-----------------------------] expChunks: []expChunk{ - {c: chunkInterval{0, 100, 250}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}}}, - {c: chunkInterval{1, 500, 650}, m: []chunkInterval{{1, 500, 600}, {3, 550, 650}}}, + {c: chunkInterval{0, 100, 650}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}, {1, 500, 600}, {3, 550, 650}}}, }, }, { @@ -191,10 +190,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 3: [------------------] // Output Graphically [------------------][------------------][------------------][------------------] expChunks: []expChunk{ - {c: chunkInterval{0, 100, 199}}, - {c: chunkInterval{1, 200, 299}}, - {c: chunkInterval{2, 300, 399}}, - {c: chunkInterval{3, 400, 499}}, + {c: chunkInterval{0, 100, 499}, m: []chunkInterval{{0, 100, 199}, {1, 200, 299}, {2, 300, 399}, {3, 400, 499}}}, }, }, { @@ -257,8 +253,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 4: [---------------------------------------] // Output Graphically [---------------------------------------] [------------------------------------------------] expChunks: []expChunk{ - {c: chunkInterval{0, 100, 300}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}}}, - {c: chunkInterval{4, 600, 850}, m: []chunkInterval{{4, 600, 800}, {3, 650, 750}, {1, 770, 850}}}, + {c: chunkInterval{0, 100, 850}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}, {4, 600, 800}, {3, 650, 750}, {1, 770, 850}}}, }, }, { @@ -277,9 +272,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 2: [--------] // Output Graphically [-------] [--------] [----------] expChunks: []expChunk{ - {c: chunkInterval{0, 100, 150}}, - {c: chunkInterval{2, 200, 250}}, - {c: chunkInterval{1, 300, 350}}, + {c: chunkInterval{0, 100, 350}, m: []chunkInterval{{0, 100, 150}, {2, 200, 250}, {1, 300, 350}}}, }, }, } @@ -334,10 +327,6 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { } chunk = mm } - // OOO chunks are always wrapped in multiMeta. - if len(e.m) == 0 { - chunk = &multiMeta{metas: []chunks.Meta{{Ref: findID(e.c.ID), Chunk: chunk, MinTime: e.c.mint, MaxTime: e.c.maxt}}} - } meta := chunks.Meta{ Chunk: chunk, MinTime: e.c.mint, @@ -620,8 +609,6 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { scenario.sampleFunc(minutes(24), 1), scenario.sampleFunc(minutes(26), 1), scenario.sampleFunc(minutes(29), 1), - }, - { scenario.sampleFunc(minutes(30), 2), scenario.sampleFunc(minutes(32), 2), scenario.sampleFunc(minutes(34), 2), @@ -675,8 +662,6 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { scenario.sampleFunc(minutes(24), 2), scenario.sampleFunc(minutes(26), 2), scenario.sampleFunc(minutes(29), 2), - }, - { scenario.sampleFunc(minutes(30), 1), scenario.sampleFunc(minutes(32), 1), scenario.sampleFunc(minutes(34), 1), @@ -690,7 +675,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { }, }, { - name: "If chunks are not overlapped they are not converged", + name: "If chunks are not overlapped they are still converged", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), @@ -717,8 +702,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { {Ts: minutes(40), V: 3}, {Ts: minutes(42), V: 3}, }, - expChunkError: false, - expSingleChunks: true, + expChunkError: false, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------------------------------------------------------------------------------] // Chunk 0 [-------] @@ -733,22 +717,16 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { scenario.sampleFunc(minutes(14), 0), scenario.sampleFunc(minutes(16), 0), scenario.sampleFunc(minutes(18), 0), - }, - { scenario.sampleFunc(minutes(20), 1), scenario.sampleFunc(minutes(22), 1), scenario.sampleFunc(minutes(24), 1), scenario.sampleFunc(minutes(26), 1), scenario.sampleFunc(minutes(28), 1), - }, - { scenario.sampleFunc(minutes(30), 2), scenario.sampleFunc(minutes(32), 2), scenario.sampleFunc(minutes(34), 2), scenario.sampleFunc(minutes(36), 2), scenario.sampleFunc(minutes(38), 2), - }, - { scenario.sampleFunc(minutes(40), 3), scenario.sampleFunc(minutes(42), 3), },