From d7f5129042fb49ca36ed32ddc4d2fad0d1c006f5 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 4 Jan 2023 15:30:06 +0530 Subject: [PATCH] tsdb: Add logic to determine appendable gauge float histograms This is to check if a gauge histogram can be appended to the given chunk. If not, it tells what changes to make to the chunk and the histogram if possible. Signed-off-by: Ganesh Vernekar --- tsdb/chunkenc/float_histogram.go | 46 ++++++- tsdb/chunkenc/float_histogram_test.go | 167 ++++++++++++++++++++++++++ tsdb/chunkenc/histogram.go | 4 +- tsdb/chunkenc/histogram_meta.go | 80 +++++++++++- tsdb/chunkenc/histogram_meta_test.go | 106 +++++++++++++--- 5 files changed, 375 insertions(+), 28 deletions(-) diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 142dc42035..a4cb03a134 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -215,14 +215,10 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) { // The chunk is not appendable in the following cases: // // • The schema has changed. -// // • The threshold for the zero bucket has changed. -// // • Any buckets have disappeared. -// // • There was a counter reset in the count of observations or in any bucket, // including the zero bucket. -// // • The last sample in the chunk was stale while the current sample is not stale. // // The method returns an additional boolean set to true if it is not appendable @@ -260,12 +256,12 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( } var ok bool - positiveInterjections, ok = compareSpans(a.pSpans, h.PositiveSpans) + positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans) if !ok { counterReset = true return } - negativeInterjections, ok = compareSpans(a.nSpans, h.NegativeSpans) + negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans) if !ok { counterReset = true return @@ -281,6 +277,44 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( return } +// AppendableGauge returns whether the chunk can be appended to, and if so +// whether: +// 1. Any recoding needs to happen to the chunk using the provided interjections +// (in case of any new buckets, positive or negative range, respectively). +// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections +// (in case of any missing buckets, positive or negative range, respectively). +// +// The chunk is not appendable in the following cases: +// +// • The schema has changed. +// • The threshold for the zero bucket has changed. +// • The last sample in the chunk was stale while the current sample is not stale. +func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) ( + positiveInterjections, negativeInterjections []Interjection, + backwardPositiveInterjections, backwardNegativeInterjections []Interjection, + okToAppend bool, +) { + if value.IsStaleNaN(h.Sum) { + // This is a stale sample whose buckets and spans don't matter. + okToAppend = true + return + } + if value.IsStaleNaN(a.sum.value) { + // If the last sample was stale, then we can only accept stale + // samples in this chunk. + return + } + + if h.Schema != a.schema || h.ZeroThreshold != a.zThreshold { + return + } + + positiveInterjections, backwardPositiveInterjections = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans) + negativeInterjections, backwardNegativeInterjections = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans) + okToAppend = true + return +} + // counterResetInAnyFloatBucket returns true if there was a counter reset for any // bucket. This should be called only when the bucket layout is the same or new // buckets were added. It does not handle the case of buckets missing. diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index 9308c3b3d8..b326d4924a 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -357,3 +357,170 @@ func TestFloatHistogramChunkAppendable(t *testing.T) { require.True(t, cr) } } + +func TestFloatHistogramChunkAppendableGauge(t *testing.T) { + c := Chunk(NewFloatHistogramChunk()) + + // Create fresh appender and add the first histogram. + app, err := c.Appender() + require.NoError(t, err) + require.Equal(t, 0, c.NumSamples()) + + ts := int64(1234567890) + h1 := &histogram.FloatHistogram{ + Count: 5, + ZeroCount: 2, + Sum: 18.4, + ZeroThreshold: 1e-125, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + }, + PositiveBuckets: []float64{6, 3, 3, 2, 4, 5, 1}, + } + + app.AppendFloatHistogram(ts, h1.Copy()) + require.Equal(t, 1, c.NumSamples()) + + { // Schema change. + h2 := h1.Copy() + h2.Schema++ + hApp, _ := app.(*FloatHistogramAppender) + _, _, _, _, ok := hApp.AppendableGauge(h2) + require.False(t, ok) + } + + { // Zero threshold change. + h2 := h1.Copy() + h2.ZeroThreshold += 0.1 + hApp, _ := app.(*FloatHistogramAppender) + _, _, _, _, ok := hApp.AppendableGauge(h2) + require.False(t, ok) + } + + { // New histogram that has more buckets. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.Count += 9 + h2.ZeroCount++ + h2.Sum = 30 + h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 1} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has buckets missing. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 1}, + {Offset: 4, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Count -= 4 + h2.Sum-- + h2.PositiveBuckets = []float64{6, 3, 3, 2, 5, 1} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) + require.Len(t, pI, 0) + require.Len(t, nI, 0) + require.Greater(t, len(pBackwardI), 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has a bucket missing and new buckets. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 5, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Sum = 21 + h2.PositiveBuckets = []float64{6, 3, 2, 4, 5, 1} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Greater(t, len(pBackwardI), 0) + require.Len(t, nI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has a counter reset while buckets are same. + h2 := h1.Copy() + h2.Sum = 23 + h2.PositiveBuckets = []float64{6, 2, 3, 2, 4, 5, 1} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) + require.Len(t, pI, 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { // New histogram that has a counter reset while new buckets were added. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: 0, Length: 3}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 4}, + {Offset: 3, Length: 3}, + } + h2.Sum = 29 + h2.PositiveBuckets = []float64{7, 5, 1, 3, 1, 0, 2, 5, 5, 0, 0} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } + + { + // New histogram that has a counter reset while new buckets were + // added before the first bucket and reset on first bucket. + h2 := h1.Copy() + h2.PositiveSpans = []histogram.Span{ + {Offset: -3, Length: 2}, + {Offset: 1, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + } + h2.Sum = 26 + h2.PositiveBuckets = []float64{1, 2, 5, 3, 3, 2, 4, 5, 1} + + hApp, _ := app.(*FloatHistogramAppender) + pI, nI, pBackwardI, nBackwardI, ok := hApp.AppendableGauge(h2) + require.Greater(t, len(pI), 0) + require.Len(t, nI, 0) + require.Len(t, pBackwardI, 0) + require.Len(t, nBackwardI, 0) + require.True(t, ok) + } +} diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index c633c14204..9b26e5472b 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -286,12 +286,12 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( } var ok bool - positiveInterjections, ok = compareSpans(a.pSpans, h.PositiveSpans) + positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans) if !ok { counterReset = true return } - negativeInterjections, ok = compareSpans(a.nSpans, h.NegativeSpans) + negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans) if !ok { counterReset = true return diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index 34768afb28..9629c72de7 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -191,7 +191,7 @@ type Interjection struct { num int } -// compareSpans returns the interjections to convert a slice of deltas to a new +// forwardCompareSpans returns the interjections to convert a slice of deltas to a new // slice representing an expanded set of buckets, or false if incompatible // (e.g. if buckets were removed). // @@ -226,11 +226,11 @@ type Interjection struct { // match a new span layout that adds buckets, we simply need to generate a list // of interjections. // -// Note: Within compareSpans we don't have to worry about the changes to the +// Note: Within forwardCompareSpans we don't have to worry about the changes to the // spans themselves, thanks to the iterators we get to work with the more useful // bucket indices (which of course directly correspond to the buckets we have to // adjust). -func compareSpans(a, b []histogram.Span) ([]Interjection, bool) { +func forwardCompareSpans(a, b []histogram.Span) (forward []Interjection, ok bool) { ai := newBucketIterator(a) bi := newBucketIterator(b) @@ -278,6 +278,80 @@ loop: return interjections, true } +// bidirectionalCompareSpans does everything that forwardCompareSpans does and +// also returns interjections in the other direction (i.e. buckets missing in b that are missing in a). +func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Interjection) { + ai := newBucketIterator(a) + bi := newBucketIterator(b) + + var interjections, bInterjections []Interjection + + // When inter.num becomes > 0, this becomes a valid interjection that + // should be yielded when we finish a streak of new buckets. + var inter, bInter Interjection + + av, aOK := ai.Next() + bv, bOK := bi.Next() +loop: + for { + switch { + case aOK && bOK: + switch { + case av == bv: // Both have an identical value. move on! + // Finish WIP interjection and reset. + if inter.num > 0 { + interjections = append(interjections, inter) + inter.num = 0 + } + if bInter.num > 0 { + bInterjections = append(bInterjections, bInter) + bInter.num = 0 + } + av, aOK = ai.Next() + bv, bOK = bi.Next() + inter.pos++ + bInter.pos++ + case av < bv: // b misses a value that is in a. + bInter.num++ + // Collect the forward interjection before advancing the + // position of 'a'. + if inter.num > 0 { + interjections = append(interjections, inter) + inter.num = 0 + } + inter.pos++ + av, aOK = ai.Next() + case av > bv: // a misses a value that is in b. Forward b and recompare. + inter.num++ + // Collect the backward interjection before advancing the + // position of 'b'. + if bInter.num > 0 { + bInterjections = append(bInterjections, bInter) + bInter.num = 0 + } + bInter.pos++ + bv, bOK = bi.Next() + } + case aOK && !bOK: // b misses a value that is in a. + bInter.num++ + av, aOK = ai.Next() + case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. + inter.num++ + bv, bOK = bi.Next() + default: // Both iterators ran out. We're done. + if inter.num > 0 { + interjections = append(interjections, inter) + } + if bInter.num > 0 { + bInterjections = append(bInterjections, bInter) + } + break loop + } + } + + return interjections, bInterjections +} + // interject merges 'in' with the provided interjections and writes them into // 'out', which must already have the appropriate length. func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV { diff --git a/tsdb/chunkenc/histogram_meta_test.go b/tsdb/chunkenc/histogram_meta_test.go index 30d2eef3a8..a3cb31624a 100644 --- a/tsdb/chunkenc/histogram_meta_test.go +++ b/tsdb/chunkenc/histogram_meta_test.go @@ -111,13 +111,12 @@ func TestBucketIterator(t *testing.T) { } } -func TestInterjection(t *testing.T) { +func TestCompareSpansAndInterject(t *testing.T) { scenarios := []struct { - description string - spansA, spansB []histogram.Span - valid bool - interjections []Interjection - bucketsIn, bucketsOut []int64 + description string + spansA, spansB []histogram.Span + interjections, backwardInterjections []Interjection + bucketsIn, bucketsOut []int64 }{ { description: "single prepend at the beginning", @@ -127,7 +126,6 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -11, Length: 4}, }, - valid: true, interjections: []Interjection{ { pos: 0, @@ -145,7 +143,6 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 4}, }, - valid: true, interjections: []Interjection{ { pos: 3, @@ -163,7 +160,6 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -12, Length: 5}, }, - valid: true, interjections: []Interjection{ { pos: 0, @@ -181,7 +177,6 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 5}, }, - valid: true, interjections: []Interjection{ { pos: 3, @@ -199,7 +194,6 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -12, Length: 7}, }, - valid: true, interjections: []Interjection{ { pos: 0, @@ -221,7 +215,9 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -9, Length: 3}, }, - valid: false, + backwardInterjections: []Interjection{ + {pos: 0, num: 1}, + }, }, { description: "single removal of bucket in the middle", @@ -232,7 +228,9 @@ func TestInterjection(t *testing.T) { {Offset: -10, Length: 2}, {Offset: 1, Length: 1}, }, - valid: false, + backwardInterjections: []Interjection{ + {pos: 2, num: 1}, + }, }, { description: "single removal of bucket at the end", @@ -242,7 +240,9 @@ func TestInterjection(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 3}, }, - valid: false, + backwardInterjections: []Interjection{ + {pos: 3, num: 1}, + }, }, { description: "as described in doc comment", @@ -259,7 +259,6 @@ func TestInterjection(t *testing.T) { {Offset: 1, Length: 4}, {Offset: 3, Length: 3}, }, - valid: true, interjections: []Interjection{ { pos: 2, @@ -277,12 +276,67 @@ func TestInterjection(t *testing.T) { bucketsIn: []int64{6, -3, 0, -1, 2, 1, -4}, bucketsOut: []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}, }, + { + description: "both forward and backward interjections, complex case", + spansA: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 1}, + {Offset: 3, Length: 2}, + {Offset: 3, Length: 1}, + {Offset: 1, Length: 1}, + }, + spansB: []histogram.Span{ + {Offset: 1, Length: 2}, + {Offset: 1, Length: 1}, + {Offset: 1, Length: 2}, + {Offset: 1, Length: 1}, + {Offset: 4, Length: 1}, + }, + interjections: []Interjection{ + { + pos: 2, + num: 1, + }, + { + pos: 3, + num: 2, + }, + { + pos: 6, + num: 1, + }, + }, + backwardInterjections: []Interjection{ + { + pos: 0, + num: 1, + }, + { + pos: 5, + num: 1, + }, + { + pos: 6, + num: 1, + }, + { + pos: 7, + num: 1, + }, + }, + }, } for _, s := range scenarios { t.Run(s.description, func(t *testing.T) { - interjections, valid := compareSpans(s.spansA, s.spansB) - if !s.valid { + if len(s.backwardInterjections) > 0 { + interjections, bInterjections := bidirectionalCompareSpans(s.spansA, s.spansB) + require.Equal(t, s.interjections, interjections) + require.Equal(t, s.backwardInterjections, bInterjections) + } + + interjections, valid := forwardCompareSpans(s.spansA, s.spansB) + if len(s.backwardInterjections) > 0 { require.False(t, valid, "compareScan unexpectedly returned true") return } @@ -292,6 +346,24 @@ func TestInterjection(t *testing.T) { gotBuckets := make([]int64, len(s.bucketsOut)) interject(s.bucketsIn, gotBuckets, interjections, true) require.Equal(t, s.bucketsOut, gotBuckets) + + floatBucketsIn := make([]float64, len(s.bucketsIn)) + last := s.bucketsIn[0] + floatBucketsIn[0] = float64(last) + for i := 1; i < len(floatBucketsIn); i++ { + last += s.bucketsIn[i] + floatBucketsIn[i] = float64(last) + } + floatBucketsOut := make([]float64, len(s.bucketsOut)) + last = s.bucketsOut[0] + floatBucketsOut[0] = float64(last) + for i := 1; i < len(floatBucketsOut); i++ { + last += s.bucketsOut[i] + floatBucketsOut[i] = float64(last) + } + gotFloatBuckets := make([]float64, len(floatBucketsOut)) + interject(floatBucketsIn, gotFloatBuckets, interjections, false) + require.Equal(t, floatBucketsOut, gotFloatBuckets) }) } }