diff --git a/tsdb/chunkenc/float_histogram_st.go b/tsdb/chunkenc/float_histogram_st.go index 8a0babc627..e83ca45618 100644 --- a/tsdb/chunkenc/float_histogram_st.go +++ b/tsdb/chunkenc/float_histogram_st.go @@ -193,6 +193,14 @@ func (a *FloatHistogramSTAppender) NumSamples() int { return int(binary.BigEndian.Uint16(a.b.bytes()) & histogramSTSampleCountMask) } +// setNumSamples writes the sample count into the low 14 bits of bytes 0-1, +// preserving the counter-reset header in the top 2 bits of byte 0. +func (a *FloatHistogramSTAppender) setNumSamples(num int) { + buf := a.b.bytes() + crBits := buf[0] & CounterResetHeaderMask + binary.BigEndian.PutUint16(buf, uint16(crBits)<<8|uint16(num)&histogramSTSampleCountMask) +} + func (a *FloatHistogramSTAppender) appendable(h *histogram.FloatHistogram) ( positiveInserts, negativeInserts []Insert, backwardPositiveInserts, backwardNegativeInserts []Insert, @@ -288,115 +296,17 @@ func (a *FloatHistogramSTAppender) appendableGauge(h *histogram.FloatHistogram) return positiveInserts, negativeInserts, backwardPositiveInserts, backwardNegativeInserts, positiveSpans, negativeSpans, okToAppend } -func (a *FloatHistogramSTAppender) appendFloatHistogram(t int64, h *histogram.FloatHistogram) { - var tDelta int64 - num := binary.BigEndian.Uint16(a.b.bytes()) & 0x3FFF - - if value.IsStaleNaN(h.Sum) { - // Emptying out other fields to write no buckets, and an empty - // layout in case of first histogram in the chunk. - h = &histogram.FloatHistogram{Sum: h.Sum} - } - - if num == 0 { - // The first append gets the privilege to dictate the layout - // but it's also responsible for encoding it into the chunk! - writeHistogramChunkLayout(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans, h.CustomValues) - a.schema = h.Schema - a.zThreshold = h.ZeroThreshold - - if len(h.PositiveSpans) > 0 { - a.pSpans = make([]histogram.Span, len(h.PositiveSpans)) - copy(a.pSpans, h.PositiveSpans) - } else { - a.pSpans = nil - } - if len(h.NegativeSpans) > 0 { - a.nSpans = make([]histogram.Span, len(h.NegativeSpans)) - copy(a.nSpans, h.NegativeSpans) - } else { - a.nSpans = nil - } - if len(h.CustomValues) > 0 { - a.customValues = make([]float64, len(h.CustomValues)) - copy(a.customValues, h.CustomValues) - } else { - a.customValues = nil - } - - numPBuckets, numNBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans) - if numPBuckets > 0 { - a.pBuckets = make([]xorValue, numPBuckets) - for i := range numPBuckets { - a.pBuckets[i] = xorValue{ - value: h.PositiveBuckets[i], - leading: 0xff, - } - } - } else { - a.pBuckets = nil - } - if numNBuckets > 0 { - a.nBuckets = make([]xorValue, numNBuckets) - for i := range numNBuckets { - a.nBuckets[i] = xorValue{ - value: h.NegativeBuckets[i], - leading: 0xff, - } - } - } else { - a.nBuckets = nil - } - - // Now store the actual data. - putVarbitInt(a.b, t) - a.b.writeBits(math.Float64bits(h.Count), 64) - a.b.writeBits(math.Float64bits(h.ZeroCount), 64) - a.b.writeBits(math.Float64bits(h.Sum), 64) - a.cnt.value = h.Count - a.zCnt.value = h.ZeroCount - a.sum.value = h.Sum - for _, b := range h.PositiveBuckets { - a.b.writeBits(math.Float64bits(b), 64) - } - for _, b := range h.NegativeBuckets { - a.b.writeBits(math.Float64bits(b), 64) - } - } else { - // The case for the 2nd sample with single deltas is implicitly handled correctly with the double delta code, - // so we don't need a separate single delta logic for the 2nd sample. - tDelta = t - a.t - tDod := tDelta - a.tDelta - putVarbitInt(a.b, tDod) - - a.writeXorValue(&a.cnt, h.Count) - a.writeXorValue(&a.zCnt, h.ZeroCount) - a.writeXorValue(&a.sum, h.Sum) - - for i, b := range h.PositiveBuckets { - a.writeXorValue(&a.pBuckets[i], b) - } - for i, b := range h.NegativeBuckets { - a.writeXorValue(&a.nBuckets[i], b) - } - } - - // Write the incremented count back, preserving the counter-reset bits in - // the top 2 bits of byte 0. - buf := a.b.bytes() - crBits := buf[0] & CounterResetHeaderMask - binary.BigEndian.PutUint16(buf, (uint16(crBits)<<8)|(num+1)) - - a.t = t - a.tDelta = tDelta -} - // appendFloatHistogramST encodes a float histogram sample with start timestamp. -func (a *FloatHistogramSTAppender) appendFloatHistogramST(st, t int64, fh *histogram.FloatHistogram) { +// It delegates the sample encoding to the embedded base appendFloatHistogram +// and writes the ST encoding afterward. The base no longer touches the chunk +// header sample count: num is passed in and the new count (num+1) is returned, +// leaving the caller to persist it via setNumSamples (the ST override +// preserves the counter-reset bits in byte 0). +func (a *FloatHistogramSTAppender) appendFloatHistogramST(num int, st, t int64, fh *histogram.FloatHistogram) int { prevT := a.t - a.appendFloatHistogram(t, fh) - num := binary.BigEndian.Uint16(a.b.bytes()) & 0x3FFF - a.encode(a.b, num, a.t, prevT, st) + newNum := a.appendFloatHistogram(num, t, fh) + a.encode(a.b, uint16(newNum), a.t, prevT, st) + return newNum } // Append implements Appender. This implementation panics because normal float @@ -423,7 +333,7 @@ func (a *FloatHistogramSTAppender) AppendFloatHistogram(prev Appender, st, t int } if numSamples == 0 { - a.appendFloatHistogramST(st, t, fh) + a.setNumSamples(a.appendFloatHistogramST(numSamples, st, t, fh)) if fh.CounterResetHint == histogram.GaugeType { a.setCounterResetHeader(GaugeType) return nil, false, a, nil @@ -464,7 +374,7 @@ func (a *FloatHistogramSTAppender) AppendFloatHistogram(prev Appender, st, t int if counterReset { happ.setCounterResetHeader(CounterReset) } - happ.appendFloatHistogramST(st, t, fh) + happ.setNumSamples(happ.appendFloatHistogramST(0, st, t, fh)) return newChunk, false, app, nil } if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 { @@ -483,14 +393,14 @@ func (a *FloatHistogramSTAppender) AppendFloatHistogram(prev Appender, st, t int if appendOnly { return nil, false, a, fmt.Errorf("float histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts)) } - chk, app := a.recodeST( + chk, happ := a.recodeST( pForwardInserts, nForwardInserts, fh.PositiveSpans, fh.NegativeSpans, ) - app.(*FloatHistogramSTAppender).appendFloatHistogramST(st, t, fh) - return chk, true, app, nil + happ.setNumSamples(happ.appendFloatHistogramST(happ.NumSamples(), st, t, fh)) + return chk, true, happ, nil } - a.appendFloatHistogramST(st, t, fh) + a.setNumSamples(a.appendFloatHistogramST(numSamples, st, t, fh)) return nil, false, a, nil } @@ -507,7 +417,7 @@ func (a *FloatHistogramSTAppender) AppendFloatHistogram(prev Appender, st, t int } happ := app.(*FloatHistogramSTAppender) happ.setCounterResetHeader(GaugeType) - happ.appendFloatHistogramST(st, t, fh) + happ.setNumSamples(happ.appendFloatHistogramST(0, st, t, fh)) return newChunk, false, app, nil } @@ -524,15 +434,15 @@ func (a *FloatHistogramSTAppender) AppendFloatHistogram(prev Appender, st, t int if appendOnly { return nil, false, a, fmt.Errorf("float gauge histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts)) } - chk, app := a.recodeST( + chk, happ := a.recodeST( pForwardInserts, nForwardInserts, fh.PositiveSpans, fh.NegativeSpans, ) - app.(*FloatHistogramSTAppender).appendFloatHistogramST(st, t, fh) - return chk, true, app, nil + happ.setNumSamples(happ.appendFloatHistogramST(happ.NumSamples(), st, t, fh)) + return chk, true, happ, nil } - a.appendFloatHistogramST(st, t, fh) + a.setNumSamples(a.appendFloatHistogramST(numSamples, st, t, fh)) return nil, false, a, nil } @@ -540,7 +450,7 @@ func (a *FloatHistogramSTAppender) AppendFloatHistogram(prev Appender, st, t int func (a *FloatHistogramSTAppender) recodeST( positiveInserts, negativeInserts []Insert, positiveSpans, negativeSpans []histogram.Span, -) (Chunk, Appender) { +) (Chunk, *FloatHistogramSTAppender) { byts := a.b.bytes() it := newFloatHistogramSTIterator(byts) hc := NewFloatHistogramSTChunk() @@ -551,6 +461,7 @@ func (a *FloatHistogramSTAppender) recodeST( happ := app.(*FloatHistogramSTAppender) numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans) + num := happ.NumSamples() for it.Next() == ValFloatHistogram { tOld, fhOld := it.AtFloatHistogram(nil) stOld := it.AtST() @@ -570,11 +481,12 @@ func (a *FloatHistogramSTAppender) recodeST( if len(negativeInserts) > 0 { fhOld.NegativeBuckets = insert(fhOld.NegativeBuckets, negativeBuckets, negativeInserts, false) } - happ.appendFloatHistogramST(stOld, tOld, fhOld) + num = happ.appendFloatHistogramST(num, stOld, tOld, fhOld) } + happ.setNumSamples(num) happ.setCounterResetHeader(CounterResetHeader(byts[0] & CounterResetHeaderMask)) - return hc, app + return hc, happ } // floatHistogramSTIterator is an iterator for FloatHistogramSTChunk that decodes ST after each sample. diff --git a/tsdb/chunkenc/histogram_st.go b/tsdb/chunkenc/histogram_st.go index a5c37cb780..b2e1d9154d 100644 --- a/tsdb/chunkenc/histogram_st.go +++ b/tsdb/chunkenc/histogram_st.go @@ -181,6 +181,14 @@ func (a *HistogramSTAppender) NumSamples() int { return int(binary.BigEndian.Uint16(a.b.bytes()) & histogramSTSampleCountMask) } +// setNumSamples writes the sample count into the low 14 bits of bytes 0-1, +// preserving the counter-reset header in the top 2 bits of byte 0. +func (a *HistogramSTAppender) setNumSamples(num int) { + buf := a.b.bytes() + crBits := buf[0] & CounterResetHeaderMask + binary.BigEndian.PutUint16(buf, uint16(crBits)<<8|uint16(num)&histogramSTSampleCountMask) +} + func (a *HistogramSTAppender) appendable(h *histogram.Histogram) ( positiveInserts, negativeInserts []Insert, backwardPositiveInserts, backwardNegativeInserts []Insert, @@ -279,131 +287,17 @@ func (a *HistogramSTAppender) appendableGauge(h *histogram.Histogram) ( return positiveInserts, negativeInserts, backwardPositiveInserts, backwardNegativeInserts, positiveSpans, negativeSpans, okToAppend } -func (a *HistogramSTAppender) appendHistogram(t int64, h *histogram.Histogram) { - var tDelta, cntDelta, zCntDelta int64 - num := binary.BigEndian.Uint16(a.b.bytes()) & 0x3FFF - - if value.IsStaleNaN(h.Sum) { - // Emptying out other fields to write no buckets, and an empty - // layout in case of first histogram in the chunk. - h = &histogram.Histogram{Sum: h.Sum} - } - - if num == 0 { - // The first append gets the privilege to dictate the layout - // but it's also responsible for encoding it into the chunk! - writeHistogramChunkLayout(a.b, h.Schema, h.ZeroThreshold, h.PositiveSpans, h.NegativeSpans, h.CustomValues) - a.schema = h.Schema - a.zThreshold = h.ZeroThreshold - - if len(h.PositiveSpans) > 0 { - a.pSpans = make([]histogram.Span, len(h.PositiveSpans)) - copy(a.pSpans, h.PositiveSpans) - } else { - a.pSpans = nil - } - if len(h.NegativeSpans) > 0 { - a.nSpans = make([]histogram.Span, len(h.NegativeSpans)) - copy(a.nSpans, h.NegativeSpans) - } else { - a.nSpans = nil - } - if len(h.CustomValues) > 0 { - a.customValues = make([]float64, len(h.CustomValues)) - copy(a.customValues, h.CustomValues) - } else { - a.customValues = nil - } - - numPBuckets, numNBuckets := countSpans(h.PositiveSpans), countSpans(h.NegativeSpans) - if numPBuckets > 0 { - a.pBuckets = make([]int64, numPBuckets) - a.pBucketsDelta = make([]int64, numPBuckets) - } else { - a.pBuckets = nil - a.pBucketsDelta = nil - } - if numNBuckets > 0 { - a.nBuckets = make([]int64, numNBuckets) - a.nBucketsDelta = make([]int64, numNBuckets) - } else { - a.nBuckets = nil - a.nBucketsDelta = nil - } - - // Now store the actual data. - putVarbitInt(a.b, t) - putVarbitUint(a.b, h.Count) - putVarbitUint(a.b, h.ZeroCount) - a.b.writeBits(math.Float64bits(h.Sum), 64) - for _, b := range h.PositiveBuckets { - putVarbitInt(a.b, b) - } - for _, b := range h.NegativeBuckets { - putVarbitInt(a.b, b) - } - } else { - // The case for the 2nd sample with single deltas is implicitly - // handled correctly with the double delta code, so we don't - // need a separate single delta logic for the 2nd sample. - - tDelta = t - a.t - cntDelta = int64(h.Count) - int64(a.cnt) - zCntDelta = int64(h.ZeroCount) - int64(a.zCnt) - - tDod := tDelta - a.tDelta - cntDod := cntDelta - a.cntDelta - zCntDod := zCntDelta - a.zCntDelta - - if value.IsStaleNaN(h.Sum) { - cntDod, zCntDod = 0, 0 - } - - putVarbitInt(a.b, tDod) - putVarbitInt(a.b, cntDod) - putVarbitInt(a.b, zCntDod) - - a.writeSumDelta(h.Sum) - - for i, b := range h.PositiveBuckets { - delta := b - a.pBuckets[i] - dod := delta - a.pBucketsDelta[i] - putVarbitInt(a.b, dod) - a.pBucketsDelta[i] = delta - } - for i, b := range h.NegativeBuckets { - delta := b - a.nBuckets[i] - dod := delta - a.nBucketsDelta[i] - putVarbitInt(a.b, dod) - a.nBucketsDelta[i] = delta - } - } - - // Write the incremented count back, preserving the counter-reset bits in - // the top 2 bits of byte 0. - buf := a.b.bytes() - crBits := buf[0] & CounterResetHeaderMask - binary.BigEndian.PutUint16(buf, (uint16(crBits)<<8)|(num+1)) - - a.t = t - a.cnt = h.Count - a.zCnt = h.ZeroCount - a.tDelta = tDelta - a.cntDelta = cntDelta - a.zCntDelta = zCntDelta - - copy(a.pBuckets, h.PositiveBuckets) - copy(a.nBuckets, h.NegativeBuckets) - // Note that the bucket deltas were already updated above. - a.sum = h.Sum -} - -// appendHistogramST encodes a histogram sample with start timestamp. -func (a *HistogramSTAppender) appendHistogramST(st, t int64, h *histogram.Histogram) { +// appendHistogramST encodes a histogram sample with start timestamp. It +// delegates the sample encoding to the embedded base appendHistogram and +// writes the ST encoding afterward. The base no longer touches the chunk +// header sample count: num is passed in and the new count (num+1) is +// returned, leaving the caller to persist it via setNumSamples (the ST +// override preserves the counter-reset bits in byte 0). +func (a *HistogramSTAppender) appendHistogramST(num int, st, t int64, h *histogram.Histogram) int { prevT := a.t - a.appendHistogram(t, h) - num := binary.BigEndian.Uint16(a.b.bytes()) & 0x3FFF - a.encode(a.b, num, a.t, prevT, st) + newNum := a.appendHistogram(num, t, h) + a.encode(a.b, uint16(newNum), a.t, prevT, st) + return newNum } func (*HistogramSTAppender) Append(int64, int64, float64) { @@ -426,7 +320,7 @@ func (a *HistogramSTAppender) AppendHistogram(prev Appender, st, t int64, h *his } if numSamples == 0 { - a.appendHistogramST(st, t, h) + a.setNumSamples(a.appendHistogramST(numSamples, st, t, h)) if h.CounterResetHint == histogram.GaugeType { a.setCounterResetHeader(GaugeType) return nil, false, a, nil @@ -461,7 +355,7 @@ func (a *HistogramSTAppender) AppendHistogram(prev Appender, st, t int64, h *his } happ := app.(*HistogramSTAppender) happ.setCounterResetHeader(counterResetHint) - happ.appendHistogramST(st, t, h) + happ.setNumSamples(happ.appendHistogramST(0, st, t, h)) return newChunk, false, app, nil } if len(pBackwardInserts) > 0 || len(nBackwardInserts) > 0 { @@ -480,14 +374,14 @@ func (a *HistogramSTAppender) AppendHistogram(prev Appender, st, t int64, h *his if appendOnly { return nil, false, a, fmt.Errorf("histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts)) } - chk, app := a.recodeST( + chk, happ := a.recodeST( pForwardInserts, nForwardInserts, h.PositiveSpans, h.NegativeSpans, ) - app.(*HistogramSTAppender).appendHistogramST(st, t, h) - return chk, true, app, nil + happ.setNumSamples(happ.appendHistogramST(happ.NumSamples(), st, t, h)) + return chk, true, happ, nil } - a.appendHistogramST(st, t, h) + a.setNumSamples(a.appendHistogramST(numSamples, st, t, h)) return nil, false, a, nil } @@ -504,7 +398,7 @@ func (a *HistogramSTAppender) AppendHistogram(prev Appender, st, t int64, h *his } happ := app.(*HistogramSTAppender) happ.setCounterResetHeader(GaugeType) - happ.appendHistogramST(st, t, h) + happ.setNumSamples(happ.appendHistogramST(0, st, t, h)) return newChunk, false, app, nil } @@ -521,15 +415,15 @@ func (a *HistogramSTAppender) AppendHistogram(prev Appender, st, t int64, h *his if appendOnly { return nil, false, a, fmt.Errorf("gauge histogram layout change with %d positive and %d negative forwards inserts", len(pForwardInserts), len(nForwardInserts)) } - chk, app := a.recodeST( + chk, happ := a.recodeST( pForwardInserts, nForwardInserts, h.PositiveSpans, h.NegativeSpans, ) - app.(*HistogramSTAppender).appendHistogramST(st, t, h) - return chk, true, app, nil + happ.setNumSamples(happ.appendHistogramST(happ.NumSamples(), st, t, h)) + return chk, true, happ, nil } - a.appendHistogramST(st, t, h) + a.setNumSamples(a.appendHistogramST(numSamples, st, t, h)) return nil, false, a, nil } @@ -537,7 +431,7 @@ func (a *HistogramSTAppender) AppendHistogram(prev Appender, st, t int64, h *his func (a *HistogramSTAppender) recodeST( positiveInserts, negativeInserts []Insert, positiveSpans, negativeSpans []histogram.Span, -) (Chunk, Appender) { +) (Chunk, *HistogramSTAppender) { byts := a.b.bytes() it := newHistogramSTIterator(byts) hc := NewHistogramSTChunk() @@ -548,6 +442,7 @@ func (a *HistogramSTAppender) recodeST( happ := app.(*HistogramSTAppender) numPositiveBuckets, numNegativeBuckets := countSpans(positiveSpans), countSpans(negativeSpans) + num := happ.NumSamples() for it.Next() == ValHistogram { tOld, hOld := it.AtHistogram(nil) stOld := it.AtST() @@ -567,11 +462,12 @@ func (a *HistogramSTAppender) recodeST( if len(negativeInserts) > 0 { hOld.NegativeBuckets = insert(hOld.NegativeBuckets, negativeBuckets, negativeInserts, true) } - happ.appendHistogramST(stOld, tOld, hOld) + num = happ.appendHistogramST(num, stOld, tOld, hOld) } + happ.setNumSamples(num) happ.setCounterResetHeader(CounterResetHeader(byts[0] & CounterResetHeaderMask)) - return hc, app + return hc, happ } // histogramSTIterator is an iterator for HistogramSTChunk that decodes ST after each sample.