fix(tsdb): clear the counter reset hint from OOO chunks unless explicit

We cannot trust an implicitly detected counter reset in an OOO chunk,
because it might be invalidated by another chunk (in-order or OOO).
Explicit counter resets, on the other hand, are always honored where possible.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
György Krajcsovits 2024-10-29 12:14:22 +01:00
parent d73de5255b
commit ee6f442168
8 changed files with 82 additions and 26 deletions

View file

@ -92,6 +92,11 @@ func (c *FloatHistogramChunk) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(c.Bytes()[2] & CounterResetHeaderMask)
}
// ClearCounterReset overwrites the counter reset header bits of the chunk
// with UnknownCounterReset. Bits outside CounterResetHeaderMask in the same
// byte are kept as they are.
func (c *FloatHistogramChunk) ClearCounterReset() {
	raw := c.Bytes()
	// Drop the current header bits first, then OR in the new header value.
	cleared := raw[2] &^ CounterResetHeaderMask
	raw[2] = cleared | byte(UnknownCounterReset)
}
// Compact implements the Chunk interface.
func (c *FloatHistogramChunk) Compact() {
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {

View file

@ -103,6 +103,11 @@ func (c *HistogramChunk) GetCounterResetHeader() CounterResetHeader {
return CounterResetHeader(c.Bytes()[2] & CounterResetHeaderMask)
}
// ClearCounterReset overwrites the counter reset header bits of the chunk
// with UnknownCounterReset. Bits outside CounterResetHeaderMask in the same
// byte are kept as they are.
func (c *HistogramChunk) ClearCounterReset() {
	raw := c.Bytes()
	// Drop the current header bits first, then OR in the new header value.
	cleared := raw[2] &^ CounterResetHeaderMask
	raw[2] = cleared | byte(UnknownCounterReset)
}
// Compact implements the Chunk interface.
func (c *HistogramChunk) Compact() {
if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {

View file

@ -6164,12 +6164,12 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
expectedSamples: []expectedTsValue{
{1, 40, histogram.UnknownCounterReset}, // I1.
{2, 40, histogram.UnknownCounterReset}, // O1.
{3, 10, histogram.UnknownCounterReset}, // O2. Counter reset cleared by iterator change.
{3, 10, histogram.UnknownCounterReset}, // O2.
{4, 30, histogram.UnknownCounterReset}, // I2. Counter reset cleared by iterator change.
},
expectedChunks: []expectedChunk{
{histogram.UnknownCounterReset, 2}, // I1+O2. Recoded due to having OOO chunks.
{histogram.CounterReset, 2}, // O2.I2. Recoded due to having OOO chunks.
{histogram.UnknownCounterReset, 2}, // I1+O2. Recoded due to having INO and OOO chunks.
{histogram.UnknownCounterReset, 2}, // O2.I2. Recoded due to having INO and OOO chunks.
},
},
"counter reset in OOO mmapped chunk cleared by in-memory ooo chunk": {
@ -6189,15 +6189,16 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
{2, 20, histogram.NotCounterReset}, // MO1.
{3, 30, histogram.NotCounterReset}, // MO1.
{4, 10, histogram.UnknownCounterReset}, // O1. Counter reset cleared by iterator change.
{5, 20, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by iterator change.
{6, 10, histogram.UnknownCounterReset}, // MO3. Counter reset cleared by iterator change.
{5, 20, histogram.UnknownCounterReset}, // MO2.
{6, 10, histogram.UnknownCounterReset}, // MO3.
{7, 20, histogram.NotCounterReset}, // MO3.
{8, 30, histogram.UnknownCounterReset}, // I1.
},
expectedChunks: []expectedChunk{
{histogram.UnknownCounterReset, 3}, // MO1. Recoded due to having OOO chunks.
{histogram.CounterReset, 2}, // O1+MO2. Recoded due to having OOO chunks.
{histogram.CounterReset, 3}, // MO3+I1. Recoded due to having OOO chunks.
{histogram.UnknownCounterReset, 3}, // MO1.
{histogram.UnknownCounterReset, 1}, // O1.
{histogram.UnknownCounterReset, 1}, // MO2.
{histogram.UnknownCounterReset, 3}, // MO3+I1. Recoded due to having INO and OOO chunks.
},
},
"counter reset in OOO mmapped chunk cleared by another OOO mmaped chunk": {
@ -6214,17 +6215,19 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
oooCap: 3,
expectedSamples: []expectedTsValue{
{1, 50, histogram.UnknownCounterReset}, // MO1.
{2, 10, histogram.UnknownCounterReset}, // MO3. Counter reset cleared by iterator change.
{2, 10, histogram.UnknownCounterReset}, // MO3.
{3, 20, histogram.NotCounterReset}, // MO3.
{4, 30, histogram.NotCounterReset}, // MO3.
{5, 40, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by iterator change.
{5, 40, histogram.UnknownCounterReset}, // MO2.
{6, 50, histogram.NotCounterReset}, // MO2.
{7, 60, histogram.UnknownCounterReset}, // O1.
{8, 100, histogram.UnknownCounterReset}, // I1.
},
expectedChunks: []expectedChunk{
{histogram.UnknownCounterReset, 1}, // MO1. Recoded due to having OOO chunks.
{histogram.CounterReset, 7}, // MO3+MO2+O1+I1. Recoded due to having OOO chunks.
{histogram.UnknownCounterReset, 1}, // MO1.
{histogram.UnknownCounterReset, 3}, // MO3.
{histogram.UnknownCounterReset, 2}, // MO2.
{histogram.UnknownCounterReset, 2}, // O1+I1. Recoded due to having INO and OOO chunks.
},
},
}

View file

@ -4769,7 +4769,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) {
numSamples: 2,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
header: chunkenc.UnknownCounterReset,
mint: 122,
maxt: 124,
numSamples: 3,
@ -4797,7 +4797,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) {
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
header: chunkenc.UnknownCounterReset,
mint: 205,
maxt: 205,
numSamples: 1,
@ -4809,7 +4809,7 @@ func TestOOOHistogramCounterResetHeaders(t *testing.T) {
numSamples: 1,
},
expOOOMmappedChunks{
header: chunkenc.CounterReset,
header: chunkenc.UnknownCounterReset,
mint: 215,
maxt: 220,
numSamples: 2,

View file

@ -136,6 +136,12 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.t, s.h, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
hc := newChunk.(*chunkenc.HistogramChunk)
if s.h.CounterResetHint != histogram.CounterReset && hc.GetCounterResetHeader() == chunkenc.CounterReset {
// Clear the detected counter reset in the chunk as we cannot trust
// it in OOO.
hc.ClearCounterReset()
}
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
cmint = s.t
}
@ -151,6 +157,12 @@ func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error
newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.t, s.fh, false)
if newChunk != nil { // A new chunk was allocated.
if !recoded {
hc := newChunk.(*chunkenc.FloatHistogramChunk)
if s.fh.CounterResetHint != histogram.CounterReset && hc.GetCounterResetHeader() == chunkenc.CounterReset {
// Clear the detected counter reset in the chunk as we cannot trust
// it in OOO.
hc.ClearCounterReset()
}
chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
cmint = s.t
}

View file

@ -149,10 +149,10 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
prevIsOOO := isOOOChunkID(chunks.HeadChunkID(toBeMerged.Ref))
for _, c := range tmpChks[1:] {
currIsOOO := isOOOChunkID(chunks.HeadChunkID(c.Ref))
if !prevIsOOO && !currIsOOO && c.MinTime > toBeMerged.MaxTime {
// This in-order chunk doesn't overlap and we are not switching between
// in-order and out of order. Send current toBeMerged to output and start
// a new one.
if prevIsOOO == currIsOOO && c.MinTime > toBeMerged.MaxTime {
// This chunk doesn't overlap and we are not switching between in-order
// and out of order. Send current toBeMerged to output and start a new
// one.
*chks = append(*chks, toBeMerged)
toBeMerged = c
} else {

View file

@ -148,7 +148,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
// Chunk 3: [-------------------]
// Output Graphically [-----------------------------] [-----------------------------]
expChunks: []expChunk{
{c: chunkInterval{0, 100, 650}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}, {1, 500, 600}, {3, 550, 650}}},
{c: chunkInterval{0, 100, 250}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}}},
{c: chunkInterval{1, 500, 650}, m: []chunkInterval{{1, 500, 600}, {3, 550, 650}}},
},
},
{
@ -190,7 +191,10 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
// Chunk 3: [------------------]
// Output Graphically [------------------][------------------][------------------][------------------]
expChunks: []expChunk{
{c: chunkInterval{0, 100, 499}, m: []chunkInterval{{0, 100, 199}, {1, 200, 299}, {2, 300, 399}, {3, 400, 499}}},
{c: chunkInterval{0, 100, 199}},
{c: chunkInterval{1, 200, 299}},
{c: chunkInterval{2, 300, 399}},
{c: chunkInterval{3, 400, 499}},
},
},
{
@ -253,7 +257,8 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
// Chunk 4: [---------------------------------------]
// Output Graphically [---------------------------------------] [------------------------------------------------]
expChunks: []expChunk{
{c: chunkInterval{0, 100, 850}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}, {4, 600, 800}, {3, 650, 750}, {1, 770, 850}}},
{c: chunkInterval{0, 100, 300}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}}},
{c: chunkInterval{4, 600, 850}, m: []chunkInterval{{4, 600, 800}, {3, 650, 750}, {1, 770, 850}}},
},
},
{
@ -272,7 +277,9 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
// Chunk 2: [--------]
// Output Graphically [-------] [--------] [----------]
expChunks: []expChunk{
{c: chunkInterval{0, 100, 350}, m: []chunkInterval{{0, 100, 150}, {2, 200, 250}, {1, 300, 350}}},
{c: chunkInterval{0, 100, 150}},
{c: chunkInterval{2, 200, 250}},
{c: chunkInterval{1, 300, 350}},
},
},
}
@ -609,6 +616,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
scenario.sampleFunc(minutes(24), 1),
scenario.sampleFunc(minutes(26), 1),
scenario.sampleFunc(minutes(29), 1),
},
{
scenario.sampleFunc(minutes(30), 2),
scenario.sampleFunc(minutes(32), 2),
scenario.sampleFunc(minutes(34), 2),
@ -662,6 +671,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
scenario.sampleFunc(minutes(24), 2),
scenario.sampleFunc(minutes(26), 2),
scenario.sampleFunc(minutes(29), 2),
},
{
scenario.sampleFunc(minutes(30), 1),
scenario.sampleFunc(minutes(32), 1),
scenario.sampleFunc(minutes(34), 1),
@ -675,7 +686,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
},
},
{
name: "If chunks are not overlapped they are still converged",
name: "If chunks are not overlapped they are not converged",
queryMinT: minutes(0),
queryMaxT: minutes(100),
firstInOrderSampleAt: minutes(120),
@ -702,7 +713,8 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
{Ts: minutes(40), V: 3},
{Ts: minutes(42), V: 3},
},
expChunkError: false,
expChunkError: false,
expSingleChunks: true,
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
// Query Interval [------------------------------------------------------------------------------------------]
// Chunk 0 [-------]
@ -717,16 +729,22 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
scenario.sampleFunc(minutes(14), 0),
scenario.sampleFunc(minutes(16), 0),
scenario.sampleFunc(minutes(18), 0),
},
{
scenario.sampleFunc(minutes(20), 1),
scenario.sampleFunc(minutes(22), 1),
scenario.sampleFunc(minutes(24), 1),
scenario.sampleFunc(minutes(26), 1),
scenario.sampleFunc(minutes(28), 1),
},
{
scenario.sampleFunc(minutes(30), 2),
scenario.sampleFunc(minutes(32), 2),
scenario.sampleFunc(minutes(34), 2),
scenario.sampleFunc(minutes(36), 2),
scenario.sampleFunc(minutes(38), 2),
},
{
scenario.sampleFunc(minutes(40), 3),
scenario.sampleFunc(minutes(42), 3),
},

View file

@ -167,6 +167,8 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) {
h2 := h1.Copy()
h2.PositiveSpans = append(h2.PositiveSpans, histogram.Span{Offset: 1, Length: 1})
h2.PositiveBuckets = append(h2.PositiveBuckets, 12)
h2explicit := h2.Copy()
h2explicit.CounterResetHint = histogram.CounterReset
testCases := map[string]struct {
samples []sample
@ -199,11 +201,22 @@ func TestOOOChunks_ToEncodedChunks(t *testing.T) {
{encoding: chunkenc.EncXOR, minTime: 1200, maxTime: 1200},
},
},
"has a counter reset": {
"has an implicit counter reset": {
samples: []sample{
{t: 1000, h: h2},
{t: 1100, h: h1},
},
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.UnknownCounterReset},
expectedChunks: []chunkVerify{
{encoding: chunkenc.EncHistogram, minTime: 1000, maxTime: 1000},
{encoding: chunkenc.EncHistogram, minTime: 1100, maxTime: 1100},
},
},
"has an explicit counter reset": {
samples: []sample{
{t: 1000, h: h1},
{t: 1100, h: h2explicit},
},
expectedCounterResets: []histogram.CounterResetHint{histogram.UnknownCounterReset, histogram.CounterReset},
expectedChunks: []chunkVerify{
{encoding: chunkenc.EncHistogram, minTime: 1000, maxTime: 1000},