From 57c18420abbd34877bf57cd5bd1572d36e155e37 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 18 Jan 2023 13:47:22 +0100 Subject: [PATCH 1/3] histograms: General readability tweaks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Adjust doc comments to go1.19 style. - Break down some overly long lines. - Minor doc comment tweaks and fixes. - Some renaming. Some rationales for the last point: I have renamed “interjections” into “inserts”, mostly because it is shorter, and the word shows up a lot by now (and the concept is cryptic enough to not obfuscate it even more with abbreviations). I have also tried to find more descriptive naming for the “compare spans” functions. Signed-off-by: beorn7 --- tsdb/chunkenc/float_histogram.go | 55 ++++---- tsdb/chunkenc/float_histogram_test.go | 2 +- tsdb/chunkenc/histogram.go | 68 ++++----- tsdb/chunkenc/histogram_meta.go | 196 +++++++++++++------------- tsdb/chunkenc/histogram_meta_test.go | 58 ++++---- tsdb/chunkenc/histogram_test.go | 2 +- tsdb/head_append.go | 49 ++++--- 7 files changed, 218 insertions(+), 212 deletions(-) diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 538af364ae..437a5712e2 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -217,10 +217,10 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) { panic("appended an integer histogram to a float histogram chunk") } -// Appendable returns whether the chunk can be appended to, and if so -// whether any recoding needs to happen using the provided interjections -// (in case of any new buckets, positive or negative range, respectively). -// If the sample is a gauge histogram, AppendableGauge must be used instead. 
+// Appendable returns whether the chunk can be appended to, and if so whether +// any recoding needs to happen using the provided inserts (in case of any new +// buckets, positive or negative range, respectively). If the sample is a gauge +// histogram, AppendableGauge must be used instead. // // The chunk is not appendable in the following cases: // - The schema has changed. @@ -233,7 +233,7 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) { // because of a counter reset. If the given sample is stale, it is always ok to // append. If counterReset is true, okToAppend is always false. func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( - positiveInterjections, negativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, okToAppend, counterReset bool, ) { if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType { @@ -267,12 +267,12 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( } var ok bool - positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans) + positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans) if !ok { counterReset = true return } - negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans) + negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans) if !ok { counterReset = true return @@ -280,7 +280,7 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( if counterResetInAnyFloatBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) || counterResetInAnyFloatBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) { - counterReset, positiveInterjections, negativeInterjections = true, nil, nil + counterReset, positiveInserts, negativeInserts = true, nil, nil return } @@ -290,10 +290,11 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( // AppendableGauge returns whether the chunk can be appended to, and if so // 
whether: -// 1. Any recoding needs to happen to the chunk using the provided interjections +// 1. Any recoding needs to happen to the chunk using the provided inserts // (in case of any new buckets, positive or negative range, respectively). -// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections -// (in case of any missing buckets, positive or negative range, respectively). +// 2. Any recoding needs to happen for the histogram being appended, using the +// backward inserts (in case of any missing buckets, positive or negative +// range, respectively). // // This method must be only used for gauge histograms. // @@ -302,8 +303,8 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( // - The threshold for the zero bucket has changed. // - The last sample in the chunk was stale while the current sample is not stale. func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) ( - positiveInterjections, negativeInterjections []Interjection, - backwardPositiveInterjections, backwardNegativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, + backwardPositiveInserts, backwardNegativeInserts []Insert, positiveSpans, negativeSpans []histogram.Span, okToAppend bool, ) { @@ -325,8 +326,8 @@ func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) ( return } - positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans) - negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans) + positiveInserts, backwardPositiveInserts, positiveSpans = expandSpansBothWays(a.pSpans, h.PositiveSpans) + negativeInserts, backwardNegativeInserts, negativeSpans = expandSpansBothWays(a.nSpans, h.NegativeSpans) okToAppend = true return } @@ -501,12 +502,12 @@ func (a *FloatHistogramAppender) writeXorValue(old *xorValue, v float64) { } // 
Recode converts the current chunk to accommodate an expansion of the set of -// (positive and/or negative) buckets used, according to the provided -// interjections, resulting in the honoring of the provided new positive and -// negative spans. To continue appending, use the returned Appender rather than -// the receiver of this method. +// (positive and/or negative) buckets used, according to the provided inserts, +// resulting in the honoring of the provided new positive and negative spans. To +// continue appending, use the returned Appender rather than the receiver of +// this method. func (a *FloatHistogramAppender) Recode( - positiveInterjections, negativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, positiveSpans, negativeSpans []histogram.Span, ) (Chunk, Appender) { // TODO(beorn7): This currently just decodes everything and then encodes @@ -539,11 +540,11 @@ func (a *FloatHistogramAppender) Recode( // Save the modified histogram to the new chunk. hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans - if len(positiveInterjections) > 0 { - hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections, false) + if len(positiveInserts) > 0 { + hOld.PositiveBuckets = insert(hOld.PositiveBuckets, positiveBuckets, positiveInserts, false) } - if len(negativeInterjections) > 0 { - hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections, false) + if len(negativeInserts) > 0 { + hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, false) } app.AppendFloatHistogram(tOld, hOld) } @@ -556,15 +557,15 @@ func (a *FloatHistogramAppender) Recode( // (positive and/or negative) buckets used. 
func (a *FloatHistogramAppender) RecodeHistogramm( fh *histogram.FloatHistogram, - pBackwardInter, nBackwardInter []Interjection, + pBackwardInter, nBackwardInter []Insert, ) { if len(pBackwardInter) > 0 { numPositiveBuckets := countSpans(fh.PositiveSpans) - fh.PositiveBuckets = interject(fh.PositiveBuckets, make([]float64, numPositiveBuckets), pBackwardInter, false) + fh.PositiveBuckets = insert(fh.PositiveBuckets, make([]float64, numPositiveBuckets), pBackwardInter, false) } if len(nBackwardInter) > 0 { numNegativeBuckets := countSpans(fh.NegativeSpans) - fh.NegativeBuckets = interject(fh.NegativeBuckets, make([]float64, numNegativeBuckets), nBackwardInter, false) + fh.NegativeBuckets = insert(fh.NegativeBuckets, make([]float64, numNegativeBuckets), nBackwardInter, false) } } diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index b080fe676c..e945ce1e67 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -138,7 +138,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1)) } -// Mimics the scenario described for compareSpans(). +// Mimics the scenario described for expandSpansForward. func TestFloatHistogramChunkBucketChanges(t *testing.T) { c := Chunk(NewFloatHistogramChunk()) diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 6ceb866a52..dd50a01e8a 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -243,10 +243,10 @@ func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogra panic("appended a float histogram to a histogram chunk") } -// Appendable returns whether the chunk can be appended to, and if so -// whether any recoding needs to happen using the provided interjections -// (in case of any new buckets, positive or negative range, respectively). -// If the sample is a gauge histogram, AppendableGauge must be used instead. 
+// Appendable returns whether the chunk can be appended to, and if so whether +// any recoding needs to happen using the provided inserts (in case of any new +// buckets, positive or negative range, respectively). If the sample is a gauge +// histogram, AppendableGauge must be used instead. // // The chunk is not appendable in the following cases: // @@ -261,7 +261,7 @@ func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogra // because of a counter reset. If the given sample is stale, it is always ok to // append. If counterReset is true, okToAppend is always false. func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( - positiveInterjections, negativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, okToAppend, counterReset bool, ) { if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType { @@ -295,12 +295,12 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( } var ok bool - positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans) + positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans) if !ok { counterReset = true return } - negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans) + negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans) if !ok { counterReset = true return @@ -308,7 +308,7 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( if counterResetInAnyBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) || counterResetInAnyBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) { - counterReset, positiveInterjections, negativeInterjections = true, nil, nil + counterReset, positiveInserts, negativeInserts = true, nil, nil return } @@ -318,10 +318,11 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( // AppendableGauge returns whether the chunk can be appended to, and if so // whether: -// 1. 
Any recoding needs to happen to the chunk using the provided interjections +// 1. Any recoding needs to happen to the chunk using the provided inserts // (in case of any new buckets, positive or negative range, respectively). -// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections -// (in case of any missing buckets, positive or negative range, respectively). +// 2. Any recoding needs to happen for the histogram being appended, using the +// backward inserts (in case of any missing buckets, positive or negative +// range, respectively). // // This method must be only used for gauge histograms. // @@ -330,8 +331,8 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( // - The threshold for the zero bucket has changed. // - The last sample in the chunk was stale while the current sample is not stale. func (a *HistogramAppender) AppendableGauge(h *histogram.Histogram) ( - positiveInterjections, negativeInterjections []Interjection, - backwardPositiveInterjections, backwardNegativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, + backwardPositiveInserts, backwardNegativeInserts []Insert, positiveSpans, negativeSpans []histogram.Span, okToAppend bool, ) { @@ -353,8 +354,8 @@ func (a *HistogramAppender) AppendableGauge(h *histogram.Histogram) ( return } - positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans) - negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans) + positiveInserts, backwardPositiveInserts, positiveSpans = expandSpansBothWays(a.pSpans, h.PositiveSpans) + negativeInserts, backwardNegativeInserts, negativeSpans = expandSpansBothWays(a.nSpans, h.NegativeSpans) okToAppend = true return } @@ -488,8 +489,9 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) { putVarbitInt(a.b, b) } } else { - // The case for the 
2nd sample with single deltas is implicitly handled correctly with the double delta code, - // so we don't need a separate single delta logic for the 2nd sample. + // The case for the 2nd sample with single deltas is implicitly + // handled correctly with the double delta code, so we don't + // need a separate single delta logic for the 2nd sample. tDelta = t - a.t cntDelta = int64(h.Count) - int64(a.cnt) @@ -539,12 +541,12 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) { } // Recode converts the current chunk to accommodate an expansion of the set of -// (positive and/or negative) buckets used, according to the provided -// interjections, resulting in the honoring of the provided new positive and -// negative spans. To continue appending, use the returned Appender rather than -// the receiver of this method. +// (positive and/or negative) buckets used, according to the provided inserts, +// resulting in the honoring of the provided new positive and negative spans. To +// continue appending, use the returned Appender rather than the receiver of +// this method. func (a *HistogramAppender) Recode( - positiveInterjections, negativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, positiveSpans, negativeSpans []histogram.Span, ) (Chunk, Appender) { // TODO(beorn7): This currently just decodes everything and then encodes @@ -577,11 +579,11 @@ func (a *HistogramAppender) Recode( // Save the modified histogram to the new chunk. 
hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans - if len(positiveInterjections) > 0 { - hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections, true) + if len(positiveInserts) > 0 { + hOld.PositiveBuckets = insert(hOld.PositiveBuckets, positiveBuckets, positiveInserts, true) } - if len(negativeInterjections) > 0 { - hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections, true) + if len(negativeInserts) > 0 { + hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, true) } app.AppendHistogram(tOld, hOld) } @@ -590,19 +592,19 @@ func (a *HistogramAppender) Recode( return hc, app } -// RecodeHistogram converts the current histogram (in-place) to accommodate an expansion of the set of -// (positive and/or negative) buckets used. +// RecodeHistogram converts the current histogram (in-place) to accommodate an +// expansion of the set of (positive and/or negative) buckets used. 
func (a *HistogramAppender) RecodeHistogram( h *histogram.Histogram, - pBackwardInter, nBackwardInter []Interjection, + pBackwardInserts, nBackwardInserts []Insert, ) { - if len(pBackwardInter) > 0 { + if len(pBackwardInserts) > 0 { numPositiveBuckets := countSpans(h.PositiveSpans) - h.PositiveBuckets = interject(h.PositiveBuckets, make([]int64, numPositiveBuckets), pBackwardInter, true) + h.PositiveBuckets = insert(h.PositiveBuckets, make([]int64, numPositiveBuckets), pBackwardInserts, true) } - if len(nBackwardInter) > 0 { + if len(nBackwardInserts) > 0 { numNegativeBuckets := countSpans(h.NegativeSpans) - h.NegativeBuckets = interject(h.NegativeBuckets, make([]int64, numNegativeBuckets), nBackwardInter, true) + h.NegativeBuckets = insert(h.NegativeBuckets, make([]int64, numNegativeBuckets), nBackwardInserts, true) } } diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index 345b8cc519..ca60f350bc 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -19,7 +19,10 @@ import ( "github.com/prometheus/prometheus/model/histogram" ) -func writeHistogramChunkLayout(b *bstream, schema int32, zeroThreshold float64, positiveSpans, negativeSpans []histogram.Span) { +func writeHistogramChunkLayout( + b *bstream, schema int32, zeroThreshold float64, + positiveSpans, negativeSpans []histogram.Span, +) { putZeroThreshold(b, zeroThreshold) putVarbitInt(b, int64(schema)) putHistogramChunkLayoutSpans(b, positiveSpans) @@ -91,9 +94,7 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) { // putZeroThreshold writes the zero threshold to the bstream. It stores typical // values in just one byte, but needs 9 bytes for other values. In detail: -// -// * If the threshold is 0, store a single zero byte. -// +// - If the threshold is 0, store a single zero byte. 
// - If the threshold is a power of 2 between (and including) 2^-243 and 2^10, // take the exponent from the IEEE 754 representation of the threshold, which // covers a range between (and including) -242 and 11. (2^-243 is 0.5*2^-242 @@ -103,7 +104,6 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) { // threshold. The default value for the zero threshold is 2^-128 (or // 0.5*2^-127 in IEEE 754 representation) and will therefore be encoded as a // single byte (with value 116). -// // - In all other cases, store 255 as a single byte, followed by the 8 bytes of // the threshold as a float64, i.e. taking 9 bytes in total. func putZeroThreshold(b *bstream, threshold float64) { @@ -186,16 +186,16 @@ func (b *bucketIterator) Next() (int, bool) { return 0, false } -// An Interjection describes how many new buckets have to be introduced before -// processing the pos'th delta from the original slice. -type Interjection struct { +// An Insert describes how many new buckets have to be inserted before +// processing the pos'th bucket from the original slice. +type Insert struct { pos int num int } -// forwardCompareSpans returns the interjections to convert a slice of deltas to a new -// slice representing an expanded set of buckets, or false if incompatible -// (e.g. if buckets were removed). +// expandSpansForward returns the inserts to expand the bucket spans 'a' so that +// they match the spans in 'b'. 'b' must cover the same or more buckets than +// 'a', otherwise the function will return false. // // Example: // @@ -222,25 +222,25 @@ type Interjection struct { // deltas 6 -3 -3 3 -3 0 2 2 1 -5 1 // delta mods: / \ / \ / \ // -// Note that whenever any new buckets are introduced, the subsequent "old" -// bucket needs to readjust its delta to the new base of 0. 
Thus, for the caller -// who wants to transform the set of original deltas to a new set of deltas to -// match a new span layout that adds buckets, we simply need to generate a list -// of interjections. +// Note for histograms with delta-encoded buckets: Whenever any new buckets are +// introduced, the subsequent "old" bucket needs to readjust its delta to the +// new base of 0. Thus, for the caller who wants to transform the set of +// original deltas to a new set of deltas to match a new span layout that adds +// buckets, we simply need to generate a list of inserts. // -// Note: Within forwardCompareSpans we don't have to worry about the changes to the +// Note: Within expandSpansForward we don't have to worry about the changes to the // spans themselves, thanks to the iterators we get to work with the more useful // bucket indices (which of course directly correspond to the buckets we have to // adjust). -func forwardCompareSpans(a, b []histogram.Span) (forward []Interjection, ok bool) { +func expandSpansForward(a, b []histogram.Span) (forward []Insert, ok bool) { ai := newBucketIterator(a) bi := newBucketIterator(b) - var interjections []Interjection + var inserts []Insert - // When inter.num becomes > 0, this becomes a valid interjection that - // should be yielded when we finish a streak of new buckets. - var inter Interjection + // When inter.num becomes > 0, this becomes a valid insert that should + // be yielded when we finish a streak of new buckets. + var inter Insert av, aOK := ai.Next() bv, bOK := bi.Next() @@ -250,43 +250,46 @@ loop: case aOK && bOK: switch { case av == bv: // Both have an identical value. move on! - // Finish WIP interjection and reset. + // Finish WIP insert and reset. if inter.num > 0 { - interjections = append(interjections, inter) + inserts = append(inserts, inter) } inter.num = 0 av, aOK = ai.Next() bv, bOK = bi.Next() inter.pos++ case av < bv: // b misses a value that is in a. 
- return interjections, false + return inserts, false case av > bv: // a misses a value that is in b. Forward b and recompare. inter.num++ bv, bOK = bi.Next() } case aOK && !bOK: // b misses a value that is in a. - return interjections, false + return inserts, false case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. inter.num++ bv, bOK = bi.Next() default: // Both iterators ran out. We're done. if inter.num > 0 { - interjections = append(interjections, inter) + inserts = append(inserts, inter) } break loop } } - return interjections, true + return inserts, true } -// bidirectionalCompareSpans does everything that forwardCompareSpans does and -// also returns interjections in the other direction (i.e. buckets missing in b that are missing in a). -func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Interjection, mergedSpans []histogram.Span) { +// expandSpansBothWays is similar to expandSpansForward, but now b may also +// cover an entirely different set of buckets. The function returns the +// “forward” inserts to expand 'a' to also cover all the buckets exclusively +// covered by 'b', and it returns the “backward” inserts to expand 'b' to also +// cover all the buckets exclusively covered by 'a'. +func expandSpansBothWays(a, b []histogram.Span) (forward, backward []Insert, mergedSpans []histogram.Span) { ai := newBucketIterator(a) bi := newBucketIterator(b) - var interjections, bInterjections []Interjection + var fInserts, bInserts []Insert var lastBucket int addBucket := func(b int) { offset := b - lastBucket - 1 @@ -305,9 +308,10 @@ func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Inter lastBucket = b } - // When inter.num becomes > 0, this becomes a valid interjection that - // should be yielded when we finish a streak of new buckets. 
- var inter, bInter Interjection + // When fInter.num (or bInter.num, respectively) becomes > 0, this + // becomes a valid insert that should be yielded when we finish a streak + // of new buckets. + var fInter, bInter Insert av, aOK := ai.Next() bv, bOK := bi.Next() @@ -317,37 +321,37 @@ loop: case aOK && bOK: switch { case av == bv: // Both have an identical value. move on! - // Finish WIP interjection and reset. - if inter.num > 0 { - interjections = append(interjections, inter) - inter.num = 0 + // Finish WIP insert and reset. + if fInter.num > 0 { + fInserts = append(fInserts, fInter) + fInter.num = 0 } if bInter.num > 0 { - bInterjections = append(bInterjections, bInter) + bInserts = append(bInserts, bInter) bInter.num = 0 } addBucket(av) av, aOK = ai.Next() bv, bOK = bi.Next() - inter.pos++ + fInter.pos++ bInter.pos++ case av < bv: // b misses a value that is in a. bInter.num++ - // Collect the forward interjection before advancing the - // position of 'a'. - if inter.num > 0 { - interjections = append(interjections, inter) - inter.num = 0 + // Collect the forward inserts before advancing + // the position of 'a'. + if fInter.num > 0 { + fInserts = append(fInserts, fInter) + fInter.num = 0 } addBucket(av) - inter.pos++ + fInter.pos++ av, aOK = ai.Next() case av > bv: // a misses a value that is in b. Forward b and recompare. - inter.num++ - // Collect the backward interjection before advancing the + fInter.num++ + // Collect the backward inserts before advancing the // position of 'b'. if bInter.num > 0 { - bInterjections = append(bInterjections, bInter) + bInserts = append(bInserts, bInter) bInter.num = 0 } addBucket(bv) @@ -359,92 +363,92 @@ loop: addBucket(av) av, aOK = ai.Next() case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. - inter.num++ + fInter.num++ addBucket(bv) bv, bOK = bi.Next() default: // Both iterators ran out. We're done. 
- if inter.num > 0 { - interjections = append(interjections, inter) + if fInter.num > 0 { + fInserts = append(fInserts, fInter) } if bInter.num > 0 { - bInterjections = append(bInterjections, bInter) + bInserts = append(bInserts, bInter) } break loop } } - return interjections, bInterjections, mergedSpans + return fInserts, bInserts, mergedSpans } type bucketValue interface { int64 | float64 } -// interject merges 'in' with the provided interjections and writes them into -// 'out', which must already have the appropriate length. -func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV { +// insert merges 'in' with the provided inserts and writes them into 'out', +// which must already have the appropriate length. 'out' is also returned for +// convenience. +func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV { var ( - j int // Position in out. - v BV // The last value seen. - interj int // The next interjection to process. + oi int // Position in out. + v BV // The last value seen. + ii int // The next insert to process. ) for i, d := range in { - if interj < len(interjections) && i == interjections[interj].pos { - - // We have an interjection! - // Add interjection.num new delta values such that their bucket values equate 0. - // When deltas==false, it means that it is an absolute value. So we set it to 0 directly. + if ii < len(inserts) && i == inserts[ii].pos { + // We have an insert! + // Add insert.num new delta values such that their + // bucket values equate 0. When deltas==false, it means + // that it is an absolute value. So we set it to 0 + // directly. if deltas { - out[j] = -v + out[oi] = -v } else { - out[j] = 0 + out[oi] = 0 } - j++ - for x := 1; x < interjections[interj].num; x++ { - out[j] = 0 - j++ + oi++ + for x := 1; x < inserts[ii].num; x++ { + out[oi] = 0 + oi++ } - interj++ + ii++ // Now save the value from the input. 
The delta value we // should save is the original delta value + the last - // value of the point before the interjection (to undo - // the delta that was introduced by the interjection). - // When deltas==false, it means that it is an absolute value, + // value of the point before the insert (to undo the + // delta that was introduced by the insert). When + // deltas==false, it means that it is an absolute value, // so we set it directly to the value in the 'in' slice. if deltas { - out[j] = d + v + out[oi] = d + v } else { - out[j] = d + out[oi] = d } - j++ + oi++ v = d + v continue } - - // If there was no interjection, the original delta is still - // valid. - out[j] = d - j++ + // If there was no insert, the original delta is still valid. + out[oi] = d + oi++ v += d } - switch interj { - case len(interjections): - // All interjections processed. Nothing more to do. - case len(interjections) - 1: - // One more interjection to process at the end. + switch ii { + case len(inserts): + // All inserts processed. Nothing more to do. + case len(inserts) - 1: + // One more insert to process at the end. if deltas { - out[j] = -v + out[oi] = -v } else { - out[j] = 0 + out[oi] = 0 } - j++ - for x := 1; x < interjections[interj].num; x++ { - out[j] = 0 - j++ + oi++ + for x := 1; x < inserts[ii].num; x++ { + out[oi] = 0 + oi++ } default: - panic("unprocessed interjections left") + panic("unprocessed inserts left") } return out } diff --git a/tsdb/chunkenc/histogram_meta_test.go b/tsdb/chunkenc/histogram_meta_test.go index a4ce62f3b7..0b2b187475 100644 --- a/tsdb/chunkenc/histogram_meta_test.go +++ b/tsdb/chunkenc/histogram_meta_test.go @@ -78,7 +78,7 @@ func TestBucketIterator(t *testing.T) { }, idxs: []int{100, 101, 102, 103, 112, 113, 114, 115, 116, 117, 118, 119}, }, - // The below 2 sets ore the ones described in compareSpans's comments. + // The below 2 sets are the ones described in expandSpansForward's comments. 
{ spans: []histogram.Span{ {Offset: 0, Length: 2}, @@ -111,12 +111,12 @@ func TestBucketIterator(t *testing.T) { } } -func TestCompareSpansAndInterject(t *testing.T) { +func TestCompareSpansAndInsert(t *testing.T) { scenarios := []struct { - description string - spansA, spansB []histogram.Span - interjections, backwardInterjections []Interjection - bucketsIn, bucketsOut []int64 + description string + spansA, spansB []histogram.Span + fInserts, bInserts []Insert + bucketsIn, bucketsOut []int64 }{ { description: "single prepend at the beginning", @@ -126,7 +126,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -11, Length: 4}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 0, num: 1, @@ -143,7 +143,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 4}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 3, num: 1, @@ -160,7 +160,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -12, Length: 5}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 0, num: 2, @@ -177,7 +177,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 5}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 3, num: 2, @@ -194,7 +194,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -12, Length: 7}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 0, num: 2, @@ -215,7 +215,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -9, Length: 3}, }, - backwardInterjections: []Interjection{ + bInserts: []Insert{ {pos: 0, num: 1}, }, }, @@ -228,7 +228,7 @@ func TestCompareSpansAndInterject(t *testing.T) { {Offset: -10, Length: 2}, {Offset: 1, Length: 1}, }, - backwardInterjections: []Interjection{ + bInserts: []Insert{ {pos: 2, num: 1}, }, }, @@ -240,7 +240,7 @@ func 
TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 3}, }, - backwardInterjections: []Interjection{ + bInserts: []Insert{ {pos: 3, num: 1}, }, }, @@ -259,7 +259,7 @@ func TestCompareSpansAndInterject(t *testing.T) { {Offset: 1, Length: 4}, {Offset: 3, Length: 3}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 2, num: 1, @@ -277,7 +277,7 @@ func TestCompareSpansAndInterject(t *testing.T) { bucketsOut: []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}, }, { - description: "both forward and backward interjections, complex case", + description: "both forward and backward inserts, complex case", spansA: []histogram.Span{ {Offset: 0, Length: 2}, {Offset: 2, Length: 1}, @@ -292,7 +292,7 @@ func TestCompareSpansAndInterject(t *testing.T) { {Offset: 1, Length: 1}, {Offset: 4, Length: 1}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 2, num: 1, @@ -306,7 +306,7 @@ func TestCompareSpansAndInterject(t *testing.T) { num: 1, }, }, - backwardInterjections: []Interjection{ + bInserts: []Insert{ { pos: 0, num: 1, @@ -329,22 +329,22 @@ func TestCompareSpansAndInterject(t *testing.T) { for _, s := range scenarios { t.Run(s.description, func(t *testing.T) { - if len(s.backwardInterjections) > 0 { - interjections, bInterjections, _ := bidirectionalCompareSpans(s.spansA, s.spansB) - require.Equal(t, s.interjections, interjections) - require.Equal(t, s.backwardInterjections, bInterjections) + if len(s.bInserts) > 0 { + fInserts, bInserts, _ := expandSpansBothWays(s.spansA, s.spansB) + require.Equal(t, s.fInserts, fInserts) + require.Equal(t, s.bInserts, bInserts) } - interjections, valid := forwardCompareSpans(s.spansA, s.spansB) - if len(s.backwardInterjections) > 0 { + inserts, valid := expandSpansForward(s.spansA, s.spansB) + if len(s.bInserts) > 0 { require.False(t, valid, "compareScan unexpectedly returned true") return } require.True(t, valid, "compareScan unexpectedly returned false") - require.Equal(t, 
s.interjections, interjections) + require.Equal(t, s.fInserts, inserts) gotBuckets := make([]int64, len(s.bucketsOut)) - interject(s.bucketsIn, gotBuckets, interjections, true) + insert(s.bucketsIn, gotBuckets, inserts, true) require.Equal(t, s.bucketsOut, gotBuckets) floatBucketsIn := make([]float64, len(s.bucketsIn)) @@ -362,7 +362,7 @@ func TestCompareSpansAndInterject(t *testing.T) { floatBucketsOut[i] = float64(last) } gotFloatBuckets := make([]float64, len(floatBucketsOut)) - interject(floatBucketsIn, gotFloatBuckets, interjections, false) + insert(floatBucketsIn, gotFloatBuckets, inserts, false) require.Equal(t, floatBucketsOut, gotFloatBuckets) }) } @@ -564,12 +564,12 @@ func TestSpansFromBidirectionalCompareSpans(t *testing.T) { copy(s1c, c.s1) copy(s2c, c.s2) - _, _, act := bidirectionalCompareSpans(c.s1, c.s2) + _, _, act := expandSpansBothWays(c.s1, c.s2) require.Equal(t, c.exp, act) // Check that s1 and s2 are not modified. require.Equal(t, s1c, c.s1) require.Equal(t, s2c, c.s2) - _, _, act = bidirectionalCompareSpans(c.s2, c.s1) + _, _, act = expandSpansBothWays(c.s2, c.s1) require.Equal(t, c.exp, act) } } diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index 73851c9dfb..c5b6faa2a0 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -149,7 +149,7 @@ func TestHistogramChunkSameBuckets(t *testing.T) { require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1)) } -// Mimics the scenario described for compareSpans(). +// Mimics the scenario described for expandSpansForward. func TestHistogramChunkBucketChanges(t *testing.T) { c := Chunk(NewHistogramChunk()) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index d4534c6695..f3585a87be 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1141,7 +1141,6 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // appendHistogram adds the histogram. 
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -// TODO(codesome): Support gauge histograms here. func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards. We check for Appendable from appender before @@ -1150,10 +1149,10 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // meta properly. app, _ := s.app.(*chunkenc.HistogramAppender) var ( - positiveInterjections, negativeInterjections []chunkenc.Interjection - pBackwardInter, nBackwardInter []chunkenc.Interjection - pMergedSpans, nMergedSpans []histogram.Span - okToAppend, counterReset bool + pForwardInserts, nForwardInserts []chunkenc.Insert + pBackwardInserts, nBackwardInserts []chunkenc.Insert + pMergedSpans, nMergedSpans []histogram.Span + okToAppend, counterReset bool ) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange) if !sampleInOrder { @@ -1162,32 +1161,32 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui gauge := h.CounterResetHint == histogram.GaugeType if app != nil { if gauge { - positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter, pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(h) + pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(h) } else { - positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h) + pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(h) } } if !chunkCreated { - if len(pBackwardInter)+len(nBackwardInter) > 0 { + if len(pBackwardInserts)+len(nBackwardInserts) > 0 { h.PositiveSpans = pMergedSpans h.NegativeSpans = 
nMergedSpans - app.RecodeHistogram(h, pBackwardInter, nBackwardInter) + app.RecodeHistogram(h, pBackwardInserts, nBackwardInserts) } // We have 3 cases here // - !okToAppend -> We need to cut a new chunk. - // - okToAppend but we have interjections → Existing chunk needs + // - okToAppend but we have inserts → Existing chunk needs // recoding before we can append our histogram. - // - okToAppend and no interjections → Chunk is ready to support our histogram. + // - okToAppend and no inserts → Chunk is ready to support our histogram. if !okToAppend || counterReset { c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange) chunkCreated = true - } else if len(positiveInterjections) > 0 || len(negativeInterjections) > 0 { + } else if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 { // New buckets have appeared. We need to recode all // prior histogram samples within the chunk before we // can process this one. chunk, app := app.Recode( - positiveInterjections, negativeInterjections, + pForwardInserts, nForwardInserts, h.PositiveSpans, h.NegativeSpans, ) c.chunk = chunk @@ -1233,10 +1232,10 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, // meta properly. 
app, _ := s.app.(*chunkenc.FloatHistogramAppender) var ( - positiveInterjections, negativeInterjections []chunkenc.Interjection - pBackwardInter, nBackwardInter []chunkenc.Interjection - pMergedSpans, nMergedSpans []histogram.Span - okToAppend, counterReset bool + pForwardInserts, nForwardInserts []chunkenc.Insert + pBackwardInserts, nBackwardInserts []chunkenc.Insert + pMergedSpans, nMergedSpans []histogram.Span + okToAppend, counterReset bool ) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) if !sampleInOrder { @@ -1245,33 +1244,33 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, gauge := fh.CounterResetHint == histogram.GaugeType if app != nil { if gauge { - positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter, + pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(fh) } else { - positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh) + pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(fh) } } if !chunkCreated { - if len(pBackwardInter)+len(nBackwardInter) > 0 { + if len(pBackwardInserts)+len(nBackwardInserts) > 0 { fh.PositiveSpans = pMergedSpans fh.NegativeSpans = nMergedSpans - app.RecodeHistogramm(fh, pBackwardInter, nBackwardInter) + app.RecodeHistogramm(fh, pBackwardInserts, nBackwardInserts) } // We have 3 cases here // - !okToAppend -> We need to cut a new chunk. - // - okToAppend but we have interjections → Existing chunk needs + // - okToAppend but we have inserts → Existing chunk needs // recoding before we can append our histogram. - // - okToAppend and no interjections → Chunk is ready to support our histogram. + // - okToAppend and no inserts → Chunk is ready to support our histogram. 
if !okToAppend || counterReset { c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) chunkCreated = true - } else if len(positiveInterjections) > 0 || len(negativeInterjections) > 0 { + } else if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 { // New buckets have appeared. We need to recode all // prior histogram samples within the chunk before we // can process this one. chunk, app := app.Recode( - positiveInterjections, negativeInterjections, + pForwardInserts, nForwardInserts, fh.PositiveSpans, fh.NegativeSpans, ) c.chunk = chunk From 1cfc8f65a3057dfafae63b2a8d7231d0cab96c83 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 18 Jan 2023 17:59:29 +0100 Subject: [PATCH 2/3] histograms: Return actually useful counter reset hints This is a bit more conservative than we could be. As long as a chunk isn't the first in a block, we can be pretty sure that the previous chunk won't disappear. However, the incremental gain of returning NotCounterReset in these cases is probably very small and might not be worth the code complications. With this, we now also pay attention to an explicitly set counter reset during ingestion. While the case doesn't show up in practice yet, there could be scenarios where the metric source knows there was a counter reset even if it might not be visible from the values in the histogram. It is also useful for testing. 
Signed-off-by: beorn7 --- promql/engine_test.go | 52 ++++++------ storage/merge.go | 33 ++++++- tsdb/block_test.go | 17 ++++ tsdb/chunkenc/float_histogram.go | 6 +- tsdb/chunkenc/float_histogram_test.go | 12 ++- tsdb/chunkenc/histogram.go | 12 +-- tsdb/chunkenc/histogram_meta.go | 35 ++++++++ tsdb/chunkenc/histogram_test.go | 34 +++++--- tsdb/compact_test.go | 20 ++++- tsdb/db_test.go | 118 ++++++++++++++++++++------ tsdb/head.go | 18 ++-- tsdb/head_append.go | 45 +++++++--- tsdb/head_test.go | 95 ++++++++++++++------- 13 files changed, 355 insertions(+), 142 deletions(-) diff --git a/promql/engine_test.go b/promql/engine_test.go index 0582fd5a4e..a7aaedd691 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -3125,7 +3125,7 @@ func TestRangeQuery(t *testing.T) { } } -func TestSparseHistogramRate(t *testing.T) { +func TestNativeHistogramRate(t *testing.T) { // TODO(beorn7): Integrate histograms into the PromQL testing framework // and write more tests there. test, err := NewTest(t, "") @@ -3155,20 +3155,22 @@ func TestSparseHistogramRate(t *testing.T) { require.Len(t, vector, 1) actualHistogram := vector[0].H expectedHistogram := &histogram.FloatHistogram{ - Schema: 1, - ZeroThreshold: 0.001, - ZeroCount: 1. / 15., - Count: 8. / 15., - Sum: 1.226666666666667, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, - PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, - NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, - NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + // TODO(beorn7): This should be GaugeType. Change it once supported by PromQL. + CounterResetHint: histogram.NotCounterReset, + Schema: 1, + ZeroThreshold: 0.001, + ZeroCount: 1. / 15., + Count: 8. / 15., + Sum: 1.226666666666667, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. 
/ 15.}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, } require.Equal(t, expectedHistogram, actualHistogram) } -func TestSparseFloatHistogramRate(t *testing.T) { +func TestNativeFloatHistogramRate(t *testing.T) { // TODO(beorn7): Integrate histograms into the PromQL testing framework // and write more tests there. test, err := NewTest(t, "") @@ -3198,20 +3200,22 @@ func TestSparseFloatHistogramRate(t *testing.T) { require.Len(t, vector, 1) actualHistogram := vector[0].H expectedHistogram := &histogram.FloatHistogram{ - Schema: 1, - ZeroThreshold: 0.001, - ZeroCount: 1. / 15., - Count: 8. / 15., - Sum: 1.226666666666667, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, - PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, - NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, - NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + // TODO(beorn7): This should be GaugeType. Change it once supported by PromQL. + CounterResetHint: histogram.NotCounterReset, + Schema: 1, + ZeroThreshold: 0.001, + ZeroCount: 1. / 15., + Count: 8. / 15., + Sum: 1.226666666666667, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, } require.Equal(t, expectedHistogram, actualHistogram) } -func TestSparseHistogram_HistogramCountAndSum(t *testing.T) { +func TestNativeHistogram_HistogramCountAndSum(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. 
h := &histogram.Histogram{ @@ -3290,7 +3294,7 @@ func TestSparseHistogram_HistogramCountAndSum(t *testing.T) { } } -func TestSparseHistogram_HistogramQuantile(t *testing.T) { +func TestNativeHistogram_HistogramQuantile(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. type subCase struct { @@ -3526,7 +3530,7 @@ func TestSparseHistogram_HistogramQuantile(t *testing.T) { } } -func TestSparseHistogram_HistogramFraction(t *testing.T) { +func TestNativeHistogram_HistogramFraction(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. type subCase struct { @@ -3961,7 +3965,7 @@ func TestSparseHistogram_HistogramFraction(t *testing.T) { } } -func TestSparseHistogram_Sum_Count_AddOperator(t *testing.T) { +func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. cases := []struct { diff --git a/storage/merge.go b/storage/merge.go index ba45b12bd3..c5768c7658 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -440,6 +440,10 @@ type chainSampleIterator struct { curr chunkenc.Iterator lastT int64 + + // Whether the previous and the current sample are direct neighbors + // within the same base iterator. + consecutive bool } // Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible. @@ -485,6 +489,9 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType { if c.curr != nil && c.lastT >= t { return c.curr.Seek(c.lastT) } + // Don't bother to find out if the next sample is consecutive. Callers + // of Seek usually aren't interested anyway. 
+ c.consecutive = false c.h = samplesIteratorHeap{} for _, iter := range c.iterators { if iter.Seek(t) != chunkenc.ValNone { @@ -511,14 +518,26 @@ func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) { if c.curr == nil { panic("chainSampleIterator.AtHistogram called before first .Next or after .Next returned false.") } - return c.curr.AtHistogram() + t, h := c.curr.AtHistogram() + // If the current sample is not consecutive with the previous one, we + // cannot be sure anymore that there was no counter reset. + if !c.consecutive && h.CounterResetHint == histogram.NotCounterReset { + h.CounterResetHint = histogram.UnknownCounterReset + } + return t, h } func (c *chainSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { if c.curr == nil { panic("chainSampleIterator.AtFloatHistogram called before first .Next or after .Next returned false.") } - return c.curr.AtFloatHistogram() + t, fh := c.curr.AtFloatHistogram() + // If the current sample is not consecutive with the previous one, we + // cannot be sure anymore that there was no counter reset. + if !c.consecutive && fh.CounterResetHint == histogram.NotCounterReset { + fh.CounterResetHint = histogram.UnknownCounterReset + } + return t, fh } func (c *chainSampleIterator) AtT() int64 { @@ -529,7 +548,13 @@ func (c *chainSampleIterator) AtT() int64 { } func (c *chainSampleIterator) Next() chunkenc.ValueType { + var ( + currT int64 + currValueType chunkenc.ValueType + iteratorChanged bool + ) if c.h == nil { + iteratorChanged = true c.h = samplesIteratorHeap{} // We call c.curr.Next() as the first thing below. // So, we don't call Next() on it here. 
@@ -545,8 +570,6 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { return chunkenc.ValNone } - var currT int64 - var currValueType chunkenc.ValueType for { currValueType = c.curr.Next() if currValueType != chunkenc.ValNone { @@ -576,6 +599,7 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { } c.curr = heap.Pop(&c.h).(chunkenc.Iterator) + iteratorChanged = true currT = c.curr.AtT() currValueType = c.curr.Seek(currT) if currT != c.lastT { @@ -583,6 +607,7 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { } } + c.consecutive = !iteratorChanged c.lastT = currT return currValueType } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index fdbc128f4a..ab33999623 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -632,6 +632,16 @@ func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64, flo }, PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0}, } + if ts != mint { + // By setting the counter reset hint to "no counter + // reset" for all histograms but the first, we cover the + // most common cases. If the series is manipulated later + // or spans more than one block when ingested into the + // storage, the hint has to be adjusted. Note that the + // storage itself treats this particular hint the same + // as "unknown". + h.CounterResetHint = histogram.NotCounterReset + } if floatHistogram { return sample{t: ts, fh: h.ToFloat()} } @@ -661,6 +671,13 @@ func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step in }, PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0}, } + if count > 1 && count%5 != 1 { + // Same rationale for this as above in + // genHistogramSeries, just that we have to be + // smarter to find out if the previous sample + // was a histogram, too. 
+ h.CounterResetHint = histogram.NotCounterReset + } if floatHistogram { s = sample{t: ts, fh: h.ToFloat()} } else { diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 437a5712e2..b462c6d9fd 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -628,12 +628,8 @@ func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHis return it.t, &histogram.FloatHistogram{Sum: it.sum.value} } it.atFloatHistogramCalled = true - crHint := histogram.UnknownCounterReset - if it.counterResetHeader == GaugeType { - crHint = histogram.GaugeType - } return it.t, &histogram.FloatHistogram{ - CounterResetHint: crHint, + CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), Count: it.cnt.value, ZeroCount: it.zCnt.value, Sum: it.sum.value, diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index e945ce1e67..31d96ee7a9 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -66,7 +66,9 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14) h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15) app.AppendFloatHistogram(ts, h.ToFloat()) - exp = append(exp, floatResult{t: ts, h: h.ToFloat()}) + expH := h.ToFloat() + expH.CounterResetHint = histogram.NotCounterReset + exp = append(exp, floatResult{t: ts, h: expH}) require.Equal(t, 2, c.NumSamples()) // Add update with new appender. 
@@ -81,7 +83,9 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27) h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22) app.AppendFloatHistogram(ts, h.ToFloat()) - exp = append(exp, floatResult{t: ts, h: h.ToFloat()}) + expH = h.ToFloat() + expH.CounterResetHint = histogram.NotCounterReset + exp = append(exp, floatResult{t: ts, h: expH}) require.Equal(t, 3, c.NumSamples()) // 1. Expand iterator in simple case. @@ -209,9 +213,11 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) { h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1} h1.NegativeSpans = h2.NegativeSpans h1.NegativeBuckets = []int64{0, 1} + expH2 := h2.ToFloat() + expH2.CounterResetHint = histogram.NotCounterReset exp := []floatResult{ {t: ts1, h: h1.ToFloat()}, - {t: ts2, h: h2.ToFloat()}, + {t: ts2, h: expH2}, } it := c.Iterator(nil) var act []floatResult diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index dd50a01e8a..7b6a9cacb3 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -667,12 +667,8 @@ func (it *histogramIterator) AtHistogram() (int64, *histogram.Histogram) { return it.t, &histogram.Histogram{Sum: it.sum} } it.atHistogramCalled = true - crHint := histogram.UnknownCounterReset - if it.counterResetHeader == GaugeType { - crHint = histogram.GaugeType - } return it.t, &histogram.Histogram{ - CounterResetHint: crHint, + CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), Count: it.cnt, ZeroCount: it.zCnt, Sum: it.sum, @@ -690,12 +686,8 @@ func (it *histogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogra return it.t, &histogram.FloatHistogram{Sum: it.sum} } it.atFloatHistogramCalled = true - crHint := histogram.UnknownCounterReset - if it.counterResetHeader == GaugeType { - crHint = histogram.GaugeType - } return it.t, &histogram.FloatHistogram{ - CounterResetHint: crHint, + CounterResetHint: 
counterResetHint(it.counterResetHeader, it.numRead), Count: float64(it.cnt), ZeroCount: float64(it.zCnt), Sum: it.sum, diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index ca60f350bc..027eee1129 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -452,3 +452,38 @@ func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV { } return out } + +// counterResetHint returns a CounterResetHint based on the CounterResetHeader +// and on the position into the chunk. +func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterResetHint { + switch { + case crh == GaugeType: + // A gauge histogram chunk only contains gauge histograms. + return histogram.GaugeType + case numRead > 1: + // In a counter histogram chunk, there will not be any counter + // resets after the first histogram. + return histogram.NotCounterReset + case crh == CounterReset: + // If the chunk was started because of a counter reset, we can + // safely return that hint. This histogram always has to be + // treated as a counter reset. + return histogram.CounterReset + default: + // Sadly, we have to return "unknown" as the hint for all other + // cases, even if we know that the chunk was started without a + // counter reset. But we cannot be sure that the previous chunk + // still exists in the TSDB, so we conservatively return + // "unknown". On the bright side, this case should be relatively + // rare. + // + // TODO(beorn7): Nevertheless, if the current chunk is in the + // middle of a block (not the first chunk in the block for this + // series), it's probably safe to assume that the previous chunk + // will exist in the TSDB for as long as the current chunk + // exists, and we could safely return + // "histogram.NotCounterReset". This needs some more work and + // might not be worth the effort and/or risk. To be vetted... 
+ return histogram.UnknownCounterReset + } +} diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index c5b6faa2a0..4bb146ccdb 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -67,7 +67,9 @@ func TestHistogramChunkSameBuckets(t *testing.T) { h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14) h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15) app.AppendHistogram(ts, h) - exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()}) + hExp := h.Copy() + hExp.CounterResetHint = histogram.NotCounterReset + exp = append(exp, result{t: ts, h: hExp, fh: hExp.ToFloat()}) require.Equal(t, 2, c.NumSamples()) // Add update with new appender. @@ -82,7 +84,9 @@ func TestHistogramChunkSameBuckets(t *testing.T) { h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27) h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22) app.AppendHistogram(ts, h) - exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()}) + hExp = h.Copy() + hExp.CounterResetHint = histogram.NotCounterReset + exp = append(exp, result{t: ts, h: hExp, fh: hExp.ToFloat()}) require.Equal(t, 3, c.NumSamples()) // 1. Expand iterator in simple case. 
@@ -220,9 +224,11 @@ func TestHistogramChunkBucketChanges(t *testing.T) { h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1} h1.NegativeSpans = h2.NegativeSpans h1.NegativeBuckets = []int64{0, 1} + hExp := h2.Copy() + hExp.CounterResetHint = histogram.NotCounterReset exp := []result{ {t: ts1, h: h1, fh: h1.ToFloat()}, - {t: ts2, h: h2, fh: h2.ToFloat()}, + {t: ts2, h: hExp, fh: hExp.ToFloat()}, } it := c.Iterator(nil) var act []result @@ -463,11 +469,12 @@ func TestAtFloatHistogram(t *testing.T) { NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2}, }, { - Schema: 0, - Count: 36, - Sum: 2345.6, - ZeroThreshold: 0.001, - ZeroCount: 5, + CounterResetHint: histogram.NotCounterReset, + Schema: 0, + Count: 36, + Sum: 2345.6, + ZeroThreshold: 0.001, + ZeroCount: 5, PositiveSpans: []histogram.Span{ {Offset: 0, Length: 4}, {Offset: 0, Length: 0}, @@ -482,11 +489,12 @@ func TestAtFloatHistogram(t *testing.T) { NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2}, }, { - Schema: 0, - Count: 36, - Sum: 1111.1, - ZeroThreshold: 0.001, - ZeroCount: 5, + CounterResetHint: histogram.NotCounterReset, + Schema: 0, + Count: 36, + Sum: 1111.1, + ZeroThreshold: 0.001, + ZeroCount: 5, PositiveSpans: []histogram.Span{ {Offset: 0, Length: 4}, {Offset: 0, Length: 0}, diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 58238bea51..a3b99f87a0 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1308,17 +1308,31 @@ func TestHeadCompactionWithHistograms(t *testing.T) { minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() } ctx := context.Background() - appendHistogram := func(lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample) { + appendHistogram := func( + lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample, + ) { t.Helper() app := head.Appender(ctx) for tsMinute := from; tsMinute <= to; tsMinute++ { var err error if floatTest { _, err = app.AppendHistogram(0, lbls, minute(tsMinute), 
nil, h.ToFloat()) - *exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()}) + efh := h.ToFloat() + if tsMinute == from { + efh.CounterResetHint = histogram.UnknownCounterReset + } else { + efh.CounterResetHint = histogram.NotCounterReset + } + *exp = append(*exp, sample{t: minute(tsMinute), fh: efh}) } else { _, err = app.AppendHistogram(0, lbls, minute(tsMinute), h, nil) - *exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()}) + eh := h.Copy() + if tsMinute == from { + eh.CounterResetHint = histogram.UnknownCounterReset + } else { + eh.CounterResetHint = histogram.NotCounterReset + } + *exp = append(*exp, sample{t: minute(tsMinute), h: eh}) } require.NoError(t, err) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 60188a7bd6..9e5623bea7 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -5836,16 +5836,23 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { }) ctx := context.Background() - appendHistogram := func(lbls labels.Labels, tsMinute int, h *histogram.Histogram, exp *[]tsdbutil.Sample) { + appendHistogram := func( + lbls labels.Labels, tsMinute int, h *histogram.Histogram, + exp *[]tsdbutil.Sample, expCRH histogram.CounterResetHint, + ) { t.Helper() var err error app := db.Appender(ctx) if floatHistogram { _, err = app.AppendHistogram(0, lbls, minute(tsMinute), nil, h.ToFloat()) - *exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()}) + efh := h.ToFloat() + efh.CounterResetHint = expCRH + *exp = append(*exp, sample{t: minute(tsMinute), fh: efh}) } else { _, err = app.AppendHistogram(0, lbls, minute(tsMinute), h.Copy(), nil) - *exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()}) + eh := h.Copy() + eh.CounterResetHint = expCRH + *exp = append(*exp, sample{t: minute(tsMinute), h: eh}) } require.NoError(t, err) require.NoError(t, app.Commit()) @@ -5897,23 +5904,23 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { t.Run("series with only histograms", func(t 
*testing.T) { h := baseH.Copy() // This is shared across all sub tests. - appendHistogram(series1, 100, h, &exp1) + appendHistogram(series1, 100, h, &exp1, histogram.UnknownCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) h.PositiveBuckets[0]++ h.NegativeBuckets[0] += 2 h.Count += 10 - appendHistogram(series1, 101, h, &exp1) + appendHistogram(series1, 101, h, &exp1, histogram.NotCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) t.Run("changing schema", func(t *testing.T) { h.Schema = 2 - appendHistogram(series1, 102, h, &exp1) + appendHistogram(series1, 102, h, &exp1, histogram.UnknownCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) // Schema back to old. h.Schema = 1 - appendHistogram(series1, 103, h, &exp1) + appendHistogram(series1, 103, h, &exp1, histogram.UnknownCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) }) @@ -5942,7 +5949,7 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { h.PositiveSpans[1].Length++ h.PositiveBuckets = append(h.PositiveBuckets, 1) h.Count += 3 - appendHistogram(series1, 104, h, &exp1) + appendHistogram(series1, 104, h, &exp1, histogram.NotCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) // Because of the previous two histograms being on the active chunk, @@ -5980,14 +5987,14 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { h.Count += 3 // {2, 1, -1, 0, 1} -> {2, 1, 0, -1, 0, 1} h.PositiveBuckets = append(h.PositiveBuckets[:2], append([]int64{0}, h.PositiveBuckets[2:]...)...) - appendHistogram(series1, 105, h, &exp1) + appendHistogram(series1, 105, h, &exp1, histogram.NotCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) // We add 4 more histograms to clear out the buffer and see the re-encoded histograms. 
- appendHistogram(series1, 106, h, &exp1) - appendHistogram(series1, 107, h, &exp1) - appendHistogram(series1, 108, h, &exp1) - appendHistogram(series1, 109, h, &exp1) + appendHistogram(series1, 106, h, &exp1, histogram.NotCounterReset) + appendHistogram(series1, 107, h, &exp1, histogram.NotCounterReset) + appendHistogram(series1, 108, h, &exp1, histogram.NotCounterReset) + appendHistogram(series1, 109, h, &exp1, histogram.NotCounterReset) // Update the expected histograms to reflect the re-encoding. if floatHistogram { @@ -6020,7 +6027,7 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { t.Run("buckets disappearing", func(t *testing.T) { h.PositiveSpans[1].Length-- h.PositiveBuckets = h.PositiveBuckets[:len(h.PositiveBuckets)-1] - appendHistogram(series1, 110, h, &exp1) + appendHistogram(series1, 110, h, &exp1, histogram.CounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) }) }) @@ -6032,9 +6039,9 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) h := baseH.Copy() - appendHistogram(series2, 103, h, &exp2) - appendHistogram(series2, 104, h, &exp2) - appendHistogram(series2, 105, h, &exp2) + appendHistogram(series2, 103, h, &exp2, histogram.UnknownCounterReset) + appendHistogram(series2, 104, h, &exp2, histogram.NotCounterReset) + appendHistogram(series2, 105, h, &exp2, histogram.NotCounterReset) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) // Switching between float and histograms again. 
@@ -6042,16 +6049,16 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { appendFloat(series2, 107, 107, &exp2) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) - appendHistogram(series2, 108, h, &exp2) - appendHistogram(series2, 109, h, &exp2) + appendHistogram(series2, 108, h, &exp2, histogram.UnknownCounterReset) + appendHistogram(series2, 109, h, &exp2, histogram.NotCounterReset) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) }) t.Run("series starting with histogram and then getting float", func(t *testing.T) { h := baseH.Copy() - appendHistogram(series3, 101, h, &exp3) - appendHistogram(series3, 102, h, &exp3) - appendHistogram(series3, 103, h, &exp3) + appendHistogram(series3, 101, h, &exp3, histogram.UnknownCounterReset) + appendHistogram(series3, 102, h, &exp3, histogram.NotCounterReset) + appendHistogram(series3, 103, h, &exp3, histogram.NotCounterReset) testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) appendFloat(series3, 104, 100, &exp3) @@ -6060,8 +6067,8 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) // Switching between histogram and float again. 
- appendHistogram(series3, 107, h, &exp3) - appendHistogram(series3, 108, h, &exp3) + appendHistogram(series3, 107, h, &exp3, histogram.UnknownCounterReset) + appendHistogram(series3, 108, h, &exp3, histogram.NotCounterReset) testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) appendFloat(series3, 109, 106, &exp3) @@ -6091,7 +6098,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { t.Helper() opts := DefaultOptions() - opts.AllowOverlappingCompaction = true // TODO(jesus.vazquez) This replaced AllowOverlappingBlocks, make sure that works + opts.AllowOverlappingCompaction = true // TODO(jesusvazquez): This replaced AllowOverlappingBlocks, make sure that works. db := openTestDB(t, opts, nil) t.Cleanup(func() { require.NoError(t, db.Close()) @@ -6137,7 +6144,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64) require.NoError(t, err) res := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) - require.Equal(t, exp, res) + compareSeries(t, exp, res) // Compact all the blocks together and query again. blocks := db.Blocks() @@ -6154,10 +6161,23 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { q, err = db.Querier(ctx, math.MinInt64, math.MaxInt64) require.NoError(t, err) res = query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) - require.Equal(t, exp, res) + + // After compaction, we do not require "unknown" counter resets + // due to origin from different overlapping chunks anymore. 
+ for _, ss := range exp { + for i, s := range ss[1:] { + if s.H() != nil && ss[i].H() != nil && s.H().CounterResetHint == histogram.UnknownCounterReset { + s.H().CounterResetHint = histogram.NotCounterReset + } + if s.FH() != nil && ss[i].FH() != nil && s.FH().CounterResetHint == histogram.UnknownCounterReset { + s.FH().CounterResetHint = histogram.NotCounterReset + } + } + } + compareSeries(t, exp, res) } - for _, floatHistogram := range []bool{true} { + for _, floatHistogram := range []bool{false, true} { t.Run(fmt.Sprintf("floatHistogram=%t", floatHistogram), func(t *testing.T) { t.Run("serial blocks with only histograms", func(t *testing.T) { testBlockQuerying(t, @@ -6272,3 +6292,45 @@ func TestNativeHistogramFlag(t *testing.T) { l.String(): {sample{t: 200, h: h}, sample{t: 205, fh: h.ToFloat()}}, }, act) } + +// compareSeries essentially replaces `require.Equal(t, expected, actual)` in +// situations where the actual series might contain more counter reset hints +// "unknown" than the expected series. This can easily happen for long series +// that trigger new chunks. This function therefore tolerates counter reset +// hints "CounterReset" and "NotCounterReset" in an expected series where the +// actual series contains a counter reset hint "UnknownCounterReset". +// "GaugeType" hints are still strictly checked, and any "UnknownCounterReset" +// in an expected series has to be matched precisely by the actual series. +func compareSeries(t require.TestingT, expected, actual map[string][]tsdbutil.Sample) { + if len(expected) != len(actual) { + // The reason for the difference is not the counter reset hints + // (alone), so let's use the pretty diffing by the require + // package. 
+ require.Equal(t, expected, actual, "number of series differs") + } + for key, eSamples := range expected { + aSamples, ok := actual[key] + if !ok { + require.Equal(t, expected, actual, "expected series %q not found", key) + } + if len(eSamples) != len(aSamples) { + require.Equal(t, eSamples, aSamples, "number of samples for series %q differs", key) + } + for i, eS := range eSamples { + aS := aSamples[i] + aH, eH := aS.H(), eS.H() + aFH, eFH := aS.FH(), eS.FH() + switch { + case aH != nil && eH != nil && aH.CounterResetHint == histogram.UnknownCounterReset && eH.CounterResetHint != histogram.GaugeType: + eH = eH.Copy() + eH.CounterResetHint = histogram.UnknownCounterReset + eS = sample{t: eS.T(), h: eH} + case aFH != nil && eFH != nil && aFH.CounterResetHint == histogram.UnknownCounterReset && eFH.CounterResetHint != histogram.GaugeType: + eFH = eFH.Copy() + eFH.CounterResetHint = histogram.UnknownCounterReset + eS = sample{t: eS.T(), fh: eFH} + } + require.Equal(t, eS, aS, "sample %d in series %q differs", i, key) + } + } +} diff --git a/tsdb/head.go b/tsdb/head.go index 6432cd891b..1ef88be366 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -2040,7 +2040,7 @@ func (h *Head) updateWALReplayStatusRead(current int) { func GenerateTestHistograms(n int) (r []*histogram.Histogram) { for i := 0; i < n; i++ { - r = append(r, &histogram.Histogram{ + h := histogram.Histogram{ Count: 10 + uint64(i*8), ZeroCount: 2 + uint64(i), ZeroThreshold: 0.001, @@ -2056,9 +2056,12 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) { {Offset: 1, Length: 2}, }, NegativeBuckets: []int64{int64(i + 1), 1, -1, 0}, - }) + } + if i > 0 { + h.CounterResetHint = histogram.NotCounterReset + } + r = append(r, &h) } - return r } @@ -2084,13 +2087,12 @@ func GenerateTestGaugeHistograms(n int) (r []*histogram.Histogram) { NegativeBuckets: []int64{int64(i + 1), 1, -1, 0}, }) } - return r } func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) { for i := 0; i < n; i++ { - 
r = append(r, &histogram.FloatHistogram{ + h := histogram.FloatHistogram{ Count: 10 + float64(i*8), ZeroCount: 2 + float64(i), ZeroThreshold: 0.001, @@ -2106,7 +2108,11 @@ func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) { {Offset: 1, Length: 2}, }, NegativeBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)}, - }) + } + if i > 0 { + h.CounterResetHint = histogram.NotCounterReset + } + r = append(r, &h) } return r diff --git a/tsdb/head_append.go b/tsdb/head_append.go index f3585a87be..33cfc0eb3e 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1152,17 +1152,27 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui pForwardInserts, nForwardInserts []chunkenc.Insert pBackwardInserts, nBackwardInserts []chunkenc.Insert pMergedSpans, nMergedSpans []histogram.Span - okToAppend, counterReset bool + okToAppend, counterReset, gauge bool ) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange) if !sampleInOrder { return sampleInOrder, chunkCreated } - gauge := h.CounterResetHint == histogram.GaugeType - if app != nil { - if gauge { - pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(h) - } else { + switch h.CounterResetHint { + case histogram.GaugeType: + gauge = true + if app != nil { + pForwardInserts, nForwardInserts, + pBackwardInserts, nBackwardInserts, + pMergedSpans, nMergedSpans, + okToAppend = app.AppendableGauge(h) + } + case histogram.CounterReset: + // The caller tells us this is a counter reset, even if it + // doesn't look like one. 
+ counterReset = true + default: + if app != nil { pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(h) } } @@ -1235,18 +1245,27 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, pForwardInserts, nForwardInserts []chunkenc.Insert pBackwardInserts, nBackwardInserts []chunkenc.Insert pMergedSpans, nMergedSpans []histogram.Span - okToAppend, counterReset bool + okToAppend, counterReset, gauge bool ) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) if !sampleInOrder { return sampleInOrder, chunkCreated } - gauge := fh.CounterResetHint == histogram.GaugeType - if app != nil { - if gauge { - pForwardInserts, nForwardInserts, pBackwardInserts, nBackwardInserts, - pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(fh) - } else { + switch fh.CounterResetHint { + case histogram.GaugeType: + gauge = true + if app != nil { + pForwardInserts, nForwardInserts, + pBackwardInserts, nBackwardInserts, + pMergedSpans, nMergedSpans, + okToAppend = app.AppendableGauge(fh) + } + case histogram.CounterReset: + // The caller tells us this is a counter reset, even if it + // doesn't look like one. 
+ counterReset = true + default: + if app != nil { pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(fh) } } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 2a1123db81..1130bbe190 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -197,7 +197,6 @@ func BenchmarkLoadWAL(b *testing.B) { continue } lastExemplarsPerSeries = exemplarsPerSeries - // fmt.Println("exemplars per series: ", exemplarsPerSeries) b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT), func(b *testing.B) { dir := b.TempDir() @@ -2834,17 +2833,13 @@ func TestAppendHistogram(t *testing.T) { ingestTs := int64(0) app := head.Appender(context.Background()) - type timedHistogram struct { - t int64 - h *histogram.Histogram - } - expHistograms := make([]timedHistogram, 0, 2*numHistograms) + expHistograms := make([]tsdbutil.Sample, 0, 2*numHistograms) // Counter integer histograms. 
for _, h := range GenerateTestHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, h, nil) require.NoError(t, err) - expHistograms = append(expHistograms, timedHistogram{ingestTs, h}) + expHistograms = append(expHistograms, sample{t: ingestTs, h: h}) ingestTs++ if ingestTs%50 == 0 { require.NoError(t, app.Commit()) @@ -2856,7 +2851,7 @@ func TestAppendHistogram(t *testing.T) { for _, h := range GenerateTestGaugeHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, h, nil) require.NoError(t, err) - expHistograms = append(expHistograms, timedHistogram{ingestTs, h}) + expHistograms = append(expHistograms, sample{t: ingestTs, h: h}) ingestTs++ if ingestTs%50 == 0 { require.NoError(t, app.Commit()) @@ -2864,17 +2859,13 @@ func TestAppendHistogram(t *testing.T) { } } - type timedFloatHistogram struct { - t int64 - h *histogram.FloatHistogram - } - expFloatHistograms := make([]timedFloatHistogram, 0, 2*numHistograms) + expFloatHistograms := make([]tsdbutil.Sample, 0, 2*numHistograms) // Counter float histograms. 
for _, fh := range GenerateTestFloatHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, nil, fh) require.NoError(t, err) - expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh}) + expFloatHistograms = append(expFloatHistograms, sample{t: ingestTs, fh: fh}) ingestTs++ if ingestTs%50 == 0 { require.NoError(t, app.Commit()) @@ -2886,7 +2877,7 @@ func TestAppendHistogram(t *testing.T) { for _, fh := range GenerateTestGaugeFloatHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, nil, fh) require.NoError(t, err) - expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh}) + expFloatHistograms = append(expFloatHistograms, sample{t: ingestTs, fh: fh}) ingestTs++ if ingestTs%50 == 0 { require.NoError(t, app.Commit()) @@ -2909,20 +2900,28 @@ func TestAppendHistogram(t *testing.T) { require.False(t, ss.Next()) it := s.Iterator(nil) - actHistograms := make([]timedHistogram, 0, len(expHistograms)) - actFloatHistograms := make([]timedFloatHistogram, 0, len(expFloatHistograms)) + actHistograms := make([]tsdbutil.Sample, 0, len(expHistograms)) + actFloatHistograms := make([]tsdbutil.Sample, 0, len(expFloatHistograms)) for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { if typ == chunkenc.ValHistogram { ts, h := it.AtHistogram() - actHistograms = append(actHistograms, timedHistogram{ts, h}) + actHistograms = append(actHistograms, sample{t: ts, h: h}) } else if typ == chunkenc.ValFloatHistogram { ts, fh := it.AtFloatHistogram() - actFloatHistograms = append(actFloatHistograms, timedFloatHistogram{ts, fh}) + actFloatHistograms = append(actFloatHistograms, sample{t: ts, fh: fh}) } } - require.Equal(t, expHistograms, actHistograms) - require.Equal(t, expFloatHistograms, actFloatHistograms) + compareSeries( + t, + map[string][]tsdbutil.Sample{"dummy": expHistograms}, + map[string][]tsdbutil.Sample{"dummy": actHistograms}, + ) + compareSeries( + t, + 
map[string][]tsdbutil.Sample{"dummy": expFloatHistograms}, + map[string][]tsdbutil.Sample{"dummy": actFloatHistograms}, + ) }) } } @@ -3019,7 +3018,12 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { h.NegativeBuckets = h.PositiveBuckets _, err := app.AppendHistogram(0, s2, int64(ts), h, nil) require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: int64(ts), h: h.Copy()}) + eh := h.Copy() + if !gauge && ts > 30 && (ts-10)%20 == 1 { + // Need "unknown" hint after float sample. + eh.CounterResetHint = histogram.UnknownCounterReset + } + exp[k2] = append(exp[k2], sample{t: int64(ts), h: eh}) if ts%20 == 0 { require.NoError(t, app.Commit()) app = head.Appender(context.Background()) @@ -3051,7 +3055,12 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { h.NegativeBuckets = h.PositiveBuckets _, err := app.AppendHistogram(0, s2, int64(ts), nil, h) require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()}) + eh := h.Copy() + if !gauge && ts > 30 && (ts-10)%20 == 1 { + // Need "unknown" hint after float sample. 
+ eh.CounterResetHint = histogram.UnknownCounterReset + } + exp[k2] = append(exp[k2], sample{t: int64(ts), fh: eh}) if ts%20 == 0 { require.NoError(t, app.Commit()) app = head.Appender(context.Background()) @@ -3089,7 +3098,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) require.NoError(t, err) act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*")) - require.Equal(t, exp, act) + compareSeries(t, exp, act) } testQuery() @@ -3506,6 +3515,11 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { ah.fh.Sum = 0 eh.fh = eh.fh.Copy() eh.fh.Sum = 0 + } else if i > 0 { + prev := expHistograms[i-1] + if prev.fh == nil || value.IsStaleNaN(prev.fh.Sum) { + eh.fh.CounterResetHint = histogram.UnknownCounterReset + } } require.Equal(t, eh, ah) } else { @@ -3516,6 +3530,11 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { ah.h.Sum = 0 eh.h = eh.h.Copy() eh.h.Sum = 0 + } else if i > 0 { + prev := expHistograms[i-1] + if prev.h == nil || value.IsStaleNaN(prev.h.Sum) { + eh.h.CounterResetHint = histogram.UnknownCounterReset + } } require.Equal(t, eh, ah) } @@ -3730,8 +3749,10 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { // If this is empty, samples above will be taken instead of this. addToExp []tsdbutil.Sample }{ + // Histograms that end up in the expected samples are copied here so that we + // can independently set the CounterResetHint later. 
{ - samples: []tsdbutil.Sample{sample{t: 100, h: hists[1]}}, + samples: []tsdbutil.Sample{sample{t: 100, h: hists[0].Copy()}}, expChunks: 1, }, { @@ -3739,23 +3760,23 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { expChunks: 2, }, { - samples: []tsdbutil.Sample{sample{t: 210, fh: floatHists[1]}}, + samples: []tsdbutil.Sample{sample{t: 210, fh: floatHists[0].Copy()}}, expChunks: 3, }, { - samples: []tsdbutil.Sample{sample{t: 220, h: hists[1]}}, + samples: []tsdbutil.Sample{sample{t: 220, h: hists[1].Copy()}}, expChunks: 4, }, { - samples: []tsdbutil.Sample{sample{t: 230, fh: floatHists[3]}}, + samples: []tsdbutil.Sample{sample{t: 230, fh: floatHists[3].Copy()}}, expChunks: 5, }, { - samples: []tsdbutil.Sample{sample{t: 100, h: hists[2]}}, + samples: []tsdbutil.Sample{sample{t: 100, h: hists[2].Copy()}}, err: storage.ErrOutOfOrderSample, }, { - samples: []tsdbutil.Sample{sample{t: 300, h: hists[3]}}, + samples: []tsdbutil.Sample{sample{t: 300, h: hists[3].Copy()}}, expChunks: 6, }, { @@ -3763,7 +3784,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { err: storage.ErrOutOfOrderSample, }, { - samples: []tsdbutil.Sample{sample{t: 100, fh: floatHists[4]}}, + samples: []tsdbutil.Sample{sample{t: 100, fh: floatHists[4].Copy()}}, err: storage.ErrOutOfOrderSample, }, { @@ -3789,7 +3810,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { }, addToExp: []tsdbutil.Sample{ sample{t: 800, v: 8}, - sample{t: 900, h: hists[9]}, + sample{t: 900, h: hists[9].Copy()}, }, expChunks: 8, // float64 added to old chunk, only 1 new for histograms. 
}, @@ -3800,7 +3821,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { sample{t: 1100, h: hists[9]}, }, addToExp: []tsdbutil.Sample{ - sample{t: 1100, h: hists[9]}, + sample{t: 1100, h: hists[9].Copy()}, }, expChunks: 8, }, @@ -3830,6 +3851,14 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { require.NoError(t, app.Rollback()) } } + for i, s := range expResult[1:] { + switch { + case s.H() != nil && expResult[i].H() == nil: + s.(sample).h.CounterResetHint = histogram.UnknownCounterReset + case s.FH() != nil && expResult[i].FH() == nil: + s.(sample).fh.CounterResetHint = histogram.UnknownCounterReset + } + } // Query back and expect same order of samples. q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64) From 49c5b1fae4ae6500864fe8ae6bb6ac0eae085bbb Mon Sep 17 00:00:00 2001 From: beorn7 Date: Wed, 25 Jan 2023 18:23:10 +0100 Subject: [PATCH 3/3] histograms: Fix counter reset header during merging See detailed discussion: https://github.com/prometheus/prometheus/pull/11864#issuecomment-1403963451 Signed-off-by: beorn7 --- storage/merge.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/storage/merge.go b/storage/merge.go index c5768c7658..8db1f7ae83 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -533,8 +533,12 @@ func (c *chainSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogr } t, fh := c.curr.AtFloatHistogram() // If the current sample is not consecutive with the previous one, we - // cannot be sure anymore that there was no counter reset. - if !c.consecutive && fh.CounterResetHint == histogram.NotCounterReset { + // cannot be sure anymore about counter resets for counter histograms. + // TODO(beorn7): If a `NotCounterReset` sample is followed by a + // non-consecutive `CounterReset` sample, we could keep the hint as + // `CounterReset`. But then we would need to track the previous sample + // in more detail, which might not be worth it. 
+ if !c.consecutive && fh.CounterResetHint != histogram.GaugeType { fh.CounterResetHint = histogram.UnknownCounterReset } return t, fh