From ee6f44216880d770f1b93364e78fbc67b6d11fbc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gy=C3=B6rgy=20Krajcsovits?= Date: Tue, 29 Oct 2024 12:14:22 +0100 Subject: [PATCH] fix(tsdb): clear the counter reset hint from OOO chunks unless explicit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We cannot trust the counter reset because it might be invalidated by another chunk (in-order or ooo). We always try to honor explicit counter resets on the other hand. Signed-off-by: György Krajcsovits --- tsdb/chunkenc/float_histogram.go | 5 +++++ tsdb/chunkenc/histogram.go | 5 +++++ tsdb/db_test.go | 27 +++++++++++++++------------ tsdb/head_test.go | 6 +++--- tsdb/ooo_head.go | 12 ++++++++++++ tsdb/ooo_head_read.go | 8 ++++---- tsdb/ooo_head_read_test.go | 30 ++++++++++++++++++++++++------ tsdb/ooo_head_test.go | 15 ++++++++++++++- 8 files changed, 82 insertions(+), 26 deletions(-) diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index f18eb77dad..8e7042a056 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -92,6 +92,11 @@ func (c *FloatHistogramChunk) GetCounterResetHeader() CounterResetHeader { return CounterResetHeader(c.Bytes()[2] & CounterResetHeaderMask) } +// ClearCounterReset sets the counter reset header to UnknownCounterReset. +func (c *FloatHistogramChunk) ClearCounterReset() { + c.Bytes()[2] = (c.Bytes()[2] & (^CounterResetHeaderMask)) | byte(UnknownCounterReset) +} + // Compact implements the Chunk interface. func (c *FloatHistogramChunk) Compact() { if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index f8796d64ec..9479ec9f6d 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -103,6 +103,11 @@ func (c *HistogramChunk) GetCounterResetHeader() CounterResetHeader { return CounterResetHeader(c.Bytes()[2] & CounterResetHeaderMask) } +// ClearCounterReset sets the counter reset header to UnknownCounterReset. +func (c *HistogramChunk) ClearCounterReset() { + c.Bytes()[2] = (c.Bytes()[2] & (^CounterResetHeaderMask)) | byte(UnknownCounterReset) +} + // Compact implements the Chunk interface. func (c *HistogramChunk) Compact() { if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index c37726a1e2..5c329dac0c 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -6164,12 +6164,12 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario expectedSamples: []expectedTsValue{ {1, 40, histogram.UnknownCounterReset}, // I1. {2, 40, histogram.UnknownCounterReset}, // O1. - {3, 10, histogram.UnknownCounterReset}, // O2. Counter reset cleared by iterator change. + {3, 10, histogram.UnknownCounterReset}, // O2. {4, 30, histogram.UnknownCounterReset}, // I2. Counter reset cleared by iterator change. }, expectedChunks: []expectedChunk{ - {histogram.UnknownCounterReset, 2}, // I1+O2. Recoded due to having OOO chunks. - {histogram.CounterReset, 2}, // O2.I2. Recoded due to having OOO chunks. + {histogram.UnknownCounterReset, 2}, // I1+O2. Recoded due to having INO and OOO chunks. + {histogram.UnknownCounterReset, 2}, // O2.I2. Recoded due to having INO and OOO chunks. }, }, "counter reset in OOO mmapped chunk cleared by in-memory ooo chunk": { @@ -6189,15 +6189,16 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario {2, 20, histogram.NotCounterReset}, // MO1. {3, 30, histogram.NotCounterReset}, // MO1. {4, 10, histogram.UnknownCounterReset}, // O1. Counter reset cleared by iterator change. - {5, 20, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by iterator change. - {6, 10, histogram.UnknownCounterReset}, // MO3. Counter reset cleared by iterator change. + {5, 20, histogram.UnknownCounterReset}, // MO2. + {6, 10, histogram.UnknownCounterReset}, // MO3. {7, 20, histogram.NotCounterReset}, // MO3. {8, 30, histogram.UnknownCounterReset}, // I1. }, expectedChunks: []expectedChunk{ - {histogram.UnknownCounterReset, 3}, // MO1. Recoded due to having OOO chunks. - {histogram.CounterReset, 2}, // O1+MO2. Recoded due to having OOO chunks. - {histogram.CounterReset, 3}, // MO3+I1. Recoded due to having OOO chunks. + {histogram.UnknownCounterReset, 3}, // MO1. + {histogram.UnknownCounterReset, 1}, // O1. + {histogram.UnknownCounterReset, 1}, // MO2. + {histogram.UnknownCounterReset, 3}, // MO3+I1. Recoded due to having INO and OOO chunks. }, }, "counter reset in OOO mmapped chunk cleared by another OOO mmaped chunk": { @@ -6214,17 +6215,19 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario oooCap: 3, expectedSamples: []expectedTsValue{ {1, 50, histogram.UnknownCounterReset}, // MO1. - {2, 10, histogram.UnknownCounterReset}, // MO3. Counter reset cleared by iterator change. + {2, 10, histogram.UnknownCounterReset}, // MO3. {3, 20, histogram.NotCounterReset}, // MO3. {4, 30, histogram.NotCounterReset}, // MO3. - {5, 40, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by iterator change. + {5, 40, histogram.UnknownCounterReset}, // MO2. {6, 50, histogram.NotCounterReset}, // MO2. {7, 60, histogram.UnknownCounterReset}, // O1. {8, 100, histogram.UnknownCounterReset}, // I1. }, expectedChunks: []expectedChunk{ - {histogram.UnknownCounterReset, 1}, // MO1. Recoded due to having OOO chunks. - {histogram.CounterReset, 7}, // MO3+MO2+O1+I1. Recoded due to having OOO chunks. + {histogram.UnknownCounterReset, 1}, // MO1. + {histogram.UnknownCounterReset, 3}, // MO3. + {histogram.UnknownCounterReset, 2}, // MO2. + {histogram.UnknownCounterReset, 2}, // O1+I1. Recoded due to having INO and OOO chunks. }, }, } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index cc9daa97fe..143bbe9450 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -4769,7 +4769,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) { numSamples: 2, }, expOOOMmappedChunks{ - header: chunkenc.CounterReset, + header: chunkenc.UnknownCounterReset, mint: 122, maxt: 124, numSamples: 3, @@ -4797,7 +4797,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) { numSamples: 1, }, expOOOMmappedChunks{ - header: chunkenc.CounterReset, + header: chunkenc.UnknownCounterReset, mint: 205, maxt: 205, numSamples: 1, @@ -4809,7 +4809,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) { numSamples: 1, }, expOOOMmappedChunks{ - header: chunkenc.CounterReset, + header: chunkenc.UnknownCounterReset, mint: 215, maxt: 220, numSamples: 2, diff --git a/tsdb/ooo_head.go b/tsdb/ooo_head.go index 0ed9f36484..9ccd1d9ac6 100644 --- a/tsdb/ooo_head.go +++ b/tsdb/ooo_head.go @@ -136,6 +136,12 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.t, s.h, false) if newChunk != nil { // A new chunk was allocated. if !recoded { + hc := newChunk.(*chunkenc.HistogramChunk) + if s.h.CounterResetHint != histogram.CounterReset && hc.GetCounterResetHeader() == chunkenc.CounterReset { + // Clear the detected counter reset in the chunk as we cannot trust + // it in OOO. + hc.ClearCounterReset() + } chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) cmint = s.t } @@ -151,6 +157,12 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.t, s.fh, false) if newChunk != nil { // A new chunk was allocated. if !recoded { + hc := newChunk.(*chunkenc.FloatHistogramChunk) + if s.fh.CounterResetHint != histogram.CounterReset && hc.GetCounterResetHeader() == chunkenc.CounterReset { + // Clear the detected counter reset in the chunk as we cannot trust + // it in OOO. + hc.ClearCounterReset() + } chks = append(chks, memChunk{chunk, cmint, cmaxt, nil}) cmint = s.t } diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 945c2c66f6..1c1683ea26 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -149,10 +149,10 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap prevIsOOO := isOOOChunkID(chunks.HeadChunkID(toBeMerged.Ref)) for _, c := range tmpChks[1:] { currIsOOO := isOOOChunkID(chunks.HeadChunkID(c.Ref)) - if !prevIsOOO && !currIsOOO && c.MinTime > toBeMerged.MaxTime { - // This in-order chunk doesn't overlap and we are not switching between - // in-order and out of order. Send current toBeMerged to output and start - // a new one. + if prevIsOOO == currIsOOO && c.MinTime > toBeMerged.MaxTime { + // This chunk doesn't overlap and we are not switching between in-order + // and out of order. Send current toBeMerged to output and start a new + // one. *chks = append(*chks, toBeMerged) toBeMerged = c } else { diff --git a/tsdb/ooo_head_read_test.go b/tsdb/ooo_head_read_test.go index bd9062bd3f..17f551dd7d 100644 --- a/tsdb/ooo_head_read_test.go +++ b/tsdb/ooo_head_read_test.go @@ -148,7 +148,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 3: [-------------------] // Output Graphically [-----------------------------] [-----------------------------] expChunks: []expChunk{ - {c: chunkInterval{0, 100, 650}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}, {1, 500, 600}, {3, 550, 650}}}, + {c: chunkInterval{0, 100, 250}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}}}, + {c: chunkInterval{1, 500, 650}, m: []chunkInterval{{1, 500, 600}, {3, 550, 650}}}, }, }, { @@ -190,7 +191,10 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 3: [------------------] // Output Graphically [------------------][------------------][------------------][------------------] expChunks: []expChunk{ - {c: chunkInterval{0, 100, 499}, m: []chunkInterval{{0, 100, 199}, {1, 200, 299}, {2, 300, 399}, {3, 400, 499}}}, + {c: chunkInterval{0, 100, 199}}, + {c: chunkInterval{1, 200, 299}}, + {c: chunkInterval{2, 300, 399}}, + {c: chunkInterval{3, 400, 499}}, }, }, { @@ -253,7 +257,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 4: [---------------------------------------] // Output Graphically [---------------------------------------] [------------------------------------------------] expChunks: []expChunk{ - {c: chunkInterval{0, 100, 850}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}, {4, 600, 800}, {3, 650, 750}, {1, 770, 850}}}, + {c: chunkInterval{0, 100, 300}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}}}, + {c: chunkInterval{4, 600, 850}, m: []chunkInterval{{4, 600, 800}, {3, 650, 750}, {1, 770, 850}}}, }, }, { @@ -272,7 +277,9 @@ func TestOOOHeadIndexReader_Series(t *testing.T) { // Chunk 2: [--------] // Output Graphically [-------] [--------] [----------] expChunks: []expChunk{ - {c: chunkInterval{0, 100, 350}, m: []chunkInterval{{0, 100, 150}, {2, 200, 250}, {1, 300, 350}}}, + {c: chunkInterval{0, 100, 150}}, + {c: chunkInterval{2, 200, 250}}, + {c: chunkInterval{1, 300, 350}}, }, }, } @@ -609,6 +616,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { scenario.sampleFunc(minutes(24), 1), scenario.sampleFunc(minutes(26), 1), scenario.sampleFunc(minutes(29), 1), + }, + { scenario.sampleFunc(minutes(30), 2), scenario.sampleFunc(minutes(32), 2), scenario.sampleFunc(minutes(34), 2), @@ -662,6 +671,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { scenario.sampleFunc(minutes(24), 2), scenario.sampleFunc(minutes(26), 2), scenario.sampleFunc(minutes(29), 2), + }, + { scenario.sampleFunc(minutes(30), 1), scenario.sampleFunc(minutes(32), 1), scenario.sampleFunc(minutes(34), 1), @@ -675,7 +686,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { }, }, { - name: "If chunks are not overlapped they are still converged", + name: "If chunks are not overlapped they are not converged", queryMinT: minutes(0), queryMaxT: minutes(100), firstInOrderSampleAt: minutes(120), @@ -702,7 +713,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { {Ts: minutes(40), V: 3}, {Ts: minutes(42), V: 3}, }, - expChunkError: false, + expChunkError: false, + expSingleChunks: true, // ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100 // Query Interval [------------------------------------------------------------------------------------------] // Chunk 0 [-------] @@ -717,16 +729,22 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) { scenario.sampleFunc(minutes(14), 0), scenario.sampleFunc(minutes(16), 0), scenario.sampleFunc(minutes(18), 0), + }, + { scenario.sampleFunc(minutes(20), 1), scenario.sampleFunc(minutes(22), 1), scenario.sampleFunc(minutes(24), 1), scenario.sampleFunc(minutes(26), 1), scenario.sampleFunc(minutes(28), 1), + }, + { scenario.sampleFunc(minutes(30), 2), scenario.sampleFunc(minutes(32), 2), scenario.sampleFunc(minutes(34), 2), scenario.sampleFunc(minutes(36), 2), scenario.sampleFunc(minutes(38), 2), + }, + { scenario.sampleFunc(minutes(40), 3), scenario.sampleFunc(minutes(42), 3), }, diff --git a/tsdb/ooo_head_test.go b/tsdb/ooo_head_test.go index b9badfea21..830d575b14 100644 --- a/tsdb/ooo_head_test.go +++ b/tsdb/ooo_head_test.go @@ -167,6 +167,8 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) { h2 := h1.Copy() h2.PositiveSpans = append(h2.PositiveSpans, histogram.Span{Offset: 1, Length: 1}) h2.PositiveBuckets = append(h2.PositiveBuckets, 12) + h2explicit := h2.Copy() + h2explicit.CounterResetHint = histogram.CounterReset testCases := map[string]struct { samples []sample @@ -199,11 +201,22 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) { {encoding: chunkenc.EncXOR, minTime: 1200, maxTime: 1200}, }, }, - "has a counter reset": { + "has an implicit counter reset": { samples: []sample{ {t: 1000, h: h2}, {t: 1100, h: h1}, }, + expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.UnknownCounterReset}, + expectedChunks: []chunkVerify{ + {encoding: chunkenc.EncHistogram, minTime: 1000, maxTime: 1000}, + {encoding: chunkenc.EncHistogram, minTime: 1100, maxTime: 1100}, + }, + }, + "has an explicit counter reset": { + samples: []sample{ + {t: 1000, h: h1}, + {t: 1100, h: h2explicit}, + }, expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.CounterReset}, expectedChunks: []chunkVerify{ {encoding: chunkenc.EncHistogram, minTime: 1000, maxTime: 1000},