diff --git a/promql/engine_test.go b/promql/engine_test.go index 0582fd5a4e..a7aaedd691 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -3125,7 +3125,7 @@ func TestRangeQuery(t *testing.T) { } } -func TestSparseHistogramRate(t *testing.T) { +func TestNativeHistogramRate(t *testing.T) { // TODO(beorn7): Integrate histograms into the PromQL testing framework // and write more tests there. test, err := NewTest(t, "") @@ -3155,20 +3155,22 @@ func TestSparseHistogramRate(t *testing.T) { require.Len(t, vector, 1) actualHistogram := vector[0].H expectedHistogram := &histogram.FloatHistogram{ - Schema: 1, - ZeroThreshold: 0.001, - ZeroCount: 1. / 15., - Count: 8. / 15., - Sum: 1.226666666666667, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, - PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, - NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, - NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + // TODO(beorn7): This should be GaugeType. Change it once supported by PromQL. + CounterResetHint: histogram.NotCounterReset, + Schema: 1, + ZeroThreshold: 0.001, + ZeroCount: 1. / 15., + Count: 8. / 15., + Sum: 1.226666666666667, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, } require.Equal(t, expectedHistogram, actualHistogram) } -func TestSparseFloatHistogramRate(t *testing.T) { +func TestNativeFloatHistogramRate(t *testing.T) { // TODO(beorn7): Integrate histograms into the PromQL testing framework // and write more tests there. test, err := NewTest(t, "") @@ -3198,20 +3200,22 @@ func TestSparseFloatHistogramRate(t *testing.T) { require.Len(t, vector, 1) actualHistogram := vector[0].H expectedHistogram := &histogram.FloatHistogram{ - Schema: 1, - ZeroThreshold: 0.001, - ZeroCount: 1. / 15., - Count: 8. / 15., - Sum: 1.226666666666667, - PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, - PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, - NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, - NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + // TODO(beorn7): This should be GaugeType. Change it once supported by PromQL. + CounterResetHint: histogram.NotCounterReset, + Schema: 1, + ZeroThreshold: 0.001, + ZeroCount: 1. / 15., + Count: 8. / 15., + Sum: 1.226666666666667, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + PositiveBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, + NegativeSpans: []histogram.Span{{Offset: 0, Length: 2}, {Offset: 1, Length: 2}}, + NegativeBuckets: []float64{1. / 15., 1. / 15., 1. / 15., 1. / 15.}, } require.Equal(t, expectedHistogram, actualHistogram) } -func TestSparseHistogram_HistogramCountAndSum(t *testing.T) { +func TestNativeHistogram_HistogramCountAndSum(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. h := &histogram.Histogram{ @@ -3290,7 +3294,7 @@ func TestSparseHistogram_HistogramCountAndSum(t *testing.T) { } } -func TestSparseHistogram_HistogramQuantile(t *testing.T) { +func TestNativeHistogram_HistogramQuantile(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. type subCase struct { @@ -3526,7 +3530,7 @@ func TestSparseHistogram_HistogramQuantile(t *testing.T) { } } -func TestSparseHistogram_HistogramFraction(t *testing.T) { +func TestNativeHistogram_HistogramFraction(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. type subCase struct { @@ -3961,7 +3965,7 @@ func TestSparseHistogram_HistogramFraction(t *testing.T) { } } -func TestSparseHistogram_Sum_Count_AddOperator(t *testing.T) { +func TestNativeHistogram_Sum_Count_AddOperator(t *testing.T) { // TODO(codesome): Integrate histograms into the PromQL testing framework // and write more tests there. cases := []struct { diff --git a/storage/merge.go b/storage/merge.go index ba45b12bd3..8db1f7ae83 100644 --- a/storage/merge.go +++ b/storage/merge.go @@ -440,6 +440,10 @@ type chainSampleIterator struct { curr chunkenc.Iterator lastT int64 + + // Whether the previous and the current sample are direct neighbors + // within the same base iterator. + consecutive bool } // Return a chainSampleIterator initialized for length entries, re-using the memory from it if possible. @@ -485,6 +489,9 @@ func (c *chainSampleIterator) Seek(t int64) chunkenc.ValueType { if c.curr != nil && c.lastT >= t { return c.curr.Seek(c.lastT) } + // Don't bother to find out if the next sample is consecutive. Callers + // of Seek usually aren't interested anyway. + c.consecutive = false c.h = samplesIteratorHeap{} for _, iter := range c.iterators { if iter.Seek(t) != chunkenc.ValNone { @@ -511,14 +518,30 @@ func (c *chainSampleIterator) AtHistogram() (int64, *histogram.Histogram) { if c.curr == nil { panic("chainSampleIterator.AtHistogram called before first .Next or after .Next returned false.") } - return c.curr.AtHistogram() + t, h := c.curr.AtHistogram() + // If the current sample is not consecutive with the previous one, we + // cannot be sure anymore that there was no counter reset. + if !c.consecutive && h.CounterResetHint == histogram.NotCounterReset { + h.CounterResetHint = histogram.UnknownCounterReset + } + return t, h } func (c *chainSampleIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { if c.curr == nil { panic("chainSampleIterator.AtFloatHistogram called before first .Next or after .Next returned false.") } - return c.curr.AtFloatHistogram() + t, fh := c.curr.AtFloatHistogram() + // If the current sample is not consecutive with the previous one, we + // cannot be sure anymore about counter resets for counter histograms. + // TODO(beorn7): If a `NotCounterReset` sample is followed by a + // non-consecutive `CounterReset` sample, we could keep the hint as + // `CounterReset`. But then we needed to track the previous sample + // in more detail, which might not be worth it. + if !c.consecutive && fh.CounterResetHint != histogram.GaugeType { + fh.CounterResetHint = histogram.UnknownCounterReset + } + return t, fh } func (c *chainSampleIterator) AtT() int64 { @@ -529,7 +552,13 @@ func (c *chainSampleIterator) AtT() int64 { } func (c *chainSampleIterator) Next() chunkenc.ValueType { + var ( + currT int64 + currValueType chunkenc.ValueType + iteratorChanged bool + ) if c.h == nil { + iteratorChanged = true c.h = samplesIteratorHeap{} // We call c.curr.Next() as the first thing below. // So, we don't call Next() on it here. @@ -545,8 +574,6 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { return chunkenc.ValNone } - var currT int64 - var currValueType chunkenc.ValueType for { currValueType = c.curr.Next() if currValueType != chunkenc.ValNone { @@ -576,6 +603,7 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { } c.curr = heap.Pop(&c.h).(chunkenc.Iterator) + iteratorChanged = true currT = c.curr.AtT() currValueType = c.curr.Seek(currT) if currT != c.lastT { @@ -583,6 +611,7 @@ func (c *chainSampleIterator) Next() chunkenc.ValueType { } } + c.consecutive = !iteratorChanged c.lastT = currT return currValueType } diff --git a/tsdb/block_test.go b/tsdb/block_test.go index fdbc128f4a..ab33999623 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -632,6 +632,16 @@ func genHistogramSeries(totalSeries, labelCount int, mint, maxt, step int64, flo }, PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0}, } + if ts != mint { + // By setting the counter reset hint to "no counter + // reset" for all histograms but the first, we cover the + // most common cases. If the series is manipulated later + // or spans more than one block when ingested into the + // storage, the hint has to be adjusted. Note that the + // storage itself treats this particular hint the same + // as "unknown". + h.CounterResetHint = histogram.NotCounterReset + } if floatHistogram { return sample{t: ts, fh: h.ToFloat()} } @@ -661,6 +671,13 @@ func genHistogramAndFloatSeries(totalSeries, labelCount int, mint, maxt, step in }, PositiveBuckets: []int64{int64(ts + 1), 1, -1, 0}, } + if count > 1 && count%5 != 1 { + // Same rationale for this as above in + // genHistogramSeries, just that we have to be + // smarter to find out if the previous sample + // was a histogram, too. + h.CounterResetHint = histogram.NotCounterReset + } if floatHistogram { s = sample{t: ts, fh: h.ToFloat()} } else { diff --git a/tsdb/chunkenc/float_histogram.go b/tsdb/chunkenc/float_histogram.go index 538af364ae..b462c6d9fd 100644 --- a/tsdb/chunkenc/float_histogram.go +++ b/tsdb/chunkenc/float_histogram.go @@ -217,10 +217,10 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) { panic("appended an integer histogram to a float histogram chunk") } -// Appendable returns whether the chunk can be appended to, and if so -// whether any recoding needs to happen using the provided interjections -// (in case of any new buckets, positive or negative range, respectively). -// If the sample is a gauge histogram, AppendableGauge must be used instead. +// Appendable returns whether the chunk can be appended to, and if so whether +// any recoding needs to happen using the provided inserts (in case of any new +// buckets, positive or negative range, respectively). If the sample is a gauge +// histogram, AppendableGauge must be used instead. // // The chunk is not appendable in the following cases: // - The schema has changed. @@ -233,7 +233,7 @@ func (a *FloatHistogramAppender) AppendHistogram(int64, *histogram.Histogram) { // because of a counter reset. If the given sample is stale, it is always ok to // append. If counterReset is true, okToAppend is always false. func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( - positiveInterjections, negativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, okToAppend, counterReset bool, ) { if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType { @@ -267,12 +267,12 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( } var ok bool - positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans) + positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans) if !ok { counterReset = true return } - negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans) + negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans) if !ok { counterReset = true return @@ -280,7 +280,7 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( if counterResetInAnyFloatBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) || counterResetInAnyFloatBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) { - counterReset, positiveInterjections, negativeInterjections = true, nil, nil + counterReset, positiveInserts, negativeInserts = true, nil, nil return } @@ -290,10 +290,11 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( // AppendableGauge returns whether the chunk can be appended to, and if so // whether: -// 1. Any recoding needs to happen to the chunk using the provided interjections +// 1. Any recoding needs to happen to the chunk using the provided inserts // (in case of any new buckets, positive or negative range, respectively). -// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections -// (in case of any missing buckets, positive or negative range, respectively). +// 2. Any recoding needs to happen for the histogram being appended, using the +// backward inserts (in case of any missing buckets, positive or negative +// range, respectively). // // This method must be only used for gauge histograms. // @@ -302,8 +303,8 @@ func (a *FloatHistogramAppender) Appendable(h *histogram.FloatHistogram) ( // - The threshold for the zero bucket has changed. // - The last sample in the chunk was stale while the current sample is not stale. func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) ( - positiveInterjections, negativeInterjections []Interjection, - backwardPositiveInterjections, backwardNegativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, + backwardPositiveInserts, backwardNegativeInserts []Insert, positiveSpans, negativeSpans []histogram.Span, okToAppend bool, ) { @@ -325,8 +326,8 @@ func (a *FloatHistogramAppender) AppendableGauge(h *histogram.FloatHistogram) ( return } - positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans) - negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans) + positiveInserts, backwardPositiveInserts, positiveSpans = expandSpansBothWays(a.pSpans, h.PositiveSpans) + negativeInserts, backwardNegativeInserts, negativeSpans = expandSpansBothWays(a.nSpans, h.NegativeSpans) okToAppend = true return } @@ -501,12 +502,12 @@ func (a *FloatHistogramAppender) writeXorValue(old *xorValue, v float64) { } // Recode converts the current chunk to accommodate an expansion of the set of -// (positive and/or negative) buckets used, according to the provided -// interjections, resulting in the honoring of the provided new positive and -// negative spans. To continue appending, use the returned Appender rather than -// the receiver of this method. +// (positive and/or negative) buckets used, according to the provided inserts, +// resulting in the honoring of the provided new positive and negative spans. To +// continue appending, use the returned Appender rather than the receiver of +// this method. func (a *FloatHistogramAppender) Recode( - positiveInterjections, negativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, positiveSpans, negativeSpans []histogram.Span, ) (Chunk, Appender) { // TODO(beorn7): This currently just decodes everything and then encodes @@ -539,11 +540,11 @@ func (a *FloatHistogramAppender) Recode( // Save the modified histogram to the new chunk. hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans - if len(positiveInterjections) > 0 { - hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections, false) + if len(positiveInserts) > 0 { + hOld.PositiveBuckets = insert(hOld.PositiveBuckets, positiveBuckets, positiveInserts, false) } - if len(negativeInterjections) > 0 { - hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections, false) + if len(negativeInserts) > 0 { + hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, false) } app.AppendFloatHistogram(tOld, hOld) } @@ -556,15 +557,15 @@ func (a *FloatHistogramAppender) Recode( // (positive and/or negative) buckets used. func (a *FloatHistogramAppender) RecodeHistogramm( fh *histogram.FloatHistogram, - pBackwardInter, nBackwardInter []Interjection, + pBackwardInter, nBackwardInter []Insert, ) { if len(pBackwardInter) > 0 { numPositiveBuckets := countSpans(fh.PositiveSpans) - fh.PositiveBuckets = interject(fh.PositiveBuckets, make([]float64, numPositiveBuckets), pBackwardInter, false) + fh.PositiveBuckets = insert(fh.PositiveBuckets, make([]float64, numPositiveBuckets), pBackwardInter, false) } if len(nBackwardInter) > 0 { numNegativeBuckets := countSpans(fh.NegativeSpans) - fh.NegativeBuckets = interject(fh.NegativeBuckets, make([]float64, numNegativeBuckets), nBackwardInter, false) + fh.NegativeBuckets = insert(fh.NegativeBuckets, make([]float64, numNegativeBuckets), nBackwardInter, false) } } @@ -627,12 +628,8 @@ func (it *floatHistogramIterator) AtFloatHistogram() (int64, *histogram.FloatHis return it.t, &histogram.FloatHistogram{Sum: it.sum.value} } it.atFloatHistogramCalled = true - crHint := histogram.UnknownCounterReset - if it.counterResetHeader == GaugeType { - crHint = histogram.GaugeType - } return it.t, &histogram.FloatHistogram{ - CounterResetHint: crHint, + CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), Count: it.cnt.value, ZeroCount: it.zCnt.value, Sum: it.sum.value, diff --git a/tsdb/chunkenc/float_histogram_test.go b/tsdb/chunkenc/float_histogram_test.go index b080fe676c..31d96ee7a9 100644 --- a/tsdb/chunkenc/float_histogram_test.go +++ b/tsdb/chunkenc/float_histogram_test.go @@ -66,7 +66,9 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14) h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15) app.AppendFloatHistogram(ts, h.ToFloat()) - exp = append(exp, floatResult{t: ts, h: h.ToFloat()}) + expH := h.ToFloat() + expH.CounterResetHint = histogram.NotCounterReset + exp = append(exp, floatResult{t: ts, h: expH}) require.Equal(t, 2, c.NumSamples()) // Add update with new appender. @@ -81,7 +83,9 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27) h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22) app.AppendFloatHistogram(ts, h.ToFloat()) - exp = append(exp, floatResult{t: ts, h: h.ToFloat()}) + expH = h.ToFloat() + expH.CounterResetHint = histogram.NotCounterReset + exp = append(exp, floatResult{t: ts, h: expH}) require.Equal(t, 3, c.NumSamples()) // 1. Expand iterator in simple case. @@ -138,7 +142,7 @@ func TestFloatHistogramChunkSameBuckets(t *testing.T) { require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1)) } -// Mimics the scenario described for compareSpans(). +// Mimics the scenario described for expandSpansForward. func TestFloatHistogramChunkBucketChanges(t *testing.T) { c := Chunk(NewFloatHistogramChunk()) @@ -209,9 +213,11 @@ func TestFloatHistogramChunkBucketChanges(t *testing.T) { h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1} h1.NegativeSpans = h2.NegativeSpans h1.NegativeBuckets = []int64{0, 1} + expH2 := h2.ToFloat() + expH2.CounterResetHint = histogram.NotCounterReset exp := []floatResult{ {t: ts1, h: h1.ToFloat()}, - {t: ts2, h: h2.ToFloat()}, + {t: ts2, h: expH2}, } it := c.Iterator(nil) var act []floatResult diff --git a/tsdb/chunkenc/histogram.go b/tsdb/chunkenc/histogram.go index 6ceb866a52..7b6a9cacb3 100644 --- a/tsdb/chunkenc/histogram.go +++ b/tsdb/chunkenc/histogram.go @@ -243,10 +243,10 @@ func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogra panic("appended a float histogram to a histogram chunk") } -// Appendable returns whether the chunk can be appended to, and if so -// whether any recoding needs to happen using the provided interjections -// (in case of any new buckets, positive or negative range, respectively). -// If the sample is a gauge histogram, AppendableGauge must be used instead. +// Appendable returns whether the chunk can be appended to, and if so whether +// any recoding needs to happen using the provided inserts (in case of any new +// buckets, positive or negative range, respectively). If the sample is a gauge +// histogram, AppendableGauge must be used instead. // // The chunk is not appendable in the following cases: // @@ -261,7 +261,7 @@ func (a *HistogramAppender) AppendFloatHistogram(int64, *histogram.FloatHistogra // because of a counter reset. If the given sample is stale, it is always ok to // append. If counterReset is true, okToAppend is always false. func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( - positiveInterjections, negativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, okToAppend, counterReset bool, ) { if a.NumSamples() > 0 && a.GetCounterResetHeader() == GaugeType { @@ -295,12 +295,12 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( } var ok bool - positiveInterjections, ok = forwardCompareSpans(a.pSpans, h.PositiveSpans) + positiveInserts, ok = expandSpansForward(a.pSpans, h.PositiveSpans) if !ok { counterReset = true return } - negativeInterjections, ok = forwardCompareSpans(a.nSpans, h.NegativeSpans) + negativeInserts, ok = expandSpansForward(a.nSpans, h.NegativeSpans) if !ok { counterReset = true return @@ -308,7 +308,7 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( if counterResetInAnyBucket(a.pBuckets, h.PositiveBuckets, a.pSpans, h.PositiveSpans) || counterResetInAnyBucket(a.nBuckets, h.NegativeBuckets, a.nSpans, h.NegativeSpans) { - counterReset, positiveInterjections, negativeInterjections = true, nil, nil + counterReset, positiveInserts, negativeInserts = true, nil, nil return } @@ -318,10 +318,11 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( // AppendableGauge returns whether the chunk can be appended to, and if so // whether: -// 1. Any recoding needs to happen to the chunk using the provided interjections +// 1. Any recoding needs to happen to the chunk using the provided inserts // (in case of any new buckets, positive or negative range, respectively). -// 2. Any recoding needs to happen for the histogram being appended, using the backward interjections -// (in case of any missing buckets, positive or negative range, respectively). +// 2. Any recoding needs to happen for the histogram being appended, using the +// backward inserts (in case of any missing buckets, positive or negative +// range, respectively). // // This method must be only used for gauge histograms. // @@ -330,8 +331,8 @@ func (a *HistogramAppender) Appendable(h *histogram.Histogram) ( // - The threshold for the zero bucket has changed. // - The last sample in the chunk was stale while the current sample is not stale. func (a *HistogramAppender) AppendableGauge(h *histogram.Histogram) ( - positiveInterjections, negativeInterjections []Interjection, - backwardPositiveInterjections, backwardNegativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, + backwardPositiveInserts, backwardNegativeInserts []Insert, positiveSpans, negativeSpans []histogram.Span, okToAppend bool, ) { @@ -353,8 +354,8 @@ func (a *HistogramAppender) AppendableGauge(h *histogram.Histogram) ( return } - positiveInterjections, backwardPositiveInterjections, positiveSpans = bidirectionalCompareSpans(a.pSpans, h.PositiveSpans) - negativeInterjections, backwardNegativeInterjections, negativeSpans = bidirectionalCompareSpans(a.nSpans, h.NegativeSpans) + positiveInserts, backwardPositiveInserts, positiveSpans = expandSpansBothWays(a.pSpans, h.PositiveSpans) + negativeInserts, backwardNegativeInserts, negativeSpans = expandSpansBothWays(a.nSpans, h.NegativeSpans) okToAppend = true return } @@ -488,8 +489,9 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) { putVarbitInt(a.b, b) } } else { - // The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code, - // so we don't need a separate single delta logic for the 2nd sample. + // The case for the 2nd sample with single deltas is implicitly + // handled correctly with the double delta code, so we don't + // need a separate single delta logic for the 2nd sample. tDelta = t - a.t cntDelta = int64(h.Count) - int64(a.cnt) @@ -539,12 +541,12 @@ func (a *HistogramAppender) AppendHistogram(t int64, h *histogram.Histogram) { } // Recode converts the current chunk to accommodate an expansion of the set of -// (positive and/or negative) buckets used, according to the provided -// interjections, resulting in the honoring of the provided new positive and -// negative spans. To continue appending, use the returned Appender rather than -// the receiver of this method. +// (positive and/or negative) buckets used, according to the provided inserts, +// resulting in the honoring of the provided new positive and negative spans. To +// continue appending, use the returned Appender rather than the receiver of +// this method. func (a *HistogramAppender) Recode( - positiveInterjections, negativeInterjections []Interjection, + positiveInserts, negativeInserts []Insert, positiveSpans, negativeSpans []histogram.Span, ) (Chunk, Appender) { // TODO(beorn7): This currently just decodes everything and then encodes @@ -577,11 +579,11 @@ func (a *HistogramAppender) Recode( // Save the modified histogram to the new chunk. hOld.PositiveSpans, hOld.NegativeSpans = positiveSpans, negativeSpans - if len(positiveInterjections) > 0 { - hOld.PositiveBuckets = interject(hOld.PositiveBuckets, positiveBuckets, positiveInterjections, true) + if len(positiveInserts) > 0 { + hOld.PositiveBuckets = insert(hOld.PositiveBuckets, positiveBuckets, positiveInserts, true) } - if len(negativeInterjections) > 0 { - hOld.NegativeBuckets = interject(hOld.NegativeBuckets, negativeBuckets, negativeInterjections, true) + if len(negativeInserts) > 0 { + hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, true) } app.AppendHistogram(tOld, hOld) } @@ -590,19 +592,19 @@ func (a *HistogramAppender) Recode( return hc, app } -// RecodeHistogram converts the current histogram (in-place) to accommodate an expansion of the set of -// (positive and/or negative) buckets used. +// RecodeHistogram converts the current histogram (in-place) to accommodate an +// expansion of the set of (positive and/or negative) buckets used. func (a *HistogramAppender) RecodeHistogram( h *histogram.Histogram, - pBackwardInter, nBackwardInter []Interjection, + pBackwardInserts, nBackwardInserts []Insert, ) { - if len(pBackwardInter) > 0 { + if len(pBackwardInserts) > 0 { numPositiveBuckets := countSpans(h.PositiveSpans) - h.PositiveBuckets = interject(h.PositiveBuckets, make([]int64, numPositiveBuckets), pBackwardInter, true) + h.PositiveBuckets = insert(h.PositiveBuckets, make([]int64, numPositiveBuckets), pBackwardInserts, true) } - if len(nBackwardInter) > 0 { + if len(nBackwardInserts) > 0 { numNegativeBuckets := countSpans(h.NegativeSpans) - h.NegativeBuckets = interject(h.NegativeBuckets, make([]int64, numNegativeBuckets), nBackwardInter, true) + h.NegativeBuckets = insert(h.NegativeBuckets, make([]int64, numNegativeBuckets), nBackwardInserts, true) } } @@ -665,12 +667,8 @@ func (it *histogramIterator) AtHistogram() (int64, *histogram.Histogram) { return it.t, &histogram.Histogram{Sum: it.sum} } it.atHistogramCalled = true - crHint := histogram.UnknownCounterReset - if it.counterResetHeader == GaugeType { - crHint = histogram.GaugeType - } return it.t, &histogram.Histogram{ - CounterResetHint: crHint, + CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), Count: it.cnt, ZeroCount: it.zCnt, Sum: it.sum, @@ -688,12 +686,8 @@ func (it *histogramIterator) AtFloatHistogram() (int64, *histogram.FloatHistogra return it.t, &histogram.FloatHistogram{Sum: it.sum} } it.atFloatHistogramCalled = true - crHint := histogram.UnknownCounterReset - if it.counterResetHeader == GaugeType { - crHint = histogram.GaugeType - } return it.t, &histogram.FloatHistogram{ - CounterResetHint: crHint, + CounterResetHint: counterResetHint(it.counterResetHeader, it.numRead), Count: float64(it.cnt), ZeroCount: float64(it.zCnt), Sum: it.sum, diff --git a/tsdb/chunkenc/histogram_meta.go b/tsdb/chunkenc/histogram_meta.go index 345b8cc519..027eee1129 100644 --- a/tsdb/chunkenc/histogram_meta.go +++ b/tsdb/chunkenc/histogram_meta.go @@ -19,7 +19,10 @@ import ( "github.com/prometheus/prometheus/model/histogram" ) -func writeHistogramChunkLayout(b *bstream, schema int32, zeroThreshold float64, positiveSpans, negativeSpans []histogram.Span) { +func writeHistogramChunkLayout( + b *bstream, schema int32, zeroThreshold float64, + positiveSpans, negativeSpans []histogram.Span, +) { putZeroThreshold(b, zeroThreshold) putVarbitInt(b, int64(schema)) putHistogramChunkLayoutSpans(b, positiveSpans) @@ -91,9 +94,7 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) { // putZeroThreshold writes the zero threshold to the bstream. It stores typical // values in just one byte, but needs 9 bytes for other values. In detail: -// -// * If the threshold is 0, store a single zero byte. -// +// - If the threshold is 0, store a single zero byte. // - If the threshold is a power of 2 between (and including) 2^-243 and 2^10, // take the exponent from the IEEE 754 representation of the threshold, which // covers a range between (and including) -242 and 11. (2^-243 is 0.5*2^-242 @@ -103,7 +104,6 @@ func readHistogramChunkLayoutSpans(b *bstreamReader) ([]histogram.Span, error) { // threshold. The default value for the zero threshold is 2^-128 (or // 0.5*2^-127 in IEEE 754 representation) and will therefore be encoded as a // single byte (with value 116). -// // - In all other cases, store 255 as a single byte, followed by the 8 bytes of // the threshold as a float64, i.e. taking 9 bytes in total. func putZeroThreshold(b *bstream, threshold float64) { @@ -186,16 +186,16 @@ func (b *bucketIterator) Next() (int, bool) { return 0, false } -// An Interjection describes how many new buckets have to be introduced before -// processing the pos'th delta from the original slice. -type Interjection struct { +// An Insert describes how many new buckets have to be inserted before +// processing the pos'th bucket from the original slice. +type Insert struct { pos int num int } -// forwardCompareSpans returns the interjections to convert a slice of deltas to a new -// slice representing an expanded set of buckets, or false if incompatible -// (e.g. if buckets were removed). +// expandSpansForward returns the inserts to expand the bucket spans 'a' so that +// they match the spans in 'b'. 'b' must cover the same or more buckets than +// 'a', otherwise the function will return false. // // Example: // @@ -222,25 +222,25 @@ type Interjection struct { // deltas 6 -3 -3 3 -3 0 2 2 1 -5 1 // delta mods: / \ / \ / \ // -// Note that whenever any new buckets are introduced, the subsequent "old" -// bucket needs to readjust its delta to the new base of 0. Thus, for the caller -// who wants to transform the set of original deltas to a new set of deltas to -// match a new span layout that adds buckets, we simply need to generate a list -// of interjections. +// Note for histograms with delta-encoded buckets: Whenever any new buckets are +// introduced, the subsequent "old" bucket needs to readjust its delta to the +// new base of 0. Thus, for the caller who wants to transform the set of +// original deltas to a new set of deltas to match a new span layout that adds +// buckets, we simply need to generate a list of inserts. // -// Note: Within forwardCompareSpans we don't have to worry about the changes to the +// Note: Within expandSpansForward we don't have to worry about the changes to the // spans themselves, thanks to the iterators we get to work with the more useful // bucket indices (which of course directly correspond to the buckets we have to // adjust). -func forwardCompareSpans(a, b []histogram.Span) (forward []Interjection, ok bool) { +func expandSpansForward(a, b []histogram.Span) (forward []Insert, ok bool) { ai := newBucketIterator(a) bi := newBucketIterator(b) - var interjections []Interjection + var inserts []Insert - // When inter.num becomes > 0, this becomes a valid interjection that - // should be yielded when we finish a streak of new buckets. - var inter Interjection + // When inter.num becomes > 0, this becomes a valid insert that should + // be yielded when we finish a streak of new buckets. + var inter Insert av, aOK := ai.Next() bv, bOK := bi.Next() @@ -250,43 +250,46 @@ loop: case aOK && bOK: switch { case av == bv: // Both have an identical value. move on! - // Finish WIP interjection and reset. + // Finish WIP insert and reset. if inter.num > 0 { - interjections = append(interjections, inter) + inserts = append(inserts, inter) } inter.num = 0 av, aOK = ai.Next() bv, bOK = bi.Next() inter.pos++ case av < bv: // b misses a value that is in a. - return interjections, false + return inserts, false case av > bv: // a misses a value that is in b. Forward b and recompare. inter.num++ bv, bOK = bi.Next() } case aOK && !bOK: // b misses a value that is in a. - return interjections, false + return inserts, false case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. inter.num++ bv, bOK = bi.Next() default: // Both iterators ran out. We're done. if inter.num > 0 { - interjections = append(interjections, inter) + inserts = append(inserts, inter) } break loop } } - return interjections, true + return inserts, true } -// bidirectionalCompareSpans does everything that forwardCompareSpans does and -// also returns interjections in the other direction (i.e. buckets missing in b that are missing in a). -func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Interjection, mergedSpans []histogram.Span) { +// expandSpansBothWays is similar to expandSpansForward, but now b may also +// cover an entirely different set of buckets. The function returns the +// “forward” inserts to expand 'a' to also cover all the buckets exclusively +// covered by 'b', and it returns the “backward” inserts to expand 'b' to also +// cover all the buckets exclusively covered by 'a' +func expandSpansBothWays(a, b []histogram.Span) (forward, backward []Insert, mergedSpans []histogram.Span) { ai := newBucketIterator(a) bi := newBucketIterator(b) - var interjections, bInterjections []Interjection + var fInserts, bInserts []Insert var lastBucket int addBucket := func(b int) { offset := b - lastBucket - 1 @@ -305,9 +308,10 @@ func bidirectionalCompareSpans(a, b []histogram.Span) (forward, backward []Inter lastBucket = b } - // When inter.num becomes > 0, this becomes a valid interjection that - // should be yielded when we finish a streak of new buckets. - var inter, bInter Interjection + // When fInter.num (or bInter.num, respectively) becomes > 0, this + // becomes a valid insert that should be yielded when we finish a streak + // of new buckets. + var fInter, bInter Insert av, aOK := ai.Next() bv, bOK := bi.Next() @@ -317,37 +321,37 @@ loop: case aOK && bOK: switch { case av == bv: // Both have an identical value. move on! - // Finish WIP interjection and reset. - if inter.num > 0 { - interjections = append(interjections, inter) - inter.num = 0 + // Finish WIP insert and reset. + if fInter.num > 0 { + fInserts = append(fInserts, fInter) + fInter.num = 0 } if bInter.num > 0 { - bInterjections = append(bInterjections, bInter) + bInserts = append(bInserts, bInter) bInter.num = 0 } addBucket(av) av, aOK = ai.Next() bv, bOK = bi.Next() - inter.pos++ + fInter.pos++ bInter.pos++ case av < bv: // b misses a value that is in a. bInter.num++ - // Collect the forward interjection before advancing the - // position of 'a'. - if inter.num > 0 { - interjections = append(interjections, inter) - inter.num = 0 + // Collect the forward inserts before advancing + // the position of 'a'. + if fInter.num > 0 { + fInserts = append(fInserts, fInter) + fInter.num = 0 } addBucket(av) - inter.pos++ + fInter.pos++ av, aOK = ai.Next() case av > bv: // a misses a value that is in b. Forward b and recompare. - inter.num++ - // Collect the backward interjection before advancing the + fInter.num++ + // Collect the backward inserts before advancing the // position of 'b'. if bInter.num > 0 { - bInterjections = append(bInterjections, bInter) + bInserts = append(bInserts, bInter) bInter.num = 0 } addBucket(bv) @@ -359,92 +363,127 @@ loop: addBucket(av) av, aOK = ai.Next() case !aOK && bOK: // a misses a value that is in b. Forward b and recompare. - inter.num++ + fInter.num++ addBucket(bv) bv, bOK = bi.Next() default: // Both iterators ran out. We're done. - if inter.num > 0 { - interjections = append(interjections, inter) + if fInter.num > 0 { + fInserts = append(fInserts, fInter) } if bInter.num > 0 { - bInterjections = append(bInterjections, bInter) + bInserts = append(bInserts, bInter) } break loop } } - return interjections, bInterjections, mergedSpans + return fInserts, bInserts, mergedSpans } type bucketValue interface { int64 | float64 } -// interject merges 'in' with the provided interjections and writes them into -// 'out', which must already have the appropriate length. -func interject[BV bucketValue](in, out []BV, interjections []Interjection, deltas bool) []BV { +// insert merges 'in' with the provided inserts and writes them into 'out', +// which must already have the appropriate length. 'out' is also returned for +// convenience. +func insert[BV bucketValue](in, out []BV, inserts []Insert, deltas bool) []BV { var ( - j int // Position in out. - v BV // The last value seen. - interj int // The next interjection to process. + oi int // Position in out. + v BV // The last value seen. + ii int // The next insert to process. ) for i, d := range in { - if interj < len(interjections) && i == interjections[interj].pos { - - // We have an interjection! - // Add interjection.num new delta values such that their bucket values equate 0. - // When deltas==false, it means that it is an absolute value. So we set it to 0 directly. + if ii < len(inserts) && i == inserts[ii].pos { + // We have an insert! + // Add insert.num new delta values such that their + // bucket values equate 0. When deltas==false, it means + // that it is an absolute value. So we set it to 0 + // directly. if deltas { - out[j] = -v + out[oi] = -v } else { - out[j] = 0 + out[oi] = 0 } - j++ - for x := 1; x < interjections[interj].num; x++ { - out[j] = 0 - j++ + oi++ + for x := 1; x < inserts[ii].num; x++ { + out[oi] = 0 + oi++ } - interj++ + ii++ // Now save the value from the input. The delta value we // should save is the original delta value + the last - // value of the point before the interjection (to undo - // the delta that was introduced by the interjection). - // When deltas==false, it means that it is an absolute value, + // value of the point before the insert (to undo the + // delta that was introduced by the insert). When + // deltas==false, it means that it is an absolute value, // so we set it directly to the value in the 'in' slice. if deltas { - out[j] = d + v + out[oi] = d + v } else { - out[j] = d + out[oi] = d } - j++ + oi++ v = d + v continue } - - // If there was no interjection, the original delta is still - // valid. - out[j] = d - j++ + // If there was no insert, the original delta is still valid. + out[oi] = d + oi++ v += d } - switch interj { - case len(interjections): - // All interjections processed. Nothing more to do. - case len(interjections) - 1: - // One more interjection to process at the end. + switch ii { + case len(inserts): + // All inserts processed. Nothing more to do. + case len(inserts) - 1: + // One more insert to process at the end. if deltas { - out[j] = -v + out[oi] = -v } else { - out[j] = 0 + out[oi] = 0 } - j++ - for x := 1; x < interjections[interj].num; x++ { - out[j] = 0 - j++ + oi++ + for x := 1; x < inserts[ii].num; x++ { + out[oi] = 0 + oi++ } default: - panic("unprocessed interjections left") + panic("unprocessed inserts left") } return out } + +// counterResetHint returns a CounterResetHint based on the CounterResetHeader +// and on the position into the chunk. +func counterResetHint(crh CounterResetHeader, numRead uint16) histogram.CounterResetHint { + switch { + case crh == GaugeType: + // A gauge histogram chunk only contains gauge histograms. + return histogram.GaugeType + case numRead > 1: + // In a counter histogram chunk, there will not be any counter + // resets after the first histogram. + return histogram.NotCounterReset + case crh == CounterReset: + // If the chunk was started because of a counter reset, we can + // safely return that hint. This histogram always has to be + // treated as a counter reset. + return histogram.CounterReset + default: + // Sadly, we have to return "unknown" as the hint for all other + // cases, even if we know that the chunk was started without a + // counter reset. But we cannot be sure that the previous chunk + // still exists in the TSDB, so we conservatively return + // "unknown". On the bright side, this case should be relatively + // rare. + // + // TODO(beorn7): Nevertheless, if the current chunk is in the + // middle of a block (not the first chunk in the block for this + // series), it's probably safe to assume that the previous chunk + // will exist in the TSDB for as long as the current chunk + // exist, and we could safely return + // "histogram.NotCounterReset". This needs some more work and + // might not be worth the effort and/or risk. To be vetted... + return histogram.UnknownCounterReset + } +} diff --git a/tsdb/chunkenc/histogram_meta_test.go b/tsdb/chunkenc/histogram_meta_test.go index a4ce62f3b7..0b2b187475 100644 --- a/tsdb/chunkenc/histogram_meta_test.go +++ b/tsdb/chunkenc/histogram_meta_test.go @@ -78,7 +78,7 @@ func TestBucketIterator(t *testing.T) { }, idxs: []int{100, 101, 102, 103, 112, 113, 114, 115, 116, 117, 118, 119}, }, - // The below 2 sets ore the ones described in compareSpans's comments. + // The below 2 sets ore the ones described in expandSpansForward's comments. { spans: []histogram.Span{ {Offset: 0, Length: 2}, @@ -111,12 +111,12 @@ func TestBucketIterator(t *testing.T) { } } -func TestCompareSpansAndInterject(t *testing.T) { +func TestCompareSpansAndInsert(t *testing.T) { scenarios := []struct { - description string - spansA, spansB []histogram.Span - interjections, backwardInterjections []Interjection - bucketsIn, bucketsOut []int64 + description string + spansA, spansB []histogram.Span + fInserts, bInserts []Insert + bucketsIn, bucketsOut []int64 }{ { description: "single prepend at the beginning", @@ -126,7 +126,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -11, Length: 4}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 0, num: 1, @@ -143,7 +143,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 4}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 3, num: 1, @@ -160,7 +160,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -12, Length: 5}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 0, num: 2, @@ -177,7 +177,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 5}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 3, num: 2, @@ -194,7 +194,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -12, Length: 7}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 0, num: 2, @@ -215,7 +215,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -9, Length: 3}, }, - backwardInterjections: []Interjection{ + bInserts: []Insert{ {pos: 0, num: 1}, }, }, @@ -228,7 +228,7 @@ func TestCompareSpansAndInterject(t *testing.T) { {Offset: -10, Length: 2}, {Offset: 1, Length: 1}, }, - backwardInterjections: []Interjection{ + bInserts: []Insert{ {pos: 2, num: 1}, }, }, @@ -240,7 +240,7 @@ func TestCompareSpansAndInterject(t *testing.T) { spansB: []histogram.Span{ {Offset: -10, Length: 3}, }, - backwardInterjections: []Interjection{ + bInserts: []Insert{ {pos: 3, num: 1}, }, }, @@ -259,7 +259,7 @@ func TestCompareSpansAndInterject(t *testing.T) { {Offset: 1, Length: 4}, {Offset: 3, Length: 3}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 2, num: 1, @@ -277,7 +277,7 @@ func TestCompareSpansAndInterject(t *testing.T) { bucketsOut: []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1}, }, { - description: "both forward and backward interjections, complex case", + description: "both forward and backward inserts, complex case", spansA: []histogram.Span{ {Offset: 0, Length: 2}, {Offset: 2, Length: 1}, @@ -292,7 +292,7 @@ func TestCompareSpansAndInterject(t *testing.T) { {Offset: 1, Length: 1}, {Offset: 4, Length: 1}, }, - interjections: []Interjection{ + fInserts: []Insert{ { pos: 2, num: 1, @@ -306,7 +306,7 @@ func TestCompareSpansAndInterject(t *testing.T) { num: 1, }, }, - backwardInterjections: []Interjection{ + bInserts: []Insert{ { pos: 0, num: 1, @@ -329,22 +329,22 @@ func TestCompareSpansAndInterject(t *testing.T) { for _, s := range scenarios { t.Run(s.description, func(t *testing.T) { - if len(s.backwardInterjections) > 0 { - interjections, bInterjections, _ := bidirectionalCompareSpans(s.spansA, s.spansB) - require.Equal(t, s.interjections, interjections) - require.Equal(t, s.backwardInterjections, bInterjections) + if len(s.bInserts) > 0 { + fInserts, bInserts, _ := expandSpansBothWays(s.spansA, s.spansB) + require.Equal(t, s.fInserts, fInserts) + require.Equal(t, s.bInserts, bInserts) } - interjections, valid := forwardCompareSpans(s.spansA, s.spansB) - if len(s.backwardInterjections) > 0 { + inserts, valid := expandSpansForward(s.spansA, s.spansB) + if len(s.bInserts) > 0 { require.False(t, valid, "compareScan unexpectedly returned true") return } require.True(t, valid, "compareScan unexpectedly returned false") - require.Equal(t, s.interjections, interjections) + require.Equal(t, s.fInserts, inserts) gotBuckets := make([]int64, len(s.bucketsOut)) - interject(s.bucketsIn, gotBuckets, interjections, true) + insert(s.bucketsIn, gotBuckets, inserts, true) require.Equal(t, s.bucketsOut, gotBuckets) floatBucketsIn := make([]float64, len(s.bucketsIn)) @@ -362,7 +362,7 @@ func TestCompareSpansAndInterject(t *testing.T) { floatBucketsOut[i] = float64(last) } gotFloatBuckets := make([]float64, len(floatBucketsOut)) - interject(floatBucketsIn, gotFloatBuckets, interjections, false) + insert(floatBucketsIn, gotFloatBuckets, inserts, false) require.Equal(t, floatBucketsOut, gotFloatBuckets) }) } @@ -564,12 +564,12 @@ func TestSpansFromBidirectionalCompareSpans(t *testing.T) { copy(s1c, c.s1) copy(s2c, c.s2) - _, _, act := bidirectionalCompareSpans(c.s1, c.s2) + _, _, act := expandSpansBothWays(c.s1, c.s2) require.Equal(t, c.exp, act) // Check that s1 and s2 are not modified. require.Equal(t, s1c, c.s1) require.Equal(t, s2c, c.s2) - _, _, act = bidirectionalCompareSpans(c.s2, c.s1) + _, _, act = expandSpansBothWays(c.s2, c.s1) require.Equal(t, c.exp, act) } } diff --git a/tsdb/chunkenc/histogram_test.go b/tsdb/chunkenc/histogram_test.go index 73851c9dfb..4bb146ccdb 100644 --- a/tsdb/chunkenc/histogram_test.go +++ b/tsdb/chunkenc/histogram_test.go @@ -67,7 +67,9 @@ func TestHistogramChunkSameBuckets(t *testing.T) { h.PositiveBuckets = []int64{5, -2, 1, -2} // counts: 5, 3, 4, 2 (total 14) h.NegativeBuckets = []int64{4, -1, 1, -1} // counts: 4, 3, 4, 4 (total 15) app.AppendHistogram(ts, h) - exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()}) + hExp := h.Copy() + hExp.CounterResetHint = histogram.NotCounterReset + exp = append(exp, result{t: ts, h: hExp, fh: hExp.ToFloat()}) require.Equal(t, 2, c.NumSamples()) // Add update with new appender. @@ -82,7 +84,9 @@ func TestHistogramChunkSameBuckets(t *testing.T) { h.PositiveBuckets = []int64{6, 1, -3, 6} // counts: 6, 7, 4, 10 (total 27) h.NegativeBuckets = []int64{5, 1, -2, 3} // counts: 5, 6, 4, 7 (total 22) app.AppendHistogram(ts, h) - exp = append(exp, result{t: ts, h: h, fh: h.ToFloat()}) + hExp = h.Copy() + hExp.CounterResetHint = histogram.NotCounterReset + exp = append(exp, result{t: ts, h: hExp, fh: hExp.ToFloat()}) require.Equal(t, 3, c.NumSamples()) // 1. Expand iterator in simple case. @@ -149,7 +153,7 @@ func TestHistogramChunkSameBuckets(t *testing.T) { require.Equal(t, ValNone, it4.Seek(exp[len(exp)-1].t+1)) } -// Mimics the scenario described for compareSpans(). +// Mimics the scenario described for expandSpansForward. func TestHistogramChunkBucketChanges(t *testing.T) { c := Chunk(NewHistogramChunk()) @@ -220,9 +224,11 @@ func TestHistogramChunkBucketChanges(t *testing.T) { h1.PositiveBuckets = []int64{6, -3, -3, 3, -3, 0, 2, 2, 1, -5, 1} h1.NegativeSpans = h2.NegativeSpans h1.NegativeBuckets = []int64{0, 1} + hExp := h2.Copy() + hExp.CounterResetHint = histogram.NotCounterReset exp := []result{ {t: ts1, h: h1, fh: h1.ToFloat()}, - {t: ts2, h: h2, fh: h2.ToFloat()}, + {t: ts2, h: hExp, fh: hExp.ToFloat()}, } it := c.Iterator(nil) var act []result @@ -463,11 +469,12 @@ func TestAtFloatHistogram(t *testing.T) { NegativeBuckets: []float64{1, 2, 1, 2, 2, 2, 2}, }, { - Schema: 0, - Count: 36, - Sum: 2345.6, - ZeroThreshold: 0.001, - ZeroCount: 5, + CounterResetHint: histogram.NotCounterReset, + Schema: 0, + Count: 36, + Sum: 2345.6, + ZeroThreshold: 0.001, + ZeroCount: 5, PositiveSpans: []histogram.Span{ {Offset: 0, Length: 4}, {Offset: 0, Length: 0}, @@ -482,11 +489,12 @@ func TestAtFloatHistogram(t *testing.T) { NegativeBuckets: []float64{1, 4, 2, 7, 5, 5, 2}, }, { - Schema: 0, - Count: 36, - Sum: 1111.1, - ZeroThreshold: 0.001, - ZeroCount: 5, + CounterResetHint: histogram.NotCounterReset, + Schema: 0, + Count: 36, + Sum: 1111.1, + ZeroThreshold: 0.001, + ZeroCount: 5, PositiveSpans: []histogram.Span{ {Offset: 0, Length: 4}, {Offset: 0, Length: 0}, diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 58238bea51..a3b99f87a0 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -1308,17 +1308,31 @@ func TestHeadCompactionWithHistograms(t *testing.T) { minute := func(m int) int64 { return int64(m) * time.Minute.Milliseconds() } ctx := context.Background() - appendHistogram := func(lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample) { + appendHistogram := func( + lbls labels.Labels, from, to int, h *histogram.Histogram, exp *[]tsdbutil.Sample, + ) { t.Helper() app := head.Appender(ctx) for tsMinute := from; tsMinute <= to; tsMinute++ { var err error if floatTest { _, err = app.AppendHistogram(0, lbls, minute(tsMinute), nil, h.ToFloat()) - *exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()}) + efh := h.ToFloat() + if tsMinute == from { + efh.CounterResetHint = histogram.UnknownCounterReset + } else { + efh.CounterResetHint = histogram.NotCounterReset + } + *exp = append(*exp, sample{t: minute(tsMinute), fh: efh}) } else { _, err = app.AppendHistogram(0, lbls, minute(tsMinute), h, nil) - *exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()}) + eh := h.Copy() + if tsMinute == from { + eh.CounterResetHint = histogram.UnknownCounterReset + } else { + eh.CounterResetHint = histogram.NotCounterReset + } + *exp = append(*exp, sample{t: minute(tsMinute), h: eh}) } require.NoError(t, err) } diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 60188a7bd6..9e5623bea7 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -5836,16 +5836,23 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { }) ctx := context.Background() - appendHistogram := func(lbls labels.Labels, tsMinute int, h *histogram.Histogram, exp *[]tsdbutil.Sample) { + appendHistogram := func( + lbls labels.Labels, tsMinute int, h *histogram.Histogram, + exp *[]tsdbutil.Sample, expCRH histogram.CounterResetHint, + ) { t.Helper() var err error app := db.Appender(ctx) if floatHistogram { _, err = app.AppendHistogram(0, lbls, minute(tsMinute), nil, h.ToFloat()) - *exp = append(*exp, sample{t: minute(tsMinute), fh: h.ToFloat()}) + efh := h.ToFloat() + efh.CounterResetHint = expCRH + *exp = append(*exp, sample{t: minute(tsMinute), fh: efh}) } else { _, err = app.AppendHistogram(0, lbls, minute(tsMinute), h.Copy(), nil) - *exp = append(*exp, sample{t: minute(tsMinute), h: h.Copy()}) + eh := h.Copy() + eh.CounterResetHint = expCRH + *exp = append(*exp, sample{t: minute(tsMinute), h: eh}) } require.NoError(t, err) require.NoError(t, app.Commit()) @@ -5897,23 +5904,23 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { t.Run("series with only histograms", func(t *testing.T) { h := baseH.Copy() // This is shared across all sub tests. - appendHistogram(series1, 100, h, &exp1) + appendHistogram(series1, 100, h, &exp1, histogram.UnknownCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) h.PositiveBuckets[0]++ h.NegativeBuckets[0] += 2 h.Count += 10 - appendHistogram(series1, 101, h, &exp1) + appendHistogram(series1, 101, h, &exp1, histogram.NotCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) t.Run("changing schema", func(t *testing.T) { h.Schema = 2 - appendHistogram(series1, 102, h, &exp1) + appendHistogram(series1, 102, h, &exp1, histogram.UnknownCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) // Schema back to old. h.Schema = 1 - appendHistogram(series1, 103, h, &exp1) + appendHistogram(series1, 103, h, &exp1, histogram.UnknownCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) }) @@ -5942,7 +5949,7 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { h.PositiveSpans[1].Length++ h.PositiveBuckets = append(h.PositiveBuckets, 1) h.Count += 3 - appendHistogram(series1, 104, h, &exp1) + appendHistogram(series1, 104, h, &exp1, histogram.NotCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) // Because of the previous two histograms being on the active chunk, @@ -5980,14 +5987,14 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { h.Count += 3 // {2, 1, -1, 0, 1} -> {2, 1, 0, -1, 0, 1} h.PositiveBuckets = append(h.PositiveBuckets[:2], append([]int64{0}, h.PositiveBuckets[2:]...)...) - appendHistogram(series1, 105, h, &exp1) + appendHistogram(series1, 105, h, &exp1, histogram.NotCounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) // We add 4 more histograms to clear out the buffer and see the re-encoded histograms. - appendHistogram(series1, 106, h, &exp1) - appendHistogram(series1, 107, h, &exp1) - appendHistogram(series1, 108, h, &exp1) - appendHistogram(series1, 109, h, &exp1) + appendHistogram(series1, 106, h, &exp1, histogram.NotCounterReset) + appendHistogram(series1, 107, h, &exp1, histogram.NotCounterReset) + appendHistogram(series1, 108, h, &exp1, histogram.NotCounterReset) + appendHistogram(series1, 109, h, &exp1, histogram.NotCounterReset) // Update the expected histograms to reflect the re-encoding. if floatHistogram { @@ -6020,7 +6027,7 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { t.Run("buckets disappearing", func(t *testing.T) { h.PositiveSpans[1].Length-- h.PositiveBuckets = h.PositiveBuckets[:len(h.PositiveBuckets)-1] - appendHistogram(series1, 110, h, &exp1) + appendHistogram(series1, 110, h, &exp1, histogram.CounterReset) testQuery("foo", "bar1", map[string][]tsdbutil.Sample{series1.String(): exp1}) }) }) @@ -6032,9 +6039,9 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) h := baseH.Copy() - appendHistogram(series2, 103, h, &exp2) - appendHistogram(series2, 104, h, &exp2) - appendHistogram(series2, 105, h, &exp2) + appendHistogram(series2, 103, h, &exp2, histogram.UnknownCounterReset) + appendHistogram(series2, 104, h, &exp2, histogram.NotCounterReset) + appendHistogram(series2, 105, h, &exp2, histogram.NotCounterReset) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) // Switching between float and histograms again. @@ -6042,16 +6049,16 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { appendFloat(series2, 107, 107, &exp2) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) - appendHistogram(series2, 108, h, &exp2) - appendHistogram(series2, 109, h, &exp2) + appendHistogram(series2, 108, h, &exp2, histogram.UnknownCounterReset) + appendHistogram(series2, 109, h, &exp2, histogram.NotCounterReset) testQuery("foo", "bar2", map[string][]tsdbutil.Sample{series2.String(): exp2}) }) t.Run("series starting with histogram and then getting float", func(t *testing.T) { h := baseH.Copy() - appendHistogram(series3, 101, h, &exp3) - appendHistogram(series3, 102, h, &exp3) - appendHistogram(series3, 103, h, &exp3) + appendHistogram(series3, 101, h, &exp3, histogram.UnknownCounterReset) + appendHistogram(series3, 102, h, &exp3, histogram.NotCounterReset) + appendHistogram(series3, 103, h, &exp3, histogram.NotCounterReset) testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) appendFloat(series3, 104, 100, &exp3) @@ -6060,8 +6067,8 @@ func testHistogramAppendAndQueryHelper(t *testing.T, floatHistogram bool) { testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) // Switching between histogram and float again. - appendHistogram(series3, 107, h, &exp3) - appendHistogram(series3, 108, h, &exp3) + appendHistogram(series3, 107, h, &exp3, histogram.UnknownCounterReset) + appendHistogram(series3, 108, h, &exp3, histogram.NotCounterReset) testQuery("foo", "bar3", map[string][]tsdbutil.Sample{series3.String(): exp3}) appendFloat(series3, 109, 106, &exp3) @@ -6091,7 +6098,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { t.Helper() opts := DefaultOptions() - opts.AllowOverlappingCompaction = true // TODO(jesus.vazquez) This replaced AllowOverlappingBlocks, make sure that works + opts.AllowOverlappingCompaction = true // TODO(jesusvazquez): This replaced AllowOverlappingBlocks, make sure that works. db := openTestDB(t, opts, nil) t.Cleanup(func() { require.NoError(t, db.Close()) @@ -6137,7 +6144,7 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { q, err := db.Querier(ctx, math.MinInt64, math.MaxInt64) require.NoError(t, err) res := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) - require.Equal(t, exp, res) + compareSeries(t, exp, res) // Compact all the blocks together and query again. blocks := db.Blocks() @@ -6154,10 +6161,23 @@ func TestQueryHistogramFromBlocksWithCompaction(t *testing.T) { q, err = db.Querier(ctx, math.MinInt64, math.MaxInt64) require.NoError(t, err) res = query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) - require.Equal(t, exp, res) + + // After compaction, we do not require "unknown" counter resets + // due to origin from different overlapping chunks anymore. + for _, ss := range exp { + for i, s := range ss[1:] { + if s.H() != nil && ss[i].H() != nil && s.H().CounterResetHint == histogram.UnknownCounterReset { + s.H().CounterResetHint = histogram.NotCounterReset + } + if s.FH() != nil && ss[i].FH() != nil && s.FH().CounterResetHint == histogram.UnknownCounterReset { + s.FH().CounterResetHint = histogram.NotCounterReset + } + } + } + compareSeries(t, exp, res) } - for _, floatHistogram := range []bool{true} { + for _, floatHistogram := range []bool{false, true} { t.Run(fmt.Sprintf("floatHistogram=%t", floatHistogram), func(t *testing.T) { t.Run("serial blocks with only histograms", func(t *testing.T) { testBlockQuerying(t, @@ -6272,3 +6292,45 @@ func TestNativeHistogramFlag(t *testing.T) { l.String(): {sample{t: 200, h: h}, sample{t: 205, fh: h.ToFloat()}}, }, act) } + +// compareSeries essentially replaces `require.Equal(t, expected, actual) in +// situations where the actual series might contain more counter reset hints +// "unknown" than the expected series. This can easily happen for long series +// that trigger new chunks. This function therefore tolerates counter reset +// hints "CounterReset" and "NotCounterReset" in an expected series where the +// actual series contains a counter reset hint "UnknownCounterReset". +// "GaugeType" hints are still strictly checked, and any "UnknownCounterReset" +// in an expected series has to be matched precisely by the actual series. +func compareSeries(t require.TestingT, expected, actual map[string][]tsdbutil.Sample) { + if len(expected) != len(actual) { + // The reason for the difference is not the counter reset hints + // (alone), so let's use the pretty diffing by the require + // package. + require.Equal(t, expected, actual, "number of series differs") + } + for key, eSamples := range expected { + aSamples, ok := actual[key] + if !ok { + require.Equal(t, expected, actual, "expected series %q not found", key) + } + if len(eSamples) != len(aSamples) { + require.Equal(t, eSamples, aSamples, "number of samples for series %q differs", key) + } + for i, eS := range eSamples { + aS := aSamples[i] + aH, eH := aS.H(), eS.H() + aFH, eFH := aS.FH(), eS.FH() + switch { + case aH != nil && eH != nil && aH.CounterResetHint == histogram.UnknownCounterReset && eH.CounterResetHint != histogram.GaugeType: + eH = eH.Copy() + eH.CounterResetHint = histogram.UnknownCounterReset + eS = sample{t: eS.T(), h: eH} + case aFH != nil && eFH != nil && aFH.CounterResetHint == histogram.UnknownCounterReset && eFH.CounterResetHint != histogram.GaugeType: + eFH = eFH.Copy() + eFH.CounterResetHint = histogram.UnknownCounterReset + eS = sample{t: eS.T(), fh: eFH} + } + require.Equal(t, eS, aS, "sample %d in series %q differs", i, key) + } + } +} diff --git a/tsdb/head.go b/tsdb/head.go index 6432cd891b..1ef88be366 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -2040,7 +2040,7 @@ func (h *Head) updateWALReplayStatusRead(current int) { func GenerateTestHistograms(n int) (r []*histogram.Histogram) { for i := 0; i < n; i++ { - r = append(r, &histogram.Histogram{ + h := histogram.Histogram{ Count: 10 + uint64(i*8), ZeroCount: 2 + uint64(i), ZeroThreshold: 0.001, @@ -2056,9 +2056,12 @@ func GenerateTestHistograms(n int) (r []*histogram.Histogram) { {Offset: 1, Length: 2}, }, NegativeBuckets: []int64{int64(i + 1), 1, -1, 0}, - }) + } + if i > 0 { + h.CounterResetHint = histogram.NotCounterReset + } + r = append(r, &h) } - return r } @@ -2084,13 +2087,12 @@ func GenerateTestGaugeHistograms(n int) (r []*histogram.Histogram) { NegativeBuckets: []int64{int64(i + 1), 1, -1, 0}, }) } - return r } func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) { for i := 0; i < n; i++ { - r = append(r, &histogram.FloatHistogram{ + h := histogram.FloatHistogram{ Count: 10 + float64(i*8), ZeroCount: 2 + float64(i), ZeroThreshold: 0.001, @@ -2106,7 +2108,11 @@ func GenerateTestFloatHistograms(n int) (r []*histogram.FloatHistogram) { {Offset: 1, Length: 2}, }, NegativeBuckets: []float64{float64(i + 1), float64(i + 2), float64(i + 1), float64(i + 1)}, - }) + } + if i > 0 { + h.CounterResetHint = histogram.NotCounterReset + } + r = append(r, &h) } return r diff --git a/tsdb/head_append.go b/tsdb/head_append.go index d4534c6695..33cfc0eb3e 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -1141,7 +1141,6 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // appendHistogram adds the histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -// TODO(codesome): Support gauge histograms here. func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards. We check for Appendable from appender before @@ -1150,44 +1149,54 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // meta properly. app, _ := s.app.(*chunkenc.HistogramAppender) var ( - positiveInterjections, negativeInterjections []chunkenc.Interjection - pBackwardInter, nBackwardInter []chunkenc.Interjection - pMergedSpans, nMergedSpans []histogram.Span - okToAppend, counterReset bool + pForwardInserts, nForwardInserts []chunkenc.Insert + pBackwardInserts, nBackwardInserts []chunkenc.Insert + pMergedSpans, nMergedSpans []histogram.Span + okToAppend, counterReset, gauge bool ) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange) if !sampleInOrder { return sampleInOrder, chunkCreated } - gauge := h.CounterResetHint == histogram.GaugeType - if app != nil { - if gauge { - positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter, pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(h) - } else { - positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(h) + switch h.CounterResetHint { + case histogram.GaugeType: + gauge = true + if app != nil { + pForwardInserts, nForwardInserts, + pBackwardInserts, nBackwardInserts, + pMergedSpans, nMergedSpans, + okToAppend = app.AppendableGauge(h) + } + case histogram.CounterReset: + // The caller tells us this is a counter reset, even if it + // doesn't look like one. + counterReset = true + default: + if app != nil { + pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(h) } } if !chunkCreated { - if len(pBackwardInter)+len(nBackwardInter) > 0 { + if len(pBackwardInserts)+len(nBackwardInserts) > 0 { h.PositiveSpans = pMergedSpans h.NegativeSpans = nMergedSpans - app.RecodeHistogram(h, pBackwardInter, nBackwardInter) + app.RecodeHistogram(h, pBackwardInserts, nBackwardInserts) } // We have 3 cases here // - !okToAppend -> We need to cut a new chunk. - // - okToAppend but we have interjections → Existing chunk needs + // - okToAppend but we have inserts → Existing chunk needs // recoding before we can append our histogram. - // - okToAppend and no interjections → Chunk is ready to support our histogram. + // - okToAppend and no inserts → Chunk is ready to support our histogram. if !okToAppend || counterReset { c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange) chunkCreated = true - } else if len(positiveInterjections) > 0 || len(negativeInterjections) > 0 { + } else if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 { // New buckets have appeared. We need to recode all // prior histogram samples within the chunk before we // can process this one. chunk, app := app.Recode( - positiveInterjections, negativeInterjections, + pForwardInserts, nForwardInserts, h.PositiveSpans, h.NegativeSpans, ) c.chunk = chunk @@ -1233,45 +1242,54 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, // meta properly. app, _ := s.app.(*chunkenc.FloatHistogramAppender) var ( - positiveInterjections, negativeInterjections []chunkenc.Interjection - pBackwardInter, nBackwardInter []chunkenc.Interjection - pMergedSpans, nMergedSpans []histogram.Span - okToAppend, counterReset bool + pForwardInserts, nForwardInserts []chunkenc.Insert + pBackwardInserts, nBackwardInserts []chunkenc.Insert + pMergedSpans, nMergedSpans []histogram.Span + okToAppend, counterReset, gauge bool ) c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) if !sampleInOrder { return sampleInOrder, chunkCreated } - gauge := fh.CounterResetHint == histogram.GaugeType - if app != nil { - if gauge { - positiveInterjections, negativeInterjections, pBackwardInter, nBackwardInter, - pMergedSpans, nMergedSpans, okToAppend = app.AppendableGauge(fh) - } else { - positiveInterjections, negativeInterjections, okToAppend, counterReset = app.Appendable(fh) + switch fh.CounterResetHint { + case histogram.GaugeType: + gauge = true + if app != nil { + pForwardInserts, nForwardInserts, + pBackwardInserts, nBackwardInserts, + pMergedSpans, nMergedSpans, + okToAppend = app.AppendableGauge(fh) + } + case histogram.CounterReset: + // The caller tells us this is a counter reset, even if it + // doesn't look like one. + counterReset = true + default: + if app != nil { + pForwardInserts, nForwardInserts, okToAppend, counterReset = app.Appendable(fh) } } if !chunkCreated { - if len(pBackwardInter)+len(nBackwardInter) > 0 { + if len(pBackwardInserts)+len(nBackwardInserts) > 0 { fh.PositiveSpans = pMergedSpans fh.NegativeSpans = nMergedSpans - app.RecodeHistogramm(fh, pBackwardInter, nBackwardInter) + app.RecodeHistogramm(fh, pBackwardInserts, nBackwardInserts) } // We have 3 cases here // - !okToAppend -> We need to cut a new chunk. - // - okToAppend but we have interjections → Existing chunk needs + // - okToAppend but we have inserts → Existing chunk needs // recoding before we can append our histogram. - // - okToAppend and no interjections → Chunk is ready to support our histogram. + // - okToAppend and no inserts → Chunk is ready to support our histogram. if !okToAppend || counterReset { c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) chunkCreated = true - } else if len(positiveInterjections) > 0 || len(negativeInterjections) > 0 { + } else if len(pForwardInserts) > 0 || len(nForwardInserts) > 0 { // New buckets have appeared. We need to recode all // prior histogram samples within the chunk before we // can process this one. chunk, app := app.Recode( - positiveInterjections, negativeInterjections, + pForwardInserts, nForwardInserts, fh.PositiveSpans, fh.NegativeSpans, ) c.chunk = chunk diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 2a1123db81..1130bbe190 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -197,7 +197,6 @@ func BenchmarkLoadWAL(b *testing.B) { continue } lastExemplarsPerSeries = exemplarsPerSeries - // fmt.Println("exemplars per series: ", exemplarsPerSeries) b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT), func(b *testing.B) { dir := b.TempDir() @@ -2834,17 +2833,13 @@ func TestAppendHistogram(t *testing.T) { ingestTs := int64(0) app := head.Appender(context.Background()) - type timedHistogram struct { - t int64 - h *histogram.Histogram - } - expHistograms := make([]timedHistogram, 0, 2*numHistograms) + expHistograms := make([]tsdbutil.Sample, 0, 2*numHistograms) // Counter integer histograms. for _, h := range GenerateTestHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, h, nil) require.NoError(t, err) - expHistograms = append(expHistograms, timedHistogram{ingestTs, h}) + expHistograms = append(expHistograms, sample{t: ingestTs, h: h}) ingestTs++ if ingestTs%50 == 0 { require.NoError(t, app.Commit()) @@ -2856,7 +2851,7 @@ func TestAppendHistogram(t *testing.T) { for _, h := range GenerateTestGaugeHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, h, nil) require.NoError(t, err) - expHistograms = append(expHistograms, timedHistogram{ingestTs, h}) + expHistograms = append(expHistograms, sample{t: ingestTs, h: h}) ingestTs++ if ingestTs%50 == 0 { require.NoError(t, app.Commit()) @@ -2864,17 +2859,13 @@ func TestAppendHistogram(t *testing.T) { } } - type timedFloatHistogram struct { - t int64 - h *histogram.FloatHistogram - } - expFloatHistograms := make([]timedFloatHistogram, 0, 2*numHistograms) + expFloatHistograms := make([]tsdbutil.Sample, 0, 2*numHistograms) // Counter float histograms. for _, fh := range GenerateTestFloatHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, nil, fh) require.NoError(t, err) - expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh}) + expFloatHistograms = append(expFloatHistograms, sample{t: ingestTs, fh: fh}) ingestTs++ if ingestTs%50 == 0 { require.NoError(t, app.Commit()) @@ -2886,7 +2877,7 @@ func TestAppendHistogram(t *testing.T) { for _, fh := range GenerateTestGaugeFloatHistograms(numHistograms) { _, err := app.AppendHistogram(0, l, ingestTs, nil, fh) require.NoError(t, err) - expFloatHistograms = append(expFloatHistograms, timedFloatHistogram{ingestTs, fh}) + expFloatHistograms = append(expFloatHistograms, sample{t: ingestTs, fh: fh}) ingestTs++ if ingestTs%50 == 0 { require.NoError(t, app.Commit()) @@ -2909,20 +2900,28 @@ func TestAppendHistogram(t *testing.T) { require.False(t, ss.Next()) it := s.Iterator(nil) - actHistograms := make([]timedHistogram, 0, len(expHistograms)) - actFloatHistograms := make([]timedFloatHistogram, 0, len(expFloatHistograms)) + actHistograms := make([]tsdbutil.Sample, 0, len(expHistograms)) + actFloatHistograms := make([]tsdbutil.Sample, 0, len(expFloatHistograms)) for typ := it.Next(); typ != chunkenc.ValNone; typ = it.Next() { if typ == chunkenc.ValHistogram { ts, h := it.AtHistogram() - actHistograms = append(actHistograms, timedHistogram{ts, h}) + actHistograms = append(actHistograms, sample{t: ts, h: h}) } else if typ == chunkenc.ValFloatHistogram { ts, fh := it.AtFloatHistogram() - actFloatHistograms = append(actFloatHistograms, timedFloatHistogram{ts, fh}) + actFloatHistograms = append(actFloatHistograms, sample{t: ts, fh: fh}) } } - require.Equal(t, expHistograms, actHistograms) - require.Equal(t, expFloatHistograms, actFloatHistograms) + compareSeries( + t, + map[string][]tsdbutil.Sample{"dummy": expHistograms}, + map[string][]tsdbutil.Sample{"dummy": actHistograms}, + ) + compareSeries( + t, + map[string][]tsdbutil.Sample{"dummy": expFloatHistograms}, + map[string][]tsdbutil.Sample{"dummy": actFloatHistograms}, + ) }) } } @@ -3019,7 +3018,12 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { h.NegativeBuckets = h.PositiveBuckets _, err := app.AppendHistogram(0, s2, int64(ts), h, nil) require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: int64(ts), h: h.Copy()}) + eh := h.Copy() + if !gauge && ts > 30 && (ts-10)%20 == 1 { + // Need "unknown" hint after float sample. + eh.CounterResetHint = histogram.UnknownCounterReset + } + exp[k2] = append(exp[k2], sample{t: int64(ts), h: eh}) if ts%20 == 0 { require.NoError(t, app.Commit()) app = head.Appender(context.Background()) @@ -3051,7 +3055,12 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { h.NegativeBuckets = h.PositiveBuckets _, err := app.AppendHistogram(0, s2, int64(ts), nil, h) require.NoError(t, err) - exp[k2] = append(exp[k2], sample{t: int64(ts), fh: h.Copy()}) + eh := h.Copy() + if !gauge && ts > 30 && (ts-10)%20 == 1 { + // Need "unknown" hint after float sample. + eh.CounterResetHint = histogram.UnknownCounterReset + } + exp[k2] = append(exp[k2], sample{t: int64(ts), fh: eh}) if ts%20 == 0 { require.NoError(t, app.Commit()) app = head.Appender(context.Background()) @@ -3089,7 +3098,7 @@ func TestHistogramInWALAndMmapChunk(t *testing.T) { q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) require.NoError(t, err) act := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "a", "b.*")) - require.Equal(t, exp, act) + compareSeries(t, exp, act) } testQuery() @@ -3506,6 +3515,11 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { ah.fh.Sum = 0 eh.fh = eh.fh.Copy() eh.fh.Sum = 0 + } else if i > 0 { + prev := expHistograms[i-1] + if prev.fh == nil || value.IsStaleNaN(prev.fh.Sum) { + eh.fh.CounterResetHint = histogram.UnknownCounterReset + } } require.Equal(t, eh, ah) } else { @@ -3516,6 +3530,11 @@ func testHistogramStaleSampleHelper(t *testing.T, floatHistogram bool) { ah.h.Sum = 0 eh.h = eh.h.Copy() eh.h.Sum = 0 + } else if i > 0 { + prev := expHistograms[i-1] + if prev.h == nil || value.IsStaleNaN(prev.h.Sum) { + eh.h.CounterResetHint = histogram.UnknownCounterReset + } } require.Equal(t, eh, ah) } @@ -3730,8 +3749,10 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { // If this is empty, samples above will be taken instead of this. addToExp []tsdbutil.Sample }{ + // Histograms that end up in the expected samples are copied here so that we + // can independently set the CounterResetHint later. { - samples: []tsdbutil.Sample{sample{t: 100, h: hists[1]}}, + samples: []tsdbutil.Sample{sample{t: 100, h: hists[0].Copy()}}, expChunks: 1, }, { @@ -3739,23 +3760,23 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { expChunks: 2, }, { - samples: []tsdbutil.Sample{sample{t: 210, fh: floatHists[1]}}, + samples: []tsdbutil.Sample{sample{t: 210, fh: floatHists[0].Copy()}}, expChunks: 3, }, { - samples: []tsdbutil.Sample{sample{t: 220, h: hists[1]}}, + samples: []tsdbutil.Sample{sample{t: 220, h: hists[1].Copy()}}, expChunks: 4, }, { - samples: []tsdbutil.Sample{sample{t: 230, fh: floatHists[3]}}, + samples: []tsdbutil.Sample{sample{t: 230, fh: floatHists[3].Copy()}}, expChunks: 5, }, { - samples: []tsdbutil.Sample{sample{t: 100, h: hists[2]}}, + samples: []tsdbutil.Sample{sample{t: 100, h: hists[2].Copy()}}, err: storage.ErrOutOfOrderSample, }, { - samples: []tsdbutil.Sample{sample{t: 300, h: hists[3]}}, + samples: []tsdbutil.Sample{sample{t: 300, h: hists[3].Copy()}}, expChunks: 6, }, { @@ -3763,7 +3784,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { err: storage.ErrOutOfOrderSample, }, { - samples: []tsdbutil.Sample{sample{t: 100, fh: floatHists[4]}}, + samples: []tsdbutil.Sample{sample{t: 100, fh: floatHists[4].Copy()}}, err: storage.ErrOutOfOrderSample, }, { @@ -3789,7 +3810,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { }, addToExp: []tsdbutil.Sample{ sample{t: 800, v: 8}, - sample{t: 900, h: hists[9]}, + sample{t: 900, h: hists[9].Copy()}, }, expChunks: 8, // float64 added to old chunk, only 1 new for histograms. }, @@ -3800,7 +3821,7 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { sample{t: 1100, h: hists[9]}, }, addToExp: []tsdbutil.Sample{ - sample{t: 1100, h: hists[9]}, + sample{t: 1100, h: hists[9].Copy()}, }, expChunks: 8, }, @@ -3830,6 +3851,14 @@ func TestAppendingDifferentEncodingToSameSeries(t *testing.T) { require.NoError(t, app.Rollback()) } } + for i, s := range expResult[1:] { + switch { + case s.H() != nil && expResult[i].H() == nil: + s.(sample).h.CounterResetHint = histogram.UnknownCounterReset + case s.FH() != nil && expResult[i].FH() == nil: + s.(sample).fh.CounterResetHint = histogram.UnknownCounterReset + } + } // Query back and expect same order of samples. q, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)