mirror of
https://github.com/prometheus/prometheus.git
synced 2026-06-08 16:12:16 -04:00
fix(tsdb): do not optimize for non-overlapping OOO chunks
During compaction this will trigger populateWithDelChunkSeriesIterator to re-encode the chunks in sequence, creating the correct counter reset hints. Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
This commit is contained in:
parent
0a05e1dbd6
commit
8bbb089725
3 changed files with 20 additions and 126 deletions
|
|
@ -6165,11 +6165,11 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
|
|||
{1, 40, histogram.UnknownCounterReset}, // I1.
|
||||
{2, 40, histogram.UnknownCounterReset}, // O1.
|
||||
{3, 10, histogram.UnknownCounterReset}, // O2. Counter reset cleared by iterator change.
|
||||
{4, 30, histogram.UnknownCounterReset}, // I2. Counter reset cleared on merge.
|
||||
{4, 30, histogram.UnknownCounterReset}, // I2. Counter reset cleared by iterator change.
|
||||
},
|
||||
expectedChunks: []expectedChunk{
|
||||
{histogram.UnknownCounterReset, 2}, // I1+O2.
|
||||
{histogram.UnknownCounterReset, 2}, // O2.I2.
|
||||
{histogram.UnknownCounterReset, 2}, // I1+O2. Recoded due to having OOO chunks.
|
||||
{histogram.CounterReset, 2}, // O2.I2. Recoded due to having OOO chunks.
|
||||
},
|
||||
},
|
||||
"counter reset in OOO mmapped chunk cleared by in-memory ooo chunk": {
|
||||
|
|
@ -6189,16 +6189,15 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
|
|||
{2, 20, histogram.NotCounterReset}, // MO1.
|
||||
{3, 30, histogram.NotCounterReset}, // MO1.
|
||||
{4, 10, histogram.UnknownCounterReset}, // O1. Counter reset cleared by iterator change.
|
||||
{5, 20, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by merge.
|
||||
{5, 20, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by iterator change.
|
||||
{6, 10, histogram.UnknownCounterReset}, // MO3. Counter reset cleared by iterator change.
|
||||
{7, 20, histogram.NotCounterReset}, // MO3.
|
||||
{8, 30, histogram.UnknownCounterReset}, // I1.
|
||||
},
|
||||
expectedChunks: []expectedChunk{
|
||||
{histogram.UnknownCounterReset, 3}, // MO1.
|
||||
{histogram.UnknownCounterReset, 1}, // O1.
|
||||
{histogram.UnknownCounterReset, 1}, // MO2.
|
||||
{histogram.UnknownCounterReset, 3}, // MO3+I1.
|
||||
{histogram.UnknownCounterReset, 3}, // MO1. Recoded due to having OOO chunks.
|
||||
{histogram.CounterReset, 2}, // O1+MO2. Recoded due to having OOO chunks.
|
||||
{histogram.CounterReset, 3}, // MO3+I1. Recoded due to having OOO chunks.
|
||||
},
|
||||
},
|
||||
"counter reset in OOO mmapped chunk cleared by another OOO mmaped chunk": {
|
||||
|
|
@ -6218,16 +6217,14 @@ func testOOOInterleavedImplicitCounterResets(t *testing.T, name string, scenario
|
|||
{2, 10, histogram.UnknownCounterReset}, // MO3. Counter reset cleared by iterator change.
|
||||
{3, 20, histogram.NotCounterReset}, // MO3.
|
||||
{4, 30, histogram.NotCounterReset}, // MO3.
|
||||
{5, 40, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by merge.
|
||||
{5, 40, histogram.UnknownCounterReset}, // MO2. Counter reset cleared by iterator change.
|
||||
{6, 50, histogram.NotCounterReset}, // MO2.
|
||||
{7, 60, histogram.UnknownCounterReset}, // O1.
|
||||
{8, 100, histogram.UnknownCounterReset}, // I1.
|
||||
},
|
||||
expectedChunks: []expectedChunk{
|
||||
{histogram.UnknownCounterReset, 1}, // MO1.
|
||||
{histogram.UnknownCounterReset, 3}, // MO3
|
||||
{histogram.UnknownCounterReset, 2}, // MO2
|
||||
{histogram.UnknownCounterReset, 2}, // O1+I1.
|
||||
{histogram.UnknownCounterReset, 1}, // MO1. Recoded due to having OOO chunks.
|
||||
{histogram.CounterReset, 7}, // MO3+MO2+O1+I1. Recoded due to having OOO chunks.
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import (
|
|||
|
||||
"github.com/oklog/ulid"
|
||||
|
||||
"github.com/prometheus/prometheus/model/histogram"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||
|
|
@ -148,20 +147,14 @@ func getOOOSeriesChunks(s *memSeries, mint, maxt int64, lastGarbageCollectedMmap
|
|||
// [5,7], [6,8].
|
||||
toBeMerged := tmpChks[0]
|
||||
prevIsOOO := isOOOChunkID(chunks.HeadChunkID(toBeMerged.Ref))
|
||||
if prevIsOOO {
|
||||
toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged}}
|
||||
}
|
||||
for _, c := range tmpChks[1:] {
|
||||
currIsOOO := isOOOChunkID(chunks.HeadChunkID(c.Ref))
|
||||
if prevIsOOO == currIsOOO && c.MinTime > toBeMerged.MaxTime {
|
||||
// This chunk doesn't overlap and we are not switching between in-order
|
||||
// and out of order. Send current toBeMerged to output and start a new
|
||||
// one.
|
||||
if !prevIsOOO && !currIsOOO && c.MinTime > toBeMerged.MaxTime {
|
||||
// This in-order chunk doesn't overlap and we are not switching between
|
||||
// in-order and out of order. Send current toBeMerged to output and start
|
||||
// a new one.
|
||||
*chks = append(*chks, toBeMerged)
|
||||
toBeMerged = c
|
||||
if currIsOOO {
|
||||
toBeMerged.Chunk = &multiMeta{metas: []chunks.Meta{toBeMerged}}
|
||||
}
|
||||
} else {
|
||||
// Merge this chunk with existing toBeMerged.
|
||||
if mm, ok := toBeMerged.Chunk.(*multiMeta); ok {
|
||||
|
|
@ -186,60 +179,6 @@ type multiMeta struct {
|
|||
metas []chunks.Meta
|
||||
}
|
||||
|
||||
// Wrapper to return a chunk where the iterator clears
|
||||
// counter resets.
|
||||
type hintClearedChunk struct {
|
||||
chunkenc.Chunk
|
||||
}
|
||||
|
||||
func (h hintClearedChunk) Iterator(it chunkenc.Iterator) chunkenc.Iterator {
|
||||
if itt, ok := it.(hintClearedIterator); ok {
|
||||
itt.it = h.Chunk.Iterator(itt.it)
|
||||
return itt
|
||||
}
|
||||
return &hintClearedIterator{it: h.Chunk.Iterator(nil)}
|
||||
}
|
||||
|
||||
type hintClearedIterator struct {
|
||||
it chunkenc.Iterator
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) Next() chunkenc.ValueType {
|
||||
return h.it.Next()
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) Seek(t int64) chunkenc.ValueType {
|
||||
return h.it.Seek(t)
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) AtT() int64 {
|
||||
return h.it.AtT()
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) At() (int64, float64) {
|
||||
return h.it.At()
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) AtHistogram(to *histogram.Histogram) (int64, *histogram.Histogram) {
|
||||
t, H := h.it.AtHistogram(to)
|
||||
if H != nil && H.CounterResetHint == histogram.CounterReset {
|
||||
H.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
return t, H
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) AtFloatHistogram(to *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
|
||||
t, FH := h.it.AtFloatHistogram(to)
|
||||
if FH != nil && FH.CounterResetHint == histogram.CounterReset {
|
||||
FH.CounterResetHint = histogram.UnknownCounterReset
|
||||
}
|
||||
return t, FH
|
||||
}
|
||||
|
||||
func (h hintClearedIterator) Err() error {
|
||||
return h.it.Err()
|
||||
}
|
||||
|
||||
// LabelValues needs to be overridden from the headIndexReader implementation
|
||||
// so we can return labels within either in-order range or ooo range.
|
||||
func (oh *HeadAndOOOIndexReader) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, error) {
|
||||
|
|
@ -326,26 +265,6 @@ func (cr *HeadAndOOOChunkReader) chunkOrIterable(meta chunks.Meta, copyLastChunk
|
|||
return meta.Chunk, nil, meta.MaxTime, nil
|
||||
}
|
||||
|
||||
if len(mm.metas) == 1 {
|
||||
// Only one chunk in the multiMeta, no need to iterate, but we have to
|
||||
// clear the counter reset hint for histogram chunks.
|
||||
m := mm.metas[0]
|
||||
chk := m.Chunk
|
||||
if chk == nil {
|
||||
_, cid, isOOO := unpackHeadChunkRef(m.Ref)
|
||||
var err error
|
||||
chk, _, err = cr.head.chunkFromSeries(s, cid, isOOO, m.MinTime, m.MaxTime, isoState, copyLastChunk)
|
||||
if err != nil {
|
||||
return nil, nil, 0, fmt.Errorf("invalid head chunk: %w", err)
|
||||
}
|
||||
}
|
||||
if _, ok := chk.(*chunkenc.XORChunk); ok {
|
||||
return chk, nil, m.MaxTime, nil
|
||||
}
|
||||
hcc := &hintClearedChunk{Chunk: chk}
|
||||
return hcc, nil, m.MaxTime, nil
|
||||
}
|
||||
|
||||
// We have a composite meta: construct a composite iterable.
|
||||
mc := &mergedOOOChunks{}
|
||||
for _, m := range mm.metas {
|
||||
|
|
|
|||
|
|
@ -148,8 +148,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// Chunk 3: [-------------------]
|
||||
// Output Graphically [-----------------------------] [-----------------------------]
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 100, 250}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}}},
|
||||
{c: chunkInterval{1, 500, 650}, m: []chunkInterval{{1, 500, 600}, {3, 550, 650}}},
|
||||
{c: chunkInterval{0, 100, 650}, m: []chunkInterval{{0, 100, 200}, {2, 150, 250}, {1, 500, 600}, {3, 550, 650}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -191,10 +190,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// Chunk 3: [------------------]
|
||||
// Output Graphically [------------------][------------------][------------------][------------------]
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 100, 199}},
|
||||
{c: chunkInterval{1, 200, 299}},
|
||||
{c: chunkInterval{2, 300, 399}},
|
||||
{c: chunkInterval{3, 400, 499}},
|
||||
{c: chunkInterval{0, 100, 499}, m: []chunkInterval{{0, 100, 199}, {1, 200, 299}, {2, 300, 399}, {3, 400, 499}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -257,8 +253,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// Chunk 4: [---------------------------------------]
|
||||
// Output Graphically [---------------------------------------] [------------------------------------------------]
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 100, 300}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}}},
|
||||
{c: chunkInterval{4, 600, 850}, m: []chunkInterval{{4, 600, 800}, {3, 650, 750}, {1, 770, 850}}},
|
||||
{c: chunkInterval{0, 100, 850}, m: []chunkInterval{{0, 100, 300}, {2, 150, 250}, {4, 600, 800}, {3, 650, 750}, {1, 770, 850}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
|
|
@ -277,9 +272,7 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
// Chunk 2: [--------]
|
||||
// Output Graphically [-------] [--------] [----------]
|
||||
expChunks: []expChunk{
|
||||
{c: chunkInterval{0, 100, 150}},
|
||||
{c: chunkInterval{2, 200, 250}},
|
||||
{c: chunkInterval{1, 300, 350}},
|
||||
{c: chunkInterval{0, 100, 350}, m: []chunkInterval{{0, 100, 150}, {2, 200, 250}, {1, 300, 350}}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
@ -334,10 +327,6 @@ func TestOOOHeadIndexReader_Series(t *testing.T) {
|
|||
}
|
||||
chunk = mm
|
||||
}
|
||||
// OOO chunks are always wrapped in multiMeta.
|
||||
if len(e.m) == 0 {
|
||||
chunk = &multiMeta{metas: []chunks.Meta{{Ref: findID(e.c.ID), Chunk: chunk, MinTime: e.c.mint, MaxTime: e.c.maxt}}}
|
||||
}
|
||||
meta := chunks.Meta{
|
||||
Chunk: chunk,
|
||||
MinTime: e.c.mint,
|
||||
|
|
@ -620,8 +609,6 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
scenario.sampleFunc(minutes(24), 1),
|
||||
scenario.sampleFunc(minutes(26), 1),
|
||||
scenario.sampleFunc(minutes(29), 1),
|
||||
},
|
||||
{
|
||||
scenario.sampleFunc(minutes(30), 2),
|
||||
scenario.sampleFunc(minutes(32), 2),
|
||||
scenario.sampleFunc(minutes(34), 2),
|
||||
|
|
@ -675,8 +662,6 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
scenario.sampleFunc(minutes(24), 2),
|
||||
scenario.sampleFunc(minutes(26), 2),
|
||||
scenario.sampleFunc(minutes(29), 2),
|
||||
},
|
||||
{
|
||||
scenario.sampleFunc(minutes(30), 1),
|
||||
scenario.sampleFunc(minutes(32), 1),
|
||||
scenario.sampleFunc(minutes(34), 1),
|
||||
|
|
@ -690,7 +675,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
},
|
||||
},
|
||||
{
|
||||
name: "If chunks are not overlapped they are not converged",
|
||||
name: "If chunks are not overlapped they are still converged",
|
||||
queryMinT: minutes(0),
|
||||
queryMaxT: minutes(100),
|
||||
firstInOrderSampleAt: minutes(120),
|
||||
|
|
@ -717,8 +702,7 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
{Ts: minutes(40), V: 3},
|
||||
{Ts: minutes(42), V: 3},
|
||||
},
|
||||
expChunkError: false,
|
||||
expSingleChunks: true,
|
||||
expChunkError: false,
|
||||
// ts (in minutes) 0 10 20 30 40 50 60 70 80 90 100
|
||||
// Query Interval [------------------------------------------------------------------------------------------]
|
||||
// Chunk 0 [-------]
|
||||
|
|
@ -733,22 +717,16 @@ func testOOOHeadChunkReader_Chunk(t *testing.T, scenario sampleTypeScenario) {
|
|||
scenario.sampleFunc(minutes(14), 0),
|
||||
scenario.sampleFunc(minutes(16), 0),
|
||||
scenario.sampleFunc(minutes(18), 0),
|
||||
},
|
||||
{
|
||||
scenario.sampleFunc(minutes(20), 1),
|
||||
scenario.sampleFunc(minutes(22), 1),
|
||||
scenario.sampleFunc(minutes(24), 1),
|
||||
scenario.sampleFunc(minutes(26), 1),
|
||||
scenario.sampleFunc(minutes(28), 1),
|
||||
},
|
||||
{
|
||||
scenario.sampleFunc(minutes(30), 2),
|
||||
scenario.sampleFunc(minutes(32), 2),
|
||||
scenario.sampleFunc(minutes(34), 2),
|
||||
scenario.sampleFunc(minutes(36), 2),
|
||||
scenario.sampleFunc(minutes(38), 2),
|
||||
},
|
||||
{
|
||||
scenario.sampleFunc(minutes(40), 3),
|
||||
scenario.sampleFunc(minutes(42), 3),
|
||||
},
|
||||
|
|
|
|||
Loading…
Reference in a new issue